Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ repos:
hooks:
- id: sync-with-uv
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.14.7
rev: v0.14.11
hooks:
- id: ruff-check
args: [--fix, --exit-non-zero-on-fix]
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- `tilebox-datasets`: The `create_dataset` method of the `Client` has been removed. Use `create_or_update_dataset` instead.

### Fixed

- `tilebox-storage`: Fixed a bug on Windows, where the `CopernicusStorageClient` and `USGSLandsatStorageClient` were
Expand Down
26 changes: 16 additions & 10 deletions tilebox-datasets/tilebox/datasets/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from _tilebox.grpc.aio.channel import open_channel
from _tilebox.grpc.aio.error import with_pythonic_errors
from _tilebox.grpc.error import NotFoundError
from tilebox.datasets.aio.dataset import DatasetClient
from tilebox.datasets.client import Client as BaseClient
from tilebox.datasets.client import token_from_env
Expand Down Expand Up @@ -33,33 +34,38 @@ def __init__(self, *, url: str = "https://api.tilebox.com", token: str | None =
)
self._client = BaseClient(service)

async def create_dataset(
async def create_or_update_dataset(
self,
kind: DatasetKind,
code_name: str,
fields: list[FieldDict],
fields: list[FieldDict] | None = None,
*,
name: str | None = None,
description: str | None = None,
) -> DatasetClient:
"""Create a new dataset.

Args:
kind: The kind of the dataset.
code_name: The code name of the dataset.
fields: The fields of the dataset.
fields: The custom fields of the dataset.
name: The name of the dataset. Defaults to the code name.
description: A short description of the dataset. Optional.

Returns:
The created or updated dataset.
"""
if name is None:
name = code_name
if description is None:
description = ""

return await self._client.create_dataset(kind, code_name, fields, name, description, DatasetClient)
try:
dataset = await self.dataset(code_name)
except NotFoundError:
return await self._client.create_dataset(kind, code_name, fields or [], name or code_name, DatasetClient)

return await self._client.update_dataset(
kind,
dataset._dataset.id, # noqa: SLF001
fields or [],
name or dataset._dataset.name, # noqa: SLF001
DatasetClient,
)

async def datasets(self) -> Group:
"""Fetch all available datasets."""
Expand Down
27 changes: 23 additions & 4 deletions tilebox-datasets/tilebox/datasets/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,32 @@ class Client:
def __init__(self, service: TileboxDatasetService) -> None:
self._service = service

def create_dataset( # noqa: PLR0913
self, kind: DatasetKind, code_name: str, fields: list[FieldDict], name: str, summary: str, dataset_type: type[T]
def create_dataset(
self,
kind: DatasetKind,
code_name: str,
fields: list[FieldDict] | None,
name: str | None,
py_dataset_class: type[T],
) -> Promise[T]:
return (
self._service.create_dataset(kind, code_name, fields, name, summary)
self._service.create_dataset(kind, code_name, name or code_name, fields or [])
.then(_ensure_registered)
.then(lambda dataset: dataset_type(self._service, dataset))
.then(lambda dataset: py_dataset_class(self._service, dataset))
)

def update_dataset(
    self,
    kind: DatasetKind,
    dataset_id: UUID,
    fields: list[FieldDict] | None,
    name: str | None,
    py_dataset_class: type[T],
) -> Promise[T]:
    """Update an existing dataset via the service and wrap the result in a dataset client.

    A missing fields list is forwarded as an empty list; the resulting dataset is
    registered (via `_ensure_registered`) before being wrapped in `py_dataset_class`.
    """
    updated = self._service.update_dataset(kind, dataset_id, name, fields or [])
    registered = updated.then(_ensure_registered)
    return registered.then(lambda ds: py_dataset_class(self._service, ds))

def datasets(self, dataset_type: type[T]) -> Promise[Group]:
Expand Down
57 changes: 52 additions & 5 deletions tilebox-datasets/tilebox/datasets/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
GetDatasetRequest,
ListDatasetsRequest,
Package,
UpdateDatasetRequest,
)
from tilebox.datasets.datasets.v1.datasets_pb2_grpc import DatasetServiceStub
from tilebox.datasets.query.pagination import Pagination
Expand Down Expand Up @@ -64,24 +65,70 @@ def __init__(
self._data_ingestion_service = data_ingestion_service_stub

def create_dataset(
self, kind: DatasetKind, code_name: str, fields: list[FieldDict], name: str, summary: str
self, kind: DatasetKind, code_name: str, name: str, custom_fields: list[FieldDict]
) -> Promise[Dataset]:
"""Create a new dataset.

Args:
kind: The kind of the dataset.
code_name: The code name of the dataset.
fields: The fields of the dataset.
name: The name of the dataset.
summary: A short summary of the dataset.
custom_fields: The custom fields of the dataset.

Returns:
The created dataset.
"""
dataset_type = DatasetType(kind, _REQUIRED_FIELDS_PER_DATASET_KIND[kind] + [Field.from_dict(f) for f in fields])
req = CreateDatasetRequest(name=name, type=dataset_type.to_message(), summary=summary, code_name=code_name)
dataset_type = DatasetType(
kind, _REQUIRED_FIELDS_PER_DATASET_KIND[kind] + [Field.from_dict(f) for f in custom_fields]
)
req = CreateDatasetRequest(name=name, type=dataset_type.to_message(), code_name=code_name)
return Promise.resolve(self._dataset_service.CreateDataset(req)).then(Dataset.from_message)

def update_dataset(
    self, kind: DatasetKind, dataset_id: UUID, name: str | None, custom_fields: list[FieldDict]
) -> Promise[Dataset]:
    """Update a dataset.

    Args:
        kind: The kind of the dataset to update, cannot be changed.
        dataset_id: The id of the dataset to update, cannot be changed.
        name: The new name of the dataset.
        custom_fields: The new list of custom fields of the dataset.

    Returns:
        The updated dataset.
    """
    # the full field list is the kind's mandatory fields followed by the user-supplied ones
    all_fields = list(_REQUIRED_FIELDS_PER_DATASET_KIND[kind])
    all_fields.extend(Field.from_dict(field) for field in custom_fields)
    request = UpdateDatasetRequest(
        id=uuid_to_uuid_message(dataset_id),
        name=name,
        type=DatasetType(kind, all_fields).to_message(),
    )
    return Promise.resolve(self._dataset_service.UpdateDataset(request)).then(Dataset.from_message)

def create_or_update_dataset(
    self, kind: DatasetKind, code_name: str, name: str, custom_fields: list[FieldDict]
) -> Promise[Dataset]:
    """Create a new dataset, or update it if it already exists.

    Looks the dataset up by its code name slug; on success the existing dataset is
    updated, otherwise a new one is created.

    Args:
        kind: The kind of the dataset.
        code_name: The code name of the dataset.
        name: The name of the dataset.
        custom_fields: The custom fields of the dataset.

    Returns:
        The created or updated dataset.
    """
    # NOTE(review): did_reject fires on *any* rejection (network errors included),
    # not just a not-found lookup — confirm this is the intended fallback behavior.
    # NOTE(review): both update_dataset and create_dataset already map their result
    # through Dataset.from_message, so the trailing .then(Dataset.from_message) looks
    # redundant (or double-converting, depending on Promise chaining semantics) — verify.
    return (
        Promise.resolve(self._dataset_service.GetDataset(GetDatasetRequest(slug=code_name)))
        .then(
            did_fulfill=lambda dataset: self.update_dataset(
                kind, Dataset.from_message(dataset).id, name, custom_fields
            ),
            did_reject=lambda _: self.create_dataset(kind, code_name, name, custom_fields),
        )
        .then(Dataset.from_message)
    )

def list_datasets(self) -> Promise[ListDatasetsResponse]:
"""List all datasets and dataset groups."""
return Promise.resolve(
Expand Down
27 changes: 16 additions & 11 deletions tilebox-datasets/tilebox/datasets/sync/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from uuid import UUID

from _tilebox.grpc.channel import open_channel
from _tilebox.grpc.error import with_pythonic_errors
from _tilebox.grpc.error import NotFoundError, with_pythonic_errors
from tilebox.datasets.client import Client as BaseClient
from tilebox.datasets.client import token_from_env
from tilebox.datasets.data.datasets import DatasetKind, FieldDict
Expand Down Expand Up @@ -33,33 +33,38 @@ def __init__(self, *, url: str = "https://api.tilebox.com", token: str | None =
)
self._client = BaseClient(service)

def create_dataset(
def create_or_update_dataset(
self,
kind: DatasetKind,
code_name: str,
fields: list[FieldDict],
fields: list[FieldDict] | None = None,
*,
name: str | None = None,
description: str | None = None,
) -> DatasetClient:
"""Create a new dataset.

Args:
kind: The kind of the dataset.
code_name: The code name of the dataset.
fields: The fields of the dataset.
fields: The custom fields of the dataset.
name: The name of the dataset. Defaults to the code name.
description: A short description of the dataset. Optional.

Returns:
The created or updated dataset.
"""
if name is None:
name = code_name
if description is None:
description = ""

return self._client.create_dataset(kind, code_name, fields, name, description, DatasetClient).get()
try:
dataset = self.dataset(code_name)
except NotFoundError:
return self._client.create_dataset(kind, code_name, fields or [], name or code_name, DatasetClient).get()

return self._client.update_dataset(
kind,
dataset._dataset.id, # noqa: SLF001
fields or [],
name or dataset._dataset.name, # noqa: SLF001
DatasetClient,
).get()

def datasets(self) -> Group:
"""Fetch all available datasets."""
Expand Down
4 changes: 2 additions & 2 deletions tilebox-workflows/tilebox/workflows/runner/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,9 @@ def submit_subtask(
def submit_subtasks(
self,
tasks: Sequence[TaskInstance],
depends_on: FutureTask | list[FutureTask] | None = None,
cluster: str | None = None,
max_retries: int = 0,
depends_on: FutureTask | list[FutureTask] | None = None,
) -> list[FutureTask]:
return [
self.submit_subtask(task, cluster=cluster, max_retries=max_retries, depends_on=depends_on) for task in tasks
Expand All @@ -575,7 +575,7 @@ def submit_batch(
DeprecationWarning,
stacklevel=2,
)
return self.submit_subtasks(tasks, cluster, max_retries)
return self.submit_subtasks(tasks, cluster=cluster, max_retries=max_retries)

def progress(self, label: str | None = None) -> ProgressUpdate:
if label == "":
Expand Down
12 changes: 10 additions & 2 deletions tilebox-workflows/tilebox/workflows/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,11 @@ class ExecutionContext(ABC):

@abstractmethod
def submit_subtask(
self, task: Task, depends_on: list[FutureTask] | None = None, cluster: str | None = None, max_retries: int = 0
self,
task: Task,
depends_on: FutureTask | list[FutureTask] | None = None,
cluster: str | None = None,
max_retries: int = 0,
) -> FutureTask:
"""Submit a subtask of the current task.

Expand All @@ -374,7 +378,11 @@ def submit_subtask(

@abstractmethod
def submit_subtasks(
self, tasks: Sequence[Task], cluster: str | None = None, max_retries: int = 0
self,
tasks: Sequence[Task],
depends_on: FutureTask | list[FutureTask] | None = None,
cluster: str | None = None,
max_retries: int = 0,
) -> list[FutureTask]:
"""Submit a batch of subtasks of the current task. Similar to `submit_subtask`, but for multiple tasks."""

Expand Down
Loading
Loading