MAESTRO: Address PR review feedback — fix connection lifecycle, lazy channel access, buffer safety

- Move sseAbortController/connectionState from module globals into closure for multi-instance safety
- Make start() idempotent by aborting existing connection before creating a new one
- Track connectionPromise and await it on stop() for proper cleanup
- Guard channel API access lazily to prevent crash when integrations are missing
- Add 1MB MAX_SSE_BUFFER_SIZE to prevent unbounded buffer growth
- Log malformed JSON parse errors instead of silently ignoring
- Replace error: any with proper instanceof Error type narrowing
- Remove hardcoded user paths from TESTING.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-02-07 20:06:29 -05:00
parent f1ecf5bc68
commit db207807cb
2 changed files with 65 additions and 36 deletions
+62 -33
View File
@@ -42,8 +42,7 @@ interface SSENewObservationEvent {
type ConnectionState = "disconnected" | "connected" | "reconnecting";
let sseAbortController: AbortController | null = null;
let connectionState: ConnectionState = "disconnected";
const MAX_SSE_BUFFER_SIZE = 1024 * 1024; // 1MB
function formatObservationMessage(observation: ObservationSSEPayload): string {
const title = observation.title || "Untitled";
@@ -54,50 +53,49 @@ function formatObservationMessage(observation: ObservationSSEPayload): string {
return message;
}
async function sendToChannel(
function sendToChannel(
api: OpenClawPluginApi,
channel: string,
to: string,
text: string
): Promise<void> {
const channelSendFunctions: Record<string, (to: string, text: string) => Promise<any>> = {
telegram: api.runtime.channel.telegram.sendMessageTelegram,
discord: api.runtime.channel.discord.sendMessageDiscord,
signal: api.runtime.channel.signal.sendMessageSignal,
slack: api.runtime.channel.slack.sendMessageSlack,
whatsapp: api.runtime.channel.whatsapp.sendMessageWhatsApp,
line: api.runtime.channel.line.sendMessageLine,
};
const senderFunction = channelSendFunctions[channel];
if (!senderFunction) {
const channelApi = api.runtime.channel[channel];
if (!channelApi) {
api.log(`[claude-mem] Unknown channel type: ${channel}`);
return;
return Promise.resolve();
}
try {
await senderFunction(to, text);
} catch (error) {
api.log(`[claude-mem] Failed to send to ${channel}: ${error}`);
const sendFunctionName = `sendMessage${channel.charAt(0).toUpperCase()}${channel.slice(1)}`;
const senderFunction = channelApi[sendFunctionName];
if (!senderFunction) {
api.log(`[claude-mem] Channel "${channel}" has no ${sendFunctionName} function`);
return Promise.resolve();
}
return senderFunction(to, text).catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
api.log(`[claude-mem] Failed to send to ${channel}: ${message}`);
});
}
async function connectToSSEStream(
api: OpenClawPluginApi,
port: number,
channel: string,
to: string
to: string,
abortController: AbortController,
setConnectionState: (state: ConnectionState) => void
): Promise<void> {
let backoffMs = 1000;
const maxBackoffMs = 30000;
while (sseAbortController && !sseAbortController.signal.aborted) {
while (!abortController.signal.aborted) {
try {
connectionState = "reconnecting";
setConnectionState("reconnecting");
api.log(`[claude-mem] Connecting to SSE stream at http://localhost:${port}/stream`);
const response = await fetch(`http://localhost:${port}/stream`, {
signal: sseAbortController.signal,
signal: abortController.signal,
headers: { Accept: "text/event-stream" },
});
@@ -109,7 +107,7 @@ async function connectToSSEStream(
throw new Error("SSE stream response has no body");
}
connectionState = "connected";
setConnectionState("connected");
backoffMs = 1000;
api.log("[claude-mem] Connected to SSE stream");
@@ -123,6 +121,11 @@ async function connectToSSEStream(
buffer += decoder.decode(value, { stream: true });
if (buffer.length > MAX_SSE_BUFFER_SIZE) {
api.log("[claude-mem] SSE buffer overflow, clearing buffer");
buffer = "";
}
const frames = buffer.split("\n\n");
buffer = frames.pop() || "";
@@ -142,32 +145,47 @@ async function connectToSSEStream(
const message = formatObservationMessage(event.observation);
await sendToChannel(api, channel, to, message);
}
} catch {
// Ignore malformed JSON frames
} catch (parseError: unknown) {
const errorMessage = parseError instanceof Error ? parseError.message : String(parseError);
api.log(`[claude-mem] Failed to parse SSE frame: ${errorMessage}`);
}
}
}
} catch (error: any) {
if (sseAbortController?.signal.aborted) {
} catch (error: unknown) {
if (abortController.signal.aborted) {
break;
}
connectionState = "reconnecting";
api.log(`[claude-mem] SSE stream error: ${error.message ?? error}. Reconnecting in ${backoffMs / 1000}s`);
setConnectionState("reconnecting");
const errorMessage = error instanceof Error ? error.message : String(error);
api.log(`[claude-mem] SSE stream error: ${errorMessage}. Reconnecting in ${backoffMs / 1000}s`);
}
if (sseAbortController?.signal.aborted) break;
if (abortController.signal.aborted) break;
await new Promise((resolve) => setTimeout(resolve, backoffMs));
backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
}
connectionState = "disconnected";
setConnectionState("disconnected");
}
export default function claudeMemPlugin(api: OpenClawPluginApi): void {
let sseAbortController: AbortController | null = null;
let connectionState: ConnectionState = "disconnected";
let connectionPromise: Promise<void> | null = null;
api.registerService({
id: "claude-mem-observation-feed",
start: async (_ctx) => {
// Abort any existing connection before starting a new one
if (sseAbortController) {
sseAbortController.abort();
if (connectionPromise) {
await connectionPromise;
connectionPromise = null;
}
}
const config = api.getConfig();
const workerPort = (config.workerPort as number) || 37777;
const feedConfig = config.observationFeed as
@@ -187,13 +205,24 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
api.log(`[claude-mem] Observation feed starting — channel: ${feedConfig.channel}, target: ${feedConfig.to}`);
sseAbortController = new AbortController();
connectToSSEStream(api, workerPort, feedConfig.channel, feedConfig.to);
connectionPromise = connectToSSEStream(
api,
workerPort,
feedConfig.channel,
feedConfig.to,
sseAbortController,
(state) => { connectionState = state; }
);
},
stop: async (_ctx) => {
if (sseAbortController) {
sseAbortController.abort();
sseAbortController = null;
}
if (connectionPromise) {
await connectionPromise;
connectionPromise = null;
}
connectionState = "disconnected";
api.log("[claude-mem] Observation feed stopped — SSE connection closed");
},