From 9dbd0b9a5a04b72f054287e35fd0f20a274cd6e1 Mon Sep 17 00:00:00 2001 From: "Sheng Zhao (SPEECH)" Date: Sun, 4 Jan 2026 14:42:47 +0800 Subject: [PATCH] add voice live agent --- python/packages/azure-voice-live/README.md | 30 + .../__init__.py | 19 + .../_audio_utils.py | 142 +++++ .../_event_processor.py | 183 +++++++ .../_types.py | 117 ++++ .../_voice_live_agent.py | 515 ++++++++++++++++++ .../_voice_live_session.py | 264 +++++++++ .../_web/__init__.py | 7 + .../_web/websocket_handler.py | 168 ++++++ .../examples/streaming_voice_chat.py | 483 ++++++++++++++++ .../packages/azure-voice-live/pyproject.toml | 84 +++ 11 files changed, 2012 insertions(+) create mode 100644 python/packages/azure-voice-live/README.md create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/__init__.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_audio_utils.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_event_processor.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_types.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_agent.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_session.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/__init__.py create mode 100644 python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/websocket_handler.py create mode 100644 python/packages/azure-voice-live/examples/streaming_voice_chat.py create mode 100644 python/packages/azure-voice-live/pyproject.toml diff --git a/python/packages/azure-voice-live/README.md b/python/packages/azure-voice-live/README.md new file mode 100644 index 0000000000..c2d834e073 --- /dev/null +++ b/python/packages/azure-voice-live/README.md @@ -0,0 +1,30 @@ +# Azure Voice Live Agent + +Real-time voice conversation support for Microsoft Agent Framework using Azure Voice Live SDK. + +## Features + +- **Real-time Voice Streaming**: Bidirectional audio streaming with PCM16 @ 24kHz +- **Server-side VAD**: Automatic voice activity detection for natural turn-taking +- **Function Calling**: Tool invocation during voice conversations with automatic execution +- **Multi-Agent Support**: Voice agent can delegate complex queries to text-based agents +- **Interruption Support**: User can interrupt agent responses naturally +- **Audio + Text**: Support for voice input/output with automatic transcription +- **Web Integration**: WebSocket support for browser-based voice interfaces +- **Streaming Responses**: Stream audio and text transcripts in real-time + +## Installation + +```bash +pip install agent-framework-azure-voice-live +``` + +For web support: +```bash +pip install agent-framework-azure-voice-live[web] +``` + +## License + +MIT License - Copyright (c) Microsoft Corporation + diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/__init__.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/__init__.py new file mode 100644 index 0000000000..e0d09d2649 --- /dev/null +++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Azure Voice Live integration for Microsoft Agent Framework. + +This package provides real-time voice conversation capabilities using Azure Voice Live SDK. 
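+
+A minimal quickstart (a sketch; the endpoint, key, and deployment name below are placeholders):
+
+```python
+from azure.core.credentials import AzureKeyCredential
+
+from agent_framework_azure_voice_live import VoiceLiveAgent
+
+agent = VoiceLiveAgent(
+    endpoint="https://YOUR_RESOURCE.openai.azure.com",
+    model="gpt-4o-realtime-preview",
+    credential=AzureKeyCredential("YOUR_API_KEY"),
+    instructions="You are a helpful assistant.",
+)
+response = await agent.run("Hello!")
+```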
+""" + +from ._types import AudioContent, VoiceOptions +from ._voice_live_agent import VoiceLiveAgent +from ._voice_live_session import VoiceLiveSession + +__all__ = [ + "VoiceLiveAgent", + "VoiceLiveSession", + "AudioContent", + "VoiceOptions", +] + +__version__ = "0.1.0" diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_audio_utils.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_audio_utils.py new file mode 100644 index 0000000000..9f00ee4fd1 --- /dev/null +++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_audio_utils.py @@ -0,0 +1,142 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Audio utilities for encoding, decoding, and file I/O.""" + +import base64 +import wave +from typing import BinaryIO + + +class AudioUtils: + """Utilities for audio encoding/decoding and file I/O.""" + + @staticmethod + def encode_pcm16_to_base64(audio_bytes: bytes) -> str: + """Encode PCM16 audio bytes to base64 string. + + Args: + audio_bytes: Raw PCM16 audio bytes + + Returns: + Base64-encoded string + """ + return base64.b64encode(audio_bytes).decode("utf-8") + + @staticmethod + def decode_base64_to_pcm16(audio_b64: str) -> bytes: + """Decode base64 string to PCM16 audio bytes. + + Args: + audio_b64: Base64-encoded audio string + + Returns: + Raw PCM16 audio bytes + """ + return base64.b64decode(audio_b64) + + @staticmethod + def save_to_wav( + audio_bytes: bytes, file_path: str, sample_rate: int = 24000, channels: int = 1 + ) -> None: + """Save PCM16 audio to WAV file. + + Args: + audio_bytes: Raw PCM16 audio bytes + file_path: Path to output WAV file + sample_rate: Audio sample rate in Hz + channels: Number of audio channels (1=mono, 2=stereo) + """ + with wave.open(file_path, "wb") as wav_file: + wav_file.setnchannels(channels) + wav_file.setsampwidth(2) # 2 bytes for PCM16 + wav_file.setframerate(sample_rate) + wav_file.writeframes(audio_bytes) + + @staticmethod + def load_from_wav(file_path: str) -> tuple[bytes, int, int]: + """Load PCM16 audio from WAV file. + + Args: + file_path: Path to input WAV file + + Returns: + Tuple of (audio_bytes, sample_rate, channels) + + Raises: + ValueError: If WAV file is not PCM16 format + """ + with wave.open(file_path, "rb") as wav_file: + # Validate format + if wav_file.getsampwidth() != 2: + raise ValueError(f"WAV file must be PCM16 (16-bit), got {wav_file.getsampwidth() * 8}-bit") + + channels = wav_file.getnchannels() + sample_rate = wav_file.getframerate() + audio_bytes = wav_file.readframes(wav_file.getnframes()) + + return audio_bytes, sample_rate, channels + + @staticmethod + def resample_audio( + audio_bytes: bytes, + from_sample_rate: int, + to_sample_rate: int, + channels: int = 1, + ) -> bytes: + """Resample PCM16 audio to a different sample rate. + + Note: This is a simple nearest-neighbor resampling. For production use, + consider using a library like scipy or librosa for higher quality resampling. 
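+        For example (a sketch assuming scipy is available):
+
+        ```python
+        import numpy as np
+        from scipy.signal import resample_poly
+
+        pcm = np.frombuffer(audio_bytes, dtype=np.int16)  # mono PCM16 assumed
+        audio_out = resample_poly(pcm, to_sample_rate, from_sample_rate).astype(np.int16).tobytes()
+        ```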
+
+        Args:
+            audio_bytes: Raw PCM16 audio bytes
+            from_sample_rate: Source sample rate in Hz
+            to_sample_rate: Target sample rate in Hz
+            channels: Number of audio channels
+
+        Returns:
+            Resampled PCM16 audio bytes
+        """
+        if from_sample_rate == to_sample_rate:
+            return audio_bytes
+
+        import struct
+
+        # Convert bytes to samples (mono: int sample, stereo: (left, right) tuple)
+        sample_format = "<h"  # PCM16 is little-endian signed 16-bit
+        frame_size = 2 * channels
+        num_samples = len(audio_bytes) // frame_size
+        samples: list[int | tuple[int, int]] = []
+        for i in range(num_samples):
+            offset = i * frame_size
+            if channels == 1:
+                samples.append(struct.unpack_from(sample_format, audio_bytes, offset)[0])
+            else:
+                left = struct.unpack_from(sample_format, audio_bytes, offset)[0]
+                right = struct.unpack_from(sample_format, audio_bytes, offset + 2)[0]
+                samples.append((left, right))
+
+        # Nearest-neighbor resampling: map each output sample to the closest source sample
+        new_num_samples = int(num_samples * to_sample_rate / from_sample_rate)
+        resampled: list[int | tuple[int, int]] = []
+        for i in range(new_num_samples):
+            source_index = int(i * from_sample_rate / to_sample_rate)
+            if source_index >= num_samples:
+                source_index = num_samples - 1
+            resampled.append(samples[source_index])
+
+        # Convert back to bytes
+        result = bytearray()
+        for sample in resampled:
+            if channels == 1:
+                result.extend(struct.pack(sample_format, sample))
+            else:
+                result.extend(struct.pack(sample_format, sample[0]))
+                result.extend(struct.pack(sample_format, sample[1]))
+
+        return bytes(result)
diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_event_processor.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_event_processor.py
new file mode 100644
index 0000000000..9b59b0d4ef
--- /dev/null
+++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_event_processor.py
@@ -0,0 +1,183 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""Event processor for converting Azure Voice Live events to Agent Framework updates."""
+
+from typing import Any
+
+from agent_framework import AgentRunResponseUpdate
+
+
+class EventProcessor:
+    """Converts Azure Voice Live events to Agent Framework updates.
+
+    This class processes server events from the Azure Voice Live SDK and converts them
+    into AgentRunResponseUpdate objects that are compatible with the Agent Framework's
+    streaming interface.
+    """
+
+    def __init__(self) -> None:
+        """Initialize event processor."""
+        self._current_response_id: str | None = None
+        self._function_calls: dict[str, dict[str, Any]] = {}  # Track in-progress function calls
+
+    def process_event(self, event: Any) -> AgentRunResponseUpdate | None:
+        """Convert server event to agent update.
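+
+        For example, a RESPONSE_AUDIO_TRANSCRIPT_DELTA event carrying delta "Hi" becomes an
+        update whose additional_properties are
+        {"type": "transcript_delta", "response_id": <current id>, "text": "Hi"}.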
+ + Args: + event: Server event from Azure Voice Live SDK + + Returns: + AgentRunResponseUpdate if the event should be emitted, None otherwise + """ + # Import here to avoid circular dependency and to handle SDK availability + try: + from azure.ai.voicelive.models import ServerEventType + except ImportError: + # SDK not available, return None + return None + + event_type = event.type + + if event_type == ServerEventType.SESSION_UPDATED: + # Session configuration complete + return AgentRunResponseUpdate(additional_properties={"type": "session_ready"}) + + elif event_type == ServerEventType.RESPONSE_CREATED: + # New response started + self._current_response_id = event.response.id + return AgentRunResponseUpdate( + additional_properties={"type": "response_started", "response_id": event.response.id} + ) + + elif event_type == ServerEventType.RESPONSE_AUDIO_DELTA: + # Audio chunk received + return AgentRunResponseUpdate( + additional_properties={ + "type": "audio_delta", + "response_id": self._current_response_id, + "audio_data": event.delta, + } + ) + + elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA: + # Transcript chunk received + return AgentRunResponseUpdate( + additional_properties={ + "type": "transcript_delta", + "response_id": self._current_response_id, + "text": event.delta, + } + ) + + elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: + # User started speaking (VAD detected) + return AgentRunResponseUpdate(additional_properties={"type": "speech_started"}) + + elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: + # User stopped speaking (VAD detected) + return AgentRunResponseUpdate(additional_properties={"type": "speech_stopped"}) + + elif event_type == ServerEventType.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED: + # User audio transcription complete + return AgentRunResponseUpdate( + additional_properties={ + "type": "input_transcription_complete", + "transcript": event.transcript if hasattr(event, "transcript") else None, + } + ) + + elif event_type == ServerEventType.RESPONSE_OUTPUT_ITEM_ADDED: + # New output item (message or function call) added to response + item_type = event.item.type if hasattr(event.item, "type") else None + if item_type == "function_call": + # Initialize function call tracking + item_id = event.item.id if hasattr(event.item, "id") else None + call_id = event.item.call_id if hasattr(event.item, "call_id") else None + name = event.item.name if hasattr(event.item, "name") else None + + print(f"[DEBUG EventProcessor] RESPONSE_OUTPUT_ITEM_ADDED: item_id={item_id}, call_id={call_id}, name={name}") + print(f"[DEBUG EventProcessor] item attributes: {dir(event.item)}") + + # Use call_id if available, otherwise fall back to item_id + key = call_id if call_id else item_id + if key: + self._function_calls[key] = {"name": name, "arguments": ""} + print(f"[DEBUG EventProcessor] Stored function call with key={key}, name={name}") + + elif event_type == ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA: + # Accumulate function call arguments + call_id = event.call_id if hasattr(event, "call_id") else None + if call_id: + if call_id not in self._function_calls: + self._function_calls[call_id] = {"name": event.name if hasattr(event, "name") else None, "arguments": ""} + + # Update name if provided + if hasattr(event, "name") and event.name: + self._function_calls[call_id]["name"] = event.name + + # Accumulate arguments + if hasattr(event, "delta") and event.delta: + 
self._function_calls[call_id]["arguments"] += event.delta + + return AgentRunResponseUpdate( + additional_properties={ + "type": "function_call_delta", + "call_id": call_id, + "arguments_delta": event.delta if hasattr(event, "delta") else "", + } + ) + + elif event_type == ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE: + # Function call complete + call_id = event.call_id if hasattr(event, "call_id") else None + print(f"[DEBUG EventProcessor] RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE: call_id={call_id}") + print(f"[DEBUG EventProcessor] Known function calls: {list(self._function_calls.keys())}") + + if call_id and call_id in self._function_calls: + call_data = self._function_calls.pop(call_id) + print(f"[DEBUG EventProcessor] Found function call data: name={call_data['name']}, args={call_data['arguments']}") + + return AgentRunResponseUpdate( + additional_properties={ + "type": "function_call", + "call_id": call_id, + "name": call_data["name"], + "arguments": call_data["arguments"], + } + ) + else: + print(f"[DEBUG EventProcessor] call_id {call_id} not found in tracked function calls!") + + elif event_type == ServerEventType.RESPONSE_DONE: + # Response complete + usage = None + if hasattr(event, "response") and hasattr(event.response, "usage"): + usage = event.response.usage + + result = AgentRunResponseUpdate( + additional_properties={ + "type": "response_complete", + "response_id": self._current_response_id, + "usage": usage, + } + ) + + # Reset state + self._current_response_id = None + return result + + elif event_type == ServerEventType.ERROR: + # Error event + error_message = event.error if hasattr(event, "error") else "Unknown error" + return AgentRunResponseUpdate(additional_properties={"type": "error", "error": str(error_message)}) + + # Return None for unhandled event types + return None + + def reset(self) -> None: + """Reset processor state. + + Useful when starting a new conversation or handling connection issues. + """ + self._current_response_id = None + self._function_calls.clear() diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_types.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_types.py new file mode 100644 index 0000000000..e64dfa3b05 --- /dev/null +++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_types.py @@ -0,0 +1,117 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Type definitions for Azure Voice Live integration.""" + +from typing import Any, Literal, Sequence + +from agent_framework import Annotations, BaseContent +from pydantic import BaseModel, Field + + +class AudioContent(BaseContent): + """Audio content with PCM16 data. + + Attributes: + type: Content type identifier + audio_data: Raw PCM16 audio bytes + sample_rate: Audio sample rate in Hz + channels: Number of audio channels + transcript: Optional text transcription of the audio + """ + + def __init__( + self, + audio_data: bytes, + *, + sample_rate: int = 24000, + channels: int = 1, + transcript: str | None = None, + additional_properties: dict[str, Any] | None = None, + raw_representation: Any | None = None, + annotations: Sequence[Annotations] | None = None, + **kwargs: Any, + ): + """Initialize AudioContent. 
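+
+        Example (a 100 ms silent buffer at the default 24 kHz):
+
+        ```python
+        silence = AudioContent(audio_data=b"\x00" * 4800)  # 2400 samples x 2 bytes
+        assert silence.duration_ms == 100
+        ```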
+ + Args: + audio_data: Raw PCM16 audio bytes + + Keyword Args: + sample_rate: Audio sample rate in Hz (default: 24000) + channels: Number of audio channels, 1=mono, 2=stereo (default: 1) + transcript: Optional text transcription of the audio + additional_properties: Optional additional properties + raw_representation: Optional raw representation + annotations: Optional annotations + **kwargs: Any additional keyword arguments + """ + super().__init__( + annotations=annotations, + additional_properties=additional_properties, + raw_representation=raw_representation, + **kwargs, + ) + self.type: Literal["audio"] = "audio" + self.audio_data = audio_data + self.sample_rate = sample_rate + self.channels = channels + self.transcript = transcript + + @property + def duration_ms(self) -> int: + """Calculate audio duration in milliseconds. + + Returns: + Duration in milliseconds based on sample rate and data length + """ + bytes_per_sample = self.channels * 2 # 2 bytes per PCM16 sample + num_samples = len(self.audio_data) // bytes_per_sample + return int((num_samples / self.sample_rate) * 1000) + + @property + def duration_seconds(self) -> float: + """Calculate audio duration in seconds. + + Returns: + Duration in seconds + """ + return self.duration_ms / 1000.0 + + +class VoiceOptions(BaseModel): + """Configuration options for VoiceLiveAgent. + + Attributes: + voice: Azure voice name (e.g., "en-US-AvaNeural") + temperature: Sampling temperature (0.0-1.0) + max_response_tokens: Maximum tokens in response + enable_vad: Enable server-side voice activity detection + vad_threshold: VAD sensitivity (0.0-1.0, higher = less sensitive) + vad_prefix_padding_ms: Milliseconds of audio before speech to include + vad_silence_duration_ms: Milliseconds of silence to detect end of speech + input_audio_transcription: Enable automatic transcription of user audio + """ + + voice: str = Field( + default="en-US-AvaNeural", + description="Azure voice name (e.g., 'en-US-AvaNeural', 'en-US-JennyNeural')", + ) + temperature: float = Field(default=0.8, ge=0.0, le=1.0, description="Sampling temperature") + max_response_tokens: int | None = Field(default=None, description="Maximum tokens in response") + + # VAD settings + enable_vad: bool = Field(default=True, description="Enable server-side voice activity detection") + vad_threshold: float = Field( + default=0.5, ge=0.0, le=1.0, description="VAD sensitivity (0.0-1.0, higher = less sensitive)" + ) + vad_prefix_padding_ms: int = Field( + default=300, ge=0, description="Milliseconds of audio before speech to include" + ) + vad_silence_duration_ms: int = Field( + default=500, ge=0, description="Milliseconds of silence to detect end of speech" + ) + + # Transcription + input_audio_transcription: bool = Field( + default=True, description="Enable automatic transcription of user audio" + ) diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_agent.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_agent.py new file mode 100644 index 0000000000..f5f63e0e51 --- /dev/null +++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_agent.py @@ -0,0 +1,515 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Azure Voice Live Agent implementation.""" + +import asyncio +from collections.abc import AsyncIterable +from typing import Any + +from agent_framework import AgentRunResponse, AgentRunResponseUpdate, AgentThread, AIFunction, BaseAgent + +from ._voice_live_session import VoiceLiveSession + + +class VoiceLiveAgent(BaseAgent): + """Real-time voice agent using Azure Voice Live SDK. + + This agent enables real-time voice conversations with streaming audio, + server-side voice activity detection, and function calling support. + + Example: + ```python + from agent_framework_azure_voice_live import VoiceLiveAgent + from azure.identity.aio import DefaultAzureCredential + + agent = VoiceLiveAgent( + endpoint="https://YOUR_RESOURCE.openai.azure.com", + model="gpt-4o-realtime-preview", + credential=DefaultAzureCredential(), + voice="en-US-AvaNeural", + instructions="You are a helpful assistant.", + ) + + # Text input -> Voice output + response = await agent.run("Hello!") + + # Voice input -> Voice output + with open("audio.pcm", "rb") as f: + response = await agent.run(f.read()) + ``` + """ + + def __init__( + self, + *, + endpoint: str, + model: str = "gpt-4o-realtime-preview", + credential: Any, + voice: str = "en-US-AvaNeural", + instructions: str | None = None, + tools: list[AIFunction] | None = None, + temperature: float = 0.8, + max_response_tokens: int | None = None, + # VAD settings + enable_vad: bool = True, + vad_threshold: float = 0.5, + vad_prefix_padding_ms: int = 300, + vad_silence_duration_ms: int = 500, + # Audio settings + input_audio_format: str = "pcm16", + output_audio_format: str = "pcm16", + input_audio_transcription: bool = True, + **kwargs: Any, + ) -> None: + """Initialize VoiceLiveAgent. + + Args: + endpoint: Azure OpenAI endpoint (e.g., "https://YOUR_RESOURCE.openai.azure.com") + model: Model deployment name (default: "gpt-4o-realtime-preview") + credential: Azure credential (AzureKeyCredential or TokenCredential) + voice: Azure voice name (default: "en-US-AvaNeural") + instructions: System instructions for the agent + tools: List of AIFunction tools the agent can use + temperature: Sampling temperature (0.0-1.0) + max_response_tokens: Maximum tokens in response + enable_vad: Enable server-side voice activity detection + vad_threshold: VAD sensitivity (0.0-1.0, higher = less sensitive) + vad_prefix_padding_ms: Milliseconds of audio before speech to include + vad_silence_duration_ms: Milliseconds of silence to detect end of speech + input_audio_format: Input audio format (default: "pcm16") + output_audio_format: Output audio format (default: "pcm16") + input_audio_transcription: Enable automatic transcription of user audio + **kwargs: Additional arguments passed to BaseAgent + """ + super().__init__(name=kwargs.pop("name", f"VoiceLiveAgent_{model}"), **kwargs) + + self._endpoint = endpoint + self._model = model + self._credential = credential + self._voice = voice + self._instructions = instructions + self._tools = tools or [] + self._temperature = temperature + self._max_response_tokens = max_response_tokens + + # VAD configuration + self._enable_vad = enable_vad + self._vad_threshold = vad_threshold + self._vad_prefix_padding_ms = vad_prefix_padding_ms + self._vad_silence_duration_ms = vad_silence_duration_ms + + # Audio formats (for future extensibility) + self._input_audio_format = input_audio_format + self._output_audio_format = output_audio_format + self._input_audio_transcription = input_audio_transcription + + # Session management for multi-turn conversations + 
self._session: Any = None + + async def connect(self) -> None: + """Connect to Azure Voice Live and establish a persistent session. + + This enables multi-turn conversations without recreating the session each time. + Call this before using run() or run_stream(). + """ + if self._session is not None: + return # Already connected + + self._session = VoiceLiveSession( + endpoint=self._endpoint, + model=self._model, + credential=self._credential, + config=self._build_session_config(), + ) + await self._session.__aenter__() + + async def disconnect(self) -> None: + """Disconnect from Azure Voice Live and close the session. + + Call this when done with the conversation. + """ + if self._session is not None: + await self._session.__aexit__(None, None, None) + self._session = None + + async def run( + self, + input: str | bytes, # Text or PCM16 audio bytes + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AgentRunResponse: + """Run agent with text or audio input. + + Args: + input: User input (text string or PCM16 audio bytes) + thread: Optional conversation thread (not yet implemented) + **kwargs: Additional arguments + + Returns: + AgentRunResponse with audio content and transcript + + Example: + ```python + # Single-turn usage (auto-manages session): + response = await agent.run("What's the weather?") + + # Multi-turn usage (persistent session): + await agent.connect() + response1 = await agent.run("What's the weather?") + response2 = await agent.run("How about tomorrow?") + await agent.disconnect() + ``` + """ + # Auto-connect if not already connected + use_temp_session = self._session is None + + if use_temp_session: + # Create temporary session for single-turn use + session = VoiceLiveSession( + endpoint=self._endpoint, + model=self._model, + credential=self._credential, + config=self._build_session_config(), + ) + async with session: + return await self._run_with_session(session, input) + else: + # Use persistent session + return await self._run_with_session(self._session, input) + + async def _run_with_session(self, session: VoiceLiveSession, input: str | bytes) -> AgentRunResponse: + """Internal method to run with a given session.""" + # Send input + if isinstance(input, str): + await session.send_text(input) + await session.create_response() + else: + await session.send_audio(input, commit=self._enable_vad) + if not self._enable_vad: + await session.create_response() + + # Collect response + return await session.collect_response() + + async def run_stream( + self, + input: str | bytes, + *, + thread: AgentThread | None = None, + **kwargs: Any, + ) -> AsyncIterable[AgentRunResponseUpdate]: + """Stream voice responses in real-time. 
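+        Each update carries its payload in additional_properties (see EventProcessor); for
+        example, audio arrives as {"type": "audio_delta", "audio_data": ...} and transcript
+        text as {"type": "transcript_delta", "text": ...}.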
+ + Args: + input: User input (text string or PCM16 audio bytes) + thread: Optional conversation thread (not yet implemented) + **kwargs: Additional arguments + + Yields: + AgentRunResponseUpdate objects with audio deltas and transcript deltas + + Example: + ```python + # Single-turn streaming: + async for update in agent.run_stream("Tell me a story"): + if update.additional_properties.get("type") == "audio_delta": + audio_player.play(update.additional_properties["audio_data"]) + + # Multi-turn streaming: + await agent.connect() + async for update in agent.run_stream("Hello"): + process_update(update) + async for update in agent.run_stream("Tell me more"): + process_update(update) + await agent.disconnect() + ``` + """ + # Auto-connect if not already connected + use_temp_session = self._session is None + + if use_temp_session: + # Create temporary session for single-turn use + session = VoiceLiveSession( + endpoint=self._endpoint, + model=self._model, + credential=self._credential, + config=self._build_session_config(), + ) + async with session: + async for update in self._run_stream_with_session(session, input): + yield update + else: + # Use persistent session + async for update in self._run_stream_with_session(self._session, input): + yield update + + async def _run_stream_with_session( + self, session: VoiceLiveSession, input: str | bytes + ) -> AsyncIterable[AgentRunResponseUpdate]: + """Internal method to stream with a given session.""" + # Send input + if isinstance(input, str): + await session.send_text(input) + await session.create_response() + else: + await session.send_audio(input, commit=self._enable_vad) + if not self._enable_vad: + await session.create_response() + + # Stream response updates + async for update in session.stream_response(): + yield update + + async def stream_audio_chunk(self, audio_chunk: bytes) -> None: + """Stream a single audio chunk for continuous input. + + This is used for streaming conversations where audio is captured + continuously from the microphone and sent in real-time, rather than + recording a complete buffer first. + + Args: + audio_chunk: Raw PCM16 audio chunk (24kHz, mono, 16-bit) + + Raises: + RuntimeError: If session is not connected + + Example: + ```python + await agent.connect() + + # Continuously stream audio chunks + while recording: + chunk = mic.read_chunk() + await agent.stream_audio_chunk(chunk) + + await agent.disconnect() + ``` + """ + if not self._session: + raise RuntimeError("Must call connect() before streaming audio chunks") + + await self._session.send_audio_chunk(audio_chunk) + + async def listen_to_responses(self) -> AsyncIterable[AgentRunResponseUpdate]: + """Listen to all response events continuously. + + This streams all events from the session, not just a single response. + Used for streaming conversations where the agent can respond multiple + times as the user speaks. + + Automatically handles function calls in the background. 
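+        A detected function call is held until the current response completes; it is then
+        executed, its result is sent back via send_function_result(), and a follow-up
+        response is requested so the model can speak the result.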
+ + Yields: + AgentRunResponseUpdate objects for each event + + Raises: + RuntimeError: If session is not connected + + Example: + ```python + await agent.connect() + + async for update in agent.listen_to_responses(): + if update.additional_properties.get("type") == "audio_delta": + audio_player.play(update.additional_properties["audio_data"]) + elif update.additional_properties.get("type") == "transcript_delta": + print(update.additional_properties["text"], end="") + + await agent.disconnect() + ``` + """ + if not self._session: + raise RuntimeError("Must call connect() before listening to responses") + + pending_function_call = None + + async for update in self._session.stream_all_events(): + props = update.additional_properties or {} + event_type = props.get("type") + + # Handle function calls automatically + if event_type == "function_call": + print(f"\n[DEBUG] Function call detected: {props.get('name')}") + # Store pending function call to execute after response_done + pending_function_call = { + "call_id": props.get("call_id"), + "name": props.get("name"), + "arguments": props.get("arguments") + } + + # When response is done, execute any pending function call + elif event_type == "response_complete" and pending_function_call: + print(f"[DEBUG] Response done, executing pending function call") + # Execute function and create new response + asyncio.create_task(self._handle_function_call_after_response( + pending_function_call["call_id"], + pending_function_call["name"], + pending_function_call["arguments"] + )) + pending_function_call = None + + yield update + + async def _handle_function_call_after_response(self, call_id: str, name: str, arguments: str) -> None: + """Handle function call execution after response is done, then trigger new response. + + Args: + call_id: Function call ID + name: Function name + arguments: JSON string of arguments + """ + import json + + print(f"[DEBUG] Executing function: {name} with call_id={call_id}, args={arguments}") + + try: + # Parse arguments + args_dict = json.loads(arguments) if arguments else {} + + # Find the function + function = None + for tool in self._tools: + if tool.name == name: + function = tool + break + + if not function: + result = f"Error: Function '{name}' not found" + print(f"[DEBUG] Function not found: {name}") + else: + # Execute the function + print(f"[DEBUG] Calling function {name} with args: {args_dict}") + result = await function(**args_dict) + print(f"[DEBUG] Function {name} returned: {result}") + + # Send result back + print(f"[DEBUG] Sending function result for call_id={call_id}") + await self._session.send_function_result(call_id, str(result)) + print(f"[DEBUG] Function result sent successfully") + + # Now trigger a new response to process the function result + # This is safe because we waited for RESPONSE_DONE + print(f"[DEBUG] Creating new response to process function result") + await self._session.create_response() + print(f"[DEBUG] New response created") + + except Exception as e: + error_msg = f"Error executing {name}: {e}" + print(f"[DEBUG] Exception in function execution: {e}") + import traceback + traceback.print_exc() + try: + await self._session.send_function_result(call_id, error_msg) + await self._session.create_response() + except Exception as e2: + print(f"[DEBUG] Failed to send error result: {e2}") + + async def cancel_response(self) -> None: + """Cancel the ongoing agent response. 
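+        (Audio already handed to a local player keeps playing, so callers should also
+        flush their playback buffer, as the streaming example does.)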
+ + This is used for interruption handling - when the user starts speaking + while the agent is responding, call this to stop the agent's response. + + Raises: + RuntimeError: If session is not connected + + Example: + ```python + await agent.connect() + + # If user interrupts, cancel the response + if user_started_speaking and agent_is_speaking: + await agent.cancel_response() + + await agent.disconnect() + ``` + """ + if not self._session: + raise RuntimeError("Must call connect() before canceling response") + + await self._session.cancel_response() + + def _build_session_config(self) -> Any: + """Build Azure Voice Live session configuration. + + Returns: + RequestSession configuration object + """ + from azure.ai.voicelive.models import ( + AzureStandardVoice, + InputAudioFormat, + Modality, + OutputAudioFormat, + RequestSession, + ServerVad, + ) + + # Configure VAD + turn_detection = None + if self._enable_vad: + turn_detection = ServerVad( + threshold=self._vad_threshold, + prefix_padding_ms=self._vad_prefix_padding_ms, + silence_duration_ms=self._vad_silence_duration_ms, + ) + + # Configure transcription + input_audio_transcription = None + if self._input_audio_transcription: + input_audio_transcription = {"model": "whisper-1"} + + # Build session config + return RequestSession( + modalities=[Modality.TEXT, Modality.AUDIO], + instructions=self._instructions, + voice=AzureStandardVoice(name=self._voice), + input_audio_format=InputAudioFormat.PCM16, + output_audio_format=OutputAudioFormat.PCM16, + input_audio_transcription=input_audio_transcription, + turn_detection=turn_detection, + tools=self._convert_tools_to_azure_format(), + temperature=self._temperature, + max_response_output_tokens=self._max_response_tokens, + ) + + def _convert_tools_to_azure_format(self) -> list[Any]: + """Convert AIFunction tools to Azure Voice Live format. 
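+        For example, an AIFunction get_weather(location: str) becomes roughly
+        {"type": "function", "name": "get_weather", "description": "...",
+        "parameters": {<JSON schema>}}.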
+ + Returns: + List of FunctionTool objects in Azure format + """ + from azure.ai.voicelive.models import FunctionTool + + azure_tools = [] + + for tool in self._tools: + # Get the JSON schema from the tool + if hasattr(tool, 'to_json_schema_spec'): + schema = tool.to_json_schema_spec() + + # Extract function details from schema + func_spec = schema.get('function', {}) + + # Create Azure FunctionTool using dict-style assignment + azure_tool = FunctionTool() + azure_tool['type'] = 'function' + azure_tool['name'] = func_spec.get('name', tool.name) + azure_tool['description'] = func_spec.get('description', '') + azure_tool['parameters'] = func_spec.get('parameters', {}) + + azure_tools.append(azure_tool) + print(f"[DEBUG] Tool converted: {tool.name} -> {dict(azure_tool)}") + else: + # Fallback for non-AIFunction tools + azure_tool = FunctionTool() + azure_tool['type'] = 'function' + azure_tool['name'] = getattr(tool, 'name', 'unknown') + azure_tool['description'] = getattr(tool, 'description', '') + azure_tool['parameters'] = tool.parameters() if callable(getattr(tool, 'parameters', None)) else {} + + azure_tools.append(azure_tool) + print(f"[DEBUG] Tool converted (fallback): {dict(azure_tool)}") + + print(f"[DEBUG] Total tools converted: {len(azure_tools)}") + return azure_tools diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_session.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_session.py new file mode 100644 index 0000000000..e6741bae56 --- /dev/null +++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_voice_live_session.py @@ -0,0 +1,264 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Voice Live session management.""" + +import base64 +from collections.abc import AsyncIterable +from typing import Any + +from agent_framework import AgentRunResponse, AgentRunResponseUpdate, ChatMessage, Role + +from ._event_processor import EventProcessor +from ._types import AudioContent + + +class VoiceLiveSession: + """Manages Azure Voice Live WebSocket session. + + This class wraps the Azure Voice Live SDK connection and provides a simplified + interface for sending/receiving audio and handling events. + """ + + def __init__( + self, + endpoint: str, + model: str, + credential: Any, + config: Any, + ) -> None: + """Initialize voice live session. 
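+
+        Typical usage (a sketch):
+
+        ```python
+        async with VoiceLiveSession(endpoint, model, credential, config) as session:
+            await session.send_text("Hello")
+            await session.create_response()
+            response = await session.collect_response()
+        ```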
+ + Args: + endpoint: Azure OpenAI endpoint URL + model: Model deployment name (e.g., "gpt-4o-realtime-preview") + credential: Azure credential (AzureKeyCredential or TokenCredential) + config: RequestSession configuration object + """ + self._endpoint = endpoint + self._model = model + self._credential = credential + self._config = config + self._connection: Any = None + self._connection_context: Any = None + self._event_processor = EventProcessor() + self._response_started = False # Track if response has been started + + async def __aenter__(self) -> "VoiceLiveSession": + """Connect to Azure Voice Live.""" + from azure.ai.voicelive.aio import connect + + # Establish WebSocket connection - store the context manager + self._connection_context = connect( + endpoint=self._endpoint, credential=self._credential, model=self._model + ) + self._connection = await self._connection_context.__aenter__() + + # Configure session + await self._connection.session.update(session=self._config) + + return self + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + """Disconnect from Azure Voice Live.""" + print("[DEBUG] Closing session...") + if hasattr(self, "_connection_context") and self._connection_context: + await self._connection_context.__aexit__(exc_type, exc_val, exc_tb) + self._connection = None + self._connection_context = None + print("[DEBUG] Session closed") + + async def send_text(self, text: str) -> None: + """Send text input to conversation. + + Args: + text: User message text + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + from azure.ai.voicelive.models import UserMessageItem + + # Create message item using UserMessageItem + item = UserMessageItem() + item["content"] = [{"type": "input_text", "text": text}] + await self._connection.conversation.item.create(item=item) + + async def send_audio(self, audio_bytes: bytes, commit: bool = True) -> None: + """Send audio input (PCM16 bytes). + + Args: + audio_bytes: Raw PCM16 audio bytes (24kHz, mono, 16-bit) + commit: Whether to commit the buffer after appending (default: True) + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + # Encode to base64 + audio_b64 = base64.b64encode(audio_bytes).decode("utf-8") + + # Append to buffer + await self._connection.input_audio_buffer.append(audio=audio_b64) + + # Optionally commit buffer + if commit: + await self._connection.input_audio_buffer.commit() + + async def send_audio_chunk(self, audio_chunk: bytes) -> None: + """Send a single audio chunk for streaming input. + + This appends audio to the buffer without committing, allowing for + continuous streaming. Server VAD will automatically detect speech + boundaries and trigger responses. + + Args: + audio_chunk: Raw PCM16 audio chunk (24kHz, mono, 16-bit) + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + # Encode to base64 + audio_b64 = base64.b64encode(audio_chunk).decode("utf-8") + + # Append to buffer (no commit - let VAD handle it) + await self._connection.input_audio_buffer.append(audio=audio_b64) + + async def create_response(self) -> None: + """Manually trigger response generation. + + This is useful when not using server-side VAD or when you want to + control exactly when the agent responds. + + If VAD already started a response, this will be skipped. + """ + if not self._connection: + raise RuntimeError("Session not connected. 
Use async with context manager.") + + # Only create response if one hasn't been started yet (by VAD or otherwise) + if not self._response_started: + print(f"[DEBUG] Calling response.create() manually") + await self._connection.response.create() + else: + print(f"[DEBUG] Skipping response.create() - response already started") + + async def cancel_response(self) -> None: + """Cancel ongoing response. + + This is useful for implementing interruption handling when the user + starts speaking while the agent is responding. + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + await self._connection.response.cancel() + + async def send_function_result(self, call_id: str, output: str) -> None: + """Send function call result back to the model. + + Args: + call_id: Function call ID from the function_call event + output: Function execution result (as string) + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + from azure.ai.voicelive.models import FunctionCallOutputItem + + # Create function call output item + item = FunctionCallOutputItem(call_id=call_id, output=output) + await self._connection.conversation.item.create(item=item) + + async def stream_response(self) -> AsyncIterable[AgentRunResponseUpdate]: + """Stream response events as AgentRunResponseUpdate. + + Yields: + AgentRunResponseUpdate objects for each relevant event + + Raises: + RuntimeError: If session is not connected + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + from azure.ai.voicelive.models import ServerEventType + + async for event in self._connection: + # Track if response has started + if event.type == ServerEventType.RESPONSE_CREATED: + self._response_started = True + + update = self._event_processor.process_event(event) + if update: + yield update + + # Stop on response done + if event.type == ServerEventType.RESPONSE_DONE: + self._response_started = False # Reset for next turn + break + + async def stream_all_events(self) -> AsyncIterable[AgentRunResponseUpdate]: + """Stream all events continuously (not just one response). + + Unlike stream_response() which stops after one response_done, + this continues streaming all events for the session lifetime. + This is used for streaming conversations with multiple turns. + + Yields: + AgentRunResponseUpdate objects for each event + + Raises: + RuntimeError: If session is not connected + """ + if not self._connection: + raise RuntimeError("Session not connected. Use async with context manager.") + + from azure.ai.voicelive.models import ServerEventType + + async for event in self._connection: + # Track response state + if event.type == ServerEventType.RESPONSE_CREATED: + self._response_started = True + elif event.type == ServerEventType.RESPONSE_DONE: + self._response_started = False + + update = self._event_processor.process_event(event) + if update: + yield update + + async def collect_response(self) -> AgentRunResponse: + """Collect complete response (non-streaming). + + Returns: + AgentRunResponse with complete audio and transcript + + Raises: + RuntimeError: If session is not connected + """ + if not self._connection: + raise RuntimeError("Session not connected. 
Use async with context manager.")
+
+        from azure.ai.voicelive.models import ServerEventType
+
+        audio_chunks: list[bytes] = []
+        transcript = ""
+
+        async for event in self._connection:
+            event_type = event.type
+
+            if event_type == ServerEventType.RESPONSE_AUDIO_DELTA:
+                if hasattr(event, "delta") and event.delta:
+                    audio_chunks.append(event.delta)
+
+            elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA:
+                if hasattr(event, "delta") and event.delta:
+                    transcript += event.delta
+
+            elif event_type == ServerEventType.RESPONSE_DONE:
+                # Response complete
+                break
+
+        # Build response
+        audio_content = AudioContent(audio_data=b"".join(audio_chunks), transcript=transcript)
+
+        message = ChatMessage(role=Role.ASSISTANT, contents=[audio_content])
+
+        return AgentRunResponse(messages=[message])
diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/__init__.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/__init__.py
new file mode 100644
index 0000000000..331e997884
--- /dev/null
+++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""Web integration for browser-based voice chat."""
+
+from .websocket_handler import VoiceWebSocketHandler
+
+__all__ = ["VoiceWebSocketHandler"]
diff --git a/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/websocket_handler.py b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/websocket_handler.py
new file mode 100644
index 0000000000..3797509154
--- /dev/null
+++ b/python/packages/azure-voice-live/agent_framework_azure_voice_live/_web/websocket_handler.py
@@ -0,0 +1,168 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""WebSocket handler for browser voice connections."""
+
+import asyncio
+import json
+from typing import Any
+
+from .._voice_live_agent import VoiceLiveAgent
+from .._voice_live_session import VoiceLiveSession
+
+
+class VoiceWebSocketHandler:
+    """Handle browser WebSocket connections for voice chat.
+
+    This handler bridges browser WebSocket connections to Azure Voice Live,
+    enabling web-based voice interfaces.
+
+    Example:
+        ```python
+        from fastapi import FastAPI, WebSocket
+        from agent_framework_azure_voice_live import VoiceLiveAgent
+        from agent_framework_azure_voice_live._web import VoiceWebSocketHandler
+
+        app = FastAPI()
+        agent = VoiceLiveAgent(...)
+        handler = VoiceWebSocketHandler(agent)
+
+        @app.websocket("/voice")
+        async def voice_endpoint(websocket: WebSocket):
+            await handler.handle_connection(websocket)
+        ```
+    """
+
+    def __init__(self, agent: VoiceLiveAgent) -> None:
+        """Initialize WebSocket handler.
+
+        Args:
+            agent: VoiceLiveAgent instance to handle voice conversations
+        """
+        self._agent = agent
+
+    async def handle_connection(self, websocket: Any) -> None:
+        """Handle WebSocket connection from browser.
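+        The connection is bridged to a dedicated VoiceLiveSession: one task forwards browser
+        audio and control messages to Azure while another forwards Azure events back to the
+        browser as binary audio frames and JSON status messages.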
+
+        Args:
+            websocket: FastAPI WebSocket instance
+        """
+        await websocket.accept()
+
+        # Create session
+        session = VoiceLiveSession(
+            endpoint=self._agent._endpoint,
+            model=self._agent._model,
+            credential=self._agent._credential,
+            config=self._agent._build_session_config(),
+        )
+
+        try:
+            async with session:
+                # Bidirectional streaming
+                receive_task = asyncio.create_task(self._receive_from_browser(websocket, session))
+                send_task = asyncio.create_task(self._send_to_browser(websocket, session))
+
+                # Wait for both tasks
+                await asyncio.gather(receive_task, send_task, return_exceptions=True)
+
+        except Exception as e:
+            # Send error to browser
+            try:
+                await websocket.send_json({"type": "error", "message": str(e)})
+            except Exception:
+                pass  # WebSocket may already be closed
+
+        finally:
+            # Close WebSocket
+            try:
+                await websocket.close()
+            except Exception:
+                pass  # Already closed
+
+    async def _receive_from_browser(self, websocket: Any, session: VoiceLiveSession) -> None:
+        """Receive audio from browser and forward to Azure.
+
+        Args:
+            websocket: FastAPI WebSocket instance
+            session: VoiceLiveSession instance
+        """
+        try:
+            while True:
+                message = await websocket.receive()
+
+                if "bytes" in message:
+                    # Browser sends PCM16 audio chunks; append without committing so
+                    # server-side VAD (the default) controls turn boundaries
+                    await session.send_audio_chunk(message["bytes"])
+
+                elif "text" in message:
+                    # Handle control messages
+                    try:
+                        data = json.loads(message["text"])
+                        message_type = data.get("type")
+
+                        if message_type == "text":
+                            # Text message from user
+                            await session.send_text(data.get("text", ""))
+                            await session.create_response()
+
+                        elif message_type == "cancel":
+                            # Cancel ongoing response
+                            await session.cancel_response()
+
+                        elif message_type == "trigger":
+                            # Manually trigger response
+                            await session.create_response()
+
+                    except json.JSONDecodeError:
+                        pass  # Ignore malformed JSON
+
+        except Exception as e:
+            print(f"Error receiving from browser: {e}")
+
+    async def _send_to_browser(self, websocket: Any, session: VoiceLiveSession) -> None:
+        """Receive from Azure and forward to browser.
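+        The browser-facing protocol is deliberately small: binary frames carry PCM16 audio
+        deltas, and JSON text frames carry status updates such as
+        {"type": "transcript", "text": ...} and {"type": "response_complete"}.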
+ + Args: + websocket: FastAPI WebSocket instance + session: VoiceLiveSession instance + """ + try: + from azure.ai.voicelive.models import ServerEventType + + async for event in session._connection: + event_type = event.type + + if event_type == ServerEventType.RESPONSE_AUDIO_DELTA: + # Send audio chunk to browser + if hasattr(event, "delta") and event.delta: + await websocket.send_bytes(event.delta) + + elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA: + # Send transcript delta + if hasattr(event, "delta") and event.delta: + await websocket.send_json({"type": "transcript", "text": event.delta}) + + elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED: + # User started speaking + await websocket.send_json({"type": "speech_started"}) + + elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED: + # User stopped speaking + await websocket.send_json({"type": "speech_stopped"}) + + elif event_type == ServerEventType.RESPONSE_CREATED: + # Response started + response_id = event.response.id if hasattr(event.response, "id") else None + await websocket.send_json({"type": "response_started", "response_id": response_id}) + + elif event_type == ServerEventType.RESPONSE_DONE: + # Response complete + await websocket.send_json({"type": "response_complete"}) + + elif event_type == ServerEventType.ERROR: + # Error event + error_msg = str(event.error) if hasattr(event, "error") else "Unknown error" + await websocket.send_json({"type": "error", "message": error_msg}) + + except Exception as e: + print(f"Error sending to browser: {e}") diff --git a/python/packages/azure-voice-live/examples/streaming_voice_chat.py b/python/packages/azure-voice-live/examples/streaming_voice_chat.py new file mode 100644 index 0000000000..459ca47ab5 --- /dev/null +++ b/python/packages/azure-voice-live/examples/streaming_voice_chat.py @@ -0,0 +1,483 @@ +"""Real-time streaming voice chat with interruption support. 
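+
+Requires AZURE_VOICELIVE_API_KEY and AZURE_VOICELIVE_ENDPOINT; set AZURE_OPENAI_API_KEY and
+AZURE_OPENAI_BASE_URL as well to enable the optional expert text agent.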
+ +This example demonstrates: +- Continuous microphone streaming (not buffered) +- Real-time audio playback +- Interruption support (user can interrupt agent) +- Server-side Voice Activity Detection (VAD) +- Persistent session across conversation +""" + +import asyncio +import os +import queue +import sys +import threading +from datetime import datetime + +import pyaudio +from dotenv import load_dotenv + +from agent_framework import ai_function, ChatAgent +from agent_framework.openai import OpenAIResponsesClient +from agent_framework_azure_voice_live import VoiceLiveAgent +from azure.core.credentials import AzureKeyCredential + +# Load environment variables +load_dotenv() + +# Fix console encoding for Windows +if sys.platform == "win32": + sys.stdout.reconfigure(encoding="utf-8") + +# Audio settings for Azure Voice Live (PCM16, 24kHz, mono) +SAMPLE_RATE = 24000 +CHANNELS = 1 +CHUNK_SIZE = 4800 # 200ms chunks (24000 * 0.2) +FORMAT = pyaudio.paInt16 + + +# Initialize text agent for complex queries +text_agent = None + +def init_text_agent(): + """Initialize the GPT-4.1 text agent for expert queries.""" + global text_agent + + openai_api_key = os.getenv("AZURE_OPENAI_API_KEY") + openai_base_url = os.getenv("AZURE_OPENAI_BASE_URL") + model_id = os.getenv("AZURE_OPENAI_MODEL_ID", "gpt-4o") + + + if not openai_api_key or not openai_base_url: + print("⚠️ AZURE_OPENAI_API_KEY or AZURE_OPENAI_BASE_URL not found - expert agent disabled") + return None + + # Create OpenAI Responses client (works with Azure when base_url is set) + client = OpenAIResponsesClient( + model_id=model_id, + base_url=openai_base_url, + api_key=openai_api_key, + ) + + # Wrap in ChatAgent for proper agent functionality + text_agent = ChatAgent( + chat_client=client, + name="expert-analyst", + instructions=( + "You are an expert analyst with deep knowledge across many domains. " + "Provide detailed, accurate, and well-reasoned responses. " + "Be concise but thorough. Focus on facts and clarity." + ), + temperature=0.7, + ) + print(f"✅ Expert text agent initialized ({model_id})") + return text_agent + + +# Define AI functions +@ai_function +async def get_weather(location: str) -> str: + """Get current weather for a location. + + Args: + location: City name or location + + Returns: + Weather description + """ + weather_data = { + "seattle": "Rainy, 55°F", + "san francisco": "Foggy, 62°F", + "new york": "Sunny, 68°F", + "london": "Cloudy, 59°F", + "tokyo": "Clear, 72°F", + } + weather = weather_data.get(location.lower(), "Sunny, 70°F") + print(f"\n 🔧 [Function Call] get_weather('{location}') -> {weather}") + return f"The weather in {location} is {weather}." + + +@ai_function +async def get_current_time(timezone: str = "UTC") -> str: + """Get current time. + + Args: + timezone: Timezone name + + Returns: + Current time string + """ + now = datetime.now() + time_str = now.strftime("%I:%M %p") + print(f"\n 🔧 [Function Call] get_current_time('{timezone}') -> {time_str}") + return f"The current time is {time_str} {timezone}" + + +@ai_function +async def ask_expert(query: str) -> str: + """Ask the expert text agent for detailed analysis or complex questions. 
+
+    Use this function when the user asks:
+    - Complex questions requiring deep reasoning
+    - Technical queries needing detailed explanations
+    - Research or analytical questions
+    - Questions outside your immediate knowledge
+
+    Args:
+        query: The question to ask the expert agent
+
+    Returns:
+        Detailed expert response
+    """
+    if text_agent is None:
+        return "Expert agent is not available. Please check AZURE_OPENAI_API_KEY and AZURE_OPENAI_BASE_URL configuration."
+
+    print(f"\n 🧠 [Expert Agent] Processing query: {query[:100]}...")
+
+    try:
+        # Send query to expert agent using run()
+        response = await text_agent.run(query)
+
+        # Extract text from response messages
+        if response.messages:
+            expert_answer = ""
+            for message in response.messages:
+                if message.contents:
+                    for content in message.contents:
+                        if hasattr(content, 'text'):
+                            expert_answer += content.text
+
+            print(f" ✅ [Expert Agent] Response received ({len(expert_answer)} chars)")
+            return expert_answer if expert_answer else "Expert agent returned an empty response."
+        else:
+            return "Expert agent returned an empty response."
+
+    except Exception as e:
+        error_msg = f"Expert agent error: {str(e)}"
+        print(f" ❌ [Expert Agent] {error_msg}")
+        import traceback
+        traceback.print_exc()
+        return error_msg
+
+
+class ConversationState:
+    """Track conversation state for interruption handling."""
+
+    def __init__(self):
+        self.running = True
+        self.user_is_speaking = False
+        self.agent_is_speaking = False
+        self.interrupted = False
+
+
+class AudioPlayer:
+    """Handles real-time audio playback with interruption support."""
+
+    def __init__(self):
+        self.audio = pyaudio.PyAudio()
+        self.stream = None
+        self.queue = queue.Queue()
+        self.playing = False
+        self.chunks_played = 0
+
+    def start(self):
+        """Start the audio playback stream."""
+        self.stream = self.audio.open(
+            format=FORMAT,
+            channels=CHANNELS,
+            rate=SAMPLE_RATE,
+            output=True,
+            frames_per_buffer=CHUNK_SIZE,
+        )
+        self.playing = True
+        self.playback_thread = threading.Thread(target=self._playback_worker)
+        self.playback_thread.daemon = True
+        self.playback_thread.start()
+        print("🔊 Audio player started")
+
+    def _playback_worker(self):
+        """Worker thread for playing audio chunks."""
+        while self.playing:
+            try:
+                chunk = self.queue.get(timeout=0.1)
+                if chunk is not None:
+                    self.stream.write(chunk)
+                    self.chunks_played += 1
+            except queue.Empty:
+                continue
+            except Exception as e:
+                print(f"\n⚠️ Error playing audio: {e}")
+
+    def play(self, audio_chunk: bytes):
+        """Queue audio chunk for playback."""
+        if self.playing:
+            self.queue.put(audio_chunk)
+
+    def clear_buffer(self):
+        """Clear playback buffer (for interruption)."""
+        while not self.queue.empty():
+            try:
+                self.queue.get_nowait()
+            except queue.Empty:
+                break
+        print("\n🛑 Playback buffer cleared (interrupted)")
+
+    def stop(self):
+        """Stop playback and clean up."""
+        self.playing = False
+        if hasattr(self, 'playback_thread'):
+            self.playback_thread.join(timeout=2.0)
+        if self.stream:
+            self.stream.stop_stream()
+            self.stream.close()
+        self.audio.terminate()
+        print(f"\n🛑 Audio player stopped. Played {self.chunks_played} chunks.")
+
+
+class MicrophoneStreamer:
+    """Handles continuous microphone streaming."""
+
+    def __init__(self):
+        self.audio = pyaudio.PyAudio()
+        self.stream = None
+        self.streaming = False
+
+    def start(self):
+        """Start the microphone streaming."""
+        self.stream = self.audio.open(
+            format=FORMAT,
+            channels=CHANNELS,
+            rate=SAMPLE_RATE,
+            input=True,
+            frames_per_buffer=CHUNK_SIZE,
+        )
+        self.streaming = True
+        print("🎤 Microphone streaming started")
+
+    def read_chunk(self) -> bytes:
+        """Read a chunk of audio from the microphone."""
+        if self.streaming and self.stream:
+            try:
+                return self.stream.read(CHUNK_SIZE, exception_on_overflow=False)
+            except Exception as e:
+                print(f"\n⚠️ Error reading microphone: {e}")
+                return b""
+        return b""
+
+    def stop(self):
+        """Stop streaming and clean up."""
+        self.streaming = False
+        if self.stream:
+            self.stream.stop_stream()
+            self.stream.close()
+        self.audio.terminate()
+        print("🛑 Microphone streaming stopped")
+
+
+async def audio_capture_loop(agent: VoiceLiveAgent, mic: MicrophoneStreamer, state: ConversationState):
+    """Continuously stream microphone audio to agent."""
+    print("📡 Audio capture loop started")
+
+    while state.running:
+        # Read chunk from microphone
+        chunk = await asyncio.to_thread(mic.read_chunk)
+
+        if chunk and len(chunk) > 0:
+            # Stream to agent immediately
+            await agent.stream_audio_chunk(chunk)
+
+        await asyncio.sleep(0.01)  # Prevent tight loop
+
+    print("📡 Audio capture loop stopped")
+
+
+async def audio_playback_loop(agent: VoiceLiveAgent, player: AudioPlayer, state: ConversationState):
+    """Play agent audio responses in real-time."""
+    print("🔊 Audio playback loop started")
+
+    try:
+        async for update in agent.listen_to_responses():
+            if not state.running:
+                break
+
+            props = update.additional_properties or {}
+            update_type = props.get("type")
+
+            if update_type == "response_started":
+                state.agent_is_speaking = True
+                state.interrupted = False
+                print("\n🤖 [Agent speaking...]", end="", flush=True)
+
+            elif update_type == "audio_delta":
+                # Stream audio to speaker immediately
+                if not state.interrupted:
+                    audio_data = props.get("audio_data")
+                    if audio_data:
+                        player.play(audio_data)
+                        print("▶️", end="", flush=True)
+
+            elif update_type == "transcript_delta":
+                # Display transcript
+                text = props.get("text")
+                if text:
+                    print(text, end="", flush=True)
+
+            elif update_type == "response_complete":
+                # EventProcessor emits "response_complete" on RESPONSE_DONE
+                state.agent_is_speaking = False
+                print("\n✅ [Agent finished]")
+
+            elif update_type == "input_transcription_complete":
+                # Show what the user said
+                user_transcript = props.get("transcript")
+                if user_transcript:
+                    print(f"\n👤 [You]: {user_transcript}")
+
+            elif update_type == "speech_started":
+                # User started speaking
+                if state.agent_is_speaking:
+                    # User interrupted the agent
+                    print("\n\n⚠️ [User interrupted agent]")
+                    state.interrupted = True
+                    await agent.cancel_response()
+                    player.clear_buffer()
+
+            elif update_type == "error":
+                error_msg = props.get("error")
+                # Suppress harmless errors (EventProcessor stringifies the error payload)
+                if error_msg and "conversation_already_has_active_response" in str(error_msg):
+                    continue
+                print(f"\n❌ Error: {error_msg}")
+
+    except Exception as e:
+        print(f"\n❌ Playback loop error: {e}")
+        import traceback
+        traceback.print_exc()
+
+    print("🔊 Audio playback loop stopped")
+
+
+async def streaming_conversation(agent: VoiceLiveAgent):
+    """Run a streaming conversation with interruption support.
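+
+    The capture and playback loops run concurrently; server-side VAD decides the turn
+    boundaries, so there is no push-to-talk step.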
+
+
+async def streaming_conversation(agent: VoiceLiveAgent):
+    """Run a streaming conversation with interruption support.
+
+    Args:
+        agent: The VoiceLiveAgent instance (must be connected)
+    """
+    print("\n" + "=" * 70)
+    print("🎤 Starting streaming voice conversation...")
+    print("💡 You can interrupt the agent anytime by speaking!")
+    print("🛑 Press Ctrl+C to stop the conversation")
+    print("=" * 70)
+
+    # Initialize components
+    mic = MicrophoneStreamer()
+    player = AudioPlayer()
+    state = ConversationState()
+
+    try:
+        # Start audio I/O
+        mic.start()
+        player.start()
+
+        print("\n🎤 Listening... Start speaking anytime!")
+        print("🤖 Agent will respond automatically when you stop speaking.\n")
+
+        # Run concurrent loops indefinitely (until Ctrl+C)
+        await asyncio.gather(
+            audio_capture_loop(agent, mic, state),
+            audio_playback_loop(agent, player, state),
+        )
+
+    except KeyboardInterrupt:
+        print("\n\n⚠️ Interrupted by user (Ctrl+C)")
+    finally:
+        # Stop everything
+        state.running = False
+        await asyncio.sleep(0.5)  # Let loops finish
+        mic.stop()
+        player.stop()
+
+    print("=" * 70)
+    print("✅ Streaming conversation complete!")
+    print("=" * 70)
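One caveat with the plain `gather()` above: setting `state.running = False` stops the capture loop, but the playback loop can stay parked inside `async for ... in agent.listen_to_responses()` until the next event arrives. A variant that wraps both loops in tasks and cancels them explicitly guarantees shutdown; this is a sketch under the same assumptions as `streaming_conversation`, and `run_loops_with_cancellation` is an illustrative name:

```python
async def run_loops_with_cancellation(
    agent: VoiceLiveAgent,
    mic: MicrophoneStreamer,
    player: AudioPlayer,
    state: ConversationState,
) -> None:
    """Like the gather() call above, but with explicit task cancellation."""
    tasks = [
        asyncio.create_task(audio_capture_loop(agent, mic, state)),
        asyncio.create_task(audio_playback_loop(agent, player, state)),
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        state.running = False
        for task in tasks:
            task.cancel()  # unblocks listen_to_responses() even when no events arrive
        await asyncio.gather(*tasks, return_exceptions=True)
```

Cancellation is safe here because `except Exception` in the playback loop does not swallow `asyncio.CancelledError`, which derives from `BaseException` on Python 3.8+.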
+
+
+async def main():
+    """Run the streaming voice chat demo."""
+    # Get required environment variables
+    api_key = os.getenv("AZURE_VOICELIVE_API_KEY")
+    endpoint = os.getenv("AZURE_VOICELIVE_ENDPOINT")
+
+    if not api_key or not endpoint:
+        raise ValueError(
+            "AZURE_VOICELIVE_API_KEY and AZURE_VOICELIVE_ENDPOINT "
+            "environment variables are required."
+        )
+
+    # Initialize text agent for expert queries
+    print("🔧 Initializing expert text agent...")
+    init_text_agent()
+    # Smoke-test the expert agent once so configuration problems surface early
+    print(await ask_expert("Test initialization"))
+
+    # Prepare tools list
+    tools = [get_weather, get_current_time]
+    if text_agent is not None:
+        tools.append(ask_expert)
+        print("✅ Expert agent added to tools")
+
+    # Create agent with VAD enabled (required for streaming)
+    agent = VoiceLiveAgent(
+        endpoint=endpoint,
+        credential=AzureKeyCredential(api_key),
+        model=os.getenv("AZURE_VOICELIVE_MODEL", "gpt-4o-realtime-preview"),
+        voice=os.getenv("AZURE_VOICELIVE_VOICE", "en-US-AvaNeural"),
+        instructions=(
+            "You are a helpful and friendly voice assistant. "
+            "You can check the weather, tell the time, and answer complex questions. "
+            "\n\n"
+            "For simple queries (weather, time, greetings), answer directly. "
+            "For complex questions requiring deep analysis, technical explanations, "
+            "or research, use the ask_expert function to get detailed answers. "
+            "\n\n"
+            "Keep your responses concise and conversational. "
+            "If interrupted, acknowledge and adapt naturally."
+        ),
+        tools=tools,
+        temperature=0.8,
+        enable_vad=True,  # REQUIRED for streaming and interruption
+        vad_threshold=0.5,
+        vad_silence_duration_ms=500,
+        input_audio_transcription=True,
+    )
+
+    print("=" * 70)
+    print("🎙️ Streaming Voice Chat Demo (with Expert Agent)")
+    print("=" * 70)
+    print("\nThis demo streams audio in real-time with:")
+    print(" • Continuous microphone input")
+    print(" • Real-time agent responses")
+    print(" • Interruption support (speak over the agent)")
+    print(" • Server-side Voice Activity Detection")
+    if text_agent:
+        print(" • Expert GPT-4 agent for complex queries")
+    print("=" * 70)
+
+    # Connect to persistent session
+    print("\n🔌 Connecting to Azure Voice Live...")
+    await agent.connect()
+    print("✅ Connected! Session is ready.")
+
+    try:
+        # Run streaming conversation (runs until Ctrl+C)
+        await streaming_conversation(agent)
+    finally:
+        # Disconnect session
+        print("\n🔌 Disconnecting from Azure Voice Live...")
+        await agent.disconnect()
+        print("✅ Disconnected!")
+
+    print("\n✨ Demo complete!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/python/packages/azure-voice-live/pyproject.toml b/python/packages/azure-voice-live/pyproject.toml
new file mode 100644
index 0000000000..3862bc2fee
--- /dev/null
+++ b/python/packages/azure-voice-live/pyproject.toml
@@ -0,0 +1,84 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "agent-framework-azure-voice-live"
+version = "0.1.0"
+description = "Azure Voice Live integration for Microsoft Agent Framework - Real-time voice conversations with streaming audio"
+readme = "README.md"
+license = { text = "MIT" }
+requires-python = ">=3.10"
+authors = [
+    { name = "Microsoft", email = "noreply@microsoft.com" },
+]
+keywords = [
+    "azure",
+    "voice",
+    "realtime",
+    "audio",
+    "ai",
+    "agent",
+    "streaming",
+]
+classifiers = [
+    "Development Status :: 3 - Alpha",
+    "Intended Audience :: Developers",
+    "License :: OSI Approved :: MIT License",
+    "Programming Language :: Python :: 3",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+    "Programming Language :: Python :: 3.12",
+]
+
+dependencies = [
+    "agent-framework>=0.1.0",
+    "azure-ai-voicelive",  # Azure Voice Live SDK (preview)
+    "azure-identity>=1.14.0",
+    "azure-core>=1.29.0",
+]
+
+[project.optional-dependencies]
+web = [
+    "fastapi>=0.104.0",
+    "uvicorn>=0.24.0",
+    "websockets>=12.0",
+]
+dev = [
+    "pytest>=7.4.0",
+    "pytest-asyncio>=0.21.0",
+    "pytest-cov>=4.1.0",
+    "black>=23.0.0",
+    "ruff>=0.1.0",
+    "mypy>=1.5.0",
+]
+
+[project.urls]
+Homepage = "https://github.com/microsoft/agent-framework"
+Documentation = "https://learn.microsoft.com/agent-framework/"
+Repository = "https://github.com/microsoft/agent-framework"
+Issues = "https://github.com/microsoft/agent-framework/issues"
+
+[tool.hatch.build.targets.wheel]
+packages = ["agent_framework_azure_voice_live"]
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+testpaths = ["tests"]
+python_files = "test_*.py"
+python_classes = "Test*"
+python_functions = "test_*"
+
+[tool.black]
+line-length = 120
+target-version = ["py310", "py311", "py312"]
+
+[tool.ruff]
+line-length = 120
+target-version = "py310"
+
+[tool.mypy]
+python_version = "3.10"
+warn_return_any = true
+warn_unused_configs = true
+disallow_untyped_defs = true
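The `web` extra in the pyproject above pulls in fastapi, uvicorn, and websockets for browser-based voice interfaces. The sketch below shows the general shape such a bridge could take, using only the `VoiceLiveAgent` methods demonstrated in the example (`connect`, `stream_audio_chunk`, `listen_to_responses`, `disconnect`); the endpoint path, message framing, and wiring are illustrative assumptions, not the package's actual `_web/websocket_handler.py` API:

```python
import asyncio
import os

from azure.core.credentials import AzureKeyCredential
from fastapi import FastAPI, WebSocket

from agent_framework_azure_voice_live import VoiceLiveAgent

app = FastAPI()

@app.websocket("/voice")
async def voice_endpoint(ws: WebSocket) -> None:
    await ws.accept()
    agent = VoiceLiveAgent(
        endpoint=os.environ["AZURE_VOICELIVE_ENDPOINT"],
        credential=AzureKeyCredential(os.environ["AZURE_VOICELIVE_API_KEY"]),
        model=os.environ.get("AZURE_VOICELIVE_MODEL", "gpt-4o-realtime-preview"),
        enable_vad=True,
    )
    await agent.connect()

    async def uplink() -> None:
        # The browser sends raw PCM16 frames as binary WebSocket messages.
        while True:
            await agent.stream_audio_chunk(await ws.receive_bytes())

    async def downlink() -> None:
        # Relay the agent's audio deltas straight back to the browser.
        async for update in agent.listen_to_responses():
            props = update.additional_properties or {}
            if props.get("type") == "audio_delta" and props.get("audio_data"):
                await ws.send_bytes(props["audio_data"])

    try:
        await asyncio.gather(uplink(), downlink())
    finally:
        await agent.disconnect()
```

With the `web` extra installed and the file saved as, say, `voice_ws.py`, this would be served with `uvicorn voice_ws:app`; a client disconnect raises out of `receive_bytes()`, which unwinds the `gather()` and triggers the `disconnect()` in the `finally` block.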