Merge pull request #1686 from lyd123qw2008/fix/auth-refresh-concurrency-limit
fix(auth): limit auto-refresh concurrency to prevent refresh storms
This commit is contained in:
@@ -60,6 +60,7 @@ type RefreshEvaluator interface {
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
refreshCheckInterval = 5 * time.Second
|
refreshCheckInterval = 5 * time.Second
|
||||||
|
refreshMaxConcurrency = 16
|
||||||
refreshPendingBackoff = time.Minute
|
refreshPendingBackoff = time.Minute
|
||||||
refreshFailureBackoff = 5 * time.Minute
|
refreshFailureBackoff = 5 * time.Minute
|
||||||
quotaBackoffBase = time.Second
|
quotaBackoffBase = time.Second
|
||||||
@@ -155,7 +156,8 @@ type Manager struct {
|
|||||||
rtProvider RoundTripperProvider
|
rtProvider RoundTripperProvider
|
||||||
|
|
||||||
// Auto refresh state
|
// Auto refresh state
|
||||||
refreshCancel context.CancelFunc
|
refreshCancel context.CancelFunc
|
||||||
|
refreshSemaphore chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewManager constructs a manager with optional custom selector and hook.
|
// NewManager constructs a manager with optional custom selector and hook.
|
||||||
@@ -173,6 +175,7 @@ func NewManager(store Store, selector Selector, hook Hook) *Manager {
|
|||||||
hook: hook,
|
hook: hook,
|
||||||
auths: make(map[string]*Auth),
|
auths: make(map[string]*Auth),
|
||||||
providerOffsets: make(map[string]int),
|
providerOffsets: make(map[string]int),
|
||||||
|
refreshSemaphore: make(chan struct{}, refreshMaxConcurrency),
|
||||||
}
|
}
|
||||||
// atomic.Value requires non-nil initial value.
|
// atomic.Value requires non-nil initial value.
|
||||||
manager.runtimeConfig.Store(&internalconfig.Config{})
|
manager.runtimeConfig.Store(&internalconfig.Config{})
|
||||||
@@ -1878,11 +1881,25 @@ func (m *Manager) checkRefreshes(ctx context.Context) {
|
|||||||
if !m.markRefreshPending(a.ID, now) {
|
if !m.markRefreshPending(a.ID, now) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go m.refreshAuth(ctx, a.ID)
|
go m.refreshAuthWithLimit(ctx, a.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) refreshAuthWithLimit(ctx context.Context, id string) {
|
||||||
|
if m.refreshSemaphore == nil {
|
||||||
|
m.refreshAuth(ctx, id)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case m.refreshSemaphore <- struct{}{}:
|
||||||
|
defer func() { <-m.refreshSemaphore }()
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.refreshAuth(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) snapshotAuths() []*Auth {
|
func (m *Manager) snapshotAuths() []*Auth {
|
||||||
m.mu.RLock()
|
m.mu.RLock()
|
||||||
defer m.mu.RUnlock()
|
defer m.mu.RUnlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user