Merge pull request #2939 from stringer07/fix/codex-stream-output-backfill
fix(codex): backfill streaming response output
This commit is contained in:
@@ -36,6 +36,69 @@ const (
|
|||||||
|
|
||||||
var dataTag = []byte("data:")
|
var dataTag = []byte("data:")
|
||||||
|
|
||||||
|
// Streamed Codex responses may emit response.output_item.done events while leaving
|
||||||
|
// response.completed.response.output empty. Keep the stream path aligned with the
|
||||||
|
// already-patched non-stream path by reconstructing response.output from those items.
|
||||||
|
func collectCodexOutputItemDone(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback *[][]byte) {
|
||||||
|
itemResult := gjson.GetBytes(eventData, "item")
|
||||||
|
if !itemResult.Exists() || itemResult.Type != gjson.JSON {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
outputIndexResult := gjson.GetBytes(eventData, "output_index")
|
||||||
|
if outputIndexResult.Exists() {
|
||||||
|
outputItemsByIndex[outputIndexResult.Int()] = []byte(itemResult.Raw)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*outputItemsFallback = append(*outputItemsFallback, []byte(itemResult.Raw))
|
||||||
|
}
|
||||||
|
|
||||||
|
func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][]byte, outputItemsFallback [][]byte) []byte {
|
||||||
|
outputResult := gjson.GetBytes(eventData, "response.output")
|
||||||
|
shouldPatchOutput := (!outputResult.Exists() || !outputResult.IsArray() || len(outputResult.Array()) == 0) && (len(outputItemsByIndex) > 0 || len(outputItemsFallback) > 0)
|
||||||
|
if !shouldPatchOutput {
|
||||||
|
return eventData
|
||||||
|
}
|
||||||
|
|
||||||
|
indexes := make([]int64, 0, len(outputItemsByIndex))
|
||||||
|
for idx := range outputItemsByIndex {
|
||||||
|
indexes = append(indexes, idx)
|
||||||
|
}
|
||||||
|
sort.Slice(indexes, func(i, j int) bool {
|
||||||
|
return indexes[i] < indexes[j]
|
||||||
|
})
|
||||||
|
|
||||||
|
items := make([][]byte, 0, len(outputItemsByIndex)+len(outputItemsFallback))
|
||||||
|
for _, idx := range indexes {
|
||||||
|
items = append(items, outputItemsByIndex[idx])
|
||||||
|
}
|
||||||
|
items = append(items, outputItemsFallback...)
|
||||||
|
|
||||||
|
outputArray := []byte("[]")
|
||||||
|
if len(items) > 0 {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
totalLen := 2
|
||||||
|
for _, item := range items {
|
||||||
|
totalLen += len(item)
|
||||||
|
}
|
||||||
|
if len(items) > 1 {
|
||||||
|
totalLen += len(items) - 1
|
||||||
|
}
|
||||||
|
buf.Grow(totalLen)
|
||||||
|
buf.WriteByte('[')
|
||||||
|
for i, item := range items {
|
||||||
|
if i > 0 {
|
||||||
|
buf.WriteByte(',')
|
||||||
|
}
|
||||||
|
buf.Write(item)
|
||||||
|
}
|
||||||
|
buf.WriteByte(']')
|
||||||
|
outputArray = buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
completedDataPatched, _ := sjson.SetRawBytes(eventData, "response.output", outputArray)
|
||||||
|
return completedDataPatched
|
||||||
|
}
|
||||||
|
|
||||||
// CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint).
|
// CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint).
|
||||||
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
|
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
|
||||||
type CodexExecutor struct {
|
type CodexExecutor struct {
|
||||||
@@ -414,20 +477,28 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
scanner := bufio.NewScanner(httpResp.Body)
|
scanner := bufio.NewScanner(httpResp.Body)
|
||||||
scanner.Buffer(nil, 52_428_800) // 50MB
|
scanner.Buffer(nil, 52_428_800) // 50MB
|
||||||
var param any
|
var param any
|
||||||
|
outputItemsByIndex := make(map[int64][]byte)
|
||||||
|
var outputItemsFallback [][]byte
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Bytes()
|
line := scanner.Bytes()
|
||||||
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
|
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
|
||||||
|
translatedLine := bytes.Clone(line)
|
||||||
|
|
||||||
if bytes.HasPrefix(line, dataTag) {
|
if bytes.HasPrefix(line, dataTag) {
|
||||||
data := bytes.TrimSpace(line[5:])
|
data := bytes.TrimSpace(line[5:])
|
||||||
if gjson.GetBytes(data, "type").String() == "response.completed" {
|
switch gjson.GetBytes(data, "type").String() {
|
||||||
|
case "response.output_item.done":
|
||||||
|
collectCodexOutputItemDone(data, outputItemsByIndex, &outputItemsFallback)
|
||||||
|
case "response.completed":
|
||||||
if detail, ok := helps.ParseCodexUsage(data); ok {
|
if detail, ok := helps.ParseCodexUsage(data); ok {
|
||||||
reporter.Publish(ctx, detail)
|
reporter.Publish(ctx, detail)
|
||||||
}
|
}
|
||||||
|
data = patchCodexCompletedOutput(data, outputItemsByIndex, outputItemsFallback)
|
||||||
|
translatedLine = append([]byte("data: "), data...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, bytes.Clone(line), ¶m)
|
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, translatedLine, ¶m)
|
||||||
for i := range chunks {
|
for i := range chunks {
|
||||||
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
|
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package executor
|
package executor
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@@ -44,3 +45,53 @@ func TestCodexExecutorExecute_EmptyStreamCompletionOutputUsesOutputItemDone(t *t
|
|||||||
t.Fatalf("choices.0.message.content = %q, want %q; payload=%s", gotContent, "ok", string(resp.Payload))
|
t.Fatalf("choices.0.message.content = %q, want %q; payload=%s", gotContent, "ok", string(resp.Payload))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCodexExecutorExecuteStream_EmptyStreamCompletionOutputUsesOutputItemDone(t *testing.T) {
|
||||||
|
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
_, _ = w.Write([]byte("data: {\"type\":\"response.output_item.done\",\"item\":{\"type\":\"message\",\"role\":\"assistant\",\"content\":[{\"type\":\"output_text\",\"text\":\"ok\"}]},\"output_index\":0}\n"))
|
||||||
|
_, _ = w.Write([]byte("data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_1\",\"object\":\"response\",\"created_at\":1775555723,\"status\":\"completed\",\"model\":\"gpt-5.4-mini-2026-03-17\",\"output\":[],\"usage\":{\"input_tokens\":8,\"output_tokens\":28,\"total_tokens\":36}}}\n\n"))
|
||||||
|
}))
|
||||||
|
defer server.Close()
|
||||||
|
|
||||||
|
executor := NewCodexExecutor(&config.Config{})
|
||||||
|
auth := &cliproxyauth.Auth{Attributes: map[string]string{
|
||||||
|
"base_url": server.URL,
|
||||||
|
"api_key": "test",
|
||||||
|
}}
|
||||||
|
|
||||||
|
result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
|
||||||
|
Model: "gpt-5.4-mini",
|
||||||
|
Payload: []byte(`{"model":"gpt-5.4-mini","input":"Say ok"}`),
|
||||||
|
}, cliproxyexecutor.Options{
|
||||||
|
SourceFormat: sdktranslator.FromString("openai-response"),
|
||||||
|
Stream: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ExecuteStream error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var completed []byte
|
||||||
|
for chunk := range result.Chunks {
|
||||||
|
if chunk.Err != nil {
|
||||||
|
t.Fatalf("stream chunk error: %v", chunk.Err)
|
||||||
|
}
|
||||||
|
payload := bytes.TrimSpace(chunk.Payload)
|
||||||
|
if !bytes.HasPrefix(payload, []byte("data:")) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
data := bytes.TrimSpace(payload[5:])
|
||||||
|
if gjson.GetBytes(data, "type").String() == "response.completed" {
|
||||||
|
completed = append([]byte(nil), data...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(completed) == 0 {
|
||||||
|
t.Fatal("missing response.completed chunk")
|
||||||
|
}
|
||||||
|
|
||||||
|
gotContent := gjson.GetBytes(completed, "response.output.0.content.0.text").String()
|
||||||
|
if gotContent != "ok" {
|
||||||
|
t.Fatalf("response.output[0].content[0].text = %q, want %q; completed=%s", gotContent, "ok", string(completed))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user