
Commit b0d1f1b

Oleg Trygub authored and committed
Clear_global_keys
1 parent 1b45ef7 commit b0d1f1b

4 files changed: +339, −0 lines changed

labelbox/client.py

Lines changed: 104 additions & 0 deletions
@@ -1229,6 +1229,110 @@ def _format_failed_rows(rows: List[str],
                )
            time.sleep(sleep_time)

    def clear_global_keys(
            self,
            global_keys: List[str],
            timeout_seconds=60) -> Dict[str, Union[str, List[Any]]]:
        """
        Clears global keys for a list of data rows.

        Args:
            global_keys (list): A list of global keys to clear
        Returns:
            Dictionary containing 'status', 'results' and 'errors'.

            'Status' contains the outcome of this job. It can be one of
            'Success', 'Partial Success', or 'Failure'.

            'Results' contains a list of global keys that were successfully
            cleared. It may not necessarily contain all global keys requested.

            'Errors' contains a list of global keys that could not be cleared,
            along with the failure reason.
        Examples:
            >>> job_result = client.clear_global_keys(["key1","key2"])
            >>> print(job_result['status'])
            Partial Success
            >>> print(job_result['results'])
            ['key1']
            >>> print(job_result['errors'])
            [{'global_key': 'key2', 'error': 'Failed to find data row matching provided global key'}]
        """

        def _format_failed_rows(rows: List[str],
                                error_msg: str) -> List[Dict[str, str]]:
            return [{'global_key': r, 'error': error_msg} for r in rows]

        # Start clear global keys job
        query_str = """mutation clearGlobalKeysPyApi($globalKeys: [ID!]!) {
            clearGlobalKeys(where: {ids: $globalKeys}) { jobId}}
            """
        params = {"globalKeys": global_keys}
        clear_global_keys_job = self.execute(query_str, params)

        # Query string for retrieving job status and result, if job is done
        result_query_str = """query clearGlobalKeysResultPyApi($jobId: ID!) {
            clearGlobalKeysResult(jobId: {id: $jobId}) { data {
                clearedGlobalKeys
                failedToClearGlobalKeys
                notFoundGlobalKeys
                accessDeniedGlobalKeys
                } jobStatus}}
            """
        result_params = {
            "jobId": clear_global_keys_job["clearGlobalKeys"]["jobId"]
        }

        # Poll job status until finished, then retrieve results
        sleep_time = 2
        start_time = time.time()
        while True:
            res = self.execute(result_query_str, result_params)
            if res["clearGlobalKeysResult"]['jobStatus'] == "COMPLETE":
                data = res["clearGlobalKeysResult"]['data']
                results, errors = [], []
                results.extend(data['clearedGlobalKeys'])
                errors.extend(
                    _format_failed_rows(data['failedToClearGlobalKeys'],
                                        "Clearing global key failed"))
                errors.extend(
                    _format_failed_rows(
                        data['notFoundGlobalKeys'],
                        "Failed to find data row matching provided global key"
                    ))
                errors.extend(
                    _format_failed_rows(
                        data['accessDeniedGlobalKeys'],
                        "Denied access to modify data row matching provided global key"
                    ))

                # Invalid results may contain empty strings, so we must filter
                # them prior to checking for PARTIAL_SUCCESS
                filtered_results = list(filter(lambda r: r != '', results))
                if not errors:
                    status = CollectionJobStatus.SUCCESS.value
                elif errors and len(filtered_results) > 0:
                    status = CollectionJobStatus.PARTIAL_SUCCESS.value
                else:
                    status = CollectionJobStatus.FAILURE.value

                if errors:
                    logger.warning(
                        "There are errors present. Please look at 'errors' in the returned dict for more details"
                    )

                return {"status": status, "results": results, "errors": errors}
            elif res["clearGlobalKeysResult"]['jobStatus'] == "FAILED":
                raise labelbox.exceptions.LabelboxError(
                    "Job clearGlobalKeys failed.")
            current_time = time.time()
            if current_time - start_time > timeout_seconds:
                raise labelbox.exceptions.TimeoutError(
                    "Timed out waiting for clear_global_keys job to complete."
                )
            time.sleep(sleep_time)

    def get_catalog_slice(self, slice_id) -> CatalogSlice:
        """
        Fetches a Catalog Slice by ID.
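
For reference, a minimal usage sketch of the new method; the client construction and the global keys below are illustrative, not part of this diff:

    import labelbox

    # Hypothetical setup; substitute a real API key.
    client = labelbox.Client(api_key="<YOUR_API_KEY>")

    job_result = client.clear_global_keys(["gk-1", "gk-2"], timeout_seconds=120)
    if job_result["status"] != "Success":
        # Each error entry carries the offending global key and a reason.
        for err in job_result["errors"]:
            print(err["global_key"], "->", err["error"])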

labelbox_dev/fetching_iterator.py

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
from collections import deque

TASK_ID_KEY = 'jobID'
TASK_STATUS_KEY = 'jobStatus'
TOTAL_PAGES_KEY = 'totalPage'
PAGE_NUMBER_KEY = 'pageNumber'
DATA_KEY = 'data'
DEFAULT_QUERY = ''
DEFAULT_QUERY_NAME = ''


class FetchingIterator:

    def __init__(self,
                 session,
                 object_class,
                 query,
                 params,
                 prefetched_page_json=None,
                 total_pages=None):
        # session: client used to execute the paged queries
        self.session = session
        self.queue = deque()
        self.object_class = object_class
        self.query = query
        self.params = params
        self.iteration_ended = False
        self.page_number = 0
        self.total_pages = total_pages or 0

        self.__fetch_next_page(prefetched_page_json)

    def __len__(self):
        return len(self.queue)

    def __iter__(self):
        return self

    def __next__(self):
        if not self.queue and not self.__fetch_next_page():
            raise StopIteration()

        return self.queue.popleft()

    def __instantiate_typed_entities(self, json_entities):
        entities = []
        for json_entity in json_entities:
            entities.append(self.object_class(json_entity))
        return entities

    def __parse_response(self, response):
        total_pages = response.get(TOTAL_PAGES_KEY, 0)
        page_number = response.get(PAGE_NUMBER_KEY, 0)
        json_entities = response.get(DATA_KEY, {})
        return json_entities, page_number, total_pages

    def __fetch_next_page(self, prefetched_page=None):
        if self.iteration_ended:
            return False

        if not prefetched_page:
            page_response = self.session.execute(self.query, self.params)
        else:
            page_response = prefetched_page
        json_entities, self.page_number, self.total_pages = \
            self.__parse_response(page_response)

        entities = self.__instantiate_typed_entities(json_entities)
        # Enqueue each entity individually
        self.queue.extend(entities)

        if self.page_number == self.total_pages:
            self.iteration_ended = True

        return True
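
A minimal, hypothetical usage sketch of the iterator; StubSession is a stand-in for the real session/client, not part of this diff:

    class StubSession:
        # Returns one page shaped like the response __parse_response expects.
        def execute(self, query, params):
            return {'totalPage': 1, 'pageNumber': 1,
                    'data': [{'id': 'a'}, {'id': 'b'}]}

    it = FetchingIterator(StubSession(), dict, query='', params={})
    for entity in it:
        print(entity['id'])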

labelbox_dev/globalkeys.py

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
1+
from labelbox_dev import entEnity
2+
3+
from collections import namedtuple
4+
5+
from pydantic import NoneStr
6+
sys.path.append("/Users/olegtrygub/src/labelbox-python")
7+
import labelbox
8+
9+
from labelbox.client import Client
10+
from labelbox.utils import camel_case
11+
12+
from typing import Optional
13+
14+
from time import time
15+
16+
client = labelbox.Client(
17+
api_key="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOiJjbDh1dHpxMm8wMDBieHBwMTR0a2o4ZWJzIiwib3JnYW5pemF0aW9uSWQiOiJjbDh1dHpxMXAwMDBheHBwMWNrdHhod3p0IiwiYXBpS2V5SWQiOiJjbDh1dTB4b2YwMDBleHBwMWQwcmMweDhsIiwic2VjcmV0IjoiMmExNDQ4NGZmZmIzMjQyNzk0ZTNlNmIwOTRkMDY3OWYiLCJpYXQiOjE2NjQ5MjYxMjIsImV4cCI6MjI5NjA3ODEyMn0.cqtxIEKVY1GPAQjyqQ8VrbMGkUywx-IwC9BGkCi8ahU",
18+
endpoint = 'http://localhost:8080/graphql',
19+
app_url="localhost:3000"
20+
)
21+
22+
23+
class BaseObject:
24+
def create_from_json(self):
25+
pass
26+
27+
28+
class DataRowInfo(Entity):
29+
def __init__(self):
30+
self.data_row_id = None
31+
self.globalkey = None
32+
33+
class DataRowAssignment(Entity):
34+
pass
35+
36+
class BaseError(BaseObject):
37+
def __init__(self, message):
38+
self.message = message
39+
40+
41+
42+
43+
TASK_ID_KEY = 'jobID'
44+
TASK_STATUS_KEY = 'jobStatus'
45+
TOTAL_PAGES_KEY = 'totalPage'
46+
PAGE_NUMBER_KEY = 'pageNumber'
47+
DATA_KEY = 'data'
48+
DEFAULT_QUERY = ''
49+
DEFAULT_QUERY_NAME = ''
50+
51+
52+
DEFAULT_PAGE_SCHEMA = {}
53+
54+
55+
56+
57+
58+
def get_data_row_ids_for_globaleys(globalkeys):
59+
pass
60+
61+
62+
def assign_global_keys_to_data_rows(global_key_to_data_row_inputs: List[Dict[str, str]], timeout_seconds=60) -> Dict[str, Union[str, List[Any]]]:
63+
pass
64+
65+
if __name__ == "__main__":
66+
67+
68+
69+
PATTERN = "gs://labelbox-datasets/coco_dataset/train2017/00000000{}.jpg-1"
70+
gks = [PATTERN.format(num ) for num in range(1006, 1300)]
71+
dr_gk_ids = client.get_data_row_ids_for_global_keys(gks, timeout_seconds=4)
72+
print(dr_gk_ids.keys())
73+
74+
75+
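
A minimal sketch of how the stubbed helper might delegate to the existing client method; the pass-through shape is an assumption, not part of this diff:

    def get_data_row_ids_for_global_keys(global_keys, timeout_seconds=60):
        # Hypothetical thin wrapper: reuse the client method of the same name
        # and surface only the successfully resolved data row ids.
        job_result = client.get_data_row_ids_for_global_keys(
            global_keys, timeout_seconds=timeout_seconds)
        return job_result['results']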

labelbox_dev/task.py

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
1+
from typing import TYPE_CHECKING, List, Dict, Any, Union, Optional
2+
from time import time
3+
from datetime import datetime
4+
from enum import Enum
5+
6+
class TaskStatus(Enum):
7+
SUBMITTED = 0
8+
COMPLETE = 1
9+
RUNNING = 2
10+
FAILED = 3
11+
RETRYING = 3
12+
13+
class TaskType(Enum):
14+
BEST_EFFORT = 0
15+
TRANSACTION = 1
16+
17+
class BaseTask:
18+
def __init__(self, task_id, object_class, metadata_query, query_name, created_at, expires_at = None):
19+
self._session = client
20+
21+
self._metadata_query = metadata_query
22+
self._query_name = query_name
23+
self._task_id = task_id
24+
self.object_class = object_class
25+
26+
self._status = None
27+
self._created_at = created_at
28+
self._expires_at = expires_at
29+
30+
31+
def status(self) -> Optional[TaskStatus]:
32+
# fetch if stale
33+
if self._status is None or self.status == TaskStatus.RUNNING:
34+
self.__fetch_task_metadata()
35+
36+
return self._status
37+
38+
def wait_till_done(self):
39+
pass
40+
41+
def created_at(self) -> time:
42+
return self._created_at
43+
44+
def expires_at(self) -> Optional[time]:
45+
pass
46+
47+
def __fetch_task_metadata(self):
48+
response = self._session.execute(self._metadata_query, {TASK_ID_KEY: self.task_id})
49+
self.task_status = response[TASK_STATUS_KEY]
50+
51+
52+
class Task(BaseTask):
53+
def error(self) -> Optional[BaseError]:
54+
pass
55+
56+
def result(self) -> Optional[BaseObject]:
57+
pass
58+
59+
class BulkTask(BaseTask):
60+
def __init__(
61+
self,
62+
task_id,
63+
object_class,
64+
task_type = TaskType.TRANSACTION,
65+
query = DEFAULT_QUERY,
66+
query_name = DEFAULT_QUERY_NAME,
67+
created_at = datetime.now()
68+
):
69+
super(BaseTask, self).__init__(task_id, query, query_name, created_at, object_class)
70+
self.results_iterator = None
71+
self.errors_iterator = None
72+
73+
def results(self) -> Optional[FetchingIterator]:
74+
if self.status() == TaskStatus.COMPLETE:
75+
if self.results_iterator:
76+
return self.results_iterator
77+
78+
self.results_iterator = FetchingIterator()
79+
return None
80+
81+
def errors(self) -> FetchingIterator:
82+
if self.status() != TaskStatus.SUBMITTED and self.status() != TaskStatus.RUNNING:
83+
if self.errors_iterator:
84+
return self.errors_iterator
85+
86+
self.errors_iterator = FetchingIterator()
87+
return None
88+
89+
@property
90+
def kind(self) -> TaskType:
91+
return self.task_type
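
The wait_till_done stub could follow the same poll-and-sleep pattern as clear_global_keys in client.py; a minimal sketch, with the polling interval and timeout parameter assumed rather than specified by this diff:

    import time

    def wait_till_done(self, timeout_seconds=60, sleep_time=2):
        # Poll task status until it leaves SUBMITTED/RUNNING or times out;
        # status() refetches metadata whenever the task is still RUNNING.
        start = time.time()
        while self.status() in (TaskStatus.SUBMITTED, TaskStatus.RUNNING):
            if time.time() - start > timeout_seconds:
                raise TimeoutError("Timed out waiting for task to complete.")
            time.sleep(sleep_time)
        return self._status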
