diff --git a/openclaw/TESTING.md b/openclaw/TESTING.md index 883810dd..49281ddb 100644 --- a/openclaw/TESTING.md +++ b/openclaw/TESTING.md @@ -28,7 +28,7 @@ curl -s -N http://localhost:37777/stream --max-time 3 2>/dev/null || true **If the worker is not running:** ```bash -cd /Users/alexnewman/Scripts/claude-mem +cd /path/to/claude-mem npm run build-and-sync ``` @@ -50,7 +50,7 @@ cat ~/.openclaw/openclaw.json { "claude-mem": { "enabled": true, - "source": "/Users/alexnewman/Scripts/claude-mem/openclaw", + "source": "/path/to/claude-mem/openclaw", "config": { "syncMemoryFile": true, "workerPort": 37777, @@ -135,7 +135,7 @@ node test-sse-consumer.js ### Worker not running - **Symptom:** Gateway logs show `SSE stream error: fetch failed. Reconnecting in 1s` -- **Fix:** Start the worker with `cd /Users/alexnewman/Scripts/claude-mem && npm run build-and-sync` +- **Fix:** Start the worker with `cd /path/to/claude-mem && npm run build-and-sync` ### Port mismatch - **Symptom:** SSE connection fails even though worker health check passes diff --git a/openclaw/src/index.ts b/openclaw/src/index.ts index b83d7ce0..65e0e03a 100644 --- a/openclaw/src/index.ts +++ b/openclaw/src/index.ts @@ -42,8 +42,7 @@ interface SSENewObservationEvent { type ConnectionState = "disconnected" | "connected" | "reconnecting"; -let sseAbortController: AbortController | null = null; -let connectionState: ConnectionState = "disconnected"; +const MAX_SSE_BUFFER_SIZE = 1024 * 1024; // 1MB function formatObservationMessage(observation: ObservationSSEPayload): string { const title = observation.title || "Untitled"; @@ -54,50 +53,49 @@ function formatObservationMessage(observation: ObservationSSEPayload): string { return message; } -async function sendToChannel( +function sendToChannel( api: OpenClawPluginApi, channel: string, to: string, text: string ): Promise { - const channelSendFunctions: Record Promise> = { - telegram: api.runtime.channel.telegram.sendMessageTelegram, - discord: api.runtime.channel.discord.sendMessageDiscord, - signal: api.runtime.channel.signal.sendMessageSignal, - slack: api.runtime.channel.slack.sendMessageSlack, - whatsapp: api.runtime.channel.whatsapp.sendMessageWhatsApp, - line: api.runtime.channel.line.sendMessageLine, - }; - - const senderFunction = channelSendFunctions[channel]; - if (!senderFunction) { + const channelApi = api.runtime.channel[channel]; + if (!channelApi) { api.log(`[claude-mem] Unknown channel type: ${channel}`); - return; + return Promise.resolve(); } - try { - await senderFunction(to, text); - } catch (error) { - api.log(`[claude-mem] Failed to send to ${channel}: ${error}`); + const sendFunctionName = `sendMessage${channel.charAt(0).toUpperCase()}${channel.slice(1)}`; + const senderFunction = channelApi[sendFunctionName]; + if (!senderFunction) { + api.log(`[claude-mem] Channel "${channel}" has no ${sendFunctionName} function`); + return Promise.resolve(); } + + return senderFunction(to, text).catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + api.log(`[claude-mem] Failed to send to ${channel}: ${message}`); + }); } async function connectToSSEStream( api: OpenClawPluginApi, port: number, channel: string, - to: string + to: string, + abortController: AbortController, + setConnectionState: (state: ConnectionState) => void ): Promise { let backoffMs = 1000; const maxBackoffMs = 30000; - while (sseAbortController && !sseAbortController.signal.aborted) { + while (!abortController.signal.aborted) { try { - connectionState = "reconnecting"; + setConnectionState("reconnecting"); api.log(`[claude-mem] Connecting to SSE stream at http://localhost:${port}/stream`); const response = await fetch(`http://localhost:${port}/stream`, { - signal: sseAbortController.signal, + signal: abortController.signal, headers: { Accept: "text/event-stream" }, }); @@ -109,7 +107,7 @@ async function connectToSSEStream( throw new Error("SSE stream response has no body"); } - connectionState = "connected"; + setConnectionState("connected"); backoffMs = 1000; api.log("[claude-mem] Connected to SSE stream"); @@ -123,6 +121,11 @@ async function connectToSSEStream( buffer += decoder.decode(value, { stream: true }); + if (buffer.length > MAX_SSE_BUFFER_SIZE) { + api.log("[claude-mem] SSE buffer overflow, clearing buffer"); + buffer = ""; + } + const frames = buffer.split("\n\n"); buffer = frames.pop() || ""; @@ -142,32 +145,47 @@ async function connectToSSEStream( const message = formatObservationMessage(event.observation); await sendToChannel(api, channel, to, message); } - } catch { - // Ignore malformed JSON frames + } catch (parseError: unknown) { + const errorMessage = parseError instanceof Error ? parseError.message : String(parseError); + api.log(`[claude-mem] Failed to parse SSE frame: ${errorMessage}`); } } } - } catch (error: any) { - if (sseAbortController?.signal.aborted) { + } catch (error: unknown) { + if (abortController.signal.aborted) { break; } - connectionState = "reconnecting"; - api.log(`[claude-mem] SSE stream error: ${error.message ?? error}. Reconnecting in ${backoffMs / 1000}s`); + setConnectionState("reconnecting"); + const errorMessage = error instanceof Error ? error.message : String(error); + api.log(`[claude-mem] SSE stream error: ${errorMessage}. Reconnecting in ${backoffMs / 1000}s`); } - if (sseAbortController?.signal.aborted) break; + if (abortController.signal.aborted) break; await new Promise((resolve) => setTimeout(resolve, backoffMs)); backoffMs = Math.min(backoffMs * 2, maxBackoffMs); } - connectionState = "disconnected"; + setConnectionState("disconnected"); } export default function claudeMemPlugin(api: OpenClawPluginApi): void { + let sseAbortController: AbortController | null = null; + let connectionState: ConnectionState = "disconnected"; + let connectionPromise: Promise | null = null; + api.registerService({ id: "claude-mem-observation-feed", start: async (_ctx) => { + // Abort any existing connection before starting a new one + if (sseAbortController) { + sseAbortController.abort(); + if (connectionPromise) { + await connectionPromise; + connectionPromise = null; + } + } + const config = api.getConfig(); const workerPort = (config.workerPort as number) || 37777; const feedConfig = config.observationFeed as @@ -187,13 +205,24 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { api.log(`[claude-mem] Observation feed starting — channel: ${feedConfig.channel}, target: ${feedConfig.to}`); sseAbortController = new AbortController(); - connectToSSEStream(api, workerPort, feedConfig.channel, feedConfig.to); + connectionPromise = connectToSSEStream( + api, + workerPort, + feedConfig.channel, + feedConfig.to, + sseAbortController, + (state) => { connectionState = state; } + ); }, stop: async (_ctx) => { if (sseAbortController) { sseAbortController.abort(); sseAbortController = null; } + if (connectionPromise) { + await connectionPromise; + connectionPromise = null; + } connectionState = "disconnected"; api.log("[claude-mem] Observation feed stopped — SSE connection closed"); },