Source code for analysis.svd

"""Singular value decomposition (SVD) of the TR-WAXS data."""


from copy import deepcopy
import numpy as np
from scipy.optimize import curve_fit

from txs.analysis.utils import get_time_delta


class SVD:
    """Compute the SVD and store results for analysis and plotting.

    Parameters
    ----------
    data : np.ndarray
        An array whose rows correspond to q-values and whose columns
        correspond to measured time delays.

    Examples
    --------
    Assuming `res` is the result dict from `txs.datared.datared`:

    >>> svd = SVD(res['diff_av']).run()

    """

    def __init__(self, data):
        self._data = deepcopy(data)

        # outputs
        self._result = None
        self._autocorr = None
        self._fit = None

    # make self.data read-only and hide the actual data under self._data
    @property
    def data(self):
        return self._data

    def run(self):
        """Run the SVD calculation and store the result.

        The **result** can be obtained using the :py:func:`result` method.

        The **autocorrelations** of the left and right singular vectors can
        be obtained using the :py:func:`autocorr` method.

        The **basis patterns** can be obtained using the
        :py:func:`patterns` method.

        The **recomposed signal** can be obtained using the
        :py:func:`recompose` method.

        """
        u, s, v = np.linalg.svd(self.data, full_matrices=False)
        self._result = (u, s, v)
        self._compute_autocorr()

        return self

    def result(self, rank=None):
        """Return the singular values and vectors up to `rank`.

        Parameters
        ----------
        rank : int, tuple, list, slice or None, optional
            Selection of singular values and vectors to return
            (default, None, all are returned).

        Returns
        -------
        u : 2D array
            The left-singular vectors (column-wise).
        s : 1D array
            The computed singular values, in decreasing order.
        v : 2D array
            The right-singular vectors (row-wise).

        """
        rank = self._get_rank_slice(rank)

        u = self._result[0][:, rank]
        s = self._result[1][rank]
        v = self._result[2][rank]

        return u, s, v
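
    # Usage sketch for run() and result(), assuming `res` is the dict
    # returned by txs.datared.datared as in the class docstring (it is not
    # defined in this module):
    #
    #     svd = SVD(res['diff_av']).run()
    #     u, s, v = svd.result(3)          # first three singular triplets
    #     u2, s2, v2 = svd.result((1, 5))  # ranks 1 to 4 as a (min, max) tuple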

    def autocorr(self):
        """Return the autocorrelations of the U and V singular vectors."""
        return self._autocorr

    def patterns(self, rank=None):
        """Return the left and right basis patterns obtained from the SVD."""
        u, s, v = self.result(rank)

        return u @ np.diag(s), v

    def recompose(self, rank=None):
        """Recompose the signal by computing the matrix product U S V."""
        u, s, v = self.result(rank)

        return u @ np.diag(s) @ v
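
    # Low-rank reconstruction sketch (assumes `svd` was built as above):
    # patterns() returns (U @ diag(S), V), while recompose() returns the
    # truncated product U @ diag(S) @ V.
    #
    #     basis, amplitudes = svd.patterns(2)  # rank-2 basis patterns
    #     approx = svd.recompose(2)            # rank-2 approximation of data
    #     residual = svd.data - approx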

    def get_distance(self, other, rank=None):
        """Compute the distance from vector dot products with another SVD.

        For the matrices Ua and Va of this SVD, the dot products with the
        corresponding singular vectors in the matrices Ub and Vb of `other`
        are computed. Averages are then computed separately for the dot
        products obtained from the U and from the V singular vectors.

        Parameters
        ----------
        other : :py:class:`SVD`
            Another instance of SVD.
        rank : int, optional
            The maximum rank of singular vectors to be used
            (default, None, all are used).

        Returns
        -------
        dU : float
            The average inner product value for the U singular vectors.
        dV : float
            The average inner product value for the V singular vectors.

        """
        rank = self._get_rank_slice(rank)

        dU = np.sum(self._result[0] * other._result[0], 0)
        dV = np.sum(self._result[2] * other._result[2], 1)

        dU = np.mean(dU[rank])
        dV = np.mean(dV[rank])

        return dU, dV
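
    # Comparison sketch, assuming `other_res` is a second dataset on the
    # same q-grid with the same number of delays (an assumption, not part
    # of this module):
    #
    #     svd_a = SVD(res['diff_av']).run()
    #     svd_b = SVD(other_res['diff_av']).run()
    #     dU, dV = svd_a.get_distance(svd_b, rank=3)
    #     # values close to +/-1 indicate similar singular vectors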

    def fit(self, f, xdata, vectors="v", rank=None, **kwargs):
        """Fit the selected vectors up to the given rank.

        The `scipy.optimize.curve_fit` function is used with the provided
        model function. This can be used to extract the time evolution of
        the sample during an experiment.

        Parameters
        ----------
        f : callable
            A fit function to be used (typically a decaying exponential).
        xdata : array-like
            An array containing the values for the x-axis (typically either
            q-values or experimental time).
        vectors : {'u', 'v'}, optional
            Which vectors to use. Should be either 'u' for left-singular
            vectors or 'v' for right-singular vectors
            (default, 'v').
        rank : int, optional
            The maximum rank, or number of vectors, to fit
            (default, None, all are used).
        kwargs : dict-like
            Additional arguments to be passed to the
            `scipy.optimize.curve_fit` function.

        Returns
        -------
        result : list of tuples (popt, pcov)
            A list of outputs from `scipy.optimize.curve_fit`, as tuples of
            the form (popt, pcov) for each fitted vector. Parameter errors
            can be obtained from the covariance matrix using
            `perr = np.sqrt(np.diag(pcov))`.

        """
        u, _, v = self.result(rank)

        if vectors not in ["u", "v"]:
            raise ValueError("Argument 'vectors' should be 'u' or 'v'.")

        vectors = u.T if vectors == "u" else v

        result = [
            curve_fit(f, xdata, vector, **kwargs) for vector in vectors
        ]

        return result
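
    # Fit sketch: a minimal decaying-exponential model for the first two
    # right-singular vectors, assuming `delays` holds the experimental time
    # axis (an assumption, it is not defined in this class):
    #
    #     model = lambda t, a, b, tau: a * np.exp(-t / tau) + b
    #     fits = svd.fit(model, delays, vectors="v", rank=2, p0=[1, 0, 30])
    #     popt, pcov = fits[0]  # curve_fit output for the first vector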

    def maximize_autocorrelations(self, rank=(2, 10), side="left"):
        """Rotate the SVD left or right matrix to maximize autocorrelations.

        Parameters
        ----------
        rank : {int, tuple, list, slice}, optional
            Selection of singular vectors to include in the rotation
            procedure.
            If an integer, it will select all vectors up to rank.
            If a 2-tuple, it will select vectors in the range defined by
            the tuple of the form (min, max).
            If a list or slice, it will select the ranks given.
            (default, (2, 10))
        side : {'left', 'right'}, optional
            Which singular vectors to use, should be either "left" or
            "right".
            (default, "left")

        """
        u, s, v = self.result()
        rank = self._get_rank_slice(rank)
        vectors = u[:, rank] if side == "left" else v[rank].T

        # lag-1 cross-correlation matrix between the selected vectors
        autocorr = []
        for vec in vectors.T:
            autocorr.append([
                np.sum(vec[1:] * other_vec[:-1]) for other_vec in vectors.T
            ])

        corr = np.array(autocorr).T
        sym_corr = (corr + corr.T) / 2

        # rotate onto the eigenvectors of the symmetrized correlation
        # matrix, ordered by decreasing eigenvalue
        _, eigvec = np.linalg.eigh(sym_corr)
        eigvec = eigvec[:, ::-1]

        u[:, rank] = u[:, rank] @ eigvec
        v[rank] = (v[rank].T @ eigvec).T

        self._result = (u, s, v)
        self._compute_autocorr()

        return self
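
    # Rotation sketch: rotate singular vectors of ranks 2 to 9 of the left
    # matrix so that their lag-1 autocorrelations are maximized, then
    # inspect the updated autocorrelations.
    #
    #     svd = svd.maximize_autocorrelations(rank=(2, 10), side="left")
    #     u_ac, v_ac = svd.autocorr()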

    def _compute_autocorr(self):
        """Compute the autocorrelation of basis patterns and amplitudes."""
        u, _, v = self.result()

        u_autocorr = np.array([
            np.sum(val[1:] * val[:-1]) for val in u.T
        ])
        v_autocorr = np.array([
            np.sum(val[1:] * val[:-1]) for val in v
        ])

        self._autocorr = (u_autocorr, v_autocorr)

    def _get_rank_slice(self, rank):
        """Helper function to process the `rank` argument."""
        if isinstance(rank, int):
            return slice(0, rank, 1)
        if isinstance(rank, (np.ndarray, list, set, slice)):
            return rank
        if isinstance(rank, tuple):
            return slice(rank[0], rank[1], 1)
        if rank is None:
            return slice(0, self._result[1].shape[0], 1)

        raise ValueError("Requested rank has an invalid value.")
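
    # Accepted forms of the `rank` argument, as handled by _get_rank_slice
    # (illustrative only):
    #
    #     svd.result(3)          # int     -> first three components
    #     svd.result((2, 6))     # 2-tuple -> components 2 to 5
    #     svd.result([0, 2, 5])  # list    -> explicit selection
    #     svd.result(None)       # None    -> all components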


def reconstruct_corrected_signal(
    data,
    threshold=0.6,
    max_rank_rescale=1,
    maximize_autocorrelations=None,
    exclude=None,
):
    """Use SVD to apply corrections on data and reconstruct them.

    Parameters
    ----------
    data : dict-like or list of dict
        A data set or a list of data sets obtained from the txs `datared`
        routine.
    threshold : float in [0, 1], optional
        Value of the autocorrelation below which singular vectors are
        discarded. After a first crossing of the threshold, all subsequent
        vectors are discarded, whatever the value of their autocorrelation.
        (default, 0.6)
    max_rank_rescale : int, optional
        The maximum rank of singular vectors for which a rescaling will be
        done. The time evolution of the right-singular vectors is fitted.
        The fitted model is then normalized and the corresponding patterns
        are divided by the normalized model.
        (default, 1)
    maximize_autocorrelations : 2-tuple, optional
        If not None, a 2-tuple giving the range of singular vector ranks to
        be rotated such that the autocorrelation is maximized in increasing
        order.
        (default, None)
    exclude : list of int, optional
        If `data` is a list, `exclude` should contain the indices of the
        data sets that should not be corrected for the kinetics.

    Returns
    -------
    corr_data : dict or list of dict
        The corrected data in the same format as the input `data`.

    """
    if not isinstance(data, list):
        data = [data]

    corr_data = [deepcopy(val) for val in data]

    if exclude is None:
        exclude = []

    for s_idx, sample in enumerate(corr_data):
        if s_idx not in exclude:
            for t_idx, delay in enumerate(sample['t']):
                tdelta = get_time_delta(sample, t_idx)
                svd = SVD(sample['diffs'][t_idx]).run()
                if maximize_autocorrelations is not None:
                    svd = svd.maximize_autocorrelations(
                        maximize_autocorrelations
                    )
                autocorr = svd.autocorr()
                keep = np.arange(autocorr[0].size)[autocorr[0] > threshold]
                print(
                    f"Keeping {keep.size} of {len(autocorr[0])} patterns "
                    f"at time delay {delay}."
                )
                u, s, v = svd.result(keep)
                if max_rank_rescale > 0:
                    f = lambda t, a, b, tau: a * np.exp(-t / tau) + b
                    fit_res = svd.fit(
                        f,
                        tdelta,
                        "v",
                        rank=max_rank_rescale,
                        p0=[1, 0, 30],
                        maxfev=10000,
                    )
                    print(
                        f"Decay v0 at time delay {delay} -> "
                        f"{fit_res[0][0][-1]:.1f}."
                    )
                    for idx, res in enumerate(fit_res):
                        norm = f(tdelta, *res[0])
                        v[idx] /= (norm / norm[0])
                new_diffs = u @ np.diag(s) @ v
                corr_data[s_idx]['diffs'][t_idx] = new_diffs
                corr_data[s_idx]['diff_av'][:, t_idx] = np.median(new_diffs, 1)
                corr_data[s_idx]['diff_err'][:, t_idx] = (
                    np.nanstd(new_diffs, 1) / np.sqrt(new_diffs.shape[1])
                )

    return corr_data
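

# Usage sketch for the full correction, assuming `res` is the result dict
# returned by txs.datared.datared (an assumption, it is not created here):
#
#     corr = reconstruct_corrected_signal(
#         [res],
#         threshold=0.6,
#         max_rank_rescale=1,
#         maximize_autocorrelations=(2, 10),
#     )
#     corrected = corr[0]  # same keys as `res`, with corrected 'diffs',
#                          # 'diff_av' and 'diff_err'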