diff --git a/sdk/cliproxy/auth/scheduler.go b/sdk/cliproxy/auth/scheduler.go index 1ede8934..bfff53bf 100644 --- a/sdk/cliproxy/auth/scheduler.go +++ b/sdk/cliproxy/auth/scheduler.go @@ -250,17 +250,41 @@ func (s *authScheduler) pickMixed(ctx context.Context, providers []string, model return nil, "", shard.unavailableErrorLocked("mixed", model, predicate) } + predicate := triedPredicate(tried) + candidateShards := make([]*modelScheduler, len(normalized)) + bestPriority := 0 + hasCandidate := false + now := time.Now() + for providerIndex, providerKey := range normalized { + providerState := s.providers[providerKey] + if providerState == nil { + continue + } + shard := providerState.ensureModelLocked(modelKey, now) + candidateShards[providerIndex] = shard + if shard == nil { + continue + } + priorityReady, okPriority := shard.highestReadyPriorityLocked(false, predicate) + if !okPriority { + continue + } + if !hasCandidate || priorityReady > bestPriority { + bestPriority = priorityReady + hasCandidate = true + } + } + if !hasCandidate { + return nil, "", s.mixedUnavailableErrorLocked(normalized, model, tried) + } + if s.strategy == schedulerStrategyFillFirst { - for _, providerKey := range normalized { - providerState := s.providers[providerKey] - if providerState == nil { - continue - } - shard := providerState.ensureModelLocked(modelKey, time.Now()) + for providerIndex, providerKey := range normalized { + shard := candidateShards[providerIndex] if shard == nil { continue } - picked := shard.pickReadyLocked(false, s.strategy, triedPredicate(tried)) + picked := shard.pickReadyAtPriorityLocked(false, bestPriority, s.strategy, predicate) if picked != nil { return picked, providerKey, nil } @@ -276,15 +300,11 @@ func (s *authScheduler) pickMixed(ctx context.Context, providers []string, model for offset := 0; offset < len(normalized); offset++ { providerIndex := (start + offset) % len(normalized) providerKey := normalized[providerIndex] - providerState := s.providers[providerKey] - if providerState == nil { - continue - } - shard := providerState.ensureModelLocked(modelKey, time.Now()) + shard := candidateShards[providerIndex] if shard == nil { continue } - picked := shard.pickReadyLocked(false, schedulerStrategyRoundRobin, triedPredicate(tried)) + picked := shard.pickReadyAtPriorityLocked(false, bestPriority, schedulerStrategyRoundRobin, predicate) if picked == nil { continue } @@ -629,6 +649,19 @@ func (m *modelScheduler) pickReadyLocked(preferWebsocket bool, strategy schedule return nil } m.promoteExpiredLocked(time.Now()) + priorityReady, okPriority := m.highestReadyPriorityLocked(preferWebsocket, predicate) + if !okPriority { + return nil + } + return m.pickReadyAtPriorityLocked(preferWebsocket, priorityReady, strategy, predicate) +} + +// highestReadyPriorityLocked returns the highest priority bucket that still has a matching ready auth. +// The caller must ensure expired entries are already promoted when needed. +func (m *modelScheduler) highestReadyPriorityLocked(preferWebsocket bool, predicate func(*scheduledAuth) bool) (int, bool) { + if m == nil { + return 0, false + } for _, priority := range m.priorityOrder { bucket := m.readyByPriority[priority] if bucket == nil { @@ -638,17 +671,37 @@ func (m *modelScheduler) pickReadyLocked(preferWebsocket bool, strategy schedule if preferWebsocket && len(bucket.ws.flat) > 0 { view = &bucket.ws } - var picked *scheduledAuth - if strategy == schedulerStrategyFillFirst { - picked = view.pickFirst(predicate) - } else { - picked = view.pickRoundRobin(predicate) - } - if picked != nil && picked.auth != nil { - return picked.auth + if view.pickFirst(predicate) != nil { + return priority, true } } - return nil + return 0, false +} + +// pickReadyAtPriorityLocked selects the next ready auth from a specific priority bucket. +// The caller must ensure expired entries are already promoted when needed. +func (m *modelScheduler) pickReadyAtPriorityLocked(preferWebsocket bool, priority int, strategy schedulerStrategy, predicate func(*scheduledAuth) bool) *Auth { + if m == nil { + return nil + } + bucket := m.readyByPriority[priority] + if bucket == nil { + return nil + } + view := &bucket.all + if preferWebsocket && len(bucket.ws.flat) > 0 { + view = &bucket.ws + } + var picked *scheduledAuth + if strategy == schedulerStrategyFillFirst { + picked = view.pickFirst(predicate) + } else { + picked = view.pickRoundRobin(predicate) + } + if picked == nil || picked.auth == nil { + return nil + } + return picked.auth } // unavailableErrorLocked returns the correct unavailable or cooldown error for the shard. diff --git a/sdk/cliproxy/auth/scheduler_benchmark_test.go b/sdk/cliproxy/auth/scheduler_benchmark_test.go index 33fec2d5..050a7cbd 100644 --- a/sdk/cliproxy/auth/scheduler_benchmark_test.go +++ b/sdk/cliproxy/auth/scheduler_benchmark_test.go @@ -176,6 +176,25 @@ func BenchmarkManagerPickNextMixed500(b *testing.B) { } } +func BenchmarkManagerPickNextMixedPriority500(b *testing.B) { + manager, providers, model := benchmarkManagerSetup(b, 500, true, true) + ctx := context.Background() + opts := cliproxyexecutor.Options{} + tried := map[string]struct{}{} + if _, _, _, errWarm := manager.pickNextMixed(ctx, providers, model, opts, tried); errWarm != nil { + b.Fatalf("warmup pickNextMixed error = %v", errWarm) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + auth, exec, provider, errPick := manager.pickNextMixed(ctx, providers, model, opts, tried) + if errPick != nil || auth == nil || exec == nil || provider == "" { + b.Fatalf("pickNextMixed failed: auth=%v exec=%v provider=%q err=%v", auth, exec, provider, errPick) + } + } +} + func BenchmarkManagerPickNextAndMarkResult1000(b *testing.B) { manager, _, model := benchmarkManagerSetup(b, 1000, false, false) ctx := context.Background() diff --git a/sdk/cliproxy/auth/scheduler_test.go b/sdk/cliproxy/auth/scheduler_test.go index 031071af..e7d435a9 100644 --- a/sdk/cliproxy/auth/scheduler_test.go +++ b/sdk/cliproxy/auth/scheduler_test.go @@ -237,6 +237,41 @@ func TestSchedulerPick_MixedProvidersUsesProviderRotationOverReadyCandidates(t * } } +func TestSchedulerPick_MixedProvidersPrefersHighestPriorityTier(t *testing.T) { + t.Parallel() + + model := "gpt-default" + registerSchedulerModels(t, "provider-low", model, "low") + registerSchedulerModels(t, "provider-high-a", model, "high-a") + registerSchedulerModels(t, "provider-high-b", model, "high-b") + + scheduler := newSchedulerForTest( + &RoundRobinSelector{}, + &Auth{ID: "low", Provider: "provider-low", Attributes: map[string]string{"priority": "4"}}, + &Auth{ID: "high-a", Provider: "provider-high-a", Attributes: map[string]string{"priority": "7"}}, + &Auth{ID: "high-b", Provider: "provider-high-b", Attributes: map[string]string{"priority": "7"}}, + ) + + providers := []string{"provider-low", "provider-high-a", "provider-high-b"} + wantProviders := []string{"provider-high-a", "provider-high-b", "provider-high-a", "provider-high-b"} + wantIDs := []string{"high-a", "high-b", "high-a", "high-b"} + for index := range wantProviders { + got, provider, errPick := scheduler.pickMixed(context.Background(), providers, model, cliproxyexecutor.Options{}, nil) + if errPick != nil { + t.Fatalf("pickMixed() #%d error = %v", index, errPick) + } + if got == nil { + t.Fatalf("pickMixed() #%d auth = nil", index) + } + if provider != wantProviders[index] { + t.Fatalf("pickMixed() #%d provider = %q, want %q", index, provider, wantProviders[index]) + } + if got.ID != wantIDs[index] { + t.Fatalf("pickMixed() #%d auth.ID = %q, want %q", index, got.ID, wantIDs[index]) + } + } +} + func TestManager_PickNextMixed_UsesProviderRotationBeforeCredentialRotation(t *testing.T) { t.Parallel()