feat(redis): implement Pub/Sub support for usage tracking
- Added Redis Pub/Sub capability to broadcast usage updates to subscribed clients. - Enhanced `redisqueue` with subscriber management and message broadcasting. - Updated tests to validate Pub/Sub message handling, subscription behavior, and fallback to the queue after unsubscribing. - Integrated `project_id` parsing into auth-files logic to include project identifiers in metadata. Closes: #3027
This commit is contained in:
@@ -1919,7 +1919,7 @@ func (h *Handler) RequestCodexToken(c *gin.Context) {
|
|||||||
bundle, errExchange := openaiAuth.ExchangeCodeForTokens(ctx, code, pkceCodes)
|
bundle, errExchange := openaiAuth.ExchangeCodeForTokens(ctx, code, pkceCodes)
|
||||||
if errExchange != nil {
|
if errExchange != nil {
|
||||||
authErr := codex.NewAuthenticationError(codex.ErrCodeExchangeFailed, errExchange)
|
authErr := codex.NewAuthenticationError(codex.ErrCodeExchangeFailed, errExchange)
|
||||||
SetOAuthSessionError(state, "Failed to exchange authorization code for tokens")
|
SetOAuthSessionError(state, oauthSessionErrorWithCause("Failed to exchange authorization code for tokens", errExchange))
|
||||||
log.Errorf("Failed to exchange authorization code for tokens: %v", authErr)
|
log.Errorf("Failed to exchange authorization code for tokens: %v", authErr)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ func (h *Handler) PostOAuthCallback(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if sessionStatus != "" {
|
if sessionStatus != "" {
|
||||||
c.JSON(http.StatusConflict, gin.H{"status": "error", "error": "oauth flow is not pending"})
|
c.JSON(http.StatusConflict, gin.H{"status": "error", "error": sessionStatus})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !strings.EqualFold(sessionProvider, canonicalProvider) {
|
if !strings.EqualFold(sessionProvider, canonicalProvider) {
|
||||||
@@ -89,6 +89,11 @@ func (h *Handler) PostOAuthCallback(c *gin.Context) {
|
|||||||
|
|
||||||
if _, errWrite := WriteOAuthCallbackFileForPendingSession(h.cfg.AuthDir, canonicalProvider, state, code, errMsg); errWrite != nil {
|
if _, errWrite := WriteOAuthCallbackFileForPendingSession(h.cfg.AuthDir, canonicalProvider, state, code, errMsg); errWrite != nil {
|
||||||
if errors.Is(errWrite, errOAuthSessionNotPending) {
|
if errors.Is(errWrite, errOAuthSessionNotPending) {
|
||||||
|
_, status, okSession := GetOAuthSession(state)
|
||||||
|
if okSession && status != "" {
|
||||||
|
c.JSON(http.StatusConflict, gin.H{"status": "error", "error": status})
|
||||||
|
return
|
||||||
|
}
|
||||||
c.JSON(http.StatusConflict, gin.H{"status": "error", "error": "oauth flow is not pending"})
|
c.JSON(http.StatusConflict, gin.H{"status": "error", "error": "oauth flow is not pending"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -190,6 +190,21 @@ func IsOAuthSessionPending(state, provider string) bool {
|
|||||||
return oauthSessions.IsPending(state, provider)
|
return oauthSessions.IsPending(state, provider)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func oauthSessionErrorWithCause(message string, cause error) string {
|
||||||
|
message = strings.TrimSpace(message)
|
||||||
|
if message == "" {
|
||||||
|
message = "Authentication failed"
|
||||||
|
}
|
||||||
|
if cause == nil {
|
||||||
|
return message
|
||||||
|
}
|
||||||
|
detail := strings.TrimSpace(cause.Error())
|
||||||
|
if detail == "" {
|
||||||
|
return message
|
||||||
|
}
|
||||||
|
return message + ": " + detail
|
||||||
|
}
|
||||||
|
|
||||||
func ValidateOAuthState(state string) error {
|
func ValidateOAuthState(state string) error {
|
||||||
trimmed := strings.TrimSpace(state)
|
trimmed := strings.TrimSpace(state)
|
||||||
if trimmed == "" {
|
if trimmed == "" {
|
||||||
|
|||||||
Reference in New Issue
Block a user