
Python: [Bug]: WorkflowExecutor re-sends already-answered RequestInfoEvents after checkpoint restore #3255

@Sherlocco

Environment

  • Package: agent-framework[azure]==1.0.0b260106
  • Python: 3.12
  • OS: Windows 11

Description

When a sub-workflow managed by a WorkflowExecutor uses request_response() (ctx.request_info() paired with a @response_handler, as in the code sample below) and the workflow is resumed from a checkpoint, requests that were already answered are re-sent to the parent workflow. This results in one of two failures:

  1. ValueError: Response provided for unknown request ID: <uuid> when trying to reply to duplicate requests
  2. The workflow hangs because expected_response_count is too high (it also counts the stale, already-answered request)

Reproduction Steps

  1. Create a parent workflow with an Orchestrator executor
  2. Create a sub-workflow (e.g., TicketingWorkflow) wrapped in a WorkflowExecutor
  3. In the sub-workflow, use request_response() to pause and wait for user input (e.g., ticket confirmation)
  4. Save a checkpoint when the workflow becomes idle with pending requests
  5. Resume the workflow from checkpoint and provide a response
  6. The sub-workflow continues and makes another request_response() call (e.g., for ticket description)
  7. Bug occurs: The parent receives BOTH the old (already-answered) request AND the new request

Root Cause Analysis

The issue is in WorkflowExecutor._process_workflow_result() (file: _workflow_executor.py):

async def _process_workflow_result(
    self,
    result: WorkflowRunResult,
    execution_context: ExecutionContext,
    ctx: WorkflowContext,
) -> None:
    # Get all pending request events from the workflow result
    request_info_events = result.get_request_info_events()  # <-- Returns ALL events, including old ones
    
    for event in request_info_events:
        if event.request_id not in execution_context.pending_requests:
            execution_context.pending_requests[event.request_id] = event
            # ...
    
    execution_context.expected_response_count = len(execution_context.pending_requests)  # <-- Incorrect count

The problem:

  1. result.get_request_info_events() returns ALL RequestInfoEvents from the workflow's event stream, including ones that have already been answered (e.g., the confirmation request that was pending when the checkpoint was saved and was answered after the restore)
  2. After checkpoint restore, when the sub-workflow continues and yields a NEW request, the old request is ALSO in the event stream
  3. This causes expected_response_count to be 2 when only 1 new request actually exists
  4. The parent workflow receives duplicate SubWorkflowRequestMessages for requests it already handled

Expected Behavior

After checkpoint restore, _process_workflow_result should only process NEW RequestInfoEvents that haven't been answered yet. Already-processed request IDs should be tracked and filtered out.
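One way to meet this expectation, sketched under assumptions: `RequestTracker` is a hypothetical class, not agent-framework's API. It keeps answered IDs next to pending ones, forwards only IDs it has never seen, and, importantly, writes the answered set into the checkpoint state, since a filter that lives only in memory would be empty again after a restore.

```python
import json
from dataclasses import dataclass, field


@dataclass
class RequestTracker:
    """Hypothetical bookkeeping for a sub-workflow wrapper (not agent-framework's API)."""

    pending: dict[str, str] = field(default_factory=dict)  # request_id -> payload type name
    answered: set[str] = field(default_factory=set)

    def ingest(self, events: list[tuple[str, str]]) -> list[str]:
        """Register request events; return only the IDs that should be forwarded to the parent."""
        forwarded: list[str] = []
        for request_id, kind in events:
            if request_id in self.pending or request_id in self.answered:
                continue  # already forwarded or already answered: never re-send
            self.pending[request_id] = kind
            forwarded.append(request_id)
        return forwarded

    def record_response(self, request_id: str) -> None:
        """Move a request from pending to answered once the parent replies."""
        if request_id not in self.pending:
            raise ValueError(f"no pending request with ID {request_id!r}")
        del self.pending[request_id]
        self.answered.add(request_id)

    @property
    def expected_response_count(self) -> int:
        return len(self.pending)

    def to_state(self) -> dict:
        # Persist answered IDs too; otherwise the filter is empty again after a restore.
        return {"pending": dict(self.pending), "answered": sorted(self.answered)}

    @classmethod
    def from_state(cls, state: dict) -> "RequestTracker":
        return cls(pending=dict(state["pending"]), answered=set(state["answered"]))


tracker = RequestTracker()
print(tracker.ingest([("confirm-1", "ConfirmationRequest")]))  # ['confirm-1']

# Checkpoint while idle with the confirmation pending, then restore into a new object.
restored = RequestTracker.from_state(json.loads(json.dumps(tracker.to_state())))
restored.record_response("confirm-1")

# After the restore the stream still holds the old confirmation next to the new request.
stream = [("confirm-1", "ConfirmationRequest"), ("describe-1", "DescriptionRequest")]
print(restored.ingest(stream))            # ['describe-1']
print(restored.expected_response_count)  # 1
```

The answered set only grows in this sketch; a real wrapper would presumably clear it when the sub-workflow run completes.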

Code Sample

"""
Minimal reproduction: WorkflowExecutor re-sends already-answered RequestInfoEvents after checkpoint restore

This demonstrates a bug in agent-framework where resuming from a checkpoint causes
already-answered requests to be re-sent when a sub-workflow has multiple request_response() calls.

Based on: https://github.com/microsoft/agent-framework/blob/main/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py

Bug: After checkpoint restore, when the sub-workflow continues and makes a SECOND
request_response() call, the parent workflow receives BOTH the old (already-answered)
request AND the new request, causing:
- ValueError: Response provided for unknown request ID
- OR incorrect expected_response_count

Requirements:
    pip install "agent-framework[azure]==1.0.0b260106"
"""

import asyncio
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, override  # typing.override requires Python 3.12+

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    RequestInfoEvent,
    SubWorkflowRequestMessage,
    SubWorkflowResponseMessage,
    Workflow,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowExecutor,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)

CHECKPOINT_DIR = Path(__file__).parent / "tmp" / "bug_repro_checkpoints"


# =============================================================================
# Request/Response models
# =============================================================================

@dataclass
class StartRequest:
    """Initial request to start the workflow."""
    topic: str


@dataclass
class ConfirmationRequest:
    """FIRST request_response() - asks user for confirmation."""
    # Unique ID the Coordinator uses to match responses to pending requests
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    prompt: str = "Do you want to proceed? (yes/no)"


@dataclass
class DescriptionRequest:
    """SECOND request_response() - asks user for description."""
    # Unique ID the Coordinator uses to match responses to pending requests
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    prompt: str = "Please provide a description:"


@dataclass
class FinalOutput:
    """Final output from the workflow."""
    topic: str
    confirmed: bool
    description: str


# =============================================================================
# Sub-workflow executors
# =============================================================================

class MultiStepExecutor(Executor):
    """
    Executor with TWO request_info() calls - this triggers the bug.
    
    The bug occurs when:
    1. First request_info() (confirmation) - checkpoint saved
    2. Workflow resumed from that checkpoint, user responds 'yes'
    3. Second request_info() (description) - BUG: old confirmation request re-sent!
    """

    def __init__(self) -> None:
        super().__init__(id="multi_step_executor")
        self._topic: str = ""
        self._confirmed: bool = False

    @handler
    async def handle_start(
        self, 
        request: StartRequest, 
        ctx: WorkflowContext
    ) -> None:
        print(f"\n[SubWorkflow] Received start request: {request.topic}")
        self._topic = request.topic
        
        # =====================================================================
        # FIRST request_info() - Confirmation
        # =====================================================================
        print("[SubWorkflow] Sending FIRST request (confirmation)...")
        await ctx.request_info(
            request_data=ConfirmationRequest(prompt=f"Confirm processing '{request.topic}'? (yes/no)"),
            response_type=str,
        )

    @response_handler
    async def handle_confirmation_response(
        self,
        original_request: ConfirmationRequest,
        response: str,
        ctx: WorkflowContext[None, FinalOutput],
    ) -> None:
        """Handle confirmation response and send second request."""
        print(f"[SubWorkflow] Received confirmation response: {response}")
        
        if response.lower().strip() not in ("yes", "y"):
            await ctx.yield_output(FinalOutput(
                topic=self._topic,
                confirmed=False,
                description="",
            ))
            return
        
        self._confirmed = True
        
        # =====================================================================
        # SECOND request_info() - Description
        # This is where the BUG manifests after checkpoint restore
        # =====================================================================
        print("[SubWorkflow] Sending SECOND request (description)...")
        await ctx.request_info(
            request_data=DescriptionRequest(prompt=f"Provide description for '{self._topic}':"),
            response_type=str,
        )

    @response_handler
    async def handle_description_response(
        self,
        original_request: DescriptionRequest,
        response: str,
        ctx: WorkflowContext[None, FinalOutput],
    ) -> None:
        """Handle description response and output final result."""
        print(f"[SubWorkflow] Received description response: {response}")
        
        await ctx.yield_output(FinalOutput(
            topic=self._topic,
            confirmed=True,
            description=response,
        ))

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"topic": self._topic, "confirmed": self._confirmed}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._topic = state.get("topic", "")
        self._confirmed = state.get("confirmed", False)
        print(f"[SubWorkflow] Restored state: topic={self._topic}, confirmed={self._confirmed}")


# =============================================================================
# Parent workflow executor
# =============================================================================

class Coordinator(Executor):
    """
    Parent workflow coordinator that handles sub-workflow requests.
    
    This executor intercepts SubWorkflowRequestMessage and re-emits
    the request to the outer workflow for user interaction.
    """

    def __init__(self) -> None:
        super().__init__(id="coordinator")
        self._pending_requests: dict[str, SubWorkflowRequestMessage] = {}

    @handler
    async def kick_off(self, topic: str, ctx: WorkflowContext[StartRequest]) -> None:
        """Start the workflow with a topic."""
        print(f"\n[Coordinator] Starting workflow with topic: {topic}")
        await ctx.send_message(StartRequest(topic=topic))

    @handler
    async def collect_output(
        self, 
        output: FinalOutput, 
        ctx: WorkflowContext[None, FinalOutput]
    ) -> None:
        """Collect and yield the final output."""
        print(f"\n[Coordinator] Received final output: {output}")
        await ctx.yield_output(output)

    @handler
    async def handle_sub_workflow_request(
        self,
        request: SubWorkflowRequestMessage,
        ctx: WorkflowContext,
    ) -> None:
        """
        Handle requests from the sub-workflow.
        
        This is where we can observe the bug: after checkpoint restore,
        this handler receives BOTH the old confirmation request AND the new
        description request.
        """
        data = request.source_event.data
        
        if not isinstance(data, (ConfirmationRequest, DescriptionRequest)):
            raise TypeError(f"Unexpected request type: {type(data)}")
        
        print("\n[Coordinator] Received sub-workflow request:")
        print(f"  - Type: {type(data).__name__}")
        print(f"  - ID: {data.id}")
        print(f"  - Prompt: {data.prompt}")
        
        # Track pending request
        self._pending_requests[data.id] = request
        print(f"  - Total pending requests: {len(self._pending_requests)}")
        
        # Forward to outer workflow
        await ctx.request_info(request_data=data, response_type=str)

    @response_handler
    async def handle_request_response(
        self,
        original_request: ConfirmationRequest | DescriptionRequest,
        response: str,
        ctx: WorkflowContext[SubWorkflowResponseMessage],
    ) -> None:
        """Process response and send back to sub-workflow."""
        print(f"\n[Coordinator] Handling response for request ID: {original_request.id}")
        print(f"  - Response: {response}")
        
        request_message = self._pending_requests.pop(original_request.id, None)
        
        if request_message is None:
            raise ValueError(
                f"No matching pending request found for ID: {original_request.id}"
            )
        
        await ctx.send_message(request_message.create_response(response))

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"pending_requests": self._pending_requests}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._pending_requests = state.get("pending_requests", {})
        print(f"\n[Coordinator] Restored {len(self._pending_requests)} pending requests")


# =============================================================================
# Workflow construction
# =============================================================================

def build_sub_workflow() -> WorkflowExecutor:
    """Build the sub-workflow with the multi-step executor."""
    sub_workflow = (
        WorkflowBuilder()
        .register_executor(MultiStepExecutor, name="multi_step")
        .set_start_executor("multi_step")
        .build()
    )
    return WorkflowExecutor(sub_workflow, id="sub_workflow")


def build_parent_workflow(storage: FileCheckpointStorage) -> Workflow:
    """Build the parent workflow that embeds the sub-workflow."""
    return (
        WorkflowBuilder()
        .register_executor(Coordinator, name="coordinator")
        .register_executor(build_sub_workflow, name="sub_executor")
        .set_start_executor("coordinator")
        .add_edge("coordinator", "sub_executor")
        .add_edge("sub_executor", "coordinator")
        .with_checkpointing(storage)
        .build()
    )


# =============================================================================
# Main - demonstrates the bug
# =============================================================================

async def main() -> None:
    # Setup
    CHECKPOINT_DIR.mkdir(parents=True, exist_ok=True)
    for file in CHECKPOINT_DIR.glob("*.json"):
        file.unlink()
    
    storage = FileCheckpointStorage(CHECKPOINT_DIR)
    
    # =========================================================================
    # STEP 1: Run until first request_response() (confirmation)
    # =========================================================================
    print("\n" + "=" * 70)
    print("STEP 1: Run until first request (confirmation)")
    print("=" * 70)
    
    workflow = build_parent_workflow(storage)
    
    request_id: str | None = None
    async for event in workflow.run_stream("Test Topic"):
        if isinstance(event, RequestInfoEvent):
            request_id = event.request_id
            print(f"\n>>> Captured request ID: {request_id}")
            print(f">>> Request data: {event.data}")
        
        if (isinstance(event, WorkflowStatusEvent) and 
            event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS):
            print("\n>>> Workflow idle with pending requests - checkpoint saved")
            break
    
    if request_id is None:
        raise RuntimeError("No request captured!")
    
    # Get checkpoint
    checkpoints = await storage.list_checkpoints(workflow.id)
    checkpoints.sort(key=lambda cp: cp.timestamp)
    resume_checkpoint = checkpoints[-1]
    print(f"\n>>> Checkpoint ID: {resume_checkpoint.checkpoint_id}")
    
    # =========================================================================
    # STEP 2: Resume and respond to first request (confirmation)
    # =========================================================================
    print("\n" + "=" * 70)
    print("STEP 2: Resume from checkpoint and respond 'yes' to confirmation")
    print("=" * 70)
    
    workflow2 = build_parent_workflow(storage)
    
    # Resume from checkpoint
    request_info_event: RequestInfoEvent | None = None
    async for event in workflow2.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id):
        if isinstance(event, RequestInfoEvent):
            request_info_event = event
            print(f"\n>>> Captured request after resume: {event.data}")
    
    if request_info_event is None:
        raise RuntimeError("No request_info_event captured after resume!")
    
    # Send response "yes" - this should trigger the second request_response()
    print("\n>>> Sending response 'yes' to confirmation...")
    
    second_request_id: str | None = None
    async for event in workflow2.send_responses_streaming({request_info_event.request_id: "yes"}):
        if isinstance(event, RequestInfoEvent):
            second_request_id = event.request_id
            print(f"\n>>> Captured SECOND request ID: {second_request_id}")
            print(f">>> Request data: {event.data}")
        
        if (isinstance(event, WorkflowStatusEvent) and 
            event.state is WorkflowRunState.IDLE_WITH_PENDING_REQUESTS):
            print("\n>>> Workflow idle with pending requests - checkpoint saved")
            break
        
        if isinstance(event, WorkflowOutputEvent):
            print(f"\n>>> Got output: {event.data}")
            break
    
    if second_request_id is None:
        raise RuntimeError("No second request captured!")
    
    # =========================================================================
    # BUG ANALYSIS
    # =========================================================================
    print("\n" + "=" * 70)
    print("BUG ANALYSIS")
    print("=" * 70)
    print("""
The bug was demonstrated in STEP 2 above.

After resuming from checkpoint and responding to the FIRST request (confirmation),
the sub-workflow continued and made a SECOND request (description).

However, the Coordinator received TWO requests (example IDs; they differ on every run):
1. ConfirmationRequest (ID: 5efdd296...) - THIS WAS ALREADY ANSWERED!
2. DescriptionRequest (ID: fbe52803...) - This is the new, legitimate request

The log shows:
  - Total pending requests: 2   <-- BUG! Should be 1

Root cause: WorkflowExecutor._process_workflow_result() calls 
result.get_request_info_events() which returns ALL RequestInfoEvents,
including ones that were already answered before the checkpoint.

This causes:
- expected_response_count = 2 (should be 1)
- Duplicate SubWorkflowRequestMessage sent to parent
- ValueError when trying to respond to duplicate request
""")


if __name__ == "__main__":
    asyncio.run(main())
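The script above relies on reading the `Total pending requests` line. A sketch of an explicit check, assuming the script is extended so the Coordinator reports each forwarded payload ID (`data.id` in `handle_sub_workflow_request`) and each answered ID (`original_request.id` in `handle_request_response`) to a shared recorder; `RequestLog` is hypothetical, not part of agent-framework. Because `workflow2` builds a fresh Coordinator, the recorder would have to live outside it (e.g., a module-level instance).

```python
class RequestLog:
    """Hypothetical recorder the Coordinator's handlers could report to."""

    def __init__(self) -> None:
        self.answered: set[str] = set()
        self.resent: list[str] = []

    def on_forward(self, request_id: str) -> None:
        # Would be called from handle_sub_workflow_request with data.id
        if request_id in self.answered:
            self.resent.append(request_id)

    def on_answer(self, request_id: str) -> None:
        # Would be called from handle_request_response with original_request.id
        self.answered.add(request_id)


# Placeholder IDs following the sequence described in BUG ANALYSIS:
# the confirmation is answered, then both requests reach the Coordinator.
log = RequestLog()
log.on_forward("confirm-id")
log.on_answer("confirm-id")
log.on_forward("confirm-id")   # re-sent after being answered
log.on_forward("describe-id")  # the legitimate new request
print(log.resent)  # ['confirm-id']
```

In the reproduction, raising an error when `log.resent` is non-empty would turn the log inspection into a failing check.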

Error Messages / Stack Traces

Package Versions

agent-framework[azure]==1.0.0b260106

Python Version

python 3.12

Additional Context

No response
