diff --git a/openclaw/openclaw.plugin.json b/openclaw/openclaw.plugin.json index a80ee362..6ba0b5a5 100644 --- a/openclaw/openclaw.plugin.json +++ b/openclaw/openclaw.plugin.json @@ -1,7 +1,7 @@ { "id": "claude-mem", "name": "Claude-Mem (Persistent Memory)", - "description": "Official OpenClaw plugin for Claude-Mem. Persistent memory across sessions with live observation feed.", + "description": "Official OpenClaw plugin for Claude-Mem. Records observations from embedded runner sessions and streams them to messaging channels.", "kind": "memory", "version": "1.0.0", "author": "thedotmack", @@ -20,9 +20,10 @@ "default": 37777, "description": "Port for Claude-Mem worker service" }, - "workerPath": { + "project": { "type": "string", - "description": "Custom path to worker-service.cjs (auto-detected if not set)" + "default": "openclaw", + "description": "Project name for scoping observations in the memory database" }, "observationFeed": { "type": "object", diff --git a/openclaw/src/index.test.ts b/openclaw/src/index.test.ts index 5635c4dd..31dc82f5 100644 --- a/openclaw/src/index.test.ts +++ b/openclaw/src/index.test.ts @@ -1,6 +1,9 @@ import { describe, it, beforeEach, afterEach } from "node:test"; import assert from "node:assert/strict"; import { createServer, type Server, type IncomingMessage, type ServerResponse } from "node:http"; +import { mkdtemp, readFile, rm } from "fs/promises"; +import { join } from "path"; +import { tmpdir } from "os"; import claudeMemPlugin from "./index.js"; function createMockApi(pluginConfigOverride: Record = {}) { @@ -8,7 +11,8 @@ function createMockApi(pluginConfigOverride: Record = {}) { const sentMessages: Array<{ to: string; text: string; channel: string }> = []; let registeredService: any = null; - let registeredCommand: any = null; + const registeredCommands: Map = new Map(); + const eventHandlers: Map = new Map(); const api = { id: "claude-mem", @@ -27,7 +31,13 @@ function createMockApi(pluginConfigOverride: Record = {}) { registeredService = service; }, registerCommand: (command: any) => { - registeredCommand = command; + registeredCommands.set(command.name, command); + }, + on: (event: string, callback: Function) => { + if (!eventHandlers.has(event)) { + eventHandlers.set(event, []); + } + eventHandlers.get(event)!.push(callback); }, runtime: { channel: { @@ -70,19 +80,33 @@ function createMockApi(pluginConfigOverride: Record = {}) { logs, sentMessages, getService: () => registeredService, - getCommand: () => registeredCommand, + getCommand: (name?: string) => { + if (name) return registeredCommands.get(name); + return registeredCommands.get("claude-mem-feed"); + }, + getEventHandlers: (event: string) => eventHandlers.get(event) || [], + fireEvent: async (event: string, data: any, ctx: any = {}) => { + const handlers = eventHandlers.get(event) || []; + for (const handler of handlers) { + await handler(data, ctx); + } + }, }; } describe("claudeMemPlugin", () => { - it("registers service and command on load", () => { - const { api, logs, getService, getCommand } = createMockApi(); + it("registers service, commands, and event handlers on load", () => { + const { api, logs, getService, getCommand, getEventHandlers } = createMockApi(); claudeMemPlugin(api); assert.ok(getService(), "service should be registered"); assert.equal(getService().id, "claude-mem-observation-feed"); - assert.ok(getCommand(), "command should be registered"); - assert.equal(getCommand().name, "claude-mem-feed"); + assert.ok(getCommand("claude-mem-feed"), "feed command should be registered"); + assert.ok(getCommand("claude-mem-status"), "status command should be registered"); + assert.ok(getEventHandlers("before_agent_start").length > 0, "before_agent_start handler registered"); + assert.ok(getEventHandlers("tool_result_persist").length > 0, "tool_result_persist handler registered"); + assert.ok(getEventHandlers("agent_end").length > 0, "agent_end handler registered"); + assert.ok(getEventHandlers("gateway_start").length > 0, "gateway_start handler registered"); assert.ok(logs.some((l) => l.includes("plugin loaded"))); }); @@ -192,6 +216,536 @@ describe("claudeMemPlugin", () => { }); }); +describe("Observation I/O event handlers", () => { + let workerServer: Server; + let workerPort: number; + let receivedRequests: Array<{ method: string; url: string; body: any }> = []; + + function startWorkerMock(): Promise { + return new Promise((resolve) => { + workerServer = createServer((req: IncomingMessage, res: ServerResponse) => { + let body = ""; + req.on("data", (chunk) => { body += chunk.toString(); }); + req.on("end", () => { + let parsedBody: any = null; + try { parsedBody = JSON.parse(body); } catch {} + + receivedRequests.push({ + method: req.method || "GET", + url: req.url || "/", + body: parsedBody, + }); + + // Handle different endpoints + if (req.url === "/api/health") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "ok" })); + return; + } + + if (req.url === "/api/sessions/init") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false })); + return; + } + + if (req.url === "/api/sessions/observations") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "queued" })); + return; + } + + if (req.url === "/api/sessions/summarize") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "queued" })); + return; + } + + if (req.url === "/api/sessions/complete") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "completed" })); + return; + } + + if (req.url?.startsWith("/api/context/inject")) { + res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" }); + res.end("# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work"); + return; + } + + if (req.url === "/stream") { + res.writeHead(200, { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }); + return; + } + + res.writeHead(404); + res.end(); + }); + }); + workerServer.listen(0, () => { + const address = workerServer.address(); + if (address && typeof address === "object") { + resolve(address.port); + } + }); + }); + } + + beforeEach(async () => { + receivedRequests = []; + workerPort = await startWorkerMock(); + }); + + afterEach(() => { + workerServer?.close(); + }); + + it("before_agent_start sends session init to worker", async () => { + const { api, logs, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, { sessionKey: "agent-1" }); + + // Wait for HTTP request + await new Promise((resolve) => setTimeout(resolve, 100)); + + const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init"); + assert.ok(initRequest, "should send init request to worker"); + assert.equal(initRequest!.body.project, "openclaw"); + assert.ok(initRequest!.body.contentSessionId.startsWith("openclaw-agent-1-")); + assert.equal(initRequest!.body.prompt, "Help me write a function that parses JSON"); + assert.ok(logs.some((l) => l.includes("Session initialized"))); + }); + + it("before_agent_start skips short prompts", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { prompt: "hi" }, {}); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init"); + assert.ok(!initRequest, "should not send init for short prompts"); + }); + + it("tool_result_persist sends observation to worker", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + // Init session first to establish contentSessionId + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, { sessionKey: "test-agent" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Fire tool result event + await fireEvent("tool_result_persist", { + toolName: "Read", + params: { file_path: "/src/index.ts" }, + message: { + content: [{ type: "text", text: "file contents here..." }], + }, + }, { sessionKey: "test-agent" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations"); + assert.ok(obsRequest, "should send observation to worker"); + assert.equal(obsRequest!.body.tool_name, "Read"); + assert.deepEqual(obsRequest!.body.tool_input, { file_path: "/src/index.ts" }); + assert.equal(obsRequest!.body.tool_response, "file contents here..."); + assert.ok(obsRequest!.body.contentSessionId.startsWith("openclaw-test-agent-")); + }); + + it("tool_result_persist skips memory_ tools", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("tool_result_persist", { + toolName: "memory_search", + params: {}, + }, {}); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations"); + assert.ok(!obsRequest, "should skip memory_ tools"); + }); + + it("tool_result_persist truncates long responses", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + const longText = "x".repeat(2000); + await fireEvent("tool_result_persist", { + toolName: "Bash", + params: { command: "ls" }, + message: { + content: [{ type: "text", text: longText }], + }, + }, {}); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations"); + assert.ok(obsRequest, "should send observation"); + assert.equal(obsRequest!.body.tool_response.length, 1000, "should truncate to 1000 chars"); + }); + + it("agent_end sends summarize and complete to worker", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + // Init session + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, { sessionKey: "summarize-test" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Fire agent end + await fireEvent("agent_end", { + messages: [ + { role: "user", content: "help me" }, + { role: "assistant", content: "Here is the solution..." }, + ], + }, { sessionKey: "summarize-test" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const summarizeRequest = receivedRequests.find((r) => r.url === "/api/sessions/summarize"); + assert.ok(summarizeRequest, "should send summarize to worker"); + assert.equal(summarizeRequest!.body.last_assistant_message, "Here is the solution..."); + assert.ok(summarizeRequest!.body.contentSessionId.startsWith("openclaw-summarize-test-")); + + const completeRequest = receivedRequests.find((r) => r.url === "/api/sessions/complete"); + assert.ok(completeRequest, "should send complete to worker"); + assert.ok(completeRequest!.body.contentSessionId.startsWith("openclaw-summarize-test-")); + }); + + it("agent_end extracts text from array content", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, { sessionKey: "array-content" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + await fireEvent("agent_end", { + messages: [ + { + role: "assistant", + content: [ + { type: "text", text: "First part" }, + { type: "text", text: "Second part" }, + ], + }, + ], + }, { sessionKey: "array-content" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const summarizeRequest = receivedRequests.find((r) => r.url === "/api/sessions/summarize"); + assert.ok(summarizeRequest, "should send summarize"); + assert.equal(summarizeRequest!.body.last_assistant_message, "First part\nSecond part"); + }); + + it("uses custom project name from config", async () => { + const { api, fireEvent } = createMockApi({ workerPort, project: "my-project" }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, {}); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init"); + assert.ok(initRequest, "should send init"); + assert.equal(initRequest!.body.project, "my-project"); + }); + + it("claude-mem-status command reports worker health", async () => { + const { api, getCommand } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + const statusCmd = getCommand("claude-mem-status"); + assert.ok(statusCmd, "status command should exist"); + + const result = await statusCmd.handler({ args: "", channel: "telegram", isAuthorizedSender: true, commandBody: "/claude-mem-status", config: {} }); + assert.ok(result.includes("Status: ok")); + assert.ok(result.includes(`Port: ${workerPort}`)); + }); + + it("claude-mem-status reports unreachable when worker is down", async () => { + workerServer.close(); + await new Promise((resolve) => setTimeout(resolve, 100)); + + const { api, getCommand } = createMockApi({ workerPort: 59999 }); + claudeMemPlugin(api); + + const statusCmd = getCommand("claude-mem-status"); + const result = await statusCmd.handler({ args: "", channel: "telegram", isAuthorizedSender: true, commandBody: "/claude-mem-status", config: {} }); + assert.ok(result.includes("unreachable")); + }); + + it("reuses same contentSessionId for same sessionKey", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function that parses JSON", + }, { sessionKey: "reuse-test" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + await fireEvent("tool_result_persist", { + toolName: "Read", + params: { file_path: "/src/index.ts" }, + message: { content: [{ type: "text", text: "contents" }] }, + }, { sessionKey: "reuse-test" }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init"); + const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations"); + assert.ok(initRequest && obsRequest, "both requests should exist"); + assert.equal( + initRequest!.body.contentSessionId, + obsRequest!.body.contentSessionId, + "should reuse contentSessionId for same sessionKey" + ); + }); +}); + +describe("MEMORY.md context sync", () => { + let workerServer: Server; + let workerPort: number; + let receivedRequests: Array<{ method: string; url: string; body: any }> = []; + let tmpDir: string; + let contextResponse = "# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work"; + + function startWorkerMock(): Promise { + return new Promise((resolve) => { + workerServer = createServer((req: IncomingMessage, res: ServerResponse) => { + let body = ""; + req.on("data", (chunk) => { body += chunk.toString(); }); + req.on("end", () => { + let parsedBody: any = null; + try { parsedBody = JSON.parse(body); } catch {} + + receivedRequests.push({ + method: req.method || "GET", + url: req.url || "/", + body: parsedBody, + }); + + if (req.url?.startsWith("/api/context/inject")) { + res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" }); + res.end(contextResponse); + return; + } + + if (req.url === "/api/sessions/init") { + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false })); + return; + } + + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ status: "ok" })); + }); + }); + workerServer.listen(0, () => { + const address = workerServer.address(); + if (address && typeof address === "object") { + resolve(address.port); + } + }); + }); + } + + beforeEach(async () => { + receivedRequests = []; + contextResponse = "# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work"; + workerPort = await startWorkerMock(); + tmpDir = await mkdtemp(join(tmpdir(), "claude-mem-test-")); + }); + + afterEach(async () => { + workerServer?.close(); + await rm(tmpDir, { recursive: true, force: true }); + }); + + it("writes MEMORY.md to workspace on before_agent_start", async () => { + const { api, logs, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "sync-test", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject")); + assert.ok(contextRequest, "should request context from worker"); + assert.ok(contextRequest!.url!.includes("projects=openclaw")); + + const memoryContent = await readFile(join(tmpDir, "MEMORY.md"), "utf-8"); + assert.ok(memoryContent.includes("Claude-Mem Context"), "MEMORY.md should contain context"); + assert.ok(memoryContent.includes("Session 1"), "MEMORY.md should contain timeline"); + assert.ok(logs.some((l) => l.includes("MEMORY.md synced"))); + }); + + it("syncs MEMORY.md on every before_agent_start call", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "First prompt for this agent", + }, { sessionKey: "agent-a", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const firstContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(firstContextRequests.length, 1, "first call should fetch context"); + + await fireEvent("before_agent_start", { + prompt: "Second prompt for same agent", + }, { sessionKey: "agent-a", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const allContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(allContextRequests.length, 2, "should re-fetch context on every call"); + }); + + it("syncs MEMORY.md on tool_result_persist via fire-and-forget", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + // Init session to register workspace dir + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "tool-sync", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const preToolContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(preToolContextRequests.length, 1, "before_agent_start should sync once"); + + // Fire tool result — should trigger another MEMORY.md sync + await fireEvent("tool_result_persist", { + toolName: "Read", + params: { file_path: "/src/app.ts" }, + message: { content: [{ type: "text", text: "file contents" }] }, + }, { sessionKey: "tool-sync" }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const postToolContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(postToolContextRequests.length, 2, "tool_result_persist should trigger another sync"); + + const memoryContent = await readFile(join(tmpDir, "MEMORY.md"), "utf-8"); + assert.ok(memoryContent.includes("Claude-Mem Context"), "MEMORY.md should be updated"); + }); + + it("skips MEMORY.md sync when syncMemoryFile is false", async () => { + const { api, fireEvent } = createMockApi({ workerPort, syncMemoryFile: false }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "no-sync", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject")); + assert.ok(!contextRequest, "should not fetch context when sync disabled"); + }); + + it("skips MEMORY.md sync when no workspaceDir in context", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "no-workspace" }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject")); + assert.ok(!contextRequest, "should not fetch context without workspaceDir"); + }); + + it("skips writing MEMORY.md when context is empty", async () => { + contextResponse = " "; + const { api, logs, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "empty-ctx", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + assert.ok(!logs.some((l) => l.includes("MEMORY.md synced")), "should not log sync for empty context"); + }); + + it("gateway_start resets sync tracking so next agent re-syncs", async () => { + const { api, fireEvent } = createMockApi({ workerPort }); + claudeMemPlugin(api); + + // First sync + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "agent-1", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const firstContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(firstContextRequests.length, 1); + + // Gateway restart + await fireEvent("gateway_start", {}, {}); + + // Second sync after gateway restart — same workspace should re-sync + await fireEvent("before_agent_start", { + prompt: "Help me after gateway restart", + }, { sessionKey: "agent-1", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const allContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject")); + assert.equal(allContextRequests.length, 2, "should re-fetch context after gateway restart"); + }); + + it("uses custom project name in context inject URL", async () => { + const { api, fireEvent } = createMockApi({ workerPort, project: "my-bot" }); + claudeMemPlugin(api); + + await fireEvent("before_agent_start", { + prompt: "Help me write a function", + }, { sessionKey: "proj-test", workspaceDir: tmpDir }); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject")); + assert.ok(contextRequest, "should request context"); + assert.ok(contextRequest!.url!.includes("projects=my-bot"), "should use custom project name"); + }); +}); + describe("SSE stream integration", () => { let server: Server; let serverPort: number; diff --git a/openclaw/src/index.ts b/openclaw/src/index.ts index e13860ed..9cbb2b0e 100644 --- a/openclaw/src/index.ts +++ b/openclaw/src/index.ts @@ -1,3 +1,6 @@ +import { writeFile } from "fs/promises"; +import { join } from "path"; + // Minimal type declarations for the OpenClaw Plugin SDK. // These match the real OpenClawPluginApi provided by the gateway at runtime. // See: https://docs.openclaw.ai/plugin @@ -27,6 +30,34 @@ interface PluginCommandContext { type PluginCommandResult = string | { text: string } | { text: string; format?: string }; +// OpenClaw event types for agent lifecycle +interface BeforeAgentStartEvent { + prompt?: string; +} + +interface ToolResultPersistEvent { + toolName?: string; + params?: Record; + message?: { + content?: Array<{ type: string; text?: string }>; + }; +} + +interface AgentEndEvent { + messages?: Array<{ + role: string; + content: string | Array<{ type: string; text?: string }>; + }>; +} + +interface EventContext { + sessionKey?: string; + workspaceDir?: string; + agentId?: string; +} + +type EventCallback = (event: T, ctx: EventContext) => void | Promise; + interface OpenClawPluginApi { id: string; name: string; @@ -47,11 +78,19 @@ interface OpenClawPluginApi { requireAuth?: boolean; handler: (ctx: PluginCommandContext) => PluginCommandResult | Promise; }) => void; + on: ((event: "before_agent_start", callback: EventCallback) => void) & + ((event: "tool_result_persist", callback: EventCallback) => void) & + ((event: "agent_end", callback: EventCallback) => void) & + ((event: "gateway_start", callback: EventCallback>) => void); runtime: { channel: Record Promise>>; }; } +// ============================================================================ +// SSE Observation Feed Types +// ============================================================================ + interface ObservationSSEPayload { id: number; memory_session_id: string; @@ -78,7 +117,99 @@ interface SSENewObservationEvent { type ConnectionState = "disconnected" | "connected" | "reconnecting"; +// ============================================================================ +// Plugin Configuration +// ============================================================================ + +interface ClaudeMemPluginConfig { + syncMemoryFile?: boolean; + project?: string; + workerPort?: number; + observationFeed?: { + enabled?: boolean; + channel?: string; + to?: string; + }; +} + +// ============================================================================ +// Constants +// ============================================================================ + const MAX_SSE_BUFFER_SIZE = 1024 * 1024; // 1MB +const DEFAULT_WORKER_PORT = 37777; +const TOOL_RESULT_MAX_LENGTH = 1000; + +// ============================================================================ +// Worker HTTP Client +// ============================================================================ + +function workerBaseUrl(port: number): string { + return `http://127.0.0.1:${port}`; +} + +async function workerPost( + port: number, + path: string, + body: Record, + logger: PluginLogger +): Promise | null> { + try { + const response = await fetch(`${workerBaseUrl(port)}${path}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + if (!response.ok) { + logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`); + return null; + } + return (await response.json()) as Record; + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + return null; + } +} + +function workerPostFireAndForget( + port: number, + path: string, + body: Record, + logger: PluginLogger +): void { + fetch(`${workerBaseUrl(port)}${path}`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }).catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`); + }); +} + +async function workerGetText( + port: number, + path: string, + logger: PluginLogger +): Promise { + try { + const response = await fetch(`${workerBaseUrl(port)}${path}`); + if (!response.ok) { + logger.warn(`[claude-mem] Worker GET ${path} returned ${response.status}`); + return null; + } + return await response.text(); + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`); + return null; + } +} + +// ============================================================================ +// SSE Observation Feed +// ============================================================================ function formatObservationMessage(observation: ObservationSSEPayload): string { const title = observation.title || "Untitled"; @@ -205,7 +336,158 @@ async function connectToSSEStream( setConnectionState("disconnected"); } +// ============================================================================ +// Plugin Entry Point +// ============================================================================ + export default function claudeMemPlugin(api: OpenClawPluginApi): void { + const userConfig = (api.pluginConfig || {}) as ClaudeMemPluginConfig; + const workerPort = userConfig.workerPort || DEFAULT_WORKER_PORT; + const projectName = userConfig.project || "openclaw"; + + // ------------------------------------------------------------------ + // Session tracking for observation I/O + // ------------------------------------------------------------------ + const sessionIds = new Map(); + const workspaceDirsBySessionKey = new Map(); + const syncMemoryFile = userConfig.syncMemoryFile !== false; // default true + + function getContentSessionId(sessionKey?: string): string { + const key = sessionKey || "default"; + if (!sessionIds.has(key)) { + sessionIds.set(key, `openclaw-${key}-${Date.now()}`); + } + return sessionIds.get(key)!; + } + + async function syncMemoryToWorkspace(workspaceDir: string): Promise { + const contextText = await workerGetText( + workerPort, + `/api/context/inject?projects=${encodeURIComponent(projectName)}`, + api.logger + ); + if (contextText && contextText.trim().length > 0) { + try { + await writeFile(join(workspaceDir, "MEMORY.md"), contextText, "utf-8"); + api.logger.info(`[claude-mem] MEMORY.md synced to ${workspaceDir}`); + } catch (writeError: unknown) { + const msg = writeError instanceof Error ? writeError.message : String(writeError); + api.logger.warn(`[claude-mem] Failed to write MEMORY.md: ${msg}`); + } + } + } + + // ------------------------------------------------------------------ + // Event: before_agent_start — init session + sync MEMORY.md + // ------------------------------------------------------------------ + api.on("before_agent_start", async (event, ctx) => { + const contentSessionId = getContentSessionId(ctx.sessionKey); + const prompt = event.prompt || ""; + + // Track workspace dir so tool_result_persist can sync MEMORY.md later + if (ctx.workspaceDir) { + workspaceDirsBySessionKey.set(ctx.sessionKey || "default", ctx.workspaceDir); + } + + // Sync MEMORY.md before session init (provides context to agent) + if (syncMemoryFile && ctx.workspaceDir) { + await syncMemoryToWorkspace(ctx.workspaceDir); + } + + if (prompt.length < 10) return; + + await workerPost(workerPort, "/api/sessions/init", { + contentSessionId, + project: projectName, + prompt, + }, api.logger); + + api.logger.info(`[claude-mem] Session initialized: ${contentSessionId}`); + }); + + // ------------------------------------------------------------------ + // Event: tool_result_persist — record tool observations + sync MEMORY.md + // ------------------------------------------------------------------ + api.on("tool_result_persist", (event, ctx) => { + const toolName = event.toolName; + if (!toolName || toolName.startsWith("memory_")) return; + + const contentSessionId = getContentSessionId(ctx.sessionKey); + + // Extract result text from message content + let toolResponseText = ""; + const content = event.message?.content; + if (Array.isArray(content)) { + const textBlock = content.find( + (block) => block.type === "tool_result" || block.type === "text" + ); + if (textBlock && "text" in textBlock) { + toolResponseText = String(textBlock.text).slice(0, TOOL_RESULT_MAX_LENGTH); + } + } + + // Fire-and-forget: send observation + sync MEMORY.md in parallel + workerPostFireAndForget(workerPort, "/api/sessions/observations", { + contentSessionId, + tool_name: toolName, + tool_input: event.params || {}, + tool_response: toolResponseText, + cwd: "", + }, api.logger); + + const workspaceDir = ctx.workspaceDir || workspaceDirsBySessionKey.get(ctx.sessionKey || "default"); + if (syncMemoryFile && workspaceDir) { + syncMemoryToWorkspace(workspaceDir); + } + }); + + // ------------------------------------------------------------------ + // Event: agent_end — summarize and complete session + // ------------------------------------------------------------------ + api.on("agent_end", async (event, ctx) => { + const contentSessionId = getContentSessionId(ctx.sessionKey); + + // Extract last assistant message for summarization + let lastAssistantMessage = ""; + if (Array.isArray(event.messages)) { + for (let i = event.messages.length - 1; i >= 0; i--) { + const message = event.messages[i]; + if (message?.role === "assistant") { + if (typeof message.content === "string") { + lastAssistantMessage = message.content; + } else if (Array.isArray(message.content)) { + lastAssistantMessage = message.content + .filter((block) => block.type === "text") + .map((block) => block.text || "") + .join("\n"); + } + break; + } + } + } + + workerPostFireAndForget(workerPort, "/api/sessions/summarize", { + contentSessionId, + last_assistant_message: lastAssistantMessage, + }, api.logger); + + workerPostFireAndForget(workerPort, "/api/sessions/complete", { + contentSessionId, + }, api.logger); + }); + + // ------------------------------------------------------------------ + // Event: gateway_start — clear session tracking for fresh start + // ------------------------------------------------------------------ + api.on("gateway_start", async () => { + workspaceDirsBySessionKey.clear(); + sessionIds.clear(); + api.logger.info("[claude-mem] Gateway started — session tracking reset"); + }); + + // ------------------------------------------------------------------ + // Service: SSE observation feed → messaging channels + // ------------------------------------------------------------------ let sseAbortController: AbortController | null = null; let connectionState: ConnectionState = "disconnected"; let connectionPromise: Promise | null = null; @@ -213,7 +495,6 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { api.registerService({ id: "claude-mem-observation-feed", start: async (_ctx) => { - // Abort any existing connection before starting a new one if (sseAbortController) { sseAbortController.abort(); if (connectionPromise) { @@ -222,11 +503,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { } } - const config = api.pluginConfig || {}; - const workerPort = (config.workerPort as number) || 37777; - const feedConfig = config.observationFeed as - | { enabled?: boolean; channel?: string; to?: string } - | undefined; + const feedConfig = userConfig.observationFeed; if (!feedConfig?.enabled) { api.logger.info("[claude-mem] Observation feed disabled"); @@ -264,15 +541,15 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { }, }); + // ------------------------------------------------------------------ + // Command: /claude-mem-feed — status & toggle + // ------------------------------------------------------------------ api.registerCommand({ name: "claude-mem-feed", description: "Show or toggle Claude-Mem observation feed status", acceptsArgs: true, handler: async (ctx) => { - const config = api.pluginConfig || {}; - const feedConfig = config.observationFeed as - | { enabled?: boolean; channel?: string; to?: string } - | undefined; + const feedConfig = userConfig.observationFeed; if (!feedConfig) { return "Observation feed not configured. Add observationFeed to your plugin config."; @@ -300,5 +577,32 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { }, }); - api.logger.info("[claude-mem] OpenClaw plugin loaded — v1.0.0"); + // ------------------------------------------------------------------ + // Command: /claude-mem-status — worker health check + // ------------------------------------------------------------------ + api.registerCommand({ + name: "claude-mem-status", + description: "Check Claude-Mem worker health and session status", + handler: async () => { + const healthText = await workerGetText(workerPort, "/api/health", api.logger); + if (!healthText) { + return `Claude-Mem worker unreachable at port ${workerPort}`; + } + + try { + const health = JSON.parse(healthText); + return [ + "Claude-Mem Worker Status", + `Status: ${health.status || "unknown"}`, + `Port: ${workerPort}`, + `Active sessions: ${sessionIds.size}`, + `Observation feed: ${connectionState}`, + ].join("\n"); + } catch { + return `Claude-Mem worker responded but returned unexpected data`; + } + }, + }); + + api.logger.info(`[claude-mem] OpenClaw plugin loaded — v1.0.0 (worker: 127.0.0.1:${workerPort})`); } diff --git a/openclaw/test-sse-consumer.js b/openclaw/test-sse-consumer.js index 5af5f487..eb1e88ec 100644 --- a/openclaw/test-sse-consumer.js +++ b/openclaw/test-sse-consumer.js @@ -8,7 +8,8 @@ import claudeMemPlugin from "./dist/index.js"; let registeredService = null; -let registeredCommand = null; +const registeredCommands = new Map(); +const eventHandlers = new Map(); const logs = []; const mockApi = { @@ -28,7 +29,13 @@ const mockApi = { registeredService = service; }, registerCommand: (command) => { - registeredCommand = command; + registeredCommands.set(command.name, command); + }, + on: (event, callback) => { + if (!eventHandlers.has(event)) { + eventHandlers.set(event, []); + } + eventHandlers.get(event).push(callback); }, runtime: { channel: { @@ -45,7 +52,7 @@ const mockApi = { // Call the default export with mock API claudeMemPlugin(mockApi); -// Verify service registration +// Verify registration let failures = 0; if (!registeredService) { @@ -60,18 +67,30 @@ if (!registeredService) { console.log("OK: Service registered with id 'claude-mem-observation-feed'"); } -if (!registeredCommand) { - console.error("FAIL: No command was registered"); - failures++; -} else if (registeredCommand.name !== "claude-mem-feed") { - console.error( - `FAIL: Command name is "${registeredCommand.name}", expected "claude-mem-feed"` - ); +if (!registeredCommands.has("claude-mem-feed")) { + console.error("FAIL: No 'claude-mem-feed' command registered"); failures++; } else { console.log("OK: Command registered with name 'claude-mem-feed'"); } +if (!registeredCommands.has("claude-mem-status")) { + console.error("FAIL: No 'claude-mem-status' command registered"); + failures++; +} else { + console.log("OK: Command registered with name 'claude-mem-status'"); +} + +const expectedEvents = ["before_agent_start", "tool_result_persist", "agent_end", "gateway_start"]; +for (const event of expectedEvents) { + if (!eventHandlers.has(event) || eventHandlers.get(event).length === 0) { + console.error(`FAIL: No handler registered for '${event}'`); + failures++; + } else { + console.log(`OK: Event handler registered for '${event}'`); + } +} + if (!logs.some((l) => l.includes("plugin loaded"))) { console.error("FAIL: Plugin did not log a load message"); failures++; @@ -83,5 +102,5 @@ if (failures > 0) { console.error(`\nFAIL: ${failures} check(s) failed`); process.exit(1); } else { - console.log("\nPASS: Plugin registers service and command correctly"); + console.log("\nPASS: Plugin registers service, commands, and event handlers correctly"); }