Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions phaser/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,19 @@ class LoadEmpadProps(Dataclass):
adu: t.Optional[float] = None
det_flips: t.Optional[t.Tuple[bool, bool, bool]] = None


class LoadGatanProps(Dataclass):
path: Path

diff_step: t.Optional[float] = None
kv: t.Optional[float] = None
adu: t.Optional[float] = None

class LoadNionProps(Dataclass):
path: Path

diff_step: float
detector_rotation_offset: t.Optional[float] = None

class LoadManualProps(Dataclass, kw_only=True):
path: Path
Expand Down Expand Up @@ -73,10 +79,13 @@ class RawDataHook(Hook[None, RawData]):
known = {
'empad': ('phaser.hooks.io.empad:load_empad', LoadEmpadProps),
'gatan': ('phaser.hooks.io.gatan:load_gatan', LoadGatanProps, ('rsciio',)),
'nion': ('phaser.hooks.io.nion:load_nion', LoadNionProps),
'manual': ('phaser.hooks.io.manual:load_manual', LoadManualProps),
}




class ProbeHookArgs(t.TypedDict):
sampling: 'Sampling'
wavelength: float
Expand Down Expand Up @@ -171,15 +180,14 @@ class PostInitArgs(t.TypedDict):
class ScaleProps(Dataclass):
scale: float


class OffsetProps(Dataclass):
offset: float


class BinProps(Dataclass):
bin: int



