From b2148195c4127ef999589ff35e64edee8dc22cdf Mon Sep 17 00:00:00 2001 From: "jmlebeau@gmail.com" Date: Wed, 17 Sep 2025 23:39:13 -0400 Subject: [PATCH] initial nion support --- phaser/hooks/__init__.py | 16 ++++-- phaser/hooks/io/nion.py | 116 +++++++++++++++++++++++++++++++++++++++ phaser/io/nion.py | 107 ++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 phaser/hooks/io/nion.py create mode 100644 phaser/io/nion.py diff --git a/phaser/hooks/__init__.py b/phaser/hooks/__init__.py index a77fa63..5ede8bf 100644 --- a/phaser/hooks/__init__.py +++ b/phaser/hooks/__init__.py @@ -35,13 +35,19 @@ class LoadEmpadProps(Dataclass): adu: t.Optional[float] = None det_flips: t.Optional[t.Tuple[bool, bool, bool]] = None + class LoadGatanProps(Dataclass): path: Path - + diff_step: t.Optional[float] = None kv: t.Optional[float] = None adu: t.Optional[float] = None +class LoadNionProps(Dataclass): + path: Path + + diff_step: float + detector_rotation_offset: t.Optional[float] = None class LoadManualProps(Dataclass, kw_only=True): path: Path @@ -73,10 +79,13 @@ class RawDataHook(Hook[None, RawData]): known = { 'empad': ('phaser.hooks.io.empad:load_empad', LoadEmpadProps), 'gatan': ('phaser.hooks.io.gatan:load_gatan', LoadGatanProps, ('rsciio',)), + 'nion': ('phaser.hooks.io.nion:load_nion', LoadNionProps), 'manual': ('phaser.hooks.io.manual:load_manual', LoadManualProps), } + + class ProbeHookArgs(t.TypedDict): sampling: 'Sampling' wavelength: float @@ -171,15 +180,14 @@ class PostInitArgs(t.TypedDict): class ScaleProps(Dataclass): scale: float - class OffsetProps(Dataclass): offset: float - class BinProps(Dataclass): bin: int + class CropDataProps(Dataclass): crop: t.Tuple[ # y_i, y_f, x_i, x_f @@ -220,7 +228,7 @@ class PostInitHook(Hook[PostInitArgs, t.Tuple['Patterns', 'ReconsState']]): class EngineArgs(t.TypedDict): data: 'Patterns' state: 'ReconsState' - dtype: t.Type[numpy.floating] + dtype: DTypeLike xp: t.Any recons_name: str observer: 'Observer' diff --git a/phaser/hooks/io/nion.py b/phaser/hooks/io/nion.py new file mode 100644 index 0000000..d16dabe --- /dev/null +++ b/phaser/hooks/io/nion.py @@ -0,0 +1,116 @@ +from pathlib import Path +import logging +import typing as t + +import numpy + +from phaser.utils.num import Sampling +from phaser.utils.physics import Electron +from phaser.io.nion import load_4d, NionMetadata +from phaser.types import cast_length +from .. import LoadNionProps, RawData +import json + +import zipfile as zf + + +def load_nion(args: None, props: LoadNionProps) -> RawData: + logger = logging.getLogger(__name__) + + path = Path(props.path).expanduser() + + with zf.ZipFile(path, "r") as data_file: + data_file = zf.ZipFile(path, mode="r") + json_metadata = data_file.read( + "metadata.json" + ) # Get the metadata from the file + json_metadata = json.loads(json_metadata.decode("utf8").replace("'", '"')) + + nion_metadata = NionMetadata.from_data(json_metadata) + + scan_meta = nion_metadata.metadata.scan + instr_meta = nion_metadata.metadata.instrument + + voltage = instr_meta.high_tension + scan_shape = scan_meta.scan_size + scan_shape = tuple(map(int, scan_shape)) + spatial_calibration = nion_metadata.spatial_calibrations[0] + camera_processing = nion_metadata.properties.camera_processing_parameters.processing + + spatial_units = spatial_calibration.units + + match spatial_units: + case "nm": + scale_factor = 1e-9 + case _: + scale_factor = 1 + + scan_step = spatial_calibration.scale * scale_factor + diff_step = props.diff_step + + logger.info(f"Scan shape: {scan_shape}, Step size: {scan_step}") + + scan_hook = { + "type": "raster", + # [x, y] -> [y, x] + "shape": tuple(reversed(scan_shape)), + "step_size": scan_step * 1e10, + "rotation": (props.detector_rotation_offset or 0.0) + (scan_meta.rotation_deg or 0.0), # may be the other way around + # 'affine': metadata.scan_correction[::-1, ::-1] if metadata.scan_correction is not None else None, + } + + if voltage is None: + raise ValueError( + "'kv'/'voltage' must be specified by metadata or passed to 'raw_data'" + ) + if diff_step is None: + raise ValueError( + "'diff_step' must be specified by metadata or passed to 'raw_data'" + ) + + wavelength = Electron(voltage).wavelength + + if not path.exists(): + raise ValueError(f"Couldn't find nion data at path {path}") + + flips: t.List[bool] = [False, False, False] + + for process_step in camera_processing: + match process_step: + case "flip_l_r": + flips[1] = True + + logger.info(f"Loading with flips: {flips}") + + patterns = load_4d(path, cast_length(scan_shape, 2), flips=cast_length(flips, 3), memmap=False) + patterns = numpy.fft.ifftshift(patterns, axes=(-1, -2)).astype(numpy.float32) + + # if needs_scale: + # if metadata.e_scaling is None: + # warnings.warn("ADU not supplied for experimental dataset. This is not recommended.") + # else: + # logger.info(f"Offsetting patterns by {metadata.background_offset:.3e} and scaling by {metadata.e_scaling:.5e}") + # patterns -= metadata.background_offset + # patterns *= metadata.e_scaling + + # patterns = numpy.transpose(patterns, (1, 0, 2, 3)) + + a = float( + wavelength / (diff_step * 1e-3) + ) # recip. pixel size -> 1 / real space extent + + sampling = Sampling(cast_length(patterns.shape[-2:], 2), extent=(a, a)) + + mask = numpy.zeros_like(patterns, shape=patterns.shape[-2:]).astype(numpy.float32) + + mask[2:-2, 2:-2] = 1.0 + + return { + "patterns": patterns, + "mask": numpy.fft.ifftshift(mask, axes=(-1, -2)).astype(numpy.float32), + "sampling": sampling, + "wavelength": wavelength, + # 'probe_hook': probe_hook, + "scan_hook": scan_hook, + "seed": None, + } diff --git a/phaser/io/nion.py b/phaser/io/nion.py new file mode 100644 index 0000000..5ccf472 --- /dev/null +++ b/phaser/io/nion.py @@ -0,0 +1,107 @@ + +from pathlib import Path +import typing as t + +import numpy +import pane +import pane.io +from numpy.typing import NDArray +from pane.convert import IntoConverterHandlers, from_data +from typing_extensions import Self + +from phaser.utils.image import apply_flips + +import zipfile as zf +import io + + +class SpatialCalibrations(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + offset: float + scale: float + units: str + +class DetectorConfiguration(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + pass + +class CameraProcessingParameters(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + readout_area: t.Tuple[float, float, float, float] + processing: t.List[str] + + +class Properties(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + detector_configuration:DetectorConfiguration + camera_processing_parameters: CameraProcessingParameters + + +class InstrumentMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + high_tension: float + defocus:float + +class ScanMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + fov_nm: float + scan_context_size: t.Tuple[float, float] + scan_size: t.Tuple[float, float] + rotation_deg: t.Optional[float] = 0 + +class Metadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + scan: ScanMetadata + instrument: InstrumentMetadata + +class NionMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True): + file_type: t.Literal['nion_metadata'] = 'nion_metadata' + + title: str + """Experiment name""" + + version: t.Optional[float] + + """Metadata version""" + spatial_calibrations: t.List[SpatialCalibrations] + + intensity_calibration: t.Dict + + # intensity_calibration['offset']: t.Float + # intensity_calibration['scale']: t.Float + + metadata: Metadata + properties: Properties + + +def load_4d(path: t.Union[str, Path], scan_shape: t.Optional[t.Union[t.Tuple[int, int], t.Tuple[float, float]]] = None, *, + memmap: bool = False, flips: t.Optional[t.Tuple[bool, bool, bool]] = None) -> NDArray[numpy.float32]: + """ + Load a raw nion dataset into memory. + + The file is loaded so the dimensions are: (scan_y, scan_x, k_y, k_x), with y decreasing downwards. + + Patterns are not fftshifted or normalized upon loading. + + # Parameters + + - `path`: Path to file to load + - `scan_shape`: Scan shape of dataset. Will be inferred from the filename if not specified. + - `memmap`: If specified, memmap the file as opposed to loading it eagerly. + - `flips`: Flips to apply to the diffraction patterns, `(flip_y, flip_x, transpose)`. + Defaults to `(True, False, False)` (appears to be the most common orientation). + + Returns a numpy array (or `numpy.memmap`) + """ + path = Path(path) + + scan_shape = tuple(map(int, scan_shape)) + + n_y, n_x = scan_shape + + with zf.ZipFile(path, 'r') as data_file: + + with io.BufferedReader(data_file.open('data.npy', mode='r')) as f: + a = numpy.load(f) + print(f"Loaded 'data.npy'") + + if a.shape[0] != n_y: + raise ValueError(f"Got {a.shape[0]} y probes, expected {n_y}.") + + if a.shape[1] != n_x: + raise ValueError(f"Got {a.shape[1]} x probes, expected {n_x}.") + + return apply_flips(a, flips or (False, False, False))