feat(antigravity): conductor-level credits fallback for Claude models

Move credits handling from executor-level retry to conductor-level
orchestration. When all free-tier auths are exhausted (429/503), the
conductor discovers auths with available Google One AI credits and
retries with enabledCreditTypes injected via context flag.

Key changes:
- Add AntigravityCreditsHint system for tracking per-auth credits state
- Conductor tries credits fallback after all auths fail (Execute/Stream/Count)
- Executor injects enabledCreditTypes only when conductor sets context flag
- Credits fallback respects provider scope (requires antigravity in providers)
- Add context cancellation check in credits fallback to avoid wasted requests
- Remove executor-level attemptCreditsFallback and preferCredits machinery
- Restructure 429 decision logic (parse details first, keyword fallback)
- Expand shouldAbort to cover INVALID_ARGUMENT/FAILED_PRECONDITION/500+UNKNOWN
- Support human-readable retry delay parsing (e.g. "1h43m56s")
This commit is contained in:
sususu98
2026-04-23 13:44:20 +08:00
parent a188159632
commit 14d46a0a5d
9 changed files with 989 additions and 694 deletions
@@ -24,6 +24,14 @@ const (
apiRequestKey = "API_REQUEST"
apiResponseKey = "API_RESPONSE"
apiWebsocketTimelineKey = "API_WEBSOCKET_TIMELINE"
// maxErrorLogResponseBodySize limits cached response body when request-log is disabled.
// Prevents unbounded memory growth for large/streaming responses in error-only mode.
maxErrorLogResponseBodySize = 32 * 1024 // 32KB
// maxErrorLogRequestBodySize limits materialized request body in error-only mode.
// Prevents OOM from large payloads (e.g. base64 images) when full request logging is off.
maxErrorLogRequestBodySize = 32 * 1024 // 32KB
)
// UpstreamRequestLog captures the outbound upstream request details for logging.
@@ -42,6 +50,7 @@ type UpstreamRequestLog struct {
type upstreamAttempt struct {
index int
request string
deferredBody []byte // lazy body reference; only materialized on error
response *strings.Builder
responseIntroWritten bool
statusWritten bool
@@ -50,13 +59,12 @@ type upstreamAttempt struct {
bodyHasContent bool
prevWasSSEEvent bool
errorWritten bool
bodyBytesWritten int
bodyTruncated bool
}
// RecordAPIRequest stores the upstream request metadata in Gin context for request logging.
func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequestLog) {
if cfg == nil || !cfg.RequestLog {
return
}
ginCtx := ginContextFrom(ctx)
if ginCtx == nil {
return
@@ -65,6 +73,8 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ
attempts := getAttempts(ginCtx)
index := len(attempts) + 1
requestLogEnabled := cfg != nil && cfg.RequestLog
builder := &strings.Builder{}
builder.WriteString(fmt.Sprintf("=== API REQUEST %d ===\n", index))
builder.WriteString(fmt.Sprintf("Timestamp: %s\n", time.Now().Format(time.RFC3339Nano)))
@@ -82,10 +92,20 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ
builder.WriteString("\nHeaders:\n")
writeHeaders(builder, info.Headers)
builder.WriteString("\nBody:\n")
if len(info.Body) > 0 {
builder.WriteString(string(info.Body))
if requestLogEnabled {
// Full request logging: format body inline
if len(info.Body) > 0 {
builder.WriteString(string(info.Body))
} else {
builder.WriteString("<empty>")
}
} else {
builder.WriteString("<empty>")
// Error-only mode: defer body to avoid allocating copies for the 99% success path
if len(info.Body) > 0 {
builder.WriteString(fmt.Sprintf("<deferred: %d bytes>", len(info.Body)))
} else {
builder.WriteString("<empty>")
}
}
builder.WriteString("\n\n")
@@ -94,6 +114,9 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ
request: builder.String(),
response: &strings.Builder{},
}
if !requestLogEnabled && len(info.Body) > 0 {
attempt.deferredBody = info.Body
}
attempts = append(attempts, attempt)
ginCtx.Set(apiAttemptsKey, attempts)
updateAggregatedRequest(ginCtx, attempts)
@@ -101,14 +124,18 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ
// RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt.
func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) {
if cfg == nil || !cfg.RequestLog {
return
}
ginCtx := ginContextFrom(ctx)
if ginCtx == nil {
return
}
attempts, attempt := ensureAttempt(ginCtx)
// Materialize deferred request body when upstream returns an error.
// Any status below 400 (not only 2xx) skips this — the deferred body is dropped with gin context.
if status >= http.StatusBadRequest {
materializeDeferredBodies(ginCtx, attempts)
}
ensureResponseIntro(attempt)
if status > 0 && !attempt.statusWritten {
@@ -127,7 +154,7 @@ func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status i
// RecordAPIResponseError adds an error entry for the latest attempt when no HTTP response is available.
func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error) {
if cfg == nil || !cfg.RequestLog || err == nil {
if err == nil {
return
}
ginCtx := ginContextFrom(ctx)
@@ -135,6 +162,11 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error)
return
}
attempts, attempt := ensureAttempt(ginCtx)
// Materialize deferred request body on error — this is the only path that
// actually needs the body. Success path (99%) never pays for body copies.
materializeDeferredBodies(ginCtx, attempts)
ensureResponseIntro(attempt)
if attempt.bodyStarted && !attempt.bodyHasContent {
@@ -152,9 +184,6 @@ func RecordAPIResponseError(ctx context.Context, cfg *config.Config, err error)
// AppendAPIResponseChunk appends an upstream response chunk to Gin context for request logging.
func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byte) {
if cfg == nil || !cfg.RequestLog {
return
}
data := bytes.TrimSpace(chunk)
if len(data) == 0 {
return
@@ -166,6 +195,11 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt
attempts, attempt := ensureAttempt(ginCtx)
ensureResponseIntro(attempt)
requestLogEnabled := cfg != nil && cfg.RequestLog
if !requestLogEnabled && attempt.bodyTruncated {
return
}
if !attempt.headersWritten {
attempt.response.WriteString("Headers:\n")
writeHeaders(attempt.response, nil)
@@ -176,6 +210,22 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt
attempt.response.WriteString("Body:\n")
attempt.bodyStarted = true
}
// Cap response body size when full request-log is disabled to prevent memory growth
if !requestLogEnabled {
remaining := maxErrorLogResponseBodySize - attempt.bodyBytesWritten
if remaining <= 0 {
attempt.bodyTruncated = true
attempt.response.WriteString("\n<truncated: response body exceeded 32KB limit for error log>")
updateAggregatedResponse(ginCtx, attempts)
return
}
if len(data) > remaining {
data = data[:remaining]
attempt.bodyTruncated = true
}
}
currentChunkIsSSEEvent := bytes.HasPrefix(data, []byte("event:"))
currentChunkIsSSEData := bytes.HasPrefix(data, []byte("data:"))
if attempt.bodyHasContent {
@@ -186,9 +236,14 @@ func AppendAPIResponseChunk(ctx context.Context, cfg *config.Config, chunk []byt
attempt.response.WriteString(separator)
}
attempt.response.WriteString(string(data))
attempt.bodyBytesWritten += len(data)
attempt.bodyHasContent = true
attempt.prevWasSSEEvent = currentChunkIsSSEEvent
if attempt.bodyTruncated {
attempt.response.WriteString("\n<truncated: response body exceeded 32KB limit for error log>")
}
updateAggregatedResponse(ginCtx, attempts)
}
@@ -332,6 +387,27 @@ func ginContextFrom(ctx context.Context) *gin.Context {
return ginCtx
}
// creditsUsedKey is the Gin context key under which MarkCreditsUsed stores
// its boolean flag and from which CreditsUsed reads it back.
const creditsUsedKey = "__antigravity_credits_used__"
// MarkCreditsUsed flags the request as having used AI credits for billing.
// MarkCreditsUsed records on the request's Gin context that AI credits were
// used. It stores true under creditsUsedKey; when ctx carries no Gin context
// the call is a no-op.
func MarkCreditsUsed(ctx context.Context) {
	gc := ginContextFrom(ctx)
	if gc == nil {
		return
	}
	gc.Set(creditsUsedKey, true)
}
// CreditsUsed returns true if the request used AI credits.
// CreditsUsed reports whether the request was flagged as having used AI
// credits. It returns false when ctx carries no Gin context, when nothing is
// stored under creditsUsedKey, or when the stored value is not a bool.
func CreditsUsed(ctx context.Context) bool {
	gc := ginContextFrom(ctx)
	if gc == nil {
		return false
	}
	raw, found := gc.Get(creditsUsedKey)
	if !found {
		return false
	}
	used, isBool := raw.(bool)
	return isBool && used
}
func getAttempts(ginCtx *gin.Context) []*upstreamAttempt {
if ginCtx == nil {
return nil
@@ -344,6 +420,34 @@ func getAttempts(ginCtx *gin.Context) []*upstreamAttempt {
return nil
}
// materializeDeferredBodies replaces deferred body placeholders with actual
// (truncated) body content. Called only on the error path so the 99% success
// path pays zero allocation cost for request body logging.
// materializeDeferredBodies swaps the "<deferred: N bytes>" placeholder in
// each attempt's request text for the real request body and then refreshes
// the aggregated request via updateAggregatedRequest.
//
// Bodies longer than maxErrorLogRequestBodySize are cut and followed by a
// truncation notice. The cut is moved back (by at most three bytes) when it
// would land in the middle of a multi-byte UTF-8 sequence, so the log never
// ends the body excerpt with a partial rune.
//
// Each attempt's deferredBody is cleared after use, so calling this again for
// the same attempts does nothing.
func materializeDeferredBodies(ginCtx *gin.Context, attempts []*upstreamAttempt) {
	changed := false
	for _, attempt := range attempts {
		if attempt.deferredBody == nil {
			continue
		}
		body := attempt.deferredBody
		attempt.deferredBody = nil // release reference to allow GC of full payload

		placeholder := fmt.Sprintf("<deferred: %d bytes>", len(body))

		var replacement string
		if len(body) > maxErrorLogRequestBodySize {
			// body[cut] is the first byte left out. If it is a UTF-8
			// continuation byte (10xxxxxx) the cut splits a rune, so step
			// back to the rune's lead byte. A rune has at most three
			// continuation bytes.
			cut := maxErrorLogRequestBodySize
			for cut > 0 && maxErrorLogRequestBodySize-cut < 3 && body[cut]&0xC0 == 0x80 {
				cut--
			}
			if body[cut]&0xC0 == 0x80 {
				// Still on a continuation byte: the payload is not valid
				// UTF-8 here, so backing off gains nothing.
				cut = maxErrorLogRequestBodySize
			}
			replacement = string(body[:cut]) +
				fmt.Sprintf("\n<truncated: request body %d bytes, showing first %d>", len(body), cut)
		} else {
			replacement = string(body)
		}

		// The body section is the last thing written to the request text, so
		// target the final occurrence; an identical string appearing earlier
		// (e.g. inside a header value) must not be replaced instead.
		if idx := strings.LastIndex(attempt.request, placeholder); idx >= 0 {
			attempt.request = attempt.request[:idx] + replacement + attempt.request[idx+len(placeholder):]
		}
		changed = true
	}
	if changed {
		updateAggregatedRequest(ginCtx, attempts)
	}
}
func ensureAttempt(ginCtx *gin.Context) ([]*upstreamAttempt, *upstreamAttempt) {
attempts := getAttempts(ginCtx)
if len(attempts) == 0 {