From ad4bb026728a3a767eafb5f0118fd2afb931ac96 Mon Sep 17 00:00:00 2001
From: Sandro Wenzel
Date: Wed, 1 Oct 2025 19:39:41 +0200
Subject: [PATCH] o2dpg_sim_metrics: Introduce JSON statistics and merging of
 statistics

---
 MC/utils/o2dpg_sim_metrics.py | 243 +++++++++++++++++++++++++++++++++-
 1 file changed, 239 insertions(+), 4 deletions(-)

diff --git a/MC/utils/o2dpg_sim_metrics.py b/MC/utils/o2dpg_sim_metrics.py
index 711e616dd..c78fb1a59 100755
--- a/MC/utils/o2dpg_sim_metrics.py
+++ b/MC/utils/o2dpg_sim_metrics.py
@@ -12,6 +12,7 @@
 import matplotlib
 import json
 import numpy as np
+import math
 import pandas as pd
 
 ############################################################################
@@ -744,6 +745,108 @@ def extract_resources(pipelines):
     # Collect all metrics we got, here we want to have the median from all the iterations
     return [Resources(p) for p in pipelines]
 
+def merge_stats(elementary, running):
+    """
+    Merge an incoming elementary JSON into a running stats structure.
+    Also maintains a running std via a Welford-style pairwise merge of (count, mean, M2).
+
+    Each metric stores:
+      mean, std, M2, min, max, count
+    """
+    if not elementary:
+        return running
+
+    n_new_total = int(elementary.get("count", 1))
+    running["count"] = running.get("count", 0) + n_new_total
+
+    for name, metrics in elementary.items():
+        if name == "count":
+            continue
+
+        if name not in running:
+            running[name] = {"count": 0}
+
+        # existing count for this name
+        n_old_name = running[name].get("count", 0)
+
+        for metric, vals in metrics.items():
+            if not isinstance(vals, dict):
+                continue
+
+            if metric not in running[name]:
+                running[name][metric] = {
+                    "min": vals.get("min"),
+                    "max": vals.get("max"),
+                    "mean": vals.get("mean"),
+                    "std": vals.get("std", 0.0),
+                    "M2": vals.get("M2", 0.0),
+                    "count": vals.get("count", n_new_total)
+                }
+                continue
+
+            rmetric = running[name][metric]
+            n_old = rmetric.get("count", 0)
+            n_new = vals.get("count", n_new_total)
+
+            # update min / max
+            e_min = vals.get("min")
+            e_max = vals.get("max")
+            if e_min is not None:
+                rmetric["min"] = e_min if rmetric["min"] is None else min(rmetric["min"], e_min)
+            if e_max is not None:
+                rmetric["max"] = e_max if rmetric["max"] is None else max(rmetric["max"], e_max)
+
+            # combine means & M2
+            mean_a = rmetric.get("mean")
+            mean_b = vals.get("mean")
+
+            # If either mean is missing, use the one that exists
+            if mean_a is None and mean_b is None:
+                # Nothing to do
+                continue
+            elif mean_a is None:
+                rmetric["mean"] = mean_b
+                rmetric["M2"] = vals.get("M2", 0.0)
+                rmetric["count"] = n_new
+            elif mean_b is None:
+                # keep existing stats
+                rmetric["mean"] = mean_a
+                rmetric["M2"] = rmetric.get("M2", 0.0)
+                rmetric["count"] = n_old
+            else:
+                # both defined → do weighted merge (pairwise/parallel update)
+                delta = mean_b - mean_a
+                new_count = n_old + n_new
+                new_mean = mean_a + delta * (n_new / new_count)
+                new_M2 = rmetric.get("M2", 0.0) + vals.get("M2", 0.0) + (delta**2) * (n_old * n_new / new_count)
+
+                rmetric["mean"] = new_mean
+                rmetric["M2"] = new_M2
+                rmetric["count"] = new_count
+
+            # update std from M2
+            c = rmetric["count"]
+            rmetric["std"] = math.sqrt(rmetric["M2"] / c) if c > 1 else 0.0
+
+        running[name]["count"] = n_old_name + metrics.get("count", n_new_total)
+
+    # round mean and std for readability
+    for name, metrics in running.items():
+        if name == "count":
+            continue
+        for metric, vals in metrics.items():
+            if not isinstance(vals, dict):
+                continue
+            if "mean" in vals:
+                vals["mean"] = r3(vals["mean"])
+            if "std" in vals:
+                vals["std"] = r3(vals["std"])
+            if "min" in vals:
+                vals["min"] = r3(vals["min"])
+            if "max" in vals:
+                vals["max"] = r3(vals["max"])
+
+    return running
 
 def print_statistics(resource_object):
""" @@ -787,9 +890,105 @@ def print_statistics(resource_object): print(f" {comp:<20s} {mem:10.2f} MB") #(d) max disc consumption - print ("\nMax-DISC usage (MB): ", dframe['disc'].max()) - print ("Mean-DISC usage (MB): ", dframe['disc'].mean()) - print ("---> ") + if 'disc' in dframe: + print ("\nMax-DISC usage (MB): ", dframe['disc'].max()) + print ("Mean-DISC usage (MB): ", dframe['disc'].mean()) + print ("---> ") + +def r3(x): + """Round to 3 decimals, return None for None/NaN.""" + if x is None: + return None + try: + xf = float(x) + except Exception: + return None + if math.isnan(xf): + return None + return round(xf, 3) + +def produce_json_stat(resource_object): + print ("<--- Producing resource json from file ", resource_object.pipeline_file) + dframe = resource_object.df + meta = resource_object.meta + + # also write json summary; This is a file that can be used + # to adjust the resource estimates in o2dpg_workflow_runner.py + # + resource_json = {} + # group by 'name' and compute all needed stats for each metric + stats = ( + dframe + .groupby('name') + .agg({ + 'pss': ['min', 'max', 'mean'], + 'uss': ['min', 'max', 'mean'], + 'cpu': ['min', 'max', 'mean'] + }) + ) + + # turn the multi-level columns into flat names + stats.columns = [f"{col[0]}_{col[1]}" for col in stats.columns] + stats = stats.reset_index() + + # ----- compute lifetime ~ walltime per (timeframe, name) ----- + # ------------------------------------------------ + # Filter out unrealistic timeframes (nice == 19) because it's not the realistic runtime + df_nice_filtered = dframe[dframe['nice'] != 19].copy() + + # the calculates of mean runtime should be averaged over timeframes + lifetime_per_tf = ( + df_nice_filtered + .groupby(['timeframe', 'name'])['iter'] + .agg(lambda x: x.max() - x.min() + 1) # +1 to include both ends + .reset_index(name='lifetime') + ) + + # now average over timeframes for each name + mean_lifetime = ( + lifetime_per_tf + .groupby('name')['lifetime'] + .mean() + ) + max_lifetime = ( + lifetime_per_tf + .groupby('name')['lifetime'] + .max() + ) + min_lifetime = ( + lifetime_per_tf + .groupby('name')['lifetime'] + .max() + ) + + resource_json["count"] = 1 # basic sample size + + # convert to nested dictionary + for _, row in stats.iterrows(): + name = row['name'] + resource_json[name] = { + 'pss': { + 'min': r3(row['pss_min']), + 'max': r3(row['pss_max']), + 'mean': r3(row['pss_mean']) + }, + 'uss': { + 'min': r3(row['uss_min']), + 'max': r3(row['uss_max']), + 'mean': r3(row['uss_mean']) + }, + 'cpu': { + 'min': r3(row['cpu_min']), + 'max': r3(row['cpu_max']), + 'mean': r3(row['cpu_mean']) + }, + 'lifetime': { + 'min' : r3(float(min_lifetime.get(name, np.nan))), + 'max' : r3(float(max_lifetime.get(name, np.nan))), + 'mean' : r3(float(mean_lifetime.get(name, np.nan))) + } + } + return resource_json def stat(args): """ @@ -801,6 +1000,32 @@ def stat(args): print_statistics(res) +def merge_stats_into(list_of_json_stats, outputfile): + running = {} + # read all the inputs + for inp_json in list_of_json_stats: + running = merge_stats(inp_json, running) + + # now write out the result into the output file + if running: + with open(outputfile, 'w') as f: + json.dump(running, f) + + +def json_stat(args): + resources = extract_resources(args.pipelines) + all_stats = [produce_json_stat(res) for res in resources] + merge_stats_into(all_stats, args.output) + + +def merge_json_stats(args): + all_stats = [] + for inp in args.inputs: + # load the json as a dict + with open(inp,'r') as f: + 
+            all_stats.append(json.load(f))
+    merge_stats_into(all_stats, args.output)
+
 def history(args):
     """
     Entrypoint for history
@@ -1048,7 +1273,17 @@
     stat_parser.set_defaults(func=stat)
     stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
 
-    plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrcis from extracted metrics JSON file(s)")
+    json_stat_parser = sub_parsers.add_parser("json-stat", help="Produce basic json stats (compatible with o2dpg_workflow_runner injection)")
+    json_stat_parser.set_defaults(func=json_stat)
+    json_stat_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner; their information is merged", required=True)
+    json_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
+
+    merge_stat_parser = sub_parsers.add_parser("merge-json-stats", help="Merge information from json-stats into an aggregated stat")
+    merge_stat_parser.set_defaults(func=merge_json_stats)
+    merge_stat_parser.add_argument("-i", "--inputs", nargs="*", help="List of incoming/input json stat files", required=True)
+    merge_stat_parser.add_argument("-o", "--output", type=str, help="Output json filename", required=True)
+
+    plot_parser = sub_parsers.add_parser("history", help="Plot (multiple) metrics from extracted metrics JSON file(s)")
     plot_parser.set_defaults(func=history)
     plot_parser.add_argument("-p", "--pipelines", nargs="*", help="pipeline_metric files from o2_dpg_workflow_runner", required=True)
    plot_parser.add_argument("--output", help="output directory", default="resource_history")
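
Reviewer note, not part of the patch: merge_stats combines per-input
(count, mean, M2) summaries with the standard pairwise update and then
derives std = sqrt(M2 / count). A minimal self-contained sketch of that
update, cross-checked against a direct numpy computation; the sample
arrays are made up purely for illustration:

    import math
    import numpy as np

    def merge(a, b):
        # merge two (count, mean, M2) summaries, as in merge_stats'
        # "both defined" branch (pairwise/parallel variance update)
        n_a, mean_a, m2_a = a
        n_b, mean_b, m2_b = b
        n = n_a + n_b
        delta = mean_b - mean_a
        mean = mean_a + delta * (n_b / n)
        m2 = m2_a + m2_b + delta**2 * (n_a * n_b / n)
        return n, mean, m2

    x = np.array([1.0, 4.0, 2.0])
    y = np.array([3.0, 8.0])
    n, mean, m2 = merge((len(x), x.mean(), ((x - x.mean())**2).sum()),
                        (len(y), y.mean(), ((y - y.mean())**2).sum()))

    z = np.concatenate([x, y])
    assert math.isclose(mean, z.mean())
    # merge_stats reports the population std, sqrt(M2 / count)
    assert math.isclose(math.sqrt(m2 / n), z.std())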
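For reference, the new subcommands are invoked as follows; the file
names below are placeholders:

    MC/utils/o2dpg_sim_metrics.py json-stat -p pipeline_metric_1.json pipeline_metric_2.json -o stats.json
    MC/utils/o2dpg_sim_metrics.py merge-json-stats -i stats_run1.json stats_run2.json -o stats_merged.json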