From e8a0afded5ac473aecf3d53ff6ddd7234c0bef46 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Fri, 7 Nov 2025 22:46:13 -0500 Subject: [PATCH 1/3] added arrow bytes codec --- src/zarr/codecs/arrow.py | 80 +++++++++++++++++++++++++++++++++ tests/test_codecs/test_arrow.py | 42 +++++++++++++++++ tests/test_codecs/test_bytes.py | 43 ++++++++++++++++++ 3 files changed, 165 insertions(+) create mode 100644 src/zarr/codecs/arrow.py create mode 100644 tests/test_codecs/test_arrow.py create mode 100644 tests/test_codecs/test_bytes.py diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py new file mode 100644 index 0000000000..078e385c6d --- /dev/null +++ b/src/zarr/codecs/arrow.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +import io +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from arro3.core import Array, Table +from arro3.io import read_ipc_stream, write_ipc_stream + +from zarr.abc.codec import ArrayBytesCodec +from zarr.core.common import JSON, parse_named_configuration + +if TYPE_CHECKING: + from typing import Self + + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer, NDBuffer + +# TODO: encode the field name with the zarr array name? +# (might not be possible because codecs doesn't have this context) +_FIELD_NAME = "zarr_array" + + +@dataclass(frozen=True) +class ArrowIPCCodec(ArrayBytesCodec): + """Arrow IPC codec""" + + def __init__(self) -> None: + pass + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration( + data, "arrow_ipc", require_configuration=False + ) + configuration_parsed = configuration_parsed or {} + return cls(**configuration_parsed) + + def to_dict(self) -> dict[str, JSON]: + return {"name": "arrow_ipc", "configuration": {}} + + def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: + # TODO: possibly parse array dtype to configure codec + return self + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + record_batch_reader = read_ipc_stream(io.BytesIO(chunk_bytes.as_buffer_like())) + # Note: we only expect a single batch per chunk + record_batch = record_batch_reader.read_next_batch() + array = record_batch.column(_FIELD_NAME) + numpy_array = array.to_numpy() + # all arrow arrays are flat; reshape to chunk shape + numpy_array.shape = chunk_spec.shape + # make sure we got the right dtype out + assert numpy_array.dtype == chunk_spec.dtype.to_native_dtype(), ( + f"dtype mismatch, got {numpy_array.dtype}, expected {chunk_spec.dtype.to_native_dtype()}" + ) + return chunk_spec.prototype.nd_buffer.from_numpy_array(numpy_array) + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer: + # TODO: generalize flattening strategy to prevent memory copies + numpy_array = chunk_array.as_ndarray_like().ravel(order="C") + arrow_array = Array.from_numpy(numpy_array) + table = Table.from_arrays(arrays=[arrow_array], names=[_FIELD_NAME]) + # TODO: figure out how to avoid copying the bytes to a new buffer! + # Doh, this is the whole point of Arrow, right? + buffer = io.BytesIO() + write_ipc_stream(table, buffer) + return chunk_spec.prototype.buffer.from_bytes(buffer.getvalue()) + + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: + raise NotImplementedError diff --git a/tests/test_codecs/test_arrow.py b/tests/test_codecs/test_arrow.py new file mode 100644 index 0000000000..f7bd319d14 --- /dev/null +++ b/tests/test_codecs/test_arrow.py @@ -0,0 +1,42 @@ +from typing import Any + +import numpy as np +import pytest + +from zarr.codecs.arrow import ArrowIPCCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import numpy_buffer_prototype +from zarr.dtype import parse_dtype + +CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype() + + +numpy_array_fixtures = [ + np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), + np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), + np.array([[True, False, True], [False, True, False]], dtype="bool"), + # both come back as object dtype, but if we pass object array to Zarr, it complains about dtype resolution + # np.array(['foo', 'barry', 'bazo'], dtype='U5'), + # np.array(['foo', 'barry', 'bazo'], dtype=np.dtypes.StringDType()) +] + + +@pytest.mark.parametrize("numpy_array", numpy_array_fixtures) +async def test_arrow_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None: + codec = ArrowIPCCodec() + array_config = ArrayConfig(order="C", write_empty_chunks=True) + array_spec = ArraySpec( + shape=numpy_array.shape, + dtype=parse_dtype(numpy_array.dtype, zarr_format=3), + fill_value=0, + config=array_config, + prototype=CPU_BUFFER_PROTOTYPE, + ) + + ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array) + encoded = await codec._encode_single(ndbuffer, array_spec) + decoded = await codec._decode_single(encoded, array_spec) + + # Test that the decoded array matches the original + numpy_array_decoded = decoded.as_ndarray_like() + np.testing.assert_array_equal(numpy_array_decoded, numpy_array) diff --git a/tests/test_codecs/test_bytes.py b/tests/test_codecs/test_bytes.py new file mode 100644 index 0000000000..31b72f84a6 --- /dev/null +++ b/tests/test_codecs/test_bytes.py @@ -0,0 +1,43 @@ +from typing import Any + +import numpy as np +import pytest + +from zarr.codecs.bytes import BytesCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import numpy_buffer_prototype +from zarr.dtype import parse_dtype + +CPU_BUFFER_PROTOTYPE = numpy_buffer_prototype() + + +numpy_array_fixtures = [ + np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), + np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), + np.array([[True, False, True], [False, True, False]], dtype="bool"), +] + + +@pytest.mark.parametrize("numpy_array", numpy_array_fixtures) +async def test_bytes_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None: + # Test default initialization + codec = BytesCodec() + + # numpy_array = np.array([[1, 2, 3], [4, 5, 6]], dtype='int64') + array_config = ArrayConfig(order="C", write_empty_chunks=True) + array_spec = ArraySpec( + shape=numpy_array.shape, + dtype=parse_dtype(numpy_array.dtype, zarr_format=3), + fill_value=0, + config=array_config, + prototype=CPU_BUFFER_PROTOTYPE, + ) + + ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array) + encoded = await codec._encode_single(ndbuffer, array_spec) + assert encoded is not None + decoded = await codec._decode_single(encoded, array_spec) + + # Test that the decoded array matches the original + numpy_array_decoded = decoded.as_ndarray_like() + np.testing.assert_array_equal(numpy_array_decoded, numpy_array) From 9c0b409e0ea0a5ab8e6ac8c4781417bc3c29d78e Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Wed, 3 Dec 2025 14:44:04 -0500 Subject: [PATCH 2/3] make column name customizable --- src/zarr/codecs/arrow.py | 24 +++++------ src/zarr/core/metadata/v3.py | 5 ++- tests/test_codecs/test_arrow.py | 71 +++++++++++++++++++++++++++++---- 3 files changed, 79 insertions(+), 21 deletions(-) diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py index 078e385c6d..c9aa8093b0 100644 --- a/src/zarr/codecs/arrow.py +++ b/src/zarr/codecs/arrow.py @@ -16,28 +16,26 @@ from zarr.core.array_spec import ArraySpec from zarr.core.buffer import Buffer, NDBuffer -# TODO: encode the field name with the zarr array name? -# (might not be possible because codecs doesn't have this context) -_FIELD_NAME = "zarr_array" - @dataclass(frozen=True) class ArrowIPCCodec(ArrayBytesCodec): """Arrow IPC codec""" - def __init__(self) -> None: - pass + column_name: str + + def __init__(self, *, column_name: str = "zarr_array") -> None: + object.__setattr__(self, "column_name", column_name) @classmethod def from_dict(cls, data: dict[str, JSON]) -> Self: _, configuration_parsed = parse_named_configuration( - data, "arrow_ipc", require_configuration=False + data, "arrow-ipc", require_configuration=False ) configuration_parsed = configuration_parsed or {} return cls(**configuration_parsed) def to_dict(self) -> dict[str, JSON]: - return {"name": "arrow_ipc", "configuration": {}} + return {"name": "arrow_ipc", "configuration": {"column_name": self.column_name}} def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: # TODO: possibly parse array dtype to configure codec @@ -51,14 +49,14 @@ async def _decode_single( record_batch_reader = read_ipc_stream(io.BytesIO(chunk_bytes.as_buffer_like())) # Note: we only expect a single batch per chunk record_batch = record_batch_reader.read_next_batch() - array = record_batch.column(_FIELD_NAME) + array = record_batch.column(self.column_name) numpy_array = array.to_numpy() # all arrow arrays are flat; reshape to chunk shape numpy_array.shape = chunk_spec.shape # make sure we got the right dtype out - assert numpy_array.dtype == chunk_spec.dtype.to_native_dtype(), ( - f"dtype mismatch, got {numpy_array.dtype}, expected {chunk_spec.dtype.to_native_dtype()}" - ) + # assert numpy_array.dtype == chunk_spec.dtype.to_native_dtype(), ( + # f"dtype mismatch, got {numpy_array.dtype}, expected {chunk_spec.dtype.to_native_dtype()}" + # ) return chunk_spec.prototype.nd_buffer.from_numpy_array(numpy_array) async def _encode_single( @@ -69,7 +67,7 @@ async def _encode_single( # TODO: generalize flattening strategy to prevent memory copies numpy_array = chunk_array.as_ndarray_like().ravel(order="C") arrow_array = Array.from_numpy(numpy_array) - table = Table.from_arrays(arrays=[arrow_array], names=[_FIELD_NAME]) + table = Table.from_arrays(arrays=[arrow_array], names=[self.column_name]) # TODO: figure out how to avoid copying the bytes to a new buffer! # Doh, this is the whole point of Arrow, right? buffer = io.BytesIO() diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py index a14373c340..47656a3d2a 100644 --- a/src/zarr/core/metadata/v3.py +++ b/src/zarr/core/metadata/v3.py @@ -105,7 +105,10 @@ def validate_codecs(codecs: tuple[Codec, ...], dtype: ZDType[TBaseDType, TBaseSc # TODO: use codec ID instead of class name codec_class_name = abc.__class__.__name__ # TODO: Fix typing here - if isinstance(dtype, VariableLengthUTF8) and not codec_class_name == "VLenUTF8Codec": # type: ignore[unreachable] + if isinstance(dtype, VariableLengthUTF8) and codec_class_name not in ( + "VLenUTF8Codec", + "ArrowIPCCodec", + ): # type: ignore[unreachable] raise ValueError( f"For string dtype, ArrayBytesCodec must be `VLenUTF8Codec`, got `{codec_class_name}`." ) diff --git a/tests/test_codecs/test_arrow.py b/tests/test_codecs/test_arrow.py index f7bd319d14..7c00af21aa 100644 --- a/tests/test_codecs/test_arrow.py +++ b/tests/test_codecs/test_arrow.py @@ -1,8 +1,10 @@ -from typing import Any +import io import numpy as np import pytest +from arro3.io import read_ipc_stream +import zarr from zarr.codecs.arrow import ArrowIPCCodec from zarr.core.array_spec import ArrayConfig, ArraySpec from zarr.core.buffer import numpy_buffer_prototype @@ -12,22 +14,31 @@ numpy_array_fixtures = [ - np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), - np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), - np.array([[True, False, True], [False, True, False]], dtype="bool"), + (np.array([[1, 2, 3], [4, 5, 6]], dtype="int64"), None), + (np.array([[1.5, 2.5], [3.5, 4.5]], dtype="float32"), None), + (np.array([[True, False, True], [False, True, False]], dtype="bool"), None), + ( + np.array(["foo", "barry", "bazo"], dtype=np.dtypes.StringDType()), + zarr.dtype.VariableLengthUTF8(), + ), # both come back as object dtype, but if we pass object array to Zarr, it complains about dtype resolution # np.array(['foo', 'barry', 'bazo'], dtype='U5'), # np.array(['foo', 'barry', 'bazo'], dtype=np.dtypes.StringDType()) ] -@pytest.mark.parametrize("numpy_array", numpy_array_fixtures) -async def test_arrow_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None: +@pytest.mark.parametrize("numpy_array_and_zdtype", numpy_array_fixtures) +async def test_arrow_codec_round_trip(numpy_array_and_zdtype) -> None: + numpy_array, zdtype = numpy_array_and_zdtype + if zdtype is None: + spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3) + else: + spec_dtype = zdtype codec = ArrowIPCCodec() array_config = ArrayConfig(order="C", write_empty_chunks=True) array_spec = ArraySpec( shape=numpy_array.shape, - dtype=parse_dtype(numpy_array.dtype, zarr_format=3), + dtype=spec_dtype, fill_value=0, config=array_config, prototype=CPU_BUFFER_PROTOTYPE, @@ -40,3 +51,49 @@ async def test_arrow_codec_round_trip(numpy_array: np.ndarray[Any, Any]) -> None # Test that the decoded array matches the original numpy_array_decoded = decoded.as_ndarray_like() np.testing.assert_array_equal(numpy_array_decoded, numpy_array) + + +async def test_custom_field_name() -> None: + numpy_array = np.array([[1, 2, 3], [4, 5, 6]], dtype="int64") + spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3) + codec = ArrowIPCCodec(column_name="custom_field_name") + array_config = ArrayConfig(order="C", write_empty_chunks=True) + array_spec = ArraySpec( + shape=numpy_array.shape, + dtype=spec_dtype, + fill_value=0, + config=array_config, + prototype=CPU_BUFFER_PROTOTYPE, + ) + + ndbuffer = CPU_BUFFER_PROTOTYPE.nd_buffer.from_numpy_array(numpy_array) + encoded = await codec._encode_single(ndbuffer, array_spec) + decoded = await codec._decode_single(encoded, array_spec) + + # Test that the decoded array matches the original + numpy_array_decoded = decoded.as_ndarray_like() + np.testing.assert_array_equal(numpy_array_decoded, numpy_array) + + # test that we can read the arrow data directly + record_batch_reader = read_ipc_stream(io.BytesIO(encoded.as_buffer_like())) + record_batch = record_batch_reader.read_next_batch() + assert record_batch.num_columns == 1 + _ = record_batch.column("custom_field_name") + + +def test_string_array() -> None: + # IMO codec tests should be much more self contained, + # not end-to-end array round-tripping tests. + # But don't see a better way to test this at the moment.. + + a = zarr.create_array( + shape=4, + chunks=2, + dtype=zarr.dtype.VariableLengthUTF8(), + serializer=ArrowIPCCodec(), + store=zarr.storage.MemoryStore(), + ) + + a[:] = np.array(["abc", "1234", "foo", "bar"]) + result = a[:] + np.testing.assert_equal(a, result) From 96273a841e8d3a6cbef3bb6d7b607360da4a2cb9 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Wed, 3 Dec 2025 21:25:19 +0100 Subject: [PATCH 3/3] fix type checking errors --- src/zarr/codecs/arrow.py | 2 +- src/zarr/core/metadata/v3.py | 4 ++-- tests/test_codecs/test_arrow.py | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/zarr/codecs/arrow.py b/src/zarr/codecs/arrow.py index c9aa8093b0..12bd2a7767 100644 --- a/src/zarr/codecs/arrow.py +++ b/src/zarr/codecs/arrow.py @@ -32,7 +32,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: data, "arrow-ipc", require_configuration=False ) configuration_parsed = configuration_parsed or {} - return cls(**configuration_parsed) + return cls(**configuration_parsed) # type: ignore[arg-type] def to_dict(self) -> dict[str, JSON]: return {"name": "arrow_ipc", "configuration": {"column_name": self.column_name}} diff --git a/src/zarr/core/metadata/v3.py b/src/zarr/core/metadata/v3.py index 47656a3d2a..c313509ba9 100644 --- a/src/zarr/core/metadata/v3.py +++ b/src/zarr/core/metadata/v3.py @@ -105,10 +105,10 @@ def validate_codecs(codecs: tuple[Codec, ...], dtype: ZDType[TBaseDType, TBaseSc # TODO: use codec ID instead of class name codec_class_name = abc.__class__.__name__ # TODO: Fix typing here - if isinstance(dtype, VariableLengthUTF8) and codec_class_name not in ( + if isinstance(dtype, VariableLengthUTF8) and codec_class_name not in ( # type: ignore[unreachable] "VLenUTF8Codec", "ArrowIPCCodec", - ): # type: ignore[unreachable] + ): raise ValueError( f"For string dtype, ArrayBytesCodec must be `VLenUTF8Codec`, got `{codec_class_name}`." ) diff --git a/tests/test_codecs/test_arrow.py b/tests/test_codecs/test_arrow.py index 7c00af21aa..bb42f1c35d 100644 --- a/tests/test_codecs/test_arrow.py +++ b/tests/test_codecs/test_arrow.py @@ -1,4 +1,5 @@ import io +from typing import Any import numpy as np import pytest @@ -28,7 +29,9 @@ @pytest.mark.parametrize("numpy_array_and_zdtype", numpy_array_fixtures) -async def test_arrow_codec_round_trip(numpy_array_and_zdtype) -> None: +async def test_arrow_codec_round_trip( + numpy_array_and_zdtype: tuple[np.ndarray[Any, Any], zarr.dtype.ZDType[Any, Any] | None], +) -> None: numpy_array, zdtype = numpy_array_and_zdtype if zdtype is None: spec_dtype = parse_dtype(numpy_array.dtype, zarr_format=3) @@ -88,8 +91,8 @@ def test_string_array() -> None: a = zarr.create_array( shape=4, - chunks=2, - dtype=zarr.dtype.VariableLengthUTF8(), + chunks=(2,), + dtype=zarr.dtype.VariableLengthUTF8(), # type: ignore[arg-type] serializer=ArrowIPCCodec(), store=zarr.storage.MemoryStore(), )