feat(home): add cluster nodes payload parsing and Redis channel handling
- Added `parseClusterNodesPayload` for streamlined cluster node parsing. - Introduced `handleSubscriptionPayload` to handle Redis channel payloads, including updates for the new `cluster` channel. - Updated subscription logic to process and apply cluster node updates seamlessly.
This commit is contained in:
+49
-6
@@ -31,6 +31,7 @@ const (
|
||||
|
||||
homeReconnectInterval = time.Second
|
||||
homeReconnectFailoverThreshold = 3
|
||||
redisChannelCluster = "cluster"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -310,11 +311,10 @@ func (c *Client) refreshClusterNodes(ctx context.Context) (bool, error) {
|
||||
return false, errDo
|
||||
}
|
||||
|
||||
var envelope clusterNodesEnvelope
|
||||
if errUnmarshal := json.Unmarshal([]byte(raw), &envelope); errUnmarshal != nil {
|
||||
return false, errUnmarshal
|
||||
nodes, errParse := parseClusterNodesPayload([]byte(raw))
|
||||
if errParse != nil {
|
||||
return false, errParse
|
||||
}
|
||||
nodes := normalizeClusterNodes(envelope.Nodes)
|
||||
if len(nodes) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
@@ -326,6 +326,28 @@ func (c *Client) refreshClusterNodes(ctx context.Context) (bool, error) {
|
||||
return c.switchToNodeLocked(nodes[0]), nil
|
||||
}
|
||||
|
||||
func parseClusterNodesPayload(raw []byte) ([]clusterNode, error) {
|
||||
var envelope clusterNodesEnvelope
|
||||
if errUnmarshal := json.Unmarshal(raw, &envelope); errUnmarshal != nil {
|
||||
return nil, errUnmarshal
|
||||
}
|
||||
return normalizeClusterNodes(envelope.Nodes), nil
|
||||
}
|
||||
|
||||
func (c *Client) updateClusterNodesFromPayload(raw []byte) error {
|
||||
if c == nil || !c.clusterDiscoveryEnabled() {
|
||||
return nil
|
||||
}
|
||||
nodes, errParse := parseClusterNodesPayload(raw)
|
||||
if errParse != nil {
|
||||
return errParse
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.clusterNodes = nodes
|
||||
c.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func normalizeClusterNodes(nodes []clusterNode) []clusterNode {
|
||||
out := make([]clusterNode, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
@@ -570,6 +592,25 @@ func (c *Client) RPushRequestLog(ctx context.Context, payload []byte) error {
|
||||
return cmd.RPush(ctx, redisKeyRequestLog, payload).Err()
|
||||
}
|
||||
|
||||
func (c *Client) handleSubscriptionPayload(channel string, payload string, onConfig func([]byte) error) error {
|
||||
payload = strings.TrimSpace(payload)
|
||||
if payload == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch strings.ToLower(strings.TrimSpace(channel)) {
|
||||
case redisChannelConfig:
|
||||
if onConfig == nil {
|
||||
return nil
|
||||
}
|
||||
return onConfig([]byte(payload))
|
||||
case redisChannelCluster:
|
||||
return c.updateClusterNodesFromPayload([]byte(payload))
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// StartConfigSubscriber connects to home, fetches config once via GET config, then subscribes to
|
||||
// the "config" channel to receive runtime config updates.
|
||||
//
|
||||
@@ -664,8 +705,10 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte
|
||||
if msg == nil {
|
||||
continue
|
||||
}
|
||||
if payload := strings.TrimSpace(msg.Payload); payload != "" {
|
||||
if errApply := onConfig([]byte(payload)); errApply != nil {
|
||||
if errApply := c.handleSubscriptionPayload(msg.Channel, msg.Payload, onConfig); errApply != nil {
|
||||
if strings.EqualFold(strings.TrimSpace(msg.Channel), redisChannelCluster) {
|
||||
log.Warn("failed to apply cluster update from home control center, ignoring")
|
||||
} else {
|
||||
log.Warn("failed to apply config update from home control center, ignoring")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user