class CropDataProps(Dataclass):
crop: t.Tuple[
# y_i, y_f, x_i, x_f
Expand Down Expand Up @@ -220,7 +228,7 @@ class PostInitHook(Hook[PostInitArgs, t.Tuple['Patterns', 'ReconsState']]):
class EngineArgs(t.TypedDict):
data: 'Patterns'
state: 'ReconsState'
dtype: t.Type[numpy.floating]
dtype: DTypeLike
xp: t.Any
recons_name: str
observer: 'Observer'
Expand Down
116 changes: 116 additions & 0 deletions phaser/hooks/io/nion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from pathlib import Path
import logging
import typing as t

import numpy

from phaser.utils.num import Sampling
from phaser.utils.physics import Electron
from phaser.io.nion import load_4d, NionMetadata
from phaser.types import cast_length
from .. import LoadNionProps, RawData
import json

import zipfile as zf


def load_nion(args: None, props: LoadNionProps) -> RawData:
logger = logging.getLogger(__name__)

path = Path(props.path).expanduser()

with zf.ZipFile(path, "r") as data_file:
data_file = zf.ZipFile(path, mode="r")
json_metadata = data_file.read(
"metadata.json"
) # Get the metadata from the file
json_metadata = json.loads(json_metadata.decode("utf8").replace("'", '"'))

nion_metadata = NionMetadata.from_data(json_metadata)

scan_meta = nion_metadata.metadata.scan
instr_meta = nion_metadata.metadata.instrument

voltage = instr_meta.high_tension
scan_shape = scan_meta.scan_size
scan_shape = tuple(map(int, scan_shape))
spatial_calibration = nion_metadata.spatial_calibrations[0]
camera_processing = nion_metadata.properties.camera_processing_parameters.processing

spatial_units = spatial_calibration.units

match spatial_units:
case "nm":
scale_factor = 1e-9
case _:
scale_factor = 1

scan_step = spatial_calibration.scale * scale_factor
diff_step = props.diff_step

logger.info(f"Scan shape: {scan_shape}, Step size: {scan_step}")

scan_hook = {
"type": "raster",
# [x, y] -> [y, x]
"shape": tuple(reversed(scan_shape)),
"step_size": scan_step * 1e10,
"rotation": (props.detector_rotation_offset or 0.0) + (scan_meta.rotation_deg or 0.0), # may be the other way around
# 'affine': metadata.scan_correction[::-1, ::-1] if metadata.scan_correction is not None else None,
}

if voltage is None:
raise ValueError(
"'kv'/'voltage' must be specified by metadata or passed to 'raw_data'"
)
if diff_step is None:
raise ValueError(
"'diff_step' must be specified by metadata or passed to 'raw_data'"
)

wavelength = Electron(voltage).wavelength

if not path.exists():
raise ValueError(f"Couldn't find nion data at path {path}")

flips: t.List[bool] = [False, False, False]

for process_step in camera_processing:
match process_step:
case "flip_l_r":
flips[1] = True

logger.info(f"Loading with flips: {flips}")

patterns = load_4d(path, cast_length(scan_shape, 2), flips=cast_length(flips, 3), memmap=False)
patterns = numpy.fft.ifftshift(patterns, axes=(-1, -2)).astype(numpy.float32)

# if needs_scale:
# if metadata.e_scaling is None:
# warnings.warn("ADU not supplied for experimental dataset. This is not recommended.")
# else:
# logger.info(f"Offsetting patterns by {metadata.background_offset:.3e} and scaling by {metadata.e_scaling:.5e}")
# patterns -= metadata.background_offset
# patterns *= metadata.e_scaling

# patterns = numpy.transpose(patterns, (1, 0, 2, 3))

a = float(
wavelength / (diff_step * 1e-3)
) # recip. pixel size -> 1 / real space extent

sampling = Sampling(cast_length(patterns.shape[-2:], 2), extent=(a, a))

mask = numpy.zeros_like(patterns, shape=patterns.shape[-2:]).astype(numpy.float32)

mask[2:-2, 2:-2] = 1.0

return {
"patterns": patterns,
"mask": numpy.fft.ifftshift(mask, axes=(-1, -2)).astype(numpy.float32),
"sampling": sampling,
"wavelength": wavelength,
# 'probe_hook': probe_hook,
"scan_hook": scan_hook,
"seed": None,
}
107 changes: 107 additions & 0 deletions phaser/io/nion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@

from pathlib import Path
import typing as t

import numpy
import pane
import pane.io
from numpy.typing import NDArray
from pane.convert import IntoConverterHandlers, from_data
from typing_extensions import Self

from phaser.utils.image import apply_flips

import zipfile as zf
import io


class SpatialCalibrations(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
offset: float
scale: float
units: str

class DetectorConfiguration(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
pass

class CameraProcessingParameters(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
readout_area: t.Tuple[float, float, float, float]
processing: t.List[str]


class Properties(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
detector_configuration:DetectorConfiguration
camera_processing_parameters: CameraProcessingParameters


class InstrumentMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
high_tension: float
defocus:float

class ScanMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
fov_nm: float
scan_context_size: t.Tuple[float, float]
scan_size: t.Tuple[float, float]
rotation_deg: t.Optional[float] = 0

class Metadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
scan: ScanMetadata
instrument: InstrumentMetadata

class NionMetadata(pane.PaneBase, frozen=False, kw_only=True, allow_extra=True):
file_type: t.Literal['nion_metadata'] = 'nion_metadata'

title: str
"""Experiment name"""

version: t.Optional[float]

"""Metadata version"""
spatial_calibrations: t.List[SpatialCalibrations]

intensity_calibration: t.Dict

# intensity_calibration['offset']: t.Float
# intensity_calibration['scale']: t.Float

metadata: Metadata
properties: Properties


def load_4d(path: t.Union[str, Path], scan_shape: t.Optional[t.Union[t.Tuple[int, int], t.Tuple[float, float]]] = None, *,
memmap: bool = False, flips: t.Optional[t.Tuple[bool, bool, bool]] = None) -> NDArray[numpy.float32]:
"""
Load a raw nion dataset into memory.

The file is loaded so the dimensions are: (scan_y, scan_x, k_y, k_x), with y decreasing downwards.

Patterns are not fftshifted or normalized upon loading.

# Parameters

- `path`: Path to file to load
- `scan_shape`: Scan shape of dataset. Will be inferred from the filename if not specified.
- `memmap`: If specified, memmap the file as opposed to loading it eagerly.
- `flips`: Flips to apply to the diffraction patterns, `(flip_y, flip_x, transpose)`.
Defaults to `(True, False, False)` (appears to be the most common orientation).

Returns a numpy array (or `numpy.memmap`)
"""
path = Path(path)

scan_shape = tuple(map(int, scan_shape))

n_y, n_x = scan_shape

with zf.ZipFile(path, 'r') as data_file:

with io.BufferedReader(data_file.open('data.npy', mode='r')) as f:
a = numpy.load(f)
print(f"Loaded 'data.npy'")

if a.shape[0] != n_y:
raise ValueError(f"Got {a.shape[0]} y probes, expected {n_y}.")

if a.shape[1] != n_x:
raise ValueError(f"Got {a.shape[1]} x probes, expected {n_x}.")

return apply_flips(a, flips or (False, False, False))
Loading