Merge origin/dev into pr-1774-review and resolve watcher conflicts

This commit is contained in:
Luis Pater
2026-03-07 11:12:42 +08:00
76 changed files with 4061 additions and 1142 deletions
+76
View File
@@ -369,3 +369,79 @@ func (w *Watcher) persistAuthAsync(message string, paths ...string) {
}
}()
}
func (w *Watcher) stopServerUpdateTimer() {
w.serverUpdateMu.Lock()
defer w.serverUpdateMu.Unlock()
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
w.serverUpdatePend = false
}
func (w *Watcher) triggerServerUpdate(cfg *config.Config) {
if w == nil || w.reloadCallback == nil || cfg == nil {
return
}
if w.stopped.Load() {
return
}
now := time.Now()
w.serverUpdateMu.Lock()
if w.serverUpdateLast.IsZero() || now.Sub(w.serverUpdateLast) >= serverUpdateDebounce {
w.serverUpdateLast = now
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
w.serverUpdatePend = false
w.serverUpdateMu.Unlock()
w.reloadCallback(cfg)
return
}
if w.serverUpdatePend {
w.serverUpdateMu.Unlock()
return
}
delay := serverUpdateDebounce - now.Sub(w.serverUpdateLast)
if delay < 10*time.Millisecond {
delay = 10 * time.Millisecond
}
w.serverUpdatePend = true
if w.serverUpdateTimer != nil {
w.serverUpdateTimer.Stop()
w.serverUpdateTimer = nil
}
var timer *time.Timer
timer = time.AfterFunc(delay, func() {
if w.stopped.Load() {
return
}
w.clientsMutex.RLock()
latestCfg := w.config
w.clientsMutex.RUnlock()
w.serverUpdateMu.Lock()
if w.serverUpdateTimer != timer || !w.serverUpdatePend {
w.serverUpdateMu.Unlock()
return
}
w.serverUpdateTimer = nil
w.serverUpdatePend = false
if latestCfg == nil || w.reloadCallback == nil || w.stopped.Load() {
w.serverUpdateMu.Unlock()
return
}
w.serverUpdateLast = time.Now()
w.serverUpdateMu.Unlock()
w.reloadCallback(latestCfg)
})
w.serverUpdateTimer = timer
w.serverUpdateMu.Unlock()
}
+5
View File
@@ -304,6 +304,11 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string {
if oldModels.hash != newModels.hash {
changes = append(changes, fmt.Sprintf("vertex[%d].models: updated (%d -> %d entries)", i, oldModels.count, newModels.count))
}
oldExcluded := SummarizeExcludedModels(o.ExcludedModels)
newExcluded := SummarizeExcludedModels(n.ExcludedModels)
if oldExcluded.hash != newExcluded.hash {
changes = append(changes, fmt.Sprintf("vertex[%d].excluded-models: updated (%d -> %d entries)", i, oldExcluded.count, newExcluded.count))
}
if !equalStringMap(o.Headers, n.Headers) {
changes = append(changes, fmt.Sprintf("vertex[%d].headers: updated", i))
}
+1 -1
View File
@@ -315,7 +315,7 @@ func (s *ConfigSynthesizer) synthesizeVertexCompat(ctx *SynthesisContext) []*cor
CreatedAt: now,
UpdatedAt: now,
}
ApplyAuthExcludedModelsMeta(a, cfg, nil, "apikey")
ApplyAuthExcludedModelsMeta(a, cfg, compat.ExcludedModels, "apikey")
out = append(out, a)
}
return out
+4
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
@@ -92,6 +93,9 @@ func synthesizeFileAuths(ctx *SynthesisContext, fullPath string, data []byte) []
id = rel
}
}
if runtime.GOOS == "windows" {
id = strings.ToLower(id)
}
proxyURL := ""
if p, ok := metadata["proxy_url"].(string); ok {
+9
View File
@@ -6,6 +6,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
@@ -35,6 +36,11 @@ type Watcher struct {
clientsMutex sync.RWMutex
configReloadMu sync.Mutex
configReloadTimer *time.Timer
serverUpdateMu sync.Mutex
serverUpdateTimer *time.Timer
serverUpdateLast time.Time
serverUpdatePend bool
stopped atomic.Bool
reloadCallback func(*config.Config)
watcher *fsnotify.Watcher
lastAuthHashes map[string]string
@@ -77,6 +83,7 @@ const (
replaceCheckDelay = 50 * time.Millisecond
configReloadDebounce = 150 * time.Millisecond
authRemoveDebounceWindow = 1 * time.Second
serverUpdateDebounce = 1 * time.Second
)
// NewWatcher creates a new file watcher instance
@@ -116,8 +123,10 @@ func (w *Watcher) Start(ctx context.Context) error {
// Stop stops the file watcher
func (w *Watcher) Stop() error {
w.stopped.Store(true)
w.stopDispatch()
w.stopConfigReloadTimer()
w.stopServerUpdateTimer()
return w.watcher.Close()
}
+40
View File
@@ -543,6 +543,46 @@ func TestAuthSliceToMap(t *testing.T) {
}
}
func TestTriggerServerUpdateCancelsPendingTimerOnImmediate(t *testing.T) {
tmpDir := t.TempDir()
cfg := &config.Config{AuthDir: tmpDir}
var reloads int32
w := &Watcher{
reloadCallback: func(*config.Config) {
atomic.AddInt32(&reloads, 1)
},
}
w.SetConfig(cfg)
w.serverUpdateMu.Lock()
w.serverUpdateLast = time.Now().Add(-(serverUpdateDebounce - 100*time.Millisecond))
w.serverUpdateMu.Unlock()
w.triggerServerUpdate(cfg)
if got := atomic.LoadInt32(&reloads); got != 0 {
t.Fatalf("expected no immediate reload, got %d", got)
}
w.serverUpdateMu.Lock()
if !w.serverUpdatePend || w.serverUpdateTimer == nil {
w.serverUpdateMu.Unlock()
t.Fatal("expected a pending server update timer")
}
w.serverUpdateLast = time.Now().Add(-(serverUpdateDebounce + 10*time.Millisecond))
w.serverUpdateMu.Unlock()
w.triggerServerUpdate(cfg)
if got := atomic.LoadInt32(&reloads); got != 1 {
t.Fatalf("expected immediate reload once, got %d", got)
}
time.Sleep(250 * time.Millisecond)
if got := atomic.LoadInt32(&reloads); got != 1 {
t.Fatalf("expected pending timer to be cancelled, got %d reloads", got)
}
}
func TestShouldDebounceRemove(t *testing.T) {
w := &Watcher{}
path := filepath.Clean("test.json")