Source code for dynsight._internal.trajectory.insight

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Literal

import numpy as np

if TYPE_CHECKING:
    from pathlib import Path

    from numpy.typing import NDArray

    from dynsight.trajectory import Trj

import dynsight
from dynsight.logs import logger
from dynsight.trajectory import OnionInsight, OnionSmoothInsight

UNIVAR_DIM = 2


[docs] @dataclass(frozen=True) class Insight: """Contains an analysis perfomed on a trajectory. Attributes: dataset: The values of a some trajectory's descriptor. meta: A dictionary containing the relevant parameters. """ dataset: NDArray[np.float64] meta: dict[str, Any] = field(default_factory=dict) def __post_init__(self) -> None: if logger.auto_recording: logger.record_data(self)
[docs] def dump_to_json(self, file_path: Path) -> None: """Save the Insight to a JSON file and .npy file.""" # Save dataset as .npy npy_path = file_path.with_suffix(".npy") np.save(npy_path, self.dataset.astype(np.float64)) # Prepare JSON data json_data = { "dataset_file": npy_path.name, "meta": self.meta, } with file_path.open("w") as file: json.dump(json_data, file, indent=4) logger.log(f"Insight saved to {file_path} and dataset to {npy_path}.")
[docs] @classmethod def load_from_json( cls, file_path: Path, mmap_mode: Literal["r", "r+", "w+", "c"] | None = None, ) -> Insight: """Load the Insight object from .json file. Parameters: file_path: Path to the .json file. mmap_mode: If given, used as np.load(..., mmap_mode=mmap_mode) for memory mapping. Raises: ValueError: if required keys are missing. """ with file_path.open("r") as file: data = json.load(file) dataset_file = data.get("dataset_file") if not dataset_file: msg = "'dataset_file' key not found in JSON file." logger.log(msg) raise ValueError(msg) dataset_path = file_path.with_name(dataset_file) dataset = np.load(dataset_path, mmap_mode=mmap_mode) logger.log( f"Insight loaded from {file_path}, dataset from {dataset_path}." ) return cls( dataset=dataset, meta=data.get("meta", {}), )
[docs] def spatial_average( self, trj: Trj, r_cut: float, selection: str = "all", n_jobs: int = 1, ) -> Insight: """Average the descriptor over the neighboring particles. The returned Insight contains the following meta: sp_av_r_cut, selection. """ averaged_dataset = dynsight.analysis.spatialaverage( universe=trj.universe, descriptor_array=self.dataset, selection=selection, r_cut=r_cut, trajslice=trj.trajslice, n_jobs=n_jobs, ) attr_dict = {"sp_av_r_cut": r_cut, "selection": selection} logger.log(f"Computed spatial average with args {attr_dict}.") return Insight( dataset=averaged_dataset, meta=attr_dict, )
[docs] def get_time_correlation( self, max_delay: int | None = None, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Self time correlation function of the time-series signal.""" attr_dict = {"max_delay": max_delay} logger.log( f"Computed time corrleation function with args {attr_dict}." ) return dynsight.analysis.self_time_correlation( self.dataset, max_delay, )
[docs] def get_angular_velocity(self, delay: int = 1) -> Insight: """Computes the angular displacement of a vectorial descriptor. Raises: ValueError if the dataset does not have the right dimensions. """ if self.dataset.ndim != UNIVAR_DIM + 1: msg = "dataset.ndim != 3." logger.log(msg) raise ValueError(msg) theta = dynsight.soap.timesoap(self.dataset, delay=delay) attr_dict = self.meta.copy() attr_dict.update({"delay": delay}) if self.meta.get("name") == "soap": attr_dict.update({"name": "timesoap"}) else: attr_dict.update({"name": "angular_velocity"}) logger.log(f"Computed angular velocity with args {attr_dict}.") return Insight(dataset=theta, meta=attr_dict)
[docs] def get_tica( self, lag_time: int, tica_dim: int, ) -> tuple[NDArray[np.float64], NDArray[np.float64], Insight]: """Perform tICA on trajectories from a many-body system. The attributes "lag_time" and "tica_dim" are added to the meta. Raises: ValueError if the dataset does not have the right dimensions. """ if self.dataset.ndim != UNIVAR_DIM + 1: msg = "dataset.ndim != 3." logger.log(msg) raise ValueError(msg) relax_times, coeffs, tica = dynsight.descriptors.many_body_tica( self.dataset, lag_time=lag_time, tica_dim=tica_dim, ) attr_dict = self.meta.copy() attr_dict.update({"lag_time": lag_time, "tica_dim": tica_dim}) logger.log(f"Computed many-body tICA with args {attr_dict}.") return relax_times, coeffs, Insight(dataset=tica, meta=attr_dict)
[docs] def get_onion( self, delta_t: int, bins: str | int = "auto", number_of_sigmas: float = 2.0, ) -> OnionInsight: """Perform onion clustering. The returned OnionInsight contains the following meta: delta_t, bins, number_of_sigma. """ if self.dataset.ndim == UNIVAR_DIM: reshaped_data = dynsight.onion.helpers.reshape_from_nt( self.dataset, delta_t ) onion_clust = dynsight.onion.OnionUni( bins=bins, number_of_sigmas=number_of_sigmas, ) else: reshaped_data = dynsight.onion.helpers.reshape_from_dnt( self.dataset.transpose(2, 0, 1), delta_t ) onion_clust = dynsight.onion.OnionMulti( ndims=self.dataset.ndim - 1, bins=bins, number_of_sigmas=number_of_sigmas, ) onion_clust.fit(reshaped_data) attr_dict = { "delta_t": delta_t, "bins": bins, "number_of_sigmas": number_of_sigmas, } logger.log(f"Performed onion clustering with args {attr_dict}.") return OnionInsight( labels=onion_clust.labels_, state_list=onion_clust.state_list_, reshaped_data=reshaped_data, meta=attr_dict, )
[docs] def get_onion_smooth( self, delta_t: int, bins: str | int = "auto", number_of_sigmas: float = 3.0, max_area_overlap: float = 0.8, ) -> OnionSmoothInsight: """Perform smooth onion clustering. The returned OnionInsight contains the following meta: delta_t, bins, number_of_sigma, max_area_overlap. """ if self.dataset.ndim == UNIVAR_DIM: onion_clust = dynsight.onion.OnionUniSmooth( delta_t=delta_t, bins=bins, number_of_sigmas=number_of_sigmas, max_area_overlap=max_area_overlap, ) else: onion_clust = dynsight.onion.OnionMultiSmooth( delta_t=delta_t, bins=bins, number_of_sigmas=number_of_sigmas, ) onion_clust.fit(self.dataset) attr_dict = { "delta_t": delta_t, "bins": bins, "number_of_sigmas": number_of_sigmas, "max_area_overlap": max_area_overlap, } logger.log(f"Performed onion clustering smooth with args {attr_dict}.") return OnionSmoothInsight( labels=onion_clust.labels_, state_list=onion_clust.state_list_, meta=attr_dict, )
[docs] def get_onion_analysis( self, delta_t_min: int = 1, delta_t_max: int | None = None, delta_t_num: int = 20, fig1_path: Path | None = None, fig2_path: Path | None = None, bins: str | int = "auto", number_of_sigmas: float = 3.0, max_area_overlap: float = 0.8, ) -> tuple[ NDArray[np.float64], NDArray[np.int64], NDArray[np.float64], NDArray[np.int64], ]: """Perform the full onion time resolution analysis. Note: this method uses the "onion smooth" functions (see documentation for details). Parameters: delta_t_min: Smaller value for delta_t_list. delta_t_max: Larger value for delta_t_list. delta_t_num: Number of values in delta_t_list. fig1_path: If is not None, the time resolution analysis plot is saved in this location. fig2_path: If is not None, the populations fractions plot is saved in this location. bins: The 'bins' parameter for onion clustering. number_of_sigmas: The 'number_of_sigmas' parameter for onion clustering. max_area_overlap: The 'max_area_overlap' parameter for onion clustering. Returns: tuple: * delta_t_list: The list of ∆t used. * n_clust: The number of clusters at each ∆t. * unclass_frac: The fraction of unclassified data at each ∆t. * list_of_labels: Labels at each ∆t. """ if delta_t_max is None: delta_t_max = self.dataset.shape[1] delta_t_list = np.unique( np.geomspace(delta_t_min, delta_t_max, delta_t_num, dtype=int) ) n_clust = np.zeros(delta_t_list.size, dtype=int) unclass_frac = np.zeros(delta_t_list.size) list_of_labels = np.zeros( (self.dataset.shape[0], self.dataset.shape[1], delta_t_list.size), dtype=np.int64, ) list_of_pop = [] for i, delta_t in enumerate(delta_t_list): on_cl = self.get_onion_smooth( delta_t, bins, number_of_sigmas, max_area_overlap, ) list_of_labels[:, :, i] = on_cl.labels n_clust[i] = len(on_cl.state_list) unclass_frac[i] = np.sum(on_cl.labels == -1) / self.dataset.size list_of_pop.append( [ np.sum(on_cl.labels == i) / self.dataset.size for i in np.unique(on_cl.labels) ] ) tra = np.array([delta_t_list, n_clust, unclass_frac]).T if fig1_path is not None: dynsight.onion.plot_smooth.plot_time_res_analysis(fig1_path, tra) if fig2_path is not None: dynsight.onion.plot_smooth.plot_pop_fractions( fig2_path, list_of_pop, tra ) attr_dict = { "delta_t_min": delta_t_min, "delta_t_max": delta_t_max, "delta_t_num": delta_t_num, "fig1_path": fig1_path, "fig2_path": fig2_path, "bins": bins, "number_of_sigmas": number_of_sigmas, "max_area_overlap": max_area_overlap, } logger.log( f"Performed full onion clustering analysis with args {attr_dict}." ) return delta_t_list, n_clust, unclass_frac, list_of_labels