fix(openai-compat): retry empty bootstrap streams
This commit is contained in:
@@ -543,6 +543,20 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi
|
|||||||
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil
|
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if closed && len(buffered) == 0 {
|
||||||
|
emptyErr := &Error{Code: "empty_stream", Message: "upstream stream closed before first payload", Retryable: true}
|
||||||
|
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: emptyErr}
|
||||||
|
m.MarkResult(ctx, result)
|
||||||
|
if idx < len(execModels)-1 {
|
||||||
|
lastErr = emptyErr
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
errCh := make(chan cliproxyexecutor.StreamChunk, 1)
|
||||||
|
errCh <- cliproxyexecutor.StreamChunk{Err: emptyErr}
|
||||||
|
close(errCh)
|
||||||
|
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil
|
||||||
|
}
|
||||||
|
|
||||||
remaining := streamResult.Chunks
|
remaining := streamResult.Chunks
|
||||||
if closed {
|
if closed {
|
||||||
closedCh := make(chan cliproxyexecutor.StreamChunk)
|
closedCh := make(chan cliproxyexecutor.StreamChunk)
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ type openAICompatPoolExecutor struct {
|
|||||||
executeErrors map[string]error
|
executeErrors map[string]error
|
||||||
countErrors map[string]error
|
countErrors map[string]error
|
||||||
streamFirstErrors map[string]error
|
streamFirstErrors map[string]error
|
||||||
|
streamPayloads map[string][]cliproxyexecutor.StreamChunk
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *openAICompatPoolExecutor) Identifier() string { return e.id }
|
func (e *openAICompatPoolExecutor) Identifier() string { return e.id }
|
||||||
@@ -46,14 +47,22 @@ func (e *openAICompatPoolExecutor) ExecuteStream(ctx context.Context, auth *Auth
|
|||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
e.streamModels = append(e.streamModels, req.Model)
|
e.streamModels = append(e.streamModels, req.Model)
|
||||||
err := e.streamFirstErrors[req.Model]
|
err := e.streamFirstErrors[req.Model]
|
||||||
|
payloadChunks, hasCustomChunks := e.streamPayloads[req.Model]
|
||||||
|
chunks := append([]cliproxyexecutor.StreamChunk(nil), payloadChunks...)
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
ch := make(chan cliproxyexecutor.StreamChunk, 1)
|
ch := make(chan cliproxyexecutor.StreamChunk, max(1, len(chunks)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ch <- cliproxyexecutor.StreamChunk{Err: err}
|
ch <- cliproxyexecutor.StreamChunk{Err: err}
|
||||||
close(ch)
|
close(ch)
|
||||||
return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil
|
return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
ch <- cliproxyexecutor.StreamChunk{Payload: []byte(req.Model)}
|
if !hasCustomChunks {
|
||||||
|
ch <- cliproxyexecutor.StreamChunk{Payload: []byte(req.Model)}
|
||||||
|
} else {
|
||||||
|
for _, chunk := range chunks {
|
||||||
|
ch <- chunk
|
||||||
|
}
|
||||||
|
}
|
||||||
close(ch)
|
close(ch)
|
||||||
return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil
|
return &cliproxyexecutor.StreamResult{Headers: http.Header{"X-Model": {req.Model}}, Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
@@ -261,6 +270,42 @@ func TestManagerExecute_OpenAICompatAliasPoolFallsBackWithinSameAuth(t *testing.
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestManagerExecuteStream_OpenAICompatAliasPoolRetriesOnEmptyBootstrap(t *testing.T) {
|
||||||
|
alias := "claude-opus-4.66"
|
||||||
|
executor := &openAICompatPoolExecutor{
|
||||||
|
id: "pool",
|
||||||
|
streamPayloads: map[string][]cliproxyexecutor.StreamChunk{
|
||||||
|
"qwen3.5-plus": {},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
m := newOpenAICompatPoolTestManager(t, alias, []internalconfig.OpenAICompatibilityModel{
|
||||||
|
{Name: "qwen3.5-plus", Alias: alias},
|
||||||
|
{Name: "glm-5", Alias: alias},
|
||||||
|
}, executor)
|
||||||
|
|
||||||
|
streamResult, err := m.ExecuteStream(context.Background(), []string{"pool"}, cliproxyexecutor.Request{Model: alias}, cliproxyexecutor.Options{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("execute stream: %v", err)
|
||||||
|
}
|
||||||
|
var payload []byte
|
||||||
|
for chunk := range streamResult.Chunks {
|
||||||
|
if chunk.Err != nil {
|
||||||
|
t.Fatalf("unexpected stream error: %v", chunk.Err)
|
||||||
|
}
|
||||||
|
payload = append(payload, chunk.Payload...)
|
||||||
|
}
|
||||||
|
if string(payload) != "glm-5" {
|
||||||
|
t.Fatalf("payload = %q, want %q", string(payload), "glm-5")
|
||||||
|
}
|
||||||
|
got := executor.StreamModels()
|
||||||
|
want := []string{"qwen3.5-plus", "glm-5"}
|
||||||
|
for i := range want {
|
||||||
|
if got[i] != want[i] {
|
||||||
|
t.Fatalf("stream call %d model = %q, want %q", i, got[i], want[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestManagerExecuteStream_OpenAICompatAliasPoolFallsBackBeforeFirstByte(t *testing.T) {
|
func TestManagerExecuteStream_OpenAICompatAliasPoolFallsBackBeforeFirstByte(t *testing.T) {
|
||||||
alias := "claude-opus-4.66"
|
alias := "claude-opus-4.66"
|
||||||
executor := &openAICompatPoolExecutor{
|
executor := &openAICompatPoolExecutor{
|
||||||
|
|||||||
Reference in New Issue
Block a user