Source code for nexgen.command_line.cli_utils

Writer for NeXus format files.

from __future__ import annotations

import logging
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
from freephil.common import scope_extract as ScopeExtract

from ..nxs_utils import (
from ..nxs_utils.detector import EIGER_CONST, UnknownDetectorTypeError
from ..nxs_write.nxmx_writer import EventNXmxFileWriter, NXmxFileWriter
from ..utils import coerce_to_path, coord2mcstas, imgcif2mcstas

logger = logging.getLogger("nexgen.cli_utils")

def add_tristan_spec(detector: ScopeExtract, tristanSpec: ScopeExtract):
    Add metadata specific to LATRD Tristan to detector scope.

        detector (scope_extract):      Scope defining the detector
        tristanSpec (scope_extract):   Scope defining Tristan specific input
    for k, v in tristanSpec.__dict__.items():
        if "__phil" in k:
        detector.__inject__(k, v)

def phil2dict(D: Dict):
    l: List = [k for k in D.keys() if "__phil_" in k]
    for i in l:
    return D

def split_arrays(axes_names: List, array: List) -> Dict[str, Tuple]:
    """Split a list of values into arrays.

    This function splits up the list of values passed as input (eg. phil parameters, dictionary) \
    for vector, offset for all existing axes.

        axes_names (List): Axes names.
        array (List): Array of values to be split up. It must be

        ValueError: When each axes doesn't have a corresponding array of size 3.

        array_dict (Dict[str, Tuple]): Dictionary of arrays corresponding to each axis. Keys are axes names.
    array_dict = {}
    if len(axes_names) == len(array):
        array_dict = {ax: tuple(v) for ax, v in zip(axes_names, array)}
        return array_dict
    elif len(array) == 3 * len(axes_names):
        for j in range(len(axes_names)):
            a = array[3 * j : 3 * j + 3]
            array_dict[axes_names[j]] = tuple(a)
        return array_dict
        error_msg = (
            f"Number of axes {len(axes_names)} doesn't match the lenght of the array list {len(array)}."
            "Please check again and make sure that all axes have a matching array of size 3."
        raise ValueError(error_msg)

def reframe_arrays(
    goniometer: Dict[str, Any],
    detector: Dict[str, Any],
    module: Dict[str, Any],
    coordinate_frame: str = "mcstas",
    new_coord_system: Dict[str, Any] = None,
    Split a list of offset/vector values into arrays. If the coordinate frame is not mcstas, \
    convert the arrays using the base vectors of the new coordinate system.

        goniometer (Dict[str, Any]): Goniometer geometry description.
        detector (Dict[str, Any]): Detector specific parameters and its axes.
        module (Dict[str, Any]): Geometry and description of detector module.
        coordinate_frame (str, optional): Coordinate system being used. If "imgcif", there's no need to pass a \
            new coordinate system definition, as the conversion is already included in nexgen. Defaults to "mcstas".
        new_coord_system (Dict[str, Any], optional): Definition of the current coordinate system. \
            It should at least contain a string defining the convention, origin and axes information as a tuple of (depends_on, type, units, vector). \
            e.g. for X axis: {"x": (".", "translation", "mm", [1,0,0])}. \
            Defaults to None.

        ValueError: When the input coordinate system name and the coordinate system convention for the vectors doesn't match.
    # If the arrays of vectors/offsets are not yet split, start by doing that
    goniometer["vectors"] = list(
        split_arrays(goniometer["axes"], goniometer["vectors"]).values()
    goniometer["offsets"] = list(
        split_arrays(goniometer["axes"], goniometer["offsets"]).values()

    detector["vectors"] = list(
        split_arrays(detector["axes"], detector["vectors"]).values()

    if "offsets" in module.keys():
        module["offsets"] = list(
            split_arrays(["fast_axis", "slow_axis"], module["offsets"]).values()

    # Now proceed with conversion if needed
    if coordinate_frame.lower() != "mcstas":
        if coordinate_frame.lower() == "imgcif":
            # Goniometer
            goniometer["vectors"] = [imgcif2mcstas(v) for v in goniometer["vectors"]]
            goniometer["offsets"] = [imgcif2mcstas(v) for v in goniometer["offsets"]]

            # Detector
            detector["vectors"] = [imgcif2mcstas(v) for v in detector["vectors"]]

            # Module
            module["fast_axis"] = imgcif2mcstas(module["fast_axis"])
            module["slow_axis"] = imgcif2mcstas(module["slow_axis"])
            if "offsets" in module.keys():
                module["offsets"] = [imgcif2mcstas(off) for off in module["offsets"]]
            if coordinate_frame != new_coord_system["convention"]:
                error_msg = (
                    "The input coordinate frame value doesn't match the current cordinate system convention."
                    "Impossible to convert to mcstas."
                raise ValueError(error_msg)
            mat = np.array(

            # Goniometer
            goniometer["vectors"] = [
                coord2mcstas(v, mat) for v in goniometer["vectors"]
            goniometer["offsets"] = [
                coord2mcstas(v, mat) for v in goniometer["offsets"]

            # Detector
            detector["vectors"] = [coord2mcstas(v, mat) for v in detector["vectors"]]

            # Module
            module["fast_axis"] = coord2mcstas(module["fast_axis"], mat)
            module["slow_axis"] = coord2mcstas(module["slow_axis"], mat)
            if "offsets" in module.keys():
                module["offsets"] = [
                    coord2mcstas(off, mat) for off in module["offsets"]

def _update_detector_constants(det_params: DetectorType, params: Dict):
    for k in list(det_params.constants.keys()):
        if k in list(params.keys()):
            det_params[k] = params[k]

def set_detector_params(
    det_name: str, use_meta: bool = False, **params
) -> DetectorType:
    """Set up the detector parameters from parsed input

        det_name (str): Detector name.
        use_meta (bool, optional): Whether a meta file is present. Mostly important for Eiger links.

    Keyword Args:
        Required parameters to set up detector.
        For Eiger: {"description", "image_size", "sensor_material", "overload", "underload"}
        For Tristan: {"description", "image_size"}
        Plus any additional arguments fo overwrite constants.

        ValueError: When missing required tristan kwargs.
        ValueError: When missing required eiger kwargs.
        UnknownDetectorTypeError: Id a detector drifferent from tristan or eiger has been requested.

        DetectorType: Parameters which define the detector in use.
    if "tristan" in det_name.lower():
        required_params = {"description", "image_size"}
        if not required_params.issubset(params.keys()):
            logger.error("Missing detector parameters information for Tristan.")
            raise ValueError("Missing tristan information.")
        det_params = TristanDetector(params["description"], params["image_size"])
        if len(params.keys()) > 2:
            _update_detector_constants(det_params, params)
    elif "eiger" in det_name.lower():
        required_params = {
        if not required_params.issubset(params.keys()):
            logger.error("Missing detector parameters information for Eiger.")
            raise ValueError("Missing eiger information.")
        det_params = EigerDetector(
        if use_meta is False and len(params.keys()) > 5:
            _update_detector_constants(det_params, params)
        if use_meta is False and len(params.keys()) == 5:
            for k in list(det_params.constants.keys()):
                det_params.constants[k] = None
        error_msg = "This CLI functionality is currently only available for Eiger and Tristan detectors."
        raise UnknownDetectorTypeError(error_msg)
    return det_params

# Write NeXus base classes
[docs] def call_writers( nxsfile: Path | str, datafiles: List[Path | str], coordinate_frame: str, data_type: Tuple[str, int], goniometer: Dict[str, Any], detector: Dict[str, Any], module: Dict[str, Any], source: Dict[str, Any], beam: Dict[str, Any], attenuator: Dict[str, Any], metafile: bool = False, timestamps: Tuple[str, str] = None, notes: Dict[str, Any] = None, ): """ Call the writers for the NeXus base classes. Args: nxsfile (Path | str): NeXus file to be written. datafiles (List[Path | str]): List of at least 1 Path object to a HDF5 data file. coordinate_frame (str): Coordinate system being used. Accepted frames are imgcif and mcstas. data_type (Tuple[str, int]): Images or event-mode data, and eventually how many are being written. goniometer (Dict[str, Any] Goniometer geometry description. detector (Dict[str, Any]): Detector specific parameters and its axes. module (Dict[str, Any]): Geometry and description of detector module. source (Dict[str, Any]): Facility information. beam (Dict[str, Any]): Beam properties. attenuator (Dict[str, Any]): Attenuator properties. metafile (bool, optional): Whether a metafile is present. Defaults to False. timestamps (Tuple[str], optional): Start and end collection timestamps in ISO format. Defaults to None. notes (Dict, optional): Any additional information to write as NXnote. Defaults to None. """"Calling the writer ...") nxsfile = coerce_to_path(nxsfile) # Split vectors and offsets in goniometer and detector for writing reframe_arrays( goniometer, detector, module, coordinate_frame, ) # Check that filenames are paths datafiles = [coerce_to_path(f) for f in datafiles] # Set up Source, Beam, Attenuator attenuator_new = Attenuator(attenuator["transmission"]) beam_new = Beam(beam["wavelength"], beam["flux"]) facility = Facility( source["name"], source["short_name"], source["type"], source["facility_id"] ) source_new = Source(source["beamline_name"], facility, source["probe"]) # Other params that might have been passed if "eiger" in detector["description"].lower(): contants = { k: detector[k] for k in list(EIGER_CONST.keys()) if k in list(detector.keys()) } else: contants = {} # Set up detector detector_params = set_detector_params( detector["description"], metafile, image_size=detector["image_size"], sensor_material=detector["sensor_material"], overload=detector["overload"], underload=["underload"], **contants, ) det_axes = [] for idx, ax in enumerate(detector["axes"]): transf = ( TransformationType.TRANSLATION if "translation" == detector["types"][idx] else TransformationType.ROTATION ) a = Axis( ax, detector["depends"][idx], transf, detector["vectors"][idx], start_pos=detector["starts"][idx], ) det_axes.append(a) det = Detector( detector_params, det_axes, detector["beam_center"], detector["exposure_time"], [module["fast_axis"], module["slow_axis"]], ) # Set up gonio gonio_axes = [] for idx, ax in enumerate(goniometer["axes"]): transf = ( TransformationType.TRANSLATION if "translation" == goniometer["types"][idx] else TransformationType.ROTATION ) num = 0 if goniometer["increments"][idx] == 0 else data_type[1] a = Axis( ax, goniometer["depends"][idx], transf, goniometer["vectors"][idx], start_pos=goniometer["starts"][idx], increment=goniometer["increments"][idx], num_steps=num if data_type[0] == "images" else 0, ) gonio_axes.append(a) # Scan can be identified by goniometer functionality gonio = Goniometer(gonio_axes) # Aaaand write if data_type[0] == "images": writer = NXmxFileWriter( nxsfile, gonio, det, source_new, beam_new, attenuator_new, data_type[1], ) writer.write(image_datafiles=datafiles, start_time=timestamps[0]) writer.write_vds() else: writer = EventNXmxFileWriter( nxsfile, gonio, det, source_new, beam_new, attenuator_new ) writer.write(start_time=timestamps[0]) if notes: writer.add_NXnote(notes, "/entry/notes") if timestamps[1]: writer.update_timestamps(timestamps[1])