fix(openai): preserve multiline repaired SSE data
This commit is contained in:
@@ -148,9 +148,12 @@ func responsesSSEFrameWithData(frame, payload []byte) []byte {
|
|||||||
out.Write(line)
|
out.Write(line)
|
||||||
out.WriteByte('\n')
|
out.WriteByte('\n')
|
||||||
}
|
}
|
||||||
out.WriteString("data: ")
|
for _, line := range bytes.Split(payload, []byte("\n")) {
|
||||||
out.Write(payload)
|
out.WriteString("data: ")
|
||||||
out.WriteString("\n\n")
|
out.Write(line)
|
||||||
|
out.WriteByte('\n')
|
||||||
|
}
|
||||||
|
out.WriteByte('\n')
|
||||||
return out.Bytes()
|
return out.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -122,6 +122,40 @@ func TestForwardResponsesStreamRepairsMixedIndexedAndUnindexedDoneItems(t *testi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestForwardResponsesStreamRepairsMultilineCompletedOutputAsSSEDataLines(t *testing.T) {
|
||||||
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
||||||
|
|
||||||
|
data := make(chan []byte, 2)
|
||||||
|
errs := make(chan *interfaces.ErrorMessage)
|
||||||
|
data <- []byte(`data: {"type":"response.output_item.done","item":{"type":"function_call","arguments":"{}"}}`)
|
||||||
|
data <- []byte("data: {\"type\":\"response.completed\",\ndata: \"response\":{\"id\":\"resp-1\",\"output\":[]}}\n\n")
|
||||||
|
close(data)
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
h.forwardResponsesStream(c, flusher, func(error) {}, data, errs, nil)
|
||||||
|
|
||||||
|
parts := strings.Split(strings.TrimSpace(recorder.Body.String()), "\n\n")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
t.Fatalf("expected 2 SSE events, got %d. Body: %q", len(parts), recorder.Body.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
completedFrame := []byte(parts[1])
|
||||||
|
for _, line := range strings.Split(parts[1], "\n") {
|
||||||
|
if line != "" && !strings.HasPrefix(line, "data: ") {
|
||||||
|
t.Fatalf("expected every completed payload line to be an SSE data line, got %q in %q", line, parts[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, ok := responsesSSEDataPayload(completedFrame)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected completed frame to contain data payload: %q", parts[1])
|
||||||
|
}
|
||||||
|
output := gjson.GetBytes(payload, "response.output")
|
||||||
|
if !output.IsArray() || len(output.Array()) != 1 {
|
||||||
|
t.Fatalf("expected repaired completed output with 1 item, got %s from %q", output.Raw, payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestForwardResponsesStreamReassemblesSplitSSEEventChunks(t *testing.T) {
|
func TestForwardResponsesStreamReassemblesSplitSSEEventChunks(t *testing.T) {
|
||||||
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
h, recorder, c, flusher := newResponsesStreamTestHandler(t)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user