feat(redis): implement Pub/Sub support for usage tracking
- Added Redis Pub/Sub capability to broadcast usage updates to subscribed clients. - Enhanced `redisqueue` with subscriber management and message broadcasting. - Updated tests to validate Pub/Sub message handling, subscription behavior, and fallback to the queue after unsubscribing. - Integrated `project_id` parsing into auth-files logic to include project identifiers in metadata.
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
||||
const (
|
||||
defaultRetentionSeconds int64 = 60
|
||||
maxRetentionSeconds int64 = 3600
|
||||
usageSubscriberBuffer = 256
|
||||
)
|
||||
|
||||
type queueItem struct {
|
||||
@@ -17,9 +18,11 @@ type queueItem struct {
|
||||
}
|
||||
|
||||
type queue struct {
|
||||
mu sync.Mutex
|
||||
items []queueItem
|
||||
head int
|
||||
mu sync.Mutex
|
||||
items []queueItem
|
||||
head int
|
||||
subscribers map[uint64]chan []byte
|
||||
nextSubscriberID uint64
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -60,6 +63,9 @@ func Enqueue(payload []byte) {
|
||||
if len(payload) == 0 {
|
||||
return
|
||||
}
|
||||
if global.publishToSubscribers(payload) {
|
||||
return
|
||||
}
|
||||
global.enqueue(payload)
|
||||
}
|
||||
|
||||
@@ -73,11 +79,25 @@ func PopOldest(count int) [][]byte {
|
||||
return global.popOldest(count)
|
||||
}
|
||||
|
||||
func SubscribeUsage() (<-chan []byte, func()) {
|
||||
return global.subscribeUsage()
|
||||
}
|
||||
|
||||
func (q *queue) clear() {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
subscribers := make([]chan []byte, 0, len(q.subscribers))
|
||||
for _, subscriber := range q.subscribers {
|
||||
subscribers = append(subscribers, subscriber)
|
||||
}
|
||||
q.items = nil
|
||||
q.head = 0
|
||||
q.subscribers = nil
|
||||
q.mu.Unlock()
|
||||
|
||||
for _, subscriber := range subscribers {
|
||||
close(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) enqueue(payload []byte) {
|
||||
@@ -94,6 +114,61 @@ func (q *queue) enqueue(payload []byte) {
|
||||
q.maybeCompactLocked()
|
||||
}
|
||||
|
||||
func (q *queue) publishToSubscribers(payload []byte) bool {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
|
||||
if len(q.subscribers) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
for id, subscriber := range q.subscribers {
|
||||
cloned := append([]byte(nil), payload...)
|
||||
select {
|
||||
case subscriber <- cloned:
|
||||
default:
|
||||
delete(q.subscribers, id)
|
||||
close(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (q *queue) subscribeUsage() (<-chan []byte, func()) {
|
||||
subscriber := make(chan []byte, usageSubscriberBuffer)
|
||||
|
||||
q.mu.Lock()
|
||||
if q.subscribers == nil {
|
||||
q.subscribers = make(map[uint64]chan []byte)
|
||||
}
|
||||
q.nextSubscriberID++
|
||||
id := q.nextSubscriberID
|
||||
q.subscribers[id] = subscriber
|
||||
q.mu.Unlock()
|
||||
|
||||
var once sync.Once
|
||||
unsubscribe := func() {
|
||||
once.Do(func() {
|
||||
q.unsubscribeUsage(id)
|
||||
})
|
||||
}
|
||||
return subscriber, unsubscribe
|
||||
}
|
||||
|
||||
func (q *queue) unsubscribeUsage(id uint64) {
|
||||
q.mu.Lock()
|
||||
subscriber, ok := q.subscribers[id]
|
||||
if ok {
|
||||
delete(q.subscribers, id)
|
||||
}
|
||||
q.mu.Unlock()
|
||||
|
||||
if ok {
|
||||
close(subscriber)
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queue) popOldest(count int) [][]byte {
|
||||
now := time.Now()
|
||||
|
||||
|
||||
@@ -0,0 +1,67 @@
|
||||
package redisqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEnqueueBroadcastsToUsageSubscribersAndSkipsQueue(t *testing.T) {
|
||||
withEnabledQueue(t, func() {
|
||||
first, unsubscribeFirst := SubscribeUsage()
|
||||
defer unsubscribeFirst()
|
||||
second, unsubscribeSecond := SubscribeUsage()
|
||||
defer unsubscribeSecond()
|
||||
|
||||
Enqueue([]byte("usage-record"))
|
||||
|
||||
requireUsageSubscriberPayload(t, first, "usage-record")
|
||||
requireUsageSubscriberPayload(t, second, "usage-record")
|
||||
|
||||
if items := PopOldest(1); len(items) != 0 {
|
||||
t.Fatalf("PopOldest() items = %q, want empty after subscriber broadcast", items)
|
||||
}
|
||||
|
||||
unsubscribeFirst()
|
||||
unsubscribeSecond()
|
||||
|
||||
Enqueue([]byte("queued-record"))
|
||||
items := PopOldest(1)
|
||||
if len(items) != 1 || string(items[0]) != "queued-record" {
|
||||
t.Fatalf("PopOldest() items = %q, want queued record after unsubscribe", items)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestSetEnabledFalseClosesUsageSubscribers(t *testing.T) {
|
||||
withEnabledQueue(t, func() {
|
||||
subscriber, unsubscribe := SubscribeUsage()
|
||||
defer unsubscribe()
|
||||
|
||||
SetEnabled(false)
|
||||
|
||||
select {
|
||||
case _, ok := <-subscriber:
|
||||
if ok {
|
||||
t.Fatalf("subscriber channel remained open after SetEnabled(false)")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout waiting for subscriber close")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func requireUsageSubscriberPayload(t *testing.T, subscriber <-chan []byte, want string) {
|
||||
t.Helper()
|
||||
|
||||
select {
|
||||
case got, ok := <-subscriber:
|
||||
if !ok {
|
||||
t.Fatalf("subscriber closed before receiving %q", want)
|
||||
}
|
||||
if string(got) != want {
|
||||
t.Fatalf("subscriber payload = %q, want %q", string(got), want)
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("timeout waiting for subscriber payload %q", want)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user