Harden Responses SSE framing against partial chunk boundaries
Follow-up review found two real framing hazards in the handler-layer framer: it could flush a partial `data:` payload before the JSON was complete, and it could inject an extra newline before chunks that already began with `\n`/`\r\n`.

This commit tightens the framer so it only emits undelimited events when the buffered `data:` payload is already valid JSON (or `[DONE]`), skips newline injection for chunks that already start with a line break, and avoids the heavier `bytes.Split` path while scanning SSE fields.

The regression suite now covers split `data:` payload chunks, newline-prefixed chunks, and dropping incomplete trailing data on flush, so the original Responses fix remains intact while the review concerns are explicitly locked down.

Constraint: Keep the follow-up limited to handler-layer framing and tests
Rejected: Ignore the review and rely on current executor chunk shapes | leaves partial data payload corruption possible
Rejected: Build a fully generic SSE parser | wider change than needed for the identified risks
Confidence: high
Scope-risk: narrow
Reversibility: clean
Directive: Do not emit undelimited Responses SSE events unless buffered `data:` content is already complete and valid
Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers/openai -count=1
Tested: /tmp/go1.26.1/go/bin/go test ./sdk/api/handlers -count=1
Tested: /tmp/go1.26.1/go/bin/go vet ./sdk/api/handlers/...
Not-tested: Full repository test suite outside sdk/api/handlers packages
This commit is contained in:
@@ -9,6 +9,7 @@ package openai
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -68,7 +69,7 @@ func (f *responsesSSEFramer) WriteChunk(w io.Writer, chunk []byte) {
|
|||||||
f.pending = f.pending[:0]
|
f.pending = f.pending[:0]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(f.pending) == 0 || responsesSSENeedsMoreData(f.pending) {
|
if len(f.pending) == 0 || !responsesSSECanEmitWithoutDelimiter(f.pending) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
writeResponsesSSEChunk(w, f.pending)
|
writeResponsesSSEChunk(w, f.pending)
|
||||||
@@ -83,7 +84,7 @@ func (f *responsesSSEFramer) Flush(w io.Writer) {
|
|||||||
f.pending = f.pending[:0]
|
f.pending = f.pending[:0]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if responsesSSENeedsMoreData(f.pending) {
|
if !responsesSSECanEmitWithoutDelimiter(f.pending) {
|
||||||
f.pending = f.pending[:0]
|
f.pending = f.pending[:0]
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -121,7 +122,15 @@ func responsesSSENeedsMoreData(chunk []byte) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func responsesSSEHasField(chunk []byte, prefix []byte) bool {
|
func responsesSSEHasField(chunk []byte, prefix []byte) bool {
|
||||||
for _, line := range bytes.Split(chunk, []byte("\n")) {
|
s := chunk
|
||||||
|
for len(s) > 0 {
|
||||||
|
line := s
|
||||||
|
if i := bytes.IndexByte(s, '\n'); i >= 0 {
|
||||||
|
line = s[:i]
|
||||||
|
s = s[i+1:]
|
||||||
|
} else {
|
||||||
|
s = nil
|
||||||
|
}
|
||||||
line = bytes.TrimSpace(line)
|
line = bytes.TrimSpace(line)
|
||||||
if bytes.HasPrefix(line, prefix) {
|
if bytes.HasPrefix(line, prefix) {
|
||||||
return true
|
return true
|
||||||
@@ -130,6 +139,39 @@ func responsesSSEHasField(chunk []byte, prefix []byte) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// responsesSSECanEmitWithoutDelimiter reports whether chunk, after trimming
// surrounding whitespace, may be emitted without waiting for further input.
//
// It returns false when the trimmed chunk is empty, when
// responsesSSENeedsMoreData reports true for it, or when no line in it starts
// with "data:". Otherwise the result is whatever responsesSSEDataLinesValid
// returns for the trimmed chunk.
func responsesSSECanEmitWithoutDelimiter(chunk []byte) bool {
|
||||||
|
trimmed := bytes.TrimSpace(chunk)
|
||||||
|
// Cheap structural checks run first; payload validation only happens once a
// data field is known to be present.
if len(trimmed) == 0 || responsesSSENeedsMoreData(trimmed) || !responsesSSEHasField(trimmed, []byte("data:")) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return responsesSSEDataLinesValid(trimmed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// responsesSSEDataLinesValid reports whether every SSE event found in chunk
// carries a complete `data:` payload.
//
// Lines are split on '\n' and trimmed of surrounding whitespace (which also
// removes a trailing '\r'). Lines that are not `data:` fields, empty `data:`
// fields and the `[DONE]` sentinel are ignored. Blank lines separate events.
//
// An event is accepted when each of its remaining `data:` payloads is valid
// JSON on its own, or, failing that, when the payloads joined with '\n' form
// valid JSON. The second rule covers a JSON document spread over several
// `data:` lines of one event, which the SSE format concatenates with newlines;
// validating such lines only in isolation would report a complete event as
// incomplete. A chunk without any `data:` payload is reported as valid.
func responsesSSEDataLinesValid(chunk []byte) bool {
	eventStart := 0   // offset of the first byte of the event being scanned
	perLineOK := true // every data payload of the current event is valid alone

	for pos := 0; pos < len(chunk); {
		lineStart, lineEnd, next := pos, len(chunk), len(chunk)
		if i := bytes.IndexByte(chunk[pos:], '\n'); i >= 0 {
			lineEnd = pos + i
			next = lineEnd + 1
		}
		pos = next

		line := bytes.TrimSpace(chunk[lineStart:lineEnd])
		if len(line) == 0 {
			// Blank line: the current event ends here. Only pay for the
			// joined check when the per-line check already failed.
			if !perLineOK && !responsesSSEJoinedDataValid(chunk[eventStart:lineStart]) {
				return false
			}
			eventStart, perLineOK = next, true
			continue
		}
		if payload, ok := responsesSSEDataPayload(line); ok && !json.Valid(payload) {
			perLineOK = false
		}
	}
	return perLineOK || responsesSSEJoinedDataValid(chunk[eventStart:])
}

// responsesSSEDataPayload extracts the trimmed payload of a `data:` line. The
// boolean is false when line is not a `data:` field, when its payload is
// empty, or when the payload is the `[DONE]` sentinel.
func responsesSSEDataPayload(line []byte) ([]byte, bool) {
	const field = "data:"

	line = bytes.TrimSpace(line)
	if !bytes.HasPrefix(line, []byte(field)) {
		return nil, false
	}
	payload := bytes.TrimSpace(line[len(field):])
	if len(payload) == 0 || string(payload) == "[DONE]" {
		return nil, false
	}
	return payload, true
}

// responsesSSEJoinedDataValid joins the `data:` payloads of a single event
// with '\n' and reports whether the result is valid JSON.
func responsesSSEJoinedDataValid(event []byte) bool {
	var joined []byte
	for rest := event; len(rest) > 0; {
		line := rest
		if i := bytes.IndexByte(rest, '\n'); i >= 0 {
			line, rest = rest[:i], rest[i+1:]
		} else {
			rest = nil
		}
		payload, ok := responsesSSEDataPayload(line)
		if !ok {
			continue
		}
		if len(joined) > 0 {
			joined = append(joined, '\n')
		}
		joined = append(joined, payload...)
	}
	return json.Valid(joined)
}
|
||||||
|
|
||||||
func responsesSSENeedsLineBreak(pending, chunk []byte) bool {
|
func responsesSSENeedsLineBreak(pending, chunk []byte) bool {
|
||||||
if len(pending) == 0 || len(chunk) == 0 {
|
if len(pending) == 0 || len(chunk) == 0 {
|
||||||
return false
|
return false
|
||||||
@@ -137,9 +179,12 @@ func responsesSSENeedsLineBreak(pending, chunk []byte) bool {
|
|||||||
if bytes.HasSuffix(pending, []byte("\n")) || bytes.HasSuffix(pending, []byte("\r")) {
|
if bytes.HasSuffix(pending, []byte("\n")) || bytes.HasSuffix(pending, []byte("\r")) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
trimmed := bytes.TrimSpace(chunk)
|
if chunk[0] == '\n' || chunk[0] == '\r' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
trimmed := bytes.TrimLeft(chunk, " \t")
|
||||||
if len(trimmed) == 0 {
|
if len(trimmed) == 0 {
|
||||||
return true
|
return false
|
||||||
}
|
}
|
||||||
for _, prefix := range [][]byte{[]byte("data:"), []byte("event:"), []byte("id:"), []byte("retry:"), []byte(":")} {
|
for _, prefix := range [][]byte{[]byte("data:"), []byte("event:"), []byte("id:"), []byte("retry:"), []byte(":")} {
|
||||||
if bytes.HasPrefix(trimmed, prefix) {
|
if bytes.HasPrefix(trimmed, prefix) {
|
||||||
|
|||||||
@@ -96,3 +96,47 @@ func TestForwardResponsesStreamPreservesValidFullSSEEventChunks(t *testing.T) {
|
|||||||
t.Fatalf("unexpected full-event framing.\nGot: %q\nWant: %q", got, string(chunk))
|
t.Fatalf("unexpected full-event framing.\nGot: %q\nWant: %q", got, string(chunk))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestForwardResponsesStreamBuffersSplitDataPayloadChunks feeds
// forwardResponsesStream one `data:` JSON payload split across two chunks and
// checks that the recorded body contains the payload reassembled on a single
// line rather than the first, incomplete fragment on its own.
func TestForwardResponsesStreamBuffersSplitDataPayloadChunks(t *testing.T) {
|
||||||
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
||||||
|
|
||||||
|
// Capacity 2 lets both sends below complete before anything reads the channel.
data := make(chan []byte, 2)
|
||||||
|
errs := make(chan *interfaces.ErrorMessage)
|
||||||
|
// First chunk: the JSON object is opened but not closed.
data <- []byte("data: {\"type\":\"response.created\"")
|
||||||
|
// Second chunk: the remainder of the same JSON object, with no `data:` prefix.
data <- []byte(",\"response\":{\"id\":\"resp-1\"}}")
|
||||||
|
close(data)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
||||||
|
|
||||||
|
got := recorder.Body.String()
|
||||||
|
// NOTE(review): the three trailing newlines are presumably the event delimiter
// plus a final newline written at stream end; confirm against the forwarder.
want := "data: {\"type\":\"response.created\",\"response\":{\"id\":\"resp-1\"}}\n\n\n"
|
||||||
|
if got != want {
|
||||||
|
t.Fatalf("unexpected split-data framing.\nGot: %q\nWant: %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestResponsesSSENeedsLineBreakSkipsChunksThatAlreadyStartWithNewline checks
// that responsesSSENeedsLineBreak returns false when the incoming chunk is
// "\n" or "\r\n", given pending bytes that do not end in a line break.
func TestResponsesSSENeedsLineBreakSkipsChunksThatAlreadyStartWithNewline(t *testing.T) {
|
||||||
|
// LF-only chunk after a pending line without a trailing line break.
if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\n")) {
|
||||||
|
t.Fatal("expected no injected newline before newline-only chunk")
|
||||||
|
}
|
||||||
|
// CRLF chunk: the leading '\r' must be treated like a leading '\n'.
if responsesSSENeedsLineBreak([]byte("event: response.created"), []byte("\r\n")) {
|
||||||
|
t.Fatal("expected no injected newline before CRLF chunk")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestForwardResponsesStreamDropsIncompleteTrailingDataChunkOnFlush sends a
// single `data:` chunk whose JSON object is never closed, ends the stream, and
// checks that none of the incomplete payload reaches the recorded body.
func TestForwardResponsesStreamDropsIncompleteTrailingDataChunkOnFlush(t *testing.T) {
|
||||||
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
||||||
|
|
||||||
|
data := make(chan []byte, 1)
|
||||||
|
errs := make(chan *interfaces.ErrorMessage)
|
||||||
|
// The only chunk: an unterminated JSON object, followed directly by close.
data <- []byte("data: {\"type\":\"response.created\"")
|
||||||
|
close(data)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
||||||
|
|
||||||
|
// NOTE(review): the lone "\n" is presumably written by the forwarder at stream
// end rather than by the framer; confirm against forwardResponsesStream.
if got := recorder.Body.String(); got != "\n" {
|
||||||
|
t.Fatalf("expected incomplete trailing data to be dropped on flush.\nGot: %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user