refactor(watcher): make auth file events fully incremental
This commit is contained in:
+91
-19
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/config"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/util"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/diff"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/internal/watcher/synthesizer"
|
||||
coreauth "github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -75,6 +76,7 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
|
||||
|
||||
w.lastAuthHashes = make(map[string]string)
|
||||
w.lastAuthContents = make(map[string]*coreauth.Auth)
|
||||
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
|
||||
if resolvedAuthDir, errResolveAuthDir := util.ResolveAuthDir(cfg.AuthDir); errResolveAuthDir != nil {
|
||||
log.Errorf("failed to resolve auth directory for hash cache: %v", errResolveAuthDir)
|
||||
} else if resolvedAuthDir != "" {
|
||||
@@ -92,6 +94,24 @@ func (w *Watcher) reloadClients(rescanAuth bool, affectedOAuthProviders []string
|
||||
if errParse := json.Unmarshal(data, &auth); errParse == nil {
|
||||
w.lastAuthContents[normalizedPath] = &auth
|
||||
}
|
||||
ctx := &synthesizer.SynthesisContext{
|
||||
Config: cfg,
|
||||
AuthDir: resolvedAuthDir,
|
||||
Now: time.Now(),
|
||||
IDGenerator: synthesizer.NewStableIDGenerator(),
|
||||
}
|
||||
if generated := synthesizer.SynthesizeAuthFile(ctx, path, data); len(generated) > 0 {
|
||||
pathAuths := make(map[string]*coreauth.Auth, len(generated))
|
||||
for _, a := range generated {
|
||||
if a == nil || strings.TrimSpace(a.ID) == "" {
|
||||
continue
|
||||
}
|
||||
pathAuths[a.ID] = a.Clone()
|
||||
}
|
||||
if len(pathAuths) > 0 {
|
||||
w.fileAuthsByPath[normalizedPath] = pathAuths
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@@ -143,13 +163,14 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
||||
}
|
||||
|
||||
w.clientsMutex.Lock()
|
||||
|
||||
cfg := w.config
|
||||
if cfg == nil {
|
||||
if w.config == nil {
|
||||
log.Error("config is nil, cannot add or update client")
|
||||
w.clientsMutex.Unlock()
|
||||
return
|
||||
}
|
||||
if w.fileAuthsByPath == nil {
|
||||
w.fileAuthsByPath = make(map[string]map[string]*coreauth.Auth)
|
||||
}
|
||||
if prev, ok := w.lastAuthHashes[normalized]; ok && prev == curHash {
|
||||
log.Debugf("auth file unchanged (hash match), skipping reload: %s", filepath.Base(path))
|
||||
w.clientsMutex.Unlock()
|
||||
@@ -177,34 +198,85 @@ func (w *Watcher) addOrUpdateClient(path string) {
|
||||
}
|
||||
w.lastAuthContents[normalized] = &newAuth
|
||||
|
||||
w.clientsMutex.Unlock() // Unlock before the callback
|
||||
|
||||
w.refreshAuthState(false)
|
||||
|
||||
if w.reloadCallback != nil {
|
||||
log.Debugf("triggering server update callback after add/update")
|
||||
w.reloadCallback(cfg)
|
||||
oldByID := make(map[string]*coreauth.Auth)
|
||||
if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 {
|
||||
for id, a := range existing {
|
||||
oldByID[id] = a
|
||||
}
|
||||
}
|
||||
|
||||
// Build synthesized auth entries for this single file only.
|
||||
sctx := &synthesizer.SynthesisContext{
|
||||
Config: w.config,
|
||||
AuthDir: w.authDir,
|
||||
Now: time.Now(),
|
||||
IDGenerator: synthesizer.NewStableIDGenerator(),
|
||||
}
|
||||
generated := synthesizer.SynthesizeAuthFile(sctx, path, data)
|
||||
newByID := make(map[string]*coreauth.Auth)
|
||||
for _, a := range generated {
|
||||
if a == nil || strings.TrimSpace(a.ID) == "" {
|
||||
continue
|
||||
}
|
||||
newByID[a.ID] = a.Clone()
|
||||
}
|
||||
if len(newByID) > 0 {
|
||||
w.fileAuthsByPath[normalized] = newByID
|
||||
} else {
|
||||
delete(w.fileAuthsByPath, normalized)
|
||||
}
|
||||
updates := w.computePerPathUpdatesLocked(oldByID, newByID)
|
||||
w.clientsMutex.Unlock()
|
||||
|
||||
w.persistAuthAsync(fmt.Sprintf("Sync auth %s", filepath.Base(path)), path)
|
||||
w.dispatchAuthUpdates(updates)
|
||||
}
|
||||
|
||||
func (w *Watcher) removeClient(path string) {
|
||||
normalized := w.normalizeAuthPath(path)
|
||||
w.clientsMutex.Lock()
|
||||
|
||||
cfg := w.config
|
||||
oldByID := make(map[string]*coreauth.Auth)
|
||||
if existing := w.fileAuthsByPath[normalized]; len(existing) > 0 {
|
||||
for id, a := range existing {
|
||||
oldByID[id] = a
|
||||
}
|
||||
}
|
||||
delete(w.lastAuthHashes, normalized)
|
||||
delete(w.lastAuthContents, normalized)
|
||||
delete(w.fileAuthsByPath, normalized)
|
||||
|
||||
w.clientsMutex.Unlock() // Release the lock before the callback
|
||||
updates := w.computePerPathUpdatesLocked(oldByID, map[string]*coreauth.Auth{})
|
||||
w.clientsMutex.Unlock()
|
||||
|
||||
w.refreshAuthState(false)
|
||||
|
||||
if w.reloadCallback != nil {
|
||||
log.Debugf("triggering server update callback after removal")
|
||||
w.reloadCallback(cfg)
|
||||
}
|
||||
w.persistAuthAsync(fmt.Sprintf("Remove auth %s", filepath.Base(path)), path)
|
||||
w.dispatchAuthUpdates(updates)
|
||||
}
|
||||
|
||||
func (w *Watcher) computePerPathUpdatesLocked(oldByID, newByID map[string]*coreauth.Auth) []AuthUpdate {
|
||||
if w.currentAuths == nil {
|
||||
w.currentAuths = make(map[string]*coreauth.Auth)
|
||||
}
|
||||
updates := make([]AuthUpdate, 0, len(oldByID)+len(newByID))
|
||||
for id, newAuth := range newByID {
|
||||
existing, ok := w.currentAuths[id]
|
||||
if !ok {
|
||||
w.currentAuths[id] = newAuth.Clone()
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionAdd, ID: id, Auth: newAuth.Clone()})
|
||||
continue
|
||||
}
|
||||
if !authEqual(existing, newAuth) {
|
||||
w.currentAuths[id] = newAuth.Clone()
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionModify, ID: id, Auth: newAuth.Clone()})
|
||||
}
|
||||
}
|
||||
for id := range oldByID {
|
||||
if _, stillExists := newByID[id]; stillExists {
|
||||
continue
|
||||
}
|
||||
delete(w.currentAuths, id)
|
||||
updates = append(updates, AuthUpdate{Action: AuthUpdateActionDelete, ID: id})
|
||||
}
|
||||
return updates
|
||||
}
|
||||
|
||||
func (w *Watcher) loadFileClients(cfg *config.Config) int {
|
||||
|
||||
Reference in New Issue
Block a user