fix(codex): preserve continuity and safe affinity fallback

Restore Claude continuity after the continuity refactor, keep auth-affinity keys out of upstream Codex session identifiers, and only persist affinity after successful execution so retries can still rotate to healthy credentials when the first auth fails.
This commit is contained in:
VooDisss
2026-03-27 18:27:33 +02:00
parent 62b17f40a1
commit 26eca8b6ba
6 changed files with 79 additions and 21 deletions
@@ -57,9 +57,6 @@ func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cl
if executionSession := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); executionSession != "" { if executionSession := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); executionSession != "" {
return codexContinuity{Key: executionSession, Source: "execution_session"} return codexContinuity{Key: executionSession, Source: "execution_session"}
} }
if affinityKey := metadataString(opts.Metadata, codexAuthAffinityMetadataKey); affinityKey != "" {
return codexContinuity{Key: affinityKey, Source: "auth_affinity"}
}
if ginCtx := ginContextFrom(ctx); ginCtx != nil { if ginCtx := ginContextFrom(ctx); ginCtx != nil {
if ginCtx.Request != nil { if ginCtx.Request != nil {
if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" { if v := strings.TrimSpace(ginCtx.GetHeader("Idempotency-Key")); v != "" {
@@ -612,6 +612,7 @@ func (e *CodexExecutor) cacheHelper(ctx context.Context, auth *cliproxyauth.Auth
} }
setCodexCache(key, cache) setCodexCache(key, cache)
} }
continuity = codexContinuity{Key: cache.ID, Source: "claude_user_cache"}
} }
} else if from == "openai-response" { } else if from == "openai-response" {
promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key") promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key")
@@ -151,3 +151,45 @@ func TestCodexExecutorCacheHelper_OpenAIChatCompletions_FallsBackToStableAuthID(
t.Fatalf("session_id = %q, want %q", got, expected) t.Fatalf("session_id = %q, want %q", got, expected)
} }
} }
func TestCodexExecutorCacheHelper_ClaudePreservesCacheContinuity(t *testing.T) {
executor := &CodexExecutor{}
req := cliproxyexecutor.Request{
Model: "claude-3-7-sonnet",
Payload: []byte(`{"metadata":{"user_id":"user-1"}}`),
}
rawJSON := []byte(`{"model":"gpt-5.4","stream":true}`)
httpReq, continuity, err := executor.cacheHelper(context.Background(), nil, sdktranslator.FromString("claude"), "https://example.com/responses", req, cliproxyexecutor.Options{}, rawJSON)
if err != nil {
t.Fatalf("cacheHelper error: %v", err)
}
if continuity.Key == "" {
t.Fatal("continuity.Key = empty, want non-empty")
}
body, err := io.ReadAll(httpReq.Body)
if err != nil {
t.Fatalf("read request body: %v", err)
}
if got := gjson.GetBytes(body, "prompt_cache_key").String(); got != continuity.Key {
t.Fatalf("prompt_cache_key = %q, want %q", got, continuity.Key)
}
if got := httpReq.Header.Get("session_id"); got != continuity.Key {
t.Fatalf("session_id = %q, want %q", got, continuity.Key)
}
}
func TestResolveCodexContinuity_DoesNotForwardAuthAffinityKey(t *testing.T) {
req := cliproxyexecutor.Request{Payload: []byte(`{"model":"gpt-5.4"}`)}
opts := cliproxyexecutor.Options{Metadata: map[string]any{"auth_affinity_key": "principal:raw-client-secret"}}
auth := &cliproxyauth.Auth{ID: "codex-auth-1", Provider: "codex"}
continuity := resolveCodexContinuity(context.Background(), auth, req, opts)
if continuity.Source != "auth_id" {
t.Fatalf("continuity.Source = %q, want %q", continuity.Source, "auth_id")
}
if continuity.Key == "principal:raw-client-secret" {
t.Fatal("continuity.Key leaked raw auth affinity key")
}
}
@@ -783,6 +783,7 @@ func applyCodexPromptCacheHeaders(ctx context.Context, auth *cliproxyauth.Auth,
} }
setCodexCache(key, cache) setCodexCache(key, cache)
} }
continuity = codexContinuity{Key: cache.ID, Source: "claude_user_cache"}
} }
} else if from == "openai-response" { } else if from == "openai-response" {
if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() {
@@ -57,6 +57,26 @@ func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T
} }
} }
func TestApplyCodexPromptCacheHeaders_ClaudePreservesContinuity(t *testing.T) {
req := cliproxyexecutor.Request{
Model: "claude-3-7-sonnet",
Payload: []byte(`{"metadata":{"user_id":"user-1"}}`),
}
body := []byte(`{"model":"gpt-5.4","stream":true}`)
updatedBody, headers, continuity := applyCodexPromptCacheHeaders(context.Background(), nil, sdktranslator.FromString("claude"), req, cliproxyexecutor.Options{}, body)
if continuity.Key == "" {
t.Fatal("continuity.Key = empty, want non-empty")
}
if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != continuity.Key {
t.Fatalf("prompt_cache_key = %q, want %q", got, continuity.Key)
}
if got := headers.Get("session_id"); got != continuity.Key {
t.Fatalf("session_id = %q, want %q", got, continuity.Key)
}
}
func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) { func TestApplyCodexWebsocketHeadersDefaultsToCurrentResponsesBeta(t *testing.T) {
headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil) headers := applyCodexWebsocketHeaders(context.Background(), http.Header{}, nil, "", nil)
+15 -18
View File
@@ -1093,12 +1093,6 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
entry := logEntryWithRequestID(ctx) entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model) debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID) publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" {
m.SetAuthAffinity(affinityKey, auth.ID)
if log.IsLevelEnabled(log.DebugLevel) {
entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model)
}
}
tried[auth.ID] = struct{}{} tried[auth.ID] = struct{}{}
execCtx := ctx execCtx := ctx
@@ -1138,6 +1132,7 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
continue continue
} }
m.MarkResult(execCtx, result) m.MarkResult(execCtx, result)
m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model)
return resp, nil return resp, nil
} }
if authErr != nil { if authErr != nil {
@@ -1177,12 +1172,6 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
entry := logEntryWithRequestID(ctx) entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model) debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID) publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" {
m.SetAuthAffinity(affinityKey, auth.ID)
if log.IsLevelEnabled(log.DebugLevel) {
entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model)
}
}
tried[auth.ID] = struct{}{} tried[auth.ID] = struct{}{}
execCtx := ctx execCtx := ctx
@@ -1222,6 +1211,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
continue continue
} }
m.MarkResult(execCtx, result) m.MarkResult(execCtx, result)
m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model)
return resp, nil return resp, nil
} }
if authErr != nil { if authErr != nil {
@@ -1269,12 +1259,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
entry := logEntryWithRequestID(ctx) entry := logEntryWithRequestID(ctx)
debugLogAuthSelection(entry, auth, provider, req.Model) debugLogAuthSelection(entry, auth, provider, req.Model)
publishSelectedAuthMetadata(opts.Metadata, auth.ID) publishSelectedAuthMetadata(opts.Metadata, auth.ID)
if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" {
m.SetAuthAffinity(affinityKey, auth.ID)
if log.IsLevelEnabled(log.DebugLevel) {
entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, auth.ID, provider, req.Model)
}
}
tried[auth.ID] = struct{}{} tried[auth.ID] = struct{}{}
execCtx := ctx execCtx := ctx
@@ -1298,6 +1282,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
lastErr = errStream lastErr = errStream
continue continue
} }
m.persistAuthAffinity(entry, opts, auth.ID, provider, req.Model)
return streamResult, nil return streamResult, nil
} }
} }
@@ -2285,6 +2270,18 @@ func (m *Manager) applyAuthAffinity(opts *cliproxyexecutor.Options) {
} }
} }
func (m *Manager) persistAuthAffinity(entry *log.Entry, opts cliproxyexecutor.Options, authID, provider, model string) {
if m == nil {
return
}
if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" {
m.SetAuthAffinity(affinityKey, authID)
if entry != nil && log.IsLevelEnabled(log.DebugLevel) {
entry.Debugf("auth affinity pinned key=%s auth_id=%s provider=%s model=%s", affinityKey, authID, provider, model)
}
}
}
func (m *Manager) SetAuthAffinity(key, authID string) { func (m *Manager) SetAuthAffinity(key, authID string) {
key = strings.TrimSpace(key) key = strings.TrimSpace(key)
authID = strings.TrimSpace(authID) authID = strings.TrimSpace(authID)