Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies = [
"deltalake>=1.0.2",
"graphviz>=0.21",
"gitpython>=3.1.45",
"universal-pathlib>=0.3.8",
]
readme = "README.md"
requires-python = ">=3.11.0"
Expand Down
4 changes: 4 additions & 0 deletions src/orcapod/contexts/data/v0.1.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
"_class": "orcapod.semantic_types.semantic_registry.SemanticTypeRegistry",
"_config": {
"converters": {
"upath": {
"_class": "orcapod.semantic_types.semantic_struct_converters.UPathStructConverter",
"_config": {}
},
"path": {
"_class": "orcapod.semantic_types.semantic_struct_converters.PathStructConverter",
"_config": {}
Expand Down
126 changes: 60 additions & 66 deletions src/orcapod/semantic_types/semantic_struct_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
making semantic types visible in schemas and preserved through operations.
"""

from abc import ABC
from typing import Any, TYPE_CHECKING
from pathlib import Path
from upath import UPath
from orcapod.utils.lazy_module import LazyModule

if TYPE_CHECKING:
Expand Down Expand Up @@ -68,102 +70,80 @@ def _compute_content_hash(self, content: bytes) -> bytes:

return hashlib.sha256(content).digest()


# Path-specific implementation
class PathStructConverter(SemanticStructConverterBase):
"""Converter for pathlib.Path objects to/from semantic structs."""

def __init__(self):
super().__init__("path")
self._python_type = Path

# Define the Arrow struct type for paths
self._arrow_struct_type = pa.struct(
[
pa.field("path", pa.large_string()),
]
)

class FilePathStructConverterBase(SemanticStructConverterBase, ABC):
"""Base converter for file path types Path and UPath, since they have similar APIs."""

def __init__(self, name: str, path_type: type):
super().__init__(name)
self._python_type = path_type
self._field_name = name
self._arrow_struct_type = pa.struct([
pa.field(name, pa.large_string()),
])

@property
def python_type(self) -> type:
return self._python_type

@property
def arrow_struct_type(self) -> pa.StructType:
def arrow_struct_type(self) -> "pa.StructType":
return self._arrow_struct_type

def python_to_struct_dict(self, value: Path) -> dict[str, Any]:
"""Convert Path to struct dictionary."""
if not isinstance(value, Path):
raise TypeError(f"Expected Path, got {type(value)}")

return {
"path": str(value),
}

def struct_dict_to_python(self, struct_dict: dict[str, Any]) -> Path:
"""Convert struct dictionary back to Path."""
path_str = struct_dict.get("path")

def python_to_struct_dict(self, value: Any) -> dict[str, Any]:
"""Convert path object to struct dictionary."""
if not isinstance(value, self._python_type):
raise TypeError(f"Expected {self._python_type.__name__}, got {type(value)}")
return {self._field_name: str(value)}

def struct_dict_to_python(self, struct_dict: dict[str, Any]) -> Any:
"""Convert struct dictionary back to path object."""
path_str = struct_dict.get(self._field_name)
if path_str is None:
raise ValueError("Missing 'path' field in struct")

return Path(path_str)

raise ValueError(f"Missing '{self._field_name}' field in struct")
return self._python_type(path_str)

def can_handle_python_type(self, python_type: type) -> bool:
"""Check if this converter can handle the given Python type."""
return issubclass(python_type, Path)

def can_handle_struct_type(self, struct_type: pa.StructType) -> bool:
return issubclass(python_type, self._python_type)
def can_handle_struct_type(self, struct_type: "pa.StructType") -> bool:
"""Check if this converter can handle the given struct type."""
# Check if struct has the expected fields
field_names = [field.name for field in struct_type]
expected_fields = {"path"}

if set(field_names) != expected_fields:
if set(field_names) != {self._field_name}:
return False

# Check field types
field_types = {field.name: field.type for field in struct_type}

return field_types["path"] == pa.large_string()

return field_types[self._field_name] == pa.large_string()

def is_semantic_struct(self, struct_dict: dict[str, Any]) -> bool:
"""Check if a struct dictionary represents this semantic type."""
return set(struct_dict.keys()) == {"path"} and isinstance(
struct_dict["path"], str
return (
set(struct_dict.keys()) == {self._field_name}
and isinstance(struct_dict[self._field_name], str)
)

def hash_struct_dict(
self, struct_dict: dict[str, Any], add_prefix: bool = False
) -> str:
"""
Compute hash of the file content pointed to by the path.

Args:
struct_dict: Arrow struct dictionary with 'path' field
struct_dict: Arrow struct dictionary with path field
add_prefix: If True, prefix with semantic type and algorithm info

Returns:
Hash string of the file content, optionally prefixed

Raises:
FileNotFoundError: If the file doesn't exist
PermissionError: If the file can't be read
OSError: For other file system errors
"""
path_str = struct_dict.get("path")
path_str = struct_dict.get(self._field_name)
if path_str is None:
raise ValueError("Missing 'path' field in struct")

path = Path(path_str)

raise ValueError(f"Missing '{self._field_name}' field in struct")
path = self._python_type(path_str)
try:
# TODO: replace with FileHasher implementation
# Read file content and compute hash
content = path.read_bytes()
hash_bytes = self._compute_content_hash(content)
return self._format_hash_string(hash_bytes, add_prefix)

except FileNotFoundError:
raise FileNotFoundError(f"File not found: {path}")
except PermissionError:
Expand All @@ -172,3 +152,17 @@ def hash_struct_dict(
raise ValueError(f"Path is a directory, not a file: {path}")
except OSError as e:
raise OSError(f"Error reading file {path}: {e}")


class PathStructConverter(FilePathStructConverterBase):
    """Semantic struct converter for ``pathlib.Path`` values.

    All conversion logic lives in ``FilePathStructConverterBase``; this
    subclass only binds the name ``"path"`` and the Python type ``Path``.
    """

    def __init__(self) -> None:
        super().__init__(name="path", path_type=Path)


class UPathStructConverter(FilePathStructConverterBase):
    """Semantic struct converter for ``upath.UPath`` values (universal-pathlib).

    All conversion logic lives in ``FilePathStructConverterBase``; this
    subclass only binds the name ``"upath"`` and the Python type ``UPath``.
    """

    def __init__(self) -> None:
        super().__init__(name="upath", path_type=UPath)
13 changes: 13 additions & 0 deletions src/orcapod/semantic_types/universal_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,19 @@ def _create_python_to_arrow_converter(
f"f{i}": converters[i](item) for i, item in enumerate(value)
}

# Handle Optional[T] unions; complex unions (e.g., A | B) are not currently supported
elif origin is typing.Union or origin is types.UnionType:
non_none_types = [t for t in args if t is not type(None)]
if len(non_none_types) == 1:
# Optional[T] - use converter for T, pass through None
inner_converter = self.get_python_to_arrow_converter(non_none_types[0])
return lambda value: inner_converter(value) if value is not None else None
else:
raise ValueError(
f"Complex unions with multiple non-None types are not supported: {python_type}. "
f"Only Optional[T] (i.e., T | None) is allowed."
)

else:
# Default passthrough
return lambda value: value
Expand Down
32 changes: 32 additions & 0 deletions tests/test_semantic_types/test_universal_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,38 @@ def test_python_type_to_arrow_type_custom():
assert field.name == "path"
assert field.type == pa.large_string()

def test_python_type_to_arrow_type_upath():
    """UPath maps to a one-field Arrow struct: ``upath`` typed as large_string."""
    from upath import UPath

    struct_type = universal_converter.python_type_to_arrow_type(UPath)

    # The semantic type is exposed as a struct wrapping exactly one field.
    assert isinstance(struct_type, pa.StructType)
    assert len(struct_type) == 1

    only_field = struct_type[0]
    assert only_field.name == "upath"
    assert only_field.type == pa.large_string()

def test_optional_upath_converter():
    """``UPath | None`` converts UPath values to a struct dict and returns None for None."""
    from upath import UPath

    # Only the Python -> Arrow direction is exercised here.
    to_arrow, _to_python = universal_converter.get_conversion_functions(UPath | None)

    # A concrete UPath becomes the single-field 'upath' struct dict.
    sample = UPath("/tmp/test.txt")
    assert to_arrow(sample) == {"upath": "/tmp/test.txt"}

    # None is returned as None.
    assert to_arrow(None) is None


def test_complex_union_raises_error():
    """A union of two non-None types (``UPath | Path``) raises ValueError."""
    from pathlib import Path

    from upath import UPath

    with pytest.raises(ValueError, match="Complex unions"):
        universal_converter.get_conversion_functions(UPath | Path)

def test_python_type_to_arrow_type_context():
ctx = get_default_context()
Expand Down
Loading