fix(auth): tighten registry model reconciliation
This commit is contained in:
@@ -233,23 +233,19 @@ func (m *Manager) RefreshSchedulerEntry(authID string) {
|
||||
m.scheduler.upsertAuth(snapshot)
|
||||
}
|
||||
|
||||
// ReconcileRegistryModelStates clears stale per-model runtime failures for
|
||||
// models that are currently registered for the auth in the global model registry.
|
||||
// ReconcileRegistryModelStates aligns per-model runtime state with the current
|
||||
// registry snapshot for one auth.
|
||||
//
|
||||
// This keeps the scheduler and the global registry aligned after model
|
||||
// re-registration. Without this reconciliation, a model can reappear in
|
||||
// /v1/models after registry refresh while the scheduler still blocks it because
|
||||
// auth.ModelStates retained an older failure such as not_found or quota.
|
||||
// Supported models are reset to a clean state because re-registration already
|
||||
// cleared the registry-side cooldown/suspension snapshot. ModelStates for
|
||||
// models that are no longer present in the registry are pruned entirely so
|
||||
// renamed/removed models cannot keep auth-level status stale.
|
||||
func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID string) {
|
||||
if m == nil || authID == "" {
|
||||
return
|
||||
}
|
||||
|
||||
supportedModels := registry.GetGlobalRegistry().GetModelsForClient(authID)
|
||||
if len(supportedModels) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
supported := make(map[string]struct{}, len(supportedModels))
|
||||
for _, model := range supportedModels {
|
||||
if model == nil {
|
||||
@@ -261,9 +257,6 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin
|
||||
}
|
||||
supported[modelKey] = struct{}{}
|
||||
}
|
||||
if len(supported) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var snapshot *Auth
|
||||
now := time.Now()
|
||||
@@ -273,14 +266,19 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin
|
||||
if ok && auth != nil && len(auth.ModelStates) > 0 {
|
||||
changed := false
|
||||
for modelKey, state := range auth.ModelStates {
|
||||
if state == nil {
|
||||
continue
|
||||
}
|
||||
baseModel := canonicalModelKey(modelKey)
|
||||
if baseModel == "" {
|
||||
baseModel = strings.TrimSpace(modelKey)
|
||||
}
|
||||
if _, supportedModel := supported[baseModel]; !supportedModel {
|
||||
// Drop state for models that disappeared from the current registry
|
||||
// snapshot. Keeping them around leaks stale errors into auth-level
|
||||
// status, management output, and websocket fallback checks.
|
||||
delete(auth.ModelStates, modelKey)
|
||||
changed = true
|
||||
continue
|
||||
}
|
||||
if state == nil {
|
||||
continue
|
||||
}
|
||||
if modelStateIsClean(state) {
|
||||
@@ -289,6 +287,9 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin
|
||||
resetModelState(state, now)
|
||||
changed = true
|
||||
}
|
||||
if len(auth.ModelStates) == 0 {
|
||||
auth.ModelStates = nil
|
||||
}
|
||||
if changed {
|
||||
updateAggregatedAvailability(auth, now)
|
||||
if !hasModelError(auth, now) {
|
||||
@@ -297,7 +298,9 @@ func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID strin
|
||||
auth.Status = StatusActive
|
||||
}
|
||||
auth.UpdatedAt = now
|
||||
_ = m.persist(ctx, auth)
|
||||
if errPersist := m.persist(ctx, auth); errPersist != nil {
|
||||
logEntryWithRequestID(ctx).WithField("auth_id", auth.ID).Warnf("failed to persist auth changes during model state reconciliation: %v", errPersist)
|
||||
}
|
||||
snapshot = auth.Clone()
|
||||
}
|
||||
}
|
||||
@@ -1827,7 +1830,11 @@ func modelStateIsClean(state *ModelState) bool {
|
||||
}
|
||||
|
||||
func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||
if auth == nil || len(auth.ModelStates) == 0 {
|
||||
if auth == nil {
|
||||
return
|
||||
}
|
||||
if len(auth.ModelStates) == 0 {
|
||||
clearAggregatedAvailability(auth)
|
||||
return
|
||||
}
|
||||
allUnavailable := true
|
||||
@@ -1835,10 +1842,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||
quotaExceeded := false
|
||||
quotaRecover := time.Time{}
|
||||
maxBackoffLevel := 0
|
||||
hasState := false
|
||||
for _, state := range auth.ModelStates {
|
||||
if state == nil {
|
||||
continue
|
||||
}
|
||||
hasState = true
|
||||
stateUnavailable := false
|
||||
if state.Status == StatusDisabled {
|
||||
stateUnavailable = true
|
||||
@@ -1868,6 +1877,10 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if !hasState {
|
||||
clearAggregatedAvailability(auth)
|
||||
return
|
||||
}
|
||||
auth.Unavailable = allUnavailable
|
||||
if allUnavailable {
|
||||
auth.NextRetryAfter = earliestRetry
|
||||
@@ -1887,6 +1900,15 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||
}
|
||||
}
|
||||
|
||||
func clearAggregatedAvailability(auth *Auth) {
|
||||
if auth == nil {
|
||||
return
|
||||
}
|
||||
auth.Unavailable = false
|
||||
auth.NextRetryAfter = time.Time{}
|
||||
auth.Quota = QuotaState{}
|
||||
}
|
||||
|
||||
func hasModelError(auth *Auth, now time.Time) bool {
|
||||
if auth == nil || len(auth.ModelStates) == 0 {
|
||||
return false
|
||||
|
||||
Reference in New Issue
Block a user