From 14d46a0a5dedb338d5e64bd8660d4ac7f2909bcd Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 13:44:20 +0800 Subject: [PATCH 1/6] feat(antigravity): conductor-level credits fallback for Claude models Move credits handling from executor-level retry to conductor-level orchestration. When all free-tier auths are exhausted (429/503), the conductor discovers auths with available Google One AI credits and retries with enabledCreditTypes injected via context flag. Key changes: - Add AntigravityCreditsHint system for tracking per-auth credits state - Conductor tries credits fallback after all auths fail (Execute/Stream/Count) - Executor injects enabledCreditTypes only when conductor sets context flag - Credits fallback respects provider scope (requires antigravity in providers) - Add context cancellation check in credits fallback to avoid wasted requests - Remove executor-level attemptCreditsFallback and preferCredits machinery - Restructure 429 decision logic (parse details first, keyword fallback) - Expand shouldAbort to cover INVALID_ARGUMENT/FAILED_PRECONDITION/500+UNKNOWN - Support human-readable retry delay parsing (e.g. "1h43m56s") --- config.example.yaml | 2 +- internal/config/config.go | 5 +- .../runtime/executor/antigravity_executor.go | 654 +++++++----------- .../antigravity_executor_credits_test.go | 513 +++++++------- .../runtime/executor/gemini_cli_executor.go | 9 +- .../runtime/executor/helps/logging_helpers.go | 130 +++- sdk/cliproxy/auth/antigravity_credits.go | 90 +++ sdk/cliproxy/auth/antigravity_credits_test.go | 62 ++ sdk/cliproxy/auth/conductor.go | 218 +++++- 9 files changed, 989 insertions(+), 694 deletions(-) create mode 100644 sdk/cliproxy/auth/antigravity_credits.go create mode 100644 sdk/cliproxy/auth/antigravity_credits_test.go diff --git a/config.example.yaml b/config.example.yaml index 734dd7d5..13042b78 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -98,7 +98,7 @@ disable-cooling: false quota-exceeded: switch-project: true # Whether to automatically switch to another project when a quota is exceeded switch-preview-model: true # Whether to automatically switch to a preview model when a quota is exceeded - antigravity-credits: true # Whether to retry Antigravity quota_exhausted 429s once with enabledCreditTypes=["GOOGLE_ONE_AI"] + antigravity-credits: true # Whether to use credits as last-resort fallback when all free-tier auths are exhausted for Claude models # Routing strategy for selecting credentials when multiple match. routing: diff --git a/internal/config/config.go b/internal/config/config.go index 760d43ec..1ebbb460 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -206,8 +206,9 @@ type QuotaExceeded struct { // SwitchPreviewModel indicates whether to automatically switch to a preview model when a quota is exceeded. SwitchPreviewModel bool `yaml:"switch-preview-model" json:"switch-preview-model"` - // AntigravityCredits indicates whether to retry Antigravity quota_exhausted 429s once - // on the same credential with enabledCreditTypes=["GOOGLE_ONE_AI"]. + // AntigravityCredits enables credits-based last-resort fallback for Claude models. + // When all free-tier auths are exhausted (429/503), the conductor retries with + // an auth that has available Google One AI credits. AntigravityCredits bool `yaml:"antigravity-credits" json:"antigravity-credits"` } diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 163b2d92..633373d2 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -52,8 +52,6 @@ const ( defaultAntigravityAgent = "antigravity/1.21.9 darwin/arm64" // fallback only; overridden at runtime by misc.AntigravityUserAgent() antigravityAuthType = "antigravity" refreshSkew = 3000 * time.Second - antigravityCreditsRetryTTL = 5 * time.Hour - antigravityCreditsAutoDisableDuration = 5 * time.Hour antigravityShortQuotaCooldownThreshold = 5 * time.Minute antigravityInstantRetryThreshold = 3 * time.Second // systemInstruction = "You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**" @@ -62,8 +60,6 @@ const ( type antigravity429Category string type antigravityCreditsFailureState struct { - Count int - DisabledUntil time.Time PermanentlyDisabled bool ExplicitBalanceExhausted bool } @@ -91,28 +87,79 @@ var ( randSource = rand.New(rand.NewSource(time.Now().UnixNano())) randSourceMutex sync.Mutex antigravityCreditsFailureByAuth sync.Map - antigravityPreferCreditsByModel sync.Map antigravityShortCooldownByAuth sync.Map + antigravityCreditsBalanceByAuth sync.Map // auth.ID → antigravityCreditsBalance antigravityQuotaExhaustedKeywords = []string{ "quota_exhausted", "quota exhausted", } - antigravityCreditsExhaustedKeywords = []string{ - "google_one_ai", - "insufficient credit", - "insufficient credits", - "not enough credit", - "not enough credits", - "credit exhausted", - "credits exhausted", - "credit balance", - "minimumcreditamountforusage", - "minimum credit amount for usage", - "minimum credit", - "resource has been exhausted", - } ) +type antigravityCreditsBalance struct { + CreditAmount float64 + MinCreditAmount float64 + PaidTierID string + Known bool +} + +func antigravityAuthHasCredits(auth *cliproxyauth.Auth) bool { + if auth == nil || strings.TrimSpace(auth.ID) == "" { + return false + } + if hint, ok := cliproxyauth.GetAntigravityCreditsHint(auth.ID); ok && hint.Known { + return hint.Available + } + val, ok := antigravityCreditsBalanceByAuth.Load(strings.TrimSpace(auth.ID)) + if !ok { + return true // optimistic: assume credits available when balance unknown + } + bal, valid := val.(antigravityCreditsBalance) + if !valid { + antigravityCreditsBalanceByAuth.Delete(strings.TrimSpace(auth.ID)) + return false + } + if !bal.Known { + return false + } + available := bal.CreditAmount >= bal.MinCreditAmount + cliproxyauth.SetAntigravityCreditsHint(strings.TrimSpace(auth.ID), cliproxyauth.AntigravityCreditsHint{ + Known: true, + Available: available, + CreditAmount: bal.CreditAmount, + MinCreditAmount: bal.MinCreditAmount, + PaidTierID: bal.PaidTierID, + UpdatedAt: time.Now(), + }) + return available +} + +// parseMetaFloat extracts a float64 from auth.Metadata (handles string and numeric types). +func parseMetaFloat(metadata map[string]any, key string) (float64, bool) { + v, ok := metadata[key] + if !ok { + return 0, false + } + switch typed := v.(type) { + case float64: + return typed, true + case int: + return float64(typed), true + case int64: + return float64(typed), true + case uint64: + return float64(typed), true + case json.Number: + if f, err := typed.Float64(); err == nil { + return f, true + } + case string: + if f, err := strconv.ParseFloat(strings.TrimSpace(typed), 64); err == nil { + return f, true + } + } + return 0, false +} + // AntigravityExecutor proxies requests to the antigravity upstream. type AntigravityExecutor struct { cfg *config.Config @@ -189,7 +236,7 @@ func validateAntigravityRequestSignatures(from sdktranslator.Format, rawJSON []b if from.String() != "claude" { return rawJSON, nil } - // Always strip thinking blocks with empty signatures (proxy-generated). + // Always strip thinking blocks with invalid signatures (empty or non-Claude-format). rawJSON = antigravityclaude.StripEmptySignatureThinkingBlocks(rawJSON) if cache.SignatureCacheEnabled() { return rawJSON, nil @@ -298,6 +345,41 @@ func decideAntigravity429(body []byte) antigravity429Decision { decision.retryAfter = retryAfter } + status := strings.TrimSpace(gjson.GetBytes(body, "error.status").String()) + if !strings.EqualFold(status, "RESOURCE_EXHAUSTED") { + return decision + } + + details := gjson.GetBytes(body, "error.details") + if details.Exists() && details.IsArray() { + for _, detail := range details.Array() { + if detail.Get("@type").String() != "type.googleapis.com/google.rpc.ErrorInfo" { + continue + } + reason := strings.TrimSpace(detail.Get("reason").String()) + decision.reason = reason + switch { + case strings.EqualFold(reason, "QUOTA_EXHAUSTED"): + decision.kind = antigravity429DecisionFullQuotaExhausted + return decision + case strings.EqualFold(reason, "RATE_LIMIT_EXCEEDED"): + if decision.retryAfter == nil { + decision.kind = antigravity429DecisionSoftRetry + return decision + } + switch { + case *decision.retryAfter < antigravityInstantRetryThreshold: + decision.kind = antigravity429DecisionInstantRetrySameAuth + case *decision.retryAfter < antigravityShortQuotaCooldownThreshold: + decision.kind = antigravity429DecisionShortCooldownSwitchAuth + default: + decision.kind = antigravity429DecisionFullQuotaExhausted + } + return decision + } + } + } + lowerBody := strings.ToLower(string(body)) for _, keyword := range antigravityQuotaExhaustedKeywords { if strings.Contains(lowerBody, keyword) { @@ -307,123 +389,14 @@ func decideAntigravity429(body []byte) antigravity429Decision { } } - status := strings.TrimSpace(gjson.GetBytes(body, "error.status").String()) - if !strings.EqualFold(status, "RESOURCE_EXHAUSTED") { - return decision - } - - details := gjson.GetBytes(body, "error.details") - if !details.Exists() || !details.IsArray() { - decision.kind = antigravity429DecisionSoftRetry - return decision - } - - for _, detail := range details.Array() { - if detail.Get("@type").String() != "type.googleapis.com/google.rpc.ErrorInfo" { - continue - } - reason := strings.TrimSpace(detail.Get("reason").String()) - decision.reason = reason - switch { - case strings.EqualFold(reason, "QUOTA_EXHAUSTED"): - decision.kind = antigravity429DecisionFullQuotaExhausted - return decision - case strings.EqualFold(reason, "RATE_LIMIT_EXCEEDED"): - if decision.retryAfter == nil { - decision.kind = antigravity429DecisionSoftRetry - return decision - } - switch { - case *decision.retryAfter < antigravityInstantRetryThreshold: - decision.kind = antigravity429DecisionInstantRetrySameAuth - case *decision.retryAfter < antigravityShortQuotaCooldownThreshold: - decision.kind = antigravity429DecisionShortCooldownSwitchAuth - default: - decision.kind = antigravity429DecisionFullQuotaExhausted - } - return decision - } - } - decision.kind = antigravity429DecisionSoftRetry return decision } -func antigravityHasQuotaResetDelayOrModelInfo(body []byte) bool { - if len(body) == 0 { - return false - } - details := gjson.GetBytes(body, "error.details") - if !details.Exists() || !details.IsArray() { - return false - } - for _, detail := range details.Array() { - if detail.Get("@type").String() != "type.googleapis.com/google.rpc.ErrorInfo" { - continue - } - if strings.TrimSpace(detail.Get("metadata.quotaResetDelay").String()) != "" { - return true - } - if strings.TrimSpace(detail.Get("metadata.model").String()) != "" { - return true - } - } - return false -} - func antigravityCreditsRetryEnabled(cfg *config.Config) bool { return cfg != nil && cfg.QuotaExceeded.AntigravityCredits } -func antigravityCreditsFailureStateForAuth(auth *cliproxyauth.Auth) (string, antigravityCreditsFailureState, bool) { - if auth == nil || strings.TrimSpace(auth.ID) == "" { - return "", antigravityCreditsFailureState{}, false - } - authID := strings.TrimSpace(auth.ID) - value, ok := antigravityCreditsFailureByAuth.Load(authID) - if !ok { - return authID, antigravityCreditsFailureState{}, true - } - state, ok := value.(antigravityCreditsFailureState) - if !ok { - antigravityCreditsFailureByAuth.Delete(authID) - return authID, antigravityCreditsFailureState{}, true - } - return authID, state, true -} - -func antigravityCreditsDisabled(auth *cliproxyauth.Auth, now time.Time) bool { - authID, state, ok := antigravityCreditsFailureStateForAuth(auth) - if !ok { - return false - } - if state.PermanentlyDisabled { - return true - } - if state.DisabledUntil.IsZero() { - return false - } - if state.DisabledUntil.After(now) { - return true - } - antigravityCreditsFailureByAuth.Delete(authID) - return false -} - -func recordAntigravityCreditsFailure(auth *cliproxyauth.Auth, now time.Time) { - authID, state, ok := antigravityCreditsFailureStateForAuth(auth) - if !ok { - return - } - if state.PermanentlyDisabled { - antigravityCreditsFailureByAuth.Store(authID, state) - return - } - state.Count++ - state.DisabledUntil = now.Add(antigravityCreditsAutoDisableDuration) - antigravityCreditsFailureByAuth.Store(authID, state) -} - func clearAntigravityCreditsFailureState(auth *cliproxyauth.Auth) { if auth == nil || strings.TrimSpace(auth.ID) == "" { return @@ -440,6 +413,25 @@ func markAntigravityCreditsPermanentlyDisabled(auth *cliproxyauth.Auth) { ExplicitBalanceExhausted: true, } antigravityCreditsFailureByAuth.Store(authID, state) + antigravityCreditsBalanceByAuth.Store(authID, antigravityCreditsBalance{ + CreditAmount: 0, + MinCreditAmount: 1, + Known: true, + }) + cliproxyauth.SetAntigravityCreditsHint(authID, cliproxyauth.AntigravityCreditsHint{ + Known: true, + Available: false, + CreditAmount: 0, + MinCreditAmount: 1, + UpdatedAt: time.Now(), + }) +} + +func clearAntigravityCreditsPermanentlyDisabled(auth *cliproxyauth.Auth) { + if auth == nil || strings.TrimSpace(auth.ID) == "" { + return + } + antigravityCreditsFailureByAuth.Delete(strings.TrimSpace(auth.ID)) } func antigravityHasExplicitCreditsBalanceExhaustedReason(body []byte) bool { @@ -462,81 +454,6 @@ func antigravityHasExplicitCreditsBalanceExhaustedReason(body []byte) bool { return false } -func antigravityPreferCreditsKey(auth *cliproxyauth.Auth, modelName string) string { - if auth == nil { - return "" - } - authID := strings.TrimSpace(auth.ID) - modelName = strings.TrimSpace(modelName) - if authID == "" || modelName == "" { - return "" - } - return authID + "|" + modelName -} - -func antigravityShouldPreferCredits(auth *cliproxyauth.Auth, modelName string, now time.Time) bool { - key := antigravityPreferCreditsKey(auth, modelName) - if key == "" { - return false - } - value, ok := antigravityPreferCreditsByModel.Load(key) - if !ok { - return false - } - until, ok := value.(time.Time) - if !ok || until.IsZero() { - antigravityPreferCreditsByModel.Delete(key) - return false - } - if !until.After(now) { - antigravityPreferCreditsByModel.Delete(key) - return false - } - return true -} - -func markAntigravityPreferCredits(auth *cliproxyauth.Auth, modelName string, now time.Time, retryAfter *time.Duration) { - key := antigravityPreferCreditsKey(auth, modelName) - if key == "" { - return - } - until := now.Add(antigravityCreditsRetryTTL) - if retryAfter != nil && *retryAfter > 0 { - until = now.Add(*retryAfter) - } - antigravityPreferCreditsByModel.Store(key, until) -} - -func clearAntigravityPreferCredits(auth *cliproxyauth.Auth, modelName string) { - key := antigravityPreferCreditsKey(auth, modelName) - if key == "" { - return - } - antigravityPreferCreditsByModel.Delete(key) -} - -func shouldMarkAntigravityCreditsExhausted(statusCode int, body []byte, reqErr error) bool { - if reqErr != nil || statusCode == 0 { - return false - } - if statusCode >= http.StatusInternalServerError || statusCode == http.StatusRequestTimeout { - return false - } - lowerBody := strings.ToLower(string(body)) - for _, keyword := range antigravityCreditsExhaustedKeywords { - if strings.Contains(lowerBody, keyword) { - if keyword == "resource has been exhausted" && - statusCode == http.StatusTooManyRequests && - decideAntigravity429(body).kind == antigravity429DecisionSoftRetry && - !antigravityHasQuotaResetDelayOrModelInfo(body) { - return false - } - return true - } - } - return false -} - func newAntigravityStatusErr(statusCode int, body []byte) statusErr { err := statusErr{code: statusCode, msg: string(body)} if statusCode == http.StatusTooManyRequests { @@ -547,129 +464,6 @@ func newAntigravityStatusErr(statusCode int, body []byte) statusErr { return err } -func (e *AntigravityExecutor) attemptCreditsFallback( - ctx context.Context, - auth *cliproxyauth.Auth, - httpClient *http.Client, - token string, - modelName string, - payload []byte, - stream bool, - alt string, - baseURL string, - originalBody []byte, -) (*http.Response, bool) { - if !antigravityCreditsRetryEnabled(e.cfg) { - return nil, false - } - if decideAntigravity429(originalBody).kind != antigravity429DecisionFullQuotaExhausted { - return nil, false - } - now := time.Now() - if shouldForcePermanentDisableCredits(originalBody) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return nil, false - } - - if antigravityHasExplicitCreditsBalanceExhaustedReason(originalBody) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return nil, false - } - - if antigravityCreditsDisabled(auth, now) { - return nil, false - } - creditsPayload := injectEnabledCreditTypes(payload) - if len(creditsPayload) == 0 { - return nil, false - } - - httpReq, errReq := e.buildRequest(ctx, auth, token, modelName, creditsPayload, stream, alt, baseURL) - if errReq != nil { - helps.RecordAPIResponseError(ctx, e.cfg, errReq) - clearAntigravityPreferCredits(auth, modelName) - recordAntigravityCreditsFailure(auth, now) - return nil, true - } - httpResp, errDo := httpClient.Do(httpReq) - if errDo != nil { - helps.RecordAPIResponseError(ctx, e.cfg, errDo) - clearAntigravityPreferCredits(auth, modelName) - recordAntigravityCreditsFailure(auth, now) - return nil, true - } - if httpResp.StatusCode >= http.StatusOK && httpResp.StatusCode < http.StatusMultipleChoices { - retryAfter, _ := parseRetryDelay(originalBody) - markAntigravityPreferCredits(auth, modelName, now, retryAfter) - clearAntigravityCreditsFailureState(auth) - return httpResp, true - } - - helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - bodyBytes, errRead := io.ReadAll(httpResp.Body) - if errClose := httpResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close credits fallback response body error: %v", errClose) - } - if errRead != nil { - helps.RecordAPIResponseError(ctx, e.cfg, errRead) - clearAntigravityPreferCredits(auth, modelName) - recordAntigravityCreditsFailure(auth, now) - return nil, true - } - helps.AppendAPIResponseChunk(ctx, e.cfg, bodyBytes) - if shouldForcePermanentDisableCredits(bodyBytes) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return nil, true - } - - if antigravityHasExplicitCreditsBalanceExhaustedReason(bodyBytes) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return nil, true - } - - clearAntigravityPreferCredits(auth, modelName) - recordAntigravityCreditsFailure(auth, now) - return nil, true -} - -func (e *AntigravityExecutor) handleDirectCreditsFailure(ctx context.Context, auth *cliproxyauth.Auth, modelName string, reqErr error) { - if reqErr != nil { - if shouldForcePermanentDisableCredits(reqErrBody(reqErr)) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return - } - - if antigravityHasExplicitCreditsBalanceExhaustedReason(reqErrBody(reqErr)) { - clearAntigravityPreferCredits(auth, modelName) - markAntigravityCreditsPermanentlyDisabled(auth) - return - } - - helps.RecordAPIResponseError(ctx, e.cfg, reqErr) - } - clearAntigravityPreferCredits(auth, modelName) - recordAntigravityCreditsFailure(auth, time.Now()) -} -func reqErrBody(reqErr error) []byte { - if reqErr == nil { - return nil - } - msg := reqErr.Error() - if strings.TrimSpace(msg) == "" { - return nil - } - return []byte(msg) -} - -func shouldForcePermanentDisableCredits(body []byte) bool { - return antigravityHasExplicitCreditsBalanceExhaustedReason(body) -} - // Execute performs a non-streaming request to the Antigravity API. func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (resp cliproxyexecutor.Response, err error) { if opts.Alt == "responses/compact" { @@ -721,6 +515,8 @@ func (e *AntigravityExecutor) Execute(ctx context.Context, auth *cliproxyauth.Au requestedModel := helps.PayloadRequestedModel(opts, req.Model) translated = helps.ApplyPayloadConfigWithRoot(e.cfg, baseModel, "antigravity", "request", translated, originalTranslated, requestedModel) + useCredits := cliproxyauth.AntigravityCreditsRequested(ctx) && antigravityCreditsRetryEnabled(e.cfg) + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) attempts := antigravityRetryAttempts(auth, e.cfg) @@ -733,11 +529,10 @@ attemptLoop: for idx, baseURL := range baseURLs { requestPayload := translated - usedCreditsDirect := false - if antigravityCreditsRetryEnabled(e.cfg) && antigravityShouldPreferCredits(auth, baseModel, time.Now()) { - if creditsPayload := injectEnabledCreditTypes(translated); len(creditsPayload) > 0 { - requestPayload = creditsPayload - usedCreditsDirect = true + if useCredits { + if cp := injectEnabledCreditTypes(translated); len(cp) > 0 { + requestPayload = cp + helps.MarkCreditsUsed(ctx) } } @@ -785,7 +580,6 @@ attemptLoop: wait := antigravityInstantRetryDelay(*decision.retryAfter) log.Debugf("antigravity executor: instant retry for model %s, waiting %s", baseModel, wait) if errWait := antigravityWait(ctx, wait); errWait != nil { - return resp, errWait } } @@ -794,34 +588,13 @@ attemptLoop: case antigravity429DecisionShortCooldownSwitchAuth: if decision.retryAfter != nil && *decision.retryAfter > 0 { markAntigravityShortCooldown(auth, baseModel, time.Now(), *decision.retryAfter) - log.Debugf("antigravity executor: short quota cooldown (%s) for model %s, recorded cooldown and skipping credits fallback", *decision.retryAfter, baseModel) + log.Debugf("antigravity executor: short quota cooldown (%s) for model %s, recorded cooldown", *decision.retryAfter, baseModel) } case antigravity429DecisionFullQuotaExhausted: - if usedCreditsDirect { - clearAntigravityPreferCredits(auth, baseModel) - recordAntigravityCreditsFailure(auth, time.Now()) - } else { - creditsResp, _ := e.attemptCreditsFallback(ctx, auth, httpClient, token, baseModel, translated, false, opts.Alt, baseURL, bodyBytes) - if creditsResp != nil { - helps.RecordAPIResponseMetadata(ctx, e.cfg, creditsResp.StatusCode, creditsResp.Header.Clone()) - creditsBody, errCreditsRead := io.ReadAll(creditsResp.Body) - if errClose := creditsResp.Body.Close(); errClose != nil { - log.Errorf("antigravity executor: close credits success response body error: %v", errClose) - } - if errCreditsRead != nil { - helps.RecordAPIResponseError(ctx, e.cfg, errCreditsRead) - err = errCreditsRead - return resp, err - } - helps.AppendAPIResponseChunk(ctx, e.cfg, creditsBody) - reporter.Publish(ctx, helps.ParseAntigravityUsage(creditsBody)) - var param any - converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, creditsBody, ¶m) - resp = cliproxyexecutor.Response{Payload: converted, Headers: creditsResp.Header.Clone()} - reporter.EnsurePublished(ctx) - return resp, nil - } + if useCredits && antigravityHasExplicitCreditsBalanceExhaustedReason(bodyBytes) { + markAntigravityCreditsPermanentlyDisabled(auth) } + // No credits logic - just fall through to error return below } } @@ -870,6 +643,10 @@ attemptLoop: return resp, err } + // Success + if useCredits { + clearAntigravityCreditsFailureState(auth) + } reporter.Publish(ctx, helps.ParseAntigravityUsage(bodyBytes)) var param any converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bodyBytes, ¶m) @@ -935,6 +712,8 @@ func (e *AntigravityExecutor) executeClaudeNonStream(ctx context.Context, auth * requestedModel := helps.PayloadRequestedModel(opts, req.Model) translated = helps.ApplyPayloadConfigWithRoot(e.cfg, baseModel, "antigravity", "request", translated, originalTranslated, requestedModel) + useCredits := cliproxyauth.AntigravityCreditsRequested(ctx) && antigravityCreditsRetryEnabled(e.cfg) + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) @@ -948,11 +727,10 @@ attemptLoop: for idx, baseURL := range baseURLs { requestPayload := translated - usedCreditsDirect := false - if antigravityCreditsRetryEnabled(e.cfg) && antigravityShouldPreferCredits(auth, baseModel, time.Now()) { - if creditsPayload := injectEnabledCreditTypes(translated); len(creditsPayload) > 0 { - requestPayload = creditsPayload - usedCreditsDirect = true + if useCredits { + if cp := injectEnabledCreditTypes(translated); len(cp) > 0 { + requestPayload = cp + helps.MarkCreditsUsed(ctx) } } httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, true, opts.Alt, baseURL) @@ -1014,7 +792,6 @@ attemptLoop: wait := antigravityInstantRetryDelay(*decision.retryAfter) log.Debugf("antigravity executor: instant retry for model %s, waiting %s", baseModel, wait) if errWait := antigravityWait(ctx, wait); errWait != nil { - return resp, errWait } } @@ -1023,25 +800,16 @@ attemptLoop: case antigravity429DecisionShortCooldownSwitchAuth: if decision.retryAfter != nil && *decision.retryAfter > 0 { markAntigravityShortCooldown(auth, baseModel, time.Now(), *decision.retryAfter) - log.Debugf("antigravity executor: short quota cooldown (%s) for model %s, recorded cooldown and skipping credits fallback", *decision.retryAfter, baseModel) + log.Debugf("antigravity executor: short quota cooldown (%s) for model %s, recorded cooldown", *decision.retryAfter, baseModel) } case antigravity429DecisionFullQuotaExhausted: - if usedCreditsDirect { - clearAntigravityPreferCredits(auth, baseModel) - recordAntigravityCreditsFailure(auth, time.Now()) - } else { - creditsResp, _ := e.attemptCreditsFallback(ctx, auth, httpClient, token, baseModel, translated, true, opts.Alt, baseURL, bodyBytes) - if creditsResp != nil { - httpResp = creditsResp - helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - } + if useCredits && antigravityHasExplicitCreditsBalanceExhaustedReason(bodyBytes) { + markAntigravityCreditsPermanentlyDisabled(auth) } + // No credits logic - just fall through to error return below } } - if httpResp.StatusCode >= http.StatusOK && httpResp.StatusCode < http.StatusMultipleChoices { - goto streamSuccessClaudeNonStream - } lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), bodyBytes...) lastErr = nil @@ -1085,7 +853,10 @@ attemptLoop: return resp, err } - streamSuccessClaudeNonStream: + // Stream success + if useCredits { + clearAntigravityCreditsFailureState(auth) + } out := make(chan cliproxyexecutor.StreamChunk) go func(resp *http.Response) { defer close(out) @@ -1389,6 +1160,7 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya if updatedAuth != nil { auth = updatedAuth } + originalTranslated := sdktranslator.TranslateRequest(from, to, baseModel, originalPayload, true) translated := sdktranslator.TranslateRequest(from, to, baseModel, req.Payload, true) @@ -1400,6 +1172,8 @@ func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxya requestedModel := helps.PayloadRequestedModel(opts, req.Model) translated = helps.ApplyPayloadConfigWithRoot(e.cfg, baseModel, "antigravity", "request", translated, originalTranslated, requestedModel) + useCredits := cliproxyauth.AntigravityCreditsRequested(ctx) && antigravityCreditsRetryEnabled(e.cfg) + baseURLs := antigravityBaseURLFallbackOrder(auth) httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) @@ -1413,11 +1187,10 @@ attemptLoop: for idx, baseURL := range baseURLs { requestPayload := translated - usedCreditsDirect := false - if antigravityCreditsRetryEnabled(e.cfg) && antigravityShouldPreferCredits(auth, baseModel, time.Now()) { - if creditsPayload := injectEnabledCreditTypes(translated); len(creditsPayload) > 0 { - requestPayload = creditsPayload - usedCreditsDirect = true + if useCredits { + if cp := injectEnabledCreditTypes(translated); len(cp) > 0 { + requestPayload = cp + helps.MarkCreditsUsed(ctx) } } httpReq, errReq := e.buildRequest(ctx, auth, token, baseModel, requestPayload, true, opts.Alt, baseURL) @@ -1478,7 +1251,6 @@ attemptLoop: wait := antigravityInstantRetryDelay(*decision.retryAfter) log.Debugf("antigravity executor: instant retry for model %s, waiting %s", baseModel, wait) if errWait := antigravityWait(ctx, wait); errWait != nil { - return nil, errWait } } @@ -1487,25 +1259,16 @@ attemptLoop: case antigravity429DecisionShortCooldownSwitchAuth: if decision.retryAfter != nil && *decision.retryAfter > 0 { markAntigravityShortCooldown(auth, baseModel, time.Now(), *decision.retryAfter) - log.Debugf("antigravity executor: short quota cooldown (%s) for model %s, recorded cooldown and skipping credits fallback", *decision.retryAfter, baseModel) + log.Debugf("antigravity executor: short quota cooldown (%s) for model %s recorded", *decision.retryAfter, baseModel) } case antigravity429DecisionFullQuotaExhausted: - if usedCreditsDirect { - clearAntigravityPreferCredits(auth, baseModel) - recordAntigravityCreditsFailure(auth, time.Now()) - } else { - creditsResp, _ := e.attemptCreditsFallback(ctx, auth, httpClient, token, baseModel, translated, true, opts.Alt, baseURL, bodyBytes) - if creditsResp != nil { - httpResp = creditsResp - helps.RecordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone()) - } + if useCredits && antigravityHasExplicitCreditsBalanceExhaustedReason(bodyBytes) { + markAntigravityCreditsPermanentlyDisabled(auth) } + // No credits logic - just fall through to error return below } } - if httpResp.StatusCode >= http.StatusOK && httpResp.StatusCode < http.StatusMultipleChoices { - goto streamSuccessExecuteStream - } lastStatus = httpResp.StatusCode lastBody = append([]byte(nil), bodyBytes...) lastErr = nil @@ -1549,7 +1312,10 @@ attemptLoop: return nil, err } - streamSuccessExecuteStream: + // Stream success + if useCredits { + clearAntigravityCreditsFailureState(auth) + } out := make(chan cliproxyexecutor.StreamChunk) go func(resp *http.Response) { defer close(out) @@ -1792,6 +1558,9 @@ func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *clipr accessToken := metaStringValue(auth.Metadata, "access_token") expiry := tokenExpiry(auth.Metadata) if accessToken != "" && expiry.After(time.Now().Add(refreshSkew)) { + if !cliproxyauth.HasKnownAntigravityCreditsHint(auth.ID) { + e.updateAntigravityCreditsBalance(ctx, auth, accessToken) + } return accessToken, nil, nil } refreshCtx := context.Background() @@ -1882,6 +1651,7 @@ func (e *AntigravityExecutor) refreshToken(ctx context.Context, auth *cliproxyau if errProject := e.ensureAntigravityProjectID(ctx, auth, tokenResp.AccessToken); errProject != nil { log.Warnf("antigravity executor: ensure project id failed: %v", errProject) } + e.updateAntigravityCreditsBalance(ctx, auth, tokenResp.AccessToken) return auth, nil } @@ -1918,6 +1688,94 @@ func (e *AntigravityExecutor) ensureAntigravityProjectID(ctx context.Context, au return nil } +func (e *AntigravityExecutor) updateAntigravityCreditsBalance(ctx context.Context, auth *cliproxyauth.Auth, accessToken string) { + if auth == nil || strings.TrimSpace(auth.ID) == "" { + return + } + token := strings.TrimSpace(accessToken) + if token == "" { + token = metaStringValue(auth.Metadata, "access_token") + } + if token == "" { + return + } + + loadReqBody := `{"metadata":{"ideType":"ANTIGRAVITY","platform":"PLATFORM_UNSPECIFIED","pluginType":"GEMINI"}}` + endpointURL := "https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist" + httpReq, errReq := http.NewRequestWithContext(ctx, http.MethodPost, endpointURL, strings.NewReader(loadReqBody)) + if errReq != nil { + log.Debugf("antigravity executor: create loadCodeAssist request error: %v", errReq) + return + } + httpReq.Header.Set("Authorization", "Bearer "+token) + httpReq.Header.Set("Content-Type", "application/json") + httpReq.Header.Set("User-Agent", "google-api-nodejs-client/9.15.1") + + httpClient := newAntigravityHTTPClient(ctx, e.cfg, auth, 0) + httpResp, errDo := httpClient.Do(httpReq) + if errDo != nil { + log.Debugf("antigravity executor: loadCodeAssist request error: %v", errDo) + return + } + defer func() { + if errClose := httpResp.Body.Close(); errClose != nil { + log.Errorf("antigravity executor: close loadCodeAssist response body error: %v", errClose) + } + }() + + bodyBytes, errRead := io.ReadAll(httpResp.Body) + if errRead != nil || httpResp.StatusCode < http.StatusOK || httpResp.StatusCode >= http.StatusMultipleChoices { + log.Debugf("antigravity executor: loadCodeAssist returned status %d, err=%v", httpResp.StatusCode, errRead) + return + } + + authID := strings.TrimSpace(auth.ID) + paidTierID := strings.TrimSpace(gjson.GetBytes(bodyBytes, "paidTier.id").String()) + + credits := gjson.GetBytes(bodyBytes, "paidTier.availableCredits") + if !credits.IsArray() { + cliproxyauth.SetAntigravityCreditsHint(authID, cliproxyauth.AntigravityCreditsHint{ + Known: true, + Available: false, + PaidTierID: paidTierID, + UpdatedAt: time.Now(), + }) + return + } + for _, credit := range credits.Array() { + if !strings.EqualFold(credit.Get("creditType").String(), "GOOGLE_ONE_AI") { + continue + } + creditAmount, errCA := strconv.ParseFloat(strings.TrimSpace(credit.Get("creditAmount").String()), 64) + if errCA != nil { + continue + } + minAmount, errMA := strconv.ParseFloat(strings.TrimSpace(credit.Get("minimumCreditAmountForUsage").String()), 64) + if errMA != nil { + continue + } + bal := antigravityCreditsBalance{ + CreditAmount: creditAmount, + MinCreditAmount: minAmount, + PaidTierID: paidTierID, + Known: true, + } + antigravityCreditsBalanceByAuth.Store(authID, bal) + cliproxyauth.SetAntigravityCreditsHint(authID, cliproxyauth.AntigravityCreditsHint{ + Known: true, + Available: creditAmount >= minAmount, + CreditAmount: creditAmount, + MinCreditAmount: minAmount, + PaidTierID: paidTierID, + UpdatedAt: time.Now(), + }) + if creditAmount >= minAmount { + clearAntigravityCreditsPermanentlyDisabled(auth) + } + return + } +} + func (e *AntigravityExecutor) buildRequest(ctx context.Context, auth *cliproxyauth.Auth, token, modelName string, payload []byte, stream bool, alt, baseURL string) (*http.Request, error) { if token == "" { return nil, statusErr{code: http.StatusUnauthorized, msg: "missing access token"} diff --git a/internal/runtime/executor/antigravity_executor_credits_test.go b/internal/runtime/executor/antigravity_executor_credits_test.go index cf968ac7..b9c7a91f 100644 --- a/internal/runtime/executor/antigravity_executor_credits_test.go +++ b/internal/runtime/executor/antigravity_executor_credits_test.go @@ -18,8 +18,8 @@ import ( func resetAntigravityCreditsRetryState() { antigravityCreditsFailureByAuth = sync.Map{} - antigravityPreferCreditsByModel = sync.Map{} antigravityShortCooldownByAuth = sync.Map{} + antigravityCreditsBalanceByAuth = sync.Map{} } func TestClassifyAntigravity429(t *testing.T) { @@ -30,6 +30,43 @@ func TestClassifyAntigravity429(t *testing.T) { } }) + t.Run("standard antigravity rate limit with ui message stays rate limited", func(t *testing.T) { + body := []byte(`{ + "error": { + "code": 429, + "message": "You have exhausted your capacity on this model. Your quota will reset after 0s.", + "status": "RESOURCE_EXHAUSTED", + "details": [ + { + "@type": "type.googleapis.com/google.rpc.ErrorInfo", + "reason": "RATE_LIMIT_EXCEEDED", + "domain": "cloudcode-pa.googleapis.com", + "metadata": { + "model": "claude-opus-4-6-thinking", + "quotaResetDelay": "479.417207ms", + "quotaResetTimeStamp": "2026-04-20T09:19:49Z", + "uiMessage": "true" + } + }, + { + "@type": "type.googleapis.com/google.rpc.RetryInfo", + "retryDelay": "0.479417207s" + } + ] + } + }`) + if got := classifyAntigravity429(body); got != antigravity429RateLimited { + t.Fatalf("classifyAntigravity429() = %q, want %q", got, antigravity429RateLimited) + } + decision := decideAntigravity429(body) + if decision.kind != antigravity429DecisionInstantRetrySameAuth { + t.Fatalf("decideAntigravity429().kind = %q, want %q", decision.kind, antigravity429DecisionInstantRetrySameAuth) + } + if decision.retryAfter == nil { + t.Fatal("decideAntigravity429().retryAfter = nil") + } + }) + t.Run("structured rate limit", func(t *testing.T) { body := []byte(`{ "error": { @@ -67,8 +104,32 @@ func TestClassifyAntigravity429(t *testing.T) { }) } +func TestAntigravityShouldRetryNoCapacity_Standard503(t *testing.T) { + body := []byte(`{ + "error": { + "code": 503, + "message": "No capacity available for model gemini-3.1-flash-image on the server", + "status": "UNAVAILABLE", + "details": [ + { + "@type": "type.googleapis.com/google.rpc.ErrorInfo", + "reason": "MODEL_CAPACITY_EXHAUSTED", + "domain": "cloudcode-pa.googleapis.com", + "metadata": { + "model": "gemini-3.1-flash-image" + } + } + ] + } + }`) + if !antigravityShouldRetryNoCapacity(http.StatusServiceUnavailable, body) { + t.Fatal("antigravityShouldRetryNoCapacity() = false, want true") + } +} + + func TestInjectEnabledCreditTypes(t *testing.T) { - body := []byte(`{"model":"gemini-2.5-flash","request":{}}`) + body := []byte(`{"model":"claude-sonnet-4-6","request":{}}`) got := injectEnabledCreditTypes(body) if got == nil { t.Fatal("injectEnabledCreditTypes() returned nil") @@ -82,37 +143,22 @@ func TestInjectEnabledCreditTypes(t *testing.T) { } } -func TestShouldMarkAntigravityCreditsExhausted(t *testing.T) { - t.Run("credit errors are marked", func(t *testing.T) { - for _, body := range [][]byte{ - []byte(`{"error":{"message":"Insufficient GOOGLE_ONE_AI credits"}}`), - []byte(`{"error":{"message":"minimumCreditAmountForUsage requirement not met"}}`), - } { - if !shouldMarkAntigravityCreditsExhausted(http.StatusForbidden, body, nil) { - t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = false, want true", string(body)) - } - } - }) - - t.Run("transient 429 resource exhausted is not marked", func(t *testing.T) { - body := []byte(`{"error":{"code":429,"message":"Resource has been exhausted (e.g. check quota).","status":"RESOURCE_EXHAUSTED"}}`) - if shouldMarkAntigravityCreditsExhausted(http.StatusTooManyRequests, body, nil) { - t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = true, want false", string(body)) - } - }) - - t.Run("resource exhausted with quota metadata is still marked", func(t *testing.T) { - body := []byte(`{"error":{"code":429,"message":"Resource has been exhausted","status":"RESOURCE_EXHAUSTED","details":[{"@type":"type.googleapis.com/google.rpc.ErrorInfo","metadata":{"quotaResetDelay":"1h","model":"claude-sonnet-4-6"}}]}}`) - if !shouldMarkAntigravityCreditsExhausted(http.StatusTooManyRequests, body, nil) { - t.Fatalf("shouldMarkAntigravityCreditsExhausted(%s) = false, want true", string(body)) - } - }) - - if shouldMarkAntigravityCreditsExhausted(http.StatusServiceUnavailable, []byte(`{"error":{"message":"credits exhausted"}}`), nil) { - t.Fatal("shouldMarkAntigravityCreditsExhausted() = true for 5xx, want false") +func TestParseRetryDelay_HumanReadableDuration(t *testing.T) { + body := []byte(`{"error":{"message":"You have exhausted your capacity on this model. Your quota will reset after 1h43m56s."}}`) + retryAfter, err := parseRetryDelay(body) + if err != nil { + t.Fatalf("parseRetryDelay() error = %v", err) + } + if retryAfter == nil { + t.Fatal("parseRetryDelay() returned nil") + } + want := time.Hour + 43*time.Minute + 56*time.Second + if *retryAfter != want { + t.Fatalf("parseRetryDelay() = %v, want %v", *retryAfter, want) } } + func TestAntigravityExecute_RetriesTransient429ResourceExhausted(t *testing.T) { resetAntigravityCreditsRetryState() t.Cleanup(resetAntigravityCreditsRetryState) @@ -147,7 +193,7 @@ func TestAntigravityExecute_RetriesTransient429ResourceExhausted(t *testing.T) { } resp, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", + Model: "claude-sonnet-4-6", Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), }, cliproxyexecutor.Options{ SourceFormat: sdktranslator.FormatAntigravity, @@ -163,32 +209,18 @@ func TestAntigravityExecute_RetriesTransient429ResourceExhausted(t *testing.T) { } } -func TestAntigravityExecute_RetriesQuotaExhaustedWithCredits(t *testing.T) { +func TestAntigravityExecute_CreditsInjectedWhenConductorRequests(t *testing.T) { resetAntigravityCreditsRetryState() t.Cleanup(resetAntigravityCreditsRetryState) - var ( - mu sync.Mutex - requestBodies []string - ) - + var requestBodies []string server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) _ = r.Body.Close() - - mu.Lock() requestBodies = append(requestBodies, string(body)) - reqNum := len(requestBodies) - mu.Unlock() - - if reqNum == 1 { - w.WriteHeader(http.StatusTooManyRequests) - _, _ = w.Write([]byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`)) - return - } if !strings.Contains(string(body), `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("second request body missing enabledCreditTypes: %s", string(body)) + t.Fatalf("request body missing enabledCreditTypes: %s", string(body)) } w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"ok"}]}}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}}`)) @@ -199,7 +231,7 @@ func TestAntigravityExecute_RetriesQuotaExhaustedWithCredits(t *testing.T) { QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, }) auth := &cliproxyauth.Auth{ - ID: "auth-credits-ok", + ID: "auth-credits-conductor", Attributes: map[string]string{ "base_url": server.URL, }, @@ -210,8 +242,11 @@ func TestAntigravityExecute_RetriesQuotaExhaustedWithCredits(t *testing.T) { }, } - resp, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", + // Simulate conductor setting credits requested flag in context + ctx := cliproxyauth.WithAntigravityCredits(context.Background()) + + resp, err := exec.Execute(ctx, auth, cliproxyexecutor.Request{ + Model: "claude-sonnet-4-6", Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), }, cliproxyexecutor.Options{ SourceFormat: sdktranslator.FormatAntigravity, @@ -222,227 +257,12 @@ func TestAntigravityExecute_RetriesQuotaExhaustedWithCredits(t *testing.T) { if len(resp.Payload) == 0 { t.Fatal("Execute() returned empty payload") } - - mu.Lock() - defer mu.Unlock() - if len(requestBodies) != 2 { - t.Fatalf("request count = %d, want 2", len(requestBodies)) + if len(requestBodies) != 1 { + t.Fatalf("request count = %d, want 1", len(requestBodies)) } } -func TestAntigravityExecute_SkipsCreditsRetryWhenAlreadyExhausted(t *testing.T) { - resetAntigravityCreditsRetryState() - t.Cleanup(resetAntigravityCreditsRetryState) - - var requestCount int - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - requestCount++ - w.WriteHeader(http.StatusTooManyRequests) - _, _ = w.Write([]byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`)) - })) - defer server.Close() - - exec := NewAntigravityExecutor(&config.Config{ - QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, - }) - auth := &cliproxyauth.Auth{ - ID: "auth-credits-exhausted", - Attributes: map[string]string{ - "base_url": server.URL, - }, - Metadata: map[string]any{ - "access_token": "token", - "project_id": "project-1", - "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), - }, - } - recordAntigravityCreditsFailure(auth, time.Now()) - - _, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", - Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), - }, cliproxyexecutor.Options{ - SourceFormat: sdktranslator.FormatAntigravity, - }) - if err == nil { - t.Fatal("Execute() error = nil, want 429") - } - sErr, ok := err.(statusErr) - if !ok { - t.Fatalf("Execute() error type = %T, want statusErr", err) - } - if got := sErr.StatusCode(); got != http.StatusTooManyRequests { - t.Fatalf("Execute() status code = %d, want %d", got, http.StatusTooManyRequests) - } - if requestCount != 1 { - t.Fatalf("request count = %d, want 1", requestCount) - } -} - -func TestAntigravityExecute_PrefersCreditsAfterSuccessfulFallback(t *testing.T) { - resetAntigravityCreditsRetryState() - t.Cleanup(resetAntigravityCreditsRetryState) - - var ( - mu sync.Mutex - requestBodies []string - ) - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - body, _ := io.ReadAll(r.Body) - _ = r.Body.Close() - - mu.Lock() - requestBodies = append(requestBodies, string(body)) - reqNum := len(requestBodies) - mu.Unlock() - - switch reqNum { - case 1: - w.WriteHeader(http.StatusTooManyRequests) - _, _ = w.Write([]byte(`{"error":{"status":"RESOURCE_EXHAUSTED","details":[{"@type":"type.googleapis.com/google.rpc.ErrorInfo","reason":"QUOTA_EXHAUSTED"},{"@type":"type.googleapis.com/google.rpc.RetryInfo","retryDelay":"10s"}]}}`)) - case 2, 3: - if !strings.Contains(string(body), `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("request %d body missing enabledCreditTypes: %s", reqNum, string(body)) - } - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"OK"}]}}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}}`)) - default: - t.Fatalf("unexpected request count %d", reqNum) - } - })) - defer server.Close() - - exec := NewAntigravityExecutor(&config.Config{ - QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, - }) - auth := &cliproxyauth.Auth{ - ID: "auth-prefer-credits", - Attributes: map[string]string{ - "base_url": server.URL, - }, - Metadata: map[string]any{ - "access_token": "token", - "project_id": "project-1", - "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), - }, - } - - request := cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", - Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), - } - opts := cliproxyexecutor.Options{SourceFormat: sdktranslator.FormatAntigravity} - - if _, err := exec.Execute(context.Background(), auth, request, opts); err != nil { - t.Fatalf("first Execute() error = %v", err) - } - if _, err := exec.Execute(context.Background(), auth, request, opts); err != nil { - t.Fatalf("second Execute() error = %v", err) - } - - mu.Lock() - defer mu.Unlock() - if len(requestBodies) != 3 { - t.Fatalf("request count = %d, want 3", len(requestBodies)) - } - if strings.Contains(requestBodies[0], `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("first request unexpectedly used credits: %s", requestBodies[0]) - } - if !strings.Contains(requestBodies[1], `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("fallback request missing credits: %s", requestBodies[1]) - } - if !strings.Contains(requestBodies[2], `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("preferred request missing credits: %s", requestBodies[2]) - } -} - -func TestAntigravityExecute_PreservesBaseURLFallbackAfterCreditsRetryFailure(t *testing.T) { - resetAntigravityCreditsRetryState() - t.Cleanup(resetAntigravityCreditsRetryState) - - var ( - mu sync.Mutex - firstCount int - secondCount int - ) - - firstServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - body, _ := io.ReadAll(r.Body) - _ = r.Body.Close() - - mu.Lock() - firstCount++ - reqNum := firstCount - mu.Unlock() - - switch reqNum { - case 1: - w.WriteHeader(http.StatusTooManyRequests) - _, _ = w.Write([]byte(`{"error":{"status":"RESOURCE_EXHAUSTED","details":[{"@type":"type.googleapis.com/google.rpc.ErrorInfo","reason":"QUOTA_EXHAUSTED"}]}}`)) - case 2: - if !strings.Contains(string(body), `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("credits retry missing enabledCreditTypes: %s", string(body)) - } - w.WriteHeader(http.StatusForbidden) - _, _ = w.Write([]byte(`{"error":{"message":"permission denied"}}`)) - default: - t.Fatalf("unexpected first server request count %d", reqNum) - } - })) - defer firstServer.Close() - - secondServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - mu.Lock() - secondCount++ - mu.Unlock() - w.Header().Set("Content-Type", "application/json") - _, _ = w.Write([]byte(`{"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"ok"}]}}],"usageMetadata":{"promptTokenCount":1,"candidatesTokenCount":1,"totalTokenCount":2}}}`)) - })) - defer secondServer.Close() - - exec := NewAntigravityExecutor(&config.Config{ - QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, - }) - auth := &cliproxyauth.Auth{ - ID: "auth-baseurl-fallback", - Attributes: map[string]string{ - "base_url": firstServer.URL, - }, - Metadata: map[string]any{ - "access_token": "token", - "project_id": "project-1", - "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), - }, - } - - originalOrder := antigravityBaseURLFallbackOrder - defer func() { antigravityBaseURLFallbackOrder = originalOrder }() - antigravityBaseURLFallbackOrder = func(auth *cliproxyauth.Auth) []string { - return []string{firstServer.URL, secondServer.URL} - } - - resp, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", - Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), - }, cliproxyexecutor.Options{ - SourceFormat: sdktranslator.FormatAntigravity, - }) - if err != nil { - t.Fatalf("Execute() error = %v", err) - } - if len(resp.Payload) == 0 { - t.Fatal("Execute() returned empty payload") - } - if firstCount != 2 { - t.Fatalf("first server request count = %d, want 2", firstCount) - } - if secondCount != 1 { - t.Fatalf("second server request count = %d, want 1", secondCount) - } -} - -func TestAntigravityExecute_DoesNotDirectInjectCreditsWhenFlagDisabled(t *testing.T) { +func TestAntigravityExecute_NoCreditsWithoutConductorFlag(t *testing.T) { resetAntigravityCreditsRetryState() t.Cleanup(resetAntigravityCreditsRetryState) @@ -457,10 +277,10 @@ func TestAntigravityExecute_DoesNotDirectInjectCreditsWhenFlagDisabled(t *testin defer server.Close() exec := NewAntigravityExecutor(&config.Config{ - QuotaExceeded: config.QuotaExceeded{AntigravityCredits: false}, + QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, }) auth := &cliproxyauth.Auth{ - ID: "auth-flag-disabled", + ID: "auth-no-conductor-flag", Attributes: map[string]string{ "base_url": server.URL, }, @@ -470,10 +290,10 @@ func TestAntigravityExecute_DoesNotDirectInjectCreditsWhenFlagDisabled(t *testin "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), }, } - markAntigravityPreferCredits(auth, "gemini-2.5-flash", time.Now(), nil) + // No conductor credits flag set in context _, err := exec.Execute(context.Background(), auth, cliproxyexecutor.Request{ - Model: "gemini-2.5-flash", + Model: "claude-sonnet-4-6", Payload: []byte(`{"request":{"contents":[{"role":"user","parts":[{"text":"hi"}]}]}}`), }, cliproxyexecutor.Options{ SourceFormat: sdktranslator.FormatAntigravity, @@ -484,7 +304,150 @@ func TestAntigravityExecute_DoesNotDirectInjectCreditsWhenFlagDisabled(t *testin if len(requestBodies) != 1 { t.Fatalf("request count = %d, want 1", len(requestBodies)) } - if strings.Contains(requestBodies[0], `"enabledCreditTypes":["GOOGLE_ONE_AI"]`) { - t.Fatalf("request unexpectedly used enabledCreditTypes with flag disabled: %s", requestBodies[0]) + // Should NOT contain credits since conductor didn't request them + if strings.Contains(requestBodies[0], `"enabledCreditTypes"`) { + t.Fatalf("request should not contain enabledCreditTypes without conductor flag: %s", requestBodies[0]) + } +} + +func TestAntigravityAuthHasCredits(t *testing.T) { + t.Run("sufficient balance", func(t *testing.T) { + resetAntigravityCreditsRetryState() + auth := &cliproxyauth.Auth{ID: "test-sufficient"} + antigravityCreditsBalanceByAuth.Store("test-sufficient", antigravityCreditsBalance{ + CreditAmount: 25000, + MinCreditAmount: 50, + Known: true, + }) + if !antigravityAuthHasCredits(auth) { + t.Fatal("antigravityAuthHasCredits() = false, want true") + } + }) + + t.Run("insufficient balance", func(t *testing.T) { + resetAntigravityCreditsRetryState() + auth := &cliproxyauth.Auth{ID: "test-insufficient"} + antigravityCreditsBalanceByAuth.Store("test-insufficient", antigravityCreditsBalance{ + CreditAmount: 30, + MinCreditAmount: 50, + Known: true, + }) + if antigravityAuthHasCredits(auth) { + t.Fatal("antigravityAuthHasCredits() = true, want false") + } + }) + + t.Run("no balance stored returns true (optimistic)", func(t *testing.T) { + resetAntigravityCreditsRetryState() + auth := &cliproxyauth.Auth{ID: "test-no-balance"} + if !antigravityAuthHasCredits(auth) { + t.Fatal("antigravityAuthHasCredits() = false with no balance stored, want true (optimistic default)") + } + }) + + t.Run("nil auth returns false", func(t *testing.T) { + if antigravityAuthHasCredits(nil) { + t.Fatal("antigravityAuthHasCredits(nil) = true, want false") + } + }) + + t.Run("empty ID returns false", func(t *testing.T) { + auth := &cliproxyauth.Auth{} + if antigravityAuthHasCredits(auth) { + t.Fatal("antigravityAuthHasCredits(empty ID) = true, want false") + } + }) + + t.Run("unknown balance returns false", func(t *testing.T) { + resetAntigravityCreditsRetryState() + auth := &cliproxyauth.Auth{ID: "test-unknown"} + antigravityCreditsBalanceByAuth.Store("test-unknown", antigravityCreditsBalance{ + Known: false, + }) + if antigravityAuthHasCredits(auth) { + t.Fatal("antigravityAuthHasCredits() = true for unknown balance, want false") + } + }) +} + +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return f(req) +} + +func TestEnsureAccessToken_WarmTokenLoadsCreditsHint(t *testing.T) { + resetAntigravityCreditsRetryState() + t.Cleanup(resetAntigravityCreditsRetryState) + + exec := NewAntigravityExecutor(&config.Config{}) + auth := &cliproxyauth.Auth{ + ID: "auth-warm-token-credits", + Metadata: map[string]any{ + "access_token": "token", + "expired": time.Now().Add(1 * time.Hour).Format(time.RFC3339), + }, + } + ctx := context.WithValue(context.Background(), "cliproxy.roundtripper", roundTripperFunc(func(req *http.Request) (*http.Response, error) { + if req.URL.String() != "https://cloudcode-pa.googleapis.com/v1internal:loadCodeAssist" { + t.Fatalf("unexpected request url %s", req.URL.String()) + } + return &http.Response{ + StatusCode: http.StatusOK, + Header: make(http.Header), + Body: io.NopCloser(strings.NewReader(`{"paidTier":{"id":"tier-1","availableCredits":[{"creditType":"GOOGLE_ONE_AI","creditAmount":"25000","minimumCreditAmountForUsage":"50"}]}}`)), + }, nil + })) + + token, updatedAuth, err := exec.ensureAccessToken(ctx, auth) + if err != nil { + t.Fatalf("ensureAccessToken() error = %v", err) + } + if token != "token" { + t.Fatalf("ensureAccessToken() token = %q, want %q", token, "token") + } + if updatedAuth != nil { + t.Fatalf("ensureAccessToken() updatedAuth = %v, want nil", updatedAuth) + } + if !cliproxyauth.HasKnownAntigravityCreditsHint(auth.ID) { + t.Fatal("expected credits hint to be populated for warm token auth") + } + hint, ok := cliproxyauth.GetAntigravityCreditsHint(auth.ID) + if !ok { + t.Fatal("expected credits hint lookup to succeed") + } + if !hint.Available { + t.Fatalf("hint.Available = %v, want true", hint.Available) + } + if hint.CreditAmount != 25000 || hint.MinCreditAmount != 50 { + t.Fatalf("hint amounts = (%v, %v), want (25000, 50)", hint.CreditAmount, hint.MinCreditAmount) + } +} + +func TestParseMetaFloat(t *testing.T) { + tests := []struct { + name string + value any + wantVal float64 + wantOK bool + }{ + {"string", "25000", 25000, true}, + {"float64", float64(100), 100, true}, + {"int", int(50), 50, true}, + {"int64", int64(75), 75, true}, + {"empty string", "", 0, false}, + {"invalid string", "abc", 0, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + meta := map[string]any{"key": tt.value} + got, ok := parseMetaFloat(meta, "key") + if ok != tt.wantOK { + t.Fatalf("parseMetaFloat() ok = %v, want %v", ok, tt.wantOK) + } + if ok && got != tt.wantVal { + t.Fatalf("parseMetaFloat() = %f, want %f", got, tt.wantVal) + } + }) } } diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index d2df6109..a18f824a 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -898,7 +898,14 @@ func parseRetryDelay(errorBody []byte) (*time.Duration, error) { if matches := re.FindStringSubmatch(message); len(matches) > 1 { seconds, err := strconv.Atoi(matches[1]) if err == nil { - return new(time.Duration(seconds) * time.Second), nil + duration := time.Duration(seconds) * time.Second + return &duration, nil + } + } + reHuman := regexp.MustCompile(`after\s+((?:\d+h)?(?:\d+m)?(?:\d+s)?)\.?`) + if matches := reHuman.FindStringSubmatch(strings.ToLower(message)); len(matches) > 1 { + if duration, err := time.ParseDuration(matches[1]); err == nil && duration > 0 { + return &duration, nil } } } diff --git a/internal/runtime/executor/helps/logging_helpers.go b/internal/runtime/executor/helps/logging_helpers.go index 767c8820..b77ec1a9 100644 --- a/internal/runtime/executor/helps/logging_helpers.go +++ b/internal/runtime/executor/helps/logging_helpers.go @@ -24,6 +24,14 @@ const ( apiRequestKey = "API_REQUEST" apiResponseKey = "API_RESPONSE" apiWebsocketTimelineKey = "API_WEBSOCKET_TIMELINE" + + // maxErrorLogResponseBodySize limits cached response body when request-log is disabled. + // Prevents unbounded memory growth for large/streaming responses in error-only mode. + maxErrorLogResponseBodySize = 32 * 1024 // 32KB + + // maxErrorLogRequestBodySize limits materialized request body in error-only mode. + // Prevents OOM from large payloads (e.g. base64 images) when full request logging is off. + maxErrorLogRequestBodySize = 32 * 1024 // 32KB ) // UpstreamRequestLog captures the outbound upstream request details for logging. @@ -42,6 +50,7 @@ type UpstreamRequestLog struct { type upstreamAttempt struct { index int request string + deferredBody []byte // lazy body reference; only materialized on error response *strings.Builder responseIntroWritten bool statusWritten bool @@ -50,13 +59,12 @@ type upstreamAttempt struct { bodyHasContent bool prevWasSSEEvent bool errorWritten bool + bodyBytesWritten int + bodyTruncated bool } // RecordAPIRequest stores the upstream request metadata in Gin context for request logging. func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequestLog) { - if cfg == nil || !cfg.RequestLog { - return - } ginCtx := ginContextFrom(ctx) if ginCtx == nil { return @@ -65,6 +73,8 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ attempts := getAttempts(ginCtx) index := len(attempts) + 1 + requestLogEnabled := cfg != nil && cfg.RequestLog + builder := &strings.Builder{} builder.WriteString(fmt.Sprintf("=== API REQUEST %d ===\n", index)) builder.WriteString(fmt.Sprintf("Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))) @@ -82,10 +92,20 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ builder.WriteString("\nHeaders:\n") writeHeaders(builder, info.Headers) builder.WriteString("\nBody:\n") - if len(info.Body) > 0 { - builder.WriteString(string(info.Body)) + if requestLogEnabled { + // Full request logging: format body inline + if len(info.Body) > 0 { + builder.WriteString(string(info.Body)) + } else { + builder.WriteString("") + } } else { - builder.WriteString("") + // Error-only mode: defer body to avoid allocating copies for the 99% success path + if len(info.Body) > 0 { + builder.WriteString(fmt.Sprintf("", len(info.Body))) + } else { + builder.WriteString("") + } } builder.WriteString("\n\n") @@ -94,6 +114,9 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ request: builder.String(), response: &strings.Builder{}, } + if !requestLogEnabled && len(info.Body) > 0 { + attempt.deferredBody = info.Body + } attempts = append(attempts, attempt) ginCtx.Set(apiAttemptsKey, attempts) updateAggregatedRequest(ginCtx, attempts) @@ -101,14 +124,18 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ // RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt. func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) { - if cfg == nil || !cfg.RequestLog { - return - } ginCtx := ginContextFrom(ctx) if ginCtx == nil { return } attempts, attempt := ensureAttempt(ginCtx) + + // Materialize deferred request body when upstream returns an error. + // Success responses (2xx) skip this — their deferred body is dropped with gin context. + if status >= http.StatusBadRequest { + materializeDeferredBodies(ginCtx, attempts) + } + ensureResponseIntro(attempt) if status > 0 && !attempt.statusWritten { @@ -127,7 +154,7 @@ func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status i // RecordAPIResponseError adds an error entry for the latest attempt when no HTTP response is available. func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) { - if cfg == nil || !cfg.RequestLog || err == nil { + if err == nil { return } ginCtx := ginContextFrom(ctx) @@ -135,6 +162,11 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) return } attempts, attempt := ensureAttempt(ginCtx) + + // Materialize deferred request body on error — this is the only path that + // actually needs the body. Success path (99%) never pays for body copies. + materializeDeferredBodies(ginCtx, attempts) + ensureResponseIntro(attempt) if attempt.bodyStarted && !attempt.bodyHasContent { @@ -152,9 +184,6 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) // AppendAPIResponseChunk appends an upstream response chunk to Gin context for request logging. func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byte) { - if cfg == nil || !cfg.RequestLog { - return - } data := bytes.TrimSpace(chunk) if len(data) == 0 { return @@ -166,6 +195,11 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempts, attempt := ensureAttempt(ginCtx) ensureResponseIntro(attempt) + requestLogEnabled := cfg != nil && cfg.RequestLog + if !requestLogEnabled && attempt.bodyTruncated { + return + } + if !attempt.headersWritten { attempt.response.WriteString("Headers:\n") writeHeaders(attempt.response, nil) @@ -176,6 +210,22 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempt.response.WriteString("Body:\n") attempt.bodyStarted = true } + + // Cap response body size when full request-log is disabled to prevent memory growth + if !requestLogEnabled { + remaining := maxErrorLogResponseBodySize - attempt.bodyBytesWritten + if remaining <= 0 { + attempt.bodyTruncated = true + attempt.response.WriteString("\n") + updateAggregatedResponse(ginCtx, attempts) + return + } + if len(data) > remaining { + data = data[:remaining] + attempt.bodyTruncated = true + } + } + currentChunkIsSSEEvent := bytes.HasPrefix(data, []byte("event:")) currentChunkIsSSEData := bytes.HasPrefix(data, []byte("data:")) if attempt.bodyHasContent { @@ -186,9 +236,14 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempt.response.WriteString(separator) } attempt.response.WriteString(string(data)) + attempt.bodyBytesWritten += len(data) attempt.bodyHasContent = true attempt.prevWasSSEEvent = currentChunkIsSSEEvent + if attempt.bodyTruncated { + attempt.response.WriteString("\n") + } + updateAggregatedResponse(ginCtx, attempts) } @@ -332,6 +387,27 @@ func ginContextFrom(ctx context.Context) *gin.Context { return ginCtx } +const creditsUsedKey = "__antigravity_credits_used__" + +// MarkCreditsUsed flags the request as having used AI credits for billing. +func MarkCreditsUsed(ctx context.Context) { + if ginCtx := ginContextFrom(ctx); ginCtx != nil { + ginCtx.Set(creditsUsedKey, true) + } +} + +// CreditsUsed returns true if the request used AI credits. +func CreditsUsed(ctx context.Context) bool { + if ginCtx := ginContextFrom(ctx); ginCtx != nil { + if val, exists := ginCtx.Get(creditsUsedKey); exists { + if b, ok := val.(bool); ok { + return b + } + } + } + return false +} + func getAttempts(ginCtx *gin.Context) []*upstreamAttempt { if ginCtx == nil { return nil @@ -344,6 +420,34 @@ func getAttempts(ginCtx *gin.Context) []*upstreamAttempt { return nil } +// materializeDeferredBodies replaces deferred body placeholders with actual +// (truncated) body content. Called only on the error path so the 99% success +// path pays zero allocation cost for request body logging. +func materializeDeferredBodies(ginCtx *gin.Context, attempts []*upstreamAttempt) { + changed := false + for _, attempt := range attempts { + if attempt.deferredBody == nil { + continue + } + body := attempt.deferredBody + attempt.deferredBody = nil // release reference to allow GC of full payload + + placeholder := fmt.Sprintf("", len(body)) + var replacement string + if len(body) > maxErrorLogRequestBodySize { + replacement = string(body[:maxErrorLogRequestBodySize]) + + fmt.Sprintf("\n", len(body), maxErrorLogRequestBodySize) + } else { + replacement = string(body) + } + attempt.request = strings.Replace(attempt.request, placeholder, replacement, 1) + changed = true + } + if changed { + updateAggregatedRequest(ginCtx, attempts) + } +} + func ensureAttempt(ginCtx *gin.Context) ([]*upstreamAttempt, *upstreamAttempt) { attempts := getAttempts(ginCtx) if len(attempts) == 0 { diff --git a/sdk/cliproxy/auth/antigravity_credits.go b/sdk/cliproxy/auth/antigravity_credits.go new file mode 100644 index 00000000..77b03bfd --- /dev/null +++ b/sdk/cliproxy/auth/antigravity_credits.go @@ -0,0 +1,90 @@ +package auth + +import ( + "context" + "strings" + "sync" + "time" +) + +type antigravityUseCreditsContextKey struct{} + +// WithAntigravityCredits returns a child context that signals the executor to +// inject enabledCreditTypes into the request payload. +func WithAntigravityCredits(ctx context.Context) context.Context { + return context.WithValue(ctx, antigravityUseCreditsContextKey{}, true) +} + +// AntigravityCreditsRequested reports whether the context carries the credits flag. +func AntigravityCreditsRequested(ctx context.Context) bool { + if ctx == nil { + return false + } + v, _ := ctx.Value(antigravityUseCreditsContextKey{}).(bool) + return v +} + +// AntigravityCreditsHint stores the latest known AI credits state for one auth. +type AntigravityCreditsHint struct { + Known bool + Available bool + CreditAmount float64 + MinCreditAmount float64 + PaidTierID string + UpdatedAt time.Time +} + +var antigravityCreditsHintByAuth sync.Map + +// SetAntigravityCreditsHint updates the latest known AI credits state for an auth. +func SetAntigravityCreditsHint(authID string, hint AntigravityCreditsHint) { + authID = strings.TrimSpace(authID) + if authID == "" { + return + } + if hint.UpdatedAt.IsZero() { + hint.UpdatedAt = time.Now() + } + antigravityCreditsHintByAuth.Store(authID, hint) +} + +// GetAntigravityCreditsHint returns the latest known AI credits state for an auth. +func GetAntigravityCreditsHint(authID string) (AntigravityCreditsHint, bool) { + authID = strings.TrimSpace(authID) + if authID == "" { + return AntigravityCreditsHint{}, false + } + value, ok := antigravityCreditsHintByAuth.Load(authID) + if !ok { + return AntigravityCreditsHint{}, false + } + hint, ok := value.(AntigravityCreditsHint) + if !ok { + antigravityCreditsHintByAuth.Delete(authID) + return AntigravityCreditsHint{}, false + } + return hint, true +} + +// HasKnownAntigravityCreditsHint reports whether credits state has been discovered for an auth. +func HasKnownAntigravityCreditsHint(authID string) bool { + hint, ok := GetAntigravityCreditsHint(authID) + return ok && hint.Known +} + +func antigravityCreditsAvailableForModel(auth *Auth, model string) bool { + if auth == nil { + return false + } + if !strings.EqualFold(strings.TrimSpace(auth.Provider), "antigravity") { + return false + } + if !strings.Contains(strings.ToLower(strings.TrimSpace(model)), "claude") { + return false + } + hint, ok := GetAntigravityCreditsHint(auth.ID) + if !ok || !hint.Known { + return false + } + return hint.Available +} diff --git a/sdk/cliproxy/auth/antigravity_credits_test.go b/sdk/cliproxy/auth/antigravity_credits_test.go new file mode 100644 index 00000000..8f59b4c7 --- /dev/null +++ b/sdk/cliproxy/auth/antigravity_credits_test.go @@ -0,0 +1,62 @@ +package auth + +import ( + "testing" + "time" +) + +func TestIsAuthBlockedForModel_ClaudeWithCreditsStillBlockedDuringCooldown(t *testing.T) { + auth := &Auth{ + ID: "ag-1", + Provider: "antigravity", + ModelStates: map[string]*ModelState{ + "claude-sonnet-4-6": { + Unavailable: true, + NextRetryAfter: time.Now().Add(10 * time.Minute), + Quota: QuotaState{ + Exceeded: true, + NextRecoverAt: time.Now().Add(10 * time.Minute), + }, + }, + }, + } + + SetAntigravityCreditsHint(auth.ID, AntigravityCreditsHint{ + Known: true, + Available: true, + UpdatedAt: time.Now(), + }) + + blocked, reason, _ := isAuthBlockedForModel(auth, "claude-sonnet-4-6", time.Now()) + if !blocked || reason != blockReasonCooldown { + t.Fatalf("expected auth to be blocked during cooldown even with credits, got blocked=%v reason=%v", blocked, reason) + } +} + +func TestIsAuthBlockedForModel_KeepsGeminiBlockedWithoutCreditsBypass(t *testing.T) { + auth := &Auth{ + ID: "ag-2", + Provider: "antigravity", + ModelStates: map[string]*ModelState{ + "gemini-3-flash": { + Unavailable: true, + NextRetryAfter: time.Now().Add(10 * time.Minute), + Quota: QuotaState{ + Exceeded: true, + NextRecoverAt: time.Now().Add(10 * time.Minute), + }, + }, + }, + } + + SetAntigravityCreditsHint(auth.ID, AntigravityCreditsHint{ + Known: true, + Available: true, + UpdatedAt: time.Now(), + }) + + blocked, reason, _ := isAuthBlockedForModel(auth, "gemini-3-flash", time.Now()) + if !blocked || reason != blockReasonCooldown { + t.Fatalf("expected gemini auth to remain blocked, got blocked=%v reason=%v", blocked, reason) + } +} diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 0a9c157b..dff479df 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1202,12 +1202,16 @@ func (m *Manager) Execute(ctx context.Context, providers []string, req cliproxye } } if lastErr != nil { + if shouldAttemptAntigravityCreditsFallback(m, lastErr, normalized) { + if resp, ok := m.tryAntigravityCreditsExecute(ctx, req, opts); ok { + return resp, nil + } + } return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} } -// ExecuteCount performs a non-streaming execution using the configured selector and executor. // It supports multiple providers for the same model and round-robins the starting provider per model. func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { normalized := m.normalizeProviders(providers) @@ -1233,6 +1237,11 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip } } if lastErr != nil { + if shouldAttemptAntigravityCreditsFallback(m, lastErr, normalized) { + if resp, ok := m.tryAntigravityCreditsExecuteCount(ctx, req, opts); ok { + return resp, nil + } + } return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} @@ -1264,6 +1273,11 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli } } if lastErr != nil { + if shouldAttemptAntigravityCreditsFallback(m, lastErr, normalized) { + if result, ok := m.tryAntigravityCreditsExecuteStream(ctx, req, opts); ok { + return result, nil + } + } return nil, lastErr } return nil, &Error{Code: "auth_not_found", Message: "no auth available"} @@ -2319,7 +2333,8 @@ func retryAfterFromError(err error) *time.Duration { if retryAfter == nil { return nil } - return new(*retryAfter) + value := *retryAfter + return &value } func statusCodeFromResult(err *Error) int { @@ -2409,11 +2424,18 @@ func isRequestInvalidError(err error) bool { status := statusCodeFromError(err) switch status { case http.StatusBadRequest: - return strings.Contains(err.Error(), "invalid_request_error") + msg := err.Error() + return strings.Contains(msg, "invalid_request_error") || + strings.Contains(msg, "INVALID_ARGUMENT") || + strings.Contains(msg, "FAILED_PRECONDITION") case http.StatusNotFound: return isRequestScopedNotFoundMessage(err.Error()) case http.StatusUnprocessableEntity: return true + case http.StatusInternalServerError: + msg := err.Error() + return strings.Contains(msg, "\"status\":\"UNKNOWN\"") || + strings.Contains(msg, "\"status\": \"UNKNOWN\"") default: return false } @@ -2886,6 +2908,193 @@ func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model s return authCopy, executor, providerKey, nil } +func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string) []creditsCandidateEntry { + if m == nil { + return nil + } + m.mu.RLock() + defer m.mu.RUnlock() + var candidates []creditsCandidateEntry + for _, auth := range m.auths { + if auth == nil || auth.Disabled || auth.Status == StatusDisabled { + continue + } + if !antigravityCreditsAvailableForModel(auth, routeModel) { + continue + } + providerKey := strings.TrimSpace(strings.ToLower(auth.Provider)) + executor, ok := m.executors[providerKey] + if !ok { + continue + } + candidates = append(candidates, creditsCandidateEntry{ + auth: auth.Clone(), + executor: executor, + provider: providerKey, + }) + } + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].auth.ID < candidates[j].auth.ID + }) + return candidates +} + +type creditsCandidateEntry struct { + auth *Auth + executor ProviderExecutor + provider string +} + +func shouldAttemptAntigravityCreditsFallback(m *Manager, lastErr error, providers []string) bool { + if m == nil || lastErr == nil { + return false + } + if len(providers) > 0 { + hasAntigravity := false + for _, p := range providers { + if strings.EqualFold(strings.TrimSpace(p), "antigravity") { + hasAntigravity = true + break + } + } + if !hasAntigravity { + return false + } + } + cfg, _ := m.runtimeConfig.Load().(*internalconfig.Config) + if cfg == nil || !cfg.QuotaExceeded.AntigravityCredits { + return false + } + status := statusCodeFromError(lastErr) + switch status { + case http.StatusTooManyRequests, http.StatusServiceUnavailable: + return true + case 0: + var authErr *Error + if errors.As(lastErr, &authErr) && authErr != nil { + return authErr.Code == "auth_not_found" || authErr.Code == "auth_unavailable" || authErr.Code == "model_cooldown" + } + var cooldownErr *modelCooldownError + if errors.As(lastErr, &cooldownErr) { + return true + } + return false + default: + return false + } +} + +func (m *Manager) tryAntigravityCreditsExecute(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, bool) { + routeModel := req.Model + candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) + for _, c := range candidates { + if ctx.Err() != nil { + return cliproxyexecutor.Response{}, false + } + creditsCtx := WithAntigravityCredits(ctx) + if rt := m.roundTripperFor(c.auth); rt != nil { + creditsCtx = context.WithValue(creditsCtx, roundTripperContextKey{}, rt) + creditsCtx = context.WithValue(creditsCtx, "cliproxy.roundtripper", rt) + } + creditsOpts := ensureRequestedModelMetadata(opts, routeModel) + publishSelectedAuthMetadata(creditsOpts.Metadata, c.auth.ID) + models := m.executionModelCandidates(c.auth, routeModel) + if len(models) == 0 { + continue + } + for _, upstreamModel := range models { + resultModel := m.stateModelForExecution(c.auth, routeModel, upstreamModel, len(models) > 1) + execReq := req + execReq.Model = upstreamModel + resp, errExec := c.executor.Execute(creditsCtx, c.auth, execReq, creditsOpts) + result := Result{AuthID: c.auth.ID, Provider: c.provider, Model: resultModel, Success: errExec == nil} + if errExec != nil { + result.Error = &Error{Message: errExec.Error()} + if se, ok := errors.AsType[cliproxyexecutor.StatusError](errExec); ok && se != nil { + result.Error.HTTPStatus = se.StatusCode() + } + if ra := retryAfterFromError(errExec); ra != nil { + result.RetryAfter = ra + } + m.MarkResult(creditsCtx, result) + continue + } + m.MarkResult(creditsCtx, result) + return resp, true + } + } + return cliproxyexecutor.Response{}, false +} + +func (m *Manager) tryAntigravityCreditsExecuteCount(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, bool) { + routeModel := req.Model + candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) + for _, c := range candidates { + if ctx.Err() != nil { + return cliproxyexecutor.Response{}, false + } + creditsCtx := WithAntigravityCredits(ctx) + if rt := m.roundTripperFor(c.auth); rt != nil { + creditsCtx = context.WithValue(creditsCtx, roundTripperContextKey{}, rt) + creditsCtx = context.WithValue(creditsCtx, "cliproxy.roundtripper", rt) + } + creditsOpts := ensureRequestedModelMetadata(opts, routeModel) + publishSelectedAuthMetadata(creditsOpts.Metadata, c.auth.ID) + models := m.executionModelCandidates(c.auth, routeModel) + if len(models) == 0 { + continue + } + for _, upstreamModel := range models { + resultModel := m.stateModelForExecution(c.auth, routeModel, upstreamModel, len(models) > 1) + execReq := req + execReq.Model = upstreamModel + resp, errExec := c.executor.CountTokens(creditsCtx, c.auth, execReq, creditsOpts) + result := Result{AuthID: c.auth.ID, Provider: c.provider, Model: resultModel, Success: errExec == nil} + if errExec != nil { + result.Error = &Error{Message: errExec.Error()} + if se, ok := errors.AsType[cliproxyexecutor.StatusError](errExec); ok && se != nil { + result.Error.HTTPStatus = se.StatusCode() + } + if ra := retryAfterFromError(errExec); ra != nil { + result.RetryAfter = ra + } + m.MarkResult(creditsCtx, result) + continue + } + m.MarkResult(creditsCtx, result) + return resp, true + } + } + return cliproxyexecutor.Response{}, false +} + +func (m *Manager) tryAntigravityCreditsExecuteStream(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, bool) { + routeModel := req.Model + candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) + for _, c := range candidates { + if ctx.Err() != nil { + return nil, false + } + creditsCtx := WithAntigravityCredits(ctx) + if rt := m.roundTripperFor(c.auth); rt != nil { + creditsCtx = context.WithValue(creditsCtx, roundTripperContextKey{}, rt) + creditsCtx = context.WithValue(creditsCtx, "cliproxy.roundtripper", rt) + } + creditsOpts := ensureRequestedModelMetadata(opts, routeModel) + publishSelectedAuthMetadata(creditsOpts.Metadata, c.auth.ID) + models := m.executionModelCandidates(c.auth, routeModel) + if len(models) == 0 { + continue + } + result, errStream := m.executeStreamWithModelPool(creditsCtx, c.executor, c.auth, c.provider, req, creditsOpts, routeModel, models, len(models) > 1) + if errStream != nil { + continue + } + return result, true + } + return nil, false +} + func (m *Manager) persist(ctx context.Context, auth *Auth) error { if m.store == nil || auth == nil { return nil @@ -3200,14 +3409,15 @@ func (m *Manager) refreshAuth(ctx context.Context, id string) { m.mu.RLock() auth := m.auths[id] var exec ProviderExecutor + var cloned *Auth if auth != nil { exec = m.executors[auth.Provider] + cloned = auth.Clone() } m.mu.RUnlock() if auth == nil || exec == nil { return } - cloned := auth.Clone() updated, err := exec.Refresh(ctx, cloned) if err != nil && errors.Is(err, context.Canceled) { log.Debugf("refresh canceled for %s, %s", auth.Provider, auth.ID) From 4de5c29f86f3e57186140a8fec8e505476d5772a Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 15:17:00 +0800 Subject: [PATCH 2/6] fix(antigravity): remove credits fallback from CountTokens, fix gofmt CountTokens upstream API does not support enabledCreditTypes, so remove the dead credits fallback path from ExecuteCount and delete the unused tryAntigravityCreditsExecuteCount method. Fix gofmt on credits test file. --- .../antigravity_executor_credits_test.go | 2 - sdk/cliproxy/auth/conductor.go | 47 ------------------- 2 files changed, 49 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor_credits_test.go b/internal/runtime/executor/antigravity_executor_credits_test.go index b9c7a91f..9e4662cf 100644 --- a/internal/runtime/executor/antigravity_executor_credits_test.go +++ b/internal/runtime/executor/antigravity_executor_credits_test.go @@ -127,7 +127,6 @@ func TestAntigravityShouldRetryNoCapacity_Standard503(t *testing.T) { } } - func TestInjectEnabledCreditTypes(t *testing.T) { body := []byte(`{"model":"claude-sonnet-4-6","request":{}}`) got := injectEnabledCreditTypes(body) @@ -158,7 +157,6 @@ func TestParseRetryDelay_HumanReadableDuration(t *testing.T) { } } - func TestAntigravityExecute_RetriesTransient429ResourceExhausted(t *testing.T) { resetAntigravityCreditsRetryState() t.Cleanup(resetAntigravityCreditsRetryState) diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index dff479df..2a4ee6cb 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -1237,11 +1237,6 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip } } if lastErr != nil { - if shouldAttemptAntigravityCreditsFallback(m, lastErr, normalized) { - if resp, ok := m.tryAntigravityCreditsExecuteCount(ctx, req, opts); ok { - return resp, nil - } - } return cliproxyexecutor.Response{}, lastErr } return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} @@ -3026,48 +3021,6 @@ func (m *Manager) tryAntigravityCreditsExecute(ctx context.Context, req cliproxy return cliproxyexecutor.Response{}, false } -func (m *Manager) tryAntigravityCreditsExecuteCount(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, bool) { - routeModel := req.Model - candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) - for _, c := range candidates { - if ctx.Err() != nil { - return cliproxyexecutor.Response{}, false - } - creditsCtx := WithAntigravityCredits(ctx) - if rt := m.roundTripperFor(c.auth); rt != nil { - creditsCtx = context.WithValue(creditsCtx, roundTripperContextKey{}, rt) - creditsCtx = context.WithValue(creditsCtx, "cliproxy.roundtripper", rt) - } - creditsOpts := ensureRequestedModelMetadata(opts, routeModel) - publishSelectedAuthMetadata(creditsOpts.Metadata, c.auth.ID) - models := m.executionModelCandidates(c.auth, routeModel) - if len(models) == 0 { - continue - } - for _, upstreamModel := range models { - resultModel := m.stateModelForExecution(c.auth, routeModel, upstreamModel, len(models) > 1) - execReq := req - execReq.Model = upstreamModel - resp, errExec := c.executor.CountTokens(creditsCtx, c.auth, execReq, creditsOpts) - result := Result{AuthID: c.auth.ID, Provider: c.provider, Model: resultModel, Success: errExec == nil} - if errExec != nil { - result.Error = &Error{Message: errExec.Error()} - if se, ok := errors.AsType[cliproxyexecutor.StatusError](errExec); ok && se != nil { - result.Error.HTTPStatus = se.StatusCode() - } - if ra := retryAfterFromError(errExec); ra != nil { - result.RetryAfter = ra - } - m.MarkResult(creditsCtx, result) - continue - } - m.MarkResult(creditsCtx, result) - return resp, true - } - } - return cliproxyexecutor.Response{}, false -} - func (m *Manager) tryAntigravityCreditsExecuteStream(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, bool) { routeModel := req.Model candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) From e75daa299b49dcbe43079eb97bcbe568af13431e Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 17:38:02 +0800 Subject: [PATCH 3/6] fix(antigravity): respect pinned auth in credits fallback, release deferred body on success - findAllAntigravityCreditsCandidateAuths now filters by PinnedAuthMetadataKey to prevent credential isolation violations during credits fallback - Release deferredBody reference on success path to avoid holding large payloads in memory for the lifetime of the gin context --- internal/runtime/executor/helps/logging_helpers.go | 4 ++++ sdk/cliproxy/auth/conductor.go | 10 +++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/internal/runtime/executor/helps/logging_helpers.go b/internal/runtime/executor/helps/logging_helpers.go index b77ec1a9..1292deb4 100644 --- a/internal/runtime/executor/helps/logging_helpers.go +++ b/internal/runtime/executor/helps/logging_helpers.go @@ -134,6 +134,10 @@ func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status i // Success responses (2xx) skip this — their deferred body is dropped with gin context. if status >= http.StatusBadRequest { materializeDeferredBodies(ginCtx, attempts) + } else { + for _, a := range attempts { + a.deferredBody = nil + } } ensureResponseIntro(attempt) diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index 2a4ee6cb..d1490e3c 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2903,10 +2903,11 @@ func (m *Manager) pickNextMixed(ctx context.Context, providers []string, model s return authCopy, executor, providerKey, nil } -func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string) []creditsCandidateEntry { +func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string, opts cliproxyexecutor.Options) []creditsCandidateEntry { if m == nil { return nil } + pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata) m.mu.RLock() defer m.mu.RUnlock() var candidates []creditsCandidateEntry @@ -2914,6 +2915,9 @@ func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string) []c if auth == nil || auth.Disabled || auth.Status == StatusDisabled { continue } + if pinnedAuthID != "" && auth.ID != pinnedAuthID { + continue + } if !antigravityCreditsAvailableForModel(auth, routeModel) { continue } @@ -2981,7 +2985,7 @@ func shouldAttemptAntigravityCreditsFallback(m *Manager, lastErr error, provider func (m *Manager) tryAntigravityCreditsExecute(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, bool) { routeModel := req.Model - candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) + candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel, opts) for _, c := range candidates { if ctx.Err() != nil { return cliproxyexecutor.Response{}, false @@ -3023,7 +3027,7 @@ func (m *Manager) tryAntigravityCreditsExecute(ctx context.Context, req cliproxy func (m *Manager) tryAntigravityCreditsExecuteStream(ctx context.Context, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, bool) { routeModel := req.Model - candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel) + candidates := m.findAllAntigravityCreditsCandidateAuths(routeModel, opts) for _, c := range candidates { if ctx.Err() != nil { return nil, false From 920b6efffa7ce82e7d964a017b03033ca55c281b Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 17:41:54 +0800 Subject: [PATCH 4/6] refactor(logging): strip unrelated deferred body changes, keep credits-only logging Remove deferred body optimization and maxErrorLog constants that were unrelated to credits fallback. Keep only MarkCreditsUsed/CreditsUsed helpers for flagging requests that consumed AI credits. --- .../runtime/executor/helps/logging_helpers.go | 156 ++++-------------- 1 file changed, 35 insertions(+), 121 deletions(-) diff --git a/internal/runtime/executor/helps/logging_helpers.go b/internal/runtime/executor/helps/logging_helpers.go index 1292deb4..a0b30f70 100644 --- a/internal/runtime/executor/helps/logging_helpers.go +++ b/internal/runtime/executor/helps/logging_helpers.go @@ -24,14 +24,7 @@ const ( apiRequestKey = "API_REQUEST" apiResponseKey = "API_RESPONSE" apiWebsocketTimelineKey = "API_WEBSOCKET_TIMELINE" - - // maxErrorLogResponseBodySize limits cached response body when request-log is disabled. - // Prevents unbounded memory growth for large/streaming responses in error-only mode. - maxErrorLogResponseBodySize = 32 * 1024 // 32KB - - // maxErrorLogRequestBodySize limits materialized request body in error-only mode. - // Prevents OOM from large payloads (e.g. base64 images) when full request logging is off. - maxErrorLogRequestBodySize = 32 * 1024 // 32KB + creditsUsedKey = "__antigravity_credits_used__" ) // UpstreamRequestLog captures the outbound upstream request details for logging. @@ -50,7 +43,6 @@ type UpstreamRequestLog struct { type upstreamAttempt struct { index int request string - deferredBody []byte // lazy body reference; only materialized on error response *strings.Builder responseIntroWritten bool statusWritten bool @@ -59,12 +51,13 @@ type upstreamAttempt struct { bodyHasContent bool prevWasSSEEvent bool errorWritten bool - bodyBytesWritten int - bodyTruncated bool } // RecordAPIRequest stores the upstream request metadata in Gin context for request logging. func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequestLog) { + if cfg == nil || !cfg.RequestLog { + return + } ginCtx := ginContextFrom(ctx) if ginCtx == nil { return @@ -73,8 +66,6 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ attempts := getAttempts(ginCtx) index := len(attempts) + 1 - requestLogEnabled := cfg != nil && cfg.RequestLog - builder := &strings.Builder{} builder.WriteString(fmt.Sprintf("=== API REQUEST %d ===\n", index)) builder.WriteString(fmt.Sprintf("Timestamp: %s\n", time.Now().Format(time.RFC3339Nano))) @@ -92,20 +83,10 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ builder.WriteString("\nHeaders:\n") writeHeaders(builder, info.Headers) builder.WriteString("\nBody:\n") - if requestLogEnabled { - // Full request logging: format body inline - if len(info.Body) > 0 { - builder.WriteString(string(info.Body)) - } else { - builder.WriteString("") - } + if len(info.Body) > 0 { + builder.WriteString(string(info.Body)) } else { - // Error-only mode: defer body to avoid allocating copies for the 99% success path - if len(info.Body) > 0 { - builder.WriteString(fmt.Sprintf("", len(info.Body))) - } else { - builder.WriteString("") - } + builder.WriteString("") } builder.WriteString("\n\n") @@ -114,9 +95,6 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ request: builder.String(), response: &strings.Builder{}, } - if !requestLogEnabled && len(info.Body) > 0 { - attempt.deferredBody = info.Body - } attempts = append(attempts, attempt) ginCtx.Set(apiAttemptsKey, attempts) updateAggregatedRequest(ginCtx, attempts) @@ -124,22 +102,14 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ // RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt. func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) { + if cfg == nil || !cfg.RequestLog { + return + } ginCtx := ginContextFrom(ctx) if ginCtx == nil { return } attempts, attempt := ensureAttempt(ginCtx) - - // Materialize deferred request body when upstream returns an error. - // Success responses (2xx) skip this — their deferred body is dropped with gin context. - if status >= http.StatusBadRequest { - materializeDeferredBodies(ginCtx, attempts) - } else { - for _, a := range attempts { - a.deferredBody = nil - } - } - ensureResponseIntro(attempt) if status > 0 && !attempt.statusWritten { @@ -158,7 +128,7 @@ func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status i // RecordAPIResponseError adds an error entry for the latest attempt when no HTTP response is available. func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) { - if err == nil { + if cfg == nil || !cfg.RequestLog || err == nil { return } ginCtx := ginContextFrom(ctx) @@ -166,11 +136,6 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) return } attempts, attempt := ensureAttempt(ginCtx) - - // Materialize deferred request body on error — this is the only path that - // actually needs the body. Success path (99%) never pays for body copies. - materializeDeferredBodies(ginCtx, attempts) - ensureResponseIntro(attempt) if attempt.bodyStarted && !attempt.bodyHasContent { @@ -188,6 +153,9 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) // AppendAPIResponseChunk appends an upstream response chunk to Gin context for request logging. func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byte) { + if cfg == nil || !cfg.RequestLog { + return + } data := bytes.TrimSpace(chunk) if len(data) == 0 { return @@ -199,11 +167,6 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempts, attempt := ensureAttempt(ginCtx) ensureResponseIntro(attempt) - requestLogEnabled := cfg != nil && cfg.RequestLog - if !requestLogEnabled && attempt.bodyTruncated { - return - } - if !attempt.headersWritten { attempt.response.WriteString("Headers:\n") writeHeaders(attempt.response, nil) @@ -214,22 +177,6 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempt.response.WriteString("Body:\n") attempt.bodyStarted = true } - - // Cap response body size when full request-log is disabled to prevent memory growth - if !requestLogEnabled { - remaining := maxErrorLogResponseBodySize - attempt.bodyBytesWritten - if remaining <= 0 { - attempt.bodyTruncated = true - attempt.response.WriteString("\n") - updateAggregatedResponse(ginCtx, attempts) - return - } - if len(data) > remaining { - data = data[:remaining] - attempt.bodyTruncated = true - } - } - currentChunkIsSSEEvent := bytes.HasPrefix(data, []byte("event:")) currentChunkIsSSEData := bytes.HasPrefix(data, []byte("data:")) if attempt.bodyHasContent { @@ -240,14 +187,9 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt attempt.response.WriteString(separator) } attempt.response.WriteString(string(data)) - attempt.bodyBytesWritten += len(data) attempt.bodyHasContent = true attempt.prevWasSSEEvent = currentChunkIsSSEEvent - if attempt.bodyTruncated { - attempt.response.WriteString("\n") - } - updateAggregatedResponse(ginCtx, attempts) } @@ -391,27 +333,6 @@ func ginContextFrom(ctx context.Context) *gin.Context { return ginCtx } -const creditsUsedKey = "__antigravity_credits_used__" - -// MarkCreditsUsed flags the request as having used AI credits for billing. -func MarkCreditsUsed(ctx context.Context) { - if ginCtx := ginContextFrom(ctx); ginCtx != nil { - ginCtx.Set(creditsUsedKey, true) - } -} - -// CreditsUsed returns true if the request used AI credits. -func CreditsUsed(ctx context.Context) bool { - if ginCtx := ginContextFrom(ctx); ginCtx != nil { - if val, exists := ginCtx.Get(creditsUsedKey); exists { - if b, ok := val.(bool); ok { - return b - } - } - } - return false -} - func getAttempts(ginCtx *gin.Context) []*upstreamAttempt { if ginCtx == nil { return nil @@ -424,34 +345,6 @@ func getAttempts(ginCtx *gin.Context) []*upstreamAttempt { return nil } -// materializeDeferredBodies replaces deferred body placeholders with actual -// (truncated) body content. Called only on the error path so the 99% success -// path pays zero allocation cost for request body logging. -func materializeDeferredBodies(ginCtx *gin.Context, attempts []*upstreamAttempt) { - changed := false - for _, attempt := range attempts { - if attempt.deferredBody == nil { - continue - } - body := attempt.deferredBody - attempt.deferredBody = nil // release reference to allow GC of full payload - - placeholder := fmt.Sprintf("", len(body)) - var replacement string - if len(body) > maxErrorLogRequestBodySize { - replacement = string(body[:maxErrorLogRequestBodySize]) + - fmt.Sprintf("\n", len(body), maxErrorLogRequestBodySize) - } else { - replacement = string(body) - } - attempt.request = strings.Replace(attempt.request, placeholder, replacement, 1) - changed = true - } - if changed { - updateAggregatedRequest(ginCtx, attempts) - } -} - func ensureAttempt(ginCtx *gin.Context) ([]*upstreamAttempt, *upstreamAttempt) { attempts := getAttempts(ginCtx) if len(attempts) == 0 { @@ -676,3 +569,24 @@ func LogWithRequestID(ctx context.Context) *log.Entry { } return log.WithField("request_id", requestID) } + +// MarkCreditsUsed flags the request as having used AI credits for billing. +func MarkCreditsUsed(ctx context.Context) { + ginCtx := ginContextFrom(ctx) + if ginCtx != nil { + ginCtx.Set(creditsUsedKey, true) + } +} + +// CreditsUsed returns true if the request used AI credits. +func CreditsUsed(ctx context.Context) bool { + ginCtx := ginContextFrom(ctx) + if ginCtx != nil { + if val, exists := ginCtx.Get(creditsUsedKey); exists { + if b, ok := val.(bool); ok { + return b + } + } + } + return false +} From f130846ec17f28f4c9d85214c941c1bc8d2adacc Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 22:47:51 +0800 Subject: [PATCH 5/6] fix(auth): break credits cold-start deadlock by keeping unknown-hint auths as fallback candidates Replace antigravityCreditsAvailableForModel with inline known/unknown split. Auths whose credit hints are not yet populated are kept as lower-priority candidates instead of being rejected, breaking the chicken-and-egg deadlock at cold start. --- sdk/cliproxy/auth/conductor.go | 32 ++++++++-- .../auth/conductor_credits_candidates_test.go | 61 +++++++++++++++++++ 2 files changed, 87 insertions(+), 6 deletions(-) create mode 100644 sdk/cliproxy/auth/conductor_credits_candidates_test.go diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index d1490e3c..4d37581a 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -2910,7 +2910,8 @@ func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string, opt pinnedAuthID := pinnedAuthIDFromMetadata(opts.Metadata) m.mu.RLock() defer m.mu.RUnlock() - var candidates []creditsCandidateEntry + var known []creditsCandidateEntry + var unknown []creditsCandidateEntry for _, auth := range m.auths { if auth == nil || auth.Disabled || auth.Status == StatusDisabled { continue @@ -2918,7 +2919,10 @@ func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string, opt if pinnedAuthID != "" && auth.ID != pinnedAuthID { continue } - if !antigravityCreditsAvailableForModel(auth, routeModel) { + if !strings.EqualFold(strings.TrimSpace(auth.Provider), "antigravity") { + continue + } + if !strings.Contains(strings.ToLower(strings.TrimSpace(routeModel)), "claude") { continue } providerKey := strings.TrimSpace(strings.ToLower(auth.Provider)) @@ -2926,16 +2930,32 @@ func (m *Manager) findAllAntigravityCreditsCandidateAuths(routeModel string, opt if !ok { continue } - candidates = append(candidates, creditsCandidateEntry{ + + hint, okHint := GetAntigravityCreditsHint(auth.ID) + if okHint && hint.Known { + if !hint.Available { + continue + } + known = append(known, creditsCandidateEntry{ + auth: auth.Clone(), + executor: executor, + provider: providerKey, + }) + continue + } + unknown = append(unknown, creditsCandidateEntry{ auth: auth.Clone(), executor: executor, provider: providerKey, }) } - sort.Slice(candidates, func(i, j int) bool { - return candidates[i].auth.ID < candidates[j].auth.ID + sort.Slice(known, func(i, j int) bool { + return known[i].auth.ID < known[j].auth.ID }) - return candidates + sort.Slice(unknown, func(i, j int) bool { + return unknown[i].auth.ID < unknown[j].auth.ID + }) + return append(known, unknown...) } type creditsCandidateEntry struct { diff --git a/sdk/cliproxy/auth/conductor_credits_candidates_test.go b/sdk/cliproxy/auth/conductor_credits_candidates_test.go new file mode 100644 index 00000000..e66798ac --- /dev/null +++ b/sdk/cliproxy/auth/conductor_credits_candidates_test.go @@ -0,0 +1,61 @@ +package auth + +import ( + "testing" + "time" + + cliproxyexecutor "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/executor" +) + +func TestFindAllAntigravityCreditsCandidateAuths_PrefersKnownCreditsThenUnknown(t *testing.T) { + m := &Manager{ + auths: map[string]*Auth{ + "zz-credits": {ID: "zz-credits", Provider: "antigravity"}, + "aa-unknown": {ID: "aa-unknown", Provider: "antigravity"}, + "mm-no": {ID: "mm-no", Provider: "antigravity"}, + }, + executors: map[string]ProviderExecutor{ + "antigravity": schedulerTestExecutor{}, + }, + } + + SetAntigravityCreditsHint("zz-credits", AntigravityCreditsHint{ + Known: true, + Available: true, + UpdatedAt: time.Now(), + }) + SetAntigravityCreditsHint("mm-no", AntigravityCreditsHint{ + Known: true, + Available: false, + UpdatedAt: time.Now(), + }) + + opts := cliproxyexecutor.Options{} + + candidates := m.findAllAntigravityCreditsCandidateAuths("claude-sonnet-4-6", opts) + if len(candidates) != 2 { + t.Fatalf("candidates len = %d, want 2", len(candidates)) + } + if candidates[0].auth.ID != "zz-credits" { + t.Fatalf("candidates[0].auth.ID = %q, want %q", candidates[0].auth.ID, "zz-credits") + } + if candidates[1].auth.ID != "aa-unknown" { + t.Fatalf("candidates[1].auth.ID = %q, want %q", candidates[1].auth.ID, "aa-unknown") + } + + nonClaude := m.findAllAntigravityCreditsCandidateAuths("gemini-3-flash", opts) + if len(nonClaude) != 0 { + t.Fatalf("nonClaude len = %d, want 0", len(nonClaude)) + } + + pinnedOpts := cliproxyexecutor.Options{ + Metadata: map[string]any{cliproxyexecutor.PinnedAuthMetadataKey: "aa-unknown"}, + } + pinned := m.findAllAntigravityCreditsCandidateAuths("claude-sonnet-4-6", pinnedOpts) + if len(pinned) != 1 { + t.Fatalf("pinned len = %d, want 1", len(pinned)) + } + if pinned[0].auth.ID != "aa-unknown" { + t.Fatalf("pinned[0].auth.ID = %q, want %q", pinned[0].auth.ID, "aa-unknown") + } +} From 7ad19000411982cd066e74e6f4e4c33776335614 Mon Sep 17 00:00:00 2001 From: sususu98 Date: Thu, 23 Apr 2026 23:58:10 +0800 Subject: [PATCH 6/6] perf(antigravity): async credits hint refresh for warm tokens --- .../runtime/executor/antigravity_executor.go | 69 ++++++++++++++++++- .../antigravity_executor_credits_test.go | 9 ++- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 633373d2..6983bfac 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -52,6 +52,8 @@ const ( defaultAntigravityAgent = "antigravity/1.21.9 darwin/arm64" // fallback only; overridden at runtime by misc.AntigravityUserAgent() antigravityAuthType = "antigravity" refreshSkew = 3000 * time.Second + antigravityCreditsHintRefreshInterval = 10 * time.Minute + antigravityCreditsHintRefreshTimeout = 5 * time.Second antigravityShortQuotaCooldownThreshold = 5 * time.Minute antigravityInstantRetryThreshold = 3 * time.Second // systemInstruction = "You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**" @@ -89,6 +91,7 @@ var ( antigravityCreditsFailureByAuth sync.Map antigravityShortCooldownByAuth sync.Map antigravityCreditsBalanceByAuth sync.Map // auth.ID → antigravityCreditsBalance + antigravityCreditsHintRefreshByID sync.Map // auth.ID → *antigravityCreditsHintRefreshState antigravityQuotaExhaustedKeywords = []string{ "quota_exhausted", "quota exhausted", @@ -102,6 +105,11 @@ type antigravityCreditsBalance struct { Known bool } +type antigravityCreditsHintRefreshState struct { + mu sync.Mutex + lastAttempt time.Time +} + func antigravityAuthHasCredits(auth *cliproxyauth.Auth) bool { if auth == nil || strings.TrimSpace(auth.ID) == "" { return false @@ -1558,9 +1566,7 @@ func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *clipr accessToken := metaStringValue(auth.Metadata, "access_token") expiry := tokenExpiry(auth.Metadata) if accessToken != "" && expiry.After(time.Now().Add(refreshSkew)) { - if !cliproxyauth.HasKnownAntigravityCreditsHint(auth.ID) { - e.updateAntigravityCreditsBalance(ctx, auth, accessToken) - } + e.maybeRefreshAntigravityCreditsHint(ctx, auth, accessToken) return accessToken, nil, nil } refreshCtx := context.Background() @@ -1576,6 +1582,63 @@ func (e *AntigravityExecutor) ensureAccessToken(ctx context.Context, auth *clipr return metaStringValue(updated.Metadata, "access_token"), updated, nil } +func (e *AntigravityExecutor) maybeRefreshAntigravityCreditsHint(ctx context.Context, auth *cliproxyauth.Auth, accessToken string) { + if e == nil || auth == nil || !antigravityCreditsRetryEnabled(e.cfg) { + return + } + if ctx != nil && ctx.Err() != nil { + return + } + authID := strings.TrimSpace(auth.ID) + if authID == "" { + return + } + if hint, ok := cliproxyauth.GetAntigravityCreditsHint(authID); ok && hint.Known { + return + } + if strings.TrimSpace(accessToken) == "" { + accessToken = metaStringValue(auth.Metadata, "access_token") + } + if strings.TrimSpace(accessToken) == "" { + return + } + + state := &antigravityCreditsHintRefreshState{} + if existing, loaded := antigravityCreditsHintRefreshByID.LoadOrStore(authID, state); loaded { + if cast, ok := existing.(*antigravityCreditsHintRefreshState); ok && cast != nil { + state = cast + } else { + antigravityCreditsHintRefreshByID.Delete(authID) + antigravityCreditsHintRefreshByID.Store(authID, state) + } + } + + now := time.Now() + if !state.mu.TryLock() { + return + } + if !state.lastAttempt.IsZero() && now.Sub(state.lastAttempt) < antigravityCreditsHintRefreshInterval { + state.mu.Unlock() + return + } + state.lastAttempt = now + + refreshCtx := context.Background() + if ctx != nil { + if rt, ok := ctx.Value("cliproxy.roundtripper").(http.RoundTripper); ok && rt != nil { + refreshCtx = context.WithValue(refreshCtx, "cliproxy.roundtripper", rt) + } + } + refreshCtx, cancel := context.WithTimeout(refreshCtx, antigravityCreditsHintRefreshTimeout) + authCopy := auth.Clone() + + go func(state *antigravityCreditsHintRefreshState, auth *cliproxyauth.Auth, token string) { + defer cancel() + defer state.mu.Unlock() + e.updateAntigravityCreditsBalance(refreshCtx, auth, token) + }(state, authCopy, accessToken) +} + func (e *AntigravityExecutor) refreshToken(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) { if auth == nil { return nil, statusErr{code: http.StatusUnauthorized, msg: "missing auth"} diff --git a/internal/runtime/executor/antigravity_executor_credits_test.go b/internal/runtime/executor/antigravity_executor_credits_test.go index 9e4662cf..6e38223e 100644 --- a/internal/runtime/executor/antigravity_executor_credits_test.go +++ b/internal/runtime/executor/antigravity_executor_credits_test.go @@ -20,6 +20,7 @@ func resetAntigravityCreditsRetryState() { antigravityCreditsFailureByAuth = sync.Map{} antigravityShortCooldownByAuth = sync.Map{} antigravityCreditsBalanceByAuth = sync.Map{} + antigravityCreditsHintRefreshByID = sync.Map{} } func TestClassifyAntigravity429(t *testing.T) { @@ -378,7 +379,9 @@ func TestEnsureAccessToken_WarmTokenLoadsCreditsHint(t *testing.T) { resetAntigravityCreditsRetryState() t.Cleanup(resetAntigravityCreditsRetryState) - exec := NewAntigravityExecutor(&config.Config{}) + exec := NewAntigravityExecutor(&config.Config{ + QuotaExceeded: config.QuotaExceeded{AntigravityCredits: true}, + }) auth := &cliproxyauth.Auth{ ID: "auth-warm-token-credits", Metadata: map[string]any{ @@ -407,6 +410,10 @@ func TestEnsureAccessToken_WarmTokenLoadsCreditsHint(t *testing.T) { if updatedAuth != nil { t.Fatalf("ensureAccessToken() updatedAuth = %v, want nil", updatedAuth) } + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) && !cliproxyauth.HasKnownAntigravityCreditsHint(auth.ID) { + time.Sleep(10 * time.Millisecond) + } if !cliproxyauth.HasKnownAntigravityCreditsHint(auth.ID) { t.Fatal("expected credits hint to be populated for warm token auth") }