fix(executor): implement immediate retry with token refresh on 429 for Qwen and add associated tests

Closes: #2661
This commit is contained in:
Luis Pater
2026-04-10 21:12:03 +08:00
parent 2a97037d7b
commit 65ce86338b
2 changed files with 390 additions and 152 deletions
+219 -152
View File
@@ -153,6 +153,17 @@ func wrapQwenError(ctx context.Context, httpCode int, body []byte) (errCode int,
return errCode, retryAfter
}
func qwenShouldAttemptImmediateRefreshRetry(auth *cliproxyauth.Auth) bool {
if auth == nil || auth.Metadata == nil {
return false
}
if provider := strings.TrimSpace(auth.Provider); provider != "" && !strings.EqualFold(provider, "qwen") {
return false
}
refreshToken, _ := auth.Metadata["refresh_token"].(string)
return strings.TrimSpace(refreshToken) != ""
}
// ensureQwenSystemMessage ensures the request has a single system message at the beginning.
// It always injects the default system prompt and merges any user-provided system messages
// into the injected system message content to satisfy Qwen's strict message ordering rules.
@@ -255,7 +266,8 @@ func ensureQwenSystemMessage(payload []byte) ([]byte, error) {
// QwenExecutor is a stateless executor for Qwen Code using OpenAI-compatible chat completions.
// If access token is unavailable, it falls back to legacy via ClientAdapter.
type QwenExecutor struct {
cfg *config.Config
cfg *config.Config
refreshForImmediateRetry func(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error)
}
func NewQwenExecutor(cfg *config.Config) *QwenExecutor { return &QwenExecutor{cfg: cfg} }
@@ -295,23 +307,13 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
return resp, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
}
// Check rate limit before proceeding
var authID string
if auth != nil {
authID = auth.ID
}
if err := checkQwenRateLimit(authID); err != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
return resp, err
}
baseModel := thinking.ParseSuffix(req.Model).ModelName
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.TrackFailure(ctx, &err)
@@ -338,68 +340,107 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
return resp, err
}
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return resp, err
}
applyQwenHeaders(httpReq, token, false)
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
var authLabel, authType, authValue string
if auth != nil {
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
qwenImmediateRetryAttempted := false
for {
if errRate := checkQwenRateLimit(authID); errRate != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
return resp, errRate
}
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
defer func() {
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if errReq != nil {
return resp, errReq
}
applyQwenHeaders(httpReq, token, false)
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
var authLabel, authType, authValue string
if auth != nil {
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
return resp, errDo
}
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
helps.LogWithRequestID(ctx).WithFields(log.Fields{
"auth_id": redactAuthID(authID),
"model": req.Model,
}).Info("qwen 429 encountered, refreshing token for immediate retry")
qwenImmediateRetryAttempted = true
refreshFn := e.refreshForImmediateRetry
if refreshFn == nil {
refreshFn = e.Refresh
}
refreshedAuth, errRefresh := refreshFn(ctx, auth)
if errRefresh != nil {
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry")
} else if refreshedAuth != nil {
auth = refreshedAuth
continue
}
}
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return resp, err
}
data, errRead := io.ReadAll(httpResp.Body)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
}()
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
if errRead != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errRead)
return resp, errRead
}
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return resp, err
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
reporter.Publish(ctx, helps.ParseOpenAIUsage(data))
var param any
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
// the original model name in the response for client compatibility.
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, &param)
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
return resp, nil
}
data, err := io.ReadAll(httpResp.Body)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return resp, err
}
helps.AppendAPIResponseChunk(ctx, e.cfg, data)
reporter.Publish(ctx, helps.ParseOpenAIUsage(data))
var param any
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
// the original model name in the response for client compatibility.
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, &param)
resp = cliproxyexecutor.Response{Payload: out, Headers: httpResp.Header.Clone()}
return resp, nil
}
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
@@ -407,23 +448,13 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
}
// Check rate limit before proceeding
var authID string
if auth != nil {
authID = auth.ID
}
if err := checkQwenRateLimit(authID); err != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
return nil, err
}
baseModel := thinking.ParseSuffix(req.Model).ModelName
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
reporter := helps.NewUsageReporter(ctx, e.Identifier(), baseModel, auth)
defer reporter.TrackFailure(ctx, &err)
@@ -457,86 +488,122 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
return nil, err
}
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if err != nil {
return nil, err
}
applyQwenHeaders(httpReq, token, true)
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
var authLabel, authType, authValue string
if auth != nil {
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, err := httpClient.Do(httpReq)
if err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
qwenImmediateRetryAttempted := false
for {
if errRate := checkQwenRateLimit(authID); errRate != nil {
helps.LogWithRequestID(ctx).Warnf("qwen rate limit exceeded for credential %s", redactAuthID(authID))
return nil, errRate
}
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
go func() {
defer close(out)
defer func() {
token, baseURL := qwenCreds(auth)
if baseURL == "" {
baseURL = "https://portal.qwen.ai/v1"
}
url := strings.TrimSuffix(baseURL, "/") + "/chat/completions"
httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
if errReq != nil {
return nil, errReq
}
applyQwenHeaders(httpReq, token, true)
var attrs map[string]string
if auth != nil {
attrs = auth.Attributes
}
util.ApplyCustomHeadersFromAttrs(httpReq, attrs)
var authLabel, authType, authValue string
if auth != nil {
authLabel = auth.Label
authType, authValue = auth.AccountInfo()
}
helps.RecordAPIRequest(ctx, e.cfg, helps.UpstreamRequestLog{
URL: url,
Method: http.MethodPost,
Headers: httpReq.Header.Clone(),
Body: body,
Provider: e.Identifier(),
AuthID: authID,
AuthLabel: authLabel,
AuthType: authType,
AuthValue: authValue,
})
httpClient := helps.NewProxyAwareHTTPClient(ctx, e.cfg, auth, 0)
httpResp, errDo := httpClient.Do(httpReq)
if errDo != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errDo)
return nil, errDo
}
helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
b, _ := io.ReadAll(httpResp.Body)
helps.AppendAPIResponseChunk(ctx, e.cfg, b)
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
errCode, retryAfter := wrapQwenError(ctx, httpResp.StatusCode, b)
helps.LogWithRequestID(ctx).Debugf("request error, error status: %d (mapped: %d), error message: %s", httpResp.StatusCode, errCode, helps.SummarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
if errCode == http.StatusTooManyRequests && !qwenImmediateRetryAttempted && qwenShouldAttemptImmediateRefreshRetry(auth) {
helps.LogWithRequestID(ctx).WithFields(log.Fields{
"auth_id": redactAuthID(authID),
"model": req.Model,
}).Info("qwen 429 encountered, refreshing token for immediate retry (stream)")
qwenImmediateRetryAttempted = true
refreshFn := e.refreshForImmediateRetry
if refreshFn == nil {
refreshFn = e.Refresh
}
refreshedAuth, errRefresh := refreshFn(ctx, auth)
if errRefresh != nil {
helps.LogWithRequestID(ctx).WithError(errRefresh).WithField("auth_id", redactAuthID(authID)).Warn("qwen 429 refresh failed; skipping immediate retry (stream)")
} else if refreshedAuth != nil {
auth = refreshedAuth
continue
}
}
err = statusErr{code: errCode, msg: string(b), retryAfter: retryAfter}
return nil, err
}
out := make(chan cliproxyexecutor.StreamChunk)
go func() {
defer close(out)
defer func() {
if errClose := httpResp.Body.Close(); errClose != nil {
log.Errorf("qwen executor: close response body error: %v", errClose)
}
}()
scanner := bufio.NewScanner(httpResp.Body)
scanner.Buffer(nil, 52_428_800) // 50MB
var param any
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := helps.ParseOpenAIStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param)
for i := range chunks {
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
}
}
doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param)
for i := range doneChunks {
out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]}
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
scanner := bufio.NewScanner(httpResp.Body)
scanner.Buffer(nil, 52_428_800) // 50MB
var param any
for scanner.Scan() {
line := scanner.Bytes()
helps.AppendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := helps.ParseOpenAIStreamUsage(line); ok {
reporter.Publish(ctx, detail)
}
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), &param)
for i := range chunks {
out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}
}
}
doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), &param)
for i := range doneChunks {
out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]}
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
}
}()
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
}
}
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {