Source code for mlspm.preprocessing

import random
from typing import List, Literal, Optional, Tuple

import numpy as np
import scipy.ndimage as nimg
from PIL import Image


[docs] def add_norm(Xs: list[np.ndarray], per_layer: bool = True): """ Normalize arrays by subracting the mean and dividing by standard deviation. In-place operation. Arguments: Xs: Arrays to normalize. Each array should be of shape ``(batch_size, ...)``. per_layer: If True, normalized separately for each element in last axis of **Xs**. """ for X in Xs: sh = X.shape for j in range(sh[0]): if per_layer: for i in range(sh[-1]): X[j, ..., i] = (X[j, ..., i] - np.mean(X[j, ..., i])) / np.std(X[j, ..., i]) else: X[j] = (X[j] - np.mean(X[j])) / np.std(X[j])
[docs] def add_noise(Xs: list[np.ndarray], c: float = 0.1, randomize_amplitude: bool = False, normal_amplitude: bool = False): """ Add uniform random noise to arrays. In-place operation. Arguments: Xs: Arrays to add noise to. Each array should be of shape ``(batch_size, ...)``. c: Amplitude of noise. Is multiplied by (max-min) of sample. randomize_amplitude: If True, noise amplitude is uniform random in ``[0, c]`` for each sample in the batch. normal_amplitude: If True and ``randomize_amplitude==True``, then instead of uniform, the noise amplitude is distributed like the absolute value of a normally distributed variable with zero mean and standard deviation equal to **c**. """ for X in Xs: sh = X.shape R = np.random.rand(*sh) - 0.5 if randomize_amplitude: if normal_amplitude: amp = np.abs(np.random.normal(0, c, sh[0])) else: amp = np.random.uniform(0.0, 1.0, sh[0]) * c else: amp = [c] * sh[0] for j in range(sh[0]): X[j] += R[j] * amp[j] * (X[j].max() - X[j].min())
[docs] def add_cutout(Xs: list[np.ndarray], n_holes: int = 5): """ Randomly add cutouts (rectangular patches of zeros) to images. In-place operation. Arguments: Xs: Arrays to add cutouts to. Each array should be of shape ``(batch_size, x, y, z)``. n_holes: Maximum number of cutouts to add. """ def get_random_eraser(input_img, p=0.2, s_l=0.001, s_h=0.01, r_1=0.1, r_2=1.0 / 0.1): """ p : the probability that random erasing is performed s_l, s_h : minimum / maximum proportion of erased area against input image r_1, r_2 : minimum / maximum aspect ratio of erased area """ sh = input_img.shape img_h, img_w = [sh[0], sh[1]] if np.random.uniform(0, 1) > p: return input_img while True: s = np.exp(np.random.uniform(np.log(s_l), np.log(s_h))) * img_h * img_w r = np.exp(np.random.uniform(np.log(r_1), np.log(r_2))) w = int(np.sqrt(s / r)) h = int(np.sqrt(s * r)) left = np.random.randint(0, img_w) top = np.random.randint(0, img_h) if left + w <= img_w and top + h <= img_h: break input_img[top : top + h, left : left + w] = 0.0 return input_img for X in Xs: sh = X.shape for j in range(sh[0]): for i in range(sh[3]): for _ in range(n_holes): X[j, :, :, i] = get_random_eraser(X[j, :, :, i])
[docs] def add_gradient(Xs: list[np.ndarray], c: float = 0.3): """ Add a constant gradient plane with random direction to arrays. In-place operation. Arguments: Xs: Arrays to add gradients to. Each array should be of shape ``(batch_size, x, y, z)``. c: Maximum range of gradient plane as a fraction of the range of the array values. """ assert len(set([X.shape for X in Xs])) == 1 # All same shape x, y = np.meshgrid(np.arange(0, Xs[0].shape[1]), np.arange(0, Xs[0].shape[2]), indexing="ij") for i in range(Xs[0].shape[0]): c_eff = c * np.random.rand() angle = 2 * np.pi * np.random.rand() n = [np.cos(angle), np.sin(angle), 1] z = -(n[0] * x + n[1] * y) z -= z.mean() z /= np.ptp(z) for X in Xs: X[i] += z[:, :, None] * c_eff * np.ptp(X[i])
[docs] def rand_shift_xy_trend(Xs: list[np.ndarray], max_layer_shift: float = 0.02, max_total_shift: float = 0.1): """ Randomly shift z-layers in the xy-plane. Each shift is relative to previous one. In-place operation. Arguments: Xs: Arrays to shift. Each array should be of shape ``(batch_size, x, y, z)``. shift_step_max: Maximum fraction of image size by which to shift for each layer. Should be in the interval ``[0, 1]``. max_shift_total: Maximum fraction of image size by which to shift in total. Should be in the interval ``[0, 1]`` and more than **shift_step_max**. """ if not (0 <= max_layer_shift <= 1.0): raise ValueError(f"Max layer shift should be in the interval from 0 to 1, but got {max_layer_shift}") if not (0 <= max_total_shift <= 1.0): raise ValueError(f"Max total shift should be in the interval from 0 to 1, but got {max_total_shift}") if max_layer_shift > max_total_shift: raise ValueError("Max layer shift cannot be larger than the max total shift.") for X in Xs: sh = X.shape # Calculate max possible shifts in pixels between neighbor slices and maximum shift xy_max = np.maximum(sh[1], sh[2]) max_slice_shift_pix = np.floor(xy_max * max_layer_shift).astype(int) max_trend_pix = np.floor(xy_max * max_total_shift).astype(int) for j in range(sh[0]): # Iterate over batch items # Calculate values of random shift for slices in reverse order # 0 value for closest slice and biggest values for further slices rand_shift = np.zeros((sh[3], 2)) for i in range(sh[3] - 1, 0, -1): shift_values = [random.choice(np.arange(-max_slice_shift_pix, max_slice_shift_pix + 1)) for _ in range(2)] for slice_ind in range(i): rand_shift[slice_ind, :] = rand_shift[slice_ind, :] + shift_values rand_shift = np.clip(rand_shift, -max_trend_pix, max_trend_pix).astype(int) # Clip too big shifts # Apply shifts for i, (shift_x, shift_y) in enumerate(rand_shift): X[j, :, :, i] = nimg.shift(X[j, :, :, i], (shift_y, shift_x), mode="mirror")
[docs] def top_atom_to_zero(xyzs: list[np.ndarray]): """ Set the z coordinate of the highest atom in each molecule to 0. In-place operation. Arguments: xyzs: Molecule arrays to modify. Each array should be of shape ``(num_atoms, :)``, such that the first three elements on the second axis are the xyz coordinates. """ new_xyzs = [] for xyz in xyzs: xyz[:, 2] -= xyz[:, 2].max() new_xyzs.append(xyz) return new_xyzs
[docs] def interpolate_and_crop( Xs: list[np.ndarray], real_dim: Tuple[int, int], target_res: float = 0.125, target_multiple: int = 8 ) -> list[np.ndarray]: """ Interpolate a batch of AFM images to target resolution and crop to a target multiple of pixels in the xy plane. Arguments: X: AFM images to interpolate and crop. Each array should be of shape ``(batch_size, x, y, z)``. real_dim: Real-space size of AFM image region in x- and y-directions in Ã…ngstroms. target_res: Target size for a pixel in angstroms. target_multiple: Target multiple of pixels of output image. Returns: Interpolated and cropped AFM images. """ resized_shape = (int(real_dim[1] // target_res), int(real_dim[0] // target_res)) for k, x in enumerate(Xs): # Interpolate for correct resolution Xs_ = np.empty((x.shape[0], resized_shape[1], resized_shape[0], x.shape[-1])) for i in range(x.shape[0]): for j in range(x.shape[-1]): Xs_[i, :, :, j] = np.array(Image.fromarray(x[i, :, :, j]).resize(resized_shape, Image.BILINEAR)) # Crop for correct multiple of pixels dx = resized_shape[1] % target_multiple x_start = int(np.floor(dx / 2)) x_end = resized_shape[1] - int(np.ceil(dx / 2)) dy = resized_shape[0] % target_multiple y_start = int(np.floor(dy / 2)) y_end = resized_shape[0] - int(np.ceil(dy / 2)) Xs[k] = Xs_[:, x_start:x_end, y_start:y_end] return Xs
[docs] def minimum_to_zero(Ys: List[np.ndarray]): """ Shift values in arrays such that minimum is at zero. In-place operation. Arguments: Ys: Arrays of shape (batch_size, ...). """ for Y in Ys: for j in range(Y.shape[0]): Y[j] -= Y[j].min()
[docs] def add_rotation_reflection( X: List[np.ndarray], Y: List[np.ndarray], reflections: bool = True, multiple: int = 2, crop: Optional[Tuple[int]] = None, per_batch_item: bool = False, ) -> Tuple[np.ndarray, np.ndarray]: """ Augment batch with random rotations and reflections. Arguments: X: AFM images to augment. Each array should be of shape ``(batch_size, x, y, z)``. Y: Reference image descriptors to augment. Each array should be of shape ``(batch, x, y)``. reflections: Whether to augment with reflections. If True, each rotation is randomly reflected with 50% probability. multiple: Multiplier for how many rotations to generate for every sample. crop: If not None, then output batch is cropped to specified size ``(x_size, y_size)`` in the middle of the image. per_batch_item: If True, rotation is randomized per batch item, otherwise same rotation for all. Returns: Tuple (**X**, **Y**), where - **X** - Batch of rotation-augmented AFM images of shape ``(batch*multiple, x_new, y_new, z)``. - **Y** - Batch of rotation-augmented reference image descriptors of shape ``(batch*multiple, x_new, y_new)`` """ X_aug = [[] for _ in range(len(X))] Y_aug = [[] for _ in range(len(Y))] for _ in range(multiple): if per_batch_item: rotations = 360 * np.random.rand(len(X[0])) else: rotations = [360 * np.random.rand()] * len(X[0]) if reflections: flip = np.random.randint(2) for k, x in enumerate(X): x = x.copy() for i in range(x.shape[0]): for j in range(x.shape[-1]): x[i, :, :, j] = np.array(Image.fromarray(x[i, :, :, j]).rotate(rotations[i], resample=Image.BICUBIC)) if flip: x = x[:, :, ::-1] X_aug[k].append(x) for k, y in enumerate(Y): y = y.copy() for i in range(y.shape[0]): y[i, :, :] = np.array(Image.fromarray(y[i, :, :]).rotate(rotations[i], resample=Image.BICUBIC)) if flip: y = y[:, :, ::-1] Y_aug[k].append(y) X = [np.concatenate(x, axis=0) for x in X_aug] Y = [np.concatenate(y, axis=0) for y in Y_aug] if crop is not None: x_start = (X[0].shape[1] - crop[0]) // 2 y_start = (X[0].shape[2] - crop[1]) // 2 X = [x[:, x_start : x_start + crop[0], y_start : y_start + crop[1]] for x in X] Y = [y[:, x_start : x_start + crop[0], y_start : y_start + crop[1]] for y in Y] return X, Y
[docs] def random_crop( X: List[np.ndarray], Y: List[np.ndarray], min_crop: float = 0.5, max_aspect: float = 2.0, multiple: int = 8, distribution: Literal["flat", "exp-log"] = "flat", ) -> Tuple[np.ndarray, np.ndarray]: """ Randomly crop images in a batch to a different size and aspect ratio. Arguments: X: AFM images to crop. Each array should be of shape ``(batch_size, x, y, z)``. Y: Reference image descriptors to crop. Each array should be of shape ``(batch, x, y)``. min_crop: Minimum crop size as a fraction of the original size. max_aspect: Maximum aspect ratio for crop. Cannot be more than 1/min_crop. multiple: The crop size is rounded down to the specified integer multiple. distribution: 'flat' or 'exp-log'. How aspect ratios are distributed. If 'flat', then distribution is random uniform between (1, max_aspect) and half of time is flipped. If 'exp-log', then distribution is exp of log of uniform distribution over (1/max_aspect, max_aspect). 'exp-log' is more biased towards square aspect ratios. Returns: Tuple (**X**, **Y**), where - **X** - Batch of cropped AFM images of shape ``(batch, x_new, y_new, z)``. - **Y** - Batch of cropped reference image descriptors of shape ``(batch, x_new, y_new)``. """ assert 0 < min_crop <= 1.0 assert max_aspect >= 1.0 assert 1 / min_crop >= max_aspect if distribution == "flat": aspect = np.random.uniform(1, max_aspect) if np.random.rand() > 0.5: aspect = 1 / aspect elif distribution == "exp-log": aspect = np.exp(np.random.uniform(np.log(1 / max_aspect), np.log(max_aspect))) else: raise ValueError(f"Unrecognized aspect ratio distribution {distribution}") x_size, y_size = X[0].shape[1], X[0].shape[2] if aspect > 1.0: height = int(np.random.uniform(int(min_crop * y_size), int(y_size / aspect))) width = int(height * aspect) else: width = int(np.random.uniform(int(min_crop * x_size), int(x_size * aspect))) height = int(width / aspect) width = width - (width % multiple) height = height - (height % multiple) start_x = int(np.random.uniform(0, x_size - width - 1e-6)) start_y = int(np.random.uniform(0, y_size - height - 1e-6)) X = [x[:, start_x : start_x + width, start_y : start_y + height] for x in X] Y = [y[:, start_x : start_x + width, start_y : start_y + height] for y in Y] return X, Y