fix(openai-claude): stabilize streaming tool_use blocks
This commit is contained in:
@@ -8,6 +8,7 @@ package claude
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
translatorcommon "github.com/router-for-me/CLIProxyAPI/v7/internal/translator/common"
|
||||
@@ -26,6 +27,9 @@ type ConvertOpenAIResponseToAnthropicParams struct {
|
||||
Model string
|
||||
CreatedAt int64
|
||||
ToolNameMap map[string]string
|
||||
// SawToolCall is true once at least one tool_use content_block_start has
|
||||
// been emitted on the wire. Using raw upstream tool_calls presence here
|
||||
// can produce stop_reason=tool_use with zero announced tool blocks.
|
||||
SawToolCall bool
|
||||
// Content accumulator for streaming
|
||||
ContentAccumulator strings.Builder
|
||||
@@ -60,6 +64,9 @@ type ToolCallAccumulator struct {
|
||||
ID string
|
||||
Name string
|
||||
Arguments strings.Builder
|
||||
// StartEmitted tracks whether content_block_start has already been sent
|
||||
// for this tool index.
|
||||
StartEmitted bool
|
||||
}
|
||||
|
||||
// ConvertOpenAIResponseToClaude converts OpenAI streaming response format to Anthropic API format.
|
||||
@@ -218,9 +225,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
}
|
||||
|
||||
toolCalls.ForEach(func(_, toolCall gjson.Result) bool {
|
||||
param.SawToolCall = true
|
||||
index := int(toolCall.Get("index").Int())
|
||||
blockIndex := param.toolContentBlockIndex(index)
|
||||
|
||||
// Initialize accumulator if needed
|
||||
if _, exists := param.ToolCallsAccumulator[index]; !exists {
|
||||
@@ -229,27 +234,25 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
|
||||
accumulator := param.ToolCallsAccumulator[index]
|
||||
|
||||
// Handle tool call ID
|
||||
if id := toolCall.Get("id"); id.Exists() {
|
||||
accumulator.ID = id.String()
|
||||
// Handle tool call ID. Only accept JSON-string, non-empty
|
||||
// values so malformed upstream fields do not overwrite a
|
||||
// valid ID or coerce into a content_block.id.
|
||||
if id := toolCall.Get("id"); id.Exists() && id.Type == gjson.String {
|
||||
if idStr := id.String(); idStr != "" {
|
||||
accumulator.ID = idStr
|
||||
}
|
||||
}
|
||||
|
||||
// Handle function name
|
||||
// Handle function name and arguments
|
||||
if function := toolCall.Get("function"); function.Exists() {
|
||||
if name := function.Get("name"); name.Exists() && name.String() != "" {
|
||||
accumulator.Name = util.MapToolName(param.ToolNameMap, name.String())
|
||||
|
||||
stopThinkingContentBlock(param, &results)
|
||||
|
||||
stopTextContentBlock(param, &results)
|
||||
|
||||
// Send content_block_start for tool_use
|
||||
contentBlockStartJSON := `{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}`
|
||||
contentBlockStartJSONBytes := []byte(contentBlockStartJSON)
|
||||
contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "index", blockIndex)
|
||||
contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "content_block.id", util.SanitizeClaudeToolID(accumulator.ID))
|
||||
contentBlockStartJSONBytes, _ = sjson.SetBytes(contentBlockStartJSONBytes, "content_block.name", accumulator.Name)
|
||||
results = append(results, translatorcommon.AppendSSEEventBytes(nil, "content_block_start", contentBlockStartJSONBytes, 2))
|
||||
// Only record the name until content_block_start has been
|
||||
// emitted. Some upstreams send "name": "" or repeat the
|
||||
// field across chunks; reassigning after start could drift
|
||||
// from what was already announced.
|
||||
if !accumulator.StartEmitted {
|
||||
if name := function.Get("name"); name.Exists() && name.Type == gjson.String && name.String() != "" {
|
||||
accumulator.Name = util.MapToolName(param.ToolNameMap, name.String())
|
||||
}
|
||||
}
|
||||
|
||||
// Handle function arguments
|
||||
@@ -261,6 +264,13 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
}
|
||||
}
|
||||
|
||||
// Re-check on every chunk, not only chunks with a function
|
||||
// object. Some upstreams split function.name and id across
|
||||
// separate deltas.
|
||||
if !accumulator.StartEmitted && accumulator.Name != "" && accumulator.ID != "" && !param.ContentBlocksStopped {
|
||||
emitToolUseStart(param, index, accumulator, &results)
|
||||
}
|
||||
|
||||
return true
|
||||
})
|
||||
}
|
||||
@@ -269,9 +279,12 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
// Handle finish_reason (but don't send message_delta/message_stop yet)
|
||||
if finishReason := root.Get("choices.0.finish_reason"); finishReason.Exists() && finishReason.String() != "" {
|
||||
reason := finishReason.String()
|
||||
if param.SawToolCall {
|
||||
switch {
|
||||
case param.SawToolCall:
|
||||
param.FinishReason = "tool_calls"
|
||||
} else {
|
||||
case reason == "tool_calls":
|
||||
param.FinishReason = "stop"
|
||||
default:
|
||||
param.FinishReason = reason
|
||||
}
|
||||
|
||||
@@ -289,8 +302,17 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
|
||||
// Send content_block_stop for any tool calls
|
||||
if !param.ContentBlocksStopped {
|
||||
for index := range param.ToolCallsAccumulator {
|
||||
for _, index := range toolCallAccumulatorIndexes(param.ToolCallsAccumulator) {
|
||||
accumulator := param.ToolCallsAccumulator[index]
|
||||
if !accumulator.StartEmitted {
|
||||
// Belated emit for streams that supplied a valid name but
|
||||
// never sent an id. SanitizeClaudeToolID("") produces the
|
||||
// expected stable synthetic toolu_<nanos>_<n> ID shape.
|
||||
if accumulator.Name == "" {
|
||||
continue
|
||||
}
|
||||
emitToolUseStart(param, index, accumulator, &results)
|
||||
}
|
||||
blockIndex := param.toolContentBlockIndex(index)
|
||||
|
||||
// Send complete input_json_delta with all accumulated arguments
|
||||
@@ -353,8 +375,16 @@ func convertOpenAIDoneToAnthropic(param *ConvertOpenAIResponseToAnthropicParams)
|
||||
stopTextContentBlock(param, &results)
|
||||
|
||||
if !param.ContentBlocksStopped {
|
||||
for index := range param.ToolCallsAccumulator {
|
||||
for _, index := range toolCallAccumulatorIndexes(param.ToolCallsAccumulator) {
|
||||
accumulator := param.ToolCallsAccumulator[index]
|
||||
if !accumulator.StartEmitted {
|
||||
// Belated emit at [DONE]; same behavior as the finish_reason
|
||||
// path for name-but-no-id streams.
|
||||
if accumulator.Name == "" {
|
||||
continue
|
||||
}
|
||||
emitToolUseStart(param, index, accumulator, &results)
|
||||
}
|
||||
blockIndex := param.toolContentBlockIndex(index)
|
||||
|
||||
if accumulator.Arguments.Len() > 0 {
|
||||
@@ -547,6 +577,29 @@ func stopTextContentBlock(param *ConvertOpenAIResponseToAnthropicParams, results
|
||||
param.TextContentBlockIndex = -1
|
||||
}
|
||||
|
||||
func emitToolUseStart(param *ConvertOpenAIResponseToAnthropicParams, openAIToolIndex int, accumulator *ToolCallAccumulator, results *[][]byte) {
|
||||
stopThinkingContentBlock(param, results)
|
||||
stopTextContentBlock(param, results)
|
||||
|
||||
blockIndex := param.toolContentBlockIndex(openAIToolIndex)
|
||||
contentBlockStartJSON := []byte(`{"type":"content_block_start","index":0,"content_block":{"type":"tool_use","id":"","name":"","input":{}}}`)
|
||||
contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "index", blockIndex)
|
||||
contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "content_block.id", util.SanitizeClaudeToolID(accumulator.ID))
|
||||
contentBlockStartJSON, _ = sjson.SetBytes(contentBlockStartJSON, "content_block.name", accumulator.Name)
|
||||
*results = append(*results, translatorcommon.AppendSSEEventBytes(nil, "content_block_start", contentBlockStartJSON, 2))
|
||||
accumulator.StartEmitted = true
|
||||
param.SawToolCall = true
|
||||
}
|
||||
|
||||
func toolCallAccumulatorIndexes(accumulators map[int]*ToolCallAccumulator) []int {
|
||||
indexes := make([]int, 0, len(accumulators))
|
||||
for index := range accumulators {
|
||||
indexes = append(indexes, index)
|
||||
}
|
||||
sort.Ints(indexes)
|
||||
return indexes
|
||||
}
|
||||
|
||||
// ConvertOpenAIResponseToClaudeNonStream converts a non-streaming OpenAI response to a non-streaming Anthropic response.
|
||||
//
|
||||
// Parameters:
|
||||
|
||||
Reference in New Issue
Block a user