Merge pull request #3089 from XYenon/feat/session-affinity
feat: support Codex/PI session headers for session affinity
This commit is contained in:
+3
-2
@@ -104,8 +104,9 @@ quota-exceeded:
|
|||||||
routing:
|
routing:
|
||||||
strategy: "round-robin" # round-robin (default), fill-first
|
strategy: "round-robin" # round-robin (default), fill-first
|
||||||
# Enable universal session-sticky routing for all clients.
|
# Enable universal session-sticky routing for all clients.
|
||||||
# Session IDs are extracted from: X-Session-ID header, Idempotency-Key,
|
# Session IDs are extracted from: metadata.user_id (Claude Code session format),
|
||||||
# metadata.user_id, conversation_id, or first few messages hash.
|
# X-Session-ID, Session_id (Codex), X-Amp-Thread-Id (Amp CLI),
|
||||||
|
# X-Client-Request-Id (PI), conversation_id, or first few messages hash.
|
||||||
# Automatic failover is always enabled when bound auth becomes unavailable.
|
# Automatic failover is always enabled when bound auth becomes unavailable.
|
||||||
session-affinity: false # default: false
|
session-affinity: false # default: false
|
||||||
# How long session-to-auth bindings are retained. Default: 1h
|
# How long session-to-auth bindings are retained. Default: 1h
|
||||||
|
|||||||
@@ -226,7 +226,9 @@ type RoutingConfig struct {
|
|||||||
|
|
||||||
// SessionAffinity enables universal session-sticky routing for all clients.
|
// SessionAffinity enables universal session-sticky routing for all clients.
|
||||||
// Session IDs are extracted from multiple sources:
|
// Session IDs are extracted from multiple sources:
|
||||||
// X-Session-ID header, Idempotency-Key, metadata.user_id, conversation_id, or message hash.
|
// metadata.user_id (Claude Code session format), X-Session-ID, Session_id (Codex),
|
||||||
|
// X-Amp-Thread-Id (Amp CLI thread), X-Client-Request-Id (PI), metadata.user_id,
|
||||||
|
// conversation_id, or message hash.
|
||||||
// Automatic failover is always enabled when bound auth becomes unavailable.
|
// Automatic failover is always enabled when bound auth becomes unavailable.
|
||||||
SessionAffinity bool `yaml:"session-affinity,omitempty" json:"session-affinity,omitempty"`
|
SessionAffinity bool `yaml:"session-affinity,omitempty" json:"session-affinity,omitempty"`
|
||||||
|
|
||||||
|
|||||||
@@ -469,11 +469,14 @@ func NewSessionAffinitySelectorWithConfig(cfg SessionAffinityConfig) *SessionAff
|
|||||||
|
|
||||||
// Pick selects an auth with session affinity when possible.
|
// Pick selects an auth with session affinity when possible.
|
||||||
// Priority for session ID extraction:
|
// Priority for session ID extraction:
|
||||||
// 1. metadata.user_id (Claude Code format) - highest priority
|
// 1. metadata.user_id (Claude Code format with _session_{uuid}) - highest priority
|
||||||
// 2. X-Session-ID header
|
// 2. X-Session-ID header
|
||||||
// 3. metadata.user_id (non-Claude Code format)
|
// 3. Session_id header (Codex)
|
||||||
// 4. conversation_id field
|
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
|
||||||
// 5. Hash-based fallback from messages
|
// 5. X-Client-Request-Id header (PI)
|
||||||
|
// 6. metadata.user_id (non-Claude Code format)
|
||||||
|
// 7. conversation_id field in request body
|
||||||
|
// 8. Stable hash from first few messages content (fallback)
|
||||||
//
|
//
|
||||||
// Note: The cache key includes provider, session ID, and model to handle cases where
|
// Note: The cache key includes provider, session ID, and model to handle cases where
|
||||||
// a session uses multiple models (e.g., gemini-2.5-pro and gemini-3-flash-preview)
|
// a session uses multiple models (e.g., gemini-2.5-pro and gemini-3-flash-preview)
|
||||||
@@ -570,10 +573,12 @@ func (s *SessionAffinitySelector) InvalidateAuth(authID string) {
|
|||||||
// Priority order:
|
// Priority order:
|
||||||
// 1. metadata.user_id (Claude Code format with _session_{uuid}) - highest priority for Claude Code clients
|
// 1. metadata.user_id (Claude Code format with _session_{uuid}) - highest priority for Claude Code clients
|
||||||
// 2. X-Session-ID header
|
// 2. X-Session-ID header
|
||||||
// 3. X-Amp-Thread-Id header (Amp CLI thread ID)
|
// 3. Session_id header (Codex)
|
||||||
// 4. metadata.user_id (non-Claude Code format)
|
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
|
||||||
// 5. conversation_id field in request body
|
// 5. X-Client-Request-Id header (PI)
|
||||||
// 6. Stable hash from first few messages content (fallback)
|
// 6. metadata.user_id (non-Claude Code format)
|
||||||
|
// 7. conversation_id field in request body
|
||||||
|
// 8. Stable hash from first few messages content (fallback)
|
||||||
func ExtractSessionID(headers http.Header, payload []byte, metadata map[string]any) string {
|
func ExtractSessionID(headers http.Header, payload []byte, metadata map[string]any) string {
|
||||||
primary, _ := extractSessionIDs(headers, payload, metadata)
|
primary, _ := extractSessionIDs(headers, payload, metadata)
|
||||||
return primary
|
return primary
|
||||||
@@ -609,29 +614,43 @@ func extractSessionIDs(headers http.Header, payload []byte, metadata map[string]
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3. X-Amp-Thread-Id header (Amp CLI thread ID)
|
// 3. Session_id header (Codex)
|
||||||
|
if headers != nil {
|
||||||
|
if sid := headers.Get("Session_id"); sid != "" {
|
||||||
|
return "codex:" + sid, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. X-Amp-Thread-Id header (Amp CLI thread ID)
|
||||||
if headers != nil {
|
if headers != nil {
|
||||||
if tid := headers.Get("X-Amp-Thread-Id"); tid != "" {
|
if tid := headers.Get("X-Amp-Thread-Id"); tid != "" {
|
||||||
return "amp:" + tid, ""
|
return "amp:" + tid, ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 5. X-Client-Request-Id header (PI)
|
||||||
|
if headers != nil {
|
||||||
|
if rid := headers.Get("X-Client-Request-Id"); rid != "" {
|
||||||
|
return "clientreq:" + rid, ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if len(payload) == 0 {
|
if len(payload) == 0 {
|
||||||
return "", ""
|
return "", ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. metadata.user_id (non-Claude Code format)
|
// 6. metadata.user_id (non-Claude Code format)
|
||||||
userID := gjson.GetBytes(payload, "metadata.user_id").String()
|
userID := gjson.GetBytes(payload, "metadata.user_id").String()
|
||||||
if userID != "" {
|
if userID != "" {
|
||||||
return "user:" + userID, ""
|
return "user:" + userID, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. conversation_id field
|
// 7. conversation_id field
|
||||||
if convID := gjson.GetBytes(payload, "conversation_id").String(); convID != "" {
|
if convID := gjson.GetBytes(payload, "conversation_id").String(); convID != "" {
|
||||||
return "conv:" + convID, ""
|
return "conv:" + convID, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6. Hash-based fallback from message content
|
// 8. Hash-based fallback from message content
|
||||||
return extractMessageHashIDs(payload)
|
return extractMessageHashIDs(payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -776,6 +776,46 @@ func TestExtractSessionID_Headers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExtractSessionID_CodexSessionIDHeader(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Set("Session_id", "codex-session-123")
|
||||||
|
|
||||||
|
got := ExtractSessionID(headers, nil, nil)
|
||||||
|
want := "codex:codex-session-123"
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("ExtractSessionID() with Session_id = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractSessionID_ClientRequestIDHeader(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Set("X-Client-Request-Id", "pi-session-123")
|
||||||
|
|
||||||
|
got := ExtractSessionID(headers, nil, nil)
|
||||||
|
want := "clientreq:pi-session-123"
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("ExtractSessionID() with X-Client-Request-Id = %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExtractSessionID_CodexSessionIDPriorityOverClientRequestID(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Set("X-Client-Request-Id", "pi-session-123")
|
||||||
|
headers.Set("Session_id", "codex-session-456")
|
||||||
|
|
||||||
|
got := ExtractSessionID(headers, nil, nil)
|
||||||
|
want := "codex:codex-session-456"
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("ExtractSessionID() = %q, want %q (Session_id should take priority over X-Client-Request-Id)", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestExtractSessionID_AmpThreadId(t *testing.T) {
|
func TestExtractSessionID_AmpThreadId(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
@@ -789,6 +829,20 @@ func TestExtractSessionID_AmpThreadId(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExtractSessionID_AmpThreadIdPriorityOverClientRequestID(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
headers := make(http.Header)
|
||||||
|
headers.Set("X-Amp-Thread-Id", "T-priority-test")
|
||||||
|
headers.Set("X-Client-Request-Id", "pi-session-123")
|
||||||
|
|
||||||
|
got := ExtractSessionID(headers, nil, nil)
|
||||||
|
want := "amp:T-priority-test"
|
||||||
|
if got != want {
|
||||||
|
t.Errorf("ExtractSessionID() = %q, want %q (X-Amp-Thread-Id should take priority over X-Client-Request-Id)", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TestExtractSessionID_AmpThreadIdLowerPriority verifies X-Amp-Thread-Id is lower
|
// TestExtractSessionID_AmpThreadIdLowerPriority verifies X-Amp-Thread-Id is lower
|
||||||
// priority than Claude Code metadata.user_id but higher than conversation_id.
|
// priority than Claude Code metadata.user_id but higher than conversation_id.
|
||||||
func TestExtractSessionID_AmpThreadIdPriority(t *testing.T) {
|
func TestExtractSessionID_AmpThreadIdPriority(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user