Merge pull request #2121 from destinoantagonista-wq/main
Reconcile registry model states on auth changes
This commit is contained in:
@@ -234,6 +234,84 @@ func (m *Manager) RefreshSchedulerEntry(authID string) {
|
|||||||
m.scheduler.upsertAuth(snapshot)
|
m.scheduler.upsertAuth(snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReconcileRegistryModelStates aligns per-model runtime state with the current
|
||||||
|
// registry snapshot for one auth.
|
||||||
|
//
|
||||||
|
// Supported models are reset to a clean state because re-registration already
|
||||||
|
// cleared the registry-side cooldown/suspension snapshot. ModelStates for
|
||||||
|
// models that are no longer present in the registry are pruned entirely so
|
||||||
|
// renamed/removed models cannot keep auth-level status stale.
|
||||||
|
func (m *Manager) ReconcileRegistryModelStates(ctx context.Context, authID string) {
|
||||||
|
if m == nil || authID == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
supportedModels := registry.GetGlobalRegistry().GetModelsForClient(authID)
|
||||||
|
supported := make(map[string]struct{}, len(supportedModels))
|
||||||
|
for _, model := range supportedModels {
|
||||||
|
if model == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
modelKey := canonicalModelKey(model.ID)
|
||||||
|
if modelKey == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
supported[modelKey] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var snapshot *Auth
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
auth, ok := m.auths[authID]
|
||||||
|
if ok && auth != nil && len(auth.ModelStates) > 0 {
|
||||||
|
changed := false
|
||||||
|
for modelKey, state := range auth.ModelStates {
|
||||||
|
baseModel := canonicalModelKey(modelKey)
|
||||||
|
if baseModel == "" {
|
||||||
|
baseModel = strings.TrimSpace(modelKey)
|
||||||
|
}
|
||||||
|
if _, supportedModel := supported[baseModel]; !supportedModel {
|
||||||
|
// Drop state for models that disappeared from the current registry
|
||||||
|
// snapshot. Keeping them around leaks stale errors into auth-level
|
||||||
|
// status, management output, and websocket fallback checks.
|
||||||
|
delete(auth.ModelStates, modelKey)
|
||||||
|
changed = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if state == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if modelStateIsClean(state) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resetModelState(state, now)
|
||||||
|
changed = true
|
||||||
|
}
|
||||||
|
if len(auth.ModelStates) == 0 {
|
||||||
|
auth.ModelStates = nil
|
||||||
|
}
|
||||||
|
if changed {
|
||||||
|
updateAggregatedAvailability(auth, now)
|
||||||
|
if !hasModelError(auth, now) {
|
||||||
|
auth.LastError = nil
|
||||||
|
auth.StatusMessage = ""
|
||||||
|
auth.Status = StatusActive
|
||||||
|
}
|
||||||
|
auth.UpdatedAt = now
|
||||||
|
if errPersist := m.persist(ctx, auth); errPersist != nil {
|
||||||
|
logEntryWithRequestID(ctx).WithField("auth_id", auth.ID).Warnf("failed to persist auth changes during model state reconciliation: %v", errPersist)
|
||||||
|
}
|
||||||
|
snapshot = auth.Clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
if m.scheduler != nil && snapshot != nil {
|
||||||
|
m.scheduler.upsertAuth(snapshot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) SetSelector(selector Selector) {
|
func (m *Manager) SetSelector(selector Selector) {
|
||||||
if m == nil {
|
if m == nil {
|
||||||
return
|
return
|
||||||
@@ -1966,8 +2044,28 @@ func resetModelState(state *ModelState, now time.Time) {
|
|||||||
state.UpdatedAt = now
|
state.UpdatedAt = now
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func modelStateIsClean(state *ModelState) bool {
|
||||||
|
if state == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if state.Status != StatusActive {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if state.Unavailable || state.StatusMessage != "" || !state.NextRetryAfter.IsZero() || state.LastError != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if state.Quota.Exceeded || state.Quota.Reason != "" || !state.Quota.NextRecoverAt.IsZero() || state.Quota.BackoffLevel != 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
||||||
if auth == nil || len(auth.ModelStates) == 0 {
|
if auth == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(auth.ModelStates) == 0 {
|
||||||
|
clearAggregatedAvailability(auth)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
allUnavailable := true
|
allUnavailable := true
|
||||||
@@ -1975,10 +2073,12 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
quotaExceeded := false
|
quotaExceeded := false
|
||||||
quotaRecover := time.Time{}
|
quotaRecover := time.Time{}
|
||||||
maxBackoffLevel := 0
|
maxBackoffLevel := 0
|
||||||
|
hasState := false
|
||||||
for _, state := range auth.ModelStates {
|
for _, state := range auth.ModelStates {
|
||||||
if state == nil {
|
if state == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
hasState = true
|
||||||
stateUnavailable := false
|
stateUnavailable := false
|
||||||
if state.Status == StatusDisabled {
|
if state.Status == StatusDisabled {
|
||||||
stateUnavailable = true
|
stateUnavailable = true
|
||||||
@@ -2008,6 +2108,10 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if !hasState {
|
||||||
|
clearAggregatedAvailability(auth)
|
||||||
|
return
|
||||||
|
}
|
||||||
auth.Unavailable = allUnavailable
|
auth.Unavailable = allUnavailable
|
||||||
if allUnavailable {
|
if allUnavailable {
|
||||||
auth.NextRetryAfter = earliestRetry
|
auth.NextRetryAfter = earliestRetry
|
||||||
@@ -2027,6 +2131,15 @@ func updateAggregatedAvailability(auth *Auth, now time.Time) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func clearAggregatedAvailability(auth *Auth) {
|
||||||
|
if auth == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
auth.Unavailable = false
|
||||||
|
auth.NextRetryAfter = time.Time{}
|
||||||
|
auth.Quota = QuotaState{}
|
||||||
|
}
|
||||||
|
|
||||||
func hasModelError(auth *Auth, now time.Time) bool {
|
func hasModelError(auth *Auth, now time.Time) bool {
|
||||||
if auth == nil || len(auth.ModelStates) == 0 {
|
if auth == nil || len(auth.ModelStates) == 0 {
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -312,6 +312,7 @@ func (s *Service) applyCoreAuthAddOrUpdate(ctx context.Context, auth *coreauth.A
|
|||||||
// This operation may block on network calls, but the auth configuration
|
// This operation may block on network calls, but the auth configuration
|
||||||
// is already effective at this point.
|
// is already effective at this point.
|
||||||
s.registerModelsForAuth(auth)
|
s.registerModelsForAuth(auth)
|
||||||
|
s.coreManager.ReconcileRegistryModelStates(ctx, auth.ID)
|
||||||
|
|
||||||
// Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt
|
// Refresh the scheduler entry so that the auth's supportedModelSet is rebuilt
|
||||||
// from the now-populated global model registry. Without this, newly added auths
|
// from the now-populated global model registry. Without this, newly added auths
|
||||||
@@ -1027,6 +1028,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
|
|||||||
s.ensureExecutorsForAuth(current)
|
s.ensureExecutorsForAuth(current)
|
||||||
}
|
}
|
||||||
s.registerModelsForAuth(current)
|
s.registerModelsForAuth(current)
|
||||||
|
s.coreManager.ReconcileRegistryModelStates(context.Background(), current.ID)
|
||||||
|
|
||||||
latest, ok := s.latestAuthForModelRegistration(current.ID)
|
latest, ok := s.latestAuthForModelRegistration(current.ID)
|
||||||
if !ok || latest.Disabled {
|
if !ok || latest.Disabled {
|
||||||
@@ -1040,6 +1042,7 @@ func (s *Service) refreshModelRegistrationForAuth(current *coreauth.Auth) bool {
|
|||||||
// no auth fields changed, but keeps the refresh path simple and correct.
|
// no auth fields changed, but keeps the refresh path simple and correct.
|
||||||
s.ensureExecutorsForAuth(latest)
|
s.ensureExecutorsForAuth(latest)
|
||||||
s.registerModelsForAuth(latest)
|
s.registerModelsForAuth(latest)
|
||||||
|
s.coreManager.ReconcileRegistryModelStates(context.Background(), latest.ID)
|
||||||
s.coreManager.RefreshSchedulerEntry(current.ID)
|
s.coreManager.RefreshSchedulerEntry(current.ID)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user