"""
Create a NeXus file for time-resolved collections on I19-2.
"""
from __future__ import annotations
import logging
from collections import namedtuple
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Sequence, Tuple, Union
import h5py
import numpy as np
from pydantic import BaseModel
from .. import log
from ..nxs_utils import (
Attenuator,
Beam,
Detector,
EigerDetector,
Goniometer,
Source,
TristanDetector,
)
from ..nxs_utils.scan_utils import calculate_scan_points, identify_osc_axis
from ..nxs_write.nxmx_writer import EventNXmxFileWriter, NXmxFileWriter
from ..tools.meta_reader import define_vds_data_type, update_axes_from_meta
from ..tools.metafile import DectrisMetafile
from ..utils import find_in_dict, get_iso_timestamp, get_nexus_filename
from .beamline_utils import collection_summary_log
[docs]
class CollectionParams(BaseModel):
    """Parameters passed as input from the beamline.

    Attributes:
        metafile: Path to _meta.h5 file.
        detector_name: Name of the detector in use for current experiment.
        exposure_time: Exposure time, in s.
        beam_center: Beam center (x,y) position, in pixels.
        wavelength: Incident beam wavelength, in A. Defaults to None.
        transmission: Attenuator transmission, in %. Defaults to None.
        tot_num_images: Total number of frames in a collection. Defaults to None.
        scan_axis: Rotation scan axis. Must be passed for Tristan. Defaults to None.
    """

    metafile: Union[Path, str]
    detector_name: str
    exposure_time: float
    beam_center: Sequence[float]
    # Explicit ``None`` defaults keep these fields optional whichever pydantic
    # major version is installed (a bare ``Optional[...]`` annotation is only
    # implicitly optional in some versions).
    wavelength: Optional[float] = None
    transmission: Optional[float] = None
    tot_num_images: Optional[int] = None
    scan_axis: Optional[str] = None
class ExperimentTypeError(Exception):
    """Error raised when the requested experiment type is not handled by this module."""
# Module-level logger used by every writer function in this file
logger = logging.getLogger("nexgen.I19-2_NeXus")

# Lightweight records used by callers to pass goniometer axis positions.
# Every field has a default, so only the values that matter need to be given.
axes = namedtuple("axes", ("id", "start", "inc", "end"), defaults=(None, 0.0, 0.0, 0.0))
axes.__doc__ = """Goniometer axis name, start position, increment and end position."""
axes.id.__doc__ = "Axis name."
axes.start.__doc__ = "Axis start position. Defaults to 0.0."
axes.inc.__doc__ = (
    "Axis increment value. Defaults to 0.0. Only needed for the scan axis."
)
axes.end.__doc__ = (
    "Axis end position. Defaults to 0.0. Only really needed for Tristan collections."
)

