diff --git a/openclaw/package.json b/openclaw/package.json index 16360ed5..ad95ca25 100644 --- a/openclaw/package.json +++ b/openclaw/package.json @@ -5,9 +5,11 @@ "type": "module", "main": "dist/index.js", "scripts": { - "build": "tsc" + "build": "tsc", + "test": "tsc && node --test dist/index.test.js" }, "devDependencies": { + "@types/node": "^25.2.1", "typescript": "^5.3.0" } } diff --git a/openclaw/src/index.test.ts b/openclaw/src/index.test.ts new file mode 100644 index 00000000..e2f63a00 --- /dev/null +++ b/openclaw/src/index.test.ts @@ -0,0 +1,388 @@ +import { describe, it, beforeEach, afterEach } from "node:test"; +import assert from "node:assert/strict"; +import { createServer, type Server, type IncomingMessage, type ServerResponse } from "node:http"; +import claudeMemPlugin from "./index.js"; + +function createMockApi(configOverride: Record = {}) { + const logs: string[] = []; + const sentMessages: Array<{ to: string; text: string; channel: string }> = []; + + let registeredService: any = null; + let registeredCommand: any = null; + + const api = { + getConfig: () => configOverride, + log: (message: string) => { + logs.push(message); + }, + registerService: (service: any) => { + registeredService = service; + }, + registerCommand: (command: any) => { + registeredCommand = command; + }, + runtime: { + channel: { + telegram: { + sendMessageTelegram: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "telegram" }); + }, + }, + discord: { + sendMessageDiscord: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "discord" }); + }, + }, + signal: { + sendMessageSignal: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "signal" }); + }, + }, + slack: { + sendMessageSlack: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "slack" }); + }, + }, + whatsapp: { + sendMessageWhatsApp: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "whatsapp" }); + }, + }, + line: { + sendMessageLine: async (to: string, text: string) => { + sentMessages.push({ to, text, channel: "line" }); + }, + }, + }, + }, + }; + + return { + api: api as any, + logs, + sentMessages, + getService: () => registeredService, + getCommand: () => registeredCommand, + }; +} + +describe("claudeMemPlugin", () => { + it("registers service and command on load", () => { + const { api, logs, getService, getCommand } = createMockApi(); + claudeMemPlugin(api); + + assert.ok(getService(), "service should be registered"); + assert.equal(getService().id, "claude-mem-observation-feed"); + assert.ok(getCommand(), "command should be registered"); + assert.equal(getCommand().name, "claude-mem-feed"); + assert.ok(logs.some((l) => l.includes("plugin loaded"))); + }); + + describe("service start", () => { + it("logs disabled when feed not enabled", async () => { + const { api, logs, getService } = createMockApi({}); + claudeMemPlugin(api); + + await getService().start({}); + assert.ok(logs.some((l) => l.includes("feed disabled"))); + }); + + it("logs disabled when enabled is false", async () => { + const { api, logs, getService } = createMockApi({ + observationFeed: { enabled: false }, + }); + claudeMemPlugin(api); + + await getService().start({}); + assert.ok(logs.some((l) => l.includes("feed disabled"))); + }); + + it("logs misconfigured when channel is missing", async () => { + const { api, logs, getService } = createMockApi({ + observationFeed: { enabled: true, to: "123" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + assert.ok(logs.some((l) => l.includes("misconfigured"))); + }); + + it("logs misconfigured when to is missing", async () => { + const { api, logs, getService } = createMockApi({ + observationFeed: { enabled: true, channel: "telegram" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + assert.ok(logs.some((l) => l.includes("misconfigured"))); + }); + }); + + describe("service stop", () => { + it("logs disconnection on stop", async () => { + const { api, logs, getService } = createMockApi({}); + claudeMemPlugin(api); + + await getService().stop({}); + assert.ok(logs.some((l) => l.includes("feed stopped"))); + }); + }); + + describe("command handler", () => { + it("returns not configured when no feedConfig", async () => { + const { api, getCommand } = createMockApi({}); + claudeMemPlugin(api); + + const result = await getCommand().handler([], {}); + assert.ok(result.includes("not configured")); + }); + + it("returns status when no args", async () => { + const { api, getCommand } = createMockApi({ + observationFeed: { enabled: true, channel: "telegram", to: "123" }, + }); + claudeMemPlugin(api); + + const result = await getCommand().handler([], {}); + assert.ok(result.includes("Enabled: yes")); + assert.ok(result.includes("Channel: telegram")); + assert.ok(result.includes("Target: 123")); + assert.ok(result.includes("Connection:")); + }); + + it("handles 'on' argument", async () => { + const { api, logs, getCommand } = createMockApi({ + observationFeed: { enabled: false }, + }); + claudeMemPlugin(api); + + const result = await getCommand().handler(["on"], {}); + assert.ok(result.includes("enable requested")); + assert.ok(logs.some((l) => l.includes("enable requested"))); + }); + + it("handles 'off' argument", async () => { + const { api, logs, getCommand } = createMockApi({ + observationFeed: { enabled: true }, + }); + claudeMemPlugin(api); + + const result = await getCommand().handler(["off"], {}); + assert.ok(result.includes("disable requested")); + assert.ok(logs.some((l) => l.includes("disable requested"))); + }); + + it("shows connection state in status output", async () => { + const { api, getCommand } = createMockApi({ + observationFeed: { enabled: false, channel: "slack", to: "#general" }, + }); + claudeMemPlugin(api); + + const result = await getCommand().handler([], {}); + assert.ok(result.includes("Connection: disconnected")); + }); + }); +}); + +describe("SSE stream integration", () => { + let server: Server; + let serverPort: number; + let serverResponses: ServerResponse[] = []; + + function startSSEServer(): Promise { + return new Promise((resolve) => { + server = createServer((req: IncomingMessage, res: ServerResponse) => { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + serverResponses.push(res); + }); + server.listen(0, () => { + const address = server.address(); + if (address && typeof address === "object") { + resolve(address.port); + } + }); + }); + } + + beforeEach(async () => { + serverResponses = []; + serverPort = await startSSEServer(); + }); + + afterEach(() => { + for (const res of serverResponses) { + try { + res.end(); + } catch {} + } + server?.close(); + }); + + it("connects to SSE stream and receives new_observation events", async () => { + const { api, logs, sentMessages, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "telegram", to: "12345" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + + // Wait for connection + await new Promise((resolve) => setTimeout(resolve, 200)); + + assert.ok(logs.some((l) => l.includes("Connecting to SSE stream"))); + + // Send an SSE event + const observation = { + type: "new_observation", + observation: { + id: 1, + title: "Test Observation", + subtitle: "Found something interesting", + type: "discovery", + project: "test", + prompt_number: 1, + created_at_epoch: Date.now(), + }, + timestamp: Date.now(), + }; + + for (const res of serverResponses) { + res.write(`data: ${JSON.stringify(observation)}\n\n`); + } + + // Wait for processing + await new Promise((resolve) => setTimeout(resolve, 200)); + + assert.equal(sentMessages.length, 1); + assert.equal(sentMessages[0].channel, "telegram"); + assert.equal(sentMessages[0].to, "12345"); + assert.ok(sentMessages[0].text.includes("Test Observation")); + assert.ok(sentMessages[0].text.includes("Found something interesting")); + + await getService().stop({}); + }); + + it("filters out non-observation events", async () => { + const { api, sentMessages, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "discord", to: "channel-id" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Send non-observation events + for (const res of serverResponses) { + res.write(`data: ${JSON.stringify({ type: "processing_status", isProcessing: true })}\n\n`); + res.write(`data: ${JSON.stringify({ type: "session_started", sessionId: "abc" })}\n\n`); + } + + await new Promise((resolve) => setTimeout(resolve, 200)); + assert.equal(sentMessages.length, 0, "non-observation events should be filtered"); + + await getService().stop({}); + }); + + it("handles observation with null subtitle", async () => { + const { api, sentMessages, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "telegram", to: "999" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + await new Promise((resolve) => setTimeout(resolve, 200)); + + for (const res of serverResponses) { + res.write( + `data: ${JSON.stringify({ + type: "new_observation", + observation: { id: 2, title: "No Subtitle", subtitle: null }, + timestamp: Date.now(), + })}\n\n` + ); + } + + await new Promise((resolve) => setTimeout(resolve, 200)); + assert.equal(sentMessages.length, 1); + assert.ok(sentMessages[0].text.includes("No Subtitle")); + assert.ok(!sentMessages[0].text.includes("null")); + + await getService().stop({}); + }); + + it("handles observation with null title", async () => { + const { api, sentMessages, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "telegram", to: "999" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + await new Promise((resolve) => setTimeout(resolve, 200)); + + for (const res of serverResponses) { + res.write( + `data: ${JSON.stringify({ + type: "new_observation", + observation: { id: 3, title: null, subtitle: "Has subtitle" }, + timestamp: Date.now(), + })}\n\n` + ); + } + + await new Promise((resolve) => setTimeout(resolve, 200)); + assert.equal(sentMessages.length, 1); + assert.ok(sentMessages[0].text.includes("Untitled")); + + await getService().stop({}); + }); + + it("uses custom workerPort from config", async () => { + const { api, logs, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "telegram", to: "12345" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + await new Promise((resolve) => setTimeout(resolve, 200)); + + assert.ok(logs.some((l) => l.includes(`localhost:${serverPort}`))); + + await getService().stop({}); + }); + + it("logs unknown channel type", async () => { + const { api, logs, sentMessages, getService } = createMockApi({ + workerPort: serverPort, + observationFeed: { enabled: true, channel: "matrix", to: "room-id" }, + }); + claudeMemPlugin(api); + + await getService().start({}); + await new Promise((resolve) => setTimeout(resolve, 200)); + + for (const res of serverResponses) { + res.write( + `data: ${JSON.stringify({ + type: "new_observation", + observation: { id: 4, title: "Test", subtitle: null }, + timestamp: Date.now(), + })}\n\n` + ); + } + + await new Promise((resolve) => setTimeout(resolve, 200)); + assert.equal(sentMessages.length, 0); + assert.ok(logs.some((l) => l.includes("Unknown channel type: matrix"))); + + await getService().stop({}); + }); +}); diff --git a/openclaw/src/index.ts b/openclaw/src/index.ts index 544fdad3..b83d7ce0 100644 --- a/openclaw/src/index.ts +++ b/openclaw/src/index.ts @@ -16,36 +16,220 @@ interface OpenClawPluginApi { }; } +interface ObservationSSEPayload { + id: number; + memory_session_id: string; + session_id: string; + type: string; + title: string | null; + subtitle: string | null; + text: string | null; + narrative: string | null; + facts: string | null; + concepts: string | null; + files_read: string | null; + files_modified: string | null; + project: string; + prompt_number: number; + created_at_epoch: number; +} + +interface SSENewObservationEvent { + type: "new_observation"; + observation: ObservationSSEPayload; + timestamp: number; +} + +type ConnectionState = "disconnected" | "connected" | "reconnecting"; + +let sseAbortController: AbortController | null = null; +let connectionState: ConnectionState = "disconnected"; + +function formatObservationMessage(observation: ObservationSSEPayload): string { + const title = observation.title || "Untitled"; + let message = `🧠 Claude-Mem Observation\n**${title}**`; + if (observation.subtitle) { + message += `\n${observation.subtitle}`; + } + return message; +} + +async function sendToChannel( + api: OpenClawPluginApi, + channel: string, + to: string, + text: string +): Promise { + const channelSendFunctions: Record Promise> = { + telegram: api.runtime.channel.telegram.sendMessageTelegram, + discord: api.runtime.channel.discord.sendMessageDiscord, + signal: api.runtime.channel.signal.sendMessageSignal, + slack: api.runtime.channel.slack.sendMessageSlack, + whatsapp: api.runtime.channel.whatsapp.sendMessageWhatsApp, + line: api.runtime.channel.line.sendMessageLine, + }; + + const senderFunction = channelSendFunctions[channel]; + if (!senderFunction) { + api.log(`[claude-mem] Unknown channel type: ${channel}`); + return; + } + + try { + await senderFunction(to, text); + } catch (error) { + api.log(`[claude-mem] Failed to send to ${channel}: ${error}`); + } +} + +async function connectToSSEStream( + api: OpenClawPluginApi, + port: number, + channel: string, + to: string +): Promise { + let backoffMs = 1000; + const maxBackoffMs = 30000; + + while (sseAbortController && !sseAbortController.signal.aborted) { + try { + connectionState = "reconnecting"; + api.log(`[claude-mem] Connecting to SSE stream at http://localhost:${port}/stream`); + + const response = await fetch(`http://localhost:${port}/stream`, { + signal: sseAbortController.signal, + headers: { Accept: "text/event-stream" }, + }); + + if (!response.ok) { + throw new Error(`SSE stream returned HTTP ${response.status}`); + } + + if (!response.body) { + throw new Error("SSE stream response has no body"); + } + + connectionState = "connected"; + backoffMs = 1000; + api.log("[claude-mem] Connected to SSE stream"); + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + const frames = buffer.split("\n\n"); + buffer = frames.pop() || ""; + + for (const frame of frames) { + const dataLine = frame + .split("\n") + .find((line) => line.startsWith("data:")); + if (!dataLine) continue; + + const jsonStr = dataLine.slice(5).trim(); + if (!jsonStr) continue; + + try { + const parsed = JSON.parse(jsonStr); + if (parsed.type === "new_observation") { + const event = parsed as SSENewObservationEvent; + const message = formatObservationMessage(event.observation); + await sendToChannel(api, channel, to, message); + } + } catch { + // Ignore malformed JSON frames + } + } + } + } catch (error: any) { + if (sseAbortController?.signal.aborted) { + break; + } + connectionState = "reconnecting"; + api.log(`[claude-mem] SSE stream error: ${error.message ?? error}. Reconnecting in ${backoffMs / 1000}s`); + } + + if (sseAbortController?.signal.aborted) break; + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + backoffMs = Math.min(backoffMs * 2, maxBackoffMs); + } + + connectionState = "disconnected"; +} + export default function claudeMemPlugin(api: OpenClawPluginApi): void { api.registerService({ id: "claude-mem-observation-feed", - start: async (ctx) => { + start: async (_ctx) => { const config = api.getConfig(); - const feedConfig = config.observationFeed as any; + const workerPort = (config.workerPort as number) || 37777; + const feedConfig = config.observationFeed as + | { enabled?: boolean; channel?: string; to?: string } + | undefined; + if (!feedConfig?.enabled) { api.log("[claude-mem] Observation feed disabled"); return; } + + if (!feedConfig.channel || !feedConfig.to) { + api.log("[claude-mem] Observation feed misconfigured — channel or target missing"); + return; + } + api.log(`[claude-mem] Observation feed starting — channel: ${feedConfig.channel}, target: ${feedConfig.to}`); - // SSE connection logic added in Phase 2 + + sseAbortController = new AbortController(); + connectToSSEStream(api, workerPort, feedConfig.channel, feedConfig.to); + }, + stop: async (_ctx) => { + if (sseAbortController) { + sseAbortController.abort(); + sseAbortController = null; + } + connectionState = "disconnected"; + api.log("[claude-mem] Observation feed stopped — SSE connection closed"); }, - stop: async (ctx) => { - api.log("[claude-mem] Observation feed stopping"); - // SSE disconnect logic added in Phase 2 - } }); api.registerCommand({ name: "claude-mem-feed", description: "Show or toggle Claude-Mem observation feed status", - handler: async (args, ctx) => { + handler: async (args, _ctx) => { const config = api.getConfig(); - const feedConfig = config.observationFeed as any; + const feedConfig = config.observationFeed as + | { enabled?: boolean; channel?: string; to?: string } + | undefined; + if (!feedConfig) { return "Observation feed not configured. Add observationFeed to your plugin config."; } - return `Claude-Mem Observation Feed\nEnabled: ${feedConfig.enabled ? "yes" : "no"}\nChannel: ${feedConfig.channel || "not set"}\nTarget: ${feedConfig.to || "not set"}`; - } + + if (args[0] === "on") { + api.log("[claude-mem] Feed enable requested via command"); + return "Feed enable requested. Update observationFeed.enabled in your plugin config to persist."; + } + + if (args[0] === "off") { + api.log("[claude-mem] Feed disable requested via command"); + return "Feed disable requested. Update observationFeed.enabled in your plugin config to persist."; + } + + return [ + "Claude-Mem Observation Feed", + `Enabled: ${feedConfig.enabled ? "yes" : "no"}`, + `Channel: ${feedConfig.channel || "not set"}`, + `Target: ${feedConfig.to || "not set"}`, + `Connection: ${connectionState}`, + ].join("\n"); + }, }); api.log("[claude-mem] OpenClaw plugin loaded — v1.0.0"); diff --git a/openclaw/tsconfig.json b/openclaw/tsconfig.json index 129f41be..ac91678f 100644 --- a/openclaw/tsconfig.json +++ b/openclaw/tsconfig.json @@ -3,7 +3,7 @@ "target": "ES2022", "module": "ESNext", "moduleResolution": "node", - "lib": ["ES2022"], + "lib": ["ES2022", "DOM"], "outDir": "./dist", "rootDir": "./src", "strict": true,