Source code for nexgen.tools.data_writer

"""
General tools for blank data writing.
"""

from __future__ import annotations

import logging
import time
from pathlib import Path
from typing import List, Tuple

import h5py
import numpy as np
from hdf5plugin import Bitshuffle
from numpy.typing import ArrayLike

from .constants import (
    clock_freq,
    eiger_gap_size,
    eiger_mod_size,
    eiger_modules,
    tristan_chunk,
    tristan_gap_size,
    tristan_mod_size,
    tristan_modules,
)

data_logger = logging.getLogger("nexgen.DataWriter")

# Random number generator
rng = np.random.default_rng()


# Build-a-detector functions
[docs] def build_an_eiger( image_size: List | Tuple, det_description: str, n_modules: Tuple[int, int] = None, ) -> ArrayLike: """ Generate an Eiger-like blank image. Args: image_size (List | Tuple): Detector size, defines image dimensions as (slow_axis , fast_axis). det_description (str): Identifies the type of Eiger detector. n_modules (Tuple[int, int], optional): Number of modules in the detector. Defaults to None. Returns: IM (ArrayLike): Blank image - an array of zeros with an Eiger-like mask. """ for k, v in eiger_modules.items(): if k in det_description.upper(): n_modules = v IM = np.zeros(image_size, dtype=np.uint16) # Horizontal modules for i in range(1, n_modules[0] + 1): IM[ :, i * eiger_mod_size[1] + (i - 1) * eiger_gap_size[1] : i * (eiger_mod_size[1] + eiger_gap_size[1]), ] = -1 # Vertical modules for j in range(1, n_modules[1] + 1): IM[ j * eiger_mod_size[0] + (j - 1) * eiger_gap_size[0] : j * (eiger_mod_size[0] + eiger_gap_size[0]), :, ] = -1 # Intra module gap mid = [] for n in range(n_modules[0]): mid.append( int(eiger_mod_size[1] / 2) + n * (eiger_mod_size[1] + eiger_gap_size[1]) ) for m in mid: IM[:, (m - 1) : (m + 1)] = -1 return IM
[docs] def build_a_tristan( image_size: List | Tuple, det_description: str, ) -> ArrayLike: """ Generate a Tristan-like blank image. Args: image_size (List | Tuple): Detector size, fefines image dimensions as (slow_axis , fast_axis). det_description (str): Identifies the Tristan detector. Returns: IM (ArrayLike): Blank image - an array of zeros with an Eiger-like mask. """ for k, v in tristan_modules.items(): if k in det_description.upper(): n_modules = v IM = np.zeros(image_size, dtype=np.uint16) # Horizontal modules for i in range(1, n_modules[0] + 1): IM[ :, i * tristan_mod_size[1] + (i - 1) * tristan_gap_size[1] : i * (tristan_mod_size[1] + tristan_gap_size[1]), ] = -1 # Vertical modules for j in range(1, n_modules[1] + 1): IM[ j * tristan_mod_size[0] + (j - 1) * tristan_gap_size[0] : j * (tristan_mod_size[0] + tristan_gap_size[0]), :, ] = -1 return IM
[docs] def generate_image_files( datafiles: List[Path | str], image_size: List | Tuple, det_description: str, tot_num_images: int, ): """ Generate HDF5 files of blank images. Args: datafiles (List[Path | str]): List of HDF5 files to be written. image_size (List | Tuple): Image dimensions as (slow_axis, fast_axis). det_description (str): Type of detector. The string should include the number of modules. tot_num_images (int): Total number of images to be written across the files. Raises: ValueError: If the number of files requested and the total number of images to write don't match. """ # Write some blank data in the shape of a detector if "eiger" in det_description.lower(): img = build_an_eiger(image_size, det_description) elif "tristan" in det_description.lower(): img = build_a_tristan(image_size, det_description) else: # Do nothing for now, just add zeros img = np.zeros(image_size, dtype=np.uint16) # Determine single dataset shape: (num, *img_size), where max(num)=1000. dset_shape = (tot_num_images // 1000) * [1000] + [tot_num_images % 1000] # if tot_num_images <= 1000: # dset_shape = [tot_num_images] # elif tot_num_images % 1000 == 0: # dset_shape = (tot_num_images // 1000) * [1000] # else: # dset_shape = (tot_num_images // 1000) * [1000] + [tot_num_images % 1000] # Just a quick check if len(dset_shape) != len(datafiles): raise ValueError( "Impossible to write blank images to file: number of files desn't match the dataset shape." ) # Start writing files for filename, sh0 in zip(datafiles, dset_shape): data_logger.info(f"Writing {filename} ...") tic = time.process_time() with h5py.File(filename, "w") as fh: dset = fh.create_dataset( "data", shape=(sh0, *image_size), dtype=np.uint16, chunks=(1, *image_size), **Bitshuffle(), ) # Use direct chunk write dset[0, :, :] = img f, ch = dset.id.read_direct_chunk((0, 0, 0)) for j in range(1, sh0): dset.id.write_direct_chunk((j, 0, 0), ch, f) toc = time.process_time() data_logger.info(f"Writing {sh0} images took {toc - tic:.2f} s.")
# Event list generator # TODO Better than before, but this is still pretty slow.
[docs] def pseudo_event_list( x_lim: Tuple[int, int | None], y_lim: Tuple[int, int | None], exp_time: float, ) -> Tuple[List, List]: """ Generate a pseudo-events list with positions and timestamps. Args: x_lim (Tuple[int, Union[int, None]]): Minimum and maximum position along the fast axis. y_lim (Tuple[int, Union[int, None]]): Minimum and maximum position along the slow axis. exp_time (float): Total exposure time, in seconds. Returns: pos_list, time_list (Tuple[List, List]): Lists of pseudo-event positions and relative timestamps. """ pos_list = [] time_list = [] for _ in range(tristan_chunk): dist = rng.uniform(0, 1) x = ( np.random.randint(x_lim[0], x_lim[1]) if len(x_lim) > 1 else np.random.randint(x_lim[0]) ) y = ( np.random.randint(y_lim[0], y_lim[1]) if len(y_lim) > 1 else np.random.randint(y_lim[0]) ) loc = np.uint32(x * np.uint32(0x2000) + y) pos_list.append(loc) t = np.uint64((exp_time + dist) * clock_freq) time_list.append(t) return pos_list, time_list
[docs] def generate_event_files( datafiles: List[Path | str], num_chunks: int, det_description: str, exp_time: float, ): """ Generate HDF5 files of pseudo events. Args: datafiles (List[Union[Path, str]]): List of HDF5 files to be written. num_chunks (int): Chunks of events to be written per file. det_description (str): Type of detector. The string should include the number of modules. exp_time (float): Total exposure time, in seconds. """ # A bunch of things to be done here first ... # Get number of modules in the Tristan detector for k, v in tristan_modules.items(): if k in det_description.upper(): n_modules = v # Some blank cues blank_cues = np.zeros(tristan_chunk, dtype=np.uint16) # TODO FIXME speed this up! data_logger.info( f"Start generating one chunk of pseudo events for {n_modules} modules of {det_description}" ) t0 = time.process_time() EV_dict = {} for i in range(n_modules[0]): for j in range(n_modules[1]): I = ( i * (tristan_mod_size[1] + tristan_gap_size[1]), (i + 1) * tristan_mod_size[1] + i * tristan_gap_size[1], ) J = ( j * (tristan_mod_size[0] + tristan_gap_size[0]), (j + 1) * tristan_mod_size[0] + j * tristan_gap_size[0], ) EV_dict[(i, j)] = pseudo_event_list(I, J, exp_time) t1 = time.process_time() data_logger.info(f"Time taken to generate pseudo-event list: {t1-t0:.2f} s.") # Find total number of events to be written to file num_events = tristan_chunk * num_chunks # Start writing files for filename, K in zip(datafiles, EV_dict.keys()): data_logger.info(f"Writing {filename} ...") tic = time.process_time() with h5py.File(filename, "w") as fh: fh.create_dataset("cue_id", data=blank_cues, **Bitshuffle()) fh.create_dataset("cue_timestamp_zero", data=blank_cues, **Bitshuffle()) ev_id = fh.create_dataset( "event_id", shape=(num_events,), dtype=np.uint32, chunks=(tristan_chunk,), **Bitshuffle(), ) ev_t = fh.create_dataset( "event_timestamp_zero", shape=(num_events,), dtype=np.uint64, chunks=(tristan_chunk,), **Bitshuffle(), ) ev_en = fh.create_dataset( "event_energy", shape=(num_events,), dtype=np.uint32, chunks=(tristan_chunk,), **Bitshuffle(), ) # Use direct chunk write ev_id[:tristan_chunk] = EV_dict[K][0] ev_t[:tristan_chunk] = EV_dict[K][1] ev_en[:tristan_chunk] = blank_cues f_id, ch_id = ev_id.id.read_direct_chunk((0,)) f_t, ch_t = ev_t.id.read_direct_chunk((0,)) f_en, ch_en = ev_en.id.read_direct_chunk((0,)) for h in range(1, num_chunks): ev_id.id.write_direct_chunk((h * tristan_chunk,), ch_id, f_id) ev_t.id.write_direct_chunk((h * tristan_chunk,), ch_t, f_t) ev_en.id.write_direct_chunk((h * tristan_chunk,), ch_en, f_en) toc = time.process_time() data_logger.info(f"Writing {num_events} events took {toc - tic:.2f} s.")