Source code for dynsight._internal.analysis.time_correlations

"""Functions for computing correlation functions between particles."""

from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from numpy.typing import NDArray

import numpy as np


[docs] def self_time_correlation( data: NDArray[np.float64], max_delay: int | None = None, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Computes the mean self time correlation function for time-series. Takes as input an array of shape (n_particles, n_frames), where the element [i][t] is the value of some quantity for particle i at time t. For each particle, computes the self time correlation function (self-TCF). Returns the self-TCF averaged over all the particles, normalized so that the value at t = 0 is equal to 1. Parameters: data: Array of shape (n_particles, n_frames). max_delay: The TCF will be computed for all the delay values from zero to max_delay - 1 frames. If None, il will be set to the maximum possible delay. Default is None. Returns: tuple[np.ndarray, np.ndarray]: * The values of the TCF for all delays from zero to max_delay - 1. * The stndard error on the TCF for all delays from zero to max_delay - 1. Example: .. testcode:: tcf-test import numpy as np from dynsight.analysis import self_time_correlation # Create random input np.random.seed(1234) n_particles = 100 n_frames = 100 data = np.random.rand(n_particles, n_frames) time_corr, _ = self_time_correlation(data) .. testcode:: tcf-test :hide: assert np.isclose(time_corr[1], 0.005519088806189553) """ n_part, n_frames = data.shape data2 = data.copy() # Subtract the mean for each particle data2 = data2 - np.mean(data2, axis=1, keepdims=True) if max_delay is None: max_delay = n_frames correlation = np.zeros(max_delay) correlation_error = np.zeros(max_delay) for t_prime in range(max_delay): # Compute correlation for time lag t_prime valid_t = n_frames - t_prime corr_sum = [ np.dot( tmp[:valid_t], tmp[t_prime : valid_t + t_prime], ) for tmp in data2 ] correlation[t_prime] = np.mean(corr_sum) / valid_t correlation_error[t_prime] = np.std(corr_sum) / ( valid_t * np.sqrt(n_part) ) # Normalize the correlation function norm_fact = correlation[0] correlation /= norm_fact correlation_error /= norm_fact return correlation, correlation_error
[docs] def cross_time_correlation( data: NDArray[np.float64], max_delay: int | None = None, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Computes the mean cross time correlation function for time-series. Takes as input an array of shape (n_particles, n_frames), where the element [i][t] is the value of some quantity for particle i at time t. For each particles pair, computes the cross time correlation function (cross-TCF). Returns the cross-TCF averaged over all the particles pairs. Parameters: data: Array of shape (n_particles, n_frames). max_delay: The TCF will be computed for all the delay values from zero to max_delay - 1 frames. If None, il will be set to the maximum possible delay. Default is None. Returns: tuple[np.ndarray, np.ndarray]: * The values of the TCF for all delays from zero to max_delay - 1. * The stndard error on the TCF for all delays from zero to max_delay - 1. Example: .. testcode:: tcf-test import numpy as np from dynsight.analysis import cross_time_correlation # Create random input np.random.seed(1234) n_particles = 100 n_frames = 100 data = np.random.rand(n_particles, n_frames) time_corr, _ = cross_time_correlation(data) .. testcode:: tcf-test :hide: assert np.isclose(time_corr[0], 0.0002474572311281272) """ n_part, n_frames = data.shape # Subtract the mean for each particle data = data - np.mean(data, axis=1, keepdims=True) if max_delay is None: max_delay = n_frames # Initialize the cross-correlation array cross_correlation = np.zeros(max_delay) correlation_error = np.zeros(max_delay) # Loop over all time lags t' for t_prime in range(max_delay): valid_t = n_frames - t_prime corr_sum: list[float] = [] # Compute cross-correlation for each pair of particles (i, j) for i in range(n_part): corr_sum.extend( np.dot( data[i, :valid_t], data[j, t_prime : valid_t + t_prime], ) for j in range(n_part) if i != j # Skip self-correlation ) # Average over particle pairs (i, j) cross_correlation[t_prime] = np.mean(corr_sum) / valid_t correlation_error[t_prime] = np.std(corr_sum) / ( valid_t * np.sqrt(n_part * (n_part - 1)) ) return cross_correlation, correlation_error