Source code for nexgen.tools.data_writer

"""
General tools for blank data writing.
"""

from __future__ import annotations

import logging
import time
from pathlib import Path

import h5py
import numpy as np
from hdf5plugin import Bitshuffle
from numpy.typing import ArrayLike

from .constants import (
    clock_freq,
    eiger_gap_size,
    eiger_mod_size,
    eiger_modules,
    tristan_chunk,
    tristan_gap_size,
    tristan_mod_size,
    tristan_modules,
)

data_logger = logging.getLogger("nexgen.DataWriter")

# Random number generator
rng = np.random.default_rng()


# Build-a-detector functions
[docs] def build_an_eiger( image_size: list | tuple, det_description: str, n_modules: tuple[int, int] = None, ) -> ArrayLike: """ Generate an Eiger-like blank image. Args: image_size (list | tuple): Detector size, defines image dimensions as (slow_axis , fast_axis). det_description (str): Identifies the type of Eiger detector. n_modules (tuple[int, int], optional): Number of modules in the detector. Defaults to None. Returns: IM (ArrayLike): Blank image - an array of zeros with an Eiger-like mask. """ for k, v in eiger_modules.items(): if k in det_description.upper(): n_modules = v IM = np.zeros(image_size, dtype=np.uint16) # Horizontal modules for i in range(1, n_modules[0] + 1): IM[ :, i * eiger_mod_size[1] + (i - 1) * eiger_gap_size[1] : i * (eiger_mod_size[1] + eiger_gap_size[1]), ] = 65535 # Vertical modules for j in range(1, n_modules[1] + 1): IM[ j * eiger_mod_size[0] + (j - 1) * eiger_gap_size[0] : j * (eiger_mod_size[0] + eiger_gap_size[0]), :, ] = 65535 # Intra module gap mid = [] for n in range(n_modules[0]): mid.append( int(eiger_mod_size[1] / 2) + n * (eiger_mod_size[1] + eiger_gap_size[1]) ) for m in mid: IM[:, (m - 1) : (m + 1)] = 65535 return IM
[docs] def build_a_tristan( image_size: list | tuple, det_description: str, ) -> ArrayLike: """ Generate a Tristan-like blank image. Args: image_size (list | tuple): Detector size, fefines image dimensions as (slow_axis , fast_axis). det_description (str): Identifies the Tristan detector. Returns: IM (ArrayLike): Blank image - an array of zeros with an Eiger-like mask. """ for k, v in tristan_modules.items(): if k in det_description.upper(): n_modules = v IM = np.zeros(image_size, dtype=np.uint16) # Horizontal modules for i in range(1, n_modules[0] + 1): IM[ :, i * tristan_mod_size[1] + (i - 1) * tristan_gap_size[1] : i * (tristan_mod_size[1] + tristan_gap_size[1]), ] = 65535 # Vertical modules for j in range(1, n_modules[1] + 1): IM[ j * tristan_mod_size[0] + (j - 1) * tristan_gap_size[0] : j * (tristan_mod_size[0] + tristan_gap_size[0]), :, ] = 65535 return IM
[docs] def generate_image_files( datafiles: list[Path | str], image_size: list | tuple, det_description: str, tot_num_images: int, ): """ Generate HDF5 files of blank images. Args: datafiles (list[Path | str]): List of HDF5 files to be written. image_size (list | tuple): Image dimensions as (slow_axis, fast_axis). det_description (str): Type of detector. The string should include the number of modules. tot_num_images (int): Total number of images to be written across the files. Raises: ValueError: If the number of files requested and the total number of images to write don't match. """ # Write some blank data in the shape of a detector if "eiger" in det_description.lower(): img = build_an_eiger(image_size, det_description) elif "tristan" in det_description.lower(): img = build_a_tristan(image_size, det_description) else: # Do nothing for now, just add zeros img = np.zeros(image_size, dtype=np.uint16) # Determine single dataset shape: (num, *img_size), where max(num)=1000. dset_shape = (tot_num_images // 1000) * [1000] + [tot_num_images % 1000] # if tot_num_images <= 1000: # dset_shape = [tot_num_images] # elif tot_num_images % 1000 == 0: # dset_shape = (tot_num_images // 1000) * [1000] # else: # dset_shape = (tot_num_images // 1000) * [1000] + [tot_num_images % 1000] # Just a quick check if len(dset_shape) != len(datafiles): raise ValueError( "Impossible to write blank images to file: number of files desn't match the dataset shape." ) # Start writing files for filename, sh0 in zip(datafiles, dset_shape): data_logger.info(f"Writing {filename} ...") tic = time.process_time() with h5py.File(filename, "w") as fh: dset = fh.create_dataset( "data", shape=(sh0, *image_size), dtype=np.uint16, chunks=(1, *image_size), **Bitshuffle(), ) # Use direct chunk write dset[0, :, :] = img f, ch = dset.id.read_direct_chunk((0, 0, 0)) for j in range(1, sh0): dset.id.write_direct_chunk((j, 0, 0), ch, f) toc = time.process_time() data_logger.info(f"Writing {sh0} images took {toc - tic:.2f} s.")
# Event list generator # TODO Better than before, but this is still pretty slow.
[docs] def pseudo_event_list( x_lim: tuple[int, int | None], y_lim: tuple[int, int | None], exp_time: float, ) -> tuple[list, list]: """ Generate a pseudo-events list with positions and timestamps. Args: x_lim (tuple[int, Union[int, None]]): Minimum and maximum position along the fast axis. y_lim (tuple[int, Union[int, None]]): Minimum and maximum position along the slow axis. exp_time (float): Total exposure time, in seconds. Returns: pos_list, time_list (tuple[list, list]): Lists of pseudo-event positions and relative timestamps. """ pos_list = [] time_list = [] for _ in range(tristan_chunk): dist = rng.uniform(0, 1) x = ( np.random.randint(x_lim[0], x_lim[1]) if len(x_lim) > 1 else np.random.randint(x_lim[0]) ) y = ( np.random.randint(y_lim[0], y_lim[1]) if len(y_lim) > 1 else np.random.randint(y_lim[0]) ) loc = np.uint32(x * np.uint32(0x2000) + y) pos_list.append(loc) t = np.uint64((exp_time + dist) * clock_freq) time_list.append(t) return pos_list, time_list
[docs] def generate_event_files( datafiles: list[Path | str], num_chunks: int, det_description: str, exp_time: float, ): """ Generate HDF5 files of pseudo events. Args: datafiles (list[Union[Path, str]]): list of HDF5 files to be written. num_chunks (int): Chunks of events to be written per file. det_description (str): Type of detector. The string should include the number of modules. exp_time (float): Total exposure time, in seconds. """ # A bunch of things to be done here first ... # Get number of modules in the Tristan detector for k, v in tristan_modules.items(): if k in det_description.upper(): n_modules = v # Some blank cues blank_cues = np.zeros(tristan_chunk, dtype=np.uint16) # TODO FIXME speed this up! data_logger.info( f"Start generating one chunk of pseudo events for {n_modules} modules of {det_description}" ) t0 = time.process_time() EV_dict = {} for i in range(n_modules[0]): for j in range(n_modules[1]): I = ( i * (tristan_mod_size[1] + tristan_gap_size[1]), (i + 1) * tristan_mod_size[1] + i * tristan_gap_size[1], ) J = ( j * (tristan_mod_size[0] + tristan_gap_size[0]), (j + 1) * tristan_mod_size[0] + j * tristan_gap_size[0], ) EV_dict[(i, j)] = pseudo_event_list(I, J, exp_time) t1 = time.process_time() data_logger.info(f"Time taken to generate pseudo-event list: {t1-t0:.2f} s.") # Find total number of events to be written to file num_events = tristan_chunk * num_chunks # Start writing files for filename, K in zip(datafiles, EV_dict.keys()): data_logger.info(f"Writing {filename} ...") tic = time.process_time() with h5py.File(filename, "w") as fh: fh.create_dataset("cue_id", data=blank_cues, **Bitshuffle()) fh.create_dataset("cue_timestamp_zero", data=blank_cues, **Bitshuffle()) ev_id = fh.create_dataset( "event_id", shape=(num_events,), dtype=np.uint32, chunks=(tristan_chunk,), **Bitshuffle(), ) ev_t = fh.create_dataset( "event_timestamp_zero", shape=(num_events,), dtype=np.uint64, chunks=(tristan_chunk,), **Bitshuffle(), ) ev_en = fh.create_dataset( "event_energy", shape=(num_events,), dtype=np.uint32, chunks=(tristan_chunk,), **Bitshuffle(), ) # Use direct chunk write ev_id[:tristan_chunk] = EV_dict[K][0] ev_t[:tristan_chunk] = EV_dict[K][1] ev_en[:tristan_chunk] = blank_cues f_id, ch_id = ev_id.id.read_direct_chunk((0,)) f_t, ch_t = ev_t.id.read_direct_chunk((0,)) f_en, ch_en = ev_en.id.read_direct_chunk((0,)) for h in range(1, num_chunks): ev_id.id.write_direct_chunk((h * tristan_chunk,), ch_id, f_id) ev_t.id.write_direct_chunk((h * tristan_chunk,), ch_t, f_t) ev_en.id.write_direct_chunk((h * tristan_chunk,), ch_en, f_en) toc = time.process_time() data_logger.info(f"Writing {num_events} events took {toc - tic:.2f} s.")