From e5a6fd2d4f35a624a52fc5021fe0d0294c77f137 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sun, 21 Sep 2025 11:16:03 +0800 Subject: [PATCH] refactor: standardize `dataTag` processing across response translators - Unified `dataTag` initialization by removing spaces after `data:`. - Replaced manual slicing with `bytes.TrimSpace` for consistent and robust handling of JSON payloads. --- internal/client/gemini-cli_client.go | 6 ++--- internal/client/gemini_client.go | 6 ++--- .../client/openai-compatibility_client.go | 25 +++++-------------- internal/client/qwen_client.go | 12 ++++----- .../claude/gemini/claude_gemini_response.go | 6 ++--- .../claude_openai_response.go | 6 ++--- .../claude_openai-responses_response.go | 4 +-- .../codex/claude/codex_claude_response.go | 4 +-- .../codex/gemini/codex_gemini_response.go | 6 ++--- .../chat-completions/codex_openai_response.go | 6 ++--- .../codex_openai-responses_response.go | 8 +++--- 11 files changed, 38 insertions(+), 51 deletions(-) diff --git a/internal/client/gemini-cli_client.go b/internal/client/gemini-cli_client.go index c2b48683..8c923748 100644 --- a/internal/client/gemini-cli_client.go +++ b/internal/client/gemini-cli_client.go @@ -554,7 +554,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st rawJSON, _ = sjson.SetBytes(rawJSON, "project", c.GetProjectID()) rawJSON, _ = sjson.SetBytes(rawJSON, "model", modelName) - dataTag := []byte("data: ") + dataTag := []byte("data:") errChan := make(chan *interfaces.ErrorMessage) dataChan := make(chan []byte) // log.Debugf(string(rawJSON)) @@ -619,7 +619,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], ¶m) + lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), ¶m) for i := 0; i < len(lines); i++ { dataChan <- []byte(lines[i]) } @@ -630,7 +630,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - dataChan <- line[6:] + dataChan <- bytes.TrimSpace(line[5:]) } c.AddAPIResponseData(ctx, line) } diff --git a/internal/client/gemini_client.go b/internal/client/gemini_client.go index 10e43d2a..8ff5de60 100644 --- a/internal/client/gemini_client.go +++ b/internal/client/gemini_client.go @@ -298,7 +298,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin handlerType := handler.HandlerType() rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) - dataTag := []byte("data: ") + dataTag := []byte("data:") errChan := make(chan *interfaces.ErrorMessage) dataChan := make(chan []byte) // log.Debugf(string(rawJSON)) @@ -342,7 +342,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], ¶m) + lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), ¶m) for i := 0; i < len(lines); i++ { dataChan <- []byte(lines[i]) } @@ -353,7 +353,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - dataChan <- line[6:] + dataChan <- bytes.TrimSpace(line[5:]) } c.AddAPIResponseData(ctx, line) } diff --git a/internal/client/openai-compatibility_client.go b/internal/client/openai-compatibility_client.go index 990bc610..56139b0c 100644 --- a/internal/client/openai-compatibility_client.go +++ b/internal/client/openai-compatibility_client.go @@ -291,9 +291,8 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo handlerType := handler.HandlerType() rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) - dataTag := []byte("data: ") - dataUglyTag := []byte("data:") // Some APIs providers don't add space after "data:", fuck for them all - doneTag := []byte("data: [DONE]") + dataTag := []byte("data:") + doneTag := []byte("[DONE]") errChan := make(chan *interfaces.ErrorMessage) dataChan := make(chan []byte) // log.Debugf(string(rawJSON)) @@ -332,19 +331,10 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - if bytes.Equal(line, doneTag) { + if bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) { break } - lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], ¶m) - for i := 0; i < len(lines); i++ { - c.AddAPIResponseData(ctx, line) - dataChan <- []byte(lines[i]) - } - } else if bytes.HasPrefix(line, dataUglyTag) { - if bytes.Equal(line, doneTag) { - break - } - lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[5:], ¶m) + lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), ¶m) for i := 0; i < len(lines); i++ { c.AddAPIResponseData(ctx, line) dataChan <- []byte(lines[i]) @@ -356,13 +346,10 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - if bytes.Equal(line, doneTag) { + if bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) { break } - c.AddAPIResponseData(newCtx, line[6:]) - dataChan <- line[6:] - } else if bytes.HasPrefix(line, dataUglyTag) { - c.AddAPIResponseData(newCtx, line[5:]) + c.AddAPIResponseData(newCtx, bytes.TrimSpace(line[5:])) dataChan <- line[5:] } } diff --git a/internal/client/qwen_client.go b/internal/client/qwen_client.go index 9eff9a46..ab22977c 100644 --- a/internal/client/qwen_client.go +++ b/internal/client/qwen_client.go @@ -215,8 +215,8 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string, handlerType := handler.HandlerType() rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) - dataTag := []byte("data: ") - doneTag := []byte("data: [DONE]") + dataTag := []byte("data:") + doneTag := []byte("[DONE]") errChan := make(chan *interfaces.ErrorMessage) dataChan := make(chan []byte) @@ -264,7 +264,7 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string, for scanner.Scan() { line := scanner.Bytes() if bytes.HasPrefix(line, dataTag) { - lines := translator.Response(handlerType, c.Type(), ctx, modelName, originalRequestRawJSON, rawJSON, line[6:], ¶m) + lines := translator.Response(handlerType, c.Type(), ctx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), ¶m) for i := 0; i < len(lines); i++ { dataChan <- []byte(lines[i]) } @@ -274,9 +274,9 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string, } else { for scanner.Scan() { line := scanner.Bytes() - if !bytes.HasPrefix(line, doneTag) { - if bytes.HasPrefix(line, dataTag) { - dataChan <- line[6:] + if bytes.HasPrefix(line, dataTag) { + if !bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) { + dataChan <- bytes.TrimSpace(line[5:]) } } c.AddAPIResponseData(ctx, line) diff --git a/internal/translator/claude/gemini/claude_gemini_response.go b/internal/translator/claude/gemini/claude_gemini_response.go index aab4b344..74de0c0b 100644 --- a/internal/translator/claude/gemini/claude_gemini_response.go +++ b/internal/translator/claude/gemini/claude_gemini_response.go @@ -17,7 +17,7 @@ import ( ) var ( - dataTag = []byte("data: ") + dataTag = []byte("data:") ) // ConvertAnthropicResponseToGeminiParams holds parameters for response conversion @@ -64,7 +64,7 @@ func ConvertClaudeResponseToGemini(_ context.Context, modelName string, original if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) root := gjson.ParseBytes(rawJSON) eventType := root.Get("type").String() @@ -336,7 +336,7 @@ func ConvertClaudeResponseToGeminiNonStream(_ context.Context, modelName string, line := scanner.Bytes() // log.Debug(string(line)) if bytes.HasPrefix(line, dataTag) { - jsonData := line[6:] + jsonData := bytes.TrimSpace(line[5:]) streamingEvents = append(streamingEvents, jsonData) } } diff --git a/internal/translator/claude/openai/chat-completions/claude_openai_response.go b/internal/translator/claude/openai/chat-completions/claude_openai_response.go index 7cdbdfd0..0d11aedc 100644 --- a/internal/translator/claude/openai/chat-completions/claude_openai_response.go +++ b/internal/translator/claude/openai/chat-completions/claude_openai_response.go @@ -18,7 +18,7 @@ import ( ) var ( - dataTag = []byte("data: ") + dataTag = []byte("data:") ) // ConvertAnthropicResponseToOpenAIParams holds parameters for response conversion @@ -62,7 +62,7 @@ func ConvertClaudeResponseToOpenAI(_ context.Context, modelName string, original if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) root := gjson.ParseBytes(rawJSON) eventType := root.Get("type").String() @@ -289,7 +289,7 @@ func ConvertClaudeResponseToOpenAINonStream(_ context.Context, _ string, origina if !bytes.HasPrefix(line, dataTag) { continue } - chunks = append(chunks, line[6:]) + chunks = append(chunks, bytes.TrimSpace(rawJSON[5:])) } // Base OpenAI non-streaming response template diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index 8f956e07..f0d0d2a7 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -34,7 +34,7 @@ type claudeToResponsesState struct { ReasoningIndex int } -var dataTag = []byte("data: ") +var dataTag = []byte("data:") func emitEvent(event string, payload string) string { return fmt.Sprintf("event: %s\ndata: %s\n\n", event, payload) @@ -51,7 +51,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) root := gjson.ParseBytes(rawJSON) ev := root.Get("type").String() var out []string diff --git a/internal/translator/codex/claude/codex_claude_response.go b/internal/translator/codex/claude/codex_claude_response.go index 704568e1..64d4cc67 100644 --- a/internal/translator/codex/claude/codex_claude_response.go +++ b/internal/translator/codex/claude/codex_claude_response.go @@ -16,7 +16,7 @@ import ( ) var ( - dataTag = []byte("data: ") + dataTag = []byte("data:") ) // ConvertCodexResponseToClaude performs sophisticated streaming response format conversion. @@ -45,7 +45,7 @@ func ConvertCodexResponseToClaude(_ context.Context, _ string, originalRequestRa if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) output := "" rootResult := gjson.ParseBytes(rawJSON) diff --git a/internal/translator/codex/gemini/codex_gemini_response.go b/internal/translator/codex/gemini/codex_gemini_response.go index 67559ac2..20d255a4 100644 --- a/internal/translator/codex/gemini/codex_gemini_response.go +++ b/internal/translator/codex/gemini/codex_gemini_response.go @@ -16,7 +16,7 @@ import ( ) var ( - dataTag = []byte("data: ") + dataTag = []byte("data:") ) // ConvertCodexResponseToGeminiParams holds parameters for response conversion. @@ -53,7 +53,7 @@ func ConvertCodexResponseToGemini(_ context.Context, modelName string, originalR if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) rootResult := gjson.ParseBytes(rawJSON) typeResult := rootResult.Get("type") @@ -161,7 +161,7 @@ func ConvertCodexResponseToGeminiNonStream(_ context.Context, modelName string, if !bytes.HasPrefix(line, dataTag) { continue } - rawJSON = line[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) rootResult := gjson.ParseBytes(rawJSON) diff --git a/internal/translator/codex/openai/chat-completions/codex_openai_response.go b/internal/translator/codex/openai/chat-completions/codex_openai_response.go index 9a596426..7ecf05be 100644 --- a/internal/translator/codex/openai/chat-completions/codex_openai_response.go +++ b/internal/translator/codex/openai/chat-completions/codex_openai_response.go @@ -16,7 +16,7 @@ import ( ) var ( - dataTag = []byte("data: ") + dataTag = []byte("data:") ) // ConvertCliToOpenAIParams holds parameters for response conversion. @@ -54,7 +54,7 @@ func ConvertCodexResponseToOpenAI(_ context.Context, modelName string, originalR if !bytes.HasPrefix(rawJSON, dataTag) { return []string{} } - rawJSON = rawJSON[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) // Initialize the OpenAI SSE template. template := `{"id":"","object":"chat.completion.chunk","created":12345,"model":"model","choices":[{"index":0,"delta":{"role":null,"content":null,"reasoning_content":null,"tool_calls":null},"finish_reason":null,"native_finish_reason":null}]}` @@ -175,7 +175,7 @@ func ConvertCodexResponseToOpenAINonStream(_ context.Context, _ string, original if !bytes.HasPrefix(line, dataTag) { continue } - rawJSON = line[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) rootResult := gjson.ParseBytes(rawJSON) // Verify this is a response.completed event diff --git a/internal/translator/codex/openai/responses/codex_openai-responses_response.go b/internal/translator/codex/openai/responses/codex_openai-responses_response.go index 9707e05e..0652ef4b 100644 --- a/internal/translator/codex/openai/responses/codex_openai-responses_response.go +++ b/internal/translator/codex/openai/responses/codex_openai-responses_response.go @@ -13,8 +13,8 @@ import ( // ConvertCodexResponseToOpenAIResponses converts OpenAI Chat Completions streaming chunks // to OpenAI Responses SSE events (response.*). func ConvertCodexResponseToOpenAIResponses(ctx context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, param *any) []string { - if bytes.HasPrefix(rawJSON, []byte("data: ")) { - rawJSON = rawJSON[6:] + if bytes.HasPrefix(rawJSON, []byte("data:")) { + rawJSON = bytes.TrimSpace(rawJSON[5:]) if typeResult := gjson.GetBytes(rawJSON, "type"); typeResult.Exists() { typeStr := typeResult.String() if typeStr == "response.created" || typeStr == "response.in_progress" || typeStr == "response.completed" { @@ -32,14 +32,14 @@ func ConvertCodexResponseToOpenAIResponsesNonStream(_ context.Context, modelName scanner := bufio.NewScanner(bytes.NewReader(rawJSON)) buffer := make([]byte, 10240*1024) scanner.Buffer(buffer, 10240*1024) - dataTag := []byte("data: ") + dataTag := []byte("data:") for scanner.Scan() { line := scanner.Bytes() if !bytes.HasPrefix(line, dataTag) { continue } - rawJSON = line[6:] + rawJSON = bytes.TrimSpace(rawJSON[5:]) rootResult := gjson.ParseBytes(rawJSON) // Verify this is a response.completed event