diff --git a/internal/home/client.go b/internal/home/client.go index 3edd3135..2652bc1c 100644 --- a/internal/home/client.go +++ b/internal/home/client.go @@ -31,6 +31,7 @@ const ( homeReconnectInterval = time.Second homeReconnectFailoverThreshold = 3 + redisChannelCluster = "cluster" ) var ( @@ -310,11 +311,10 @@ func (c *Client) refreshClusterNodes(ctx context.Context) (bool, error) { return false, errDo } - var envelope clusterNodesEnvelope - if errUnmarshal := json.Unmarshal([]byte(raw), &envelope); errUnmarshal != nil { - return false, errUnmarshal + nodes, errParse := parseClusterNodesPayload([]byte(raw)) + if errParse != nil { + return false, errParse } - nodes := normalizeClusterNodes(envelope.Nodes) if len(nodes) == 0 { return false, nil } @@ -326,6 +326,28 @@ func (c *Client) refreshClusterNodes(ctx context.Context) (bool, error) { return c.switchToNodeLocked(nodes[0]), nil } +func parseClusterNodesPayload(raw []byte) ([]clusterNode, error) { + var envelope clusterNodesEnvelope + if errUnmarshal := json.Unmarshal(raw, &envelope); errUnmarshal != nil { + return nil, errUnmarshal + } + return normalizeClusterNodes(envelope.Nodes), nil +} + +func (c *Client) updateClusterNodesFromPayload(raw []byte) error { + if c == nil || !c.clusterDiscoveryEnabled() { + return nil + } + nodes, errParse := parseClusterNodesPayload(raw) + if errParse != nil { + return errParse + } + c.mu.Lock() + c.clusterNodes = nodes + c.mu.Unlock() + return nil +} + func normalizeClusterNodes(nodes []clusterNode) []clusterNode { out := make([]clusterNode, 0, len(nodes)) for _, node := range nodes { @@ -570,6 +592,25 @@ func (c *Client) RPushRequestLog(ctx context.Context, payload []byte) error { return cmd.RPush(ctx, redisKeyRequestLog, payload).Err() } +func (c *Client) handleSubscriptionPayload(channel string, payload string, onConfig func([]byte) error) error { + payload = strings.TrimSpace(payload) + if payload == "" { + return nil + } + + switch strings.ToLower(strings.TrimSpace(channel)) { + case redisChannelConfig: + if onConfig == nil { + return nil + } + return onConfig([]byte(payload)) + case redisChannelCluster: + return c.updateClusterNodesFromPayload([]byte(payload)) + default: + return nil + } +} + // StartConfigSubscriber connects to home, fetches config once via GET config, then subscribes to // the "config" channel to receive runtime config updates. // @@ -664,8 +705,10 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte if msg == nil { continue } - if payload := strings.TrimSpace(msg.Payload); payload != "" { - if errApply := onConfig([]byte(payload)); errApply != nil { + if errApply := c.handleSubscriptionPayload(msg.Channel, msg.Payload, onConfig); errApply != nil { + if strings.EqualFold(strings.TrimSpace(msg.Channel), redisChannelCluster) { + log.Warn("failed to apply cluster update from home control center, ignoring") + } else { log.Warn("failed to apply config update from home control center, ignoring") } }