1+ import enum
12import json
2- import time
33import logging
4+ import time
5+ import warnings
46from collections import namedtuple
57from datetime import datetime , timezone
68from pathlib import Path
7- from typing import Dict , Union , Iterable
9+ from typing import Dict , Union , Iterable , List , Optional
810from urllib .parse import urlparse
9- import requests
11+
1012import ndjson
13+ import requests
1114
1215from labelbox import utils
13- from labelbox .schema .data_row import DataRow
14- from labelbox .orm import query
15- from labelbox .schema .bulk_import_request import BulkImportRequest
1616from labelbox .exceptions import InvalidQueryError , LabelboxError
17+ from labelbox .orm import query
1718from labelbox .orm .db_object import DbObject , Updateable , Deletable
1819from labelbox .orm .model import Entity , Field , Relationship
1920from labelbox .pagination import PaginatedCollection
21+ from labelbox .schema .bulk_import_request import BulkImportRequest
22+ from labelbox .schema .data_row import DataRow
2023
2124try :
2225 datetime .fromisoformat # type: ignore[attr-defined]
2326except AttributeError :
2427 from backports .datetime_fromisoformat import MonkeyPatch
28+
2529 MonkeyPatch .patch_fromisoformat ()
2630
2731try :
3135
logger = logging.getLogger(__name__)

# Largest number of data row ids accepted in one queue/dequeue request;
# `Project._post_batch` raises a ValueError for longer lists.
MAX_QUEUE_BATCH_SIZE = 1000
39+
40+
class QueueMode(enum.Enum):
    """Queue modes a Project can be in.

    `Project.queue_mode` translates the server-side `tagSetStatus` value
    into one of these members.
    """

    # Returned when the project's tagSetStatus is "ENABLED".
    Batch = "Batch"
    # Returned when the project's tagSetStatus is "DISABLED".
    Dataset = "Dataset"
44+
45+
class QueueErrors(enum.Enum):
    """Error codes related to queueing data rows.

    NOTE(review): nothing visible in this module reads these members yet;
    presumably they mirror the `error` value reported per data row by the
    batch mutations -- confirm against the API.
    """

    InvalidDataRowType = "InvalidDataRowType"
    AlreadyInProject = "AlreadyInProject"
    HasAttachedLabel = "HasAttachedLabel"
