Merge pull request #1874 from constansino/fix/watcher-auth-event-storm-debounce
fix(watcher): 合并 auth 事件风暴下的回调触发,降低高 CPU
This commit is contained in:
@@ -183,7 +183,7 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
|||||||
|
|
||||||
if w.reloadCallback != nil {
|
if w.reloadCallback != nil {
|
||||||
log.Debugf("triggering server update callback after add/update")
|
log.Debugf("triggering server update callback after add/update")
|
||||||
w.reloadCallback(cfg)
|
w.triggerServerUpdate(cfg)
|
||||||
}
|
}
|
||||||
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||||
}
|
}
|
||||||
@@ -202,7 +202,7 @@ func (w *Watcher) removeClient(path string) {
|
|||||||
|
|
||||||
if w.reloadCallback != nil {
|
if w.reloadCallback != nil {
|
||||||
log.Debugf("triggering server update callback after removal")
|
log.Debugf("triggering server update callback after removal")
|
||||||
w.reloadCallback(cfg)
|
w.triggerServerUpdate(cfg)
|
||||||
}
|
}
|
||||||
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||||
}
|
}
|
||||||
@@ -303,3 +303,67 @@ func (w *Watcher) persistAuthAsync(message string, paths ...string) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) stopServerUpdateTimer() {
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
defer w.serverUpdateMu.Unlock()
|
||||||
|
if w.serverUpdateTimer != nil {
|
||||||
|
w.serverUpdateTimer.Stop()
|
||||||
|
w.serverUpdateTimer = nil
|
||||||
|
}
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Watcher) triggerServerUpdate(cfg *config.Config) {
|
||||||
|
if w == nil || w.reloadCallback == nil || cfg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if w.stopped.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
if w.serverUpdateLast.IsZero() || now.Sub(w.serverUpdateLast) >= serverUpdateDebounce {
|
||||||
|
w.serverUpdateLast = now
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
w.reloadCallback(cfg)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if w.serverUpdatePend {
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := serverUpdateDebounce - now.Sub(w.serverUpdateLast)
|
||||||
|
if delay < 10*time.Millisecond {
|
||||||
|
delay = 10 * time.Millisecond
|
||||||
|
}
|
||||||
|
w.serverUpdatePend = true
|
||||||
|
if w.serverUpdateTimer != nil {
|
||||||
|
w.serverUpdateTimer.Stop()
|
||||||
|
}
|
||||||
|
w.serverUpdateTimer = time.AfterFunc(delay, func() {
|
||||||
|
if w.stopped.Load() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.clientsMutex.RLock()
|
||||||
|
latestCfg := w.config
|
||||||
|
w.clientsMutex.RUnlock()
|
||||||
|
if latestCfg == nil || w.reloadCallback == nil || w.stopped.Load() {
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.serverUpdateMu.Lock()
|
||||||
|
w.serverUpdateLast = time.Now()
|
||||||
|
w.serverUpdatePend = false
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
w.reloadCallback(latestCfg)
|
||||||
|
})
|
||||||
|
w.serverUpdateMu.Unlock()
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/fsnotify/fsnotify"
|
"github.com/fsnotify/fsnotify"
|
||||||
@@ -35,6 +36,11 @@ type Watcher struct {
|
|||||||
clientsMutex sync.RWMutex
|
clientsMutex sync.RWMutex
|
||||||
configReloadMu sync.Mutex
|
configReloadMu sync.Mutex
|
||||||
configReloadTimer *time.Timer
|
configReloadTimer *time.Timer
|
||||||
|
serverUpdateMu sync.Mutex
|
||||||
|
serverUpdateTimer *time.Timer
|
||||||
|
serverUpdateLast time.Time
|
||||||
|
serverUpdatePend bool
|
||||||
|
stopped atomic.Bool
|
||||||
reloadCallback func(*config.Config)
|
reloadCallback func(*config.Config)
|
||||||
watcher *fsnotify.Watcher
|
watcher *fsnotify.Watcher
|
||||||
lastAuthHashes map[string]string
|
lastAuthHashes map[string]string
|
||||||
@@ -76,6 +82,7 @@ const (
|
|||||||
replaceCheckDelay = 50 * time.Millisecond
|
replaceCheckDelay = 50 * time.Millisecond
|
||||||
configReloadDebounce = 150 * time.Millisecond
|
configReloadDebounce = 150 * time.Millisecond
|
||||||
authRemoveDebounceWindow = 1 * time.Second
|
authRemoveDebounceWindow = 1 * time.Second
|
||||||
|
serverUpdateDebounce = 1 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewWatcher creates a new file watcher instance
|
// NewWatcher creates a new file watcher instance
|
||||||
@@ -114,8 +121,10 @@ func (w *Watcher) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
// Stop stops the file watcher
|
// Stop stops the file watcher
|
||||||
func (w *Watcher) Stop() error {
|
func (w *Watcher) Stop() error {
|
||||||
|
w.stopped.Store(true)
|
||||||
w.stopDispatch()
|
w.stopDispatch()
|
||||||
w.stopConfigReloadTimer()
|
w.stopConfigReloadTimer()
|
||||||
|
w.stopServerUpdateTimer()
|
||||||
return w.watcher.Close()
|
return w.watcher.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user