# Same idea for detector axes, which only carry a name and a single position
det_axes = namedtuple("det_axes", ("id", "start"), defaults=(None, 0.0))
det_axes.__doc__ = """Detector axis name and position."""
det_axes.id.__doc__ = "Axis name."
det_axes.start.__doc__ = "Axis position. Defaults to 0.0."
[docs]
def tristan_writer(
    master_file: Path,
    TR: CollectionParams,
    timestamps: Tuple[str, str] = (None, None),
    axes_pos: List[axes] = None,
    det_pos: List[det_axes] = None,
):
    """
    Call the event-mode NeXus writer for a Tristan 10M collection.

    The beamline axes definitions are loaded from ``I19_2_params.I19_2Tristan``,
    updated with the positions passed in, and handed to ``EventNXmxFileWriter``.

    Args:
        master_file (Path): Path to nexus file to be written.
        TR (CollectionParams): Parameters passed from the beamline.
        timestamps (Tuple[str, str], optional): Collection start and end time. Defaults to (None, None).
        axes_pos (List[axes], optional): List of (axis_name, start, inc, end) values for the \
            goniometer, passed from command line. An axis whose start and end differ provides \
            the end of the scan range. Defaults to None, in which case no goniometer position \
            is updated.
        det_pos (List[det_axes], optional): List of (axis_name, start) values for the \
            detector, passed from command line. Defaults to None, in which case no detector \
            position is updated.

    Raises:
        IndexError: If an axis name in axes_pos or det_pos (or the scan axis) is not one of \
            the beamline axes.
        Exception: Any error raised while writing the file is logged and re-raised.
    """
    source = Source("I19-2")

    from .I19_2_params import I19_2Tristan as axes_params

    # Define Tristan params
    tristan_params = TristanDetector("Tristan 10M", (3043, 4183))

    # NOTE(review): these names appear to alias the lists held by axes_params, so
    # the position updates below are applied to those shared objects - confirm
    # this is intended if the writer can be called more than once per process.
    gonio_axes = axes_params.gonio
    # Named differently from the module-level ``det_axes`` namedtuple to avoid shadowing it.
    detector_axes = axes_params.det_axes

    # Update goniometer axes, remembering the end position of a moving axis.
    end_pos = None
    for gax in axes_pos or []:
        idx = [n for n, ax in enumerate(gonio_axes) if ax.name == gax.id][0]
        gonio_axes[idx].start_pos = gax.start
        if gax.start != gax.end:
            end_pos = gax.end

    # Update detector axes
    for dax in det_pos or []:
        idx = [n for n, ax in enumerate(detector_axes) if ax.name == dax.id][0]
        detector_axes[idx].start_pos = dax.start

    # Identify scan axis and calculate scan range
    scan_axis = TR.scan_axis or "phi"
    scan_idx = [n for n, ax in enumerate(gonio_axes) if ax.name == scan_axis][0]
    # Compare against None explicitly: 0.0 is a valid end position and must not
    # be replaced by the fallback value.
    if end_pos is None:
        end_pos = gonio_axes[scan_idx].end_pos
    OSC = {scan_axis: (gonio_axes[scan_idx].start_pos, end_pos)}

    # Define Detector
    detector = Detector(
        tristan_params,
        detector_axes,
        TR.beam_center,
        TR.exposure_time,
        [axes_params.fast_axis, axes_params.slow_axis],
    )

    # Define Goniometer
    goniometer = Goniometer(gonio_axes, OSC)

    # Define beam and attenuator
    attenuator = Attenuator(TR.transmission)
    beam = Beam(TR.wavelength)

    # NOTE(review): eiger_writer in this module calls collection_summary_log with
    # a different argument list (goniometer object, no scan axis list) - confirm
    # which of the two matches the current helper signature.
    collection_summary_log(
        logger,
        gonio_axes,
        [scan_axis],
        detector,
        attenuator,
        beam,
        source,
        timestamps,
    )

    # Write
    try:
        event_writer = EventNXmxFileWriter(
            master_file,
            goniometer,
            detector,
            source,
            beam,
            attenuator,
        )
        event_writer.write(start_time=timestamps[0])
        # The end time is only added when one has been provided.
        if timestamps[1]:
            event_writer.update_timestamps(timestamps[1], "end_time")
        logger.info(f"The file {master_file} was written correctly.")
    except Exception as err:
        logger.exception(err)
        logger.info(
            f"An error occurred and {master_file} couldn't be written correctly."
        )
        raise
