feat(home): implement count for home auth dispatch requests and enable usage statistics

- Added `count` attribute to `homeAuthCount` requests to improve home message batching.
- Enabled usage statistics for home mode by default and added config-level enforcement.
- Adjusted failure logging to include detailed metadata in `UsageReporter`.
- Updated multiple executors to pass error details to `PublishFailure` for better debugging.
- Enhanced unit tests to validate `count` behavior and usage statistics enforcement across components.
This commit is contained in:
Luis Pater
2026-05-10 01:30:43 +08:00
parent 1abf8625d8
commit 66c3dae06b
21 changed files with 281 additions and 52 deletions
+15 -7
View File
@@ -190,7 +190,20 @@ func headersToLowerMap(headers http.Header) map[string]string {
return out
}
func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID string, headers http.Header) ([]byte, error) {
// newAuthDispatchRequest assembles the "auth" dispatch payload for the given
// model, session and request headers.
//
// A count of zero or less falls back to 1. The session ID is stored with
// surrounding whitespace removed, and the headers are converted with
// headersToLowerMap.
func newAuthDispatchRequest(requestedModel string, sessionID string, headers http.Header, count int) authDispatchRequest {
	req := authDispatchRequest{
		Type:  "auth",
		Model: requestedModel,
		Count: 1,
	}
	// Only a strictly positive caller-supplied count replaces the default.
	if count > 0 {
		req.Count = count
	}
	req.SessionID = strings.TrimSpace(sessionID)
	req.Headers = headersToLowerMap(headers)
	return req
}
func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID string, headers http.Header, count int) ([]byte, error) {
if err := c.ensureClients(); err != nil {
return nil, err
}
@@ -198,12 +211,7 @@ func (c *Client) RPopAuth(ctx context.Context, requestedModel string, sessionID
if requestedModel == "" {
return nil, fmt.Errorf("home: requested model is empty")
}
req := authDispatchRequest{
Type: "auth",
Model: requestedModel,
SessionID: strings.TrimSpace(sessionID),
Headers: headersToLowerMap(headers),
}
req := newAuthDispatchRequest(requestedModel, sessionID, headers, count)
keyBytes, err := json.Marshal(&req)
if err != nil {
return nil, err
+32
View File
@@ -0,0 +1,32 @@
package home
import (
"encoding/json"
"net/http"
"testing"
)
// TestAuthDispatchRequestIncludesCount verifies that an explicit count passed
// to newAuthDispatchRequest is emitted under the "count" key when the request
// is marshaled to JSON.
func TestAuthDispatchRequestIncludesCount(t *testing.T) {
	req := newAuthDispatchRequest("gpt-5.4", "session-1", http.Header{"Authorization": {"Bearer test"}}, 2)

	raw, err := json.Marshal(&req)
	if err != nil {
		t.Fatalf("marshal auth dispatch request: %v", err)
	}

	var payload map[string]any
	if err := json.Unmarshal(raw, &payload); err != nil {
		t.Fatalf("unmarshal auth dispatch request: %v", err)
	}

	// Use a checked lookup and a comma-ok assertion so that a missing or
	// non-numeric "count" fails the test with a readable message instead of
	// panicking on the type assertion.
	value, ok := payload["count"]
	if !ok {
		t.Fatalf("payload missing %q: %s", "count", raw)
	}
	number, ok := value.(float64)
	if !ok {
		t.Fatalf("count has type %T, want a JSON number", value)
	}
	if got := int(number); got != 2 {
		t.Fatalf("count = %d, want 2", got)
	}
}
// TestAuthDispatchRequestDefaultsCountToOne verifies that a zero count passed
// to newAuthDispatchRequest is stored as 1.
func TestAuthDispatchRequestDefaultsCountToOne(t *testing.T) {
	got := newAuthDispatchRequest("gpt-5.4", "", nil, 0).Count
	if got == 1 {
		return
	}
	t.Fatalf("count = %d, want 1", got)
}
+1
View File
@@ -3,6 +3,7 @@ package home
// authDispatchRequest is the JSON shape of a dispatch request.
//
// Type, Model and Count are always serialized; SessionID and Headers are
// left out of the JSON output when they are empty.
type authDispatchRequest struct {
	// Type is serialized as "type".
	Type string `json:"type"`
	// Model is serialized as "model".
	Model string `json:"model"`
	// Count is serialized as "count", including when it is zero.
	Count int `json:"count"`
	// SessionID is serialized as "session_id" and omitted when empty.
	SessionID string `json:"session_id,omitempty"`
	// Headers is serialized as "headers" and omitted when empty.
	Headers map[string]string `json:"headers,omitempty"`
}
+25
View File
@@ -66,6 +66,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
if !failed {
failed = !resolveSuccess(ctx)
}
fail := resolveFail(ctx, record, failed)
detail := requestDetail{
Timestamp: timestamp,
@@ -74,6 +75,7 @@ func (p *usageQueuePlugin) HandleUsage(ctx context.Context, record coreusage.Rec
AuthIndex: record.AuthIndex,
Tokens: tokens,
Failed: failed,
Fail: fail,
}
payload, err := json.Marshal(queuedUsageDetail{
@@ -110,6 +112,7 @@ type requestDetail struct {
AuthIndex string `json:"auth_index"`
Tokens tokenStats `json:"tokens"`
Failed bool `json:"failed"`
Fail failDetail `json:"fail"`
}
type tokenStats struct {
@@ -120,6 +123,28 @@ type tokenStats struct {
TotalTokens int64 `json:"total_tokens"`
}
// failDetail is the JSON shape of the failure information attached to a
// request detail. Both fields are always serialized, even when zero.
type failDetail struct {
	// StatusCode is serialized as "status_code".
	StatusCode int `json:"status_code"`
	// Body is serialized as "body".
	Body string `json:"body"`
}
// resolveFail derives the failure detail reported for a usage record.
//
// When failed is false the result is always status 200 with an empty body,
// regardless of what the record carries. When failed is true the record's own
// status code and whitespace-trimmed body are used; a non-positive status code
// falls back to internallogging.GetResponseStatus(ctx), and then to 500 if
// that is also non-positive.
func resolveFail(ctx context.Context, record coreusage.Record, failed bool) failDetail {
	if !failed {
		return failDetail{StatusCode: 200}
	}

	status := record.Fail.StatusCode
	if status <= 0 {
		status = internallogging.GetResponseStatus(ctx)
	}
	if status <= 0 {
		status = 500
	}

	return failDetail{
		StatusCode: status,
		Body:       strings.TrimSpace(record.Fail.Body),
	}
}
func resolveSuccess(ctx context.Context) bool {
status := internallogging.GetResponseStatus(ctx)
if status == 0 {
+41 -3
View File
@@ -44,9 +44,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndSuccess(t *testing.T) {
requireStringField(t, payload, "alias", "client-gpt")
requireStringField(t, payload, "endpoint", "POST /v1/chat/completions")
requireStringField(t, payload, "auth_type", "apikey")
requireStringField(t, payload, "user_api_key", "test-key")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "ctx-request-id")
requireBoolField(t, payload, "failed", false)
requireFailField(t, payload, http.StatusOK, "")
})
}
@@ -68,6 +69,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 2500 * time.Millisecond,
Fail: coreusage.Failure{
StatusCode: http.StatusInternalServerError,
Body: "upstream failed",
},
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
@@ -81,9 +86,10 @@ func TestUsageQueuePluginPayloadIncludesStableFieldsAndFailureAndGinRequestID(t
requireStringField(t, payload, "alias", "client-mini")
requireStringField(t, payload, "endpoint", "GET /v1/responses")
requireStringField(t, payload, "auth_type", "apikey")
requireStringField(t, payload, "user_api_key", "test-key")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "gin-request-id")
requireBoolField(t, payload, "failed", true)
requireFailField(t, payload, http.StatusInternalServerError, "upstream failed")
})
}
@@ -115,6 +121,10 @@ func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) {
Source: "user@example.com",
RequestedAt: time.Date(2026, 4, 25, 0, 0, 0, 0, time.UTC),
Latency: 1500 * time.Millisecond,
Fail: coreusage.Failure{
StatusCode: http.StatusBadGateway,
Body: "bad gateway",
},
Detail: coreusage.Detail{
InputTokens: 10,
OutputTokens: 20,
@@ -125,9 +135,10 @@ func TestUsageQueuePluginAsyncIgnoresRecycledGinContext(t *testing.T) {
payload := waitForSinglePayload(t, 2*time.Second)
requireStringField(t, payload, "endpoint", "POST /v1/chat/completions")
requireStringField(t, payload, "alias", "client-gpt")
requireStringField(t, payload, "user_api_key", "test-key")
requireMissingField(t, payload, "user_api_key")
requireStringField(t, payload, "request_id", "ctx-request-id")
requireBoolField(t, payload, "failed", true)
requireFailField(t, payload, http.StatusBadGateway, "bad gateway")
})
}
@@ -217,6 +228,14 @@ func requireStringField(t *testing.T, payload map[string]json.RawMessage, key, w
}
}
// requireMissingField fails the test immediately if payload contains key.
// A key that is present with an empty or null value still counts as present.
func requireMissingField(t *testing.T, payload map[string]json.RawMessage, key string) {
	t.Helper()

	_, present := payload[key]
	if !present {
		return
	}
	t.Fatalf("payload unexpectedly contains %q", key)
}
type pluginFunc func(context.Context, coreusage.Record)
func (fn pluginFunc) HandleUsage(ctx context.Context, record coreusage.Record) {
@@ -238,3 +257,22 @@ func requireBoolField(t *testing.T, payload map[string]json.RawMessage, key stri
t.Fatalf("%s = %t, want %t", key, got, want)
}
}
// requireFailField asserts that payload carries a "fail" object whose
// "status_code" and "body" equal wantStatus and wantBody. The test is failed
// immediately if the key is absent, the value does not decode, or either
// field differs.
func requireFailField(t *testing.T, payload map[string]json.RawMessage, wantStatus int, wantBody string) {
	t.Helper()

	type failShape struct {
		StatusCode int    `json:"status_code"`
		Body       string `json:"body"`
	}

	encoded, found := payload["fail"]
	if !found {
		t.Fatalf("payload missing %q", "fail")
	}

	var decoded failShape
	if err := json.Unmarshal(encoded, &decoded); err != nil {
		t.Fatalf("unmarshal fail: %v", err)
	}

	if decoded.StatusCode == wantStatus && decoded.Body == wantBody {
		return
	}
	t.Fatalf("fail = {status_code:%d body:%q}, want {status_code:%d body:%q}", decoded.StatusCode, decoded.Body, wantStatus, wantBody)
}
@@ -284,7 +284,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
processEvent := func(event wsrelay.StreamEvent) bool {
if event.Err != nil {
helps.RecordAPIResponseError(ctx, e.cfg, event.Err)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, event.Err)
select {
case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}:
case <-ctx.Done():
@@ -336,7 +336,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
return false
case wsrelay.MessageTypeError:
helps.RecordAPIResponseError(ctx, e.cfg, event.Err)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, event.Err)
select {
case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}:
case <-ctx.Done():
@@ -898,7 +898,7 @@ attemptLoop:
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
out <- cliproxyexecutor.StreamChunk{Err: errScan}
} else {
reporter.EnsurePublished(ctx)
@@ -1374,7 +1374,7 @@ attemptLoop:
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
+2 -2
View File
@@ -472,7 +472,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -512,7 +512,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
+1 -1
View File
@@ -524,7 +524,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -580,7 +580,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
terminateReason = "read_error"
terminateErr = errRead
helps.RecordAPIWebsocketError(ctx, e.cfg, "read", errRead)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errRead)
_ = send(cliproxyexecutor.StreamChunk{Err: errRead})
return
}
@@ -590,7 +590,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
terminateReason = "unexpected_binary"
terminateErr = err
helps.RecordAPIWebsocketError(ctx, e.cfg, "unexpected_binary", err)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, err)
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "unexpected_binary", err)
}
@@ -610,7 +610,7 @@ func (e *CodexWebsocketsExecutor) ExecuteStream(ctx context.Context, auth *clipr
terminateReason = "upstream_error"
terminateErr = wsErr
helps.RecordAPIWebsocketError(ctx, e.cfg, "upstream_error", wsErr)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, wsErr)
if sess != nil {
e.invalidateUpstreamConn(sess, conn, "upstream_error", wsErr)
}
@@ -430,7 +430,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -444,7 +444,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
data, errRead := io.ReadAll(resp.Body)
if errRead != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errRead)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errRead)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errRead}:
case <-ctx.Done():
+1 -1
View File
@@ -341,7 +341,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -679,7 +679,7 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -821,7 +821,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -3,6 +3,7 @@ package helps
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync"
@@ -51,7 +52,7 @@ func NewUsageReporter(ctx context.Context, provider, model string, auth *cliprox
}
func (r *UsageReporter) Publish(ctx context.Context, detail usage.Detail) {
r.publishWithOutcome(ctx, detail, false)
r.publishWithOutcome(ctx, detail, false, usage.Failure{})
}
func (r *UsageReporter) PublishAdditionalModel(ctx context.Context, model string, detail usage.Detail) {
@@ -74,11 +75,11 @@ func (r *UsageReporter) buildAdditionalModelRecord(model string, detail usage.De
if !hasNonZeroTokenUsage(detail) {
return usage.Record{}, false
}
return r.buildRecordForModel(model, detail, false), true
return r.buildRecordForModel(model, detail, false, usage.Failure{}), true
}
func (r *UsageReporter) PublishFailure(ctx context.Context) {
r.publishWithOutcome(ctx, usage.Detail{}, true)
func (r *UsageReporter) PublishFailure(ctx context.Context, errs ...error) {
r.publishWithOutcome(ctx, usage.Detail{}, true, failFromErrors(errs...))
}
func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) {
@@ -86,17 +87,17 @@ func (r *UsageReporter) TrackFailure(ctx context.Context, errPtr *error) {
return
}
if *errPtr != nil {
r.PublishFailure(ctx)
r.PublishFailure(ctx, *errPtr)
}
}
func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool) {
func (r *UsageReporter) publishWithOutcome(ctx context.Context, detail usage.Detail, failed bool, fail usage.Failure) {
if r == nil {
return
}
detail = normalizeUsageDetailTotal(detail)
r.once.Do(func() {
usage.PublishRecord(ctx, r.buildRecord(detail, failed))
usage.PublishRecord(ctx, r.buildRecord(detail, failed, fail))
})
}
@@ -127,20 +128,24 @@ func (r *UsageReporter) EnsurePublished(ctx context.Context) {
return
}
r.once.Do(func() {
usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false))
usage.PublishRecord(ctx, r.buildRecord(usage.Detail{}, false, usage.Failure{}))
})
}
func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool) usage.Record {
if r == nil {
return usage.Record{Detail: detail, Failed: failed}
func (r *UsageReporter) buildRecord(detail usage.Detail, failed bool, failures ...usage.Failure) usage.Record {
var fail usage.Failure
if len(failures) > 0 {
fail = failures[0]
}
return r.buildRecordForModel(r.model, detail, failed)
if r == nil {
return usage.Record{Detail: detail, Failed: failed, Fail: fail}
}
return r.buildRecordForModel(r.model, detail, failed, fail)
}
func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool) usage.Record {
func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, failed bool, fail usage.Failure) usage.Record {
if r == nil {
return usage.Record{Model: model, Detail: detail, Failed: failed}
return usage.Record{Model: model, Detail: detail, Failed: failed, Fail: fail}
}
return usage.Record{
Provider: r.provider,
@@ -154,10 +159,28 @@ func (r *UsageReporter) buildRecordForModel(model string, detail usage.Detail, f
RequestedAt: r.requestedAt,
Latency: r.latency(),
Failed: failed,
Fail: fail,
Detail: detail,
}
}
// failFromErrors converts the first non-nil error in errs into a usage.Failure.
//
// The failure body is the error's message with surrounding whitespace removed.
// If the error, or any error it wraps, exposes a StatusCode() int method, that
// value becomes the failure's status code. Later errors in errs are ignored.
// A zero-value usage.Failure is returned when errs is empty or holds only nil.
func failFromErrors(errs ...error) usage.Failure {
	var first error
	for _, candidate := range errs {
		if candidate != nil {
			first = candidate
			break
		}
	}
	if first == nil {
		return usage.Failure{}
	}

	result := usage.Failure{Body: strings.TrimSpace(first.Error())}

	// errors.As walks the wrap chain looking for anything with StatusCode().
	var coder interface{ StatusCode() int }
	if errors.As(first, &coder) && coder != nil {
		result.StatusCode = coder.StatusCode()
	}
	return result
}
func (r *UsageReporter) latency() time.Duration {
if r == nil || r.requestedAt.IsZero() {
return 0
+1 -1
View File
@@ -307,7 +307,7 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():
@@ -296,7 +296,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
if bytes.HasPrefix(trimmedLine, []byte("{")) || bytes.HasPrefix(trimmedLine, []byte("[")) {
streamErr := statusErr{code: http.StatusBadGateway, msg: string(trimmedLine)}
helps.RecordAPIResponseError(ctx, e.cfg, streamErr)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, streamErr)
select {
case out <- cliproxyexecutor.StreamChunk{Err: streamErr}:
case <-ctx.Done():
@@ -318,7 +318,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
}
if errScan := scanner.Err(); errScan != nil {
helps.RecordAPIResponseError(ctx, e.cfg, errScan)
reporter.PublishFailure(ctx)
reporter.PublishFailure(ctx, errScan)
select {
case out <- cliproxyexecutor.StreamChunk{Err: errScan}:
case <-ctx.Done():