feat(client): add timeout handling for Redis operations and subscription failover
- Introduced the `homeRedisOperationTimeout` and `homeSubscriptionReceiveTimeout` constants so that Redis operations and subscription receives use named timeout values.
- Enhanced the Redis connection options with dial, read, and write timeouts plus explicit retry settings (`MaxRetries`, `DialerRetries`, `ContextTimeoutEnabled`) to support the failover mechanisms.
- Implemented subscription failover logic on heartbeat timeouts to improve resilience.
- Updated message handling to receive with a timeout and to support additional Redis event types, including Pong and Subscription.
This commit is contained in:
+60
-2
@@ -31,6 +31,8 @@ const (
|
|||||||
|
|
||||||
homeReconnectInterval = time.Second
|
homeReconnectInterval = time.Second
|
||||||
homeReconnectFailoverThreshold = 3
|
homeReconnectFailoverThreshold = 3
|
||||||
|
homeRedisOperationTimeout = 3 * time.Second
|
||||||
|
homeSubscriptionReceiveTimeout = 3 * time.Second
|
||||||
redisChannelCluster = "cluster"
|
redisChannelCluster = "cluster"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -180,6 +182,12 @@ func (c *Client) redisOptionsLocked(addr string) (*redis.Options, error) {
|
|||||||
Addr: addr,
|
Addr: addr,
|
||||||
Password: c.homeCfg.Password,
|
Password: c.homeCfg.Password,
|
||||||
TLSConfig: tlsConfig,
|
TLSConfig: tlsConfig,
|
||||||
|
DialTimeout: homeRedisOperationTimeout,
|
||||||
|
ReadTimeout: homeRedisOperationTimeout,
|
||||||
|
WriteTimeout: homeRedisOperationTimeout,
|
||||||
|
MaxRetries: -1,
|
||||||
|
DialerRetries: 1,
|
||||||
|
ContextTimeoutEnabled: true,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -429,6 +437,25 @@ func (c *Client) failoverAfterReconnectFailure() (bool, string) {
|
|||||||
}
|
}
|
||||||
c.reconnectFailures = 0
|
c.reconnectFailures = 0
|
||||||
|
|
||||||
|
return c.switchToNextNodeLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) failoverAfterSubscriptionTimeout() (bool, string) {
|
||||||
|
if c == nil {
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
if !c.clusterDiscoveryEnabledLocked() {
|
||||||
|
c.reconnectFailures = 0
|
||||||
|
return false, ""
|
||||||
|
}
|
||||||
|
c.reconnectFailures = 0
|
||||||
|
return c.switchToNextNodeLocked()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) switchToNextNodeLocked() (bool, string) {
|
||||||
currentHost := strings.TrimSpace(c.homeCfg.Host)
|
currentHost := strings.TrimSpace(c.homeCfg.Host)
|
||||||
currentPort := c.homeCfg.Port
|
currentPort := c.homeCfg.Port
|
||||||
candidates := append([]clusterNode(nil), c.clusterNodes...)
|
candidates := append([]clusterNode(nil), c.clusterNodes...)
|
||||||
@@ -451,6 +478,13 @@ func (c *Client) failoverAfterReconnectFailure() (bool, string) {
|
|||||||
return false, ""
|
return false, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) markSubscriptionTimeout() {
|
||||||
|
switched, addr := c.failoverAfterSubscriptionTimeout()
|
||||||
|
if switched {
|
||||||
|
log.Warnf("home subscription heartbeat timeout; switching to %s", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) resetReconnectFailures() {
|
func (c *Client) resetReconnectFailures() {
|
||||||
if c == nil {
|
if c == nil {
|
||||||
return
|
return
|
||||||
@@ -708,7 +742,7 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Ensure the subscription is established before marking heartbeat OK.
|
// Ensure the subscription is established before marking heartbeat OK.
|
||||||
if _, errReceive := pubsub.Receive(ctx); errReceive != nil {
|
if _, errReceive := pubsub.ReceiveTimeout(ctx, homeSubscriptionReceiveTimeout); errReceive != nil {
|
||||||
_ = pubsub.Close()
|
_ = pubsub.Close()
|
||||||
c.markReconnectFailure("subscribe")
|
c.markReconnectFailure("subscribe")
|
||||||
sleepWithContext(ctx, homeReconnectInterval)
|
sleepWithContext(ctx, homeReconnectInterval)
|
||||||
@@ -719,14 +753,20 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte
|
|||||||
c.heartbeatOK.Store(true)
|
c.heartbeatOK.Store(true)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
msg, errMsg := pubsub.ReceiveMessage(ctx)
|
event, errMsg := pubsub.ReceiveTimeout(ctx, homeSubscriptionReceiveTimeout)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
_ = pubsub.Close()
|
_ = pubsub.Close()
|
||||||
c.heartbeatOK.Store(false)
|
c.heartbeatOK.Store(false)
|
||||||
|
if isTimeoutError(errMsg) {
|
||||||
|
c.markSubscriptionTimeout()
|
||||||
|
} else {
|
||||||
c.markReconnectFailure("subscription")
|
c.markReconnectFailure("subscription")
|
||||||
|
}
|
||||||
sleepWithContext(ctx, homeReconnectInterval)
|
sleepWithContext(ctx, homeReconnectInterval)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
switch msg := event.(type) {
|
||||||
|
case *redis.Message:
|
||||||
if msg == nil {
|
if msg == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -737,8 +777,26 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte
|
|||||||
log.Warn("failed to apply config update from home control center, ignoring")
|
log.Warn("failed to apply config update from home control center, ignoring")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case *redis.Pong:
|
||||||
|
c.resetReconnectFailures()
|
||||||
|
case *redis.Subscription:
|
||||||
|
continue
|
||||||
|
default:
|
||||||
|
log.Debugf("home subscription returned unsupported message type %T", event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// isTimeoutError reports whether err represents a timeout.
//
// It returns false for a nil error. It returns true when err is, or wraps,
// context.DeadlineExceeded, or when some error in err's chain implements
// net.Error and its Timeout method returns true.
func isTimeoutError(err error) bool {
	switch {
	case err == nil:
		return false
	case errors.Is(err, context.DeadlineExceeded):
		return true
	}

	// Fall back to the net.Error contract for I/O deadline failures.
	var timeoutErr net.Error
	if !errors.As(err, &timeoutErr) {
		return false
	}
	return timeoutErr.Timeout()
}
|
}
|
||||||
|
|
||||||
func sleepWithContext(ctx context.Context, d time.Duration) {
|
func sleepWithContext(ctx context.Context, d time.Duration) {
|
||||||
|
|||||||
Reference in New Issue
Block a user