fix(auth): add retry logic for 429 status with Retry-After and improve testing
This commit is contained in:
@@ -1850,6 +1850,50 @@ func (m *Manager) closestCooldownWait(providers []string, model string, attempt
|
|||||||
return minWait, found
|
return minWait, found
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) retryAllowed(attempt int, providers []string) bool {
|
||||||
|
if m == nil || attempt < 0 || len(providers) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defaultRetry := int(m.requestRetry.Load())
|
||||||
|
if defaultRetry < 0 {
|
||||||
|
defaultRetry = 0
|
||||||
|
}
|
||||||
|
providerSet := make(map[string]struct{}, len(providers))
|
||||||
|
for i := range providers {
|
||||||
|
key := strings.TrimSpace(strings.ToLower(providers[i]))
|
||||||
|
if key == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
providerSet[key] = struct{}{}
|
||||||
|
}
|
||||||
|
if len(providerSet) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.RLock()
|
||||||
|
defer m.mu.RUnlock()
|
||||||
|
for _, auth := range m.auths {
|
||||||
|
if auth == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
providerKey := strings.TrimSpace(strings.ToLower(auth.Provider))
|
||||||
|
if _, ok := providerSet[providerKey]; !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
effectiveRetry := defaultRetry
|
||||||
|
if override, ok := auth.RequestRetryOverride(); ok {
|
||||||
|
effectiveRetry = override
|
||||||
|
}
|
||||||
|
if effectiveRetry < 0 {
|
||||||
|
effectiveRetry = 0
|
||||||
|
}
|
||||||
|
if attempt < effectiveRetry {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
|
func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []string, model string, maxWait time.Duration) (time.Duration, bool) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return 0, false
|
return 0, false
|
||||||
@@ -1857,17 +1901,31 @@ func (m *Manager) shouldRetryAfterError(err error, attempt int, providers []stri
|
|||||||
if maxWait <= 0 {
|
if maxWait <= 0 {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
if status := statusCodeFromError(err); status == http.StatusOK {
|
status := statusCodeFromError(err)
|
||||||
|
if status == http.StatusOK {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
if isRequestInvalidError(err) {
|
if isRequestInvalidError(err) {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
wait, found := m.closestCooldownWait(providers, model, attempt)
|
wait, found := m.closestCooldownWait(providers, model, attempt)
|
||||||
if !found || wait > maxWait {
|
if found {
|
||||||
|
if wait > maxWait {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return wait, true
|
||||||
|
}
|
||||||
|
if status != http.StatusTooManyRequests {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
return wait, true
|
if !m.retryAllowed(attempt, providers) {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
retryAfter := retryAfterFromError(err)
|
||||||
|
if retryAfter == nil || *retryAfter <= 0 || *retryAfter > maxWait {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
return *retryAfter, true
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForCooldown(ctx context.Context, wait time.Duration) error {
|
func waitForCooldown(ctx context.Context, wait time.Duration) error {
|
||||||
|
|||||||
@@ -690,6 +690,57 @@ func TestManager_Execute_DisableCooling_DoesNotBlackoutAfter429RetryAfter(t *tes
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestManager_Execute_DisableCooling_RetriesAfter429RetryAfter(t *testing.T) {
|
||||||
|
prev := quotaCooldownDisabled.Load()
|
||||||
|
quotaCooldownDisabled.Store(false)
|
||||||
|
t.Cleanup(func() { quotaCooldownDisabled.Store(prev) })
|
||||||
|
|
||||||
|
m := NewManager(nil, nil, nil)
|
||||||
|
m.SetRetryConfig(3, 100*time.Millisecond, 0)
|
||||||
|
|
||||||
|
executor := &authFallbackExecutor{
|
||||||
|
id: "claude",
|
||||||
|
executeErrors: map[string]error{
|
||||||
|
"auth-429-retryafter-exec": &retryAfterStatusError{
|
||||||
|
status: http.StatusTooManyRequests,
|
||||||
|
message: "quota exhausted",
|
||||||
|
retryAfter: 5 * time.Millisecond,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m.RegisterExecutor(executor)
|
||||||
|
|
||||||
|
auth := &Auth{
|
||||||
|
ID: "auth-429-retryafter-exec",
|
||||||
|
Provider: "claude",
|
||||||
|
Metadata: map[string]any{
|
||||||
|
"disable_cooling": true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if _, errRegister := m.Register(context.Background(), auth); errRegister != nil {
|
||||||
|
t.Fatalf("register auth: %v", errRegister)
|
||||||
|
}
|
||||||
|
|
||||||
|
model := "test-model-429-retryafter-exec"
|
||||||
|
reg := registry.GetGlobalRegistry()
|
||||||
|
reg.RegisterClient(auth.ID, "claude", []*registry.ModelInfo{{ID: model}})
|
||||||
|
t.Cleanup(func() { reg.UnregisterClient(auth.ID) })
|
||||||
|
|
||||||
|
req := cliproxyexecutor.Request{Model: model}
|
||||||
|
_, errExecute := m.Execute(context.Background(), []string{"claude"}, req, cliproxyexecutor.Options{})
|
||||||
|
if errExecute == nil {
|
||||||
|
t.Fatal("expected execute error")
|
||||||
|
}
|
||||||
|
if statusCodeFromError(errExecute) != http.StatusTooManyRequests {
|
||||||
|
t.Fatalf("execute status = %d, want %d", statusCodeFromError(errExecute), http.StatusTooManyRequests)
|
||||||
|
}
|
||||||
|
|
||||||
|
calls := executor.ExecuteCalls()
|
||||||
|
if len(calls) != 4 {
|
||||||
|
t.Fatalf("execute calls = %d, want 4 (initial + 3 retries)", len(calls))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestManager_MarkResult_RequestScopedNotFoundDoesNotCooldownAuth(t *testing.T) {
|
func TestManager_MarkResult_RequestScopedNotFoundDoesNotCooldownAuth(t *testing.T) {
|
||||||
m := NewManager(nil, nil, nil)
|
m := NewManager(nil, nil, nil)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user