MAESTRO: Add observation I/O, MEMORY.md live sync, and gateway lifecycle support
Merge crab-mem observation recording with existing SSE broadcasting to create a complete OpenClaw plugin. Records observations from embedded runner sessions via worker HTTP API, and continuously syncs MEMORY.md to agent workspaces so agents always have fresh context. - Add event handlers: before_agent_start, tool_result_persist, agent_end, gateway_start - Add MEMORY.md live sync on every agent start and tool use (fire-and-forget) - Add worker HTTP client (POST, fire-and-forget POST, GET text) - Add /claude-mem-status health check command - Add workspace dir tracking across session events - Expand test suite from 17 to 36 tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+561
-7
@@ -1,6 +1,9 @@
|
||||
import { describe, it, beforeEach, afterEach } from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { createServer, type Server, type IncomingMessage, type ServerResponse } from "node:http";
|
||||
import { mkdtemp, readFile, rm } from "fs/promises";
|
||||
import { join } from "path";
|
||||
import { tmpdir } from "os";
|
||||
import claudeMemPlugin from "./index.js";
|
||||
|
||||
function createMockApi(pluginConfigOverride: Record<string, any> = {}) {
|
||||
@@ -8,7 +11,8 @@ function createMockApi(pluginConfigOverride: Record<string, any> = {}) {
|
||||
const sentMessages: Array<{ to: string; text: string; channel: string }> = [];
|
||||
|
||||
let registeredService: any = null;
|
||||
let registeredCommand: any = null;
|
||||
const registeredCommands: Map<string, any> = new Map();
|
||||
const eventHandlers: Map<string, Function[]> = new Map();
|
||||
|
||||
const api = {
|
||||
id: "claude-mem",
|
||||
@@ -27,7 +31,13 @@ function createMockApi(pluginConfigOverride: Record<string, any> = {}) {
|
||||
registeredService = service;
|
||||
},
|
||||
registerCommand: (command: any) => {
|
||||
registeredCommand = command;
|
||||
registeredCommands.set(command.name, command);
|
||||
},
|
||||
on: (event: string, callback: Function) => {
|
||||
if (!eventHandlers.has(event)) {
|
||||
eventHandlers.set(event, []);
|
||||
}
|
||||
eventHandlers.get(event)!.push(callback);
|
||||
},
|
||||
runtime: {
|
||||
channel: {
|
||||
@@ -70,19 +80,33 @@ function createMockApi(pluginConfigOverride: Record<string, any> = {}) {
|
||||
logs,
|
||||
sentMessages,
|
||||
getService: () => registeredService,
|
||||
getCommand: () => registeredCommand,
|
||||
getCommand: (name?: string) => {
|
||||
if (name) return registeredCommands.get(name);
|
||||
return registeredCommands.get("claude-mem-feed");
|
||||
},
|
||||
getEventHandlers: (event: string) => eventHandlers.get(event) || [],
|
||||
fireEvent: async (event: string, data: any, ctx: any = {}) => {
|
||||
const handlers = eventHandlers.get(event) || [];
|
||||
for (const handler of handlers) {
|
||||
await handler(data, ctx);
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe("claudeMemPlugin", () => {
|
||||
it("registers service and command on load", () => {
|
||||
const { api, logs, getService, getCommand } = createMockApi();
|
||||
it("registers service, commands, and event handlers on load", () => {
|
||||
const { api, logs, getService, getCommand, getEventHandlers } = createMockApi();
|
||||
claudeMemPlugin(api);
|
||||
|
||||
assert.ok(getService(), "service should be registered");
|
||||
assert.equal(getService().id, "claude-mem-observation-feed");
|
||||
assert.ok(getCommand(), "command should be registered");
|
||||
assert.equal(getCommand().name, "claude-mem-feed");
|
||||
assert.ok(getCommand("claude-mem-feed"), "feed command should be registered");
|
||||
assert.ok(getCommand("claude-mem-status"), "status command should be registered");
|
||||
assert.ok(getEventHandlers("before_agent_start").length > 0, "before_agent_start handler registered");
|
||||
assert.ok(getEventHandlers("tool_result_persist").length > 0, "tool_result_persist handler registered");
|
||||
assert.ok(getEventHandlers("agent_end").length > 0, "agent_end handler registered");
|
||||
assert.ok(getEventHandlers("gateway_start").length > 0, "gateway_start handler registered");
|
||||
assert.ok(logs.some((l) => l.includes("plugin loaded")));
|
||||
});
|
||||
|
||||
@@ -192,6 +216,536 @@ describe("claudeMemPlugin", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("Observation I/O event handlers", () => {
|
||||
let workerServer: Server;
|
||||
let workerPort: number;
|
||||
let receivedRequests: Array<{ method: string; url: string; body: any }> = [];
|
||||
|
||||
function startWorkerMock(): Promise<number> {
|
||||
return new Promise((resolve) => {
|
||||
workerServer = createServer((req: IncomingMessage, res: ServerResponse) => {
|
||||
let body = "";
|
||||
req.on("data", (chunk) => { body += chunk.toString(); });
|
||||
req.on("end", () => {
|
||||
let parsedBody: any = null;
|
||||
try { parsedBody = JSON.parse(body); } catch {}
|
||||
|
||||
receivedRequests.push({
|
||||
method: req.method || "GET",
|
||||
url: req.url || "/",
|
||||
body: parsedBody,
|
||||
});
|
||||
|
||||
// Handle different endpoints
|
||||
if (req.url === "/api/health") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ status: "ok" }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/api/sessions/init") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/api/sessions/observations") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ status: "queued" }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/api/sessions/summarize") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ status: "queued" }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/api/sessions/complete") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ status: "completed" }));
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url?.startsWith("/api/context/inject")) {
|
||||
res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end("# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work");
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/stream") {
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
Connection: "keep-alive",
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(404);
|
||||
res.end();
|
||||
});
|
||||
});
|
||||
workerServer.listen(0, () => {
|
||||
const address = workerServer.address();
|
||||
if (address && typeof address === "object") {
|
||||
resolve(address.port);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
receivedRequests = [];
|
||||
workerPort = await startWorkerMock();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
workerServer?.close();
|
||||
});
|
||||
|
||||
it("before_agent_start sends session init to worker", async () => {
|
||||
const { api, logs, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, { sessionKey: "agent-1" });
|
||||
|
||||
// Wait for HTTP request
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init");
|
||||
assert.ok(initRequest, "should send init request to worker");
|
||||
assert.equal(initRequest!.body.project, "openclaw");
|
||||
assert.ok(initRequest!.body.contentSessionId.startsWith("openclaw-agent-1-"));
|
||||
assert.equal(initRequest!.body.prompt, "Help me write a function that parses JSON");
|
||||
assert.ok(logs.some((l) => l.includes("Session initialized")));
|
||||
});
|
||||
|
||||
it("before_agent_start skips short prompts", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", { prompt: "hi" }, {});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init");
|
||||
assert.ok(!initRequest, "should not send init for short prompts");
|
||||
});
|
||||
|
||||
it("tool_result_persist sends observation to worker", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
// Init session first to establish contentSessionId
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, { sessionKey: "test-agent" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// Fire tool result event
|
||||
await fireEvent("tool_result_persist", {
|
||||
toolName: "Read",
|
||||
params: { file_path: "/src/index.ts" },
|
||||
message: {
|
||||
content: [{ type: "text", text: "file contents here..." }],
|
||||
},
|
||||
}, { sessionKey: "test-agent" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations");
|
||||
assert.ok(obsRequest, "should send observation to worker");
|
||||
assert.equal(obsRequest!.body.tool_name, "Read");
|
||||
assert.deepEqual(obsRequest!.body.tool_input, { file_path: "/src/index.ts" });
|
||||
assert.equal(obsRequest!.body.tool_response, "file contents here...");
|
||||
assert.ok(obsRequest!.body.contentSessionId.startsWith("openclaw-test-agent-"));
|
||||
});
|
||||
|
||||
it("tool_result_persist skips memory_ tools", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("tool_result_persist", {
|
||||
toolName: "memory_search",
|
||||
params: {},
|
||||
}, {});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations");
|
||||
assert.ok(!obsRequest, "should skip memory_ tools");
|
||||
});
|
||||
|
||||
it("tool_result_persist truncates long responses", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
const longText = "x".repeat(2000);
|
||||
await fireEvent("tool_result_persist", {
|
||||
toolName: "Bash",
|
||||
params: { command: "ls" },
|
||||
message: {
|
||||
content: [{ type: "text", text: longText }],
|
||||
},
|
||||
}, {});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations");
|
||||
assert.ok(obsRequest, "should send observation");
|
||||
assert.equal(obsRequest!.body.tool_response.length, 1000, "should truncate to 1000 chars");
|
||||
});
|
||||
|
||||
it("agent_end sends summarize and complete to worker", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
// Init session
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, { sessionKey: "summarize-test" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
// Fire agent end
|
||||
await fireEvent("agent_end", {
|
||||
messages: [
|
||||
{ role: "user", content: "help me" },
|
||||
{ role: "assistant", content: "Here is the solution..." },
|
||||
],
|
||||
}, { sessionKey: "summarize-test" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const summarizeRequest = receivedRequests.find((r) => r.url === "/api/sessions/summarize");
|
||||
assert.ok(summarizeRequest, "should send summarize to worker");
|
||||
assert.equal(summarizeRequest!.body.last_assistant_message, "Here is the solution...");
|
||||
assert.ok(summarizeRequest!.body.contentSessionId.startsWith("openclaw-summarize-test-"));
|
||||
|
||||
const completeRequest = receivedRequests.find((r) => r.url === "/api/sessions/complete");
|
||||
assert.ok(completeRequest, "should send complete to worker");
|
||||
assert.ok(completeRequest!.body.contentSessionId.startsWith("openclaw-summarize-test-"));
|
||||
});
|
||||
|
||||
it("agent_end extracts text from array content", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, { sessionKey: "array-content" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
await fireEvent("agent_end", {
|
||||
messages: [
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{ type: "text", text: "First part" },
|
||||
{ type: "text", text: "Second part" },
|
||||
],
|
||||
},
|
||||
],
|
||||
}, { sessionKey: "array-content" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const summarizeRequest = receivedRequests.find((r) => r.url === "/api/sessions/summarize");
|
||||
assert.ok(summarizeRequest, "should send summarize");
|
||||
assert.equal(summarizeRequest!.body.last_assistant_message, "First part\nSecond part");
|
||||
});
|
||||
|
||||
it("uses custom project name from config", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort, project: "my-project" });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, {});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init");
|
||||
assert.ok(initRequest, "should send init");
|
||||
assert.equal(initRequest!.body.project, "my-project");
|
||||
});
|
||||
|
||||
it("claude-mem-status command reports worker health", async () => {
|
||||
const { api, getCommand } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
const statusCmd = getCommand("claude-mem-status");
|
||||
assert.ok(statusCmd, "status command should exist");
|
||||
|
||||
const result = await statusCmd.handler({ args: "", channel: "telegram", isAuthorizedSender: true, commandBody: "/claude-mem-status", config: {} });
|
||||
assert.ok(result.includes("Status: ok"));
|
||||
assert.ok(result.includes(`Port: ${workerPort}`));
|
||||
});
|
||||
|
||||
it("claude-mem-status reports unreachable when worker is down", async () => {
|
||||
workerServer.close();
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const { api, getCommand } = createMockApi({ workerPort: 59999 });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
const statusCmd = getCommand("claude-mem-status");
|
||||
const result = await statusCmd.handler({ args: "", channel: "telegram", isAuthorizedSender: true, commandBody: "/claude-mem-status", config: {} });
|
||||
assert.ok(result.includes("unreachable"));
|
||||
});
|
||||
|
||||
it("reuses same contentSessionId for same sessionKey", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function that parses JSON",
|
||||
}, { sessionKey: "reuse-test" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
await fireEvent("tool_result_persist", {
|
||||
toolName: "Read",
|
||||
params: { file_path: "/src/index.ts" },
|
||||
message: { content: [{ type: "text", text: "contents" }] },
|
||||
}, { sessionKey: "reuse-test" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
const initRequest = receivedRequests.find((r) => r.url === "/api/sessions/init");
|
||||
const obsRequest = receivedRequests.find((r) => r.url === "/api/sessions/observations");
|
||||
assert.ok(initRequest && obsRequest, "both requests should exist");
|
||||
assert.equal(
|
||||
initRequest!.body.contentSessionId,
|
||||
obsRequest!.body.contentSessionId,
|
||||
"should reuse contentSessionId for same sessionKey"
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("MEMORY.md context sync", () => {
|
||||
let workerServer: Server;
|
||||
let workerPort: number;
|
||||
let receivedRequests: Array<{ method: string; url: string; body: any }> = [];
|
||||
let tmpDir: string;
|
||||
let contextResponse = "# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work";
|
||||
|
||||
function startWorkerMock(): Promise<number> {
|
||||
return new Promise((resolve) => {
|
||||
workerServer = createServer((req: IncomingMessage, res: ServerResponse) => {
|
||||
let body = "";
|
||||
req.on("data", (chunk) => { body += chunk.toString(); });
|
||||
req.on("end", () => {
|
||||
let parsedBody: any = null;
|
||||
try { parsedBody = JSON.parse(body); } catch {}
|
||||
|
||||
receivedRequests.push({
|
||||
method: req.method || "GET",
|
||||
url: req.url || "/",
|
||||
body: parsedBody,
|
||||
});
|
||||
|
||||
if (req.url?.startsWith("/api/context/inject")) {
|
||||
res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end(contextResponse);
|
||||
return;
|
||||
}
|
||||
|
||||
if (req.url === "/api/sessions/init") {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ sessionDbId: 1, promptNumber: 1, skipped: false }));
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ status: "ok" }));
|
||||
});
|
||||
});
|
||||
workerServer.listen(0, () => {
|
||||
const address = workerServer.address();
|
||||
if (address && typeof address === "object") {
|
||||
resolve(address.port);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
beforeEach(async () => {
|
||||
receivedRequests = [];
|
||||
contextResponse = "# Claude-Mem Context\n\n## Timeline\n- Session 1: Did some work";
|
||||
workerPort = await startWorkerMock();
|
||||
tmpDir = await mkdtemp(join(tmpdir(), "claude-mem-test-"));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
workerServer?.close();
|
||||
await rm(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("writes MEMORY.md to workspace on before_agent_start", async () => {
|
||||
const { api, logs, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "sync-test", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.ok(contextRequest, "should request context from worker");
|
||||
assert.ok(contextRequest!.url!.includes("projects=openclaw"));
|
||||
|
||||
const memoryContent = await readFile(join(tmpDir, "MEMORY.md"), "utf-8");
|
||||
assert.ok(memoryContent.includes("Claude-Mem Context"), "MEMORY.md should contain context");
|
||||
assert.ok(memoryContent.includes("Session 1"), "MEMORY.md should contain timeline");
|
||||
assert.ok(logs.some((l) => l.includes("MEMORY.md synced")));
|
||||
});
|
||||
|
||||
it("syncs MEMORY.md on every before_agent_start call", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "First prompt for this agent",
|
||||
}, { sessionKey: "agent-a", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const firstContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(firstContextRequests.length, 1, "first call should fetch context");
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Second prompt for same agent",
|
||||
}, { sessionKey: "agent-a", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const allContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(allContextRequests.length, 2, "should re-fetch context on every call");
|
||||
});
|
||||
|
||||
it("syncs MEMORY.md on tool_result_persist via fire-and-forget", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
// Init session to register workspace dir
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "tool-sync", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const preToolContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(preToolContextRequests.length, 1, "before_agent_start should sync once");
|
||||
|
||||
// Fire tool result — should trigger another MEMORY.md sync
|
||||
await fireEvent("tool_result_persist", {
|
||||
toolName: "Read",
|
||||
params: { file_path: "/src/app.ts" },
|
||||
message: { content: [{ type: "text", text: "file contents" }] },
|
||||
}, { sessionKey: "tool-sync" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const postToolContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(postToolContextRequests.length, 2, "tool_result_persist should trigger another sync");
|
||||
|
||||
const memoryContent = await readFile(join(tmpDir, "MEMORY.md"), "utf-8");
|
||||
assert.ok(memoryContent.includes("Claude-Mem Context"), "MEMORY.md should be updated");
|
||||
});
|
||||
|
||||
it("skips MEMORY.md sync when syncMemoryFile is false", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort, syncMemoryFile: false });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "no-sync", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.ok(!contextRequest, "should not fetch context when sync disabled");
|
||||
});
|
||||
|
||||
it("skips MEMORY.md sync when no workspaceDir in context", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "no-workspace" });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.ok(!contextRequest, "should not fetch context without workspaceDir");
|
||||
});
|
||||
|
||||
it("skips writing MEMORY.md when context is empty", async () => {
|
||||
contextResponse = " ";
|
||||
const { api, logs, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "empty-ctx", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
assert.ok(!logs.some((l) => l.includes("MEMORY.md synced")), "should not log sync for empty context");
|
||||
});
|
||||
|
||||
it("gateway_start resets sync tracking so next agent re-syncs", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
// First sync
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "agent-1", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const firstContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(firstContextRequests.length, 1);
|
||||
|
||||
// Gateway restart
|
||||
await fireEvent("gateway_start", {}, {});
|
||||
|
||||
// Second sync after gateway restart — same workspace should re-sync
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me after gateway restart",
|
||||
}, { sessionKey: "agent-1", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const allContextRequests = receivedRequests.filter((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.equal(allContextRequests.length, 2, "should re-fetch context after gateway restart");
|
||||
});
|
||||
|
||||
it("uses custom project name in context inject URL", async () => {
|
||||
const { api, fireEvent } = createMockApi({ workerPort, project: "my-bot" });
|
||||
claudeMemPlugin(api);
|
||||
|
||||
await fireEvent("before_agent_start", {
|
||||
prompt: "Help me write a function",
|
||||
}, { sessionKey: "proj-test", workspaceDir: tmpDir });
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
const contextRequest = receivedRequests.find((r) => r.url?.startsWith("/api/context/inject"));
|
||||
assert.ok(contextRequest, "should request context");
|
||||
assert.ok(contextRequest!.url!.includes("projects=my-bot"), "should use custom project name");
|
||||
});
|
||||
});
|
||||
|
||||
describe("SSE stream integration", () => {
|
||||
let server: Server;
|
||||
let serverPort: number;
|
||||
|
||||
+315
-11
@@ -1,3 +1,6 @@
|
||||
import { writeFile } from "fs/promises";
|
||||
import { join } from "path";
|
||||
|
||||
// Minimal type declarations for the OpenClaw Plugin SDK.
|
||||
// These match the real OpenClawPluginApi provided by the gateway at runtime.
|
||||
// See: https://docs.openclaw.ai/plugin
|
||||
@@ -27,6 +30,34 @@ interface PluginCommandContext {
|
||||
|
||||
type PluginCommandResult = string | { text: string } | { text: string; format?: string };
|
||||
|
||||
// OpenClaw event types for agent lifecycle
|
||||
interface BeforeAgentStartEvent {
|
||||
prompt?: string;
|
||||
}
|
||||
|
||||
interface ToolResultPersistEvent {
|
||||
toolName?: string;
|
||||
params?: Record<string, unknown>;
|
||||
message?: {
|
||||
content?: Array<{ type: string; text?: string }>;
|
||||
};
|
||||
}
|
||||
|
||||
interface AgentEndEvent {
|
||||
messages?: Array<{
|
||||
role: string;
|
||||
content: string | Array<{ type: string; text?: string }>;
|
||||
}>;
|
||||
}
|
||||
|
||||
interface EventContext {
|
||||
sessionKey?: string;
|
||||
workspaceDir?: string;
|
||||
agentId?: string;
|
||||
}
|
||||
|
||||
type EventCallback<T> = (event: T, ctx: EventContext) => void | Promise<void>;
|
||||
|
||||
interface OpenClawPluginApi {
|
||||
id: string;
|
||||
name: string;
|
||||
@@ -47,11 +78,19 @@ interface OpenClawPluginApi {
|
||||
requireAuth?: boolean;
|
||||
handler: (ctx: PluginCommandContext) => PluginCommandResult | Promise<PluginCommandResult>;
|
||||
}) => void;
|
||||
on: ((event: "before_agent_start", callback: EventCallback<BeforeAgentStartEvent>) => void) &
|
||||
((event: "tool_result_persist", callback: EventCallback<ToolResultPersistEvent>) => void) &
|
||||
((event: "agent_end", callback: EventCallback<AgentEndEvent>) => void) &
|
||||
((event: "gateway_start", callback: EventCallback<Record<string, never>>) => void);
|
||||
runtime: {
|
||||
channel: Record<string, Record<string, (...args: any[]) => Promise<any>>>;
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE Observation Feed Types
|
||||
// ============================================================================
|
||||
|
||||
interface ObservationSSEPayload {
|
||||
id: number;
|
||||
memory_session_id: string;
|
||||
@@ -78,7 +117,99 @@ interface SSENewObservationEvent {
|
||||
|
||||
type ConnectionState = "disconnected" | "connected" | "reconnecting";
|
||||
|
||||
// ============================================================================
|
||||
// Plugin Configuration
|
||||
// ============================================================================
|
||||
|
||||
interface ClaudeMemPluginConfig {
|
||||
syncMemoryFile?: boolean;
|
||||
project?: string;
|
||||
workerPort?: number;
|
||||
observationFeed?: {
|
||||
enabled?: boolean;
|
||||
channel?: string;
|
||||
to?: string;
|
||||
};
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Constants
|
||||
// ============================================================================
|
||||
|
||||
const MAX_SSE_BUFFER_SIZE = 1024 * 1024; // 1MB
|
||||
const DEFAULT_WORKER_PORT = 37777;
|
||||
const TOOL_RESULT_MAX_LENGTH = 1000;
|
||||
|
||||
// ============================================================================
|
||||
// Worker HTTP Client
|
||||
// ============================================================================
|
||||
|
||||
function workerBaseUrl(port: number): string {
|
||||
return `http://127.0.0.1:${port}`;
|
||||
}
|
||||
|
||||
async function workerPost(
|
||||
port: number,
|
||||
path: string,
|
||||
body: Record<string, unknown>,
|
||||
logger: PluginLogger
|
||||
): Promise<Record<string, unknown> | null> {
|
||||
try {
|
||||
const response = await fetch(`${workerBaseUrl(port)}${path}`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(body),
|
||||
});
|
||||
if (!response.ok) {
|
||||
logger.warn(`[claude-mem] Worker POST ${path} returned ${response.status}`);
|
||||
return null;
|
||||
}
|
||||
return (await response.json()) as Record<string, unknown>;
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function workerPostFireAndForget(
|
||||
port: number,
|
||||
path: string,
|
||||
body: Record<string, unknown>,
|
||||
logger: PluginLogger
|
||||
): void {
|
||||
fetch(`${workerBaseUrl(port)}${path}`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(body),
|
||||
}).catch((error: unknown) => {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
logger.warn(`[claude-mem] Worker POST ${path} failed: ${message}`);
|
||||
});
|
||||
}
|
||||
|
||||
async function workerGetText(
|
||||
port: number,
|
||||
path: string,
|
||||
logger: PluginLogger
|
||||
): Promise<string | null> {
|
||||
try {
|
||||
const response = await fetch(`${workerBaseUrl(port)}${path}`);
|
||||
if (!response.ok) {
|
||||
logger.warn(`[claude-mem] Worker GET ${path} returned ${response.status}`);
|
||||
return null;
|
||||
}
|
||||
return await response.text();
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
logger.warn(`[claude-mem] Worker GET ${path} failed: ${message}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// SSE Observation Feed
|
||||
// ============================================================================
|
||||
|
||||
function formatObservationMessage(observation: ObservationSSEPayload): string {
|
||||
const title = observation.title || "Untitled";
|
||||
@@ -205,7 +336,158 @@ async function connectToSSEStream(
|
||||
setConnectionState("disconnected");
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Plugin Entry Point
|
||||
// ============================================================================
|
||||
|
||||
export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
const userConfig = (api.pluginConfig || {}) as ClaudeMemPluginConfig;
|
||||
const workerPort = userConfig.workerPort || DEFAULT_WORKER_PORT;
|
||||
const projectName = userConfig.project || "openclaw";
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Session tracking for observation I/O
|
||||
// ------------------------------------------------------------------
|
||||
const sessionIds = new Map<string, string>();
|
||||
const workspaceDirsBySessionKey = new Map<string, string>();
|
||||
const syncMemoryFile = userConfig.syncMemoryFile !== false; // default true
|
||||
|
||||
function getContentSessionId(sessionKey?: string): string {
|
||||
const key = sessionKey || "default";
|
||||
if (!sessionIds.has(key)) {
|
||||
sessionIds.set(key, `openclaw-${key}-${Date.now()}`);
|
||||
}
|
||||
return sessionIds.get(key)!;
|
||||
}
|
||||
|
||||
async function syncMemoryToWorkspace(workspaceDir: string): Promise<void> {
|
||||
const contextText = await workerGetText(
|
||||
workerPort,
|
||||
`/api/context/inject?projects=${encodeURIComponent(projectName)}`,
|
||||
api.logger
|
||||
);
|
||||
if (contextText && contextText.trim().length > 0) {
|
||||
try {
|
||||
await writeFile(join(workspaceDir, "MEMORY.md"), contextText, "utf-8");
|
||||
api.logger.info(`[claude-mem] MEMORY.md synced to ${workspaceDir}`);
|
||||
} catch (writeError: unknown) {
|
||||
const msg = writeError instanceof Error ? writeError.message : String(writeError);
|
||||
api.logger.warn(`[claude-mem] Failed to write MEMORY.md: ${msg}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: before_agent_start — init session + sync MEMORY.md
|
||||
// ------------------------------------------------------------------
|
||||
api.on("before_agent_start", async (event, ctx) => {
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
const prompt = event.prompt || "";
|
||||
|
||||
// Track workspace dir so tool_result_persist can sync MEMORY.md later
|
||||
if (ctx.workspaceDir) {
|
||||
workspaceDirsBySessionKey.set(ctx.sessionKey || "default", ctx.workspaceDir);
|
||||
}
|
||||
|
||||
// Sync MEMORY.md before session init (provides context to agent)
|
||||
if (syncMemoryFile && ctx.workspaceDir) {
|
||||
await syncMemoryToWorkspace(ctx.workspaceDir);
|
||||
}
|
||||
|
||||
if (prompt.length < 10) return;
|
||||
|
||||
await workerPost(workerPort, "/api/sessions/init", {
|
||||
contentSessionId,
|
||||
project: projectName,
|
||||
prompt,
|
||||
}, api.logger);
|
||||
|
||||
api.logger.info(`[claude-mem] Session initialized: ${contentSessionId}`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: tool_result_persist — record tool observations + sync MEMORY.md
|
||||
// ------------------------------------------------------------------
|
||||
api.on("tool_result_persist", (event, ctx) => {
|
||||
const toolName = event.toolName;
|
||||
if (!toolName || toolName.startsWith("memory_")) return;
|
||||
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
|
||||
// Extract result text from message content
|
||||
let toolResponseText = "";
|
||||
const content = event.message?.content;
|
||||
if (Array.isArray(content)) {
|
||||
const textBlock = content.find(
|
||||
(block) => block.type === "tool_result" || block.type === "text"
|
||||
);
|
||||
if (textBlock && "text" in textBlock) {
|
||||
toolResponseText = String(textBlock.text).slice(0, TOOL_RESULT_MAX_LENGTH);
|
||||
}
|
||||
}
|
||||
|
||||
// Fire-and-forget: send observation + sync MEMORY.md in parallel
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/observations", {
|
||||
contentSessionId,
|
||||
tool_name: toolName,
|
||||
tool_input: event.params || {},
|
||||
tool_response: toolResponseText,
|
||||
cwd: "",
|
||||
}, api.logger);
|
||||
|
||||
const workspaceDir = ctx.workspaceDir || workspaceDirsBySessionKey.get(ctx.sessionKey || "default");
|
||||
if (syncMemoryFile && workspaceDir) {
|
||||
syncMemoryToWorkspace(workspaceDir);
|
||||
}
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: agent_end — summarize and complete session
|
||||
// ------------------------------------------------------------------
|
||||
api.on("agent_end", async (event, ctx) => {
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
|
||||
// Extract last assistant message for summarization
|
||||
let lastAssistantMessage = "";
|
||||
if (Array.isArray(event.messages)) {
|
||||
for (let i = event.messages.length - 1; i >= 0; i--) {
|
||||
const message = event.messages[i];
|
||||
if (message?.role === "assistant") {
|
||||
if (typeof message.content === "string") {
|
||||
lastAssistantMessage = message.content;
|
||||
} else if (Array.isArray(message.content)) {
|
||||
lastAssistantMessage = message.content
|
||||
.filter((block) => block.type === "text")
|
||||
.map((block) => block.text || "")
|
||||
.join("\n");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/summarize", {
|
||||
contentSessionId,
|
||||
last_assistant_message: lastAssistantMessage,
|
||||
}, api.logger);
|
||||
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/complete", {
|
||||
contentSessionId,
|
||||
}, api.logger);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: gateway_start — clear session tracking for fresh start
|
||||
// ------------------------------------------------------------------
|
||||
api.on("gateway_start", async () => {
|
||||
workspaceDirsBySessionKey.clear();
|
||||
sessionIds.clear();
|
||||
api.logger.info("[claude-mem] Gateway started — session tracking reset");
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Service: SSE observation feed → messaging channels
|
||||
// ------------------------------------------------------------------
|
||||
let sseAbortController: AbortController | null = null;
|
||||
let connectionState: ConnectionState = "disconnected";
|
||||
let connectionPromise: Promise<void> | null = null;
|
||||
@@ -213,7 +495,6 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
api.registerService({
|
||||
id: "claude-mem-observation-feed",
|
||||
start: async (_ctx) => {
|
||||
// Abort any existing connection before starting a new one
|
||||
if (sseAbortController) {
|
||||
sseAbortController.abort();
|
||||
if (connectionPromise) {
|
||||
@@ -222,11 +503,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
}
|
||||
}
|
||||
|
||||
const config = api.pluginConfig || {};
|
||||
const workerPort = (config.workerPort as number) || 37777;
|
||||
const feedConfig = config.observationFeed as
|
||||
| { enabled?: boolean; channel?: string; to?: string }
|
||||
| undefined;
|
||||
const feedConfig = userConfig.observationFeed;
|
||||
|
||||
if (!feedConfig?.enabled) {
|
||||
api.logger.info("[claude-mem] Observation feed disabled");
|
||||
@@ -264,15 +541,15 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
},
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Command: /claude-mem-feed — status & toggle
|
||||
// ------------------------------------------------------------------
|
||||
api.registerCommand({
|
||||
name: "claude-mem-feed",
|
||||
description: "Show or toggle Claude-Mem observation feed status",
|
||||
acceptsArgs: true,
|
||||
handler: async (ctx) => {
|
||||
const config = api.pluginConfig || {};
|
||||
const feedConfig = config.observationFeed as
|
||||
| { enabled?: boolean; channel?: string; to?: string }
|
||||
| undefined;
|
||||
const feedConfig = userConfig.observationFeed;
|
||||
|
||||
if (!feedConfig) {
|
||||
return "Observation feed not configured. Add observationFeed to your plugin config.";
|
||||
@@ -300,5 +577,32 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
},
|
||||
});
|
||||
|
||||
api.logger.info("[claude-mem] OpenClaw plugin loaded — v1.0.0");
|
||||
// ------------------------------------------------------------------
|
||||
// Command: /claude-mem-status — worker health check
|
||||
// ------------------------------------------------------------------
|
||||
api.registerCommand({
|
||||
name: "claude-mem-status",
|
||||
description: "Check Claude-Mem worker health and session status",
|
||||
handler: async () => {
|
||||
const healthText = await workerGetText(workerPort, "/api/health", api.logger);
|
||||
if (!healthText) {
|
||||
return `Claude-Mem worker unreachable at port ${workerPort}`;
|
||||
}
|
||||
|
||||
try {
|
||||
const health = JSON.parse(healthText);
|
||||
return [
|
||||
"Claude-Mem Worker Status",
|
||||
`Status: ${health.status || "unknown"}`,
|
||||
`Port: ${workerPort}`,
|
||||
`Active sessions: ${sessionIds.size}`,
|
||||
`Observation feed: ${connectionState}`,
|
||||
].join("\n");
|
||||
} catch {
|
||||
return `Claude-Mem worker responded but returned unexpected data`;
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
api.logger.info(`[claude-mem] OpenClaw plugin loaded — v1.0.0 (worker: 127.0.0.1:${workerPort})`);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user