Skip to content

zlccccc/Agentic-AI-Workflow-Simulator

Repository files navigation

Agentic AI Workflow 调度器仿真系统

本项目是一个用于 Agentic AI Workflow 任务调度的仿真系统,模拟了多模型部署、任务分配和资源管理的完整流程。

系统架构

时序图

sequenceDiagram
    participant Simulator
    participant Scheduler
    participant ResourceManager
    participant TaskTracker
  
    Note over Simulator,TaskTracker: 系统初始化
    Simulator->>ResourceManager: 1. 初始化资源
    Simulator->>TaskTracker: 2. 初始化任务跟踪器
  
    rect rgb(255, 240, 240)
        Note over Simulator,TaskTracker: Loop [每个时间步]
        Note over Simulator: 3. 获取新到达的工作流
        Simulator->>Scheduler: 4. schedule(timestamp, resource_manager,<br/>new_wfs, completed_tasks)
        Scheduler->>ResourceManager: 5. 查询资源状态
        Scheduler-->>Simulator: 6. 返回 actions[]
        Simulator->>ResourceManager: 7. 执行动作 (deploy/assign/unload)
        Simulator->>TaskTracker: 8. start_task() 启动任务
        Simulator->>ResourceManager: 9. update_deployments() 更新部署
        Simulator->>TaskTracker: 10. update_tasks() 更新任务
        TaskTracker-->>Simulator: 11. 返回 completed_tasks[]
        Simulator->>ResourceManager: 12. 释放资源 (complete_task/release_cpu)
        Note over Simulator: 13. 时间推进 (timestamp += 1)
    end
Loading

架构逻辑图

graph TB
    subgraph 配置文件
        M[models.json]
        R[resources.json]
        W[workflow_*.json]
    end
  
    subgraph 核心模块
        SIM[Simulator<br/>仿真器入口]
        SCH[Scheduler<br/>调度决策核心<br/><i>需要实现</i>]
        RM[ResourceManager<br/>资源管理]
        TT[TaskTracker<br/>任务跟踪]
        WF[WorkflowDAG<br/>工作流定义]
    end
  
    SIM <-->|schedule| SCH
    SIM <-->|execute_action| RM
    SIM <-->|update_tasks| TT
    SIM -->|load workflows| WF
    SCH -.->|查询资源状态| RM
  
    M -.->|加载| RM
    R -.->|加载| RM
    W -.->|加载| WF
  
    style SIM fill:#f88,stroke:#f00,stroke-width:3px
    style SCH fill:#8f8,stroke:#0f0,stroke-width:3px
    style RM fill:#88f,stroke:#00f,stroke-width:3px
    style TT fill:#f8f,stroke:#90f,stroke-width:3px
    style WF fill:#fa8,stroke:#f80,stroke-width:3px
Loading

调度器接口流程图

flowchart TD
    Start([Scheduler.schedule 开始]) --> Input[接收输入参数<br/>timestamp, resource_manager,<br/>new_wfs, completed_tasks]
    Input --> Logic[调度逻辑<br/><i>选手实现</i>]
    Logic --> Query[可查询 resource_manager<br/>获取资源状态]
    Query --> Output[返回动作列表 actions]
    Output --> End([结束])
  
    Note1[输入: 时间戳、新工作流、已完成任务]
    Note2[输出: deploy_model, unload_model,<br/>assign_prefill, assign_decoding,<br/>assign_tool_call]
  
    Input -.-> Note1
    Output -.-> Note2
  
    style Start fill:#0f0
    style End fill:#f00
    style Logic fill:#ffa,stroke:#f00,stroke-width:3px
    style Query fill:#88f
Loading

项目结构

