diff --git a/internal/runtime/executor/aistudio_executor.go b/internal/runtime/executor/aistudio_executor.go index 73491d82..37e85377 100644 --- a/internal/runtime/executor/aistudio_executor.go +++ b/internal/runtime/executor/aistudio_executor.go @@ -285,7 +285,10 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth if event.Err != nil { helps.RecordAPIResponseError(ctx, e.cfg, event.Err) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} + select { + case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: + case <-ctx.Done(): + } return false } switch event.Type { @@ -303,7 +306,11 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth } lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, filtered, ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])}: + case <-ctx.Done(): + return false + } } break } @@ -319,14 +326,21 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth } lines := sdktranslator.TranslateStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, event.Payload, ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON(lines[i])}: + case <-ctx.Done(): + return false + } } reporter.Publish(ctx, helps.ParseGeminiUsage(event.Payload)) return false case wsrelay.MessageTypeError: helps.RecordAPIResponseError(ctx, e.cfg, event.Err) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} + select { + case out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}: + case <-ctx.Done(): + } return false } return true diff --git a/internal/runtime/executor/antigravity_executor.go b/internal/runtime/executor/antigravity_executor.go index 280c799a..c07680e8 100644 --- a/internal/runtime/executor/antigravity_executor.go +++ b/internal/runtime/executor/antigravity_executor.go @@ -894,12 +894,19 @@ attemptLoop: reporter.Publish(ctx, detail) } - out <- cliproxyexecutor.StreamChunk{Payload: payload} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: payload}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { reporter.EnsurePublished(ctx) } @@ -1357,17 +1364,28 @@ attemptLoop: chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(payload), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } tail := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, []byte("[DONE]"), ¶m) for i := range tail { - out <- cliproxyexecutor.StreamChunk{Payload: tail[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: tail[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { reporter.EnsurePublished(ctx) } diff --git a/internal/runtime/executor/claude_executor.go b/internal/runtime/executor/claude_executor.go index 66432ac4..ea94526e 100644 --- a/internal/runtime/executor/claude_executor.go +++ b/internal/runtime/executor/claude_executor.go @@ -484,12 +484,19 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A cloned := make([]byte, len(line)+1) copy(cloned, line) cloned[len(line)] = '\n' - out <- cliproxyexecutor.StreamChunk{Payload: cloned} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: cloned}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } return } @@ -521,13 +528,20 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A ¶m, ) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/codex_executor.go b/internal/runtime/executor/codex_executor.go index aa8223f4..6efc25b0 100644 --- a/internal/runtime/executor/codex_executor.go +++ b/internal/runtime/executor/codex_executor.go @@ -515,13 +515,20 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, originalPayload, body, translatedLine, ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/gemini_cli_executor.go b/internal/runtime/executor/gemini_cli_executor.go index 15e84572..b6210e6a 100644 --- a/internal/runtime/executor/gemini_cli_executor.go +++ b/internal/runtime/executor/gemini_cli_executor.go @@ -411,19 +411,30 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if bytes.HasPrefix(line, dataTag) { segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, bytes.Clone(line), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } } } segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, []byte("[DONE]"), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } return } reporter.EnsurePublished(ctx) @@ -434,7 +445,10 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut if errRead != nil { helps.RecordAPIResponseError(ctx, e.cfg, errRead) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errRead} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errRead}: + case <-ctx.Done(): + } return } helps.AppendAPIResponseChunk(ctx, e.cfg, data) @@ -442,12 +456,20 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut var param any segments := sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, data, ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } segments = sdktranslator.TranslateStream(respCtx, to, from, attemptModel, opts.OriginalRequest, reqBody, []byte("[DONE]"), ¶m) for i := range segments { - out <- cliproxyexecutor.StreamChunk{Payload: segments[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: segments[i]}: + case <-ctx.Done(): + return + } } }(httpResp, append([]byte(nil), payload...), attemptModel) diff --git a/internal/runtime/executor/gemini_executor.go b/internal/runtime/executor/gemini_executor.go index 0e3c3ec6..2a6e9a6e 100644 --- a/internal/runtime/executor/gemini_executor.go +++ b/internal/runtime/executor/gemini_executor.go @@ -324,17 +324,28 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(payload), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/gemini_vertex_executor.go b/internal/runtime/executor/gemini_vertex_executor.go index b147fde9..20f5aec1 100644 --- a/internal/runtime/executor/gemini_vertex_executor.go +++ b/internal/runtime/executor/gemini_vertex_executor.go @@ -656,17 +656,28 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil @@ -786,17 +797,28 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } } lines := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range lines { - out <- cliproxyexecutor.StreamChunk{Payload: lines[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: lines[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/kimi_executor.go b/internal/runtime/executor/kimi_executor.go index 3588c962..2bb0c7fd 100644 --- a/internal/runtime/executor/kimi_executor.go +++ b/internal/runtime/executor/kimi_executor.go @@ -290,17 +290,28 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut } chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, bytes.Clone(line), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } doneChunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, body, []byte("[DONE]"), ¶m) for i := range doneChunks { - out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: doneChunks[i]}: + case <-ctx.Done(): + return + } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } }() return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil diff --git a/internal/runtime/executor/openai_compat_executor.go b/internal/runtime/executor/openai_compat_executor.go index 4e44a7ae..ebddfddb 100644 --- a/internal/runtime/executor/openai_compat_executor.go +++ b/internal/runtime/executor/openai_compat_executor.go @@ -293,20 +293,31 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy // Pass through translator; it yields one or more chunks for the target schema. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bytes.Clone(line), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } if errScan := scanner.Err(); errScan != nil { helps.RecordAPIResponseError(ctx, e.cfg, errScan) reporter.PublishFailure(ctx) - out <- cliproxyexecutor.StreamChunk{Err: errScan} + select { + case out <- cliproxyexecutor.StreamChunk{Err: errScan}: + case <-ctx.Done(): + } } else { // In case the upstream close the stream without a terminal [DONE] marker. // Feed a synthetic done marker through the translator so pending // response.completed events are still emitted exactly once. chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, []byte("data: [DONE]"), ¶m) for i := range chunks { - out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]} + select { + case out <- cliproxyexecutor.StreamChunk{Payload: chunks[i]}: + case <-ctx.Done(): + return + } } } // Ensure we record the request if no usage chunk was ever seen