Merge pull request #3476 from sususu98/fix/codex-context-length-stream-errors-dev

fix codex context length stream errors
This commit is contained in:
Luis Pater
2026-05-21 02:53:54 +08:00
committed by GitHub
4 changed files with 480 additions and 2 deletions
+111
View File
@@ -100,6 +100,103 @@ func patchCodexCompletedOutput(eventData []byte, outputItemsByIndex map[int64][]
return completedDataPatched
}
func codexTerminalStreamContextLengthErr(eventData []byte) (statusErr, bool) {
eventType := gjson.GetBytes(eventData, "type").String()
var body []byte
switch eventType {
case "error":
body = codexTerminalErrorBody(eventData, "error")
if len(body) == 0 {
body = codexTerminalTopLevelErrorBody(eventData)
}
case "response.failed":
body = codexTerminalErrorBody(eventData, "response.error")
if len(body) == 0 {
body = codexTerminalErrorBody(eventData, "error")
}
default:
return statusErr{}, false
}
if len(body) == 0 {
return statusErr{}, false
}
if !codexTerminalErrorIsContextLength(body) {
return statusErr{}, false
}
return newCodexStatusErr(http.StatusBadRequest, body), true
}
func codexTerminalErrorBody(eventData []byte, path string) []byte {
errorResult := gjson.GetBytes(eventData, path)
if !errorResult.Exists() {
return nil
}
body := []byte(`{"error":{}}`)
if errorResult.Type == gjson.JSON {
body, _ = sjson.SetRawBytes(body, "error", []byte(errorResult.Raw))
} else if message := strings.TrimSpace(errorResult.String()); message != "" {
body, _ = sjson.SetBytes(body, "error.message", message)
}
if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" {
if message := strings.TrimSpace(gjson.GetBytes(eventData, "response.error.message").String()); message != "" {
body, _ = sjson.SetBytes(body, "error.message", message)
}
}
if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" {
if code := strings.TrimSpace(gjson.GetBytes(body, "error.code").String()); code != "" {
body, _ = sjson.SetBytes(body, "error.message", code)
}
}
if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" {
if errorType := strings.TrimSpace(gjson.GetBytes(body, "error.type").String()); errorType != "" {
body, _ = sjson.SetBytes(body, "error.message", errorType)
}
}
return body
}
func codexTerminalTopLevelErrorBody(eventData []byte) []byte {
message := strings.TrimSpace(gjson.GetBytes(eventData, "message").String())
code := strings.TrimSpace(gjson.GetBytes(eventData, "code").String())
errorType := strings.TrimSpace(gjson.GetBytes(eventData, "error_type").String())
param := strings.TrimSpace(gjson.GetBytes(eventData, "param").String())
if message == "" && code == "" && errorType == "" && param == "" {
return nil
}
body := []byte(`{"error":{}}`)
if message != "" {
body, _ = sjson.SetBytes(body, "error.message", message)
}
if code != "" {
body, _ = sjson.SetBytes(body, "error.code", code)
}
if errorType != "" {
body, _ = sjson.SetBytes(body, "error.type", errorType)
}
if param != "" {
body, _ = sjson.SetBytes(body, "error.param", param)
}
if strings.TrimSpace(gjson.GetBytes(body, "error.message").String()) == "" {
if code != "" {
body, _ = sjson.SetBytes(body, "error.message", code)
} else if errorType != "" {
body, _ = sjson.SetBytes(body, "error.message", errorType)
}
}
return body
}
func codexTerminalErrorIsContextLength(body []byte) bool {
errorCode := strings.ToLower(strings.TrimSpace(gjson.GetBytes(body, "error.code").String()))
message := strings.ToLower(strings.TrimSpace(gjson.GetBytes(body, "error.message").String()))
return errorCode == "context_length_exceeded" ||
errorCode == "context_too_large" ||
strings.Contains(message, "context window") ||
strings.Contains(message, "context length") ||
strings.Contains(message, "too many tokens")
}
// CodexExecutor is a stateless executor for Codex (OpenAI Responses API entrypoint).
// If api_key is unavailable on auth, it falls back to legacy via ClientAdapter.
type CodexExecutor struct {
@@ -249,6 +346,11 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
eventData := bytes.TrimSpace(line[5:])
eventType := gjson.GetBytes(eventData, "type").String()
if streamErr, ok := codexTerminalStreamContextLengthErr(eventData); ok {
err = streamErr
return resp, err
}
if eventType == "response.output_item.done" {
itemResult := gjson.GetBytes(eventData, "item")
if !itemResult.Exists() || itemResult.Type != gjson.JSON {
@@ -506,6 +608,15 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
if bytes.HasPrefix(line, dataTag) {
data := bytes.TrimSpace(line[5:])
if streamErr, ok := codexTerminalStreamContextLengthErr(data); ok {
helps.RecordAPIResponseError(ctx, e.cfg, streamErr)
reporter.PublishFailure(ctx, streamErr)
select {
case out <- cliproxyexecutor.StreamChunk{Err: streamErr}:
case <-ctx.Done():
}
return
}
switch gjson.GetBytes(data, "type").String() {
case "response.output_item.done":
collectCodexOutputItemDone(data, outputItemsByIndex, &outputItemsFallback)
@@ -5,6 +5,7 @@ import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
@@ -46,6 +47,128 @@ func TestCodexExecutorExecute_EmptyStreamCompletionOutputUsesOutputItemDone(t *t
}
}
func TestCodexExecutorExecuteSurfacesTerminalStreamError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("event: response.created\n"))
_, _ = w.Write([]byte(`data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.5"}}` + "\n\n"))
_, _ = w.Write([]byte("event: error\n"))
_, _ = w.Write([]byte(`data: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","param":"input"},"sequence_number":2}` + "\n\n"))
_, _ = w.Write([]byte("event: response.failed\n"))
_, _ = w.Write([]byte(`data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again."}}}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
}}
_, err := executor.Execute(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.5",
Payload: []byte(`{"model":"gpt-5.5","input":"hello"}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai-response"),
Stream: false,
})
if err == nil {
t.Fatal("expected terminal stream error, got nil")
}
if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest {
t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err)
}
assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large")
if !strings.Contains(err.Error(), "Your input exceeds the context window") {
t.Fatalf("error message missing upstream context text: %v", err)
}
}
func TestCodexExecutorExecuteStreamSurfacesTerminalStreamError(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
_, _ = w.Write([]byte("event: response.created\n"))
_, _ = w.Write([]byte(`data: {"type":"response.created","response":{"id":"resp_1","model":"gpt-5.5"}}` + "\n\n"))
_, _ = w.Write([]byte("event: error\n"))
_, _ = w.Write([]byte(`data: {"type":"error","error":{"type":"invalid_request_error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","param":"input"},"sequence_number":2}` + "\n\n"))
}))
defer server.Close()
executor := NewCodexExecutor(&config.Config{})
auth := &cliproxyauth.Auth{Attributes: map[string]string{
"base_url": server.URL,
"api_key": "test",
}}
result, err := executor.ExecuteStream(context.Background(), auth, cliproxyexecutor.Request{
Model: "gpt-5.5",
Payload: []byte(`{"model":"gpt-5.5","input":"hello"}`),
}, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai-response"),
Stream: true,
})
if err != nil {
t.Fatalf("ExecuteStream error: %v", err)
}
var streamErr error
for chunk := range result.Chunks {
if chunk.Err != nil {
streamErr = chunk.Err
break
}
}
if streamErr == nil {
t.Fatal("missing stream terminal error")
}
if got := statusCodeFromTestError(t, streamErr); got != http.StatusBadRequest {
t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, streamErr)
}
assertCodexErrorCode(t, streamErr.Error(), "invalid_request_error", "context_too_large")
}
func TestCodexTerminalStreamContextLengthErrFromResponseFailed(t *testing.T) {
err, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"response.failed","response":{"id":"resp_1","status":"failed","error":{"code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again."}}}`))
if !ok {
t.Fatal("expected context length terminal error")
}
if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest {
t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err)
}
assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large")
}
func TestCodexTerminalStreamContextLengthErrFromTopLevelError(t *testing.T) {
err, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"error","code":"context_length_exceeded","message":"Your input exceeds the context window of this model. Please adjust your input and try again.","sequence_number":2}`))
if !ok {
t.Fatal("expected top-level context length terminal error")
}
if got := statusCodeFromTestError(t, err); got != http.StatusBadRequest {
t.Fatalf("status code = %d, want %d; err=%v", got, http.StatusBadRequest, err)
}
assertCodexErrorCode(t, err.Error(), "invalid_request_error", "context_too_large")
if !strings.Contains(err.Error(), "Your input exceeds the context window") {
t.Fatalf("error message missing upstream context text: %v", err)
}
}
func TestCodexTerminalStreamContextLengthErrIgnoresOtherTerminalErrors(t *testing.T) {
_, ok := codexTerminalStreamContextLengthErr([]byte(`{"type":"error","error":{"type":"rate_limit_error","code":"rate_limit_exceeded","message":"Rate limit reached."}}`))
if ok {
t.Fatal("rate limit terminal error should not be handled by context length fix")
}
}
func statusCodeFromTestError(t *testing.T, err error) int {
t.Helper()
statusErr, ok := err.(interface{ StatusCode() int })
if !ok {
t.Fatalf("error %T does not expose StatusCode(): %v", err, err)
}
return statusErr.StatusCode()
}
func TestCodexExecutorExecuteStream_EmptyStreamCompletionOutputUsesOutputItemDone(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")