Prevent malformed Responses SSE frames from breaking stream clients

Line-oriented upstream executors can emit `event:` and `data:` as
separate chunks, but the Responses handler had started terminating
each incoming chunk as a full SSE event. That split `response.created`
into an empty event plus a later data block, which broke downstream
clients like OpenClaw.

This keeps the fix in the handler layer: a small stateful framer now
buffers standalone `event:` lines until the matching `data:` arrives,
preserves already-framed events, and ignores delimiter-only leftovers.
The regression suite now covers split event/data framing, full-event
passthrough, terminal errors, and the bootstrap path that forwards
line-oriented openai-response streams from non-Codex executors too.

Constraint: Keep the fix localized to Responses handler framing instead of patching every executor
Rejected: Revert to v6.9.7 chunk writing | would reintroduce data-only framing regressions
Rejected: Patch each line-oriented executor separately | duplicates fragile SSE assembly logic
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Do not assume incoming Responses stream chunks are already complete SSE events; preserve handler-layer reassembly for split `event:`/`data:` inputs
Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers/openai -count=1
Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers -count=1
Tested: /tmp/go1.26.1/go test ./sdk/api/handlers/... -count=1
Tested: /tmp/go1.26.1/go/bin/go vet ./sdk/api/handlers/...
Tested: Temporary patched server on 127.0.0.1:18317 -> /v1/models 200, /v1/responses non-stream 200, /v1/responses stream emitted combined `event:` + `data:` frames
Not-tested: Full repository test suite outside sdk/api/handlers packages
This commit is contained in:
davidwushi1145
2026-04-02 20:17:45 +08:00
parent 4f99bc54f1
commit abc293c642
4 changed files with 250 additions and 9 deletions
@@ -136,6 +136,8 @@ type authAwareStreamExecutor struct {
type invalidJSONStreamExecutor struct{}
type splitResponsesEventStreamExecutor struct{}
func (e *invalidJSONStreamExecutor) Identifier() string { return "codex" }
func (e *invalidJSONStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
@@ -165,6 +167,36 @@ func (e *invalidJSONStreamExecutor) HttpRequest(ctx context.Context, auth *corea
}
}
func (e *splitResponsesEventStreamExecutor) Identifier() string { return "split-sse" }
func (e *splitResponsesEventStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
}
func (e *splitResponsesEventStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) {
ch := make(chan coreexecutor.StreamChunk, 2)
ch <- coreexecutor.StreamChunk{Payload: []byte("event: response.completed")}
ch <- coreexecutor.StreamChunk{Payload: []byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}")}
close(ch)
return &coreexecutor.StreamResult{Chunks: ch}, nil
}
func (e *splitResponsesEventStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
return auth, nil
}
func (e *splitResponsesEventStreamExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "CountTokens not implemented"}
}
func (e *splitResponsesEventStreamExecutor) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (*http.Response, error) {
return nil, &coreauth.Error{
Code: "not_implemented",
Message: "HttpRequest not implemented",
HTTPStatus: http.StatusNotImplemented,
}
}
func (e *authAwareStreamExecutor) Identifier() string { return "codex" }
func (e *authAwareStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
@@ -607,3 +639,52 @@ func TestExecuteStreamWithAuthManager_ValidatesOpenAIResponsesStreamDataJSON(t *
t.Fatalf("expected terminal error")
}
}
func TestExecuteStreamWithAuthManager_AllowsSplitOpenAIResponsesSSEEventLines(t *testing.T) {
executor := &splitResponsesEventStreamExecutor{}
manager := coreauth.NewManager(nil, nil, nil)
manager.RegisterExecutor(executor)
auth1 := &coreauth.Auth{
ID: "auth1",
Provider: "split-sse",
Status: coreauth.StatusActive,
Metadata: map[string]any{"email": "test1@example.com"},
}
if _, err := manager.Register(context.Background(), auth1); err != nil {
t.Fatalf("manager.Register(auth1): %v", err)
}
registry.GetGlobalRegistry().RegisterClient(auth1.ID, auth1.Provider, []*registry.ModelInfo{{ID: "test-model"}})
t.Cleanup(func() {
registry.GetGlobalRegistry().UnregisterClient(auth1.ID)
})
handler := NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, manager)
dataChan, _, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai-response", "test-model", []byte(`{"model":"test-model"}`), "")
if dataChan == nil || errChan == nil {
t.Fatalf("expected non-nil channels")
}
var got []string
for chunk := range dataChan {
got = append(got, string(chunk))
}
for msg := range errChan {
if msg != nil {
t.Fatalf("unexpected error: %+v", msg)
}
}
if len(got) != 2 {
t.Fatalf("expected 2 forwarded chunks, got %d: %#v", len(got), got)
}
if got[0] != "event: response.completed" {
t.Fatalf("unexpected first chunk: %q", got[0])
}
expectedData := "data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp-1\",\"output\":[]}}"
if got[1] != expectedData {
t.Fatalf("unexpected second chunk.\nGot: %q\nWant: %q", got[1], expectedData)
}
}