Fixed: #1997
test(auth-scheduler): add benchmarks and priority-based scheduling improvements - Added `BenchmarkManagerPickNextMixedPriority500` for mixed-priority performance assessment. - Updated `pickNextMixed` to prioritize highest ready priority tiers. - Introduced `highestReadyPriorityLocked` and `pickReadyAtPriorityLocked` for better scheduling logic. - Added unit test to validate selection of highest priority tiers in mixed provider scenarios.
This commit is contained in:
@@ -250,17 +250,41 @@ func (s *authScheduler) pickMixed(ctx context.Context, providers []string, model
|
|||||||
return nil, "", shard.unavailableErrorLocked("mixed", model, predicate)
|
return nil, "", shard.unavailableErrorLocked("mixed", model, predicate)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
predicate := triedPredicate(tried)
|
||||||
|
candidateShards := make([]*modelScheduler, len(normalized))
|
||||||
|
bestPriority := 0
|
||||||
|
hasCandidate := false
|
||||||
|
now := time.Now()
|
||||||
|
for providerIndex, providerKey := range normalized {
|
||||||
|
providerState := s.providers[providerKey]
|
||||||
|
if providerState == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
shard := providerState.ensureModelLocked(modelKey, now)
|
||||||
|
candidateShards[providerIndex] = shard
|
||||||
|
if shard == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
priorityReady, okPriority := shard.highestReadyPriorityLocked(false, predicate)
|
||||||
|
if !okPriority {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !hasCandidate || priorityReady > bestPriority {
|
||||||
|
bestPriority = priorityReady
|
||||||
|
hasCandidate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !hasCandidate {
|
||||||
|
return nil, "", s.mixedUnavailableErrorLocked(normalized, model, tried)
|
||||||
|
}
|
||||||
|
|
||||||
if s.strategy == schedulerStrategyFillFirst {
|
if s.strategy == schedulerStrategyFillFirst {
|
||||||
for _, providerKey := range normalized {
|
for providerIndex, providerKey := range normalized {
|
||||||
providerState := s.providers[providerKey]
|
shard := candidateShards[providerIndex]
|
||||||
if providerState == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
shard := providerState.ensureModelLocked(modelKey, time.Now())
|
|
||||||
if shard == nil {
|
if shard == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
picked := shard.pickReadyLocked(false, s.strategy, triedPredicate(tried))
|
picked := shard.pickReadyAtPriorityLocked(false, bestPriority, s.strategy, predicate)
|
||||||
if picked != nil {
|
if picked != nil {
|
||||||
return picked, providerKey, nil
|
return picked, providerKey, nil
|
||||||
}
|
}
|
||||||
@@ -276,15 +300,11 @@ func (s *authScheduler) pickMixed(ctx context.Context, providers []string, model
|
|||||||
for offset := 0; offset < len(normalized); offset++ {
|
for offset := 0; offset < len(normalized); offset++ {
|
||||||
providerIndex := (start + offset) % len(normalized)
|
providerIndex := (start + offset) % len(normalized)
|
||||||
providerKey := normalized[providerIndex]
|
providerKey := normalized[providerIndex]
|
||||||
providerState := s.providers[providerKey]
|
shard := candidateShards[providerIndex]
|
||||||
if providerState == nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
shard := providerState.ensureModelLocked(modelKey, time.Now())
|
|
||||||
if shard == nil {
|
if shard == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
picked := shard.pickReadyLocked(false, schedulerStrategyRoundRobin, triedPredicate(tried))
|
picked := shard.pickReadyAtPriorityLocked(false, bestPriority, schedulerStrategyRoundRobin, predicate)
|
||||||
if picked == nil {
|
if picked == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -629,6 +649,19 @@ func (m *modelScheduler) pickReadyLocked(preferWebsocket bool, strategy schedule
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
m.promoteExpiredLocked(time.Now())
|
m.promoteExpiredLocked(time.Now())
|
||||||
|
priorityReady, okPriority := m.highestReadyPriorityLocked(preferWebsocket, predicate)
|
||||||
|
if !okPriority {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return m.pickReadyAtPriorityLocked(preferWebsocket, priorityReady, strategy, predicate)
|
||||||
|
}
|
||||||
|
|
||||||
|
// highestReadyPriorityLocked returns the highest priority bucket that still has a matching ready auth.
|
||||||
|
// The caller must ensure expired entries are already promoted when needed.
|
||||||
|
func (m *modelScheduler) highestReadyPriorityLocked(preferWebsocket bool, predicate func(*scheduledAuth) bool) (int, bool) {
|
||||||
|
if m == nil {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
for _, priority := range m.priorityOrder {
|
for _, priority := range m.priorityOrder {
|
||||||
bucket := m.readyByPriority[priority]
|
bucket := m.readyByPriority[priority]
|
||||||
if bucket == nil {
|
if bucket == nil {
|
||||||
@@ -638,17 +671,37 @@ func (m *modelScheduler) pickReadyLocked(preferWebsocket bool, strategy schedule
|
|||||||
if preferWebsocket && len(bucket.ws.flat) > 0 {
|
if preferWebsocket && len(bucket.ws.flat) > 0 {
|
||||||
view = &bucket.ws
|
view = &bucket.ws
|
||||||
}
|
}
|
||||||
var picked *scheduledAuth
|
if view.pickFirst(predicate) != nil {
|
||||||
if strategy == schedulerStrategyFillFirst {
|
return priority, true
|
||||||
picked = view.pickFirst(predicate)
|
|
||||||
} else {
|
|
||||||
picked = view.pickRoundRobin(predicate)
|
|
||||||
}
|
|
||||||
if picked != nil && picked.auth != nil {
|
|
||||||
return picked.auth
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// pickReadyAtPriorityLocked selects the next ready auth from a specific priority bucket.
|
||||||
|
// The caller must ensure expired entries are already promoted when needed.
|
||||||
|
func (m *modelScheduler) pickReadyAtPriorityLocked(preferWebsocket bool, priority int, strategy schedulerStrategy, predicate func(*scheduledAuth) bool) *Auth {
|
||||||
|
if m == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
bucket := m.readyByPriority[priority]
|
||||||
|
if bucket == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
view := &bucket.all
|
||||||
|
if preferWebsocket && len(bucket.ws.flat) > 0 {
|
||||||
|
view = &bucket.ws
|
||||||
|
}
|
||||||
|
var picked *scheduledAuth
|
||||||
|
if strategy == schedulerStrategyFillFirst {
|
||||||
|
picked = view.pickFirst(predicate)
|
||||||
|
} else {
|
||||||
|
picked = view.pickRoundRobin(predicate)
|
||||||
|
}
|
||||||
|
if picked == nil || picked.auth == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return picked.auth
|
||||||
}
|
}
|
||||||
|
|
||||||
// unavailableErrorLocked returns the correct unavailable or cooldown error for the shard.
|
// unavailableErrorLocked returns the correct unavailable or cooldown error for the shard.
|
||||||
|
|||||||
@@ -176,6 +176,25 @@ func BenchmarkManagerPickNextMixed500(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkManagerPickNextMixedPriority500(b *testing.B) {
|
||||||
|
manager, providers, model := benchmarkManagerSetup(b, 500, true, true)
|
||||||
|
ctx := context.Background()
|
||||||
|
opts := cliproxyexecutor.Options{}
|
||||||
|
tried := map[string]struct{}{}
|
||||||
|
if _, _, _, errWarm := manager.pickNextMixed(ctx, providers, model, opts, tried); errWarm != nil {
|
||||||
|
b.Fatalf("warmup pickNextMixed error = %v", errWarm)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.ResetTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
auth, exec, provider, errPick := manager.pickNextMixed(ctx, providers, model, opts, tried)
|
||||||
|
if errPick != nil || auth == nil || exec == nil || provider == "" {
|
||||||
|
b.Fatalf("pickNextMixed failed: auth=%v exec=%v provider=%q err=%v", auth, exec, provider, errPick)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkManagerPickNextAndMarkResult1000(b *testing.B) {
|
func BenchmarkManagerPickNextAndMarkResult1000(b *testing.B) {
|
||||||
manager, _, model := benchmarkManagerSetup(b, 1000, false, false)
|
manager, _, model := benchmarkManagerSetup(b, 1000, false, false)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|||||||
@@ -237,6 +237,41 @@ func TestSchedulerPick_MixedProvidersUsesProviderRotationOverReadyCandidates(t *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchedulerPick_MixedProvidersPrefersHighestPriorityTier(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
model := "gpt-default"
|
||||||
|
registerSchedulerModels(t, "provider-low", model, "low")
|
||||||
|
registerSchedulerModels(t, "provider-high-a", model, "high-a")
|
||||||
|
registerSchedulerModels(t, "provider-high-b", model, "high-b")
|
||||||
|
|
||||||
|
scheduler := newSchedulerForTest(
|
||||||
|
&RoundRobinSelector{},
|
||||||
|
&Auth{ID: "low", Provider: "provider-low", Attributes: map[string]string{"priority": "4"}},
|
||||||
|
&Auth{ID: "high-a", Provider: "provider-high-a", Attributes: map[string]string{"priority": "7"}},
|
||||||
|
&Auth{ID: "high-b", Provider: "provider-high-b", Attributes: map[string]string{"priority": "7"}},
|
||||||
|
)
|
||||||
|
|
||||||
|
providers := []string{"provider-low", "provider-high-a", "provider-high-b"}
|
||||||
|
wantProviders := []string{"provider-high-a", "provider-high-b", "provider-high-a", "provider-high-b"}
|
||||||
|
wantIDs := []string{"high-a", "high-b", "high-a", "high-b"}
|
||||||
|
for index := range wantProviders {
|
||||||
|
got, provider, errPick := scheduler.pickMixed(context.Background(), providers, model, cliproxyexecutor.Options{}, nil)
|
||||||
|
if errPick != nil {
|
||||||
|
t.Fatalf("pickMixed() #%d error = %v", index, errPick)
|
||||||
|
}
|
||||||
|
if got == nil {
|
||||||
|
t.Fatalf("pickMixed() #%d auth = nil", index)
|
||||||
|
}
|
||||||
|
if provider != wantProviders[index] {
|
||||||
|
t.Fatalf("pickMixed() #%d provider = %q, want %q", index, provider, wantProviders[index])
|
||||||
|
}
|
||||||
|
if got.ID != wantIDs[index] {
|
||||||
|
t.Fatalf("pickMixed() #%d auth.ID = %q, want %q", index, got.ID, wantIDs[index])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestManager_PickNextMixed_UsesProviderRotationBeforeCredentialRotation(t *testing.T) {
|
func TestManager_PickNextMixed_UsesProviderRotationBeforeCredentialRotation(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user