fix: harden pooled model-support fallback state
This commit is contained in:
+149
-32
@@ -421,10 +421,6 @@ func preserveRequestedModelSuffix(requestedModel, resolved string) string {
|
||||
}
|
||||
|
||||
func (m *Manager) executionModelCandidates(auth *Auth, routeModel string) []string {
|
||||
return m.prepareExecutionModels(auth, routeModel)
|
||||
}
|
||||
|
||||
func (m *Manager) prepareExecutionModels(auth *Auth, routeModel string) []string {
|
||||
requestedModel := rewriteModelForAuth(routeModel, auth)
|
||||
requestedModel = m.applyOAuthModelAlias(auth, requestedModel)
|
||||
if pool := m.resolveOpenAICompatUpstreamModelPool(auth, requestedModel); len(pool) > 0 {
|
||||
@@ -441,6 +437,46 @@ func (m *Manager) prepareExecutionModels(auth *Auth, routeModel string) []string
|
||||
return []string{resolved}
|
||||
}
|
||||
|
||||
func executionResultModel(routeModel, upstreamModel string, pooled bool) string {
|
||||
if pooled {
|
||||
if resolved := strings.TrimSpace(upstreamModel); resolved != "" {
|
||||
return resolved
|
||||
}
|
||||
}
|
||||
if requested := strings.TrimSpace(routeModel); requested != "" {
|
||||
return requested
|
||||
}
|
||||
return strings.TrimSpace(upstreamModel)
|
||||
}
|
||||
|
||||
func filterExecutionModels(auth *Auth, routeModel string, candidates []string, pooled bool) []string {
|
||||
if len(candidates) == 0 {
|
||||
return nil
|
||||
}
|
||||
now := time.Now()
|
||||
out := make([]string, 0, len(candidates))
|
||||
for _, upstreamModel := range candidates {
|
||||
stateModel := executionResultModel(routeModel, upstreamModel, pooled)
|
||||
blocked, _, _ := isAuthBlockedForModel(auth, stateModel, now)
|
||||
if blocked {
|
||||
continue
|
||||
}
|
||||
out = append(out, upstreamModel)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *Manager) preparedExecutionModels(auth *Auth, routeModel string) ([]string, bool) {
|
||||
candidates := m.executionModelCandidates(auth, routeModel)
|
||||
pooled := len(candidates) > 1
|
||||
return filterExecutionModels(auth, routeModel, candidates, pooled), pooled
|
||||
}
|
||||
|
||||
func (m *Manager) prepareExecutionModels(auth *Auth, routeModel string) []string {
|
||||
models, _ := m.preparedExecutionModels(auth, routeModel)
|
||||
return models
|
||||
}
|
||||
|
||||
func discardStreamChunks(ch <-chan cliproxyexecutor.StreamChunk) {
|
||||
if ch == nil {
|
||||
return
|
||||
@@ -451,6 +487,59 @@ func discardStreamChunks(ch <-chan cliproxyexecutor.StreamChunk) {
|
||||
}()
|
||||
}
|
||||
|
||||
type streamBootstrapError struct {
|
||||
cause error
|
||||
headers http.Header
|
||||
}
|
||||
|
||||
func cloneHTTPHeader(headers http.Header) http.Header {
|
||||
if headers == nil {
|
||||
return nil
|
||||
}
|
||||
return headers.Clone()
|
||||
}
|
||||
|
||||
func newStreamBootstrapError(err error, headers http.Header) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return &streamBootstrapError{
|
||||
cause: err,
|
||||
headers: cloneHTTPHeader(headers),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *streamBootstrapError) Error() string {
|
||||
if e == nil || e.cause == nil {
|
||||
return ""
|
||||
}
|
||||
return e.cause.Error()
|
||||
}
|
||||
|
||||
func (e *streamBootstrapError) Unwrap() error {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
return e.cause
|
||||
}
|
||||
|
||||
func (e *streamBootstrapError) Headers() http.Header {
|
||||
if e == nil {
|
||||
return nil
|
||||
}
|
||||
return cloneHTTPHeader(e.headers)
|
||||
}
|
||||
|
||||
func streamErrorResult(headers http.Header, err error) *cliproxyexecutor.StreamResult {
|
||||
ch := make(chan cliproxyexecutor.StreamChunk, 1)
|
||||
ch <- cliproxyexecutor.StreamChunk{Err: err}
|
||||
close(ch)
|
||||
return &cliproxyexecutor.StreamResult{
|
||||
Headers: cloneHTTPHeader(headers),
|
||||
Chunks: ch,
|
||||
}
|
||||
}
|
||||
|
||||
func readStreamBootstrap(ctx context.Context, ch <-chan cliproxyexecutor.StreamChunk) ([]cliproxyexecutor.StreamChunk, bool, error) {
|
||||
if ch == nil {
|
||||
return nil, true, nil
|
||||
@@ -483,7 +572,7 @@ func readStreamBootstrap(ctx context.Context, ch <-chan cliproxyexecutor.StreamC
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) wrapStreamResult(ctx context.Context, auth *Auth, provider, routeModel string, headers http.Header, buffered []cliproxyexecutor.StreamChunk, remaining <-chan cliproxyexecutor.StreamChunk) *cliproxyexecutor.StreamResult {
|
||||
func (m *Manager) wrapStreamResult(ctx context.Context, auth *Auth, provider, resultModel string, headers http.Header, buffered []cliproxyexecutor.StreamChunk, remaining <-chan cliproxyexecutor.StreamChunk) *cliproxyexecutor.StreamResult {
|
||||
out := make(chan cliproxyexecutor.StreamChunk)
|
||||
go func() {
|
||||
defer close(out)
|
||||
@@ -496,7 +585,7 @@ func (m *Manager) wrapStreamResult(ctx context.Context, auth *Auth, provider, ro
|
||||
if se, ok := errors.AsType[cliproxyexecutor.StatusError](chunk.Err); ok && se != nil {
|
||||
rerr.HTTPStatus = se.StatusCode()
|
||||
}
|
||||
m.MarkResult(ctx, Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: rerr})
|
||||
m.MarkResult(ctx, Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: rerr})
|
||||
}
|
||||
if !forward {
|
||||
return false
|
||||
@@ -526,19 +615,19 @@ func (m *Manager) wrapStreamResult(ctx context.Context, auth *Auth, provider, ro
|
||||
}
|
||||
}
|
||||
if !failed {
|
||||
m.MarkResult(ctx, Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: true})
|
||||
m.MarkResult(ctx, Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: true})
|
||||
}
|
||||
}()
|
||||
return &cliproxyexecutor.StreamResult{Headers: headers, Chunks: out}
|
||||
}
|
||||
|
||||
func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor ProviderExecutor, auth *Auth, provider string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, routeModel string) (*cliproxyexecutor.StreamResult, error) {
|
||||
func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor ProviderExecutor, auth *Auth, provider string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, routeModel string, execModels []string, pooled bool) (*cliproxyexecutor.StreamResult, error) {
|
||||
if executor == nil {
|
||||
return nil, &Error{Code: "executor_not_found", Message: "executor not registered"}
|
||||
}
|
||||
execModels := m.prepareExecutionModels(auth, routeModel)
|
||||
var lastErr error
|
||||
for idx, execModel := range execModels {
|
||||
resultModel := executionResultModel(routeModel, execModel, pooled)
|
||||
execReq := req
|
||||
execReq.Model = execModel
|
||||
streamResult, errStream := executor.ExecuteStream(ctx, auth, execReq, opts)
|
||||
@@ -550,7 +639,7 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi
|
||||
if se, ok := errors.AsType[cliproxyexecutor.StatusError](errStream); ok && se != nil {
|
||||
rerr.HTTPStatus = se.StatusCode()
|
||||
}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: rerr}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: rerr}
|
||||
result.RetryAfter = retryAfterFromError(errStream)
|
||||
m.MarkResult(ctx, result)
|
||||
if isRequestInvalidError(errStream) {
|
||||
@@ -571,7 +660,7 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi
|
||||
if se, ok := errors.AsType[cliproxyexecutor.StatusError](bootstrapErr); ok && se != nil {
|
||||
rerr.HTTPStatus = se.StatusCode()
|
||||
}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: rerr}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: rerr}
|
||||
result.RetryAfter = retryAfterFromError(bootstrapErr)
|
||||
m.MarkResult(ctx, result)
|
||||
discardStreamChunks(streamResult.Chunks)
|
||||
@@ -582,31 +671,33 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi
|
||||
if se, ok := errors.AsType[cliproxyexecutor.StatusError](bootstrapErr); ok && se != nil {
|
||||
rerr.HTTPStatus = se.StatusCode()
|
||||
}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: rerr}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: rerr}
|
||||
result.RetryAfter = retryAfterFromError(bootstrapErr)
|
||||
m.MarkResult(ctx, result)
|
||||
discardStreamChunks(streamResult.Chunks)
|
||||
lastErr = bootstrapErr
|
||||
continue
|
||||
}
|
||||
errCh := make(chan cliproxyexecutor.StreamChunk, 1)
|
||||
errCh <- cliproxyexecutor.StreamChunk{Err: bootstrapErr}
|
||||
close(errCh)
|
||||
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil
|
||||
rerr := &Error{Message: bootstrapErr.Error()}
|
||||
if se, ok := errors.AsType[cliproxyexecutor.StatusError](bootstrapErr); ok && se != nil {
|
||||
rerr.HTTPStatus = se.StatusCode()
|
||||
}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: rerr}
|
||||
result.RetryAfter = retryAfterFromError(bootstrapErr)
|
||||
m.MarkResult(ctx, result)
|
||||
discardStreamChunks(streamResult.Chunks)
|
||||
return nil, newStreamBootstrapError(bootstrapErr, streamResult.Headers)
|
||||
}
|
||||
|
||||
if closed && len(buffered) == 0 {
|
||||
emptyErr := &Error{Code: "empty_stream", Message: "upstream stream closed before first payload", Retryable: true}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: emptyErr}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: false, Error: emptyErr}
|
||||
m.MarkResult(ctx, result)
|
||||
if idx < len(execModels)-1 {
|
||||
lastErr = emptyErr
|
||||
continue
|
||||
}
|
||||
errCh := make(chan cliproxyexecutor.StreamChunk, 1)
|
||||
errCh <- cliproxyexecutor.StreamChunk{Err: emptyErr}
|
||||
close(errCh)
|
||||
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, nil, errCh), nil
|
||||
return nil, newStreamBootstrapError(emptyErr, streamResult.Headers)
|
||||
}
|
||||
|
||||
remaining := streamResult.Chunks
|
||||
@@ -615,7 +706,7 @@ func (m *Manager) executeStreamWithModelPool(ctx context.Context, executor Provi
|
||||
close(closedCh)
|
||||
remaining = closedCh
|
||||
}
|
||||
return m.wrapStreamResult(ctx, auth.Clone(), provider, routeModel, streamResult.Headers, buffered, remaining), nil
|
||||
return m.wrapStreamResult(ctx, auth.Clone(), provider, resultModel, streamResult.Headers, buffered, remaining), nil
|
||||
}
|
||||
if lastErr == nil {
|
||||
lastErr = &Error{Code: "auth_not_found", Message: "no upstream model available"}
|
||||
@@ -979,9 +1070,10 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
|
||||
routeModel := req.Model
|
||||
opts = ensureRequestedModelMetadata(opts, routeModel)
|
||||
tried := make(map[string]struct{})
|
||||
attempted := make(map[string]struct{})
|
||||
var lastErr error
|
||||
for {
|
||||
if maxRetryCredentials > 0 && len(tried) >= maxRetryCredentials {
|
||||
if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials {
|
||||
if lastErr != nil {
|
||||
return cliproxyexecutor.Response{}, lastErr
|
||||
}
|
||||
@@ -1006,13 +1098,18 @@ func (m *Manager) executeMixedOnce(ctx context.Context, providers []string, req
|
||||
execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt)
|
||||
}
|
||||
|
||||
models := m.prepareExecutionModels(auth, routeModel)
|
||||
models, pooled := m.preparedExecutionModels(auth, routeModel)
|
||||
if len(models) == 0 {
|
||||
continue
|
||||
}
|
||||
attempted[auth.ID] = struct{}{}
|
||||
var authErr error
|
||||
for _, upstreamModel := range models {
|
||||
resultModel := executionResultModel(routeModel, upstreamModel, pooled)
|
||||
execReq := req
|
||||
execReq.Model = upstreamModel
|
||||
resp, errExec := executor.Execute(execCtx, auth, execReq, opts)
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: errExec == nil}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: errExec == nil}
|
||||
if errExec != nil {
|
||||
if errCtx := execCtx.Err(); errCtx != nil {
|
||||
return cliproxyexecutor.Response{}, errCtx
|
||||
@@ -1051,9 +1148,10 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
|
||||
routeModel := req.Model
|
||||
opts = ensureRequestedModelMetadata(opts, routeModel)
|
||||
tried := make(map[string]struct{})
|
||||
attempted := make(map[string]struct{})
|
||||
var lastErr error
|
||||
for {
|
||||
if maxRetryCredentials > 0 && len(tried) >= maxRetryCredentials {
|
||||
if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials {
|
||||
if lastErr != nil {
|
||||
return cliproxyexecutor.Response{}, lastErr
|
||||
}
|
||||
@@ -1078,13 +1176,18 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
|
||||
execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt)
|
||||
}
|
||||
|
||||
models := m.prepareExecutionModels(auth, routeModel)
|
||||
models, pooled := m.preparedExecutionModels(auth, routeModel)
|
||||
if len(models) == 0 {
|
||||
continue
|
||||
}
|
||||
attempted[auth.ID] = struct{}{}
|
||||
var authErr error
|
||||
for _, upstreamModel := range models {
|
||||
resultModel := executionResultModel(routeModel, upstreamModel, pooled)
|
||||
execReq := req
|
||||
execReq.Model = upstreamModel
|
||||
resp, errExec := executor.CountTokens(execCtx, auth, execReq, opts)
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: errExec == nil}
|
||||
result := Result{AuthID: auth.ID, Provider: provider, Model: resultModel, Success: errExec == nil}
|
||||
if errExec != nil {
|
||||
if errCtx := execCtx.Err(); errCtx != nil {
|
||||
return cliproxyexecutor.Response{}, errCtx
|
||||
@@ -1096,14 +1199,14 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
|
||||
if ra := retryAfterFromError(errExec); ra != nil {
|
||||
result.RetryAfter = ra
|
||||
}
|
||||
m.hook.OnResult(execCtx, result)
|
||||
m.MarkResult(execCtx, result)
|
||||
if isRequestInvalidError(errExec) {
|
||||
return cliproxyexecutor.Response{}, errExec
|
||||
}
|
||||
authErr = errExec
|
||||
continue
|
||||
}
|
||||
m.hook.OnResult(execCtx, result)
|
||||
m.MarkResult(execCtx, result)
|
||||
return resp, nil
|
||||
}
|
||||
if authErr != nil {
|
||||
@@ -1123,10 +1226,15 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
|
||||
routeModel := req.Model
|
||||
opts = ensureRequestedModelMetadata(opts, routeModel)
|
||||
tried := make(map[string]struct{})
|
||||
attempted := make(map[string]struct{})
|
||||
var lastErr error
|
||||
for {
|
||||
if maxRetryCredentials > 0 && len(tried) >= maxRetryCredentials {
|
||||
if maxRetryCredentials > 0 && len(attempted) >= maxRetryCredentials {
|
||||
if lastErr != nil {
|
||||
var bootstrapErr *streamBootstrapError
|
||||
if errors.As(lastErr, &bootstrapErr) && bootstrapErr != nil {
|
||||
return streamErrorResult(bootstrapErr.Headers(), bootstrapErr.cause), nil
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
return nil, &Error{Code: "auth_not_found", Message: "no auth available"}
|
||||
@@ -1134,6 +1242,10 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
|
||||
auth, executor, provider, errPick := m.pickNextMixed(ctx, providers, routeModel, opts, tried)
|
||||
if errPick != nil {
|
||||
if lastErr != nil {
|
||||
var bootstrapErr *streamBootstrapError
|
||||
if errors.As(lastErr, &bootstrapErr) && bootstrapErr != nil {
|
||||
return streamErrorResult(bootstrapErr.Headers(), bootstrapErr.cause), nil
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
return nil, errPick
|
||||
@@ -1149,7 +1261,12 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
|
||||
execCtx = context.WithValue(execCtx, roundTripperContextKey{}, rt)
|
||||
execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt)
|
||||
}
|
||||
streamResult, errStream := m.executeStreamWithModelPool(execCtx, executor, auth, provider, req, opts, routeModel)
|
||||
models, pooled := m.preparedExecutionModels(auth, routeModel)
|
||||
if len(models) == 0 {
|
||||
continue
|
||||
}
|
||||
attempted[auth.ID] = struct{}{}
|
||||
streamResult, errStream := m.executeStreamWithModelPool(execCtx, executor, auth, provider, req, opts, routeModel, models, pooled)
|
||||
if errStream != nil {
|
||||
if errCtx := execCtx.Err(); errCtx != nil {
|
||||
return nil, errCtx
|
||||
|
||||
Reference in New Issue
Block a user