refactor(executor): remove immediate retry with token refresh on 429 for Qwen and update tests accordingly

This commit is contained in:
Luis Pater
2026-04-11 16:35:18 +08:00
parent 5ab9afac83
commit 828df80088
2 changed files with 27 additions and 82 deletions
@@ -153,17 +153,6 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int,
return errCode, retryAfter return errCode, retryAfter
} }
func qwenShouldAttemptImmediateRefreshRetry(auth *cliproxyauth.Auth) bool {
if auth == nil || auth.Metadata == nil {
return false
}
if provider := strings.TrimSpace(auth.Provider); provider != "" && !strings.EqualFold(provider, "qwen") {
return false
}
refreshToken, _ := auth.Metadata["refresh_token"].(string)
return strings.TrimSpace(refreshToken) != ""
}
// ensureQwenSystemMessage ensures the request has a single system message at the beginning. // ensureQwenSystemMessage ensures the request has a single system message at the beginning.
// It always injects the default system prompt and merges any user-provided system messages // It always injects the default system prompt and merges any user-provided system messages
// into the injected system message content to satisfy Qwen's strict message ordering rules. // into the injected system message content to satisfy Qwen's strict message ordering rules.
@@ -340,7 +329,6 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
return resp, err return resp, err
} }
qwenImmediateRetryAttempted := false
for { for {
if errRate := checkQwenRateLimit(authID); errRate != nil { if errRate := checkQwenRateLimit(authID); errRate != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
@@ -398,26 +386,6 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
helps.LogWithRequestID(ctx).WithFields(log.Fields{
"auth_id": redactAuthID(authID),
"model": req.Model,
}).Info("qwen 429 encountered, refreshing token for immediate retry")
qwenImmediateRetryAttempted = true
refreshFn := e.refreshForImmediateRetry
if refreshFn == nil {
refreshFn = e.Refresh
}
refreshedAuth, errRefresh := refreshFn(ctx, auth)
if errRefresh != nil {
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry")
} else if refreshedAuth != nil {
auth = refreshedAuth
continue
}
}
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return resp, err return resp, err
} }
@@ -488,7 +456,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
return nil, err return nil, err
} }
qwenImmediateRetryAttempted := false
for { for {
if errRate := checkQwenRateLimit(authID); errRate != nil { if errRate := checkQwenRateLimit(authID); errRate != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID)) helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
@@ -546,26 +513,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b) errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b)) helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
helps.LogWithRequestID(ctx).WithFields(log.Fields{
"auth_id": redactAuthID(authID),
"model": req.Model,
}).Info("qwen 429 encountered, refreshing token for immediate retry (stream)")
qwenImmediateRetryAttempted = true
refreshFn := e.refreshForImmediateRetry
if refreshFn == nil {
refreshFn = e.Refresh
}
refreshedAuth, errRefresh := refreshFn(ctx, auth)
if errRefresh != nil {
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry (stream)")
} else if refreshedAuth != nil {
auth = refreshedAuth
continue
}
}
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter} err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return nil, err return nil, err
} }
+27 -29
View File
@@ -216,7 +216,7 @@ func TestQwenCreds_NormalizesResourceURL(t *testing.T) {
} }
} }
func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) { func TestQwenExecutorExecute_429DoesNotRefreshOrRetry(t *testing.T) {
qwenRateLimiter.Lock() qwenRateLimiter.Lock()
qwenRateLimiter.requests = make(map[string][]time.Time) qwenRateLimiter.requests = make(map[string][]time.Time)
qwenRateLimiter.Unlock() qwenRateLimiter.Unlock()
@@ -272,27 +272,31 @@ func TestQwenExecutorExecute_429RefreshAndRetry(t *testing.T) {
} }
ctx := context.Background() ctx := context.Background()
resp, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ _, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{
Model: "qwen-max", Model: "qwen-max",
Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`), Payload: []byte(`{"model":"qwen-max","messages":[{"role":"user","content":"hi"}]}`),
}, cliproxyexecutor.Options{ }, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai"), SourceFormat: sdktranslator.FromString("openai"),
}) })
if err != nil { if err == nil {
t.Fatalf("Execute() error = %v", err) t.Fatalf("Execute() expected error, got nil")
} }
if len(resp.Payload) == 0 { status, ok := err.(statusErr)
t.Fatalf("Execute() payload is empty") if !ok {
t.Fatalf("Execute() error type = %T, want statusErr", err)
} }
if atomic.LoadInt32(&calls) != 2 { if status.StatusCode() != http.StatusTooManyRequests {
t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) t.Fatalf("Execute() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
} }
if atomic.LoadInt32(&refresherCalls) != 1 { if atomic.LoadInt32(&calls) != 1 {
t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
}
if atomic.LoadInt32(&refresherCalls) != 0 {
t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls))
} }
} }
func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) { func TestQwenExecutorExecuteStream_429DoesNotRefreshOrRetry(t *testing.T) {
qwenRateLimiter.Lock() qwenRateLimiter.Lock()
qwenRateLimiter.requests = make(map[string][]time.Time) qwenRateLimiter.requests = make(map[string][]time.Time)
qwenRateLimiter.Unlock() qwenRateLimiter.Unlock()
@@ -351,32 +355,26 @@ func TestQwenExecutorExecuteStream_429RefreshAndRetry(t *testing.T) {
} }
ctx := context.Background() ctx := context.Background()
stream, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{ _, err := exec.ExecuteStream(ctx, auth, cliproxyexecutor.Request{
Model: "qwen-max", Model: "qwen-max",
Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`), Payload: []byte(`{"model":"qwen-max","stream":true,"messages":[{"role":"user","content":"hi"}]}`),
}, cliproxyexecutor.Options{ }, cliproxyexecutor.Options{
SourceFormat: sdktranslator.FromString("openai"), SourceFormat: sdktranslator.FromString("openai"),
}) })
if err != nil { if err == nil {
t.Fatalf("ExecuteStream() error = %v", err) t.Fatalf("ExecuteStream() expected error, got nil")
} }
if atomic.LoadInt32(&calls) != 2 { status, ok := err.(statusErr)
t.Fatalf("upstream calls = %d, want 2", atomic.LoadInt32(&calls)) if !ok {
t.Fatalf("ExecuteStream() error type = %T, want statusErr", err)
} }
if atomic.LoadInt32(&refresherCalls) != 1 { if status.StatusCode() != http.StatusTooManyRequests {
t.Fatalf("refresher calls = %d, want 1", atomic.LoadInt32(&refresherCalls)) t.Fatalf("ExecuteStream() status code = %d, want %d", status.StatusCode(), http.StatusTooManyRequests)
} }
if atomic.LoadInt32(&calls) != 1 {
var sawPayload bool t.Fatalf("upstream calls = %d, want 1", atomic.LoadInt32(&calls))
for chunk := range stream.Chunks {
if chunk.Err != nil {
t.Fatalf("stream chunk error = %v", chunk.Err)
}
if len(chunk.Payload) > 0 {
sawPayload = true
}
} }
if !sawPayload { if atomic.LoadInt32(&refresherCalls) != 0 {
t.Fatalf("stream did not produce any payload chunks") t.Fatalf("refresher calls = %d, want 0", atomic.LoadInt32(&refresherCalls))
} }
} }