fix(executor): enhance parsing of OpenAI stream data lines

- Added trimming for stream input lines to prevent processing of unnecessary whitespace.
- Improved handling of unsupported prefixes and malformed JSON responses, ensuring errors are recorded and propagated appropriately.

Fixed: #2690
This commit is contained in:
Luis Pater
2026-05-04 23:42:26 +08:00
parent 8262a03f29
commit e4a93c02c5
2 changed files with 98 additions and 5 deletions
@@ -283,17 +283,31 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
if detail, ok := helps.ParseOpenAIStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
if len(line) == 0 {
trimmedLine := bytes.TrimSpace(line)
if len(trimmedLine) == 0 {
continue
}
if !bytes.HasPrefix(line, []byte("data:")) {
if !bytes.HasPrefix(trimmedLine, []byte("data:")) {
if bytes.HasPrefix(trimmedLine, []byte(":")) || bytes.HasPrefix(trimmedLine, []byte("event:")) ||
bytes.HasPrefix(trimmedLine, []byte("id:")) || bytes.HasPrefix(trimmedLine, []byte("retry:")) {
continue
}
if bytes.HasPrefix(trimmedLine, []byte("{")) || bytes.HasPrefix(trimmedLine, []byte("[")) {
streamErr := statusErr{code: http.StatusBadGateway, msg: string(trimmedLine)}
helps.RecordAPIResponseError(ctx, e.cfg, streamErr)
reporter.PublishFailure(ctx)
select {
case out <- cliproxyexecutor.StreamChunk{Err: streamErr}:
case <-ctx.Done():
}
return
}
continue
}
// OpenAI-compatible streams are SSE: lines typically prefixed with "data: ".
// Pass through translator; it yields one or more chunks for the target schema.
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(line), &param)
// OpenAI-compatible streams must use SSE data lines.
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(trimmedLine), &param)
for i := range chunks {
select {
case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}:
@@ -5,6 +5,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
@@ -100,3 +101,81 @@ func TestOpenAICompatExecutorPayloadOverrideWinsOverThinkingSuffix(t *testing.T)
t.Fatalf("reasoning_effort = %q, want %q; body=%s", got, "low", string(gotBody))
}
}
func TestOpenAICompatExecutorStreamRejectsPlainJSONAfterBlankLines(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("\n\n: openrouter processing\n\nevent: error\n"))
_, _ = w.Write([]byte(`{"error":{"message":"upstream failed","type":"server_error"}}` + "\n"))
}))
defer server.Close()
executor := NewOpenAICompatExecutor("openai-compatibility", &config.Config{})
auth := &cliproxyauth.Auth{Attributes: map[string]string{
"base_url": server.URL + "/v1",
"api_key": "test",
}}
result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
Model: "openrouter-model",
Payload: []byte(`{"model":"openrouter-model","messages":[{"role":"user","content":"hi"}],"stream":true}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream error: %v", err)
}
var gotErr error
for chunk := range result.Chunks {
if chunk.Err != nil {
gotErr = chunk.Err
break
}
}
if gotErr == nil {
t.Fatalf("expected plain JSON stream error")
}
if status, ok := gotErr.(interface{ StatusCode() int }); !ok || status.StatusCode() != http.StatusBadGateway {
t.Fatalf("stream error status = %v, want %d", gotErr, http.StatusBadGateway)
}
if !strings.Contains(gotErr.Error(), "upstream failed") {
t.Fatalf("stream error = %v", gotErr)
}
}
func TestOpenAICompatExecutorStreamSkipsKeepAliveUntilDataLine(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("\n\n: openrouter processing\n\nevent: ping\nid: 1\nretry: 1000\n"))
_, _ = w.Write([]byte(`data: {"id":"chatcmpl_1","object":"chat.completion.chunk","choices":[{"index":0,"delta":{"content":"hello"},"finish_reason":null}]}` + "\n"))
}))
defer server.Close()
executor := NewOpenAICompatExecutor("openai-compatibility", &config.Config{})
auth := &cliproxyauth.Auth{Attributes: map[string]string{
"base_url": server.URL + "/v1",
"api_key": "test",
}}
result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
Model: "openrouter-model",
Payload: []byte(`{"model":"openrouter-model","messages":[{"role":"user","content":"hi"}],"stream":true}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream error: %v", err)
}
var got strings.Builder
for chunk := range result.Chunks {
if chunk.Err != nil {
t.Fatalf("unexpected stream error: %v", chunk.Err)
}
got.Write(chunk.Payload)
}
if gjson.Get(got.String(), "choices.0.delta.content").String() != "hello" {
t.Fatalf("stream payload = %s", got.String())
}
}