"""
A writer for NXmx format NeXus Files.
"""
from __future__ import annotations
import logging
import math
from datetime import datetime
from pathlib import Path
import h5py
import numpy as np
from numpy.typing import DTypeLike
from ..nxs_utils.detector import Detector
from ..nxs_utils.goniometer import Goniometer
from ..nxs_utils.sample import Sample
from ..nxs_utils.source import Attenuator, Beam, Source
from ..tools.vds_tools import (
clean_unused_links,
image_vds_writer,
jungfrau_vds_writer,
)
from ..utils import (
MAX_FRAMES_PER_DATASET,
MAX_SUFFIX_DIGITS,
get_filename_template,
)
from .nxclass_writers import (
write_NXdata,
write_NXdatetime,
write_NXdetector,
write_NXdetector_module,
write_NXentry,
write_NXinstrument,
write_NXnote,
write_NXsample,
write_NXsource,
)
from .write_utils import TSdset, calculate_estimated_end_time
# Logger
nxmx_logger = logging.getLogger("nexgen.NXmxFileWriter")
nxmx_logger.setLevel(logging.DEBUG)
[docs]
class NXmxFileWriter:
"""A class to generate NXmx format NeXus files."""
def __init__(
self,
filename: Path | str,
goniometer: Goniometer,
detector: Detector,
source: Source,
beam: Beam,
attenuator: Attenuator,
tot_num_imgs: int, # | None = None,
sample: Sample | None = None,
):
self.filename = Path(filename).expanduser().resolve()
self.goniometer = goniometer
self.detector = detector
self.source = source
self.beam = beam
self.attenuator = attenuator
self.tot_num_imgs = tot_num_imgs
self.sample = sample
def _get_meta_file(self, image_filename: str = None) -> Path | None:
"""Get filename_meta.h5 file in directory if it's supposed to exist."""
if self.detector.detector_params.hasMeta is False:
nxmx_logger.debug("No meta file for this collection.")
return None
if image_filename:
return self.filename.parent / f"{image_filename}_meta.h5"
else:
return self.filename.parent / f"{self.filename.stem}_meta.h5"
def _get_collection_time(self) -> float:
"""_Returns total collection time."""
return self.detector.exp_time * self.tot_num_imgs
def _get_data_files_list(
self,
image_filename: str | None = None,
) -> list[Path]:
"""Get list of datafiles.
Args:
image_filename (str | None, optional): Filename stem to use to look for image files. Needed in case it doesn't match \
the NeXus file name. Defaults to None.
Returns:
list[Path]: List of data files to link to.
"""
num_files = math.ceil(self.tot_num_imgs / MAX_FRAMES_PER_DATASET)
template = (
get_filename_template(self.filename)
if not image_filename
else (
self.filename.parent / f"{image_filename}_%0{MAX_SUFFIX_DIGITS}d.h5"
).as_posix()
)
datafiles = [Path(template % i) for i in range(1, num_files + 1)]
nxmx_logger.debug(f"Number of datafiles to be written: {len(datafiles)}.")
return datafiles
[docs]
def update_timestamps(
self, timestamp: datetime | str, dset_name: TSdset = "end_time"
):
"""Save timestamps for start and/or end collection if not written before.
Args:
timestamp (datetime | str): Timestamp, as datetime or str.
dset_name (TSdset, optional): Name of dataset to write to nexus file.\
Allowed values: ["start_time", "end_time", "end_time_estimated". Defaults to "end_time".
"""
with h5py.File(self.filename, "r+") as nxs:
write_NXdatetime(nxs, timestamp, dset_name)
nxmx_logger.info(f"{dset_name} timestamp for collection updated.")
[docs]
def add_NXnote(self, notes: dict, loc: str = "/entry/notes"):
"""Save any additional information as NXnote at the end of the collection.
Args:
notes (dict): Dictionary of (key, value) pairs where key represents the \
dataset name and value its data.
loc (str, optional): Location in the NeXus file to save metadata. \
Defaults to "/entry/notes".
"""
with h5py.File(self.filename, "r+") as nxs:
write_NXnote(nxs, loc, notes)
nxmx_logger.debug(f"Notes saved in {loc}.")
[docs]
def write(
self,
image_datafiles: list | None = None,
image_filename: str | None = None,
start_time: datetime | str | None = None,
est_end_time: datetime | str | None = None,
write_mode: str = "x",
add_non_standard: bool = True,
data_entry_key: str = "data",
):
"""Write the NXmx format NeXus file.
This function calls the writers for the main NXclass objects.
Args:
image_datafiles (list | None, optional): List of image data files. If not passed, \
the program will look for files with the stem_######.h5 in the target directory. \
Defaults to None.
image_filename (str | None, optional): Filename stem to use to look for image files. \
Needed in case it doesn't match the NeXus file name. Format: filename_runnumber. \
Defaults to None.
start_time (datetime | str, optional): Collection start time if available, in the \
format "%Y-%m-%dT%H:%M:%SZ".Defaults to None.
est_end_time (datetime | str, optional): Collection estimated end time if available, \
in the format "%Y-%m-%dT%H:%M:%SZ". Defaults to None.
write_mode (str, optional): String indicating writing mode for the output NeXus file. \
Accepts any valid h5py file opening mode. Defaults to "x".
add_non_standard (bool, optional): Flag if non-standard NXsample fields should be added \
for processing to work. Defaults to True, will change in the future.
data_entry_key (str, optional): Dataset entry key in datafiles. Defaults to data.
"""
metafile = self._get_meta_file(image_filename)
if metafile:
nxmx_logger.debug(f"Metafile name: {metafile.as_posix()}.")
datafiles = (
image_datafiles
if image_datafiles
else self._get_data_files_list(image_filename)
)
module = self.detector.get_module_info()
osc, transl = self.goniometer.define_scan_from_goniometer_axes()
with h5py.File(self.filename, write_mode) as nxs:
# NXentry and NXmx definition
write_NXentry(nxs)
# Start and estimated end time if known
if start_time:
write_NXdatetime(nxs, start_time, "start_time")
if not est_end_time:
tot_exp_time = self._get_collection_time()
est_end = calculate_estimated_end_time(start_time, tot_exp_time)
write_NXdatetime(nxs, est_end, "end_time_estimated")
if est_end_time:
write_NXdatetime(nxs, est_end_time, "end_time_estimated")
# NXdata: entry/data
write_NXdata(
nxs, datafiles, "images", list(osc.keys())[0], entry_key=data_entry_key
)
# NXinstrument: entry/instrument
write_NXinstrument(
nxs,
self.beam,
self.attenuator,
self.source,
)
# NXdetector: entry/instrument/detector
write_NXdetector(
nxs,
self.detector,
self.tot_num_imgs,
metafile,
)
# NXmodule: entry/instrument/detector/module
write_NXdetector_module(
nxs,
module,
self.detector.detector_params.image_size,
self.detector.detector_params.pixel_size,
beam_center=self.detector.beam_center,
)
# NXsource: entry/source
write_NXsource(nxs, self.source)
# NXsample: entry/sample
sample_dep = self.sample.depends_on if self.sample else None
sample_info = self.sample.get_sample_info_as_dict() if self.sample else None
write_NXsample(
nxs,
self.goniometer.axes_list,
"images",
osc,
transl,
sample_depends_on=sample_dep,
sample_details=sample_info,
add_nonstandard_fields=add_non_standard,
)
[docs]
def write_vds(
self,
vds_offset: int = 0,
vds_shape: tuple[int, int, int] = None,
vds_dtype: DTypeLike = np.uint16,
clean_up: bool = False,
):
"""Write a Virtual Dataset.
This method adds a VDS under /entry/data/data in the NeXus file, linking to either the full datasets or the subset defined by \
vds_offset (used as start index) and vds_shape.
WARNING. Only use clean up if the data collection is finished and all the files have already been written.
Args:
vds_offset (int, optional): Start index for the vds writer. Defaults to 0.
vds_shape (tuple[int,int,int], optional): Shape of the data which will be linked in the VDS. If not passed, it will be defined as \
(tot_num_imgs - start_idx, *image_size). Defaults to None.
vds_dtype (DTypeLike, optional): The type of the input data. Defaults to np.uint16.
clean_up(bool, optional): Clean up unused links in vds. Defaults to False.
"""
if not vds_shape:
vds_shape = (
self.tot_num_imgs - vds_offset,
*self.detector.detector_params.image_size,
)
if self.goniometer.get_number_of_scan_points() != vds_shape[0]:
vds_shape = (
self.goniometer.get_number_of_scan_points(),
*vds_shape[1:],
)
nxmx_logger.warning(
"The number of scan points doesn't match the calculated vds_shape. \
Resetting it to match the number of frames indicated by the scan."
)
nxmx_logger.debug(f"VDS shape set to {vds_shape}.")
with h5py.File(self.filename, "r+") as nxs:
if "jungfrau" in self.detector.detector_params.description.lower():
jungfrau_vds_writer(
nxs,
(self.tot_num_imgs, *self.detector.detector_params.image_size),
data_type=vds_dtype,
)
else:
image_vds_writer(
nxs,
(self.tot_num_imgs, *self.detector.detector_params.image_size),
start_index=vds_offset,
vds_shape=vds_shape,
data_type=vds_dtype,
)
if clean_up is True:
nxmx_logger.warning("Starting clean up of unused links.")
clean_unused_links(
nxs,
vds_shape=vds_shape,
start_index=vds_offset,
)
# If number of frames in the VDS is lower than the total, nimages in NXcollection should be overwritten to match this
if vds_shape[0] < self.tot_num_imgs:
del nxs["/entry/instrument/detector/detectorSpecific/nimages"]
nxs["/entry/instrument/detector/detectorSpecific"].create_dataset(
"nimages", data=vds_shape[0]
)
[docs]
class EventNXmxFileWriter(NXmxFileWriter):
"""A class to generate NXmx-like NeXus files for event mode data."""
def __init__(
self,
filename: Path | str,
goniometer: Goniometer,
detector: Detector,
source: Source,
beam: Beam,
attenuator: Attenuator,
axis_end_position: float | None = None,
sample: Sample | None = None,
):
super().__init__(
filename,
goniometer,
detector,
source,
beam,
attenuator,
None,
sample,
)
self.end_pos = axis_end_position
[docs]
def write(
self,
image_filename: str | None = None,
start_time: datetime | str | None = None,
write_mode: str = "x",
add_non_standard: bool = False,
data_entry_key: str = "data",
):
"""Write a NXmx-like NeXus file for event mode data collections.
This method overrides the write() method of NXmxFileWriter, from which thsi class inherits.
Args:
start_time (datetime | str, optional): Collection estimated end time if available, in the \
format "%Y-%m-%dT%H:%M:%SZ". Defaults to None.
image_filename (str | None, optional): Filename stem to use to look for image files. \
Needed in case it doesn't match the NeXus file name. Format: filename_runnumber. \
Defaults to None.
write_mode (str, optional): String indicating writing mode for the output NeXus file. \
Accepts any valid h5py file opening mode. Defaults to "x".
add_non_standard (bool, optional): Flag if non-standard NXsample fields should be added \
for processing to work. Defaults to False.
data_entry_key (str, optional): Dataset entry key in datafiles. Defaults to data.
"""
# Get metafile
# No data files, just link to meta
metafile = super()._get_meta_file(image_filename=image_filename)
module = self.detector.get_module_info()
# Get scan info
# Here no scan, just get (start, stop) from omega/phi as osc and None as transl
osc, _ = self.goniometer.define_scan_axes_for_event_mode(self.end_pos)
with h5py.File(self.filename, write_mode) as nxs:
# NXentry and NXmx definition
write_NXentry(nxs)
# Write start time if known
if start_time:
write_NXdatetime(nxs, start_time, "start_time")
# NXdata: entry/data
write_NXdata(
nxs,
[metafile],
"events",
list(osc.keys())[0],
entry_key=data_entry_key,
)
# NXinstrument: entry/instrument
write_NXinstrument(
nxs,
self.beam,
self.attenuator,
self.source,
)
# NXdetector: entry/instrument/detector
write_NXdetector(
nxs,
self.detector,
meta=metafile,
)
# NXmodule: entry/instrument/detector/module
write_NXdetector_module(
nxs,
module,
self.detector.detector_params.image_size,
self.detector.detector_params.pixel_size,
beam_center=self.detector.beam_center,
)
# NXsource: entry/source
write_NXsource(nxs, self.source)
# NXsample: entry/sample
sample_dep = self.sample.depends_on if self.sample else None
sample_info = self.sample.get_sample_info_as_dict() if self.sample else None
write_NXsample(
nxs,
self.goniometer.axes_list,
"events",
osc,
sample_depends_on=sample_dep,
sample_details=sample_info,
add_nonstandard_fields=add_non_standard,
)