From b411d918853c41c3f6fd269ceba9f6f455ff08a2 Mon Sep 17 00:00:00 2001 From: Ben Younes Date: Wed, 15 Apr 2026 09:58:32 +0200 Subject: [PATCH] fix: add circuit breaker to OpenClaw worker client (#1636) (#1697) * fix: add circuit breaker to OpenClaw worker client (#1636) When the claude-mem worker is unreachable, every plugin event (before_agent_start, before_prompt_build, tool_result_persist, agent_end) triggered a new fetch that failed and logged a warning, causing CPU-spinning and continuous log spam. Add a CLOSED/OPEN/HALF_OPEN circuit breaker: after 3 consecutive network errors the circuit opens, silently drops all worker calls for 30 s, then sends one probe. Individual failures are only logged while the circuit is still CLOSED; once open it logs once ("disabling requests for 30s") and goes quiet until recovery. Generated by Claude Code Vibe coded by Ousama Ben Younes Co-Authored-By: Claude * fix: limit HALF_OPEN to single probe and move circuitOnSuccess after response.ok check - Add _halfOpenProbeInFlight flag so only one probe is allowed in HALF_OPEN state; concurrent callers are silently dropped until the probe completes (success or failure) - Move circuitOnSuccess() to after the response.ok check in workerPost, workerPostFireAndForget, and workerGetText so non-2xx HTTP responses no longer close the circuit - Clear _halfOpenProbeInFlight in both circuitOnSuccess and circuitOnFailure, and in circuitReset - Add regression test covering HALF_OPEN one-probe behavior: non-2xx keeps circuit open, 2xx closes it * chore: trigger CodeRabbit re-review --------- Co-authored-by: Claude --- openclaw/src/index.test.ts | 204 +++++++++++++++++++++++++++++++++++++ openclaw/src/index.ts | 97 +++++++++++++++++- 2 files changed, 298 insertions(+), 3 deletions(-) diff --git a/openclaw/src/index.test.ts b/openclaw/src/index.test.ts index e60df94f..5eceabe3 100644 --- a/openclaw/src/index.test.ts +++ b/openclaw/src/index.test.ts @@ -979,3 +979,207 @@ describe("SSE stream integration", () => { await getService().stop({}); }); }); + +describe("circuit breaker", () => { + // Reset circuit breaker state before each test by firing gateway_start. + // The circuit is module-level state, so tests would otherwise bleed into each other. + beforeEach(async () => { + const { api, fireEvent } = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(api); + await fireEvent("gateway_start", {}, {}); + }); + + it("opens after threshold failures and stops further requests", async () => { + const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(api); + // Reset circuit inside the test body to guard against timers from preceding + // tests (e.g. completionDelayMs timers) that may fire between beforeEach and here. + await fireEvent("gateway_start", {}, {}); + + // Fire threshold+1 calls so the circuit is open by the end of the loop + // regardless of whether a concurrent timer fires at the exact boundary. + for (let i = 0; i < 4; i++) { + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-open-${i}` }); + } + + // Circuit is now OPEN. Subsequent calls must be silently dropped. + const logCountBeforeDrop = logs.length; + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-drop" }); + const noisyDropLogs = logs.slice(logCountBeforeDrop).filter( + (l) => l.includes("failed") || l.includes("disabling") + ); + assert.equal(noisyDropLogs.length, 0, "calls when circuit is open should be silently dropped"); + }); + + it("logs individual failures while circuit is closed, then disabling when it opens", async () => { + const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(api); + await fireEvent("gateway_start", {}, {}); + const logsAfterReset = logs.length; + + // Fire exactly threshold (3) calls + for (let i = 0; i < 3; i++) { + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-log-${i}` }); + } + + const newLogs = logs.slice(logsAfterReset); + // At least some failures should have been logged (circuit was active) + assert.ok(newLogs.length > 0, "threshold calls should produce log output"); + // Exactly one disabling warning should appear + const disablingLogs = newLogs.filter((l) => l.includes("disabling requests")); + assert.equal(disablingLogs.length, 1, "should emit exactly one disabling warning when circuit opens"); + // The last call (the threshold-crossing one) should NOT log an individual failure + const failureLogs = newLogs.filter((l) => l.includes("failed:")); + assert.ok(failureLogs.length < 3, "threshold-crossing call should not log an individual failure"); + }); + + it("resets on gateway_start, allowing connections again", async () => { + const { api, logs, fireEvent } = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(api); + await fireEvent("gateway_start", {}, {}); + + // Open the circuit by firing threshold+1 calls + for (let i = 0; i < 4; i++) { + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: `cb-reset-${i}` }); + } + + // Confirm circuit is open (call is silently dropped) + const logCountWhileOpen = logs.length; + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-while-open" }); + assert.equal( + logs.slice(logCountWhileOpen).filter((l) => l.includes("failed") || l.includes("disabling")).length, + 0, + "call while circuit is open should be silently dropped" + ); + + // gateway_start resets the circuit + await fireEvent("gateway_start", {}, {}); + + // Next call should attempt to connect again (not silently drop) + const logCountAfterReset = logs.length; + await fireEvent("before_agent_start", { prompt: "hello" }, { sessionKey: "cb-after-reset" }); + const newLogs = logs.slice(logCountAfterReset); + assert.ok( + newLogs.some((l) => l.includes("failed:") || l.includes("disabling")), + "should attempt worker connection after gateway_start reset" + ); + }); + + it("HALF_OPEN allows only a single probe — non-2xx keeps circuit open, 2xx closes it", async () => { + // ---- Phase 1: open the circuit via network failures (unreachable port) ---- + // Reset circuit state first + const resetMock = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(resetMock.api); + await resetMock.fireEvent("gateway_start", {}, {}); + + // Drive 4 failures to ensure circuit is OPEN + for (let i = 0; i < 4; i++) { + await resetMock.fireEvent("before_agent_start", { prompt: "probe-test" }, { sessionKey: `probe-phase1-${i}` }); + } + + // ---- Phase 2: advance clock so cooldown has elapsed ---- + // _circuitOpenedAt was set during Phase 1 using the real Date.now(). + // Advancing Date.now by 31s means the next circuitAllow call sees the cooldown elapsed. + const realDateNow = Date.now.bind(Date); + Date.now = () => realDateNow() + 31_000; + + try { + // ---- Phase 3: non-2xx probe — circuit should stay OPEN ---- + // Start a server that returns 500 for all requests + let serverA: Server | null = null; + const portA: number = await new Promise((resolve) => { + serverA = createServer((_req: IncomingMessage, res: ServerResponse) => { + res.writeHead(500); + res.end(); + }); + serverA!.listen(0, () => { + const addr = serverA!.address(); + resolve((addr as any).port); + }); + }); + + // Reuse the same module-level circuit state — just change the worker port. + // Create a new mock api instance pointed at server A (500 responder). + const mockA = createMockApi({ workerPort: portA }); + claudeMemPlugin(mockA.api); + // Do NOT fire gateway_start here — we want the OPEN circuit state from Phase 1. + + // The circuit is OPEN but the mocked clock says cooldown elapsed. + // The next call should: transition to HALF_OPEN, set _halfOpenProbeInFlight=true, + // send the probe to server A (which returns 500), then call circuitOnFailure + // and re-open the circuit. + const logCountAtProbe = mockA.logs.length; + await mockA.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-call-non2xx" }); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const probeALogs = mockA.logs.slice(logCountAtProbe); + // After a 500 response, circuitOnFailure is called which logs "disabling requests" + // (because state was HALF_OPEN) and logger.warn logs the 500 status. + assert.ok( + probeALogs.some((l) => l.includes("disabling") || l.includes("returned 500") || l.includes("Worker POST")), + "non-2xx probe should keep circuit open (expected disabling or 500 status log)" + ); + + // Verify probe flag resets: a second call with cooldown elapsed should be allowed as a new probe + // (i.e., _halfOpenProbeInFlight was cleared by circuitOnFailure). + // But without advancing time further the circuit is OPEN again — so calls are dropped. + const logCountAfterFailedProbe = mockA.logs.length; + await mockA.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-concurrent" }); + await new Promise((resolve) => setTimeout(resolve, 100)); + const droppedLogs = mockA.logs.slice(logCountAfterFailedProbe).filter( + (l) => l.includes("failed") || l.includes("disabling") + ); + assert.equal(droppedLogs.length, 0, "call should be silently dropped while circuit is OPEN again after failed probe"); + + serverA!.close(); + + // ---- Phase 4: 2xx probe — circuit should close ---- + // Re-open the circuit with fresh failures, then probe with a 200-returning server. + // Reset circuit state first. + const resetMock2 = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(resetMock2.api); + await resetMock2.fireEvent("gateway_start", {}, {}); + + // Drive failures (still using mocked Date.now, but _circuitOpenedAt will be set to + // the mocked time, so cooldown is NOT elapsed yet from the mocked perspective). + // We need to temporarily restore real Date.now while opening the circuit, then + // re-mock it for the probe. + Date.now = realDateNow; + for (let i = 0; i < 4; i++) { + await resetMock2.fireEvent("before_agent_start", { prompt: "probe-test" }, { sessionKey: `probe-phase4-${i}` }); + } + // Re-advance the clock past cooldown + Date.now = () => realDateNow() + 31_000; + + let serverB: Server | null = null; + const portB: number = await new Promise((resolve) => { + serverB = createServer((_req: IncomingMessage, res: ServerResponse) => { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false })); + }); + serverB!.listen(0, () => { + const addr = serverB!.address(); + resolve((addr as any).port); + }); + }); + + const mockB = createMockApi({ workerPort: portB }); + claudeMemPlugin(mockB.api); + // Do NOT fire gateway_start — reuse OPEN circuit state from resetMock2. + + const logCountBeforeSuccessProbe = mockB.logs.length; + await mockB.fireEvent("before_agent_start", { prompt: "probe" }, { sessionKey: "probe-call-2xx" }); + await new Promise((resolve) => setTimeout(resolve, 150)); + + const successProbeLogs = mockB.logs.slice(logCountBeforeSuccessProbe); + assert.ok( + successProbeLogs.some((l) => l.includes("restored") || l.includes("circuit closed")), + "2xx probe should close the circuit — expected 'restored' or 'circuit closed' log" + ); + + serverB!.close(); + } finally { + Date.now = realDateNow; + } + }); +}); diff --git a/openclaw/src/index.ts b/openclaw/src/index.ts index e8623e5e..6c03f5e8 100644 --- a/openclaw/src/index.ts +++ b/openclaw/src/index.ts @@ -264,12 +264,80 @@ function workerBaseUrl(port: number): string { return `http://${_workerHost}:${port}`; } +// ============================================================================ +// Worker Circuit Breaker +// ============================================================================ +// Prevents CPU-spinning retry loops when the worker is unreachable. +// After CIRCUIT_BREAKER_THRESHOLD consecutive network errors, the circuit +// opens and all worker calls are silently dropped for CIRCUIT_BREAKER_COOLDOWN_MS. +// After the cooldown, one probe attempt is allowed to check if the worker recovered. + +const CIRCUIT_BREAKER_THRESHOLD = 3; +const CIRCUIT_BREAKER_COOLDOWN_MS = 30_000; + +type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN"; + +let _circuitState: CircuitState = "CLOSED"; +let _circuitFailures = 0; +let _circuitOpenedAt = 0; +let _halfOpenProbeInFlight = false; + +function circuitAllow(logger: PluginLogger): boolean { + if (_circuitState === "CLOSED") return true; + if (_circuitState === "OPEN") { + if (Date.now() - _circuitOpenedAt >= CIRCUIT_BREAKER_COOLDOWN_MS) { + _circuitState = "HALF_OPEN"; + logger.info("[claude-mem] Circuit breaker: probing worker connection"); + if (_halfOpenProbeInFlight) return false; + _halfOpenProbeInFlight = true; + return true; + } + return false; + } + // HALF_OPEN: allow one probe through + if (_halfOpenProbeInFlight) return false; + _halfOpenProbeInFlight = true; + return true; +} + +function circuitOnSuccess(logger: PluginLogger): void { + if (_circuitState !== "CLOSED") { + logger.info("[claude-mem] Worker connection restored — circuit closed"); + } + _circuitState = "CLOSED"; + _circuitFailures = 0; + _halfOpenProbeInFlight = false; +} + +function circuitOnFailure(logger: PluginLogger): void { + _halfOpenProbeInFlight = false; + _circuitFailures++; + if ( + _circuitState === "HALF_OPEN" || + (_circuitState === "CLOSED" && _circuitFailures >= CIRCUIT_BREAKER_THRESHOLD) + ) { + _circuitState = "OPEN"; + _circuitOpenedAt = Date.now(); + logger.warn( + `[claude-mem] Worker unreachable — disabling requests for ${CIRCUIT_BREAKER_COOLDOWN_MS / 1000}s` + ); + } +} + +function circuitReset(): void { + _circuitState = "CLOSED"; + _circuitFailures = 0; + _circuitOpenedAt = 0; + _halfOpenProbeInFlight = false; +} + async function workerPost( port: number, path: string, body: Record, logger: PluginLogger ): Promise | null> { + if (!circuitAllow(logger)) return null; try { const response = await fetch(`${workerBaseUrl(port)}${path}`, { method: "POST", @@ -277,13 +345,18 @@ async function workerPost( body: JSON.stringify(body), }); if (!response.ok) { + circuitOnFailure(logger); logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`); return null; } + circuitOnSuccess(logger); return (await response.json()) as Record; } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); - logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + circuitOnFailure(logger); + if (_circuitState !== "OPEN") { + logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + } return null; } } @@ -294,13 +367,24 @@ function workerPostFireAndForget( body: Record, logger: PluginLogger ): void { + if (!circuitAllow(logger)) return; fetch(`${workerBaseUrl(port)}${path}`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(body), + }).then((response) => { + if (!response.ok) { + circuitOnFailure(logger); + logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`); + return; + } + circuitOnSuccess(logger); }).catch((error: unknown) => { const message = error instanceof Error ? error.message : String(error); - logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + circuitOnFailure(logger); + if (_circuitState !== "OPEN") { + logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + } }); } @@ -309,16 +393,22 @@ async function workerGetText( path: string, logger: PluginLogger ): Promise { + if (!circuitAllow(logger)) return null; try { const response = await fetch(`${workerBaseUrl(port)}${path}`); if (!response.ok) { + circuitOnFailure(logger); logger.warn(`[claude-mem] Worker GET ${path} returned ${response.status}`); return null; } + circuitOnSuccess(logger); return await response.text(); } catch (error: unknown) { const message = error instanceof Error ? error.message : String(error); - logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`); + circuitOnFailure(logger); + if (_circuitState !== "OPEN") { + logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`); + } return null; } } @@ -856,6 +946,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { // Event: gateway_start — clear session tracking for fresh start // ------------------------------------------------------------------ api.on("gateway_start", async () => { + circuitReset(); sessionIds.clear(); contextCache.clear(); recentPromptInits.clear();