.
├── config/                      # 配置文件目录
│   ├── models.json             # 模型配置
│   └── resources.json          # 资源配置
├── multi_workflows/             # 工作流数据目录
├── old_schedulers/              # 旧版本调度器目录
├── schedulers/                  # 调度器目录
│   ├── __init__.py             # 包初始化文件
│   ├── scheduler_v3_1_main.py  # 当前使用的调度器版本
│   ├── scheduler_v3_sa_module.cpp  # C++实现的模拟退火模块
│   └── scheduler_v3_sa_module.py   # Python包装器模块
├── source/                      # 工具模块
│   └── logger.py               # 日志工具
├── tests/                       # 测试目录
├── simulator.py                 # 仿真器入口文件
├── ResourceManager.py           # 资源管理器
├── TaskTracker.py              # 任务跟踪器
├── workflow.py                  # 工作流定义
├── generator.py                 # 工作流生成器
├── visualize.py                 # 工作流可视化工具
├── setup.py                     # C++模块编译脚本
└── requirements.txt             # Python依赖包

快速开始

1. 环境配置

conda activate hackathon2025
pip install -r requirements.txt

2. 编译C++加速模块(可选)

系统提供了基于C++的模拟退火算法实现,以提高性能。要使用C++加速版本,需要先编译C++模块:

# 安装pybind11依赖
pip install pybind11

# 编译C++模块
python setup.py build_ext --inplace

编译成功后,会生成一个动态链接库文件(Windows上为.pyd文件,Linux/Mac上为.so文件),Python代码可以直接导入使用。

3. 运行仿真器

python simulator.py

4. 生成新的工作流数据

python generator.py

generator.py 使用说明

根据 GENERATION_CONFIG 中的概率分布生成工作流:

GENERATION_CONFIG = {
    "fixed": {
        "num_workflows": 1000,        # 生成1000个工作流
        "output_dir": "multi_workflows_1000",
        "merge_final": True           # 是否合并最终节点
    },
    "distributions": {
        "layers": {1: 0.10, 2: 0.40, 3: 0.35, 4: 0.15},  # 层数分布
        "tool_call_counts_per_layer": {0: 0.10, 1: 0.15, 2: 0.30, 3: 0.25, 4: 0.15, 5: 0.05},  # 每层tool_call数
        "merge_after_tool": {True: 0.60, False: 0.40},     # 合并方式分布
        "time_range": {10: 0.50, 60: 0.30, 300: 0.15, 1200: 0.05},  # 到达时间分布
        "max_all_zero_ratio": 0.10    # 允许的最大"全0"比例
    }
}

执行默认生成(推荐):

python generator.py

修改生成参数:

编辑 GENERATION_CONFIG 来控制工作流生成的分布特性,然后运行 generator.py。

工作流结构说明:

生成的工作流包含以下任务类型:

  1. Prefill 任务:模型推理的第一阶段
  2. Decoding 任务:模型推理的第二阶段
  3. Tool Call 任务:工具调用任务,通过CPU执行

工作流图示例:

  • 开始节点(Start Prefill → Start Decoding)
  • 多层处理层(每层包含tool_call任务和prefill/decoding对)
  • 最终总结层(可选)

核心模块说明

schedulers/(调度器目录 - 需要实现)

调度器位于 schedulers/ 目录下,选手需要实现 Scheduler 类的 schedule() 方法。

当前仿真器使用的是 schedulers/scheduler_v3_1_main.py,它使用了模拟退火算法进行模型部署优化。

C++加速模块

系统提供了基于C++的模拟退火算法实现,通过pybind11提供Python接口,以提高执行速度。主要文件包括:

  • schedulers/scheduler_v3_sa_module.cpp: C++实现的模拟退火算法
  • schedulers/scheduler_v3_sa_module.py: Python包装器,负责导入C++模块或使用Python备选实现
  • setup.py: 用于编译C++模块的安装脚本

自动回退机制:如果C++模块编译失败或无法导入,代码会自动回退到Python实现,确保系统仍能正常运行。在日志中会显示使用的是哪个版本的实现。

