From 872224a2039de4726913099760b1d34a31496935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=98=AE?= Date: Mon, 5 Jan 2026 23:03:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20Kiro=20=E6=B8=A0?= =?UTF-8?q?=E9=81=93=20Extended=20Thinking=20=E5=AE=8C=E6=95=B4=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 实现 Anthropic 官方 Extended Thinking API 在 Kiro 渠道的完整支持,包括请求注入和响应解析。 参考实现:KiroGate (https://github.com/Jwadow/kiro-openai-gateway) --- app/api/routes/anthropic.py | 15 +- app/api/routes/kiro.py | 69 +++++- app/schemas/anthropic.py | 14 +- app/services/anthropic_adapter.py | 350 ++++++++++++++++++++++++------ app/utils/kiro_converters.py | 167 ++++++++++++++ app/utils/thinking_parser.py | 301 +++++++++++++++++++++++++ 6 files changed, 843 insertions(+), 73 deletions(-) create mode 100644 app/utils/kiro_converters.py create mode 100644 app/utils/thinking_parser.py diff --git a/app/api/routes/anthropic.py b/app/api/routes/anthropic.py index 9eb3e8d..e073e21 100644 --- a/app/api/routes/anthropic.py +++ b/app/api/routes/anthropic.py @@ -19,6 +19,7 @@ from app.services.plugin_api_service import PluginAPIService from app.services.kiro_service import KiroService from app.services.anthropic_adapter import AnthropicAdapter +from app.utils.kiro_converters import apply_thinking_to_request, is_thinking_enabled from app.schemas.anthropic import ( AnthropicMessagesRequest, AnthropicMessagesResponse, @@ -174,6 +175,14 @@ async def create_message( # 将Anthropic请求转换为OpenAI格式 openai_request = AnthropicAdapter.anthropic_to_openai_request(request) + + # 提取thinking配置 + thinking_config = getattr(request, 'thinking', None) + thinking_enabled = is_thinking_enabled(thinking_config) + + # 如果是Kiro服务,应用thinking配置 + if use_kiro: + openai_request = apply_thinking_to_request(openai_request, thinking_config) # 准备额外的请求头 extra_headers = {} @@ -203,7 +212,8 @@ async def generate(): async for event in 
AnthropicAdapter.convert_openai_stream_to_anthropic( openai_stream, model=request.model, - request_id=request_id + request_id=request_id, + thinking_enabled=thinking_enabled ): yield event @@ -258,7 +268,8 @@ async def generate(): # 收集流式响应并转换为完整的OpenAI响应 openai_response = await AnthropicAdapter.collect_openai_stream_to_response( - openai_stream + openai_stream, + thinking_enabled=thinking_enabled ) # 转换响应为Anthropic格式 diff --git a/app/api/routes/kiro.py b/app/api/routes/kiro.py index 0ab1997..07d6c2d 100644 --- a/app/api/routes/kiro.py +++ b/app/api/routes/kiro.py @@ -3,6 +3,7 @@ 提供Kiro账号的管理操作,通过插件API实现 仅对beta用户开放 """ +import secrets from typing import Optional from fastapi import APIRouter, Depends, HTTPException, status, Query from fastapi.responses import JSONResponse @@ -129,6 +130,72 @@ async def create_account( - **client_secret**: IdC客户端密钥(IdC认证时必填) """ try: + if "refresh_token" not in account_data and "refreshToken" in account_data: + account_data["refresh_token"] = account_data.get("refreshToken") + if "auth_method" not in account_data and "authMethod" in account_data: + account_data["auth_method"] = account_data.get("authMethod") + if "account_name" not in account_data and "accountName" in account_data: + account_data["account_name"] = account_data.get("accountName") + if "client_id" not in account_data and "clientId" in account_data: + account_data["client_id"] = account_data.get("clientId") + if "client_secret" not in account_data and "clientSecret" in account_data: + account_data["client_secret"] = account_data.get("clientSecret") + if "machineid" not in account_data and "machineId" in account_data: + account_data["machineid"] = account_data.get("machineId") + if "is_shared" not in account_data and "isShared" in account_data: + account_data["is_shared"] = account_data.get("isShared") + + auth_method = (account_data.get("auth_method") or "Social").strip() + if auth_method.lower() == "social": + auth_method = "Social" + elif auth_method.lower() == "idc": 
+ auth_method = "IdC" + account_data["auth_method"] = auth_method + + refresh_token = account_data.get("refresh_token") + if not refresh_token: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="missing refresh_token" + ) + + if auth_method not in ("Social", "IdC"): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="auth_method must be Social or IdC" + ) + + if auth_method == "IdC" and (not account_data.get("client_id") or not account_data.get("client_secret")): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="IdC requires client_id and client_secret" + ) + + if not account_data.get("machineid"): + account_data["machineid"] = secrets.token_hex(32) + + is_shared = account_data.get("is_shared") + if is_shared is None: + is_shared = 0 + if isinstance(is_shared, bool): + is_shared = 1 if is_shared else 0 + try: + is_shared = int(is_shared) + except Exception: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="is_shared must be 0 or 1" + ) + if is_shared not in (0, 1): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="is_shared must be 0 or 1" + ) + account_data["is_shared"] = is_shared + + if not account_data.get("account_name"): + account_data["account_name"] = "Kiro Account" + result = await service.create_account(current_user.id, account_data) return result except ValueError as e: @@ -423,4 +490,4 @@ async def delete_account( raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"删除账号失败: {str(e)}" - ) \ No newline at end of file + ) diff --git a/app/schemas/anthropic.py b/app/schemas/anthropic.py index 31a4e33..8d92e67 100644 --- a/app/schemas/anthropic.py +++ b/app/schemas/anthropic.py @@ -114,7 +114,7 @@ class AnthropicMessagesRequest(BaseModel): model: str = Field(..., description="模型名称") messages: List[AnthropicMessage] = Field(..., description="消息列表") max_tokens: int = Field(..., description="最大生成token数") - + 
# 可选参数 system: Optional[Union[str, List[AnthropicTextContent]]] = Field(None, description="系统提示") stop_sequences: Optional[List[str]] = Field(None, description="停止序列") @@ -122,14 +122,20 @@ class AnthropicMessagesRequest(BaseModel): temperature: Optional[float] = Field(None, ge=0, le=1, description="温度参数") top_p: Optional[float] = Field(None, ge=0, le=1, description="Top-p采样") top_k: Optional[int] = Field(None, ge=0, description="Top-k采样") - + + # Extended Thinking 支持 + thinking: Optional[Union[Dict[str, Any], bool, str]] = Field( + None, + description="Extended Thinking 配置。可以是 bool、'enabled' 或 dict 格式如 {'type': 'enabled', 'budget_tokens': 10000}" + ) + # 工具相关 tools: Optional[List[AnthropicTool]] = Field(None, description="可用工具列表") tool_choice: Optional[Union[AnthropicToolChoice, Dict[str, Any]]] = Field(None, description="工具选择策略") - + # 元数据 metadata: Optional[AnthropicMetadata] = Field(None, description="请求元数据") - + model_config = {"extra": "allow"} diff --git a/app/services/anthropic_adapter.py b/app/services/anthropic_adapter.py index 715a49f..572db1d 100644 --- a/app/services/anthropic_adapter.py +++ b/app/services/anthropic_adapter.py @@ -19,6 +19,7 @@ AnthropicErrorResponse, AnthropicErrorDetail, ) +from app.utils.thinking_parser import KiroThinkingTagParser, SegmentType, TextSegment logger = logging.getLogger(__name__) @@ -559,21 +560,24 @@ async def convert_openai_stream_to_anthropic( cls, openai_stream: AsyncGenerator[bytes, None], model: str, - request_id: str + request_id: str, + thinking_enabled: bool = False ) -> AsyncGenerator[str, None]: """ 将OpenAI流式响应转换为Anthropic流式响应格式 - + Args: openai_stream: OpenAI流式响应生成器 model: 模型名称 request_id: 请求ID - + thinking_enabled: 是否启用thinking解析(用于解析原始标签) + Yields: Anthropic格式的SSE事件 - + Note: 支持将OpenAI格式的reasoning_content转换为Anthropic的thinking content block格式 + 如果上游返回原始的标签,也会进行解析 """ # 发送message_start事件 message_start = { @@ -593,7 +597,7 @@ async def convert_openai_stream_to_anthropic( } } yield f"event: 
message_start\ndata: {json.dumps(message_start, ensure_ascii=False)}\n\n" - + # 跟踪状态 accumulated_text = "" accumulated_thinking = "" @@ -602,18 +606,24 @@ async def convert_openai_stream_to_anthropic( output_tokens = 0 finish_reason = None current_tool_calls = {} # 跟踪工具调用 - + # content block 索引跟踪 current_block_index = 0 - - # thinking content 状态跟踪 - has_thinking_content = False # 是否有thinking内容 + + # thinking content 状态跟踪(reasoning_content字段) + has_reasoning_content = False # 是否有reasoning_content thinking_block_started = False # thinking块是否已开始 thinking_block_stopped = False # thinking块是否已结束 - + # text content 状态跟踪 text_block_started = False # text块是否已开始 - + + # Thinking parser(用于解析原始标签) + thinking_parser: Optional[KiroThinkingTagParser] = None + if thinking_enabled: + thinking_parser = KiroThinkingTagParser() + logger.debug("Thinking parser enabled for stream") + buffer = "" async for chunk in openai_stream: @@ -663,7 +673,7 @@ async def convert_openai_stream_to_anthropic( # 支持多种格式:reasoning_content, reasoning, thinking_content reasoning_delta = delta.get('reasoning_content') or delta.get('reasoning') or delta.get('thinking_content') if reasoning_delta: - has_thinking_content = True + has_reasoning_content = True accumulated_thinking += reasoning_delta # 如果thinking块还没开始,先发送content_block_start @@ -723,57 +733,146 @@ async def convert_openai_stream_to_anthropic( # 处理文本内容 if 'content' in delta and delta['content']: text_delta = delta['content'] - - # 如果之前有thinking内容且thinking块还没结束,先结束thinking块 - if thinking_block_started and not thinking_block_stopped: - thinking_block_stopped = True - - # 如果有签名,先发送签名delta - if thinking_signature: - signature_delta_event = { - "type": "content_block_delta", + + # 如果启用了thinking parser,先用parser解析 + if thinking_parser: + segments = thinking_parser.push_and_parse(text_delta) + + for segment in segments: + if segment.type == SegmentType.THINKING: + # Thinking内容 + accumulated_thinking += segment.content + has_reasoning_content = True + + # 
如果thinking块还没开始,先发送content_block_start + if not thinking_block_started: + thinking_block_started = True + thinking_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "thinking", + "thinking": "" + } + } + yield f"event: content_block_start\ndata: {json.dumps(thinking_block_start, ensure_ascii=False)}\n\n" + + # 发送thinking_delta + thinking_delta_event = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "thinking_delta", + "thinking": segment.content + } + } + yield f"event: content_block_delta\ndata: {json.dumps(thinking_delta_event, ensure_ascii=False)}\n\n" + + elif segment.type == SegmentType.TEXT: + # 普通文本内容 + + # 如果之前有thinking内容且thinking块还没结束,先结束thinking块 + if thinking_block_started and not thinking_block_stopped: + thinking_block_stopped = True + + # 如果有签名,先发送签名delta + if thinking_signature: + signature_delta_event = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "signature_delta", + "signature": thinking_signature + } + } + yield f"event: content_block_delta\ndata: {json.dumps(signature_delta_event, ensure_ascii=False)}\n\n" + + # 发送thinking块的content_block_stop + thinking_block_stop = { + "type": "content_block_stop", + "index": current_block_index + } + yield f"event: content_block_stop\ndata: {json.dumps(thinking_block_stop, ensure_ascii=False)}\n\n" + # 增加block索引 + current_block_index += 1 + + # 如果text块还没开始,先发送content_block_start + if not text_block_started: + text_block_started = True + text_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "text", + "text": "" + } + } + yield f"event: content_block_start\ndata: {json.dumps(text_block_start, ensure_ascii=False)}\n\n" + + accumulated_text += segment.content + + # 发送content_block_delta事件 + content_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": 
"text_delta", + "text": segment.content + } + } + yield f"event: content_block_delta\ndata: {json.dumps(content_delta, ensure_ascii=False)}\n\n" + else: + # 没有启用thinking parser,直接处理为文本 + # 如果之前有thinking内容且thinking块还没结束,先结束thinking块 + if thinking_block_started and not thinking_block_stopped: + thinking_block_stopped = True + + # 如果有签名,先发送签名delta + if thinking_signature: + signature_delta_event = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "signature_delta", + "signature": thinking_signature + } + } + yield f"event: content_block_delta\ndata: {json.dumps(signature_delta_event, ensure_ascii=False)}\n\n" + + # 发送thinking块的content_block_stop + thinking_block_stop = { + "type": "content_block_stop", + "index": current_block_index + } + yield f"event: content_block_stop\ndata: {json.dumps(thinking_block_stop, ensure_ascii=False)}\n\n" + # 增加block索引 + current_block_index += 1 + + # 如果text块还没开始,先发送content_block_start + if not text_block_started: + text_block_started = True + text_block_start = { + "type": "content_block_start", "index": current_block_index, - "delta": { - "type": "signature_delta", - "signature": thinking_signature + "content_block": { + "type": "text", + "text": "" } } - yield f"event: content_block_delta\ndata: {json.dumps(signature_delta_event, ensure_ascii=False)}\n\n" - - # 发送thinking块的content_block_stop - thinking_block_stop = { - "type": "content_block_stop", - "index": current_block_index - } - yield f"event: content_block_stop\ndata: {json.dumps(thinking_block_stop, ensure_ascii=False)}\n\n" - # 增加block索引 - current_block_index += 1 - - # 如果text块还没开始,先发送content_block_start - if not text_block_started: - text_block_started = True - text_block_start = { - "type": "content_block_start", + yield f"event: content_block_start\ndata: {json.dumps(text_block_start, ensure_ascii=False)}\n\n" + + accumulated_text += text_delta + + # 发送content_block_delta事件 + content_delta = { + "type": "content_block_delta", 
"index": current_block_index, - "content_block": { - "type": "text", - "text": "" + "delta": { + "type": "text_delta", + "text": text_delta } } - yield f"event: content_block_start\ndata: {json.dumps(text_block_start, ensure_ascii=False)}\n\n" - - accumulated_text += text_delta - - # 发送content_block_delta事件 - content_delta = { - "type": "content_block_delta", - "index": current_block_index, - "delta": { - "type": "text_delta", - "text": text_delta - } - } - yield f"event: content_block_delta\ndata: {json.dumps(content_delta, ensure_ascii=False)}\n\n" + yield f"event: content_block_delta\ndata: {json.dumps(content_delta, ensure_ascii=False)}\n\n" # 处理工具调用 if 'tool_calls' in delta: @@ -840,7 +939,93 @@ async def convert_openai_stream_to_anthropic( current_tool_calls[tc_index]['arguments'] += args_chunk # 流结束后的清理工作 - + + # 如果启用了thinking parser,刷新缓冲区 + if thinking_parser: + final_segments = thinking_parser.flush() + for segment in final_segments: + if segment.type == SegmentType.THINKING: + # Thinking内容 + accumulated_thinking += segment.content + has_reasoning_content = True + + # 如果thinking块还没开始,先发送content_block_start + if not thinking_block_started: + thinking_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "thinking", + "thinking": "" + } + } + yield f"event: content_block_start\ndata: {json.dumps(thinking_block_start, ensure_ascii=False)}\n\n" + thinking_block_started = True + + # 发送thinking_delta + thinking_delta_event = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "thinking_delta", + "thinking": segment.content + } + } + yield f"event: content_block_delta\ndata: {json.dumps(thinking_delta_event, ensure_ascii=False)}\n\n" + + elif segment.type == SegmentType.TEXT: + # 普通文本内容 + + # 如果之前有thinking内容且thinking块还没结束,先结束thinking块 + if thinking_block_started and not thinking_block_stopped: + thinking_block_stopped = True + + # 如果有签名,先发送签名delta + if 
thinking_signature: + signature_delta_event = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "signature_delta", + "signature": thinking_signature + } + } + yield f"event: content_block_delta\ndata: {json.dumps(signature_delta_event, ensure_ascii=False)}\n\n" + + # 发送thinking块的content_block_stop + thinking_block_stop = { + "type": "content_block_stop", + "index": current_block_index + } + yield f"event: content_block_stop\ndata: {json.dumps(thinking_block_stop, ensure_ascii=False)}\n\n" + current_block_index += 1 + + # 如果text块还没开始,先发送content_block_start + if not text_block_started: + text_block_started = True + text_block_start = { + "type": "content_block_start", + "index": current_block_index, + "content_block": { + "type": "text", + "text": "" + } + } + yield f"event: content_block_start\ndata: {json.dumps(text_block_start, ensure_ascii=False)}\n\n" + + accumulated_text += segment.content + + # 发送content_block_delta事件 + content_delta = { + "type": "content_block_delta", + "index": current_block_index, + "delta": { + "type": "text_delta", + "text": segment.content + } + } + yield f"event: content_block_delta\ndata: {json.dumps(content_delta, ensure_ascii=False)}\n\n" + # 如果thinking块开始了但还没结束,先结束它 if thinking_block_started and not thinking_block_stopped: thinking_block_stopped = True @@ -971,17 +1156,19 @@ async def convert_openai_stream_to_anthropic( @classmethod async def collect_openai_stream_to_response( cls, - openai_stream: AsyncGenerator[bytes, None] + openai_stream: AsyncGenerator[bytes, None], + thinking_enabled: bool = False ) -> Dict[str, Any]: """ 将OpenAI流式响应收集并转换为完整的非流式响应格式 - + 当用户请求非流式响应(stream=false),但上游总是返回流式响应时, 使用此方法将流式响应收集并组装成完整的响应。 - + Args: openai_stream: OpenAI流式响应生成器 - + thinking_enabled: 是否启用thinking解析(用于解析原始标签) + Returns: OpenAI格式的完整响应字典 """ @@ -995,7 +1182,13 @@ async def collect_openai_stream_to_response( model = "" response_id = "" tool_calls = {} # 跟踪工具调用 {index: {id, name, arguments}} - + + # 
Thinking parser(用于解析原始标签) + thinking_parser: Optional[KiroThinkingTagParser] = None + if thinking_enabled: + thinking_parser = KiroThinkingTagParser() + logger.debug("Thinking parser enabled for non-stream response") + buffer = "" chunk_count = 0 @@ -1132,7 +1325,21 @@ async def collect_openai_stream_to_response( # 处理文本内容 if 'content' in delta and delta['content']: - accumulated_text += delta['content'] + content_delta = delta['content'] + + # 如果启用了thinking parser,先解析 + if thinking_parser: + segments = thinking_parser.push_and_parse(content_delta) + for segment in segments: + if segment.type == SegmentType.THINKING: + # Thinking内容 + accumulated_reasoning += segment.content + elif segment.type == SegmentType.TEXT: + # 普通文本 + accumulated_text += segment.content + else: + # 没有启用thinking parser,直接添加 + accumulated_text += content_delta # 处理工具调用 if 'tool_calls' in delta: @@ -1169,7 +1376,18 @@ async def collect_openai_stream_to_response( tool_calls[tc_index]['name'] = func['name'] if 'arguments' in func: tool_calls[tc_index]['arguments'] += func['arguments'] - + + # 如果启用了thinking parser,刷新缓冲区 + if thinking_parser: + final_segments = thinking_parser.flush() + for segment in final_segments: + if segment.type == SegmentType.THINKING: + # Thinking内容 + accumulated_reasoning += segment.content + elif segment.type == SegmentType.TEXT: + # 普通文本 + accumulated_text += segment.content + # 构建完整的OpenAI响应 message = { "role": "assistant", @@ -1247,4 +1465,4 @@ def create_error_response( type=error_type, message=message ) - ) \ No newline at end of file + ) diff --git a/app/utils/kiro_converters.py b/app/utils/kiro_converters.py new file mode 100644 index 0000000..e78eeb1 --- /dev/null +++ b/app/utils/kiro_converters.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- + +""" +Kiro 转换器工具函数 + +包含 Extended Thinking 模式支持函数 +""" + +from typing import Any, Dict, Optional, Union +import logging +import uuid + +logger = logging.getLogger(__name__) + + +# 
# ==================================================================================================
# Thinking mode support
# ==================================================================================================

# Default thinking budget, used when the request carries no positive ``budget_tokens``.
DEFAULT_MAX_THINKING_LENGTH = 200000

# Tag names of the thinking hint that is injected into the system prompt.
# The same names are used to detect a hint that is already present.
_THINKING_MODE_TAG = "thinking_mode"
_MAX_THINKING_LENGTH_TAG = "max_thinking_length"


def is_thinking_enabled(thinking_config: Optional[Union[Dict[str, Any], bool, str]]) -> bool:
    """
    Tell whether extended thinking is enabled by the given configuration.

    Supported shapes:
    - None: disabled
    - bool: used as-is
    - str: enabled only for "enabled" (case-insensitive)
    - dict: {"type": "enabled", "budget_tokens": 10000}

    Args:
        thinking_config: the ``thinking`` value of the request

    Returns:
        True when thinking is enabled
    """
    if isinstance(thinking_config, bool):
        return thinking_config
    if isinstance(thinking_config, str):
        return thinking_config.lower() == "enabled"
    if isinstance(thinking_config, dict):
        type_val = str(thinking_config.get("type", "")).lower()
        if type_val == "enabled":
            return True
        if type_val == "disabled":
            # An explicit opt-out wins over a leftover budget value.
            return False
        # No usable type: a positive budget alone counts as enabled.
        budget = thinking_config.get("budget_tokens")
        return isinstance(budget, (int, float)) and budget > 0
    return False


def get_thinking_budget(thinking_config: Optional[Union[Dict[str, Any], bool, str]]) -> int:
    """
    Return the thinking token budget of the configuration.

    Args:
        thinking_config: the ``thinking`` value of the request

    Returns:
        The positive ``budget_tokens`` of a dict configuration (as int),
        otherwise DEFAULT_MAX_THINKING_LENGTH
    """
    if isinstance(thinking_config, dict):
        budget = thinking_config.get("budget_tokens")
        if isinstance(budget, (int, float)) and budget > 0:
            return int(budget)
    return DEFAULT_MAX_THINKING_LENGTH


def generate_thinking_hint(thinking_config: Optional[Union[Dict[str, Any], bool, str]]) -> str:
    """
    Build the thinking hint tags for the system prompt.

    Args:
        thinking_config: the ``thinking`` value of the request

    Returns:
        Two tag lines: the thinking mode switch and the maximum thinking length
    """
    budget = get_thinking_budget(thinking_config)
    return (
        f"<{_THINKING_MODE_TAG}>enabled</{_THINKING_MODE_TAG}>\n"
        f"<{_MAX_THINKING_LENGTH_TAG}>{budget}</{_MAX_THINKING_LENGTH_TAG}>"
    )


def inject_thinking_hint(system_prompt: str, thinking_config: Optional[Union[Dict[str, Any], bool, str]]) -> str:
    """
    Inject the thinking hint at the start of a system prompt.

    The prompt is returned unchanged when thinking is disabled or when it
    already contains one of the hint tags (no duplicate injection).

    Args:
        system_prompt: the original system prompt (may be empty)
        thinking_config: the ``thinking`` value of the request

    Returns:
        The system prompt with the hint prepended, or the original prompt
    """
    if not is_thinking_enabled(thinking_config):
        return system_prompt

    # Compare against the real opening tags; testing for an empty string
    # would match every prompt and suppress the injection entirely.
    already_present = (
        f"<{_THINKING_MODE_TAG}>" in system_prompt
        or f"<{_MAX_THINKING_LENGTH_TAG}>" in system_prompt
    )
    if already_present:
        return system_prompt

    thinking_hint = generate_thinking_hint(thinking_config)
    if not system_prompt:
        return thinking_hint

    # The hint goes first, separated from the prompt by a blank line.
    return f"{thinking_hint}\n\n{system_prompt}"


def add_kiro_conversation_state(payload: Dict[str, Any]) -> None:
    """
    Add the ``conversationState`` fields to a Kiro payload.

    Args:
        payload: Kiro request payload (modified in place)
    """
    state = payload.setdefault("conversationState", {})
    # A fresh continuation id is generated on every call.
    state["agentContinuationId"] = str(uuid.uuid4())
    state["agentTaskType"] = "vibe"


def apply_thinking_to_request(
    openai_request: Dict[str, Any],
    thinking_config: Optional[Union[Dict[str, Any], bool, str]] = None
) -> Dict[str, Any]:
    """
    Apply the thinking configuration to an OpenAI-format request.

    The hint is injected into the first system message whose content is a
    string. When no hint could be injected that way, a new system message
    holding only the hint is inserted at the front of the message list.

    Args:
        openai_request: OpenAI-format request
        thinking_config: the ``thinking`` value of the request

    Returns:
        The same request object (modified in place and returned)
    """
    if not is_thinking_enabled(thinking_config):
        return openai_request

    messages = openai_request.get("messages", [])
    if not isinstance(messages, list):
        # Replace an unusable value so the hint message can still be added.
        messages = []
    openai_request["messages"] = messages

    injected = False
    for msg in messages:
        if isinstance(msg, dict) and msg.get("role") == "system":
            system_prompt = msg.get("content", "")
            if isinstance(system_prompt, str):
                msg["content"] = inject_thinking_hint(system_prompt, thinking_config)
                injected = True
                logger.debug("Injected thinking hint into existing system prompt")
            # Only the first system message is considered.
            break

    # No system prompt to extend: create one that carries only the hint.
    if not injected:
        messages.insert(0, {"role": "system", "content": generate_thinking_hint(thinking_config)})
        logger.debug("Inserted system prompt with thinking hint")

    return openai_request

# Patch metadata retained from the end of the original text: diff --git
# Patch metadata retained from the original text: a/app/utils/thinking_parser.py b/app/utils/thinking_parser.py new file mode 100644 index 0000000..58a240e --- /dev/null +++ b/app/utils/thinking_parser.py @@ -0,0 +1,301 @@
# -*- coding: utf-8 -*-

"""
Incremental parser for Kiro ``<thinking>`` tags.

Splits the text returned by the Kiro API into the content of a leading
``<thinking>...</thinking>`` block and the ordinary text that follows it, so
the caller can emit Anthropic Extended Thinking style events.

Reference implementations:
- proxycast (Rust): https://github.com/hank9999/proxycast
- kiro.rs (Rust): https://github.com/hank9999/kiro.rs
- KiroGate: https://github.com/Jwadow/kiro-openai-gateway
"""

from dataclasses import dataclass
from enum import Enum, auto
from typing import List, Optional
import logging

logger = logging.getLogger(__name__)


class SegmentType(Enum):
    """Kind of a parsed text segment."""
    THINKING = auto()  # content of the thinking block
    TEXT = auto()      # ordinary text


@dataclass
class TextSegment:
    """A piece of parsed output together with its kind."""
    type: SegmentType
    content: str


class ParseState(Enum):
    """Parser state."""
    INITIAL = auto()         # waiting to learn whether the response starts with the open tag
    IN_THINKING = auto()     # inside the thinking block
    AFTER_THINKING = auto()  # thinking block closed, everything else is text
    PASSTHROUGH = auto()     # response does not start with the open tag


class KiroThinkingTagParser:
    """
    Incremental parser for Kiro ``<thinking>`` tags.

    Design:
    1. Only the first ``<thinking>...</thinking>`` block is parsed.
    2. Parsing is enabled only when the response starts with the open tag
       (leading whitespace is ignored).
    3. Tags split across chunks are handled.
    4. Close tags wrapped in quotes/backticks are treated as fake and skipped.

    Usage:
        parser = KiroThinkingTagParser()
        for chunk in stream:
            for segment in parser.push_and_parse(chunk):
                if segment.type == SegmentType.THINKING:
                    ...  # emit a thinking_delta event
                elif segment.type == SegmentType.TEXT:
                    ...  # emit a text_delta event
        # flush the buffer when the stream ends
        final_segments = parser.flush()
    """

    OPEN_TAG = "<thinking>"
    CLOSE_TAG = "</thinking>"
    # Quote characters used to detect fake (quoted) tags.
    QUOTE_CHARS = ("`", '"', "'", "“", "”", "‘", "’", "「", "」", "『", "』")
    # A close tag followed by more than this many non-newline characters is
    # considered a fake tag that is merely mentioned inside the thinking text.
    _FAKE_TAG_LOOKAHEAD = 10

    def __init__(self):
        self.buffer = ""
        self.state = ParseState.INITIAL
        self.thinking_extracted = False  # whether a thinking block was extracted
        # Context of thinking content already handed out, so that quote
        # detection gives the same answer regardless of chunk boundaries.
        self._emitted_backticks = 0
        self._last_emitted_char = ""
        # True right after the close tag until the first non-newline text arrives.
        self._skip_newlines = False

    def push_and_parse(self, incoming: str) -> List[TextSegment]:
        """
        Feed new text and return the segments that can be emitted so far.

        Args:
            incoming: newly received text

        Returns:
            Parsed segments (possibly empty when more data is needed)
        """
        if not incoming:
            return []

        self.buffer += incoming
        segments: List[TextSegment] = []

        while True:
            if self.state == ParseState.INITIAL:
                if self._handle_initial_state() is None:
                    break  # need more data
                # state changed, keep going
            elif self.state == ParseState.IN_THINKING:
                segment = self._handle_in_thinking_state()
                if segment is None:
                    break  # need more data
                if segment.content:
                    segments.append(segment)
                # buffer/state changed, keep going
            else:
                # AFTER_THINKING / PASSTHROUGH: everything buffered is text.
                text = self._drain_text()
                if text:
                    segments.append(TextSegment(SegmentType.TEXT, text))
                break

        return segments

    def flush(self) -> List[TextSegment]:
        """
        Emit whatever is still buffered when the stream ends.

        Returns:
            Remaining segments: buffered data of an unclosed thinking block is
            returned as THINKING, anything else as TEXT
        """
        segments: List[TextSegment] = []

        if self.state == ParseState.IN_THINKING:
            # The block was never closed: hand the rest out as thinking.
            if self.buffer:
                logger.warning(f"Thinking block not properly closed, flushing {len(self.buffer)} chars as thinking")
                segments.append(TextSegment(SegmentType.THINKING, self.buffer))
                self.buffer = ""
        else:
            # INITIAL (never enough data to decide), AFTER_THINKING, PASSTHROUGH.
            text = self._drain_text()
            if text:
                segments.append(TextSegment(SegmentType.TEXT, text))

        return segments

    def _drain_text(self) -> str:
        """Empty the buffer and return it as text, dropping the newlines that directly follow the close tag."""
        text = self.buffer
        self.buffer = ""
        if self._skip_newlines:
            text = text.lstrip('\n')
            if text:
                # First real text after the block: stop stripping.
                self._skip_newlines = False
        return text

    def _handle_initial_state(self) -> Optional[bool]:
        """
        Decide whether the response starts with the open tag.

        Returns:
            None when more data is needed, True when the state was updated
        """
        # Leading whitespace does not count.
        stripped = self.buffer.lstrip()

        if stripped.startswith(self.OPEN_TAG):
            # Drop the leading whitespace and the open tag itself.
            self.buffer = stripped[len(self.OPEN_TAG):]
            self.state = ParseState.IN_THINKING
            logger.debug("Detected tag at start, entering thinking mode")
            return True

        if not stripped or self.OPEN_TAG.startswith(stripped):
            # Only whitespace, or a proper prefix of the open tag: wait.
            return None

        # Anything else: the buffer (whitespace included) is plain text.
        self.state = ParseState.PASSTHROUGH
        return True

    def _handle_in_thinking_state(self) -> Optional[TextSegment]:
        """
        Consume buffered data while inside the thinking block.

        Returns:
            None when more data is needed, otherwise the parsed THINKING segment
        """
        close_pos = self._find_real_close_tag()

        if close_pos is None:
            # No close tag yet. Keep a tail that could be the start of a
            # close tag split across chunks, emit the rest.
            safe_len = len(self.buffer) - len(self.CLOSE_TAG) + 1
            if safe_len > 0:
                thinking_content = self.buffer[:safe_len]
                self.buffer = self.buffer[safe_len:]
                self._note_emitted(thinking_content)
                return TextSegment(SegmentType.THINKING, thinking_content)
            return None

        thinking_content = self.buffer[:close_pos]
        # Skip the close tag; the newlines after it (usually "\n\n") are
        # dropped by _drain_text, even when they arrive in a later chunk.
        self.buffer = self.buffer[close_pos + len(self.CLOSE_TAG):]
        self._skip_newlines = True
        self.state = ParseState.AFTER_THINKING
        self.thinking_extracted = True

        logger.debug(f"Extracted thinking block: {len(thinking_content)} chars")
        return TextSegment(SegmentType.THINKING, thinking_content)

    def _note_emitted(self, content: str) -> None:
        """Remember the quote context of thinking content that leaves the buffer."""
        if content:
            self._emitted_backticks += content.count('`')
            self._last_emitted_char = content[-1]

    def _find_real_close_tag(self) -> Optional[int]:
        """
        Find the real close tag in the buffer.

        Quoted tags are skipped. A tag counts as real when it is at the end of
        the buffer, is followed by a newline, or is followed by at most
        _FAKE_TAG_LOOKAHEAD characters; the last rule is a heuristic whose
        outcome can depend on how the stream was chunked.

        Returns:
            Position of the close tag, or None when there is none
        """
        search_start = 0

        while True:
            pos = self.buffer.find(self.CLOSE_TAG, search_start)
            if pos == -1:
                return None

            if not self._is_quoted_tag(pos):
                trailing = self.buffer[pos + len(self.CLOSE_TAG):]
                if (
                    not trailing
                    or trailing[0] in '\n\r'
                    or len(trailing) <= self._FAKE_TAG_LOOKAHEAD
                ):
                    return pos

            # Fake tag: continue searching behind it.
            search_start = pos + 1

    def _is_quoted_tag(self, tag_pos: int) -> bool:
        """
        Tell whether the tag at ``tag_pos`` is wrapped in quotes.

        Args:
            tag_pos: position of the tag in the buffer

        Returns:
            True when the character before the tag is a quote character, or
            when an odd number of backticks precedes the tag (simple code
            span detection). Thinking content already emitted is included.
        """
        if tag_pos > 0:
            prev_char = self.buffer[tag_pos - 1]
        else:
            prev_char = self._last_emitted_char
        if prev_char and prev_char in self.QUOTE_CHARS:
            return True

        backtick_count = self._emitted_backticks + self.buffer.count('`', 0, tag_pos)
        return backtick_count % 2 == 1

    @property
    def is_thinking_mode(self) -> bool:
        """Whether the response started with the open tag."""
        return self.state in (ParseState.IN_THINKING, ParseState.AFTER_THINKING)

    @property
    def has_extracted_thinking(self) -> bool:
        """Whether a thinking block has been extracted."""
        return self.thinking_extracted