From 62b17f40a100c312bbdd32f9e5631858ff85af8c Mon Sep 17 00:00:00 2001 From: VooDisss Date: Fri, 27 Mar 2026 18:11:57 +0200 Subject: [PATCH] refactor(codex): align continuity helpers with review feedback Align websocket continuity resolution with the HTTP Codex path, make auth-affinity principal keys use a stable string representation, and extract small helpers that remove duplicated continuity and affinity logic without changing the validated cache-hit behavior. --- internal/runtime/executor/codex_continuity.go | 99 +++++++------------ .../executor/codex_websockets_executor.go | 23 ++--- .../codex_websockets_executor_test.go | 2 +- sdk/api/handlers/handlers.go | 20 +++- sdk/cliproxy/auth/conductor.go | 40 +++----- 5 files changed, 86 insertions(+), 98 deletions(-) diff --git a/internal/runtime/executor/codex_continuity.go b/internal/runtime/executor/codex_continuity.go index e7d4508f..3ebb721f 100644 --- a/internal/runtime/executor/codex_continuity.go +++ b/internal/runtime/executor/codex_continuity.go @@ -21,23 +21,44 @@ type codexContinuity struct { Source string } +func metadataString(meta map[string]any, key string) string { + if len(meta) == 0 { + return "" + } + raw, ok := meta[key] + if !ok || raw == nil { + return "" + } + switch v := raw.(type) { + case string: + return strings.TrimSpace(v) + case []byte: + return strings.TrimSpace(string(v)) + default: + return "" + } +} + +func principalString(raw any) string { + switch v := raw.(type) { + case string: + return strings.TrimSpace(v) + case fmt.Stringer: + return strings.TrimSpace(v.String()) + default: + return strings.TrimSpace(fmt.Sprintf("%v", raw)) + } +} + func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) codexContinuity { if promptCacheKey := strings.TrimSpace(gjson.GetBytes(req.Payload, "prompt_cache_key").String()); promptCacheKey != "" { return codexContinuity{Key: promptCacheKey, Source: "prompt_cache_key"} } - if opts.Metadata != nil { - if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - if trimmed := strings.TrimSpace(v); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "execution_session"} - } - case []byte: - if trimmed := strings.TrimSpace(string(v)); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "execution_session"} - } - } - } + if executionSession := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey); executionSession != "" { + return codexContinuity{Key: executionSession, Source: "execution_session"} + } + if affinityKey := metadataString(opts.Metadata, codexAuthAffinityMetadataKey); affinityKey != "" { + return codexContinuity{Key: affinityKey, Source: "auth_affinity"} } if ginCtx := ginContextFrom(ctx); ginCtx != nil { if ginCtx.Request != nil { @@ -46,34 +67,8 @@ func resolveCodexContinuity(ctx context.Context, auth *cliproxyauth.Auth, req cl } } if v, exists := ginCtx.Get("apiKey"); exists && v != nil { - switch value := v.(type) { - case string: - if trimmed := strings.TrimSpace(value); trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - case fmt.Stringer: - if trimmed := strings.TrimSpace(value.String()); trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - default: - trimmed := strings.TrimSpace(fmt.Sprintf("%v", value)) - if trimmed != "" { - return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} - } - } - } - } - if opts.Metadata != nil { - if raw, ok := opts.Metadata[codexAuthAffinityMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - if trimmed := strings.TrimSpace(v); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "auth_affinity"} - } - case []byte: - if trimmed := strings.TrimSpace(string(v)); trimmed != "" { - return codexContinuity{Key: trimmed, Source: "auth_affinity"} - } + if trimmed := principalString(v); trimmed != "" { + return codexContinuity{Key: uuid.NewSHA1(uuid.NameSpaceOID, []byte("cli-proxy-api:codex:prompt-cache:"+trimmed)).String(), Source: "client_principal"} } } } @@ -111,26 +106,8 @@ func logCodexRequestDiagnostics(ctx context.Context, auth *cliproxyauth.Auth, re authID = strings.TrimSpace(auth.ID) authFile = strings.TrimSpace(auth.FileName) } - selectedAuthID := "" - executionSessionID := "" - if opts.Metadata != nil { - if raw, ok := opts.Metadata[cliproxyexecutor.SelectedAuthMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - selectedAuthID = strings.TrimSpace(v) - case []byte: - selectedAuthID = strings.TrimSpace(string(v)) - } - } - if raw, ok := opts.Metadata[cliproxyexecutor.ExecutionSessionMetadataKey]; ok && raw != nil { - switch v := raw.(type) { - case string: - executionSessionID = strings.TrimSpace(v) - case []byte: - executionSessionID = strings.TrimSpace(string(v)) - } - } - } + selectedAuthID := metadataString(opts.Metadata, cliproxyexecutor.SelectedAuthMetadataKey) + executionSessionID := metadataString(opts.Metadata, cliproxyexecutor.ExecutionSessionMetadataKey) entry.Debugf( "codex request diagnostics auth_id=%s selected_auth_id=%s auth_file=%s exec_session=%s continuity_source=%s session_id=%s prompt_cache_key=%s prompt_cache_retention=%s store=%t has_instructions=%t reasoning_effort=%s reasoning_summary=%s chatgpt_account_id=%t originator=%s model=%s source_format=%s", authID, diff --git a/internal/runtime/executor/codex_websockets_executor.go b/internal/runtime/executor/codex_websockets_executor.go index b8ae11ae..d0dd22c3 100644 --- a/internal/runtime/executor/codex_websockets_executor.go +++ b/internal/runtime/executor/codex_websockets_executor.go @@ -189,8 +189,7 @@ func (e *CodexWebsocketsExecutor) Execute(ctx context.Context, auth *cliproxyaut return resp, err } - body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) - continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} + body, wsHeaders, continuity := applyCodexPromptCacheHeaders(ctx, auth, from, req, opts, body) wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -386,8 +385,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr return nil, err } - body, wsHeaders := applyCodexPromptCacheHeaders(from, req, body) - continuity := codexContinuity{Key: strings.TrimSpace(wsHeaders.Get("session_id"))} + body, wsHeaders, continuity := applyCodexPromptCacheHeaders(ctx, auth, from, req, opts, body) wsHeaders = applyCodexWebsocketHeaders(ctx, wsHeaders, auth, apiKey, e.cfg) var authID, authLabel, authType, authValue string @@ -764,13 +762,14 @@ func buildCodexResponsesWebsocketURL(httpURL string) (string, error) { return parsed.String(), nil } -func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecutor.Request, rawJSON []byte) ([]byte, http.Header) { +func applyCodexPromptCacheHeaders(ctx context.Context, auth *cliproxyauth.Auth, from sdktranslator.Format, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, rawJSON []byte) ([]byte, http.Header, codexContinuity) { headers := http.Header{} if len(rawJSON) == 0 { - return rawJSON, headers + return rawJSON, headers, codexContinuity{} } var cache codexCache + continuity := codexContinuity{} if from == "claude" { userIDResult := gjson.GetBytes(req.Payload, "metadata.user_id") if userIDResult.Exists() { @@ -788,15 +787,17 @@ func applyCodexPromptCacheHeaders(from sdktranslator.Format, req cliproxyexecuto } else if from == "openai-response" { if promptCacheKey := gjson.GetBytes(req.Payload, "prompt_cache_key"); promptCacheKey.Exists() { cache.ID = promptCacheKey.String() + continuity = codexContinuity{Key: cache.ID, Source: "prompt_cache_key"} } + } else if from == "openai" { + continuity = resolveCodexContinuity(ctx, auth, req, opts) + cache.ID = continuity.Key } - if cache.ID != "" { - rawJSON, _ = sjson.SetBytes(rawJSON, "prompt_cache_key", cache.ID) - headers.Set("session_id", cache.ID) - } + rawJSON = applyCodexContinuityBody(rawJSON, continuity) + applyCodexContinuityHeaders(headers, continuity) - return rawJSON, headers + return rawJSON, headers, continuity } func applyCodexWebsocketHeaders(ctx context.Context, headers http.Header, auth *cliproxyauth.Auth, token string, cfg *config.Config) http.Header { diff --git a/internal/runtime/executor/codex_websockets_executor_test.go b/internal/runtime/executor/codex_websockets_executor_test.go index 733318a3..e86036bc 100644 --- a/internal/runtime/executor/codex_websockets_executor_test.go +++ b/internal/runtime/executor/codex_websockets_executor_test.go @@ -41,7 +41,7 @@ func TestApplyCodexPromptCacheHeaders_PreservesPromptCacheRetention(t *testing.T } body := []byte(`{"model":"gpt-5-codex","stream":true,"prompt_cache_retention":"persistent"}`) - updatedBody, headers := applyCodexPromptCacheHeaders(sdktranslator.FromString("openai-response"), req, body) + updatedBody, headers, _ := applyCodexPromptCacheHeaders(context.Background(), nil, sdktranslator.FromString("openai-response"), req, cliproxyexecutor.Options{}, body) if got := gjson.GetBytes(updatedBody, "prompt_cache_key").String(); got != "cache-key-1" { t.Fatalf("prompt_cache_key = %q, want %q", got, "cache-key-1") diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 8679f1a1..5fc1154e 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -216,13 +216,31 @@ func requestExecutionMetadata(ctx context.Context) map[string]any { } else if ctx != nil { if ginCtx, ok := ctx.Value("gin").(*gin.Context); ok && ginCtx != nil { if apiKey, exists := ginCtx.Get("apiKey"); exists && apiKey != nil { - meta[authAffinityMetadataKey] = fmt.Sprintf("principal:%v", apiKey) + if principal := stablePrincipalMetadataKey(apiKey); principal != "" { + meta[authAffinityMetadataKey] = principal + } } } } return meta } +func stablePrincipalMetadataKey(raw any) string { + var keyStr string + switch v := raw.(type) { + case string: + keyStr = v + case fmt.Stringer: + keyStr = v.String() + default: + keyStr = fmt.Sprintf("%v", raw) + } + if trimmed := strings.TrimSpace(keyStr); trimmed != "" { + return "principal:" + trimmed + } + return "" +} + func pinnedAuthIDFromContext(ctx context.Context) string { if ctx == nil { return "" diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 7a62f852..d7736cf4 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2271,6 +2271,20 @@ func (m *Manager) AuthAffinity(key string) string { return strings.TrimSpace(m.affinity[key]) } +func (m *Manager) applyAuthAffinity(opts *cliproxyexecutor.Options) { + if m == nil || opts == nil || pinnedAuthIDFromMetadata(opts.Metadata) != "" { + return + } + if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { + if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { + if opts.Metadata == nil { + opts.Metadata = make(map[string]any) + } + opts.Metadata[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID + } + } +} + func (m *Manager) SetAuthAffinity(key, authID string) { key = strings.TrimSpace(key) authID = strings.TrimSpace(authID) @@ -2378,18 +2392,7 @@ func (m *Manager) pickNextLegacy(ctx context.Context, provider, model string, op } func (m *Manager) pickNext(ctx context.Context, provider, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, error) { - if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { - meta := opts.Metadata - if meta == nil { - meta = make(map[string]any) - opts.Metadata = meta - } - meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID - } - } - } + m.applyAuthAffinity(&opts) if !m.useSchedulerFastPath() { return m.pickNextLegacy(ctx, provider, model, opts, tried) } @@ -2504,18 +2507,7 @@ func (m *Manager) pickNextMixedLegacy(ctx context.Context, providers []string, m } func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model string, opts cliproxyexecutor.Options, tried map[string]struct{}) (*Auth, ProviderExecutor, string, error) { - if pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata); pinnedAuthID == "" { - if affinityKey := authAffinityKeyFromMetadata(opts.Metadata); affinityKey != "" { - if affinityAuthID := m.AuthAffinity(affinityKey); affinityAuthID != "" { - meta := opts.Metadata - if meta == nil { - meta = make(map[string]any) - opts.Metadata = meta - } - meta[cliproxyexecutor.PinnedAuthMetadataKey] = affinityAuthID - } - } - } + m.applyAuthAffinity(&opts) if !m.useSchedulerFastPath() { return m.pickNextMixedLegacy(ctx, providers, model, opts, tried) }