[docs]
def eiger_writer(
    master_file: Path,
    TR: CollectionParams,
    timestamps: Tuple[str, str] = (None, None),
    use_meta: bool = False,
    n_frames: int | None = None,
    axes_pos: List[axes] = None,
    det_pos: List[det_axes] = None,
    vds_offset: int = 0,
):
    """
    Call the NXmx NeXus file writer for an Eiger 2X 4M collection.

    If use_meta is set to False, axes_pos and det_pos become required arguments. Otherwise, \
    axes_pos and det_pos can be None but the code requires the information contained inside \
    the meta file to work correctly.

    Args:
        master_file (Path): Path to nexus file to be written.
        TR (CollectionParams): Parameters passed from the beamline. When use_meta is True, \
            its tot_num_images is overwritten with the value read from the meta file.
        timestamps (Tuple[str, str], optional): Collection start and end time. Defaults to (None, None).
        use_meta (bool, optional): If True, metadata such as axes positions, wavelength etc. \
            will be updated using the meta.h5 file. Defaults to False.
        n_frames (int, optional): Number of images for the nexus file. Not necessary if same as the \
            tot_num_images from the CollectionParams. If different, the VDS will only contain the \
            number of frames specified here. Defaults to None.
        axes_pos (List[axes], optional): List of (axis_name, start, inc) values for the \
            goniometer, passed from command line. Defaults to None.
        det_pos (List[det_axes], optional): List of (axis_name, start) values for the \
            detector, passed from command line. Defaults to None.
        vds_offset (int, optional): Start index for the vds writer. Defaults to 0.

    Raises:
        ValueError: If use_meta is set to False but axes_pos and det_pos haven't been passed, \
            or if neither n_frames nor TR.tot_num_images is available.
        IOError: If the axes positions can't be read from the metafile (missing config or broken links).
        IndexError: If an axis name in axes_pos or det_pos (or the scan axis) is not one of \
            the beamline axes.
    """
    # Without the meta file every piece of information has to come from the caller.
    if not use_meta:
        if axes_pos is None or det_pos is None:
            logger.error(
                "\nIf not using the meta file, please pass the complete axis information for goniometer\n"
                "and/or detector.\n"
            )
            raise ValueError("Missing at least one of axes_pos or det_pos.")
        if n_frames is None and TR.tot_num_images is None:
            logger.error(
                "\nIf not using the meta file, please make sure either the total number of images is passed "
                "or the number of frames has been passed. These values could be the same for a standard "
                "collection, or different if the vds needs to only point to part of the dataset.\n"
            )
            raise ValueError("Missing number of images.")

    source = Source("I19-2")

    from .I19_2_params import I19_2Eiger as axes_params

    # Define Eiger 4M params
    eiger_params = EigerDetector(
        "Eiger 2X 4M",
        (2162, 2068),
        "CdTe",
        50649,
        -1,
    )

    # Read some parameters (a falsy transmission is passed on as None)
    transmission = TR.transmission or None
    wl = TR.wavelength
    beam_center = TR.beam_center

    # NOTE(review): these names appear to alias the lists held by axes_params, so
    # the position updates below are applied to those shared objects - confirm
    # this is intended if the writer can be called more than once per process.
    gonio_axes = axes_params.gonio
    # Named differently from the module-level ``det_axes`` namedtuple to avoid shadowing it.
    detector_axes = axes_params.det_axes

    # Update axes
    if use_meta:
        logger.info("User requested to update metadata using meta file.")
        with h5py.File(TR.metafile, "r", libver="latest", swmr=True) as mh:
            meta = DectrisMetafile(mh)
            TR.tot_num_images = meta.get_full_number_of_images()
            logger.info(
                f"Total number of images for this collection found in meta file: {TR.tot_num_images}."
            )
            if not n_frames:
                n_frames = TR.tot_num_images
                logger.info(
                    "No specific numnber of frames requested, VDS will contain the full dataset."
                )
            vds_dtype = define_vds_data_type(meta)
            update_axes_from_meta(
                meta, gonio_axes, osc_axis=TR.scan_axis, use_config=True
            )
            update_axes_from_meta(meta, detector_axes)
            # WARNING.det_z not in _dectris, but det_distance is. Getting that.
            logger.info(
                "Goniometer and detector axes positions have been updated with values from the meta file."
            )
            if TR.wavelength is None:
                logger.info(
                    "Wavelength hasn't been passed by user. Looking for it in the meta file."
                )
                wl = meta.get_wavelength()
            # NOTE(review): nexus_writer in this module substitutes (0, 0) when no
            # beam centre is given, so this branch looks unreachable through that
            # entry point - confirm whether (0, 0) should also trigger the lookup.
            if TR.beam_center is None:
                logger.info(
                    "Beam center position has't been passed by user. Looking for it in the meta file."
                )
                beam_center = meta.get_beam_center()
    else:
        logger.info(
            "Not using meta file to update metadata, only the external links will be set up."
        )
        vds_dtype = np.uint32
        # Goniometer
        for gax in axes_pos:
            idx = [n for n, ax in enumerate(gonio_axes) if ax.name == gax.id][0]
            gonio_axes[idx].start_pos = gax.start
            # Only a non-zero increment overrides the beamline default.
            if gax.inc != 0.0:
                gonio_axes[idx].increment = gax.inc
        # Detector
        for dax in det_pos:
            idx = [n for n, ax in enumerate(detector_axes) if ax.name == dax.id][0]
            detector_axes[idx].start_pos = dax.start
        logger.info(
            "Goniometer and detector axes positions have been updated with values passed by the user."
        )
        # n_frames and tot_num_images fill in for each other when one is missing.
        if not n_frames:
            n_frames = TR.tot_num_images
        if not TR.tot_num_images:
            TR.tot_num_images = n_frames
            logger.warning(
                "\nAs the total number of images was not set in the collection parameters, it has been set to "
                "the requested number of frames.\n"
            )

    scan_axis = identify_osc_axis(gonio_axes)
    # Only override the detected axis when one was actually requested; otherwise
    # the detected axis would be replaced by None and the lookup below would fail.
    if TR.scan_axis and scan_axis != TR.scan_axis:
        logger.warning(
            f"Scan axis {scan_axis} found different from requested one {TR.scan_axis}. "
            f"Defaulting to {TR.scan_axis}. If wrong please check meta file."
        )
        scan_axis = TR.scan_axis

    scan_idx = [n for n, ax in enumerate(gonio_axes) if ax.name == scan_axis][0]
    gonio_axes[scan_idx].num_steps = n_frames
    OSC = calculate_scan_points(
        gonio_axes[scan_idx],
        rotation=True,
        tot_num_imgs=n_frames,
    )

    # Define beam and attenuator
    attenuator = Attenuator(transmission)
    beam = Beam(wl)

    # Define Detector
    detector = Detector(
        eiger_params,
        detector_axes,
        beam_center,
        TR.exposure_time,
        [axes_params.fast_axis, axes_params.slow_axis],
    )

    # Define Goniometer
    goniometer = Goniometer(gonio_axes, OSC)

    # NOTE(review): tristan_writer in this module calls collection_summary_log
    # with a different argument list (axes list plus scan axis list) - confirm
    # which of the two matches the current helper signature.
    collection_summary_log(
        logger,
        goniometer,
        detector,
        attenuator,
        beam,
        source,
        timestamps,
    )

    # Write
    try:
        # metafile may be a str or a Path, so normalise before using Path methods.
        image_filename = Path(TR.metafile).as_posix().replace("_meta.h5", "")
        nxmx_writer = NXmxFileWriter(
            master_file,
            goniometer,
            detector,
            source,
            beam,
            attenuator,
            TR.tot_num_images,
        )
        nxmx_writer.write(image_filename=image_filename, start_time=timestamps[0])
        nxmx_writer.write_vds(
            vds_offset=vds_offset,
            vds_shape=(n_frames, *detector.detector_params.image_size),
            vds_dtype=vds_dtype,
        )
        # The end time is only added when one has been provided.
        if timestamps[1]:
            nxmx_writer.update_timestamps(timestamps[1], "end_time")
        logger.info(f"The file {master_file} was written correctly.")
    except Exception as err:
        logger.exception(err)
        logger.info(
            f"An error occurred and {master_file} couldn't be written correctly."
        )
        raise
