diff --git a/python/packages/core/agent_framework/_workflows/_concurrent.py b/python/packages/core/agent_framework/_workflows/_concurrent.py index 033946afff..4204c8cd6d 100644 --- a/python/packages/core/agent_framework/_workflows/_concurrent.py +++ b/python/packages/core/agent_framework/_workflows/_concurrent.py @@ -292,9 +292,7 @@ class MyCustomExecutor(Executor): ... wf2 = ConcurrentBuilder().register_participants([create_researcher, MyCustomExecutor]).build() """ if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: raise ValueError("register_participants() has already been called on this builder instance.") @@ -330,9 +328,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Con wf2 = ConcurrentBuilder().participants([researcher_agent, my_custom_executor]).build() """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants() has already been called on this builder instance.") @@ -498,6 +494,10 @@ def with_request_info( def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + participants: list[Executor | AgentProtocol] = [] if self._participant_factories: # Resolve the participant factories now. This doesn't break the factory pattern @@ -549,11 +549,6 @@ def build(self) -> Workflow: workflow = ConcurrentBuilder().participants([agent1, agent2]).build() """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants provided. Call .participants([...]) or .register_participants([...]) first." - ) - # Internal nodes dispatcher = _DispatchToAllParticipants(id="dispatcher") aggregator = ( diff --git a/python/packages/core/agent_framework/_workflows/_group_chat.py b/python/packages/core/agent_framework/_workflows/_group_chat.py index e1ba5156ee..3d698ba72c 100644 --- a/python/packages/core/agent_framework/_workflows/_group_chat.py +++ b/python/packages/core/agent_framework/_workflows/_group_chat.py @@ -519,9 +519,11 @@ class GroupChatBuilder: def __init__(self) -> None: """Initialize the GroupChatBuilder.""" self._participants: dict[str, AgentProtocol | Executor] = {} + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] # Orchestrator related members self._orchestrator: BaseGroupChatOrchestrator | None = None + self._orchestrator_factory: Callable[[], ChatAgent | BaseGroupChatOrchestrator] | None = None self._selection_func: GroupChatSelectionFunction | None = None self._agent_orchestrator: ChatAgent | None = None self._termination_condition: TerminationCondition | None = None @@ -535,21 +537,53 @@ def __init__(self) -> None: self._request_info_enabled: bool = False self._request_info_filter: set[str] = set() - def with_orchestrator(self, orchestrator: BaseGroupChatOrchestrator) -> "GroupChatBuilder": + def with_orchestrator( + self, + *, + agent: ChatAgent | None = None, + selection_func: GroupChatSelectionFunction | None = None, + orchestrator: BaseGroupChatOrchestrator | None = None, + orchestrator_factory: Callable[[], ChatAgent | BaseGroupChatOrchestrator] | None = None, + orchestrator_name: str | None = None, + ) -> "GroupChatBuilder": """Set the orchestrator for this group chat workflow. An group chat orchestrator is responsible for managing the flow of conversation, making sure all participants are synced and picking the next speaker according to the defined logic until the termination conditions are met. + There are a few ways to configure the orchestrator: + 1. Provide a ChatAgent instance to use an agent-based orchestrator that selects the next speaker intelligently + 2. Provide a selection function to use that picks the next speaker based on the function logic + 3. Provide a BaseGroupChatOrchestrator instance to use a custom orchestrator + 4. Provide an orchestrator factory to create either a ChatAgent or BaseGroupChatOrchestrator at build time + + You can only use one of the above methods to configure the orchestrator. + Args: + agent: An instance of ChatAgent to manage the group chat. + selection_func: Callable that receives the current GroupChatState and returns + the name of the next participant to speak, or None to finish. orchestrator: An instance of BaseGroupChatOrchestrator to manage the group chat. + orchestrator_factory: A callable that produces either a ChatAgent or + BaseGroupChatOrchestrator when invoked. + orchestrator_name: Optional display name for the orchestrator in the workflow if + using a selection function. If not provided, defaults to + `GroupChatBuilder.DEFAULT_ORCHESTRATOR_ID`. This parameter is + ignored if using an agent or custom orchestrator. Returns: Self for fluent chaining. Raises: - ValueError: If an orchestrator has already been set + ValueError: If an orchestrator has already been set or if none or multiple + of the parameters are provided. + + Note: + When using a custom orchestrator that implements `BaseGroupChatOrchestrator`, either + via the `orchestrator` or `orchestrator_factory` parameters, setting `termination_condition` + and `max_rounds` on the builder will have no effect since the orchestrator is already + fully defined. Example: .. code-block:: python @@ -560,114 +594,67 @@ def with_orchestrator(self, orchestrator: BaseGroupChatOrchestrator) -> "GroupCh orchestrator = CustomGroupChatOrchestrator(...) workflow = GroupChatBuilder().with_orchestrator(orchestrator).participants([agent1, agent2]).build() """ - if self._orchestrator is not None: - raise ValueError("An orchestrator has already been configured. Call with_orchestrator(...) at most once.") if self._agent_orchestrator is not None: raise ValueError( - "An agent orchestrator has already been configured. " - "Call only one of with_orchestrator(...) or with_agent_orchestrator(...)." + "An agent orchestrator has already been configured. Call with_orchestrator(...) once only." ) - if self._selection_func is not None: - raise ValueError( - "A selection function has already been configured. " - "Call only one of with_orchestrator(...) or with_select_speaker_func(...)." - ) - - self._orchestrator = orchestrator - return self - - def with_agent_orchestrator(self, agent: ChatAgent) -> "GroupChatBuilder": - """Set an agent-based orchestrator for this group chat workflow. - - An agent-based group chat orchestrator uses a ChatAgent to select the next speaker - intelligently based on the conversation context. - Args: - agent: An instance of ChatAgent to manage the group chat. - - Returns: - Self for fluent chaining. + if self._selection_func is not None: + raise ValueError("A selection function has already been configured. Call with_orchestrator(...) once only.") - Raises: - ValueError: If an orchestrator has already been set - """ - if self._agent_orchestrator is not None: - raise ValueError( - "Agent orchestrator has already been configured. Call with_agent_orchestrator(...) at most once." - ) if self._orchestrator is not None: + raise ValueError("An orchestrator has already been configured. Call with_orchestrator(...) once only.") + + if self._orchestrator_factory is not None: raise ValueError( - "An orchestrator has already been configured. " - "Call only one of with_agent_orchestrator(...) or with_orchestrator(...)." + "An orchestrator factory has already been configured. Call with_orchestrator(...) once only." ) - if self._selection_func is not None: + + if sum(x is not None for x in [agent, selection_func, orchestrator, orchestrator_factory]) != 1: raise ValueError( - "A selection function has already been configured. " - "Call only one of with_agent_orchestrator(...) or with_select_speaker_func(...)." + "Exactly one of agent, selection_func, orchestrator, or orchestrator_factory must be provided." ) - self._agent_orchestrator = agent + if agent is not None: + self._agent_orchestrator = agent + elif selection_func is not None: + self._selection_func = selection_func + self._orchestrator_name = orchestrator_name + elif orchestrator is not None: + self._orchestrator = orchestrator + else: + self._orchestrator_factory = orchestrator_factory + return self - def with_select_speaker_func( + def register_participants( self, - selection_func: GroupChatSelectionFunction, - *, - orchestrator_name: str | None = None, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], ) -> "GroupChatBuilder": - """Define a custom function to select the next speaker in the group chat. - - This is a quick way to implement simple orchestration logic without needing a full - GroupChatOrchestrator. The provided function receives the current state of - the group chat and returns the name of the next participant to speak. + """Register participant factories for this group chat workflow. Args: - selection_func: Callable that receives the current GroupChatState and returns - the name of the next participant to speak, or None to finish. - orchestrator_name: Optional display name for the orchestrator in the workflow. - If not provided, defaults to `GroupChatBuilder.DEFAULT_ORCHESTRATOR_ID`. + participant_factories: Sequence of callables that produce participant definitions + when invoked. Each callable should return either an AgentProtocol instance + (auto-wrapped as AgentExecutor) or an Executor instance. Returns: Self for fluent chaining Raises: - ValueError: If an orchestrator has already been set - - Example: - .. code-block:: python - - from agent_framework import GroupChatBuilder, GroupChatState - - - async def round_robin_selector(state: GroupChatState) -> str: - # Simple round-robin selection among participants - return state.participants[state.current_round % len(state.participants)] + ValueError: If participant_factories is empty, or participants + or participant factories are already set + """ + if self._participants: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") - workflow = ( - GroupChatBuilder() - .with_select_speaker_func(round_robin_selector, orchestrator_name="Coordinator") - .participants([agent1, agent2]) - .build() - ) - """ - if self._selection_func is not None: - raise ValueError( - "select_speakers_func has already been configured. Call with_select_speakers_func(...) at most once." - ) - if self._orchestrator is not None: - raise ValueError( - "An orchestrator has already been configured. " - "Call only one of with_select_speaker_func(...) or with_orchestrator(...)." - ) - if self._agent_orchestrator is not None: - raise ValueError( - "An agent orchestrator has already been configured. " - "Call only one of with_select_speaker_func(...) or with_agent_orchestrator(...)." - ) + if not participant_factories: + raise ValueError("participant_factories cannot be empty") - self._selection_func = selection_func - self._orchestrator_name = orchestrator_name + self._participant_factories = list(participant_factories) return self def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "GroupChatBuilder": @@ -682,7 +669,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Gro Self for fluent chaining Raises: - ValueError: If participants are empty, names are duplicated, or already set + ValueError: If participants are empty, names are duplicated, or participants + or participant factories are already set TypeError: If any participant is not AgentProtocol or Executor instance Example: @@ -693,13 +681,16 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Gro workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, agent2, custom_executor]) .build() ) """ + if self._participant_factories: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + if self._participants: - raise ValueError("participants have already been set. Call participants(...) at most once.") + raise ValueError("participants have already been set. Call participants() at most once.") if not participants: raise ValueError("participants cannot be empty.") @@ -752,13 +743,13 @@ def stop_after_two_calls(conversation: list[ChatMessage]) -> bool: specialist_agent = ... workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, specialist_agent]) .with_termination_condition(stop_after_two_calls) .build() ) """ - if self._orchestrator is not None: + if self._orchestrator is not None or self._orchestrator_factory is not None: logger.warning( "Orchestrator has already been configured; setting termination condition on builder has no effect." ) @@ -778,6 +769,9 @@ def with_max_rounds(self, max_rounds: int | None) -> "GroupChatBuilder": Returns: Self for fluent chaining """ + if self._orchestrator is not None or self._orchestrator_factory is not None: + logger.warning("Orchestrator has already been configured; setting max rounds on builder has no effect.") + self._max_rounds = max_rounds return self @@ -802,7 +796,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "GroupCha storage = MemoryCheckpointStorage() workflow = ( GroupChatBuilder() - .with_select_speaker_func(my_selection_function) + .with_orchestrator(selection_func=my_selection_function) .participants([agent1, agent2]) .with_checkpointing(storage) .build() @@ -846,15 +840,22 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: Args: participants: List of resolved participant executors """ - if self._orchestrator is not None: - return self._orchestrator + if all( + x is None + for x in [self._agent_orchestrator, self._selection_func, self._orchestrator, self._orchestrator_factory] + ): + raise ValueError("No orchestrator has been configured. Call with_orchestrator() to set one.") + # We don't need to check if multiple are set since that is handled in with_orchestrator() - if self._agent_orchestrator is not None and self._selection_func is not None: - raise ValueError( - "Both agent-based orchestrator and selection function are configured; only one can be used at a time." + if self._agent_orchestrator: + return AgentBasedGroupChatOrchestrator( + agent=self._agent_orchestrator, + participant_registry=ParticipantRegistry(participants), + max_rounds=self._max_rounds, + termination_condition=self._termination_condition, ) - if self._selection_func is not None: + if self._selection_func: return GroupChatOrchestrator( id=self.DEFAULT_ORCHESTRATOR_ID, participant_registry=ParticipantRegistry(participants), @@ -864,23 +865,44 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: termination_condition=self._termination_condition, ) - if self._agent_orchestrator is not None: - return AgentBasedGroupChatOrchestrator( - agent=self._agent_orchestrator, - participant_registry=ParticipantRegistry(participants), - max_rounds=self._max_rounds, - termination_condition=self._termination_condition, + if self._orchestrator: + return self._orchestrator + + if self._orchestrator_factory: + orchestrator_instance = self._orchestrator_factory() + if isinstance(orchestrator_instance, ChatAgent): + return AgentBasedGroupChatOrchestrator( + agent=orchestrator_instance, + participant_registry=ParticipantRegistry(participants), + max_rounds=self._max_rounds, + termination_condition=self._termination_condition, + ) + if isinstance(orchestrator_instance, BaseGroupChatOrchestrator): + return orchestrator_instance + raise TypeError( + f"Orchestrator factory must return ChatAgent or BaseGroupChatOrchestrator instance. " + f"Got {type(orchestrator_instance).__name__}." ) - raise RuntimeError( - "Orchestrator could not be resolved. Please provide one via with_orchestrator(), " - "with_agent_orchestrator(), or with_select_speaker_func()." - ) + # This should never be reached due to the checks above + raise RuntimeError("Orchestrator could not be resolved. Please provide one via with_orchestrator()") def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + participant = factory() + participants.append(participant) + else: + participants = list(self._participants.values()) + executors: list[Executor] = [] - for participant in self._participants.values(): + for participant in participants: if isinstance(participant, Executor): executors.append(participant) elif isinstance(participant, AgentProtocol): @@ -908,9 +930,6 @@ def build(self) -> Workflow: Returns: Validated Workflow instance ready for execution """ - if not self._participants: - raise ValueError("participants must be configured before build()") - # Resolve orchestrator and participants to executors participants: list[Executor] = self._resolve_participants() orchestrator: Executor = self._resolve_orchestrator(participants) diff --git a/python/packages/core/agent_framework/_workflows/_handoff.py b/python/packages/core/agent_framework/_workflows/_handoff.py index 9e9e115bd4..fc3bf87112 100644 --- a/python/packages/core/agent_framework/_workflows/_handoff.py +++ b/python/packages/core/agent_framework/_workflows/_handoff.py @@ -599,7 +599,7 @@ def __init__( self._participant_factories: dict[str, Callable[[], AgentProtocol]] = {} self._start_id: str | None = None if participant_factories: - self.participant_factories(participant_factories) + self.register_participants(participant_factories) if participants: self.participants(participants) @@ -619,7 +619,7 @@ def __init__( # Termination related members self._termination_condition: Callable[[list[ChatMessage]], bool | Awaitable[bool]] | None = None - def participant_factories( + def register_participants( self, participant_factories: Mapping[str, Callable[[], AgentProtocol]] ) -> "HandoffBuilder": """Register factories that produce agents for the handoff workflow. @@ -637,7 +637,7 @@ def participant_factories( Self for method chaining. Raises: - ValueError: If participant_factories is empty or `.participants(...)` or `.participant_factories(...)` + ValueError: If participant_factories is empty or `.participants(...)` or `.register_participants(...)` has already been called. Example: @@ -666,17 +666,14 @@ def create_billing_agent() -> ChatAgent: # Handoff will be created automatically unless specified otherwise # The default creates a mesh topology where all agents can handoff to all others - builder = HandoffBuilder().participant_factories(factories) + builder = HandoffBuilder().register_participants(factories) builder.with_start_agent("triage") """ if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: - raise ValueError("participant_factories() has already been called on this builder instance.") - + raise ValueError("register_participants() has already been called on this builder instance.") if not participant_factories: raise ValueError("participant_factories cannot be empty") @@ -694,8 +691,8 @@ def participants(self, participants: Sequence[AgentProtocol]) -> "HandoffBuilder Self for method chaining. Raises: - ValueError: If participants is empty, contains duplicates, or `.participants(...)` or - `.participant_factories(...)` has already been called. + ValueError: If participants is empty, contains duplicates, or `.participants()` or + `.register_participants()` has already been called. TypeError: If participants are not AgentProtocol instances. Example: @@ -714,9 +711,7 @@ def participants(self, participants: Sequence[AgentProtocol]) -> "HandoffBuilder builder.with_start_agent(triage) """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .participant_factories() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants have already been assigned") @@ -896,7 +891,7 @@ def with_start_agent(self, agent: str | AgentProtocol) -> "HandoffBuilder": if agent not in self._participant_factories: raise ValueError(f"Start agent factory name '{agent}' is not in the participant_factories list") else: - raise ValueError("Call participant_factories(...) before with_start_agent(...)") + raise ValueError("Call register_participants(...) before with_start_agent(...)") self._start_id = agent elif isinstance(agent, AgentProtocol): resolved_id = self._resolve_to_id(agent) @@ -1039,15 +1034,6 @@ def build(self) -> Workflow: ValueError: If participants or coordinator were not configured, or if required configuration is invalid. """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants or participant_factories have been configured. " - "Call participants(...) or participant_factories(...) first." - ) - - if self._start_id is None: - raise ValueError("Must call with_start_agent(...) before building the workflow.") - # Resolve agents (either from instances or factories) # The returned map keys are either executor IDs or factory names, which is need to resolve handoff configs resolved_agents = self._resolve_agents() @@ -1058,6 +1044,8 @@ def build(self) -> Workflow: executors = self._resolve_executors(resolved_agents, resolved_handoffs) # Build the workflow graph + if self._start_id is None: + raise ValueError("Must call with_start_agent(...) before building the workflow.") start_executor = executors[self._resolve_to_id(resolved_agents[self._start_id])] builder = WorkflowBuilder( name=self._name, @@ -1096,8 +1084,9 @@ def _resolve_agents(self) -> dict[str, AgentProtocol]: Returns: Map of executor IDs or factory names to `AgentProtocol` instances """ - if self._participants and self._participant_factories: - raise ValueError("Cannot have both executors and participant_factories configured") + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods if self._participants: return self._participants diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 052a59766f..c1f0019c63 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -7,7 +7,7 @@ import re import sys from abc import ABC, abstractmethod -from collections.abc import Sequence +from collections.abc import Callable, Sequence from dataclasses import dataclass, field from enum import Enum from typing import Any, ClassVar, TypeVar, cast @@ -1376,11 +1376,47 @@ class MagenticBuilder: """ def __init__(self) -> None: + """Initialize the Magentic workflow builder.""" self._participants: dict[str, AgentProtocol | Executor] = {} + self._participant_factories: list[Callable[[], AgentProtocol | Executor]] = [] + + # Manager related members self._manager: MagenticManagerBase | None = None + self._manager_factory: Callable[[], MagenticManagerBase] | None = None + self._manager_agent_factory: Callable[[], AgentProtocol] | None = None + self._standard_manager_options: dict[str, Any] = {} self._enable_plan_review: bool = False + self._checkpoint_storage: CheckpointStorage | None = None + def register_participants( + self, + participant_factories: Sequence[Callable[[], AgentProtocol | Executor]], + ) -> "MagenticBuilder": + """Register participant factories for this Magentic workflow. + + Args: + participant_factories: Sequence of callables that return AgentProtocol or Executor instances. + + Returns: + Self for method chaining + + Raises: + ValueError: If participant_factories is empty, or participants + or participant factories are already set + """ + if self._participants: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + + if self._participant_factories: + raise ValueError("register_participants() has already been called on this builder instance.") + + if not participant_factories: + raise ValueError("participant_factories cannot be empty") + + self._participant_factories = list(participant_factories) + return self + def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self: """Define participants for this Magentic workflow. @@ -1393,7 +1429,8 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self Self for method chaining Raises: - ValueError: If participants are empty, names are duplicated, or already set + ValueError: If participants are empty, names are duplicated, or participants + or participant factories are already set TypeError: If any participant is not AgentProtocol or Executor instance Example: @@ -1403,7 +1440,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self workflow = ( MagenticBuilder() .participants([research_agent, writing_agent, coding_agent, review_agent]) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .build() ) @@ -1412,6 +1449,9 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> Self - Agent descriptions (if available) are extracted and provided to the manager - Can be called multiple times to add participants incrementally """ + if self._participant_factories: + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") + if self._participants: raise ValueError("participants have already been set. Call participants(...) at most once.") @@ -1468,7 +1508,7 @@ def with_plan_review(self, enable: bool = True) -> "MagenticBuilder": workflow = ( MagenticBuilder() .participants(agent1=agent1) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .with_plan_review(enable=True) .build() ) @@ -1515,7 +1555,7 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic workflow = ( MagenticBuilder() .participants([agent1]) - .with_standard_manager(agent=manager_agent) + .with_manager(agent=manager_agent) .with_checkpointing(storage) .build() ) @@ -1537,10 +1577,12 @@ def with_checkpointing(self, checkpoint_storage: CheckpointStorage) -> "Magentic self._checkpoint_storage = checkpoint_storage return self - def with_standard_manager( + def with_manager( self, manager: MagenticManagerBase | None = None, *, + manager_factory: Callable[[], MagenticManagerBase] | None = None, + agent_factory: Callable[[], AgentProtocol] | None = None, # Constructor args for StandardMagenticManager when manager is not provided agent: AgentProtocol | None = None, task_ledger: _MagenticTaskLedger | None = None, @@ -1560,17 +1602,21 @@ def with_standard_manager( """Configure the workflow manager for task planning and agent coordination. The manager is responsible for creating plans, selecting agents, tracking progress, - and deciding when to replan or complete. This method supports two usage patterns: + and deciding when to replan or complete. This method supports four usage patterns: 1. **Provide existing manager**: Pass a pre-configured manager instance (custom or standard) for full control over behavior - 2. **Auto-create with agent**: Pass an agent to automatically create a - StandardMagenticManager that uses the agent's configured instructions and - options (temperature, seed, etc.) + 2. **Factory for custom manager**: Pass a callable that returns a new manager + instance for more advanced scenarios so that the builder can be reused + 3. **Factory for agent**: Pass a callable that returns a new agent instance to + automatically create a `StandardMagenticManager` + 4. **Auto-create with agent**: Pass an agent to automatically create a `StandardMagenticManager` Args: - manager: Pre-configured manager instance (StandardMagenticManager or custom - MagenticManagerBase subclass). If provided, all other arguments are ignored. + manager: Pre-configured manager instance (`StandardMagenticManager` or custom + `MagenticManagerBase` subclass). If provided, all other arguments are ignored. + manager_factory: Callable that returns a new manager instance. + agent_factory: Callable that returns a new agent instance. agent: Agent instance for generating plans and decisions. The agent's configured instructions and options (temperature, seed, etc.) will be applied. @@ -1620,7 +1666,7 @@ def with_standard_manager( workflow = ( MagenticBuilder() .participants(agent1=agent1, agent2=agent2) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=20, max_stall_count=3, @@ -1639,7 +1685,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: manager = MyManager() - workflow = MagenticBuilder().participants(agent1=agent1).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants(agent1=agent1).with_manager(manager).build() Usage with prompt customization: @@ -1648,7 +1694,7 @@ async def plan(self, context: MagenticContext) -> ChatMessage: workflow = ( MagenticBuilder() .participants(coder=coder_agent, reviewer=reviewer_agent) - .with_standard_manager( + .with_manager( agent=manager_agent, task_ledger_plan_prompt="Create a detailed step-by-step plan...", progress_ledger_prompt="Assess progress and decide next action...", @@ -1664,27 +1710,68 @@ async def plan(self, context: MagenticContext) -> ChatMessage: - Stall detection helps prevent infinite loops in stuck scenarios - The agent's instructions are used as system instructions for all manager prompts """ + if any([self._manager, self._manager_factory, self._manager_agent_factory]): + raise ValueError("with_manager() has already been called on this builder instance.") + + if sum(x is not None for x in [manager, agent, manager_factory, agent_factory]) != 1: + raise ValueError("Exactly one of manager, agent, manager_factory, or agent_factory must be provided.") + + def _log_warning_if_constructor_args_provided() -> None: + if any( + arg is not None + for arg in [ + task_ledger, + task_ledger_facts_prompt, + task_ledger_plan_prompt, + task_ledger_full_prompt, + task_ledger_facts_update_prompt, + task_ledger_plan_update_prompt, + progress_ledger_prompt, + final_answer_prompt, + max_stall_count, + max_reset_count, + max_round_count, + ] + ): + logger.warning("Customer manager provided; all other with_manager() arguments will be ignored.") + if manager is not None: self._manager = manager - return self - - if agent is None: - raise ValueError("agent is required when manager is not provided: with_standard_manager(agent=...)") - - self._manager = StandardMagenticManager( - agent=agent, - task_ledger=task_ledger, - task_ledger_facts_prompt=task_ledger_facts_prompt, - task_ledger_plan_prompt=task_ledger_plan_prompt, - task_ledger_full_prompt=task_ledger_full_prompt, - task_ledger_facts_update_prompt=task_ledger_facts_update_prompt, - task_ledger_plan_update_prompt=task_ledger_plan_update_prompt, - progress_ledger_prompt=progress_ledger_prompt, - final_answer_prompt=final_answer_prompt, - max_stall_count=max_stall_count, - max_reset_count=max_reset_count, - max_round_count=max_round_count, - ) + _log_warning_if_constructor_args_provided() + elif agent is not None: + self._manager = StandardMagenticManager( + agent=agent, + task_ledger=task_ledger, + task_ledger_facts_prompt=task_ledger_facts_prompt, + task_ledger_plan_prompt=task_ledger_plan_prompt, + task_ledger_full_prompt=task_ledger_full_prompt, + task_ledger_facts_update_prompt=task_ledger_facts_update_prompt, + task_ledger_plan_update_prompt=task_ledger_plan_update_prompt, + progress_ledger_prompt=progress_ledger_prompt, + final_answer_prompt=final_answer_prompt, + max_stall_count=max_stall_count, + max_reset_count=max_reset_count, + max_round_count=max_round_count, + ) + elif manager_factory is not None: + self._manager_factory = manager_factory + _log_warning_if_constructor_args_provided() + elif agent_factory is not None: + self._manager_agent_factory = agent_factory + self._standard_manager_options = { + "task_ledger": task_ledger, + "task_ledger_facts_prompt": task_ledger_facts_prompt, + "task_ledger_plan_prompt": task_ledger_plan_prompt, + "task_ledger_full_prompt": task_ledger_full_prompt, + "task_ledger_facts_update_prompt": task_ledger_facts_update_prompt, + "task_ledger_plan_update_prompt": task_ledger_plan_update_prompt, + "progress_ledger_prompt": progress_ledger_prompt, + "final_answer_prompt": final_answer_prompt, + "max_stall_count": max_stall_count, + "max_reset_count": max_reset_count, + "max_round_count": max_round_count, + } + return self def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: @@ -1693,19 +1780,46 @@ def _resolve_orchestrator(self, participants: Sequence[Executor]) -> Executor: Args: participants: List of resolved participant executors """ - if self._manager is None: - raise ValueError("No manager configured. Call with_standard_manager(...) before building the orchestrator.") + if all(x is None for x in [self._manager, self._manager_factory, self._manager_agent_factory]): + raise ValueError("No manager configured. Call with_manager(...) before building the orchestrator.") + # We don't need to check if multiple are set since that is handled in with_orchestrator() + + if self._manager: + manager = self._manager + elif self._manager_factory: + manager = self._manager_factory() + elif self._manager_agent_factory: + agent_instance = self._manager_agent_factory() + manager = StandardMagenticManager( + agent=agent_instance, + **self._standard_manager_options, + ) + else: + # This should never be reached due to the checks above + raise RuntimeError("Manager could not be resolved. Please set the manager properly with with_manager().") return MagenticOrchestrator( - manager=self._manager, + manager=manager, participant_registry=ParticipantRegistry(participants), require_plan_signoff=self._enable_plan_review, ) def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + + participants: list[Executor | AgentProtocol] = [] + if self._participant_factories: + for factory in self._participant_factories: + participant = factory() + participants.append(participant) + else: + participants = list(self._participants.values()) + executors: list[Executor] = [] - for participant in self._participants.values(): + for participant in participants: if isinstance(participant, Executor): executors.append(participant) elif isinstance(participant, AgentProtocol): @@ -1719,12 +1833,6 @@ def _resolve_participants(self) -> list[Executor]: def build(self) -> Workflow: """Build a Magentic workflow with the orchestrator and all agent executors.""" - if not self._participants: - raise ValueError("No participants added to Magentic workflow") - - if self._manager is None: - raise ValueError("No manager configured. Call with_standard_manager(...) before build().") - logger.info(f"Building Magentic workflow with {len(self._participants)} participants") participants: list[Executor] = self._resolve_participants() diff --git a/python/packages/core/agent_framework/_workflows/_sequential.py b/python/packages/core/agent_framework/_workflows/_sequential.py index 11c123d153..663e85c9dd 100644 --- a/python/packages/core/agent_framework/_workflows/_sequential.py +++ b/python/packages/core/agent_framework/_workflows/_sequential.py @@ -160,9 +160,7 @@ def register_participants( ) -> "SequentialBuilder": """Register participant factories for this sequential workflow.""" if self._participants: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participant_factories: raise ValueError("register_participants() has already been called on this builder instance.") @@ -180,9 +178,7 @@ def participants(self, participants: Sequence[AgentProtocol | Executor]) -> "Seq Raises if empty or duplicates are provided for clarity. """ if self._participant_factories: - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) + raise ValueError("Cannot mix .participants() and .register_participants() in the same builder instance.") if self._participants: raise ValueError("participants() has already been called on this builder instance.") @@ -248,6 +244,10 @@ def with_request_info( def _resolve_participants(self) -> list[Executor]: """Resolve participant instances into Executor objects.""" + if not self._participants and not self._participant_factories: + raise ValueError("No participants provided. Call .participants() or .register_participants() first.") + # We don't need to check if both are set since that is handled in the respective methods + participants: list[Executor | AgentProtocol] = [] if self._participant_factories: # Resolve the participant factories now. This doesn't break the factory pattern @@ -287,18 +287,6 @@ def build(self) -> Workflow: - Else (custom Executor): pass conversation directly to the executor - _EndWithConversation yields the final conversation and the workflow becomes idle """ - if not self._participants and not self._participant_factories: - raise ValueError( - "No participants or participant factories provided to the builder. " - "Use .participants([...]) or .register_participants([...])." - ) - - if self._participants and self._participant_factories: - # Defensive strategy: this should never happen due to checks in respective methods - raise ValueError( - "Cannot mix .participants([...]) and .register_participants() in the same builder instance." - ) - # Internal nodes input_conv = _InputToConversation(id="input-conversation") end = _EndWithConversation(id="end") diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index c65f19d599..48302d2838 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from collections.abc import AsyncIterable, Callable +from collections.abc import AsyncIterable, Callable, Sequence from typing import Any, cast import pytest @@ -40,7 +40,7 @@ def __init__(self, agent_name: str, reply_text: str, **kwargs: Any) -> None: async def run( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -50,7 +50,7 @@ async def run( # type: ignore[override] def run_stream( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -66,9 +66,7 @@ async def _stream() -> AsyncIterable[AgentResponseUpdate]: class MockChatClient: """Mock chat client that raises NotImplementedError for all methods.""" - @property - def additional_properties(self) -> dict[str, Any]: - return {} + additional_properties: dict[str, Any] async def get_response(self, messages: Any, **kwargs: Any) -> ChatResponse: raise NotImplementedError @@ -84,7 +82,7 @@ def __init__(self) -> None: async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -130,7 +128,7 @@ async def run( def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -230,7 +228,7 @@ async def test_group_chat_builder_basic_flow() -> None: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() @@ -257,7 +255,7 @@ async def test_group_chat_as_agent_accepts_conversation() -> None: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() @@ -285,7 +283,9 @@ def test_build_without_manager_raises_error(self) -> None: builder = GroupChatBuilder().participants([agent]) - with pytest.raises(RuntimeError, match="Orchestrator could not be resolved"): + with pytest.raises( + ValueError, match=r"No orchestrator has been configured\. Call with_orchestrator\(\) to set one\." + ): builder.build() def test_build_without_participants_raises_error(self) -> None: @@ -294,9 +294,12 @@ def test_build_without_participants_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) - with pytest.raises(ValueError, match="participants must be configured before build"): + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): builder.build() def test_duplicate_manager_configuration_raises_error(self) -> None: @@ -305,10 +308,13 @@ def test_duplicate_manager_configuration_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) - with pytest.raises(ValueError, match="select_speakers_func has already been configured"): - builder.with_select_speaker_func(selector) + with pytest.raises( + ValueError, + match=r"A selection function has already been configured\. Call with_orchestrator\(\.\.\.\) once only\.", + ): + builder.with_orchestrator(selection_func=selector) def test_empty_participants_raises_error(self) -> None: """Test that empty participants list raises ValueError.""" @@ -316,7 +322,7 @@ def test_empty_participants_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="participants cannot be empty"): builder.participants([]) @@ -329,7 +335,7 @@ def test_duplicate_participant_names_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="Duplicate participant name 'test'"): builder.participants([agent1, agent2]) @@ -357,7 +363,7 @@ async def _stream() -> AsyncIterable[AgentResponseUpdate]: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="AgentProtocol participants must have a non-empty name"): builder.participants([agent]) @@ -369,7 +375,7 @@ def test_empty_participant_name_raises_error(self) -> None: def selector(state: GroupChatState) -> str: return "agent" - builder = GroupChatBuilder().with_select_speaker_func(selector) + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) with pytest.raises(ValueError, match="AgentProtocol participants must have a non-empty name"): builder.participants([agent]) @@ -391,7 +397,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(2) # Limit to 2 rounds .build() @@ -426,7 +432,7 @@ def termination_condition(conversation: list[ChatMessage]) -> bool: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_termination_condition(termination_condition) .build() @@ -454,7 +460,7 @@ async def test_termination_condition_agent_manager_finalizes(self) -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator(manager) + .with_orchestrator(agent=manager) .participants([worker]) .with_termination_condition(lambda conv: any(msg.author_name == "agent" for msg in conv)) .build() @@ -480,7 +486,7 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") - workflow = GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).build() + workflow = GroupChatBuilder().with_orchestrator(selection_func=selector).participants([agent]).build() with pytest.raises(RuntimeError, match="Selection function returned unknown participant 'unknown_agent'"): async for _ in workflow.run_stream("test task"): @@ -501,7 +507,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) .with_checkpointing(storage) @@ -530,7 +536,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) with pytest.raises(ValueError, match="At least one ChatMessage is required to start the group chat workflow."): @@ -550,7 +560,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -575,7 +589,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -603,7 +621,11 @@ def selector(state: GroupChatState) -> str: agent = StubAgent("agent", "response") workflow = ( - GroupChatBuilder().with_select_speaker_func(selector).participants([agent]).with_max_rounds(1).build() + GroupChatBuilder() + .with_orchestrator(selection_func=selector) + .participants([agent]) + .with_max_rounds(1) + .build() ) outputs: list[list[ChatMessage]] = [] @@ -632,7 +654,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) # Very low limit .build() @@ -667,7 +689,7 @@ def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .participants([agent]) .with_max_rounds(1) # Hit limit after first response .build() @@ -700,7 +722,7 @@ async def test_group_chat_checkpoint_runtime_only() -> None: wf = ( GroupChatBuilder() .participants([agent_a, agent_b]) - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .with_max_rounds(2) .build() ) @@ -738,7 +760,7 @@ async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: wf = ( GroupChatBuilder() .participants([agent_a, agent_b]) - .with_select_speaker_func(selector) + .with_orchestrator(selection_func=selector) .with_max_rounds(2) .with_checkpointing(buildtime_storage) .build() @@ -783,7 +805,7 @@ async def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha, beta]) .with_max_rounds(2) .with_request_info(agents=["beta"]) # Only pause before beta runs @@ -835,7 +857,7 @@ async def selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() - .with_select_speaker_func(selector, orchestrator_name="manager") + .with_orchestrator(selection_func=selector, orchestrator_name="manager") .participants([alpha]) .with_max_rounds(1) .with_request_info() # No filter - pause for all @@ -864,3 +886,455 @@ def test_group_chat_builder_with_request_info_returns_self(): builder2 = GroupChatBuilder() result2 = builder2.with_request_info(agents=["test"]) assert result2 is builder2 + + +# region Participant Factory Tests + + +def test_group_chat_builder_rejects_empty_participant_factories(): + """Test that GroupChatBuilder rejects empty participant_factories list.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): + GroupChatBuilder().register_participants([]) + + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): + GroupChatBuilder().with_orchestrator(selection_func=selector).build() + + +def test_group_chat_builder_rejects_mixing_participants_and_factories(): + """Test that mixing .participants() and .register_participants() raises an error.""" + alpha = StubAgent("alpha", "reply from alpha") + + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + GroupChatBuilder().participants([alpha]).register_participants([lambda: StubAgent("beta", "reply from beta")]) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + GroupChatBuilder().register_participants([lambda: alpha]).participants([StubAgent("beta", "reply from beta")]) + + +def test_group_chat_builder_rejects_multiple_calls_to_register_participants(): + """Test that multiple calls to .register_participants() raises an error.""" + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): + ( + GroupChatBuilder() + .register_participants([lambda: StubAgent("alpha", "reply from alpha")]) + .register_participants([lambda: StubAgent("beta", "reply from beta")]) + ) + + +def test_group_chat_builder_rejects_multiple_calls_to_participants(): + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match="participants have already been set"): + ( + GroupChatBuilder() + .participants([StubAgent("alpha", "reply from alpha")]) + .participants([StubAgent("beta", "reply from beta")]) + ) + + +async def test_group_chat_with_participant_factories(): + """Test workflow creation using participant_factories.""" + call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_max_rounds(2) + .build() + ) + + # Factories should be called during build + assert call_count == 2 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("coordinate task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_group_chat_participant_factories_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with factories.""" + call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + builder = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_max_rounds(2) + ) + + # Build first workflow + wf1 = builder.build() + assert call_count == 2 + + # Build second workflow + wf2 = builder.build() + assert call_count == 4 + + # Verify that the two workflows have different agent instances + assert wf1.executors["alpha"] is not wf2.executors["alpha"] + assert wf1.executors["beta"] is not wf2.executors["beta"] + + +async def test_group_chat_participant_factories_with_checkpointing(): + """Test checkpointing with participant_factories.""" + storage = InMemoryCheckpointStorage() + + def create_alpha() -> StubAgent: + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + return StubAgent("beta", "reply from beta") + + selector = make_sequence_selector() + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(selection_func=selector) + .with_checkpointing(storage) + .with_max_rounds(2) + .build() + ) + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("checkpoint test"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert outputs, "Should have workflow output" + + checkpoints = await storage.list_checkpoints() + assert checkpoints, "Checkpoints should be created during workflow execution" + + +# endregion + +# region Orchestrator Factory Tests + + +def test_group_chat_builder_rejects_multiple_orchestrator_configurations(): + """Test that configuring multiple orchestrators raises ValueError.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + def orchestrator_factory() -> ChatAgent: + return cast(ChatAgent, StubManagerAgent()) + + builder = GroupChatBuilder().with_orchestrator(selection_func=selector) + + # Already has a selection_func, should fail on second call + with pytest.raises(ValueError, match=r"A selection function has already been configured"): + builder.with_orchestrator(selection_func=selector) + + # Test with orchestrator_factory + builder2 = GroupChatBuilder().with_orchestrator(orchestrator_factory=orchestrator_factory) + with pytest.raises(ValueError, match=r"An orchestrator factory has already been configured"): + builder2.with_orchestrator(orchestrator_factory=orchestrator_factory) + + +def test_group_chat_builder_requires_exactly_one_orchestrator_option(): + """Test that exactly one orchestrator option must be provided.""" + + def selector(state: GroupChatState) -> str: + return list(state.participants.keys())[0] + + def orchestrator_factory() -> ChatAgent: + return cast(ChatAgent, StubManagerAgent()) + + # No options provided + with pytest.raises(ValueError, match="Exactly one of"): + GroupChatBuilder().with_orchestrator() + + # Multiple options provided + with pytest.raises(ValueError, match="Exactly one of"): + GroupChatBuilder().with_orchestrator(selection_func=selector, orchestrator_factory=orchestrator_factory) + + +async def test_group_chat_with_orchestrator_factory_returning_chat_agent(): + """Test workflow creation using orchestrator_factory that returns ChatAgent.""" + factory_call_count = 0 + + class DynamicManagerAgent(ChatAgent): + """Manager agent that dynamically selects from available participants.""" + + def __init__(self) -> None: + super().__init__(chat_client=MockChatClient(), name="dynamic_manager", description="Dynamic manager") + self._call_count = 0 + + async def run( + self, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentResponse: + if self._call_count == 0: + self._call_count += 1 + payload = { + "terminate": False, + "reason": "Selecting alpha", + "next_speaker": "alpha", + "final_message": None, + } + return AgentResponse( + messages=[ + ChatMessage( + role=Role.ASSISTANT, + text=( + '{"terminate": false, "reason": "Selecting alpha", ' + '"next_speaker": "alpha", "final_message": null}' + ), + author_name=self.name, + ) + ], + value=payload, + ) + + payload = { + "terminate": True, + "reason": "Task complete", + "next_speaker": None, + "final_message": "dynamic manager final", + } + return AgentResponse( + messages=[ + ChatMessage( + role=Role.ASSISTANT, + text=( + '{"terminate": true, "reason": "Task complete", ' + '"next_speaker": null, "final_message": "dynamic manager final"}' + ), + author_name=self.name, + ) + ], + value=payload, + ) + + def orchestrator_factory() -> ChatAgent: + nonlocal factory_call_count + factory_call_count += 1 + return cast(ChatAgent, DynamicManagerAgent()) + + alpha = StubAgent("alpha", "reply from alpha") + beta = StubAgent("beta", "reply from beta") + + workflow = ( + GroupChatBuilder() + .participants([alpha, beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("coordinate task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + # The DynamicManagerAgent terminates after second call with final_message + final_messages = outputs[0].data + assert isinstance(final_messages, list) + assert any( + msg.text == "dynamic manager final" + for msg in cast(list[ChatMessage], final_messages) + if msg.author_name == "dynamic_manager" + ) + + +def test_group_chat_with_orchestrator_factory_returning_base_orchestrator(): + """Test that orchestrator_factory returning BaseGroupChatOrchestrator is used as-is.""" + factory_call_count = 0 + selector = make_sequence_selector() + + def orchestrator_factory() -> BaseGroupChatOrchestrator: + nonlocal factory_call_count + factory_call_count += 1 + from agent_framework._workflows._base_group_chat_orchestrator import ParticipantRegistry + from agent_framework._workflows._group_chat import GroupChatOrchestrator + + # Create a custom orchestrator; when returning BaseGroupChatOrchestrator, + # the builder uses it as-is without modifying its participant registry + return GroupChatOrchestrator( + id="custom_orchestrator", + participant_registry=ParticipantRegistry([]), + selection_func=selector, + max_rounds=2, + ) + + alpha = StubAgent("alpha", "reply from alpha") + + workflow = ( + GroupChatBuilder().participants([alpha]).with_orchestrator(orchestrator_factory=orchestrator_factory).build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + # Verify the custom orchestrator is in the workflow + assert "custom_orchestrator" in workflow.executors + + +async def test_group_chat_orchestrator_factory_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with orchestrator factory.""" + factory_call_count = 0 + + def orchestrator_factory() -> ChatAgent: + nonlocal factory_call_count + factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + alpha = StubAgent("alpha", "reply from alpha") + beta = StubAgent("beta", "reply from beta") + + builder = ( + GroupChatBuilder().participants([alpha, beta]).with_orchestrator(orchestrator_factory=orchestrator_factory) + ) + + # Build first workflow + wf1 = builder.build() + assert factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert factory_call_count == 2 + + # Verify that the two workflows have different orchestrator instances + assert wf1.executors["manager_agent"] is not wf2.executors["manager_agent"] + + +def test_group_chat_orchestrator_factory_invalid_return_type(): + """Test that orchestrator_factory raising error for invalid return type.""" + + def invalid_factory() -> Any: + return "invalid type" + + alpha = StubAgent("alpha", "reply from alpha") + + with pytest.raises( + TypeError, + match=r"Orchestrator factory must return ChatAgent or BaseGroupChatOrchestrator instance", + ): + (GroupChatBuilder().participants([alpha]).with_orchestrator(orchestrator_factory=invalid_factory).build()) + + +def test_group_chat_with_both_participant_and_orchestrator_factories(): + """Test workflow creation using both participant_factories and orchestrator_factory.""" + participant_factory_call_count = 0 + orchestrator_factory_call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("beta", "reply from beta") + + def orchestrator_factory() -> ChatAgent: + nonlocal orchestrator_factory_call_count + orchestrator_factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + workflow = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + .build() + ) + + # All factories should be called during build + assert participant_factory_call_count == 2 + assert orchestrator_factory_call_count == 1 + + # Verify all executors are present in the workflow + assert "alpha" in workflow.executors + assert "beta" in workflow.executors + assert "manager_agent" in workflow.executors + + +async def test_group_chat_factories_reusable_for_multiple_workflows(): + """Test that both factories are reused correctly for multiple workflow builds.""" + participant_factory_call_count = 0 + orchestrator_factory_call_count = 0 + + def create_alpha() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("alpha", "reply from alpha") + + def create_beta() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("beta", "reply from beta") + + def orchestrator_factory() -> ChatAgent: + nonlocal orchestrator_factory_call_count + orchestrator_factory_call_count += 1 + return cast(ChatAgent, StubManagerAgent()) + + builder = ( + GroupChatBuilder() + .register_participants([create_alpha, create_beta]) + .with_orchestrator(orchestrator_factory=orchestrator_factory) + ) + + # Build first workflow + wf1 = builder.build() + assert participant_factory_call_count == 2 + assert orchestrator_factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert participant_factory_call_count == 4 + assert orchestrator_factory_call_count == 2 + + # Verify that the workflows have different agent and orchestrator instances + assert wf1.executors["alpha"] is not wf2.executors["alpha"] + assert wf1.executors["beta"] is not wf2.executors["beta"] + assert wf1.executors["manager_agent"] is not wf2.executors["manager_agent"] + + +# endregion diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 268f89d513..af9cdeeb04 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -208,7 +208,9 @@ def test_build_fails_without_start_agent(): def test_build_fails_without_participants(): """Verify that build() raises ValueError when no participants are provided.""" - with pytest.raises(ValueError, match="No participants or participant_factories have been configured."): + with pytest.raises( + ValueError, match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first." + ): HandoffBuilder().build() @@ -272,7 +274,7 @@ async def mock_get_response(messages: Any, options: dict[str, Any] | None = None agent = ChatAgent( chat_client=mock_client, name="test_agent", - default_options={"tool_choice": {"mode": "required"}}, + default_options={"tool_choice": {"mode": "required"}}, # type: ignore ) # Run the agent @@ -292,9 +294,11 @@ def test_handoff_builder_rejects_empty_participant_factories(): """Test that HandoffBuilder rejects empty participant_factories dictionary.""" # Empty factories are rejected immediately when calling participant_factories() with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): - HandoffBuilder().participant_factories({}) + HandoffBuilder().register_participants({}) - with pytest.raises(ValueError, match=r"No participants or participant_factories have been configured"): + with pytest.raises( + ValueError, match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\." + ): HandoffBuilder(participant_factories={}).build() @@ -311,7 +315,7 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m # Case 1: participants first, then participant_factories with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder(participants=[triage]).participant_factories({ + HandoffBuilder(participants=[triage]).register_participants({ "specialist": lambda: MockHandoffAgent(name="specialist") }) @@ -323,13 +327,13 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m # Case 3: participants(), then participant_factories() with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder().participants([triage]).participant_factories({ + HandoffBuilder().participants([triage]).register_participants({ "specialist": lambda: MockHandoffAgent(name="specialist") }) # Case 4: participant_factories(), then participants() with pytest.raises(ValueError, match="Cannot mix .participants"): - HandoffBuilder().participant_factories({"triage": lambda: triage}).participants([ + HandoffBuilder().register_participants({"triage": lambda: triage}).participants([ MockHandoffAgent(name="specialist") ]) @@ -342,11 +346,13 @@ def test_handoff_builder_rejects_mixing_participants_and_participant_factories_m def test_handoff_builder_rejects_multiple_calls_to_participant_factories(): """Test that multiple calls to .participant_factories() raises an error.""" - with pytest.raises(ValueError, match=r"participant_factories\(\) has already been called"): + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): ( HandoffBuilder() - .participant_factories({"agent1": lambda: MockHandoffAgent(name="agent1")}) - .participant_factories({"agent2": lambda: MockHandoffAgent(name="agent2")}) + .register_participants({"agent1": lambda: MockHandoffAgent(name="agent1")}) + .register_participants({"agent2": lambda: MockHandoffAgent(name="agent2")}) ) @@ -385,7 +391,7 @@ def test_handoff_builder_rejects_factory_name_coordinator_with_instances(): triage = MockHandoffAgent(name="triage") specialist = MockHandoffAgent(name="specialist") - with pytest.raises(ValueError, match="Call participant_factories.*before with_start_agent"): + with pytest.raises(ValueError, match=r"Call register_participants\(...\) before with_start_agent\(...\)"): ( HandoffBuilder(participants=[triage, specialist]).with_start_agent( "triage" diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index 7e4a5bb48e..cafde9e8f3 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -1,7 +1,7 @@ # Copyright (c) Microsoft. All rights reserved. import sys -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, Sequence from dataclasses import dataclass from typing import Any, ClassVar, cast @@ -155,7 +155,7 @@ def __init__(self, agent_name: str, reply_text: str, **kwargs: Any) -> None: async def run( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -165,7 +165,7 @@ async def run( # type: ignore[override] def run_stream( # type: ignore[override] self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -193,7 +193,7 @@ async def test_magentic_builder_returns_workflow_and_runs() -> None: manager = FakeManager() agent = StubAgent(manager.next_speaker_name, "first draft") - workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants([agent]).with_manager(manager).build() assert isinstance(workflow, Workflow) @@ -219,7 +219,7 @@ async def test_magentic_as_agent_does_not_accept_conversation() -> None: manager = FakeManager() writer = StubAgent(manager.next_speaker_name, "summary response") - workflow = MagenticBuilder().participants([writer]).with_standard_manager(manager).build() + workflow = MagenticBuilder().participants([writer]).with_manager(manager).build() agent = workflow.as_agent(name="magentic-agent") conversation = [ @@ -247,7 +247,7 @@ async def test_standard_manager_plan_and_replan_combined_ledger(): async def test_magentic_workflow_plan_review_approval_to_completion(): manager = FakeManager() - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).with_plan_review().build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager).with_plan_review().build() req_event: RequestInfoEvent | None = None async for ev in wf.run_stream("do work"): @@ -288,7 +288,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ wf = ( MagenticBuilder() .participants([DummyExec(name=manager.next_speaker_name)]) - .with_standard_manager(manager) + .with_manager(manager) .with_plan_review() .build() ) @@ -330,12 +330,7 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ async def test_magentic_orchestrator_round_limit_produces_partial_result(): manager = FakeManager(max_round_count=1) - wf = ( - MagenticBuilder() - .participants([DummyExec(name=manager.next_speaker_name)]) - .with_standard_manager(manager) - .build() - ) + wf = MagenticBuilder().participants([DummyExec(name=manager.next_speaker_name)]).with_manager(manager).build() events: list[WorkflowEvent] = [] async for ev in wf.run_stream("round limit test"): @@ -363,7 +358,7 @@ async def test_magentic_checkpoint_resume_round_trip(): wf = ( MagenticBuilder() .participants([DummyExec(name=manager1.next_speaker_name)]) - .with_standard_manager(manager1) + .with_manager(manager1) .with_plan_review() .with_checkpointing(storage) .build() @@ -386,7 +381,7 @@ async def test_magentic_checkpoint_resume_round_trip(): wf_resume = ( MagenticBuilder() .participants([DummyExec(name=manager2.next_speaker_name)]) - .with_standard_manager(manager2) + .with_manager(manager2) .with_plan_review() .with_checkpointing(storage) .build() @@ -422,7 +417,7 @@ class StubManagerAgent(BaseAgent): async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: Any = None, **kwargs: Any, @@ -431,7 +426,7 @@ async def run( def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: Any = None, **kwargs: Any, @@ -575,7 +570,7 @@ async def run(self, messages=None, *, thread=None, **kwargs): # type: ignore[ov async def _collect_agent_responses_setup(participant: AgentProtocol) -> list[ChatMessage]: captured: list[ChatMessage] = [] - wf = MagenticBuilder().participants([participant]).with_standard_manager(InvokeOnceManager()).build() + wf = MagenticBuilder().participants([participant]).with_manager(InvokeOnceManager()).build() # Run a bounded stream to allow one invoke and then completion events: list[WorkflowEvent] = [] @@ -623,7 +618,7 @@ async def test_magentic_checkpoint_resume_inner_loop_superstep(): workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -638,7 +633,7 @@ async def test_magentic_checkpoint_resume_inner_loop_superstep(): resumed = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -659,11 +654,7 @@ async def test_magentic_checkpoint_resume_from_saved_state(): manager = InvokeOnceManager() workflow = ( - MagenticBuilder() - .participants([StubThreadAgent()]) - .with_standard_manager(manager) - .with_checkpointing(storage) - .build() + MagenticBuilder().participants([StubThreadAgent()]).with_manager(manager).with_checkpointing(storage).build() ) async for event in workflow.run_stream("checkpoint resume task"): @@ -678,7 +669,7 @@ async def test_magentic_checkpoint_resume_from_saved_state(): resumed_workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(InvokeOnceManager()) .with_checkpointing(storage) .build() ) @@ -699,7 +690,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): workflow = ( MagenticBuilder() .participants([StubThreadAgent()]) - .with_standard_manager(manager) + .with_manager(manager) .with_plan_review() .with_checkpointing(storage) .build() @@ -719,7 +710,7 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): renamed_workflow = ( MagenticBuilder() .participants([StubThreadAgent(name="renamedAgent")]) - .with_standard_manager(InvokeOnceManager()) + .with_manager(InvokeOnceManager()) .with_plan_review() .with_checkpointing(storage) .build() @@ -759,7 +750,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM async def test_magentic_stall_and_reset_reach_limits(): manager = NotProgressingManager(max_round_count=10, max_stall_count=0, max_reset_count=1) - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager).build() events: list[WorkflowEvent] = [] async for ev in wf.run_stream("test limits"): @@ -784,7 +775,7 @@ async def test_magentic_checkpoint_runtime_only() -> None: storage = InMemoryCheckpointStorage() manager = FakeManager(max_round_count=10) - wf = MagenticBuilder().participants([DummyExec("agentA")]).with_standard_manager(manager).build() + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager).build() baseline_output: ChatMessage | None = None async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): @@ -819,7 +810,7 @@ async def test_magentic_checkpoint_runtime_overrides_buildtime() -> None: wf = ( MagenticBuilder() .participants([DummyExec("agentA")]) - .with_standard_manager(manager) + .with_manager(manager) .with_checkpointing(buildtime_storage) .build() ) @@ -871,13 +862,7 @@ async def test_magentic_checkpoint_restore_no_duplicate_history(): manager = FakeManager(max_round_count=10) storage = InMemoryCheckpointStorage() - wf = ( - MagenticBuilder() - .participants([DummyExec("agentA")]) - .with_standard_manager(manager) - .with_checkpointing(storage) - .build() - ) + wf = MagenticBuilder().participants([DummyExec("agentA")]).with_manager(manager).with_checkpointing(storage).build() # Run with conversation history to create initial checkpoint conversation: list[ChatMessage] = [ @@ -927,3 +912,374 @@ async def test_magentic_checkpoint_restore_no_duplicate_history(): # endregion + +# region Participant Factory Tests + + +def test_magentic_builder_rejects_empty_participant_factories(): + """Test that MagenticBuilder rejects empty participant_factories list.""" + with pytest.raises(ValueError, match=r"participant_factories cannot be empty"): + MagenticBuilder().register_participants([]) + + with pytest.raises( + ValueError, + match=r"No participants provided\. Call \.participants\(\) or \.register_participants\(\) first\.", + ): + MagenticBuilder().with_manager(FakeManager()).build() + + +def test_magentic_builder_rejects_mixing_participants_and_factories(): + """Test that mixing .participants() and .register_participants() raises an error.""" + agent = StubAgent("agentA", "reply from agentA") + + # Case 1: participants first, then register_participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + MagenticBuilder().participants([agent]).register_participants([lambda: StubAgent("agentB", "reply")]) + + # Case 2: register_participants first, then participants + with pytest.raises(ValueError, match="Cannot mix .participants"): + MagenticBuilder().register_participants([lambda: agent]).participants([StubAgent("agentB", "reply")]) + + +def test_magentic_builder_rejects_multiple_calls_to_register_participants(): + """Test that multiple calls to .register_participants() raises an error.""" + with pytest.raises( + ValueError, match=r"register_participants\(\) has already been called on this builder instance." + ): + ( + MagenticBuilder() + .register_participants([lambda: StubAgent("agentA", "reply from agentA")]) + .register_participants([lambda: StubAgent("agentB", "reply from agentB")]) + ) + + +def test_magentic_builder_rejects_multiple_calls_to_participants(): + """Test that multiple calls to .participants() raises an error.""" + with pytest.raises(ValueError, match="participants have already been set"): + ( + MagenticBuilder() + .participants([StubAgent("agentA", "reply from agentA")]) + .participants([StubAgent("agentB", "reply from agentB")]) + ) + + +async def test_magentic_with_participant_factories(): + """Test workflow creation using participant_factories.""" + call_count = 0 + + def create_agent() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("agentA", "reply from agentA") + + manager = FakeManager() + workflow = MagenticBuilder().register_participants([create_agent]).with_manager(manager).build() + + # Factory should be called during build + assert call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_magentic_participant_factories_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with factories.""" + call_count = 0 + + def create_agent() -> StubAgent: + nonlocal call_count + call_count += 1 + return StubAgent("agentA", "reply from agentA") + + builder = MagenticBuilder().register_participants([create_agent]).with_manager(FakeManager()) + + # Build first workflow + wf1 = builder.build() + assert call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert call_count == 2 + + # Verify that the two workflows have different agent instances + assert wf1.executors["agentA"] is not wf2.executors["agentA"] + + +async def test_magentic_participant_factories_with_checkpointing(): + """Test checkpointing with participant_factories.""" + storage = InMemoryCheckpointStorage() + + def create_agent() -> StubAgent: + return StubAgent("agentA", "reply from agentA") + + manager = FakeManager() + workflow = ( + MagenticBuilder() + .register_participants([create_agent]) + .with_manager(manager) + .with_checkpointing(storage) + .build() + ) + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("checkpoint test"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert outputs, "Should have workflow output" + + checkpoints = await storage.list_checkpoints() + assert checkpoints, "Checkpoints should be created during workflow execution" + + +# endregion + +# region Manager Factory Tests + + +def test_magentic_builder_rejects_multiple_manager_configurations(): + """Test that configuring multiple managers raises ValueError.""" + manager = FakeManager() + + builder = MagenticBuilder().with_manager(manager) + + with pytest.raises(ValueError, match=r"with_manager\(\) has already been called"): + builder.with_manager(manager) + + +def test_magentic_builder_requires_exactly_one_manager_option(): + """Test that exactly one manager option must be provided.""" + manager = FakeManager() + + def manager_factory() -> MagenticManagerBase: + return FakeManager() + + # No options provided + with pytest.raises(ValueError, match="Exactly one of"): + MagenticBuilder().with_manager() + + # Multiple options provided + with pytest.raises(ValueError, match="Exactly one of"): + MagenticBuilder().with_manager(manager, manager_factory=manager_factory) + + +async def test_magentic_with_manager_factory(): + """Test workflow creation using manager_factory.""" + factory_call_count = 0 + + def manager_factory() -> MagenticManagerBase: + nonlocal factory_call_count + factory_call_count += 1 + return FakeManager() + + agent = StubAgent("agentA", "reply from agentA") + workflow = MagenticBuilder().participants([agent]).with_manager(manager_factory=manager_factory).build() + + # Factory should be called during build + assert factory_call_count == 1 + + outputs: list[WorkflowOutputEvent] = [] + async for event in workflow.run_stream("test task"): + if isinstance(event, WorkflowOutputEvent): + outputs.append(event) + + assert len(outputs) == 1 + + +async def test_magentic_with_agent_factory(): + """Test workflow creation using agent_factory for StandardMagenticManager.""" + factory_call_count = 0 + + def agent_factory() -> AgentProtocol: + nonlocal factory_call_count + factory_call_count += 1 + return cast(AgentProtocol, StubManagerAgent()) + + participant = StubAgent("agentA", "reply from agentA") + workflow = ( + MagenticBuilder() + .participants([participant]) + .with_manager(agent_factory=agent_factory, max_round_count=1) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + # Verify workflow can be started (may not complete successfully due to stub behavior) + event_count = 0 + async for _ in workflow.run_stream("test task"): + event_count += 1 + if event_count > 10: + break + + assert event_count > 0 + + +async def test_magentic_manager_factory_reusable_builder(): + """Test that the builder can be reused to build multiple workflows with manager factory.""" + factory_call_count = 0 + + def manager_factory() -> MagenticManagerBase: + nonlocal factory_call_count + factory_call_count += 1 + return FakeManager() + + agent = StubAgent("agentA", "reply from agentA") + builder = MagenticBuilder().participants([agent]).with_manager(manager_factory=manager_factory) + + # Build first workflow + wf1 = builder.build() + assert factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert factory_call_count == 2 + + # Verify that the two workflows have different orchestrator instances + orchestrator1 = next(e for e in wf1.executors.values() if isinstance(e, MagenticOrchestrator)) + orchestrator2 = next(e for e in wf2.executors.values() if isinstance(e, MagenticOrchestrator)) + assert orchestrator1 is not orchestrator2 + + +def test_magentic_with_both_participant_and_manager_factories(): + """Test workflow creation using both participant_factories and manager_factory.""" + participant_factory_call_count = 0 + manager_factory_call_count = 0 + + def create_agent() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("agentA", "reply from agentA") + + def manager_factory() -> MagenticManagerBase: + nonlocal manager_factory_call_count + manager_factory_call_count += 1 + return FakeManager() + + workflow = ( + MagenticBuilder().register_participants([create_agent]).with_manager(manager_factory=manager_factory).build() + ) + + # All factories should be called during build + assert participant_factory_call_count == 1 + assert manager_factory_call_count == 1 + + # Verify executor is present in the workflow + assert "agentA" in workflow.executors + + +async def test_magentic_factories_reusable_for_multiple_workflows(): + """Test that both factories are reused correctly for multiple workflow builds.""" + participant_factory_call_count = 0 + manager_factory_call_count = 0 + + def create_agent() -> StubAgent: + nonlocal participant_factory_call_count + participant_factory_call_count += 1 + return StubAgent("agentA", "reply from agentA") + + def manager_factory() -> MagenticManagerBase: + nonlocal manager_factory_call_count + manager_factory_call_count += 1 + return FakeManager() + + builder = MagenticBuilder().register_participants([create_agent]).with_manager(manager_factory=manager_factory) + + # Build first workflow + wf1 = builder.build() + assert participant_factory_call_count == 1 + assert manager_factory_call_count == 1 + + # Build second workflow + wf2 = builder.build() + assert participant_factory_call_count == 2 + assert manager_factory_call_count == 2 + + # Verify that the workflows have different agent and orchestrator instances + assert wf1.executors["agentA"] is not wf2.executors["agentA"] + + orchestrator1 = next(e for e in wf1.executors.values() if isinstance(e, MagenticOrchestrator)) + orchestrator2 = next(e for e in wf2.executors.values() if isinstance(e, MagenticOrchestrator)) + assert orchestrator1 is not orchestrator2 + + +def test_magentic_agent_factory_with_standard_manager_options(): + """Test that agent_factory properly passes through standard manager options.""" + factory_call_count = 0 + + def agent_factory() -> AgentProtocol: + nonlocal factory_call_count + factory_call_count += 1 + return cast(AgentProtocol, StubManagerAgent()) + + # Custom options to verify they are passed through + custom_max_stall_count = 5 + custom_max_reset_count = 2 + custom_max_round_count = 10 + custom_facts_prompt = "Custom facts prompt: {task}" + custom_plan_prompt = "Custom plan prompt: {team}" + custom_full_prompt = "Custom full prompt: {task} {team} {facts} {plan}" + custom_facts_update_prompt = "Custom facts update: {task} {old_facts}" + custom_plan_update_prompt = "Custom plan update: {team}" + custom_progress_prompt = "Custom progress: {task} {team} {names}" + custom_final_prompt = "Custom final: {task}" + + # Create a custom task ledger + from agent_framework._workflows._magentic import _MagenticTaskLedger + + custom_task_ledger = _MagenticTaskLedger( + facts=ChatMessage(role=Role.ASSISTANT, text="Custom facts"), + plan=ChatMessage(role=Role.ASSISTANT, text="Custom plan"), + ) + + participant = StubAgent("agentA", "reply from agentA") + workflow = ( + MagenticBuilder() + .participants([participant]) + .with_manager( + agent_factory=agent_factory, + task_ledger=custom_task_ledger, + max_stall_count=custom_max_stall_count, + max_reset_count=custom_max_reset_count, + max_round_count=custom_max_round_count, + task_ledger_facts_prompt=custom_facts_prompt, + task_ledger_plan_prompt=custom_plan_prompt, + task_ledger_full_prompt=custom_full_prompt, + task_ledger_facts_update_prompt=custom_facts_update_prompt, + task_ledger_plan_update_prompt=custom_plan_update_prompt, + progress_ledger_prompt=custom_progress_prompt, + final_answer_prompt=custom_final_prompt, + ) + .build() + ) + + # Factory should be called during build + assert factory_call_count == 1 + + # Get the orchestrator and verify the manager has the custom options + orchestrator = next(e for e in workflow.executors.values() if isinstance(e, MagenticOrchestrator)) + manager = orchestrator._manager # type: ignore[reportPrivateUsage] + + # Verify the manager is a StandardMagenticManager with the expected options + from agent_framework import StandardMagenticManager + + assert isinstance(manager, StandardMagenticManager) + assert manager.task_ledger is custom_task_ledger + assert manager.max_stall_count == custom_max_stall_count + assert manager.max_reset_count == custom_max_reset_count + assert manager.max_round_count == custom_max_round_count + assert manager.task_ledger_facts_prompt == custom_facts_prompt + assert manager.task_ledger_plan_prompt == custom_plan_prompt + assert manager.task_ledger_full_prompt == custom_full_prompt + assert manager.task_ledger_facts_update_prompt == custom_facts_update_prompt + assert manager.task_ledger_plan_update_prompt == custom_plan_update_prompt + assert manager.progress_ledger_prompt == custom_progress_prompt + assert manager.final_answer_prompt == custom_final_prompt + + +# endregion diff --git a/python/packages/core/tests/workflow/test_workflow_kwargs.py b/python/packages/core/tests/workflow/test_workflow_kwargs.py index 75c34f9d95..4d665c5516 100644 --- a/python/packages/core/tests/workflow/test_workflow_kwargs.py +++ b/python/packages/core/tests/workflow/test_workflow_kwargs.py @@ -1,6 +1,6 @@ # Copyright (c) Microsoft. All rights reserved. -from collections.abc import AsyncIterable +from collections.abc import AsyncIterable, Sequence from typing import Annotated, Any import pytest @@ -51,7 +51,7 @@ def __init__(self, name: str = "test_agent") -> None: async def run( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -61,7 +61,7 @@ async def run( async def run_stream( self, - messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None, + messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None, *, thread: AgentThread | None = None, **kwargs: Any, @@ -187,7 +187,7 @@ def simple_selector(state: GroupChatState) -> str: workflow = ( GroupChatBuilder() .participants([agent1, agent2]) - .with_select_speaker_func(simple_selector) + .with_orchestrator(selection_func=simple_selector) .with_max_rounds(2) # Limit rounds to prevent infinite loop .build() ) @@ -408,7 +408,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM agent = _KwargsCapturingAgent(name="agent1") manager = _MockManager() - workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager=manager).build() + workflow = MagenticBuilder().participants([agent]).with_manager(manager=manager).build() custom_data = {"session_id": "magentic123"} @@ -457,7 +457,7 @@ async def prepare_final_answer(self, magentic_context: MagenticContext) -> ChatM agent = _KwargsCapturingAgent(name="agent1") manager = _MockManager() - magentic_workflow = MagenticBuilder().participants([agent]).with_standard_manager(manager=manager).build() + magentic_workflow = MagenticBuilder().participants([agent]).with_manager(manager=manager).build() # Use MagenticWorkflow.run_stream() which goes through the kwargs attachment path custom_data = {"magentic_key": "magentic_value"} diff --git a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py index 6c5425f49f..6380c5a14a 100644 --- a/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py +++ b/python/samples/autogen-migration/orchestrations/02_selector_group_chat.py @@ -86,12 +86,11 @@ async def run_agent_framework() -> None: workflow = ( GroupChatBuilder() .participants([python_expert, javascript_expert, database_expert]) - .set_manager( - manager=client.create_agent( + .with_orchestrator( + agent=client.create_agent( name="selector_manager", instructions="Based on the conversation, select the most appropriate expert to respond next.", ), - display_name="SelectorManager", ) .with_max_rounds(1) .build() diff --git a/python/samples/autogen-migration/orchestrations/04_magentic_one.py b/python/samples/autogen-migration/orchestrations/04_magentic_one.py index ca81f0faf9..62a021dfce 100644 --- a/python/samples/autogen-migration/orchestrations/04_magentic_one.py +++ b/python/samples/autogen-migration/orchestrations/04_magentic_one.py @@ -6,6 +6,16 @@ """ import asyncio +import json +from typing import cast + +from agent_framework import ( + AgentRunUpdateEvent, + ChatMessage, + MagenticOrchestratorEvent, + MagenticProgressLedger, + WorkflowOutputEvent, +) async def run_autogen() -> None: @@ -57,13 +67,7 @@ async def run_autogen() -> None: async def run_agent_framework() -> None: """Agent Framework's MagenticBuilder for orchestrated collaboration.""" - from agent_framework import ( - MagenticAgentDeltaEvent, - MagenticAgentMessageEvent, - MagenticBuilder, - MagenticFinalResultEvent, - MagenticOrchestratorMessageEvent, - ) + from agent_framework import MagenticBuilder from agent_framework.openai import OpenAIChatClient client = OpenAIChatClient(model_id="gpt-4.1-mini") @@ -90,9 +94,13 @@ async def run_agent_framework() -> None: # Create Magentic workflow workflow = ( MagenticBuilder() - .participants(researcher=researcher, coder=coder, reviewer=reviewer) - .with_standard_manager( - chat_client=client, + .participants([researcher, coder, reviewer]) + .with_manager( + agent=client.create_agent( + name="magentic_manager", + instructions="You coordinate a team to complete complex tasks efficiently.", + description="Orchestrator for team coordination", + ), max_round_count=20, max_stall_count=3, max_reset_count=1, @@ -101,41 +109,46 @@ async def run_agent_framework() -> None: ) # Run complex task + last_message_id: str | None = None + output_event: WorkflowOutputEvent | None = None print("[Agent Framework] Magentic conversation:") - last_stream_agent_id: str | None = None - stream_line_open: bool = False - async for event in workflow.run_stream("Research Python async patterns and write a simple example"): - if isinstance(event, MagenticOrchestratorMessageEvent): - if stream_line_open: - print() - stream_line_open = False - print(f"---------- Orchestrator:{event.kind} ----------") - print(getattr(event.message, "text", "")) - elif isinstance(event, MagenticAgentDeltaEvent): - if last_stream_agent_id != event.agent_id or not stream_line_open: - if stream_line_open: - print() - print(f"---------- {event.agent_id} ----------") - last_stream_agent_id = event.agent_id - stream_line_open = True - if event.text: - print(event.text, end="", flush=True) - elif isinstance(event, MagenticAgentMessageEvent): - if stream_line_open: - print() - stream_line_open = False - elif isinstance(event, MagenticFinalResultEvent): - if stream_line_open: - print() - stream_line_open = False - print("---------- Final Result ----------") - if event.message is not None: - print(event.message.text) - - if stream_line_open: - print() - print() # Final newline after conversation + if isinstance(event, AgentRunUpdateEvent): + message_id = event.data.message_id + if message_id != last_message_id: + if last_message_id is not None: + print("\n") + print(f"- {event.executor_id}:", end=" ", flush=True) + last_message_id = message_id + print(event.data, end="", flush=True) + + elif isinstance(event, MagenticOrchestratorEvent): + print(f"\n[Magentic Orchestrator Event] Type: {event.event_type.name}") + if isinstance(event.data, ChatMessage): + print(f"Please review the plan:\n{event.data.text}") + elif isinstance(event.data, MagenticProgressLedger): + print(f"Please review progress ledger:\n{json.dumps(event.data.to_dict(), indent=2)}") + else: + print(f"Unknown data type in MagenticOrchestratorEvent: {type(event.data)}") + + # Block to allow user to read the plan/progress before continuing + # Note: this is for demonstration only and is not the recommended way to handle human interaction. + # Please refer to `with_plan_review` for proper human interaction during planning phases. + await asyncio.get_event_loop().run_in_executor(None, input, "Press Enter to continue...") + + elif isinstance(event, WorkflowOutputEvent): + output_event = event + + if not output_event: + raise RuntimeError("Workflow did not produce a final output event.") + print("\n\nWorkflow completed!") + print("Final Output:") + # The output of the Magentic workflow is a list of ChatMessages with only one final message + # generated by the orchestrator. + output_messages = cast(list[ChatMessage], output_event.data) + if output_messages: + output = output_messages[-1].text + print(output) async def main() -> None: diff --git a/python/samples/getting_started/workflows/README.md b/python/samples/getting_started/workflows/README.md index 8ca5e0f4bc..2be4bca03e 100644 --- a/python/samples/getting_started/workflows/README.md +++ b/python/samples/getting_started/workflows/README.md @@ -115,7 +115,7 @@ For additional observability samples in Agent Framework, see the [observability | Concurrent Orchestration (Custom Aggregator) | [orchestration/concurrent_custom_aggregator.py](./orchestration/concurrent_custom_aggregator.py) | Override aggregator via callback; summarize results with an LLM | | Concurrent Orchestration (Custom Agent Executors) | [orchestration/concurrent_custom_agent_executors.py](./orchestration/concurrent_custom_agent_executors.py) | Child executors own ChatAgents; concurrent fan-out/fan-in via ConcurrentBuilder | | Concurrent Orchestration (Participant Factory) | [orchestration/concurrent_participant_factory.py](./orchestration/concurrent_participant_factory.py) | Use participant factories for state isolation between workflow instances | -| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `with_agent_orchestrator()` to select next speaker | +| Group Chat with Agent Manager | [orchestration/group_chat_agent_manager.py](./orchestration/group_chat_agent_manager.py) | Agent-based manager using `with_orchestrator(agent=)` to select next speaker | | Group Chat Philosophical Debate | [orchestration/group_chat_philosophical_debate.py](./orchestration/group_chat_philosophical_debate.py) | Agent manager moderates long-form, multi-round debate across diverse participants | | Group Chat with Simple Function Selector | [orchestration/group_chat_simple_selector.py](./orchestration/group_chat_simple_selector.py) | Group chat with a simple function selector for next speaker | | Handoff (Simple) | [orchestration/handoff_simple.py](./orchestration/handoff_simple.py) | Single-tier routing: triage agent routes to specialists, control returns to user after each specialist response | diff --git a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py index 2a1ab234f9..27c2e8ad4f 100644 --- a/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/group_chat_workflow_as_agent.py @@ -34,8 +34,8 @@ async def main() -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator( - OpenAIChatClient().create_agent( + .with_orchestrator( + agent=OpenAIChatClient().create_agent( name="Orchestrator", instructions="You coordinate a team conversation to solve the user's task.", ) diff --git a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py index f4e5b38e86..3badeae78a 100644 --- a/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py +++ b/python/samples/getting_started/workflows/agents/magentic_workflow_as_agent.py @@ -55,7 +55,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, coder_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py index e3ed0ab5ff..924a644c30 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/group_chat_request_info.py @@ -89,7 +89,7 @@ async def main() -> None: # Using agents= filter to only pause before pragmatist speaks (not every turn) workflow = ( GroupChatBuilder() - .with_agent_orchestrator(orchestrator) + .with_orchestrator(agent=orchestrator) .participants([optimist, pragmatist, creative]) .with_max_rounds(6) .with_request_info(agents=[pragmatist]) # Only pause before pragmatist speaks diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py index 12475205d3..ea89f4d230 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_agent_manager.py @@ -68,7 +68,7 @@ async def main() -> None: # Build the group chat workflow workflow = ( GroupChatBuilder() - .with_agent_orchestrator(orchestrator_agent) + .with_orchestrator(agent=orchestrator_agent) .participants([researcher, writer]) # Set a hard termination condition: stop after 4 assistant messages # The agent orchestrator will intelligently decide when to end before this limit but just in case diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py index a26b9df4d0..c8506bc5e6 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_philosophical_debate.py @@ -211,7 +211,7 @@ async def main() -> None: workflow = ( GroupChatBuilder() - .with_agent_orchestrator(moderator) + .with_orchestrator(agent=moderator) .participants([farmer, developer, teacher, activist, spiritual_leader, artist, immigrant, doctor]) .with_termination_condition(lambda messages: sum(1 for msg in messages if msg.role == Role.ASSISTANT) >= 10) .build() diff --git a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py index 517ae313f3..1047cd6f22 100644 --- a/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py +++ b/python/samples/getting_started/workflows/orchestration/group_chat_simple_selector.py @@ -17,7 +17,7 @@ Sample: Group Chat with a round-robin speaker selector What it does: -- Demonstrates the with_select_speaker_func() API for GroupChat orchestration +- Demonstrates the with_orchestrator() API for GroupChat orchestration - Uses a pure Python function to control speaker selection based on conversation state Prerequisites: @@ -84,7 +84,7 @@ async def main() -> None: workflow = ( GroupChatBuilder() .participants([expert, verifier, clarifier, skeptic]) - .with_select_speaker_func(round_robin_selector) + .with_orchestrator(selection_func=round_robin_selector) # Set a hard termination condition: stop after 6 messages (user task + one full rounds + 1) # One round is expert -> verifier -> clarifier -> skeptic, after which the expert gets to respond again. # This will end the conversation after the expert has spoken 2 times (one iteration loop) diff --git a/python/samples/getting_started/workflows/orchestration/magentic.py b/python/samples/getting_started/workflows/orchestration/magentic.py index 8e71d09a42..60746bc113 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic.py +++ b/python/samples/getting_started/workflows/orchestration/magentic.py @@ -80,7 +80,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, coder_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index 6fc284a9ab..2dd6a1a170 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -83,7 +83,7 @@ def build_workflow(checkpoint_storage: FileCheckpointStorage): MagenticBuilder() .participants([researcher, writer]) .with_plan_review() - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=3, diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py index 37a53020e7..1050463d01 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_review.py @@ -63,7 +63,7 @@ async def main() -> None: workflow = ( MagenticBuilder() .participants([researcher_agent, analyst_agent]) - .with_standard_manager( + .with_manager( agent=manager_agent, max_round_count=10, max_stall_count=1, diff --git a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py index a8536afc7f..5876c73608 100644 --- a/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py +++ b/python/samples/getting_started/workflows/tool-approval/group_chat_builder_tool_approval.py @@ -118,8 +118,7 @@ async def main() -> None: # 4. Build a group chat workflow with the selector function workflow = ( GroupChatBuilder() - # Optionally, use `.set_manager(...)` to customize the group chat manager - .with_select_speaker_func(select_next_speaker) + .with_orchestrator(selection_func=select_next_speaker) .participants([qa_engineer, devops_engineer]) # Set a hard limit to 4 rounds # First round: QAEngineer speaks diff --git a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py index 8ea37e8f5e..744bdb7370 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/group_chat.py +++ b/python/samples/semantic-kernel-migration/orchestrations/group_chat.py @@ -233,11 +233,8 @@ async def run_agent_framework_example(task: str) -> str: workflow = ( GroupChatBuilder() - .set_manager( - manager=AzureOpenAIChatClient(credential=credential).create_agent(), - display_name="Coordinator", - ) - .participants(researcher=researcher, planner=planner) + .with_orchestrator(agent=AzureOpenAIChatClient(credential=credential).create_agent()) + .participants([researcher, planner]) .build() ) diff --git a/python/samples/semantic-kernel-migration/orchestrations/magentic.py b/python/samples/semantic-kernel-migration/orchestrations/magentic.py index 87094a2047..3d9aa67ea8 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/magentic.py +++ b/python/samples/semantic-kernel-migration/orchestrations/magentic.py @@ -144,12 +144,7 @@ async def run_agent_framework_example(prompt: str) -> str | None: chat_client=OpenAIChatClient(), ) - workflow = ( - MagenticBuilder() - .participants(researcher=researcher, coder=coder) - .with_standard_manager(agent=manager_agent) - .build() - ) + workflow = MagenticBuilder().participants([researcher, coder]).with_manager(agent=manager_agent).build() final_text: str | None = None async for event in workflow.run_stream(prompt):