feat: add configurable retention period for Redis usage queue
- Introduced `redis-usage-queue-retention-seconds` config parameter with a default of 60 seconds and a max of 3600 seconds. - Updated logic in `redisqueue` to honor configurable retention periods for enqueued usage data. - Modified config validation and initialization to support and enforce retention limits. - Enhanced change tracking in `config_diff` to detect updates to this parameter.
This commit is contained in:
@@ -418,6 +418,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled)
|
redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled)
|
||||||
|
redisqueue.SetRetentionSeconds(cfg.RedisUsageQueueRetentionSeconds)
|
||||||
coreauth.SetQuotaCooldownDisabled(cfg.DisableCooling)
|
coreauth.SetQuotaCooldownDisabled(cfg.DisableCooling)
|
||||||
|
|
||||||
if err = logging.ConfigureLogOutput(cfg); err != nil {
|
if err = logging.ConfigureLogOutput(cfg); err != nil {
|
||||||
|
|||||||
@@ -66,6 +66,10 @@ error-logs-max-files: 10
|
|||||||
# When false, disable in-memory usage statistics aggregation
|
# When false, disable in-memory usage statistics aggregation
|
||||||
usage-statistics-enabled: false
|
usage-statistics-enabled: false
|
||||||
|
|
||||||
|
# How long (in seconds) Redis usage queue items are retained in memory for the RESP interface (LPOP/RPOP).
|
||||||
|
# Default: 60. Max: 3600.
|
||||||
|
redis-usage-queue-retention-seconds: 60
|
||||||
|
|
||||||
# Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/
|
# Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/
|
||||||
# Per-entry proxy-url also supports "direct" or "none" to bypass both the global proxy-url and environment proxies explicitly.
|
# Per-entry proxy-url also supports "direct" or "none" to bypass both the global proxy-url and environment proxies explicitly.
|
||||||
proxy-url: ""
|
proxy-url: ""
|
||||||
|
|||||||
@@ -1000,6 +1000,10 @@ func (s *Server) UpdateClients(cfg *config.Config) {
|
|||||||
redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled)
|
redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if oldCfg == nil || oldCfg.RedisUsageQueueRetentionSeconds != cfg.RedisUsageQueueRetentionSeconds {
|
||||||
|
redisqueue.SetRetentionSeconds(cfg.RedisUsageQueueRetentionSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
if s.requestLogger != nil && (oldCfg == nil || oldCfg.ErrorLogsMaxFiles != cfg.ErrorLogsMaxFiles) {
|
if s.requestLogger != nil && (oldCfg == nil || oldCfg.ErrorLogsMaxFiles != cfg.ErrorLogsMaxFiles) {
|
||||||
if setter, ok := s.requestLogger.(interface{ SetErrorLogsMaxFiles(int) }); ok {
|
if setter, ok := s.requestLogger.(interface{ SetErrorLogsMaxFiles(int) }); ok {
|
||||||
setter.SetErrorLogsMaxFiles(cfg.ErrorLogsMaxFiles)
|
setter.SetErrorLogsMaxFiles(cfg.ErrorLogsMaxFiles)
|
||||||
|
|||||||
@@ -65,6 +65,11 @@ type Config struct {
|
|||||||
// UsageStatisticsEnabled toggles in-memory usage aggregation; when false, usage data is discarded.
|
// UsageStatisticsEnabled toggles in-memory usage aggregation; when false, usage data is discarded.
|
||||||
UsageStatisticsEnabled bool `yaml:"usage-statistics-enabled" json:"usage-statistics-enabled"`
|
UsageStatisticsEnabled bool `yaml:"usage-statistics-enabled" json:"usage-statistics-enabled"`
|
||||||
|
|
||||||
|
// RedisUsageQueueRetentionSeconds controls how long (in seconds) usage queue items
|
||||||
|
// are retained in memory for the Redis RESP interface (LPOP/RPOP).
|
||||||
|
// Default: 60. Max: 3600.
|
||||||
|
RedisUsageQueueRetentionSeconds int `yaml:"redis-usage-queue-retention-seconds" json:"redis-usage-queue-retention-seconds"`
|
||||||
|
|
||||||
// DisableCooling disables quota cooldown scheduling when true.
|
// DisableCooling disables quota cooldown scheduling when true.
|
||||||
DisableCooling bool `yaml:"disable-cooling" json:"disable-cooling"`
|
DisableCooling bool `yaml:"disable-cooling" json:"disable-cooling"`
|
||||||
|
|
||||||
@@ -609,6 +614,7 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) {
|
|||||||
cfg.LogsMaxTotalSizeMB = 0
|
cfg.LogsMaxTotalSizeMB = 0
|
||||||
cfg.ErrorLogsMaxFiles = 10
|
cfg.ErrorLogsMaxFiles = 10
|
||||||
cfg.UsageStatisticsEnabled = false
|
cfg.UsageStatisticsEnabled = false
|
||||||
|
cfg.RedisUsageQueueRetentionSeconds = 60
|
||||||
cfg.DisableCooling = false
|
cfg.DisableCooling = false
|
||||||
cfg.DisableImageGeneration = DisableImageGenerationOff
|
cfg.DisableImageGeneration = DisableImageGenerationOff
|
||||||
cfg.Pprof.Enable = false
|
cfg.Pprof.Enable = false
|
||||||
@@ -671,6 +677,13 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) {
|
|||||||
cfg.ErrorLogsMaxFiles = 10
|
cfg.ErrorLogsMaxFiles = 10
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.RedisUsageQueueRetentionSeconds <= 0 {
|
||||||
|
cfg.RedisUsageQueueRetentionSeconds = 60
|
||||||
|
} else if cfg.RedisUsageQueueRetentionSeconds > 3600 {
|
||||||
|
log.WithField("value", cfg.RedisUsageQueueRetentionSeconds).Warn("redis-usage-queue-retention-seconds too large; clamping to 3600")
|
||||||
|
cfg.RedisUsageQueueRetentionSeconds = 3600
|
||||||
|
}
|
||||||
|
|
||||||
if cfg.MaxRetryCredentials < 0 {
|
if cfg.MaxRetryCredentials < 0 {
|
||||||
cfg.MaxRetryCredentials = 0
|
cfg.MaxRetryCredentials = 0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,10 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
const retentionWindow = time.Minute
|
const (
|
||||||
|
defaultRetentionSeconds int64 = 60
|
||||||
|
maxRetentionSeconds int64 = 3600
|
||||||
|
)
|
||||||
|
|
||||||
type queueItem struct {
|
type queueItem struct {
|
||||||
enqueuedAt time.Time
|
enqueuedAt time.Time
|
||||||
@@ -20,10 +23,15 @@ type queue struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
enabled atomic.Bool
|
enabled atomic.Bool
|
||||||
global queue
|
retentionSeconds atomic.Int64
|
||||||
|
global queue
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
retentionSeconds.Store(defaultRetentionSeconds)
|
||||||
|
}
|
||||||
|
|
||||||
func SetEnabled(value bool) {
|
func SetEnabled(value bool) {
|
||||||
enabled.Store(value)
|
enabled.Store(value)
|
||||||
if !value {
|
if !value {
|
||||||
@@ -35,6 +43,16 @@ func Enabled() bool {
|
|||||||
return enabled.Load()
|
return enabled.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SetRetentionSeconds(value int) {
|
||||||
|
normalized := int64(value)
|
||||||
|
if normalized <= 0 {
|
||||||
|
normalized = defaultRetentionSeconds
|
||||||
|
} else if normalized > maxRetentionSeconds {
|
||||||
|
normalized = maxRetentionSeconds
|
||||||
|
}
|
||||||
|
retentionSeconds.Store(normalized)
|
||||||
|
}
|
||||||
|
|
||||||
func Enqueue(payload []byte) {
|
func Enqueue(payload []byte) {
|
||||||
if !Enabled() {
|
if !Enabled() {
|
||||||
return
|
return
|
||||||
@@ -110,7 +128,11 @@ func (q *queue) pruneLocked(now time.Time) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
cutoff := now.Add(-retentionWindow)
|
windowSeconds := retentionSeconds.Load()
|
||||||
|
if windowSeconds <= 0 {
|
||||||
|
windowSeconds = defaultRetentionSeconds
|
||||||
|
}
|
||||||
|
cutoff := now.Add(-time.Duration(windowSeconds) * time.Second)
|
||||||
for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
|
for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
|
||||||
q.head++
|
q.head++
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -39,6 +39,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string {
|
|||||||
if oldCfg.UsageStatisticsEnabled != newCfg.UsageStatisticsEnabled {
|
if oldCfg.UsageStatisticsEnabled != newCfg.UsageStatisticsEnabled {
|
||||||
changes = append(changes, fmt.Sprintf("usage-statistics-enabled: %t -> %t", oldCfg.UsageStatisticsEnabled, newCfg.UsageStatisticsEnabled))
|
changes = append(changes, fmt.Sprintf("usage-statistics-enabled: %t -> %t", oldCfg.UsageStatisticsEnabled, newCfg.UsageStatisticsEnabled))
|
||||||
}
|
}
|
||||||
|
if oldCfg.RedisUsageQueueRetentionSeconds != newCfg.RedisUsageQueueRetentionSeconds {
|
||||||
|
changes = append(changes, fmt.Sprintf("redis-usage-queue-retention-seconds: %d -> %d", oldCfg.RedisUsageQueueRetentionSeconds, newCfg.RedisUsageQueueRetentionSeconds))
|
||||||
|
}
|
||||||
if oldCfg.DisableCooling != newCfg.DisableCooling {
|
if oldCfg.DisableCooling != newCfg.DisableCooling {
|
||||||
changes = append(changes, fmt.Sprintf("disable-cooling: %t -> %t", oldCfg.DisableCooling, newCfg.DisableCooling))
|
changes = append(changes, fmt.Sprintf("disable-cooling: %t -> %t", oldCfg.DisableCooling, newCfg.DisableCooling))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user