性能优势:C++实现相比Python实现有显著的性能提升,特别是在处理大量模型分配任务时。性能提升主要来自于:

  1. C++的原生性能优势
  2. 更高效的内存管理
  3. 优化的随机数生成和数学计算

注意事项

  1. 确保已安装pybind11:pip install pybind11
  2. 需要C++编译器支持(Windows上需要Visual Studio,Linux上需要g++)
  3. 如果编译遇到问题,可以参考pybind11官方文档

实现 schedule() 方法:

def schedule(self, current_timestamp, resource_manager, new_wfs, completed_tasks):
    # 返回动作列表
    actions = []
    # 1. 处理新到达的工作流
    # 2. 处理已完成的任务,更新依赖关系
    # 3. 决定模型部署/卸载
    # 4. 分配任务到资源
    return actions

方法参数说明:

  • current_timestamp: 当前时间戳
  • resource_manager: ResourceManager 实例,用于查询和管理资源
  • new_wfs: 新到达的工作流列表(WorkflowDAG对象)
  • completed_tasks: 上一时间步完成的任务ID列表

支持的动作类型:

  • deploy_model: 部署模型到NPU
  • unload_model: 卸载模型
  • assign_prefill: 分配prefill任务
  • assign_decoding: 分配decoding任务
  • assign_tool_call: 分配tool_call任务

simulator.py(仿真器入口)

系统主入口,协调整个仿真过程:

  • 管理时间步进
  • 加载工作流
  • 调用调度器进行决策
  • 执行调度动作
  • 更新资源和任务状态

仿真器在初始化时从 schedulers/ 目录导入调度器实例。选手主要需要修改调度器实现,一般不需要修改仿真器代码。

ResourceManager.py(资源管理器)

管理CPU、NPU和模型部署,主要接口:

  • deploy_model() / unload_model(): 模型部署与卸载
  • allocate_cpu() / release_cpu(): CPU资源分配与释放
  • assign_prefill() / assign_decoding(): 分配任务到模型
  • get_model_info() / get_npu_info(): 查询资源状态

TaskTracker.py(任务跟踪器)

跟踪任务运行状态:

  • start_task(): 启动任务
  • update_tasks(): 更新任务状态,返回完成的任务列表
  • complete_task(): 标记任务完成

source/logger.py(日志工具)

提供日志记录功能,调度器可以使用此模块记录调度决策和系统状态,便于调试和分析。

配置文件说明

models.json

定义可用的AI模型及其性能参数:

  • model_name: 模型名称
  • prefill_tokens_per_second: Prefill阶段吞吐量
  • decoding_tokens_per_second: Decoding阶段吞吐量
  • required_cpus: 所需CPU核心数
  • required_npus: 所需NPU数量
  • deployment_time: 部署时间

resources.json

定义系统资源配置:

  • cpu_cores: 总CPU核心数
  • npus: NPU列表,包含类型、数量和性能缩放因子

任务类型说明

Prefill任务

  • 模型推理的第一阶段
  • 独占模型资源(每个模型同时只能运行1个prefill任务)
  • 需要指定模型

Decoding任务

  • 模型推理的第二阶段
  • 可批处理(每个模型最多同时运行4个decoding任务)
  • 需要指定模型

Tool Call任务

  • 工具调用任务
  • 使用CPU资源
  • 不需要模型

调度策略建议

  1. 任务依赖处理:确保前置任务完成后才调度后续任务
  2. 资源管理:合理部署和卸载模型以优化资源利用率
  3. 任务分配:根据任务类型和优先级选择合适的资源
  4. 性能优化:减少模型部署/卸载次数,平衡负载

注意事项

  1. 调度决策应基于 predicted_time/tokens,而非 actual_time/tokens
  2. 不要修改传入参数,避免破坏仿真器运行
  3. 确保返回的动作可以成功执行
  4. NPU必须是同一类型才能部署同一个模型

About

AgentAI转DAG后的模拟退火调度代码

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 3

  •  
  •  
  •