Source code for dynsight._internal.vision.vision

"""dynsight.vision module for particle detection from media files."""

from __future__ import annotations

import logging
from pathlib import Path
from typing import TYPE_CHECKING, Callable

import numpy as np
import torch
import yaml
from PIL import Image
from ultralytics import YOLO

if TYPE_CHECKING:
    from ultralytics.engine.results import Results
    from ultralytics.utils.metrics import DetMetrics

# NOTE(review): ``basicConfig`` runs at import time, so merely importing this
# module configures the root logger (INFO level and the format below) when no
# handler has been installed yet. Confirm that configuring logging from
# library code, rather than from the application, is intended.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Default hyperparameters dictionary.
#
# ``VisionInstance.train`` copies this mapping, overlays the user-supplied
# ``hyperparams`` on top of it and forwards the result as keyword arguments to
# ``YOLO.train``. The keys also act as the allow-list for that method: any
# user-supplied key that is not present here is rejected with a ``ValueError``.
default_hyperparams: dict[str, float] = {
    "lr0": 0.01,
    "lrf": 0.01,
    "momentum": 0.937,
    "weight_decay": 0.0005,
    "warmup_epochs": 3.0,
    "warmup_momentum": 0.8,
    "box": 7.5,
    "cls": 0.5,
    "dfl": 1.5,
    "hsv_h": 0.015,
    "hsv_s": 0.7,
    "hsv_v": 0.4,
    "degrees": 0.0,
    "translate": 0.1,
    "scale": 0.5,
    "shear": 0.0,
    "perspective": 0.0,
    "flipud": 0.0,
    "fliplr": 0.5,
    "bgr": 0.0,
    "mosaic": 1,
    "mixup": 0.0,
    "cutmix": 0.0,
    "copy_paste": 0.0,
}


