feat(runtime): track upstream response headers in logging and usage reporting

- Added APIs to store, retrieve, and clone upstream response headers in context for detailed logging.
- Updated `RecordAPIResponseMetadata`, `RecordAPIWebsocketHandshake`, and related methods to capture response headers.
- Extended `UsageReporter` to include response headers in published usage records.
- Enhanced payload tests to validate response headers' integrity and persistence.
- Refactored `usage.Record` to support optional `ResponseHeaders` field.
This commit is contained in:
Luis Pater
2026-05-19 01:29:23 +08:00
parent 77ba15f71b
commit ad98c9549a
8 changed files with 188 additions and 17 deletions
+55
View File
@@ -2,16 +2,24 @@ package logging
import ( import (
"context" "context"
"net/http"
"sync"
"sync/atomic" "sync/atomic"
) )
type endpointKey struct{} type endpointKey struct{}
type responseStatusKey struct{} type responseStatusKey struct{}
type responseHeadersKey struct{}
type responseStatusHolder struct { type responseStatusHolder struct {
status atomic.Int32 status atomic.Int32
} }
type responseHeadersHolder struct {
mu sync.RWMutex
headers http.Header
}
func WithEndpoint(ctx context.Context, endpoint string) context.Context { func WithEndpoint(ctx context.Context, endpoint string) context.Context {
if ctx == nil { if ctx == nil {
ctx = context.Background() ctx = context.Background()
@@ -39,6 +47,16 @@ func WithResponseStatusHolder(ctx context.Context) context.Context {
return context.WithValue(ctx, responseStatusKey{}, &responseStatusHolder{}) return context.WithValue(ctx, responseStatusKey{}, &responseStatusHolder{})
} }
func WithResponseHeadersHolder(ctx context.Context) context.Context {
if ctx == nil {
ctx = context.Background()
}
if holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder); ok && holder != nil {
return ctx
}
return context.WithValue(ctx, responseHeadersKey{}, &responseHeadersHolder{})
}
func SetResponseStatus(ctx context.Context, status int) { func SetResponseStatus(ctx context.Context, status int) {
if ctx == nil || status <= 0 { if ctx == nil || status <= 0 {
return return
@@ -50,6 +68,19 @@ func SetResponseStatus(ctx context.Context, status int) {
holder.status.Store(int32(status)) holder.status.Store(int32(status))
} }
func SetResponseHeaders(ctx context.Context, headers http.Header) {
if ctx == nil {
return
}
holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder)
if !ok || holder == nil {
return
}
holder.mu.Lock()
defer holder.mu.Unlock()
holder.headers = cloneHTTPHeader(headers)
}
func GetResponseStatus(ctx context.Context) int { func GetResponseStatus(ctx context.Context) int {
if ctx == nil { if ctx == nil {
return 0 return 0
@@ -60,3 +91,27 @@ func GetResponseStatus(ctx context.Context) int {
} }
return int(holder.status.Load()) return int(holder.status.Load())
} }
func GetResponseHeaders(ctx context.Context) http.Header {
if ctx == nil {
return nil
}
holder, ok := ctx.Value(responseHeadersKey{}).(*responseHeadersHolder)
if !ok || holder == nil {
return nil
}
holder.mu.RLock()
defer holder.mu.RUnlock()
return cloneHTTPHeader(holder.headers)
}
func cloneHTTPHeader(src http.Header) http.Header {
if len(src) == 0 {
return nil
}
dst := make(http.Header, len(src))
for key, values := range src {
dst[key] = append([]string(nil), values...)
}
return dst
}
+17 -14
View File
@@ -3,6 +3,7 @@ package redisqueue
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"net/http"
"strings" "strings"
"time" "time"
@@ -71,13 +72,14 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
fail := resolveFail(ctx, record, failed) fail := resolveFail(ctx, record, failed)
detail := requestDetail{ detail := requestDetail{
Timestamp: timestamp, Timestamp: timestamp,
LatencyMs: record.Latency.Milliseconds(), LatencyMs: record.Latency.Milliseconds(),
Source: record.Source, Source: record.Source,
AuthIndex: record.AuthIndex, AuthIndex: record.AuthIndex,
Tokens: tokens, Tokens: tokens,
Failed: failed, Failed: failed,
Fail: fail, Fail: fail,
ResponseHeaders: record.ResponseHeaders,
} }
payload, err := json.Marshal(queuedUsageDetail{ payload, err := json.Marshal(queuedUsageDetail{
@@ -108,13 +110,14 @@ type queuedUsageDetail struct {
} }
type requestDetail struct { type requestDetail struct {
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
LatencyMs int64 `json:"latency_ms"` LatencyMs int64 `json:"latency_ms"`
Source string `json:"source"` Source string `json:"source"`
AuthIndex string `json:"auth_index"` AuthIndex string `json:"auth_index"`
Tokens tokenStats `json:"tokens"` Tokens tokenStats `json:"tokens"`
Failed bool `json:"failed"` Failed bool `json:"failed"`
Fail failDetail `json:"fail"` Fail failDetail `json:"fail"`
ResponseHeaders http.Header `json:"response_headers,omitempty"`
} }
type tokenStats struct { type tokenStats struct {
+76
View File
@@ -19,6 +19,9 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions") ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions")
ctx = internallogging.WithResponseStatusHolder(ctx) ctx = internallogging.WithResponseStatusHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusOK) internallogging.SetResponseStatus(ctx, http.StatusOK)
responseHeaders := http.Header{}
responseHeaders.Add("X-Upstream-Request-Id", "upstream-req-1")
responseHeaders.Add("Retry-After", "30")
plugin := &usageQueuePlugin{} plugin := &usageQueuePlugin{}
plugin.HandleUsage(ctx, coreusage.Record{ plugin.HandleUsage(ctx, coreusage.Record{
@@ -36,7 +39,9 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
OutputTokens: 20, OutputTokens: 20,
TotalTokens: 30, TotalTokens: 30,
}, },
ResponseHeaders: responseHeaders.Clone(),
}) })
responseHeaders.Set("Retry-After", "999")
payload := popSinglePayload(t) payload := popSinglePayload(t)
requireStringField(t, payload, "provider", "openai") requireStringField(t, payload, "provider", "openai")
@@ -46,11 +51,57 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
requireStringField(t, payload, "auth_type", "apikey") requireStringField(t, payload, "auth_type", "apikey")
requireMissingField(t, payload, "user_api_key") requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "ctx-request-id") requireStringField(t, payload, "request_id", "ctx-request-id")
requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"})
requireHeaderField(t, payload, "response_headers", "Retry-After", []string{"30"})
requireBoolField(t, payload, "failed", false) requireBoolField(t, payload, "failed", false)
requireFailField(t, payload, http.StatusOK, "") requireFailField(t, payload, http.StatusOK, "")
}) })
} }
func TestUsageQueuePluginAsyncUsesRecordResponseHeaders(t *testing.T) {
withEnabledQueue(t, func() {
ctx := internallogging.WithRequestID(context.Background(), "ctx-request-id")
ctx = internallogging.WithEndpoint(ctx, "POST /v1/chat/completions")
ctx = internallogging.WithResponseStatusHolder(ctx)
ctx = internallogging.WithResponseHeadersHolder(ctx)
internallogging.SetResponseStatus(ctx, http.StatusOK)
initialHeaders := http.Header{}
initialHeaders.Set("X-Upstream-Request-Id", "upstream-req-1")
internallogging.SetResponseHeaders(ctx, initialHeaders)
mgr := coreusage.NewManager(16)
defer mgr.Stop()
mgr.Register(pluginFunc(func(ctx context.Context, _ coreusage.Record) {
nextHeaders := http.Header{}
nextHeaders.Set("X-Upstream-Request-Id", "upstream-req-2")
internallogging.SetResponseHeaders(ctx, nextHeaders)
}))
mgr.Register(&usageQueuePlugin{})
mgr.Publish(ctx, coreusage.Record{
Provider: "openai",
Model: "gpt-5.4",
Alias: "client-gpt",
APIKey: "test-key",
AuthIndex: "0",
AuthType: "apikey",
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 1500 * time.Millisecond,
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
TotalTokens: 30,
},
ResponseHeaders: internallogging.GetResponseHeaders(ctx),
})
payload := waitForSinglePayload(t, 2*time.Second)
requireHeaderField(t, payload, "response_headers", "X-Upstream-Request-Id", []string{"upstream-req-1"})
})
}
func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t *testing.T) { func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t *testing.T) {
withEnabledQueue(t, func() { withEnabledQueue(t, func() {
ctx := internallogging.WithRequestID(context.Background(), "gin-request-id") ctx := internallogging.WithRequestID(context.Background(), "gin-request-id")
@@ -276,3 +327,28 @@ func requireFailField(t *testing.T, payload map[string]json.RawMessage, wantStat
t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", got.StatusCode, got.Body, wantStatus, wantBody) t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", got.StatusCode, got.Body, wantStatus, wantBody)
} }
} }
func requireHeaderField(t *testing.T, payload map[string]json.RawMessage, field, key string, want []string) {
t.Helper()
raw, ok := payload[field]
if !ok {
t.Fatalf("payload missing %q", field)
}
var headers map[string][]string
if err := json.Unmarshal(raw, &headers); err != nil {
t.Fatalf("unmarshal %q: %v", field, err)
}
got, ok := headers[key]
if !ok {
t.Fatalf("%s missing header %q", field, key)
}
if len(got) != len(want) {
t.Fatalf("%s[%q] = %v, want %v", field, key, got, want)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("%s[%q] = %v, want %v", field, key, got, want)
}
}
}
@@ -102,6 +102,7 @@ func RecordAPIRequest(ctx context.Context, cfg *config.Config, info UpstreamRequ
// RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt. // RecordAPIResponseMetadata captures upstream response status/header information for the latest attempt.
func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) { func RecordAPIResponseMetadata(ctx context.Context, cfg *config.Config, status int, headers http.Header) {
logging.SetResponseHeaders(ctx, headers)
if cfg == nil || !cfg.RequestLog { if cfg == nil || !cfg.RequestLog {
return return
} }
@@ -227,6 +228,7 @@ func RecordAPIWebsocketRequest(ctx context.Context, cfg *config.Config, info Ups
// RecordAPIWebsocketHandshake stores the upstream websocket handshake response metadata. // RecordAPIWebsocketHandshake stores the upstream websocket handshake response metadata.
func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status int, headers http.Header) { func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status int, headers http.Header) {
logging.SetResponseHeaders(ctx, headers)
if cfg == nil || !cfg.RequestLog { if cfg == nil || !cfg.RequestLog {
return return
} }
@@ -250,6 +252,7 @@ func RecordAPIWebsocketHandshake(ctx context.Context, cfg *config.Config, status
// RecordAPIWebsocketUpgradeRejection stores a rejected websocket upgrade as an HTTP attempt. // RecordAPIWebsocketUpgradeRejection stores a rejected websocket upgrade as an HTTP attempt.
func RecordAPIWebsocketUpgradeRejection(ctx context.Context, cfg *config.Config, info UpstreamRequestLog, status int, headers http.Header, body []byte) { func RecordAPIWebsocketUpgradeRejection(ctx context.Context, cfg *config.Config, info UpstreamRequestLog, status int, headers http.Header, body []byte) {
logging.SetResponseHeaders(ctx, headers)
if cfg == nil || !cfg.RequestLog { if cfg == nil || !cfg.RequestLog {
return return
} }
@@ -0,0 +1,24 @@
package helps
import (
"context"
"net/http"
"testing"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
"github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
)
func TestRecordAPIResponseMetadataStoresHeadersWhenRequestLogDisabled(t *testing.T) {
ctx := logging.WithResponseHeadersHolder(context.Background())
headers := http.Header{}
headers.Add("X-Upstream-Request-Id", "upstream-req-1")
RecordAPIResponseMetadata(ctx, &config.Config{}, http.StatusOK, headers)
headers.Set("X-Upstream-Request-Id", "mutated")
got := logging.GetResponseHeaders(ctx)
if got.Get("X-Upstream-Request-Id") != "upstream-req-1" {
t.Fatalf("response header = %q, want %q", got.Get("X-Upstream-Request-Id"), "upstream-req-1")
}
}
@@ -10,6 +10,7 @@ import (
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
internallogging "github.com/router-for-me/CLIProxyAPI/v7/internal/logging"
cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth" cliproxyauth "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/auth"
"github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage" "github.com/router-for-me/CLIProxyAPI/v7/sdk/cliproxy/usage"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
@@ -60,7 +61,7 @@ func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string
if !ok { if !ok {
return return
} }
usage.PublishRecord(ctx, record) r.publishRecord(ctx, record)
} }
func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) { func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.Detail) (usage.Record, bool) {
@@ -97,7 +98,7 @@ func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Det
} }
detail = normalizeUsageDetailTotal(detail) detail = normalizeUsageDetailTotal(detail)
r.once.Do(func() { r.once.Do(func() {
usage.PublishRecord(ctx, r.buildRecord(detail, failed, fail)) r.publishRecord(ctx, r.buildRecord(detail, failed, fail))
}) })
} }
@@ -130,10 +131,15 @@ func (r *UsageReporter) EnsurePublished(ctx context.Context) {
return return
} }
r.once.Do(func() { r.once.Do(func() {
usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{})) r.publishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{}))
}) })
} }
func (r *UsageReporter) publishRecord(ctx context.Context, record usage.Record) {
record.ResponseHeaders = internallogging.GetResponseHeaders(ctx)
usage.PublishRecord(ctx, record)
}
func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record { func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record {
var fail usage.Failure var fail usage.Failure
if len(failures) > 0 { if len(failures) > 0 {
+1
View File
@@ -400,6 +400,7 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c *
newCtx = logging.WithEndpoint(newCtx, endpoint) newCtx = logging.WithEndpoint(newCtx, endpoint)
} }
newCtx = logging.WithResponseStatusHolder(newCtx) newCtx = logging.WithResponseStatusHolder(newCtx)
newCtx = logging.WithResponseHeadersHolder(newCtx)
cancelCtx := newCtx cancelCtx := newCtx
if requestCtx != nil && requestCtx != parentCtx { if requestCtx != nil && requestCtx != parentCtx {
+3
View File
@@ -2,6 +2,7 @@ package usage
import ( import (
"context" "context"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -24,6 +25,8 @@ type Record struct {
Failed bool Failed bool
Fail Failure Fail Failure
Detail Detail Detail Detail
// ResponseHeaders stores a snapshot of upstream response headers for usage sinks.
ResponseHeaders http.Header
} }
// Failure holds HTTP failure metadata for an upstream request attempt. // Failure holds HTTP failure metadata for an upstream request attempt.