9 changes: 3 additions & 6 deletions backend/score.py
@@ -891,17 +891,14 @@ async def retry_processing(uri=Form(None), userName=Form(None), password=Form(No
try:
start = time.time()
graph = create_graph_database_connection(uri, userName, password, database)
chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
# chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS,params={"filename":file_name})
end = time.time()
elapsed_time = end - start
json_obj = {'api_name':'retry_processing', 'db_url':uri, 'userName':userName, 'database':database, 'file_name':file_name,'retry_condition':retry_condition,
'logging_time': formatted_time(datetime.now(timezone.utc)), 'elapsed_api_time':f'{elapsed_time:.2f}','email':email}
logger.log_struct(json_obj, "INFO")
if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
return create_api_response('Success',message=f"Chunks are not created for the file{file_name}. Please upload again the file to re-process.",data=chunks)
else:
await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
await asyncio.to_thread(set_status_retry, graph,file_name,retry_condition)
return create_api_response('Success',message=f"Status set to Ready to Reprocess for filename : {file_name}")
except Exception as e:
job_status = "Failed"
message="Unable to set status to Retry"
41 changes: 22 additions & 19 deletions backend/src/main.py
@@ -230,7 +230,7 @@ def create_source_node_graph_url_wikipedia(graph, model, wiki_query, source_type
async def extract_graph_from_file_local_file(uri, userName, password, database, model, merged_file_path, fileName, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):

logging.info(f'Process file name :{fileName}')
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri, fileName)
@@ -244,7 +244,7 @@ async def extract_graph_from_file_local_file(uri, userName, password, database,
return await processing_source(uri, userName, password, database, model, fileName, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, True, merged_file_path, retry_condition, additional_instructions=additional_instructions)
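The widened guard on the first line of this function is repeated verbatim in every extract_graph_from_* function below and again in get_chunkId_chunkDoc_list: a retry_condition that is empty, None, or not one of the two recognised resume modes now triggers a fresh fetch-and-chunk of the source. A hedged sketch of how the repeated check could be centralised (hypothetical helper, not part of this diff; the constant names are taken from the surrounding code):

VALID_RETRY_CONDITIONS = {DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION}

def should_rechunk_from_source(retry_condition):
    # True when the file must be fetched and chunked again from its source,
    # i.e. retry_condition is empty/None or not a recognised resume mode.
    return retry_condition in ("", None) or retry_condition not in VALID_RETRY_CONDITIONS

Each call site would then read if should_rechunk_from_source(retry_condition): ..., keeping the repeated copies of the condition in sync.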

async def extract_graph_from_file_s3(uri, userName, password, database, model, source_url, aws_access_key_id, aws_secret_access_key, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
if(aws_access_key_id==None or aws_secret_access_key==None):
raise LLMGraphBuilderException('Please provide AWS access and secret keys')
else:
@@ -258,7 +258,7 @@ async def extract_graph_from_file_s3(uri, userName, password, database, model, s
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)

async def extract_graph_from_web_page(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
pages = get_documents_from_web_page(source_url)
if pages==None or len(pages)==0:
raise LLMGraphBuilderException(f'Content is not available for given URL : {file_name}')
@@ -267,7 +267,7 @@ async def extract_graph_from_web_page(uri, userName, password, database, model,
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)

async def extract_graph_from_file_youtube(uri, userName, password, database, model, source_url, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
file_name, pages = get_documents_from_youtube(source_url)

if pages==None or len(pages)==0:
@@ -277,7 +277,7 @@ async def extract_graph_from_file_youtube(uri, userName, password, database, mod
return await processing_source(uri, userName, password, database, model, file_name, [], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)

async def extract_graph_from_file_Wikipedia(uri, userName, password, database, model, wiki_query, language, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
file_name, pages = get_documents_from_Wikipedia(wiki_query, language)
if pages==None or len(pages)==0:
raise LLMGraphBuilderException(f'Wikipedia page is not available for file : {file_name}')
@@ -286,7 +286,7 @@ async def extract_graph_from_file_Wikipedia(uri, userName, password, database, m
return await processing_source(uri, userName, password, database, model, file_name,[], allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition=retry_condition, additional_instructions=additional_instructions)

async def extract_graph_from_file_gcs(uri, userName, password, database, model, gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token, file_name, allowedNodes, allowedRelationship, token_chunk_size, chunk_overlap, chunks_to_combine, retry_condition, additional_instructions):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
file_name, pages = get_documents_from_gcs(gcs_project_id, gcs_bucket_name, gcs_bucket_folder, gcs_blob_filename, access_token)
if pages==None or len(pages)==0:
raise LLMGraphBuilderException(f'File content is not available for file : {file_name}')
@@ -431,7 +431,7 @@ async def processing_source(uri, userName, password, database, model, file_name,

# merged_file_path have value only when file uploaded from local

if is_uploaded_from_local:
if is_uploaded_from_local and bool(is_cancelled_status) == False:
gcs_file_cache = os.environ.get('GCS_FILE_CACHE')
if gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
@@ -511,7 +511,7 @@ async def processing_chunks(chunkId_chunkDoc_list,graph,uri, userName, password,
return node_count,rel_count,latency_processing_chunk

def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_overlap, retry_condition):
if not retry_condition:
if retry_condition in ["", None] or retry_condition not in [DELETE_ENTITIES_AND_START_FROM_BEGINNING, START_FROM_LAST_PROCESSED_POSITION]:
logging.info("Break down file into chunks")
bad_chars = ['"', "\n", "'"]
for i in range(0,len(pages)):
@@ -532,7 +532,7 @@ def get_chunkId_chunkDoc_list(graph, file_name, pages, token_chunk_size, chunk_o
chunks = execute_graph_query(graph,QUERY_TO_GET_CHUNKS, params={"filename":file_name})

if chunks[0]['text'] is None or chunks[0]['text']=="" or not chunks :
raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file and try again.")
raise LLMGraphBuilderException(f"Chunks are not created for {file_name}. Please re-upload file or reprocess the file with option Start From Beginning.")
else:
for chunk in chunks:
chunk_doc = Document(page_content=chunk['text'], metadata={'id':chunk['id'], 'position':chunk['position']})
@@ -714,15 +714,9 @@ def manually_cancelled_job(graph, filenames, source_types, merged_dir, uri):
obj_source_node.updated_at = datetime.now()
graphDb_data_Access = graphDBdataAccess(graph)
graphDb_data_Access.update_source_node(obj_source_node)
count_response = graphDb_data_Access.update_node_relationship_count(file_name)
#Update the nodeCount and relCount properties in Document node
graphDb_data_Access.update_node_relationship_count(file_name)
obj_source_node = None
merged_file_path = os.path.join(merged_dir, file_name)
if source_type == 'local file' and gcs_file_cache == 'True':
folder_name = create_gcs_bucket_folder_name_hashed(uri, file_name)
delete_file_from_gcs(BUCKET_UPLOAD,folder_name,file_name)
else:
logging.info(f'Deleted File Path: {merged_file_path} and Deleted File Name : {file_name}')
delete_uploaded_local_file(merged_file_path,file_name)
return "Cancelled the processing job successfully"

def populate_graph_schema_from_text(text, model, is_schema_description_checked, is_local_storage):
@@ -749,10 +743,19 @@ def set_status_retry(graph, file_name, retry_condition):
obj_source_node.is_cancelled = False
if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING or retry_condition == START_FROM_BEGINNING:
obj_source_node.processed_chunk=0
if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})
obj_source_node.node_count=0
obj_source_node.relationship_count=0
obj_source_node.chunkNodeCount=0
obj_source_node.chunkRelCount=0
obj_source_node.communityNodeCount=0
obj_source_node.communityRelCount=0
obj_source_node.entityEntityRelCount=0
obj_source_node.entityNodeCount=0
obj_source_node.processingTime=0
obj_source_node.total_chunks=0
if retry_condition == DELETE_ENTITIES_AND_START_FROM_BEGINNING:
execute_graph_query(graph,QUERY_TO_DELETE_EXISTING_ENTITIES, params={"filename":file_name})

logging.info(obj_source_node)
graphDb_data_Access.update_source_node(obj_source_node)
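With this change, restarting from the beginning (DELETE_ENTITIES_AND_START_FROM_BEGINNING or START_FROM_BEGINNING) zeroes every count, timing, and chunk property on the Document node, and the entity-deletion query now runs after those resets, still only for the delete-and-restart condition. A minimal usage sketch, assuming the module's own helpers; the connection values and file name are placeholders:

# Hedged sketch: connection details and the file name are illustrative only.
graph = create_graph_database_connection(uri, userName, password, database)

# Full restart: reset all Document-node counters and delete previously extracted entities.
set_status_retry(graph, "example.pdf", DELETE_ENTITIES_AND_START_FROM_BEGINNING)

# Resume: counters, processed_chunk, and existing entities are left intact;
# only the status/retry bookkeeping on the Document node is updated.
set_status_retry(graph, "example.pdf", START_FROM_LAST_PROCESSED_POSITION)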

115 changes: 109 additions & 6 deletions frontend/src/components/FileTable.tsx
@@ -79,6 +79,7 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
const columnHelper = createColumnHelper<CustomFile>();
const [columnFilters, setColumnFilters] = useState<ColumnFiltersState>([]);
const [isLoading, setIsLoading] = useState<boolean>(false);
const [isCancellingQueue, setIsCancellingQueue] = useState<boolean>(false);
const [statusFilter, setStatusFilter] = useState<string>('');
const [filetypeFilter, setFiletypeFilter] = useState<string>('');
const [fileSourceFilter, setFileSourceFilter] = useState<string>('');
@@ -833,6 +834,73 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
}
}, [connectionStatus, filesData.length, isReadOnlyUser]);

const refreshFileData = async () => {
try {
const res = await getSourceNodes();
if (res.data && res.data.status !== 'Failed' && res.data.data.length) {
const updatedFiles = res.data.data
.map((item: SourceNode) => {
const existingFile = filesData.find((f) => f.name === item.fileName);
if (existingFile) {
// Check if file is in queue
const isInQueue = queue.items.some((f) => f.name === item.fileName);
return {
...existingFile,
status: isInQueue ? 'Waiting' : getFileSourceStatus(item),
nodesCount: item?.nodeCount ?? existingFile.nodesCount,
relationshipsCount: item?.relationshipCount ?? existingFile.relationshipsCount,
processingTotalTime: item?.processingTime ?? existingFile.processingTotalTime,
};
}
return existingFile;
})
.filter(Boolean);

setFilesData(updatedFiles as CustomFile[]);
setRowSelection((prev) => {
const updated = { ...prev };
updatedFiles.forEach((file) => {
if (file?.status === 'Cancelled' && updated[file.id]) {
delete updated[file.id];
}
});
return updated;
});
}
} catch (error) {
console.error('Refresh failed:', error);
}
};

const cancelQueue = async () => {
if (queue.isEmpty()) {
showNormalToast('No files in queue to cancel');
return;
}

setIsCancellingQueue(true);
try {
const queuedFileNames = queue.items.map((f) => f.name as string).filter(Boolean);
const queuedFileSources = queue.items.map((f) => f.fileSource as string).filter(Boolean);
const res = await cancelAPI(queuedFileNames, queuedFileSources);

if (res.data.status === 'Success') {
queue.clear();
await refreshFileData();

showNormalToast(`Successfully cancelled ${queuedFileNames.length} waiting file(s)`);
} else {
throw new Error(res.data.error || 'Failed to cancel queue');
}
} catch (err) {
if (err instanceof Error) {
showErrorToast(`Failed to cancel queue: ${err.message}`);
}
} finally {
setIsCancellingQueue(false);
}
};

const cancelHandler = async (fileName: string, id: string, fileSource: string) => {
setFilesData((prevfiles) =>
prevfiles.map((curfile) => {
@@ -860,6 +928,11 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
return curfile;
})
);
setRowSelection((prev) => {
const updated = { ...prev };
delete updated[id];
return updated;
});
setProcessedCount((prev) => {
if (prev == batchSize) {
return batchSize - 1;
Expand Down Expand Up @@ -1036,14 +1109,44 @@ const FileTable: ForwardRefRenderFunction<ChildRef, FileTableProps> = (props, re
</DataGridComponents.TableResults>
);
} else if (connectionStatus) {
const queueSize = queue.size();
return (
<DataGridComponents.TableResults>
<Flex flexDirection='row' gap='0' alignItems='center'>
<span>
<InformationCircleIconOutline className='n-size-token-6' />
</span>
{`Large files may be partially processed up to 10K characters due to resource limit.`}
<span></span>
<Flex flexDirection='row' gap='4' alignItems='center'>
<Flex flexDirection='row' gap='0' alignItems='center'>
<span>
<InformationCircleIconOutline className='n-size-token-6' />
</span>
{`Large files may be partially processed up to 10K characters due to resource limit.`}
</Flex>
{queueSize > 0 && (
<Flex
flexDirection='row'
gap='2'
alignItems='center'
className={`${isCancellingQueue ? 'opacity-50' : 'animate-pulse'} bg-palette-warning-bg-weak rounded-md px-3 py-2 border border-palette-warning-border`}
>
<InformationCircleIconOutline className='n-size-token-5 text-palette-warning-text' />
<Typography variant='body-medium' className='font-semibold text-palette-warning-text'>
{isCancellingQueue
? 'Cancelling files in waiting queue...'
: `${queueSize} file${queueSize !== 1 ? 's' : ''} waiting in queue`}
</Typography>
{!isReadOnlyUser && (
<IconButtonWithToolTip
placement='right'
text={isCancellingQueue ? 'Cancelling...' : 'Cancel all waiting files'}
size='small'
label='Cancel Queue'
clean
disabled={isCancellingQueue}
onClick={cancelQueue}
>
<XMarkIconOutline className='n-size-token-4' />
</IconButtonWithToolTip>
)}
</Flex>
)}
</Flex>
</DataGridComponents.TableResults>
);