[docs] class VisionInstance: def __init__( self, source: str | Path, output_path: Path, model: str | Path = "yolo12n.pt", device: str | None = None, workers: int = 8, ) -> None: """Class for performing computer vision tasks using YOLO models. This class supports object detection, Convolutional Neural Network (CNN) training and fine-tuning, as well as the creation and management of training datasets. .. caution:: This class is still under development and may not function as intended. Parameters: source: The source of the images or videos to be processed. For the list of the possible sources, we refer the user to the following `sources table <https://docs.ultralytics.com/modes/predict/#inference-sources>`_. For the list of the supported formats see this `formats table <https://docs.ultralytics.com/modes/predict/#images>`_. output_path: The path to save the output folder. model: The path to the YOLO model file. Defaults to "yolo12n.pt". See `here <https://docs.ultralytics.com/models/yolo12/>`_ for more information. device: Allows users to select between cpu, a specific gpu ID or "mps" for MacOS users to perform the calculation ("cuda:0" or "0" for GPUs, "cpu" or "mps" for MacOS). workers: Number of worker threads for data loading. Influences the speed of data preprocessing and feeding into the model, especially useful in multi-GPU setups. (only for training sessions). """ self.output_path = Path(output_path) self.training_data_yaml: Path | None = None self.model = YOLO(model) self.source = source self.device = self._normalize_device_string(device) self.workers = workers self.prediction_results: list[Results] | None = None self.training_results: DetMetrics | None = None self._check_device()
[docs] def set_training_dataset(self, training_data_yaml: Path) -> None: """Set the training dataset for the model training. Training dataset are setted through a ``yaml`` file that should have the following structure: .. code-block:: yaml path: path/to/dataset/folder train: path/to/train/images val: path/to/val/images nc: number_of_classes names: [class1, class2, ...] With a dataset folder structure like this: .. code-block:: none dataset/ ├── images/ │ ├── train/ │ │ ├── 1.jpg │ │ ├── 2.jpg │ │ └── ... │ └── val/ │ ├── 5.jpg │ ├── 6.jpg │ └── ... └── labels/ ├── train/ │ ├── 1.txt │ ├── 2.txt │ └── ... └── val/ ├── 5.txt ├── 6.txt └── ... Parameters: training_data_yaml: Path to the training data YAML file. """ self.training_data_yaml = training_data_yaml
[docs] def predict( self, prediction_title: str, augment: bool = False, agnostic_nms: bool = False, show_labels: bool = False, class_filter: list[int] | None = None, confidence: float = 0.25, iou: float = 0.7, imgsz: int | tuple[int, int] = 640, max_det: int = 500, ) -> None: """Detect objects within the source. Parameters: prediction_title: The name of the prediction session. augment: Enables test-time augmentation (TTA) for predictions, potentially improving detection robustness at the cost of inference speed. agnostic_nms: Enables class-agnostic Non-Maximum Suppression (NMS), which merges overlapping boxes of different classes. Useful in multi-class detection scenarios where class overlap is common. show_labels: Show labels names in the detected source version. class_filter: Filters predictions to a set of class IDs. Only detections belonging to the specified classes will be returned. confidence: Sets the minimum confidence threshold for detections. Objects detected with confidence below this threshold will be disregarded. iou: Lower values result in fewer detections by eliminating overlapping boxes, useful for reducing duplicates. imgsz: Defines the image size for inference. Can be a single integer for square resizing or a tuple. Proper sizing can improve detection accuracy and processingspeed. max_det: The maximum number of detections for a single frame / image. """ self.prediction_results = self.model.predict( source=self.source, save=True, save_txt=False, save_conf=True, show_labels=show_labels, name=prediction_title, project=self.output_path, device=self.device, augment=augment, agnostic_nms=agnostic_nms, classes=class_filter, conf=confidence, iou=iou, imgsz=imgsz, max_det=max_det, )
[docs] def create_dataset_from_predictions( self, dataset_name: str, train_split: float = 0.8, load_dataset: bool = True, ) -> None: """Create a YOLO training dataset from ``predict`` results. Parameters: dataset_name: Name of the dataset that will be created. train_split: Fraction of images to be used as training set, the remaining fraction will be used for the validation set. load_dataset: Directly load the dataset for the next training sessions. """ if self.prediction_results is None: msg = "No prediction results available." raise ValueError(msg) dataset_path = self.output_path / dataset_name images_train = dataset_path / "images" / "train" images_val = dataset_path / "images" / "val" labels_train = dataset_path / "labels" / "train" labels_val = dataset_path / "labels" / "val" images_train.mkdir(parents=True, exist_ok=True) images_val.mkdir(parents=True, exist_ok=True) labels_train.mkdir(parents=True, exist_ok=True) labels_val.mkdir(parents=True, exist_ok=True) names = self.prediction_results[0].names sorted_results = sorted(self.prediction_results, key=lambda r: r.path) num_train = int(len(sorted_results) * train_split) video_exts = {".mp4", ".avi", ".mov", ".mkv", ".webm"} is_video = False if ( isinstance(self.source, (str, Path)) and Path(self.source).suffix.lower() in video_exts ): is_video = True for idx, result in enumerate(sorted_results): src = Path(result.path) subset = "train" if idx < num_train else "val" if is_video: frame_name = f"{src.stem}_{idx:06d}.jpg" img_dst = dataset_path / "images" / subset / frame_name lbl_dst = ( dataset_path / "labels" / subset / (Path(frame_name).stem + ".txt") ) img = Image.fromarray(result.orig_img[..., ::-1]) img.save(img_dst) else: img_dst = dataset_path / "images" / subset / src.name lbl_dst = ( dataset_path / "labels" / subset / (src.stem + ".txt") ) img_dst.write_bytes(src.read_bytes()) boxes = result.boxes if boxes is None: lbl_dst.write_text("") continue xywhn = boxes.xywhn classes = boxes.cls with 
lbl_dst.open("w") as f: for xywh, cls in zip(xywhn, classes): f.write( f"{int(cls)} {xywh[0]:.6f} {xywh[1]:.6f} " f"{xywh[2]:.6f} {xywh[3]:.6f}\n" ) dataset_yaml = dataset_path / "dataset.yaml" yaml_content = { "path": str(dataset_path.resolve()), "train": "images/train", "val": "images/val", "nc": len(names), "names": [names[i] for i in range(len(names))], } with dataset_yaml.open("w") as f: yaml.safe_dump(yaml_content, f) if load_dataset: self.training_data_yaml = dataset_yaml
[docs] def tune_hyperparams( self, iterations: int = 15, epochs: int = 50, imgsz: int | tuple[int, int] = 640, batch_size: int = 16, ) -> dict[str, float]: """Tune hyperparameters for the model. Optimize the CNN hyperparameters by leveraging the Ultralytics YOLO `genetic algorithm <https://docs.ultralytics.com/guides/hyperparameter-tuning/>`_. It returns a dictionary of the best hyperparameters, which can be directly used as input to the hyperparameters parameter in the train method. Parameters: iterations: The number of exploring iterations. The higher the number, the more accurate the results will be, increasing the computational cost. epochs: The number of epochs to perform for each iteration. Each epoch represents a full pass over the entire dataset. imgsz: Defines the image size for inference. Can be a single integer for square resizing or a tuple. Proper sizing can improve detection accuracy and processing speed. batch_size: Three modes available: set as an integer (batch=16), auto mode for 60% GPU memory utilization (batch=-1), or auto mode with specified utilization fraction (batch=0.70). """ if self.training_data_yaml is None: msg = "Training dataset has not been set." raise ValueError(msg) self.model.tune( data=self.training_data_yaml, epochs=epochs, iterations=iterations, project=self.output_path / "tuning", name="results", device=self.device, imgsz=imgsz, batch=batch_size, ) yaml_path = ( self.output_path / "tuning" / "results" / "best_hyperparameters.yaml" ) with yaml_path.open("r") as f: return yaml.safe_load(f)
[docs] def train( self, title: str, hyperparams: dict[str, float] | None = None, epochs: int = 100, batch_size: int = 16, patience: int = 20, imgsz: int | tuple[int, int] = 640, ) -> None: """Train a custom model using a training dataset. This function trains a custom model using a training dataset. The dataset should be set before calling this function with the ``set_training_data`` method. Parameters: title: The name of the resulting model. hyperparams: The dictionary that contains all the hyperparameters for the model training. The following default ``dict`` is used if not provided: .. code-block:: python # Defaults hyperparameters dictionary. default_hyperparams = { "lr0": 0.01, "lrf": 0.01, "momentum": 0.937, "weight_decay": 0.0005, "warmup_epochs": 3.0, "warmup_momentum": 0.8, "box": 7.5, "cls": 0.5, "dfl": 1.5, "hsv_h": 0.015, "hsv_s": 0.7, "hsv_v": 0.4, "degrees": 0.0, "translate": 0.1, "scale": 0.5, "shear": 0.0, "perspective": 0.0, "flipud": 0.0, "fliplr": 0.5, "bgr": 0.0, "mosaic": 1, "mixup": 0.0, "cutmix": 0.0, "copy_paste": 0.0 } Manually customize this ``dict`` to change the training performance or use the ``tune_hyperparams`` method to automatically optimize hyperparameters. epochs: Total number of training epochs. Each epoch represents a full pass over the entire dataset. batch_size: Three modes available: set as an integer (batch=16), auto mode for 60% GPU memory utilization (batch=-1), or auto mode with specified utilization fraction (batch=0.70). patience: Number of epochs to wait without improvement in validation metrics before early stopping the training. Helps to prevent overfitting. imgsz: Defines the image size for inference. Can be a single integer for square resizing or a tuple. Proper sizing can improve detection accuracy and processing speed. """ if self.training_data_yaml is None: msg = "Training dataset has not been set." 
raise ValueError(msg) full_params = default_hyperparams.copy() if hyperparams is not None: unknown_keys = set(hyperparams.keys()) - set(full_params.keys()) if unknown_keys: msg = f"Unknown hyperparameters: {unknown_keys}" raise ValueError(msg) for key in hyperparams: full_params[key] = hyperparams[key] self.training_results = self.model.train( data=self.training_data_yaml, epochs=epochs, imgsz=imgsz, batch=batch_size, workers=self.workers, name=title, project=self.output_path, patience=patience, device=self.device, **full_params, ) self.model = YOLO(self.output_path / title / "weights" / "best.pt")
[docs] def export_prediction_to_xyz( self, file_name: Path, class_filter: list[int] | None = None ) -> Path: """Export prediction results into a single ``.xyz`` file. Each frame of the resulting ``.xyz`` corresponds to one of the images/frames present in the source and used in the ``predict`` method. Parameters: file_name: File name for the ``.xyz`` file. class_filter: Limit exported detections to the specified class IDs. If ``None`` all detected objects will be exported. Returns: Path to the exported ``.xyz`` file. """ if self.prediction_results is None: msg = "No prediction results available." raise ValueError(msg) sorted_results = sorted(self.prediction_results, key=lambda r: r.path) file_path = self.output_path / file_name with file_path.open("w") as f: for result in sorted_results: boxes = result.boxes coords: list[str] = [] if boxes is not None: xyxy_raw = boxes.xyxy if isinstance(xyxy_raw, torch.Tensor): xyxy = xyxy_raw.cpu().numpy() else: xyxy = np.asarray(xyxy_raw) cls_raw = boxes.cls if isinstance(cls_raw, torch.Tensor): classes = cls_raw.cpu().numpy().astype(int) else: classes = np.asarray(cls_raw).astype(int) for (x1, y1, x2, y2), cls_id in zip(xyxy, classes): if ( class_filter is not None and cls_id not in class_filter ): continue cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 coords.append(f"{cls_id} {cx:.6f} {cy:.6f} 0.0") f.write(f"{len(coords)}\n") f.write("class x y z\n") for line in coords: f.write(f"{line}\n") return file_path
def _normalize_device_string(self, device: str | None) -> str: """Normalize device string to match Ultralytics expectations.""" if device is None: return "0" if torch.cuda.is_available() else "cpu" device = str(device).lower() if device in {"cpu", "mps", "cuda"}: return device # Allow "cuda:0" -> "0", "cuda:0,1" -> "0,1" if device.startswith("cuda:"): return device.replace("cuda:", "") # Allow "0", "0,1", etc. if all(part.strip().isdigit() for part in device.split(",")): return device msg = f"Unsupported device string: '{device}'" raise ValueError(msg) def _check_device(self) -> None: """Verify and validate the selected device for compatibility.""" self.device = self._normalize_device_string(self.device) def _device_error(msg: str) -> None: raise RuntimeError(msg) try: if self.device == "cpu": self._check_cpu_device() elif self.device == "mps": self._check_mps_device(_device_error) elif self.device == "cuda": self._check_single_cuda_device(_device_error) elif all( part.strip().isdigit() for part in self.device.split(",") ): self._check_multi_cuda_devices(_device_error) else: _device_error(f"Unsupported device string: '{self.device}'") except (ValueError, RuntimeError, IndexError, OSError) as e: _device_error(str(e)) def _check_cpu_device(self) -> None: logger.info("Using CPU.") def _check_mps_device(self, _device_error: Callable[[str], None]) -> None: if not ( hasattr(torch.backends, "mps") and torch.backends.mps.is_available() ): _device_error("MPS device requested but not available.") logger.info("Using Apple MPS backend.") def _check_single_cuda_device( self, _device_error: Callable[[str], None] ) -> None: if not torch.cuda.is_available(): _device_error("CUDA requested but not available.") name = torch.cuda.get_device_name(0) backend = "ROCm" if torch.version.hip else "CUDA" mem_free, mem_total = torch.cuda.mem_get_info(0) logger.info(f"Using GPU 0: {name} [{backend}]") logger.info( "Memory: %.1f MB free / %.1f MB total", mem_free / 1024**2, mem_total / 1024**2, 
) _ = torch.tensor([0.0]).to("cuda:0") def _check_multi_cuda_devices( self, _device_error: Callable[[str], None] ) -> None: gpus = [int(d) for d in self.device.split(",")] for idx in gpus: if idx >= torch.cuda.device_count(): _device_error( f"Requested GPU index {idx}, but only " f"{torch.cuda.device_count()} available." ) name = torch.cuda.get_device_name(idx) mem_free, mem_total = torch.cuda.mem_get_info(idx) logger.info(f"Using GPU {idx}: {name}") logger.info( "Memory: %.1f MB free / %.1f MB total", mem_free / 1024**2, mem_total / 1024**2, ) _ = torch.tensor([0.0]).to(f"cuda:{gpus[0]}")