50+
3451
3552class Project (DbObject , Updateable , Deletable ):
3653 """ A Project is a container that includes a labeling frontend, an ontology,
@@ -79,6 +96,14 @@ class Project(DbObject, Updateable, Deletable):
7996 benchmarks = Relationship .ToMany ("Benchmark" , False )
8097 ontology = Relationship .ToOne ("Ontology" , True )
8198
def update(self, **kwargs):
    """Update this project, optionally switching its queue mode first.

    The `queue_mode` keyword is removed from `kwargs`; when it is truthy
    it is applied through `_update_queue_mode`. All remaining keywords are
    forwarded unchanged to the parent class's `update`.

    Args:
        **kwargs: Field values to update, plus an optional `queue_mode`
            (a `QueueMode` member).
    Returns:
        Whatever the parent class's `update` returns.
    """
    requested_mode: Optional[QueueMode] = kwargs.pop("queue_mode", None)

    # The queue mode is not passed to the parent update; it is changed
    # through its own mutation before the regular field update runs.
    if requested_mode:
        self._update_queue_mode(requested_mode)

    return super().update(**kwargs)
106+
82107 def members (self ):
83108 """ Fetch all current members for this project
84109
@@ -407,14 +432,14 @@ def setup(self, labeling_frontend, labeling_frontend_options):
407432 a.k.a. project ontology. If given a `dict` it will be converted
408433 to `str` using `json.dumps`.
409434 """
410- organization = self . client . get_organization ()
435+
411436 if not isinstance (labeling_frontend_options , str ):
412437 labeling_frontend_options = json .dumps (labeling_frontend_options )
413438
414439 self .labeling_frontend .connect (labeling_frontend )
415440
416441 LFO = Entity .LabelingFrontendOptions
417- labeling_frontend_options = self .client ._create (
442+ self .client ._create (
418443 LFO , {
419444 LFO .project : self ,
420445 LFO .labeling_frontend : labeling_frontend ,
@@ -424,6 +449,103 @@ def setup(self, labeling_frontend, labeling_frontend_options):
424449 timestamp = datetime .now (timezone .utc ).strftime ("%Y-%m-%dT%H:%M:%SZ" )
425450 self .update (setup_complete = timestamp )
426451
def queue(self, data_row_ids: List[str]):
    """Add Data Rows to the Project queue.

    Args:
        data_row_ids: Ids of the data rows to add.
    Returns:
        The value returned by `_post_batch` for the
        "submitBatchOfDataRows" mutation.
    """
    return self._post_batch("submitBatchOfDataRows", data_row_ids)
457+
def dequeue(self, data_row_ids: List[str]):
    """Remove Data Rows from the Project queue.

    Args:
        data_row_ids: Ids of the data rows to remove.
    Returns:
        The value returned by `_post_batch` for the
        "removeBatchOfDataRows" mutation.
    """
    return self._post_batch("removeBatchOfDataRows", data_row_ids)
463+
def _post_batch(self, method, data_row_ids: List[str]):
    """Run one of the batch queue mutations for the given data rows.

    Args:
        method: Name of the GraphQL mutation field to invoke, e.g.
            "submitBatchOfDataRows" or "removeBatchOfDataRows".
        data_row_ids: Ids of the data rows to send; at most
            MAX_QUEUE_BATCH_SIZE entries.
    Returns:
        The `dataRows` list from the response. Each entry carries a
        `dataRowId` and an `error`; the checks below treat every returned
        entry as a data row that was not processed successfully.
    Raises:
        ValueError: If the project is not in batch mode, if the batch is
            larger than MAX_QUEUE_BATCH_SIZE, or if as many rows are
            reported back as were sent (nothing succeeded).
    """
    if self.queue_mode() != QueueMode.Batch:
        raise ValueError("Project must be in batch mode")

    if len(data_row_ids) > MAX_QUEUE_BATCH_SIZE:
        raise ValueError(
            f"Batch exceeds max size of {MAX_QUEUE_BATCH_SIZE}, consider breaking it into parts"
        )

    # Named `query_str` (as in the sibling methods) so the imported
    # `labelbox.orm.query` module is not shadowed inside this method.
    query_str = """mutation %sPyApi($projectId: ID!, $dataRowIds: [ID!]!) {
          project(where: {id: $projectId}) {
            %s(data: {dataRowIds: $dataRowIds}) {
              dataRows {
                dataRowId
                error
              }
            }
          }
        }
    """ % (method, method)

    res = self.client.execute(query_str, {
        "projectId": self.uid,
        "dataRowIds": data_row_ids
    })["project"][method]["dataRows"]

    # TODO: figure out error messaging
    if len(data_row_ids) == len(res):
        raise ValueError("No dataRows were submitted successfully")

    # Warn only when the server actually reported rows back. The previous
    # check looked at the size of the input, so the warning fired even when
    # every data row went through.
    if res:
        warnings.warn("Some Data Rows were not submitted successfully")

    return res
500+
def _update_queue_mode(self, mode: QueueMode) -> QueueMode:
    """Switch the project to the given queue mode on the server.

    Args:
        mode: The desired `QueueMode`.
    Returns:
        The `mode` that was passed in.
    Raises:
        ValueError: If `mode` is neither `QueueMode.Batch` nor
            `QueueMode.Dataset`.
    """
    # Nothing to send when the server already reports the requested mode.
    if self.queue_mode() == mode:
        return mode

    # Batch mode corresponds to an enabled tag set, dataset mode to a
    # disabled one.
    status_by_mode = (
        (QueueMode.Batch, "ENABLED"),
        (QueueMode.Dataset, "DISABLED"),
    )
    for known_mode, tag_set_status in status_by_mode:
        if mode == known_mode:
            break
    else:
        raise ValueError(
            "Must provide either `BATCH` or `DATASET` as a mode")

    mutation = """mutation setTagSetStatusPyApi($projectId: ID!, $status: TagSetStatusInput!) {
          project(where: {id: $projectId}) {
            setTagSetStatus(input: {tagSetStatus: $status}) {
              tagSetStatus
            }
          }
        }
    """

    variables = {'projectId': self.uid, 'status': tag_set_status}
    self.client.execute(mutation, variables)

    return mode
529+
def queue_mode(self):
    """Fetch the project's current queue mode from the server.

    Returns:
        `QueueMode.Batch` when the project's `tagSetStatus` is "ENABLED",
        `QueueMode.Dataset` when it is "DISABLED".
    Raises:
        ValueError: If the server reports any other status.
    """
    lookup = """query GetTagSetStatusPyApi($projectId: ID!) {
          project(where: {id: $projectId}) {
            tagSetStatus
          }
        }
    """

    response = self.client.execute(lookup, {'projectId': self.uid})
    tag_set_status = response["project"]["tagSetStatus"]

    if tag_set_status == "ENABLED":
        return QueueMode.Batch
    if tag_set_status == "DISABLED":
        return QueueMode.Dataset

    raise ValueError("Status not known")
548+
427549 def validate_labeling_parameter_overrides (self , data ):
428550 for idx , row in enumerate (data ):
429551 if len (row ) != 3 :
0 commit comments