python/packages/azure-voice-live/README.md (30 additions, 0 deletions)
@@ -0,0 +1,30 @@
# Azure Voice Live Agent

Real-time voice conversation support for the Microsoft Agent Framework, built on the Azure Voice Live SDK.

## Features

- **Real-time Voice Streaming**: Bidirectional audio streaming with PCM16 @ 24kHz
- **Server-side VAD**: Automatic voice activity detection for natural turn-taking
- **Function Calling**: Tool invocation during voice conversations with automatic execution
- **Multi-Agent Support**: Voice agent can delegate complex queries to text-based agents
- **Interruption Support**: User can interrupt agent responses naturally
- **Audio + Text**: Support for voice input/output with automatic transcription
- **Web Integration**: WebSocket support for browser-based voice interfaces
- **Streaming Responses**: Stream audio and text transcripts in real-time

## Installation

```bash
pip install agent-framework-azure-voice-live
```

For web support:
```bash
pip install "agent-framework-azure-voice-live[web]"
```
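
A minimal quickstart sketch follows. The import path, constructor parameters, and the `run_stream` call are assumptions drawn from Agent Framework conventions and the update keys defined elsewhere in this PR, not a confirmed API; consult the package source for the actual interface.

```python
# Hypothetical quickstart. Everything here except the exported class names
# (VoiceLiveAgent, VoiceOptions) and the update keys emitted by the event
# processor is an assumption, not a confirmed API.
import asyncio

from agent_framework_azure_voice_live import VoiceLiveAgent, VoiceOptions  # assumed import path


async def main() -> None:
    agent = VoiceLiveAgent(  # assumed constructor parameters
        endpoint="wss://<your-resource>.cognitiveservices.azure.com",
        api_key="<your-api-key>",
        voice_options=VoiceOptions(),
        instructions="You are a helpful voice assistant.",
    )

    # Assumed streaming interface, following Agent Framework conventions:
    # updates carry their payload in additional_properties.
    async for update in agent.run_stream("Hello!"):
        props = update.additional_properties or {}
        if props.get("type") == "transcript_delta":
            print(props["text"], end="", flush=True)


asyncio.run(main())
```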

## License

MIT License - Copyright (c) Microsoft Corporation

__init__.py (19 additions, 0 deletions)
@@ -0,0 +1,19 @@
# Copyright (c) Microsoft. All rights reserved.

"""Azure Voice Live integration for Microsoft Agent Framework.

This package provides real-time voice conversation capabilities using the Azure Voice Live SDK.
"""

from ._types import AudioContent, VoiceOptions
from ._voice_live_agent import VoiceLiveAgent
from ._voice_live_session import VoiceLiveSession

__all__ = [
"VoiceLiveAgent",
"VoiceLiveSession",
"AudioContent",
"VoiceOptions",
]

__version__ = "0.1.0"
@@ -0,0 +1,142 @@
# Copyright (c) Microsoft. All rights reserved.

"""Audio utilities for encoding, decoding, and file I/O."""

import base64
import wave
from typing import BinaryIO
Review comment from Copilot AI (Jan 4, 2026): Import of 'BinaryIO' is not used. Suggested change: remove the `from typing import BinaryIO` line.


class AudioUtils:
"""Utilities for audio encoding/decoding and file I/O."""

@staticmethod
def encode_pcm16_to_base64(audio_bytes: bytes) -> str:
"""Encode PCM16 audio bytes to base64 string.

Args:
audio_bytes: Raw PCM16 audio bytes

Returns:
Base64-encoded string
"""
return base64.b64encode(audio_bytes).decode("utf-8")

@staticmethod
def decode_base64_to_pcm16(audio_b64: str) -> bytes:
"""Decode base64 string to PCM16 audio bytes.

Args:
audio_b64: Base64-encoded audio string

Returns:
Raw PCM16 audio bytes
"""
return base64.b64decode(audio_b64)

@staticmethod
def save_to_wav(
audio_bytes: bytes, file_path: str, sample_rate: int = 24000, channels: int = 1
) -> None:
"""Save PCM16 audio to WAV file.

Args:
audio_bytes: Raw PCM16 audio bytes
file_path: Path to output WAV file
sample_rate: Audio sample rate in Hz
channels: Number of audio channels (1=mono, 2=stereo)
"""
with wave.open(file_path, "wb") as wav_file:
wav_file.setnchannels(channels)
wav_file.setsampwidth(2) # 2 bytes for PCM16
wav_file.setframerate(sample_rate)
wav_file.writeframes(audio_bytes)

@staticmethod
def load_from_wav(file_path: str) -> tuple[bytes, int, int]:
"""Load PCM16 audio from WAV file.

Args:
file_path: Path to input WAV file

Returns:
Tuple of (audio_bytes, sample_rate, channels)

Raises:
ValueError: If WAV file is not PCM16 format
"""
with wave.open(file_path, "rb") as wav_file:
# Validate format
if wav_file.getsampwidth() != 2:
raise ValueError(f"WAV file must be PCM16 (16-bit), got {wav_file.getsampwidth() * 8}-bit")

channels = wav_file.getnchannels()
sample_rate = wav_file.getframerate()
audio_bytes = wav_file.readframes(wav_file.getnframes())

return audio_bytes, sample_rate, channels

@staticmethod
def resample_audio(
audio_bytes: bytes,
from_sample_rate: int,
to_sample_rate: int,
channels: int = 1,
) -> bytes:
"""Resample PCM16 audio to a different sample rate.

Note: This is a simple nearest-neighbor resampling. For production use,
consider using a library like scipy or librosa for higher quality resampling.

Args:
audio_bytes: Raw PCM16 audio bytes
from_sample_rate: Source sample rate in Hz
to_sample_rate: Target sample rate in Hz
channels: Number of audio channels

Returns:
Resampled PCM16 audio bytes
"""
if from_sample_rate == to_sample_rate:
return audio_bytes

import struct

# Convert bytes to samples
sample_format = "<h" # Little-endian signed 16-bit integer
bytes_per_sample = channels * 2
num_samples = len(audio_bytes) // bytes_per_sample

samples = []
for i in range(num_samples):
offset = i * bytes_per_sample
if channels == 1:
sample = struct.unpack(sample_format, audio_bytes[offset : offset + 2])[0]
samples.append(sample)
else:
# Stereo
left = struct.unpack(sample_format, audio_bytes[offset : offset + 2])[0]
right = struct.unpack(sample_format, audio_bytes[offset + 2 : offset + 4])[0]
samples.append((left, right))

# Resample using nearest-neighbor
ratio = to_sample_rate / from_sample_rate
new_num_samples = int(num_samples * ratio)

resampled = []
for i in range(new_num_samples):
source_index = int(i / ratio)
if source_index >= num_samples:
source_index = num_samples - 1
resampled.append(samples[source_index])

# Convert back to bytes
result = bytearray()
for sample in resampled:
if channels == 1:
result.extend(struct.pack(sample_format, sample))
else:
result.extend(struct.pack(sample_format, sample[0]))
result.extend(struct.pack(sample_format, sample[1]))

return bytes(result)
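
As a usage illustration of the utilities above: the file names below are placeholders and the module import path is an assumption, not a confirmed path; the method names and signatures are the ones defined in this file.

```python
# Usage sketch for AudioUtils. File names are placeholders and the module
# import path is an assumption, not a confirmed path.
from agent_framework_azure_voice_live._audio_utils import AudioUtils  # assumed module path

# Load a 16-bit WAV file (raises ValueError if it is not PCM16).
audio_bytes, sample_rate, channels = AudioUtils.load_from_wav("input.wav")

# Bring the audio to the 24 kHz rate used for voice streaming, if needed.
if sample_rate != 24000:
    audio_bytes = AudioUtils.resample_audio(audio_bytes, sample_rate, 24000, channels)
    sample_rate = 24000

# Base64-encode for transport and round-trip back to raw PCM16 bytes.
encoded = AudioUtils.encode_pcm16_to_base64(audio_bytes)
decoded = AudioUtils.decode_base64_to_pcm16(encoded)

# Persist the result for inspection.
AudioUtils.save_to_wav(decoded, "output.wav", sample_rate=sample_rate, channels=channels)
```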
@@ -0,0 +1,183 @@
# Copyright (c) Microsoft. All rights reserved.

"""Event processor for converting Azure Voice Live events to Agent Framework updates."""

from typing import Any

from agent_framework import AgentRunResponseUpdate


class EventProcessor:
"""Converts Azure Voice Live events to Agent Framework updates.

This class processes server events from the Azure Voice Live SDK and converts them
into AgentRunResponseUpdate objects that are compatible with the Agent Framework's
streaming interface.
"""

def __init__(self) -> None:
"""Initialize event processor."""
self._current_response_id: str | None = None
self._function_calls: dict[str, dict[str, Any]] = {} # Track in-progress function calls

def process_event(self, event: Any) -> AgentRunResponseUpdate | None:
"""Convert server event to agent update.

Args:
event: Server event from Azure Voice Live SDK

Returns:
AgentRunResponseUpdate if the event should be emitted, None otherwise
"""
# Import here to avoid circular dependency and to handle SDK availability
try:
from azure.ai.voicelive.models import ServerEventType
except ImportError:
# SDK not available, return None
return None

event_type = event.type

if event_type == ServerEventType.SESSION_UPDATED:
# Session configuration complete
return AgentRunResponseUpdate(additional_properties={"type": "session_ready"})

elif event_type == ServerEventType.RESPONSE_CREATED:
# New response started
self._current_response_id = event.response.id
return AgentRunResponseUpdate(
additional_properties={"type": "response_started", "response_id": event.response.id}
)

elif event_type == ServerEventType.RESPONSE_AUDIO_DELTA:
# Audio chunk received
return AgentRunResponseUpdate(
additional_properties={
"type": "audio_delta",
"response_id": self._current_response_id,
"audio_data": event.delta,
}
)

elif event_type == ServerEventType.RESPONSE_AUDIO_TRANSCRIPT_DELTA:
# Transcript chunk received
return AgentRunResponseUpdate(
additional_properties={
"type": "transcript_delta",
"response_id": self._current_response_id,
"text": event.delta,
}
)

elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STARTED:
# User started speaking (VAD detected)
return AgentRunResponseUpdate(additional_properties={"type": "speech_started"})

elif event_type == ServerEventType.INPUT_AUDIO_BUFFER_SPEECH_STOPPED:
# User stopped speaking (VAD detected)
return AgentRunResponseUpdate(additional_properties={"type": "speech_stopped"})

elif event_type == ServerEventType.CONVERSATION_ITEM_INPUT_AUDIO_TRANSCRIPTION_COMPLETED:
# User audio transcription complete
return AgentRunResponseUpdate(
additional_properties={
"type": "input_transcription_complete",
"transcript": event.transcript if hasattr(event, "transcript") else None,
}
)

elif event_type == ServerEventType.RESPONSE_OUTPUT_ITEM_ADDED:
# New output item (message or function call) added to response
item_type = event.item.type if hasattr(event.item, "type") else None
if item_type == "function_call":
# Initialize function call tracking
item_id = event.item.id if hasattr(event.item, "id") else None
call_id = event.item.call_id if hasattr(event.item, "call_id") else None
name = event.item.name if hasattr(event.item, "name") else None

print(f"[DEBUG EventProcessor] RESPONSE_OUTPUT_ITEM_ADDED: item_id={item_id}, call_id={call_id}, name={name}")
print(f"[DEBUG EventProcessor] item attributes: {dir(event.item)}")

# Use call_id if available, otherwise fall back to item_id
key = call_id if call_id else item_id
if key:
self._function_calls[key] = {"name": name, "arguments": ""}
print(f"[DEBUG EventProcessor] Stored function call with key={key}, name={name}")

elif event_type == ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DELTA:
# Accumulate function call arguments
call_id = event.call_id if hasattr(event, "call_id") else None
if call_id:
if call_id not in self._function_calls:
self._function_calls[call_id] = {"name": event.name if hasattr(event, "name") else None, "arguments": ""}

# Update name if provided
if hasattr(event, "name") and event.name:
self._function_calls[call_id]["name"] = event.name

# Accumulate arguments
if hasattr(event, "delta") and event.delta:
self._function_calls[call_id]["arguments"] += event.delta

return AgentRunResponseUpdate(
additional_properties={
"type": "function_call_delta",
"call_id": call_id,
"arguments_delta": event.delta if hasattr(event, "delta") else "",
}
)

elif event_type == ServerEventType.RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE:
# Function call complete
call_id = event.call_id if hasattr(event, "call_id") else None
print(f"[DEBUG EventProcessor] RESPONSE_FUNCTION_CALL_ARGUMENTS_DONE: call_id={call_id}")
print(f"[DEBUG EventProcessor] Known function calls: {list(self._function_calls.keys())}")

if call_id and call_id in self._function_calls:
call_data = self._function_calls.pop(call_id)
print(f"[DEBUG EventProcessor] Found function call data: name={call_data['name']}, args={call_data['arguments']}")

return AgentRunResponseUpdate(
additional_properties={
"type": "function_call",
"call_id": call_id,
"name": call_data["name"],
"arguments": call_data["arguments"],
}
)
else:
print(f"[DEBUG EventProcessor] call_id {call_id} not found in tracked function calls!")
Review comment from Copilot AI (Jan 4, 2026), on lines +98 to +149: Debug print statements should be removed or replaced with proper logging before production use. These print statements are scattered throughout the codebase and can clutter output.

elif event_type == ServerEventType.RESPONSE_DONE:
# Response complete
usage = None
if hasattr(event, "response") and hasattr(event.response, "usage"):
usage = event.response.usage

result = AgentRunResponseUpdate(
additional_properties={
"type": "response_complete",
"response_id": self._current_response_id,
"usage": usage,
}
)

# Reset state
self._current_response_id = None
return result

elif event_type == ServerEventType.ERROR:
# Error event
error_message = event.error if hasattr(event, "error") else "Unknown error"
return AgentRunResponseUpdate(additional_properties={"type": "error", "error": str(error_message)})

# Return None for unhandled event types
return None

def reset(self) -> None:
"""Reset processor state.

Useful when starting a new conversation or handling connection issues.
"""
self._current_response_id = None
self._function_calls.clear()
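
To show how these updates are typically consumed, here is a hypothetical receive loop. The `connection` object, the way server events are obtained, and the module import paths are placeholders rather than the SDK's or package's confirmed API; the `additional_properties` keys are the ones emitted by `process_event` above.

```python
# Hypothetical consumption loop for EventProcessor. The connection object, the
# module import paths, and the way server events arrive are assumptions.
from agent_framework_azure_voice_live._audio_utils import AudioUtils          # assumed module path
from agent_framework_azure_voice_live._event_processor import EventProcessor  # assumed module path


async def pump_events(connection) -> None:
    """Drain server events from `connection` (a placeholder async iterable)."""
    processor = EventProcessor()
    audio_chunks: list[bytes] = []

    async for event in connection:
        update = processor.process_event(event)
        if update is None:
            continue  # unhandled event type

        props = update.additional_properties or {}
        kind = props.get("type")

        if kind == "audio_delta":
            # The processor forwards the raw SDK delta; decode only if it
            # arrives as base64 text rather than bytes.
            data = props["audio_data"]
            audio_chunks.append(AudioUtils.decode_base64_to_pcm16(data) if isinstance(data, str) else data)
        elif kind == "transcript_delta":
            print(props["text"], end="", flush=True)
        elif kind == "function_call":
            print(f"\n[tool call] {props['name']}({props['arguments']})")
        elif kind == "response_complete":
            if audio_chunks:
                AudioUtils.save_to_wav(b"".join(audio_chunks), "response.wav")
                audio_chunks.clear()
        elif kind == "error":
            print(f"\n[error] {props['error']}")
```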