From 152f6605897b3e707f6e4697f9825e9e8e0e18b5 Mon Sep 17 00:00:00 2001
From: swenzel
Date: Thu, 25 Sep 2025 16:30:49 +0200
Subject: [PATCH] O2DPG workflow_runner: New early-file removal feature

This implements a major new feature in the O2DPG workflow/pipeline
runner. The runner can now auto-delete artefacts from intermediate
stages as soon as these artefacts are no longer needed. For example, we
can delete TPC hits as soon as TPC digitization finishes. This then
allows operating on smaller disk budgets or simulating more timeframes
within a job.

To use the feature, one needs to provide a "file-access" report with
`--remove-files-early access_report.json`. This report is a "learned"
structure listing which files are written/consumed in a workflow and by
which tasks. Such a report needs to be generated in a prior pilot job
running the same workflow, by a user with sudo rights. See
https://github.com/AliceO2Group/O2DPG/pull/2126.

This is primarily useful for productions on the GRID, and the idea
would be to
(a) produce the file-access report for each new MC production, in a
    pilot job or via GitHub Actions when releasing software;
(b) then use this report to optimize the disk space of MC productions
    on the GRID.

This development is related to https://its.cern.ch/jira/browse/O2-4365
---
 MC/bin/o2_dpg_workflow_runner.py | 107 +++++++++++++++++++++++++++++++
 1 file changed, 107 insertions(+)

diff --git a/MC/bin/o2_dpg_workflow_runner.py b/MC/bin/o2_dpg_workflow_runner.py
index 02d25588f..33c875782 100755
--- a/MC/bin/o2_dpg_workflow_runner.py
+++ b/MC/bin/o2_dpg_workflow_runner.py
@@ -14,6 +14,7 @@
 import traceback
 import platform
 import tarfile
+from copy import deepcopy
 try:
     from graphviz import Digraph
     havegraphviz=True
@@ -65,6 +66,9 @@
 parser.add_argument('--retry-on-failure', help=argparse.SUPPRESS, default=0) # number of times a failing task is retried
 parser.add_argument('--no-rootinit-speedup', help=argparse.SUPPRESS, action='store_true') # disable init of ROOT environment vars to speedup init/startup

+parser.add_argument('--remove-files-early', type=str, default="", help="Delete intermediate files early (using the file graph information in the given file)")
+
+
 # Logging
 parser.add_argument('--action-logfile', help='Logfilename for action logs. If none given, pipeline_action_#PID.log will be used')
 parser.add_argument('--metric-logfile', help='Logfilename for metric logs. If none given, pipeline_metric_#PID.log will be used')
@@ -894,6 +898,38 @@ def ok_to_submit_backfill(res, backfill_cpu_factor=1.5, backfill_mem_factor=1.5)
             break


+def filegraph_expand_timeframes(data: dict, timeframes: set) -> dict:
+    """
+    A utility function for the file-access logic. Takes a template file report
+    and duplicates it for the multi-timeframe structure.
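+
+    Illustrative example (the file and task names here are hypothetical):
+    with timeframes = {-1, 1, 2} and
+
+        data = {"file_report": [{"file": "./tf1/tpchits.root",
+                                 "written_by": ["tpcdigi_1"],
+                                 "read_by": ["tpcreco_1"]}]}
+
+    the result maps "timeframe-1" to a copy of the original entry and
+    "timeframe-2" to a copy with "./tf2/tpchits.root", "tpcdigi_2" and
+    "tpcreco_2". Timeframe -1 (global tasks) is skipped.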
+    """
+    tf_entries = [
+        entry for entry in data.get("file_report", [])
+        if re.match(r"^\./tf\d+/", entry["file"])
+    ]
+
+    result = {}
+    for i in timeframes:
+        if i == -1:
+            continue
+        # deepcopy to avoid modifying the original template entries
+        new_entries = deepcopy(tf_entries)
+        for entry in new_entries:
+            # fix filepath
+            entry["file"] = re.sub(r"^\./tf\d+/", f"./tf{i}/", entry["file"])
+            # fix written_by and read_by (preserve prefix, change numeric suffix)
+            entry["written_by"] = [
+                re.sub(r"_\d+$", f"_{i}", w) for w in entry["written_by"]
+            ]
+            entry["read_by"] = [
+                re.sub(r"_\d+$", f"_{i}", r) for r in entry["read_by"]
+            ]
+        result[f"timeframe-{i}"] = new_entries
+
+    return result
+
+
 class WorkflowExecutor:
     # Constructor
     def __init__(self, workflowfile, args, jmax=100):
@@ -929,6 +965,7 @@ def __init__(self, workflowfile, args, jmax=100):
         # construct task ID <-> task name lookup
         self.idtotask = [ 0 for _ in self.taskuniverse ]
         self.tasktoid = {}
+        self.idtotf = [ task['timeframe'] for task in self.workflowspec['stages'] ]
         for i, name in enumerate(self.taskuniverse):
             self.tasktoid[name]=i
             self.idtotask[i]=name
@@ -970,6 +1007,72 @@ def __init__(self, workflowfile, args, jmax=100):
         # init alternative software environments
         self.init_alternative_software_environments()

+        # initialize container to keep track of file-task relationships
+        self.file_removal_candidates = {}
+        self.do_early_file_removal = False
+        self.timeframeset = set([ task["timeframe"] for task in self.workflowspec['stages'] ])
+        if args.remove_files_early != "":
+            with open(args.remove_files_early) as f:
+                filegraph_data = json.load(f)
+            self.do_early_file_removal = True
+            self.file_removal_candidates = filegraph_expand_timeframes(filegraph_data, self.timeframeset)
+
+
+    def perform_early_file_removal(self, taskids):
+        """
+        Checks which files can be deleted upon completion of the given tasks
+        and deletes them.
+        """
+
+        def remove_if_exists(filepath: str) -> bool:
+            """
+            Remove the file if it exists. Returns True if a file was removed.
+            """
+            if os.path.exists(filepath):
+                fsize = os.path.getsize(filepath)
+                os.remove(filepath)
+                actionlogger.info(f"Removing {filepath} since it is no longer needed. Freeing {fsize/1024./1024.:.2f} MB.")
+                return True
+
+            return False
+
+        def remove_for_task_id(taskname, file_dict, timeframe_id, listofalltimeframes):
+            marked_for_removal = []
+
+            timeframestoscan = [ timeframe_id ]
+            if timeframe_id == -1:
+                timeframestoscan = [ i for i in listofalltimeframes if i != -1 ]
+
+            # TODO: Note that this traversal of files is certainly not optimal.
+            # We should (and will) keep a mapping of tasks -> potential files and
+            # just scan those. This is already provided by the FileIOGraph analysis tool.
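+            # A sketch of that idea (not active here; assumes the index would be
+            # built once, right after filegraph_expand_timeframes):
+            #   task_to_files = {}
+            #   for entries in file_dict.values():
+            #       for e in entries:
+            #           for t in e["read_by"] + e["written_by"]:
+            #               task_to_files.setdefault(t, []).append(e)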
+            for tid in timeframestoscan:
+                for file_entry in file_dict[f"timeframe-{tid}"]:
+                    filename = file_entry['file']
+                    read_by = file_entry['read_by']
+                    written_by = file_entry['written_by']
+                    if taskname in read_by:
+                        file_entry['read_by'].remove(taskname)
+                    if taskname in written_by:
+                        file_entry['written_by'].remove(taskname)
+
+                    # TODO: in principle the written_by criterion might not be needed
+                    if len(file_entry['read_by']) == 0 and len(file_entry['written_by']) == 0:
+                        # the file mentioned here is no longer needed; make sure
+                        # it is there and then delete it
+                        if remove_if_exists(filename):
+                            # also take the file entry out of the dict altogether
+                            marked_for_removal.append(file_entry)
+
+            # pruning of fully consumed entries is disabled for now:
+            # for k in marked_for_removal:
+            #     file_dict[f"timeframe-{tid}"].remove(k)
+
+        for tid in taskids:
+            taskname = self.idtotask[tid]
+            timeframe_id = self.idtotf[tid]
+            remove_for_task_id(taskname, self.file_removal_candidates, timeframe_id, self.timeframeset)
+
+
     def SIGHandler(self, signum, frame):
         """
         basically forcing shut down of all child processes
@@ -1737,6 +1840,10 @@ def speedup_ROOT_Init():
             actionlogger.debug("finished now :" + str(finished_from_started))
             finishedtasks = finishedtasks + finished

+            # perform file cleanup
+            if self.do_early_file_removal:
+                self.perform_early_file_removal(finished_from_started)
+
             if self.is_productionmode:
                 # we can do some generic cleanup of finished tasks in non-interactive/GRID mode
                 # TODO: this can run asynchronously
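
Usage sketch (not part of the patch): a minimal, self-contained illustration
of the expansion step. It assumes filegraph_expand_timeframes from above has
been pasted into scope; the file and task names are hypothetical:

    import re                   # needed by filegraph_expand_timeframes
    from copy import deepcopy   # needed by filegraph_expand_timeframes

    # filegraph_expand_timeframes as defined in this patch is assumed to be
    # defined here (copied from the runner)

    report = {
        "file_report": [
            {"file": "./tf1/tpchits.root",   # hypothetical artefact
             "written_by": ["tpcdigi_1"],    # hypothetical producer task
             "read_by": ["tpcreco_1"]},      # hypothetical consumer task
        ]
    }

    # timeframe -1 denotes global (non-timeframe) tasks and is skipped
    expanded = filegraph_expand_timeframes(report, {-1, 1, 2})
    print(sorted(expanded.keys()))   # ['timeframe-1', 'timeframe-2']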