[docs]
def nexus_writer(
    meta_file: Path | str,
    detector_name: str,
    exposure_time: float,
    scan_axis: str = "phi",
    start_time: datetime | None = None,
    stop_time: datetime | None = None,
    **params,
):
    """
    Gather all parameters from the beamline and call the NeXus writers.

    The detector name decides which writer is called: eiger_writer if it contains
    "eiger", tristan_writer if it contains "tristan" (case insensitive).

    Args:
        meta_file (Path | str): Path to _meta.h5 file.
        detector_name (str): Detector in use.
        exposure_time (float): Exposure time, in s.
        scan_axis (str, optional): Name of the oscillation axis. Defaults to phi.
        start_time (datetime, optional): Experiment start time. An already formatted \
            string is passed on as it is. Defaults to None.
        stop_time (datetime, optional): Experiment end time. An already formatted \
            string is passed on as it is. Defaults to None.

    Keyword Args:
        n_imgs (int): Total number of images to be collected.
        transmission (float): Attenuator transmission, in %.
        wavelength (float): Wavelength of incident beam, in A.
        beam_center (List[float, float]): Beam center position, in pixels. \
            Set to (0, 0) if not passed.
        gonio_pos (List[axes]): Name, start, increment and end positions \
            of the goniometer axes.
        det_pos (List[det_axes]): Name and position of detector axes.
        outdir (str): Directory where to save the file. Only specify if different \
            from meta_file directory.
        serial (bool): Specify whether it's a serial crystallography dataset.
        det_dist (float): Distance between sample and detector, in mm. \
            Not read by this function.
        use_meta (bool): For Eiger, if True use metadata from meta.h5 file. Otherwise \
            will require all other information to be passed manually. Ignored for Tristan.

    Raises:
        ExperimentTypeError: If serial is True.
        ValueError: If the axes positions are missing for a Tristan collection, or if \
            n_imgs is missing for an Eiger collection which doesn't use the meta file.
    """
    if find_in_dict("serial", params) and params["serial"] is True:
        raise ExperimentTypeError(
            "This is writer is not enabled for ssx collections. "
            "Pleas look into SSX_Eiger or SSX_Tristan for this functionality."
        )

    metafile = Path(meta_file).expanduser().resolve()

    TR = CollectionParams(
        metafile=metafile,
        detector_name=detector_name.lower(),
        exposure_time=exposure_time,
        beam_center=(
            params["beam_center"] if find_in_dict("beam_center", params) else (0, 0)
        ),
        wavelength=params["wavelength"] if find_in_dict("wavelength", params) else None,
        transmission=(
            params["transmission"] if find_in_dict("transmission", params) else None
        ),
        tot_num_images=params["n_imgs"] if find_in_dict("n_imgs", params) else None,
        scan_axis=scan_axis,
    )
    is_eiger = "eiger" in TR.detector_name
    is_tristan = "tristan" in TR.detector_name

    # The NeXus file goes next to the meta file unless another directory is requested
    if find_in_dict("outdir", params) and params["outdir"]:
        wdir = Path(params["outdir"]).expanduser().resolve()
    else:
        wdir = metafile.parent

    # Configure logging, with the log file saved in the output directory
    logfile = wdir / "I19_2_nxs_writer.log"
    log.config(logfile.as_posix())

    logger.info("NeXus file writer for beamline I19-2 at DLS.")
    logger.info(f"Detector in use for this experiment: {TR.detector_name}.")
    logger.info(f"Current collection directory: {metafile.parent}")
    logger.info("Creating a NeXus file for %s ...", metafile.name)

    # Get NeXus filename
    master_file = get_nexus_filename(TR.metafile)
    master_file = wdir / master_file.name
    logger.info("NeXus file will be saved as %s", master_file)

    # Get timestamps in the correct format if they aren't already: datetime
    # objects are converted, strings are handed over unchanged.
    if isinstance(start_time, datetime):
        start_time = start_time.strftime("%Y-%m-%dT%H:%M:%S")
    if isinstance(stop_time, datetime):
        stop_time = stop_time.strftime("%Y-%m-%dT%H:%M:%S")
    timestamps = (
        get_iso_timestamp(start_time or None),
        get_iso_timestamp(stop_time or None),
    )

    if not find_in_dict("gonio_pos", params):
        params["gonio_pos"] = None
    if not find_in_dict("det_pos", params):
        params["det_pos"] = None

    if is_tristan:
        if params["gonio_pos"] is None or params["det_pos"] is None:
            logger.error("Please pass the axes positions for a Tristan collection.")
            raise ValueError(
                "Missing goniometer and/or detector axes information for tristan collection"
            )
        if TR.scan_axis is None:
            logger.warning(
                "No scan axis has been specified. Phi will be set as default."
            )

    if not find_in_dict("use_meta", params):
        # If by any chance not passed, assume False
        params["use_meta"] = False

    if params["use_meta"] is True:
        # Only the Eiger writer reads the axes from the meta file. Positions for
        # a Tristan collection must be kept, as tristan_writer iterates over them.
        if is_eiger:
            params["gonio_pos"] = None
            params["det_pos"] = None
    else:
        if not find_in_dict("n_imgs", params) and is_eiger:
            raise ValueError(
                "\nMissing input parameter n_imgs. \n\n"
                "For an Eiger collection, if meta file is to be ignored, the number of images to\n"
                "be collected has to be passed to the writer.\n"
            )
        # tuple() so that the comparison also holds if the value is stored as a list
        if tuple(TR.beam_center) == (0, 0):
            logger.warning(
                "\nBeam centre was not passed to the writer.\n"
                "As it won't be updated from the meta file, it will be set to (0, 0).\n"
            )

    if is_eiger:
        if not find_in_dict("n_imgs", params):
            params["n_imgs"] = None
        eiger_writer(
            master_file,
            TR,
            timestamps,
            params["use_meta"],
            params["n_imgs"],
            params["gonio_pos"],
            params["det_pos"],
        )
    elif is_tristan:
        tristan_writer(
            master_file, TR, timestamps, params["gonio_pos"], params["det_pos"]
        )
    else:
        # Make it visible in the log that nothing has been written.
        logger.warning(
            f"Detector {TR.detector_name} is neither an Eiger nor a Tristan, no NeXus file has been written."
        )