import warnings
import numpy as np
from magtrack._cupy import cp, cupyx, is_cupy_available
if is_cupy_available():
try: # pragma: no cover - exercised when CuPy is installed without SciPy extras
import cupyx.scipy.signal # type: ignore
import cupyx.scipy.ndimage # type: ignore
except ImportError:
warnings.warn(
"GPU-acceleration with CuPy SciPy extensions is unavailable. "
"Falling back to CPU implementations."
)
else:
warnings.warn(
"GPU-acceleration with CuPy is not available. Will use CPU only."
)
np.seterr(divide='ignore', invalid='ignore')
# ---------- Helper functions ---------- #
[docs]
def binmean(x, weights, n_bins: int):
"""Compute mean values per bin for 2D arrays, similar to ``numpy.bincount``.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
The input ``x`` is clipped *in place* so that values above ``n_bins`` fall
back within the valid bin range; entries clipped to ``n_bins`` are ignored
when returning the binned means.
Parameters
----------
x : 2D int array, shape (n_values, n_datasets)
Input array to bin.
weights : 2D float array, shape (n_values, n_datasets)
Weights associated with ``x``; should be floating point to allow
averaging.
n_bins : int
The number of bins to be used. Values will be binned as integers
between 0 and n_bins.
Returns
----------
bin_means : 2D float array, shape (n_bins, n_datasets)
Binned average values of weights.
"""
n_datasets = x.shape[1]
# GPU or CPU?
xp = cp.get_array_module(x)
# Clip the maximum x value to nbins (we will discard them latter)
xp.minimum(x, n_bins, out=x)
# Create an index to keep track of each row/dataset of x
i_base = xp.arange(x.shape[1], dtype=xp.min_scalar_type(x.shape[1]))
i = xp.broadcast_to(i_base, x.shape)
# Binning
bin_means = xp.zeros((n_bins + 1, n_datasets), dtype=weights.dtype)
xp.add.at(bin_means, (x, i), weights)
bin_counts = xp.zeros((n_bins + 1, n_datasets), dtype=xp.uint32)
xp.add.at(bin_counts, (x, i), 1)
# Divide (suppress NumPy warning)
if xp is np:
with np.errstate(divide='ignore', invalid='ignore'):
bin_means /= bin_counts
else:
bin_means /= bin_counts
# Return without the overflow row
return bin_means[:-1, :]
[docs]
def pearson(x, y):
"""
Calculate the Pearson correlation coefficient between each row of x and y.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
x : array, shape (n, m)
2D array whose columns are correlated with the columns of ``y``.
y : array, shape (n, k)
2D array whose columns are correlated with the columns of ``x``.
Returns
-------
r : array, shape (k, m)
Pearson correlation coefficients between each column of ``y`` and each
column of ``x``.
"""
# GPU or CPU?
xp = cp.get_array_module(x)
X = x - xp.nanmean(x, axis=0, keepdims=True)
Y = y - xp.nanmean(y, axis=0, keepdims=True)
X = xp.nan_to_num(X, copy=False)
Y = xp.nan_to_num(Y, copy=False)
sx = xp.sqrt((X * X).sum(axis=0)) # (m,)
sy = xp.sqrt((Y * Y).sum(axis=0)) # (k,)
num = Y.T @ X # (k,m)
den = sy[:, None] * sx[None, :] # (k,m)
r = num / den
return r
[docs]
def gaussian(x, mu, sigma):
"""
Calculate a 1D Gaussian function.
Parameters
----------
x : array_like
x coordinates where to evaluate the gaussian
mu : float
Mean (center)
sigma : float
Standard deviation
Returns
-------
array_like
1D array containing the gaussian evaluated at x coordinates
"""
# GPU or CPU?
xp = cp.get_array_module(x)
return xp.exp(-((x - mu) ** 2) / (2 * sigma ** 2))
[docs]
def gaussian_2d(x, y, mu_x, mu_y, sigma):
"""
Calculate a 2D Gaussian image.
Calculates a 2D Gaussian image for each center (mu_x, mu_y) provided along
the grid (x, y) all sharing the same sigma in x and y (sigma).
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
x : 1D array
x coordinates where to evaluate the gaussian
y : 1D array
y coordinates where to evaluate the gaussian
mu_x : 1D array
Mean (center) in x direction for each image (one center per image)
mu_y : 1D array
Mean (center) in y direction for each image (one center per image)
sigma : float
Standard deviation in x and y direction
Returns
-------
array
3D array of shape ``(len(x), len(y), n_images)`` containing the gaussian
evaluated at ``(x, y)`` coordinates for each image center
"""
# GPU or CPU?
xp = cp.get_array_module(x)
return xp.exp(-((x[:, xp.newaxis, xp.newaxis] - mu_x[xp.newaxis, xp.newaxis, :]) ** 2 / (2 * sigma ** 2) +
(y[xp.newaxis, :, xp.newaxis] - mu_y[xp.newaxis, xp.newaxis, :]) ** 2 / (2 * sigma ** 2)))
[docs]
def crop_stack_to_rois(stack, rois):
"""
Takes a 3D image-stack and crops it to the region of interests (ROIs).
Given a 3D image-stack and a list of ROIs, this function will crop around
each ROI and return a 4D array. Note the ROIs must be squares.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU. However, it is
recommended to use the CPU and then transfer the result to the GPU and
perform downstream analysis on the GPU.
Parameters
----------
stack : 3D ndarray of any type, shape (stack_width, stack_height, n_images)
Note the images must be square.
rois : 2D int ndarray, shape (n_roi, 4)
Each row is an ROI. The columns are [top, bottom, left, right].
Returns
----------
cropped_stack : 4D ndarray, shape (cropped_width, cropped_width, n_images, n_roi)
Same type as input stack
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
# Pre-allocate space for cropped stack
n_images = stack.shape[2]
n_rois = rois.shape[0]
width = rois[0, 1] - rois[0, 0]
shape = (width, width, n_images, n_rois)
cropped_stack = xp.ndarray(
shape, dtype=stack.dtype
) # width, width, frames, rois
# Crop
for i in range(n_rois):
cropped_stack[:, :, :, i] = (
stack[rois[i, 0]:rois[i, 1], rois[i, 2]:rois[i, 3], :]
)
return cropped_stack
[docs]
def parabolic_vertex(data, vertex_est, n_local: int, weighted=True):
"""Refine local min/max using parabolic interpolation.
Given an estimated location of a local minimum or maximum, this function
fits the surrounding datapoints to a parabola and interpolates the vertex.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
data : array of float, shape (n_datasets, n_datapoints)
Sequence of datasets arranged row-wise for fitting.
vertex_est : array of float, shape (n_datasets,)
Initial vertex estimates corresponding to each dataset.
n_local : int
The number of local datapoints to be fit. Must be an odd integer >= 3.
weighted : bool, optional
Whether to apply a simple weighting procedure to emphasize the more
central points in the fit. Default is True.
Returns
-------
vertex : array of float, shape (n_datasets,)
Refined vertex locations
"""
# GPU or CPU?
xp = cp.get_array_module(data)
# Setup
n_local_half = (n_local // 2)
# Convert the estimated vertex to an int for use as an index
vertex_int = vertex_est.round().astype(xp.int64)
# Force index to be with the limits
index_min = n_local_half
index_max = data.shape[1] - n_local_half - 1
xp.clip(vertex_int, index_min, index_max, out=vertex_int)
# Get the local data to be fit
n_datasets = data.shape[0]
rel_idx = xp.arange(-n_local_half, n_local_half + 1, dtype=xp.int64)
idx = rel_idx + vertex_int[:, xp.newaxis]
y = data[xp.arange(n_datasets)[:, xp.newaxis], idx].T
x = xp.arange(n_local, dtype=xp.float64)
# Fit to parabola
if weighted:
w = n_local_half - xp.abs(xp.arange(n_local) - n_local_half) + 1
p = xp.polyfit(x, y, 2, w=w)
else:
p = xp.polyfit(x, y, 2)
# Calculate the location of the vertex
vertex = -p[1, :] / (2. * p[0, :]) + vertex_int - n_local // 2. # -b/2a
# Exclude points outside limits
vertex[vertex_int == index_min] = xp.nan
vertex[vertex_int == index_max] = xp.nan
return vertex
# ---------- QI functions ---------- #
[docs]
def _qi_sample_axis_profiles(stack, x, y, axis):
"""Sample 1D profiles along ``axis`` using quadratic interpolation support.
Parameters
----------
stack : 3D array_like
Image stack where the first two axes correspond to ``y`` and ``x`` and
the third axis indexes frames.
x, y : 1D array_like
Approximate center coordinates for each frame. They must have the same
length as the number of frames in ``stack``.
axis : int
Axis along which to collect the three-point profile. ``0`` samples the
column profile (varying ``y``); ``1`` samples the row profile (varying
``x``).
Returns
-------
array_like
Samples of shape ``(n_frames, 3)`` corresponding to offsets of ``-1``,
``0`` and ``+1`` pixels along the chosen axis.
"""
xp = cp.get_array_module(stack)
xpx = cupyx.scipy.get_array_module(stack)
stack_xp = stack
x = xp.asarray(x, dtype=xp.float64)
y = xp.asarray(y, dtype=xp.float64)
n_frames = stack.shape[2]
offsets = xp.array([-1.0, 0.0, 1.0], dtype=xp.float64)
frame_coords = xp.arange(n_frames, dtype=xp.float64)[:, xp.newaxis]
frame_coords = xp.broadcast_to(frame_coords, (n_frames, offsets.size))
if axis == 0:
primary = (y[:, xp.newaxis] + offsets[xp.newaxis, :])
secondary = xp.broadcast_to(x[:, xp.newaxis], primary.shape)
elif axis == 1:
primary = xp.broadcast_to(y[:, xp.newaxis], (n_frames, offsets.size))
secondary = (x[:, xp.newaxis] + offsets[xp.newaxis, :])
else:
raise ValueError("axis must be 0 (column) or 1 (row)")
coords = xp.stack(
(
primary.reshape(-1),
secondary.reshape(-1),
frame_coords.reshape(-1),
),
axis=0,
)
samples = xpx.ndimage.map_coordinates(
stack_xp,
coords,
order=2,
mode="nearest",
)
return samples.reshape(n_frames, offsets.size)
[docs]
def _qi_quadratic_offsets(samples):
"""Compute quadratic-interpolation offsets from three-point samples."""
xp = cp.get_array_module(samples)
values = xp.asarray(samples, dtype=xp.float64)
left = values[:, 0]
center = values[:, 1]
right = values[:, 2]
denom = left - 2.0 * center + right
eps = xp.finfo(values.dtype).eps
valid = xp.abs(denom) > eps
offsets = xp.full(left.shape, xp.nan, dtype=values.dtype)
offsets = xp.where(
valid,
0.5 * (left - right) / denom,
offsets,
)
return offsets
[docs]
def qi(stack, x_old, y_old):
"""Refine centers using quadratic interpolation along x and y axes.
This routine samples the intensity profiles along the horizontal and
vertical axes through the supplied center estimates and performs quadratic
interpolation to recover sub-pixel offsets.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU. Intermediate values
remain on the originating device and the function respects the caller's
backend.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
Image stack containing square frames to refine.
x_old : 1D float array, shape (n_images)
Initial estimates of the x coordinates.
y_old : 1D float array, shape (n_images)
Initial estimates of the y coordinates.
Returns
-------
tuple of array_like
Refined ``(x, y)`` coordinates with sub-pixel precision.
"""
xp = cp.get_array_module(stack)
x_old = xp.asarray(x_old, dtype=xp.float64)
y_old = xp.asarray(y_old, dtype=xp.float64)
row_samples = _qi_sample_axis_profiles(stack, x_old, y_old, axis=1)
col_samples = _qi_sample_axis_profiles(stack, x_old, y_old, axis=0)
dx = _qi_quadratic_offsets(row_samples)
dy = _qi_quadratic_offsets(col_samples)
return x_old + dx, y_old + dy
# ---------- Center-of-Mass functions ---------- #
[docs]
def center_of_mass(stack, background='median'):
"""
Calculate x and y by center-of-mass
For each 2D image of a 3D image-stack compute the center-of-mass along the
x- and y-axes. To avoid bias from the images' background, a pre-processing
step can be taken to remove the background with the ``background`` keyword
argument. The default, ``background='median'`` subtracts the per-frame
median to provide robust centering. ``background='none'`` leaves the data
unchanged and ``background='mean'`` subtracts the per-frame mean. This
function is faster than the version from ``scipy`` or ``cupyx.scipy``.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. The images must be square.
background : str, optional
Background pre-processing. ``'median'`` (default) subtracts the
per-image median, ``'none'`` uses the raw data, and ``'mean'``
subtracts the per-image mean.
Returns
----------
x : 1D float array, shape (n_images,)
The x coordinates of the center
y : 1D float array, shape (n_images,)
The y coordinates of the center
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
# Checks
if stack.ndim != 3:
raise ValueError('stack must be a 3D array, stack.shape=(n_pixels, n_pixels, n_images)')
if stack.shape[0] != stack.shape[1]:
raise ValueError('stack images must be square, stack.shape=(n_pixels, n_pixels, n_images)')
if stack.dtype != xp.float32 and stack.dtype != xp.float64:
raise TypeError('stack dtype must be float32 or float64')
# Correct background of each image
if background == 'none':
stack_norm = stack.view()
elif background == 'mean':
stack_norm = stack.copy()
xp.subtract(stack_norm, xp.mean(stack, axis=(0, 1)), out=stack_norm)
xp.absolute(stack_norm, out=stack_norm)
elif background == 'median':
stack_norm = stack.copy()
xp.subtract(stack_norm, xp.median(stack, axis=(0, 1)), out=stack_norm)
xp.absolute(stack_norm, out=stack_norm)
else:
raise ValueError('background must be "none", "mean" or "median"')
# Calculate projections and total mass
sum_x = xp.sum(stack_norm, axis=0)
total_mass = xp.sum(sum_x, axis=0)
# Prevent divide by zero
total_mass = xp.where(total_mass == 0., xp.nan, total_mass)
# Coordinate grid
grid = xp.arange(stack_norm.shape[0], dtype=stack.dtype)
# Calculate center
x_num = xp.tensordot(grid, sum_x, axes=(0, 0))
y_num = xp.einsum('ijk,i->k', stack_norm, grid)
x = x_num / total_mass
y = y_num / total_mass
return x, y
# ---------- Auto-convolution functions ---------- #
[docs]
def auto_conv(stack, x_old, y_old, return_conv=False):
"""Recalculate the center of a symmetric object using auto-convolution.
For each 2D image of a 3D image stack, use the previous center to select
the central row and column. Convolve these against reversed versions of
themselves (auto-convolution). Then take the maximum as the new center.
Optionally, by setting ``return_conv`` to ``True`` the convolution results
can be returned directly, which is useful for sub-pixel fitting.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image stack. The images must be square.
x_old : 1D float array, shape (n_images,)
Estimated x coordinates near the true centers.
y_old : 1D float array, shape (n_images,)
Estimated y coordinates near the true centers.
return_conv : bool, optional
Whether to return the convolutions instead of the updated centers.
The default is ``False``.
Returns
-------
tuple of ndarray
Return values differ depending on ``return_conv``:
If ``return_conv`` is ``False``
x : 1D float array, shape (n_images,)
The x coordinates of the center.
y : 1D float array, shape (n_images,)
The y coordinates of the center.
If ``return_conv`` is ``True``
col_max : 1D int array, shape (n_images,)
Indices of the maxima of the column convolutions.
row_max : 1D int array, shape (n_images,)
Indices of the maxima of the row convolutions.
col_con : 2D float array, shape (n_pixels, n_images)
Column convolutions (unchanged orientation).
row_con : 2D float array, shape (n_images, n_pixels)
Row convolutions; note the axes are ordered
``(n_images, n_pixels)``.
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
xpx = cupyx.scipy.get_array_module(stack)
# Get the row and column slices corresponding to the previous x & y
frame_idx = xp.arange(stack.shape[2], dtype=xp.int64)
x_idx = xp.round(x_old).astype(xp.int64)
y_idx = xp.round(y_old).astype(xp.int64)
cols = stack[:, x_idx, frame_idx]
rows = stack[y_idx, :, frame_idx]
# Subtract means
cols -= xp.mean(cols, axis=0, keepdims=True)
rows -= xp.mean(rows, axis=1, keepdims=True)
# Apply gaussian weights to reduce edge effects
width = stack.shape[0]
px_idx = xp.arange(width)
cols *= gaussian(px_idx[:, xp.newaxis], y_old[xp.newaxis, :], width/6.)
rows *= gaussian(px_idx[xp.newaxis, :], x_old[:, xp.newaxis], width/6.)
# Convolve the signals
col_con = xpx.signal.fftconvolve(cols, cols, 'same', axes=0)
row_con = xpx.signal.fftconvolve(rows, rows, 'same', axes=1)
# Find the convolution maxima
col_max = xp.argmax(col_con, axis=0)
row_max = xp.argmax(row_con, axis=1)
if return_conv:
return col_max, row_max, col_con, row_con
else:
# Use the maximum of the convolution to find center of the beads
radius = (stack.shape[0] - 1) // 2
x = radius - ((radius - row_max) / 2)
y = radius - ((radius - col_max) / 2)
return x, y
[docs]
def auto_conv_sub_pixel(stack, x_old, y_old, n_local=5):
"""
Re-calculate center of symmetric object by auto-convolution sub-pixel fit
For each 2D image of a 3D image-stack: use the previous center to select
the central row and column. Convolve these against themselves. Use several
points around the maximum of the convolution to fit a parabola and use the
vertex of the parabola as the center to find the sub-pixel coordinates.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the
parameters are on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. Note, the images must be square.
x_old : 1D float array, shape (n_images)
Estimated x coordinates near the true centers.
y_old : 1D float array, shape (n_images)
Estimated y coordinates near the true centers.
n_local : int
The number of local points around the vertex to be used in parabolic
fitting. Must be an odd int >=3.
Returns
----------
x : 1D float array, shape (n_images,)
The x coordinates of the center.
y : 1D float array, shape (n_images,)
The y coordinates of the center.
"""
col_max, row_max, col_con, row_con = auto_conv(
stack, x_old, y_old, return_conv=True
)
x = parabolic_vertex(row_con, row_max, n_local)
y = parabolic_vertex(col_con.T, col_max, n_local)
radius = (stack.shape[0] - 1) // 2
x = radius - ((radius - x) / 2)
y = radius - ((radius - y) / 2)
return x, y
[docs]
def auto_conv_multiline(stack, x_old, y_old, line_ratio=0.05, return_conv=False):
"""
Re-calculate center of symmetric object by multi-line auto-convolution
For each 2D image of a 3D image-stack: use the previous center to select
multiple rows and columns (determined by ``line_ratio``). Average the
resulting signals, convolve them against themselves (auto-convolution)
Then take the maximum as the new center.
Optionally, by setting return_conv to True the convolution results can be
returned directly. This is useful for sub-pixel fitting.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. The images must be square.
x_old : 1D float array, shape (n_images)
Estimated x coordinates near the true centers.
y_old : 1D float array, shape (n_images)
Estimated y coordinates near the true centers.
line_ratio : float, optional
Fraction of the frame width that determines how many neighbouring
lines are averaged before convolution.
return_conv : bool, optional
Whether to return the convolution or return the new center.
The default is False.
Returns
----------
tuple
see information below
If return_conv is False:
x : 1D float array, shape (n_images,)
The x coordinates of the center
y : 1D float array, shape (n_images,)
The y coordinates of the center
If return_conv is True:
col_max : 1D int array, shape (n_images,)
The index of the maximum of the column convolution
row_max : 1D int array, shape (n_images,)
The index of the maximum of the row convolution
col_con : 2D float array, shape (n_pixels, n_images)
The column convolution
row_con : 2D float array, shape (n_images, n_pixels)
The row convolution
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
xpx = cupyx.scipy.get_array_module(stack)
# Get the row and column slices corresponding to the previous x & y
half_n_lines = int(stack.shape[0] * line_ratio // 2)
n_lines = half_n_lines * 2 + 1
line_idx = xp.arange(-half_n_lines, half_n_lines + 1)
width = stack.shape[0]
n_images = stack.shape[2]
frame_idx = xp.arange(n_images, dtype=xp.int64)
frame_idx = xp.repeat(frame_idx, n_lines)
x_idx = xp.round(xp.nan_to_num(x_old)).astype(xp.int64)
y_idx = xp.round(xp.nan_to_num(y_old)).astype(xp.int64)
x_idx = x_idx[:, xp.newaxis] + line_idx
y_idx = y_idx[:, xp.newaxis] + line_idx
x_idx = x_idx.flatten()
y_idx = y_idx.flatten()
cols = stack[:, x_idx, frame_idx]
rows = stack[y_idx, :, frame_idx]
# Average multi-lines
cols = cols.reshape(width, n_images, n_lines)
rows = rows.reshape(n_images, n_lines, width)
cols = xp.mean(cols, axis=2)
rows = xp.mean(rows, axis=1)
# Subtract means
cols -= xp.mean(cols, axis=0, keepdims=True)
rows -= xp.mean(rows, axis=1, keepdims=True)
# Apply gaussian filter to reduce edge effects
width = stack.shape[0]
px_idx = xp.arange(width)
cols *= gaussian(px_idx[:, xp.newaxis], y_old[xp.newaxis, :], width / 6.)
rows *= gaussian(px_idx[xp.newaxis, :], x_old[:, xp.newaxis], width / 6.)
# Convolve the signals
col_con = xpx.signal.fftconvolve(cols, cols, 'same', axes=0)
row_con = xpx.signal.fftconvolve(rows, rows, 'same', axes=1)
# Find the convolution maxima
col_max = xp.argmax(col_con, axis=0)
row_max = xp.argmax(row_con, axis=1)
if return_conv:
return col_max, row_max, col_con, row_con
else:
# Use the maximum of the convolution to find center of the beads
radius = (stack.shape[0] - 1) // 2
x = radius - ((radius - row_max) / 2)
y = radius - ((radius - col_max) / 2)
return x, y
[docs]
def auto_conv_multiline_sub_pixel(stack, x_old, y_old, line_ratio=0.1, n_local=5):
"""
Re-calculate center of symmetric object by multi-line auto-convolution with sub-pixel fit
For each 2D image of a 3D image-stack: use the previous center to select
multiple rows and columns (determined by ``line_ratio``). Average the
resulting signals, convolve them against themselves (auto-convolution) and
use several points around the maximum to fit a parabola. The vertex of the
parabola is used to determine the sub-pixel coordinates of the center.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. The images must be square.
x_old : 1D float array, shape (n_images)
Estimated x coordinates near the true centers.
y_old : 1D float array, shape (n_images)
Estimated y coordinates near the true centers.
line_ratio : float, optional
The ratio relative to the total image width of lines to be used in the
convolutions.
n_local : int, optional
The number of local points around the vertex to be used in parabolic
fitting. Must be an odd int >=3.
Returns
----------
x : 1D float array, shape (n_images,)
The x coordinates of the center.
y : 1D float array, shape (n_images,)
The y coordinates of the center.
"""
col_max, row_max, col_con, row_con = auto_conv_multiline(
stack, x_old, y_old, return_conv=True, line_ratio=line_ratio
)
x = parabolic_vertex(row_con, row_max, n_local)
y = parabolic_vertex(col_con.T, col_max, n_local)
radius = (stack.shape[0] - 1) // 2
x = radius - ((radius - x) / 2)
y = radius - ((radius - y) / 2)
return x, y
# ---------- Radial profile functions ---------- #
[docs]
def radial_profile(stack, x, y, oversample=1):
"""
Calculate the average radial profile about a center
For each 2D image of a 3D image-stack: calculate the average radial profile
about the corresponding center (x and y). The profile is calculated by
binning. For each pixel in an image the Euclidean distance from the center
is calculated. The distance is then used to bin each pixel. When
``oversample`` equals 1 the bin widths are 1 pixel wide; higher values split
each native bin into finer ``1 / oversample`` pixel slices. The bins are
then normalized by the number of pixels in each bin to find the average
intensity in each bin. The number of bins (n_bins) is
((stack.shape[0] // 2) * oversample).
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. Note, the images must be square.
x : 1D float array, shape (n_images)
x-coordinates of the center.
y : 1D float array, shape (n_images)
y-coordinates of the center.
oversample : int, optional
Oversampling factor applied to the radial distances before binning.
Increasing the factor multiplies the number of radial bins and thus the
resolution of the profile by the same amount. Must be an integer
greater than or equal to 1.
Returns
----------
profiles : 2D float array, shape (n_bins, n_images)
The average radial profile of each image about the center
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
# Setup
width = stack.shape[0]
n_images = stack.shape[2]
n_bins = (width // 2) * oversample
grid = xp.indices((width, width), dtype=xp.float32)
flat_stack = stack.reshape((width * width, n_images))
# Calculate the distance of each pixel from x and y
# cast to uint16 because min and max r for 1024x1024 would be 0 and 1449
r = xp.round(xp.hypot(grid[1][:, :, xp.newaxis] - x, grid[0][:, :, xp.newaxis] - y) * oversample)
r = r.astype(xp.uint16).reshape(-1, n_images)
# Calculate profiles
profiles = binmean(r, flat_stack, n_bins)
return profiles
[docs]
def fft_profile(stack, oversample=4, rmin=0.0, rmax=0.5):
"""Compute FFT-based radial intensity profiles without pre-filtering.
Each image is transformed via a real 2D FFT, and the magnitude spectrum is
azimuthally averaged into oversampled radial bins that correspond to the
requested normalized frequency range. Unlike
:func:`fft_profile_with_center`, this routine does not apply Gaussian
weighting or require bead center coordinates.
Parameters
----------
stack : array_like, shape (n_pixels, n_pixels, n_images)
Image stack to profile. The images must be square with an even width.
oversample : int, default=4
Radial oversampling factor (>=1) applied when binning FFT magnitudes.
rmin : float, default=0.0
Minimum normalized radial frequency (0–0.5 Nyquist) to keep in the
returned profile.
rmax : float, default=0.5
Maximum normalized radial frequency (0–0.5 Nyquist) considered when
building the radial profile.
Returns
-------
profile : array_like, shape (n_selected_bins, n_images)
Oversampled radial magnitude profiles for each image, sliced to the
bins corresponding to the radial range ``[rmin, rmax]``.
"""
xp = cp.get_array_module(stack)
n_images = stack.shape[2]
width = stack.shape[0]
center = width // 2
n_bins = int(round(center * rmax * oversample))
n_start = int(round(center * rmin * oversample))
grid = xp.indices((width, center + 1), dtype=xp.float32)
r_int = xp.round(
xp.hypot(grid[1], grid[0] - center) * oversample
).astype(xp.uint16)
r = xp.tile(r_int.reshape(-1, 1), (1, n_images))
fft_cpx = xp.fft.fftshift(xp.fft.rfft2(stack, axes=(0, 1)), axes=(0,))
fft = xp.abs(fft_cpx).reshape(-1, n_images)
profile = binmean(r, fft, n_bins)[n_start:]
return profile
[docs]
def fft_profile_with_center(stack, x, y, oversample=4, rmin=0.0, rmax=0.5, gaus_factor=6.):
"""Compute FFT-based radial intensity profiles using Gaussian weighting.
The images are first weighted in-place by a 2D Gaussian centered at the
requested locations. A real 2D FFT is then evaluated for each weighted
image, and the magnitude spectrum is azimuthally averaged into
oversampled radial bins.
Parameters
----------
stack : array_like, shape (n_pixels, n_pixels, n_images)
Image stack to profile. The images must be square with an even width.
This array is modified in-place by Gaussian weighting prior to the
FFT step.
x : array_like, shape (n_images,)
X-coordinates of the Gaussian centers in pixel units.
y : array_like, shape (n_images,)
Y-coordinates of the Gaussian centers in pixel units.
oversample : int, default=4
Radial oversampling factor (>=1) applied when binning FFT magnitudes.
rmin : float, default=0.0
Minimum normalized radial frequency (0–0.5 Nyquist) to keep in the
returned profile.
rmax : float, default=0.5
Maximum normalized radial frequency (0–0.5 Nyquist) considered when
building the radial profile.
gaus_factor : float, default=6.0
Divisor controlling the Gaussian width relative to the image size.
Returns
-------
profile : array_like, shape (n_selected_bins, n_images)
Oversampled radial magnitude profiles for each image, sliced to the
bins corresponding to the radial range ``[rmin, rmax]``.
"""
xp = cp.get_array_module(stack)
n_images = stack.shape[2]
width = stack.shape[0]
center = width // 2
n_bins = int(round(center * rmax * oversample))
n_start = int(round(center * rmin * oversample))
grid = xp.indices((width, center + 1), dtype=xp.float32)
r_int = xp.round(
xp.hypot(grid[1], grid[0] - center) * oversample
).astype(xp.uint16)
r = xp.tile(r_int.reshape(-1, 1), (1, n_images))
w = gaussian_2d(xp.arange(width), xp.arange(width), x, y, width / gaus_factor)
stack *= w
fft_cpx = xp.fft.fftshift(xp.fft.rfft2(stack, axes=(0, 1)), axes=(0,))
fft = xp.abs(fft_cpx).reshape(-1, n_images)
profile = binmean(r, fft, n_bins)[n_start:]
return profile
# ---------- Z-Lookup functions ---------- #
[docs]
class LookupZProfileSizeError(ValueError):
"""Raised when ``lookup_z`` inputs have mismatched radial bin counts."""
[docs]
class LookupZProfileSizeWarning(UserWarning):
"""Raised when ``lookup_z`` must fall back or cannot use the requested fit."""
[docs]
def lookup_z(profiles, zlut, n_local=7):
"""
Calculate the corresponding sub-planar z-coordinate of each profile by LUT
For each image's profile in ``profiles``: find the best matching profile in
the Z-LUT (lookup table). The lookup table stores a leading row of
z-coordinates and radial profiles below; the first radial bin corresponds to
the central pixel and is ignored during correlation (hence ``zlut[2:, :]``).
Fits the local points around the best matching profile to find sub-planar
fit in between columns of the LUT.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
profiles : 2D float array, shape (n_bins, n_images)
The average radial profile of each image about the center
zlut : 2D float array, shape (1+n_bins, n_ref)
The reference radial profiles and corresponding z-coordinates. The
first row (``zlut[0, :]``) holds the z-axis values. The remaining rows
contain the reference radial profiles produced by
:func:`radial_profile`; their first bin (``zlut[1, :]``) is skipped to
avoid the central pixel when matching.
n_local : int, optional
The number of local points around the vertex to be used in parabolic
fitting. Must be an odd int >=3. Default is 7.
Returns
----------
z : 1D float array, shape (n_images)
z-coordinates
"""
# GPU or CPU?
xp = cp.get_array_module(profiles)
expected_bins = zlut.shape[0] - 1
if profiles.shape[0] != expected_bins:
raise LookupZProfileSizeError(
"profiles and zlut must have matching radial bins: got "
f"{profiles.shape[0]} bins in profiles and {expected_bins} in zlut"
)
ref_z = zlut[0, :]
ref_profiles = zlut[2:, :] # Skip the first pixel
n_ref = ref_profiles.shape[1]
# Calculate the pearson correlation coefficient between Z-LUT and current profiles.
# This (likely) needs to be done in a loop to prevent the
# operation from taking too much memory at once.
# Skip the first pixel
r = pearson(ref_profiles, profiles[1:, :])
# Find index of the max
z_int_idx = xp.argmax(r, axis=1).astype(xp.float64)
# Find the sub-planar index of the max when enough reference planes exist.
# Otherwise, fall back to the coarse best-match index.
if n_ref < n_local:
warnings.warn(
"lookup_z requires at least "
f"{n_local} reference profiles for parabolic refinement, but zlut "
f"contains only {n_ref}; falling back to coarse z lookup.",
LookupZProfileSizeWarning,
stacklevel=2,
)
z_idx = z_int_idx
else:
z_idx = parabolic_vertex(r, z_int_idx, n_local)
# Interpolate z from the reference index
z = xp.interp(z_idx, xp.arange(n_ref), ref_z, left=xp.nan, right=xp.nan)
return z
# ---------- Complete pipeline functions ---------- #
[docs]
def stack_to_xyzp(stack, zlut=None):
"""Estimate XYZ coordinates and radial profiles from an image stack.
This convenience wrapper orchestrates the CPU/GPU-agnostic pipeline used
throughout MagTrack: the x and y are first estimated with
:func:`center_of_mass`, refined with :func:`auto_conv`, further refined by
five iterations of :func:`auto_conv_multiline_sub_pixel`, and then
converted into radial profiles via :func:`radial_profile`. When a Z-look-up
table is provided, :func:`lookup_z` translates those profiles into axial
coordinates; otherwise, NaNs are returned for the z.
Note: CPU or GPU: The code is agnostic of CPU and GPU usage. If the first
parameter is on the GPU the computation/result will be on the GPU.
Otherwise, the computation/result will be on the CPU.
Parameters
----------
stack : array-like, shape (n_pixels, n_pixels, n_images)
3-D image stack containing square images. The array can reside on the
CPU (NumPy) or GPU (CuPy).
zlut : array-like, shape (1 + n_bins, n_ref), optional
Radial-profile look-up table whose first row stores the reference
z-positions and remaining rows contain the corresponding template
profiles. If omitted, the axial coordinate output is filled with NaNs.
Returns
-------
x : 1D float array, shape (n_images)
x-coordinates
y : 1D float array, shape (n_images)
y-coordinates
z : 1D float array, shape (n_images)
z-coordinates or a NaN array when ``zlut`` is None
profiles : 2D float array, shape (n_bins, n_images)
The average radial profile of each image about the center
"""
# GPU or CPU?
xp = cp.get_array_module(stack)
# XY
x, y = center_of_mass(stack)
x, y = auto_conv(stack, x, y)
for _ in range(5): # Repeat
x, y = auto_conv_multiline_sub_pixel(stack, x, y)
# Z
profiles = radial_profile(stack, x, y)
if zlut is None:
z = x * xp.nan
else:
try:
z = lookup_z(profiles, zlut)
except LookupZProfileSizeError as e:
warnings.warn(str(e), LookupZProfileSizeWarning)
z = x * xp.nan
return x, y, z, profiles
[docs]
def stack_to_xyzp_advanced(stack, zlut=None, **kwargs):
"""
Calculate image-stack XYZ and profiles (Z is nan if Z-LUT is None)
Parameters
----------
stack : 3D float array, shape (n_pixels, n_pixels, n_images)
The image-stack. Note, the images must be square. It is expected it is
in the regular CPU memory. It will be transferred to the GPU.
zlut : 2D float array, shape (1+n_bins, n_ref), optional
The reference radial profiles and corresponding z-coordinates. The
first row (zlut[0, :]) are the z-coordinates. The rest of the rows are
the corresponding profiles as generated by radial_profile. It is
expected it is already in the GPU memory. The defualt is None.
**kwargs : dict, optional
Additional keyword arguments controlling individual processing stages.
The following keys are recognised:
* ``"center_of_mass"`` (dict, default ``{}``): forwarded to
:func:`center_of_mass`.
* ``"auto_conv"`` (dict, default ``{}``): forwarded to :func:`auto_conv`.
* ``"n auto_conv_multiline_sub_pixel"`` (int, default ``5``): number of
:func:`auto_conv_multiline_sub_pixel` refinement iterations.
* ``"auto_conv_multiline_sub_pixel"`` (dict, default ``{}``): forwarded
to :func:`auto_conv_multiline_sub_pixel`.
* ``"use fft_profile"`` (bool, default ``False``): when ``True`` compute
profiles with :func:`fft_profile_with_center`; when ``False`` compute profiles
with :func:`radial_profile`.
* ``"fft_profile"`` (dict, default ``{}``): forwarded to
:func:`fft_profile_with_center` when ``use fft_profile`` is ``True``.
* ``"radial_profile"`` (dict, default ``{}``): forwarded to
:func:`radial_profile` when ``use fft_profile`` is ``False``.
* ``"lookup_z"`` (dict, default ``{}``): forwarded to :func:`lookup_z`
when ``zlut`` is provided.
Returns
----------
x : 1D float array, shape (n_images)
x-coordinates
y : 1D float array, shape (n_images)
y-coordinates
z : 1D float array, shape (n_images)
z-coordinates
profiles : 2D float array, shape (n_bins, n_images)
The average radial profile of each image about the center
"""
# Move stack to GPU memory
gpu_stack = cp.asarray(stack, dtype=cp.float64)
x, y = center_of_mass(gpu_stack, **kwargs.get('center_of_mass', {}))
x, y = auto_conv(gpu_stack, x, y, **kwargs.get('auto_conv', {}))
for _ in range(kwargs.get('n auto_conv_multiline_sub_pixel', 5)):
x, y = auto_conv_multiline_sub_pixel(
gpu_stack, x, y, **kwargs.get('auto_conv_multiline_sub_pixel', {})
)
if kwargs.get('use fft_profile', False):
profiles = fft_profile_with_center(
gpu_stack, x, y, **kwargs.get('fft_profile', {})
)
else:
profiles = radial_profile(gpu_stack, x, y, **kwargs.get('radial_profile', {}))
if zlut is None:
z = x * cp.nan
else:
try:
z = lookup_z(profiles, zlut, **kwargs.get('lookup_z', {}))
except LookupZProfileSizeError as e:
warnings.warn(str(e), LookupZProfileSizeWarning)
z = x * cp.nan
# Return and move back to regular memory
return cp.asnumpy(x), cp.asnumpy(y), cp.asnumpy(z), cp.asnumpy(profiles)