From 7fce21c1453a892a4fb925e2d1c1db5cfd615580 Mon Sep 17 00:00:00 2001 From: GigiTiti-Kai Date: Mon, 30 Mar 2026 23:20:18 +0900 Subject: [PATCH] fix: deduplicate session init to prevent multiple prompt records When using the OpenClaw integration, a single user message would produce 3 prompt records because session_start, message_received, after_compaction, and before_agent_start each independently called /api/sessions/init with different session keys. Changes: - Centralize /api/sessions/init to before_agent_start only - Add canonical session key unification (sessionKey, conversationId, channelId mapped to a single contentSessionId) - Add 2-second dedup guard for edge cases - Fix cwd: "" in tool_result_persist (use ctx.workspaceDir; skip + warn if unavailable) - Add delayed session completion (configurable, default 5s) to avoid race with in-flight observations - Clean up session alias tracking on session_end; reset all tracking Maps and pending completion timers on gateway_start Co-Authored-By: Claude Opus 4.6 (1M context) --- openclaw/src/index.ts | 176 ++++++++++++++++++++++++++++++++---------- 1 file changed, 135 insertions(+), 41 deletions(-) diff --git a/openclaw/src/index.ts b/openclaw/src/index.ts index 7e4ac304..1ae74b8a 100644 --- a/openclaw/src/index.ts +++ b/openclaw/src/index.ts @@ -547,6 +547,14 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { // Session tracking for observation I/O // ------------------------------------------------------------------ const sessionIds = new Map(); + const canonicalSessionKeys = new Map(); + const sessionAliasesByCanonicalKey = new Map>(); + const pendingCompletionTimers = new Map>(); + const recentPromptInits = new Map(); + const completionDelayMs = (() => { + const val = Number((userConfig as Record).completionDelayMs); + return Number.isFinite(val) ? 
Math.max(0, val) : 5000; + })(); const syncMemoryFile = userConfig.syncMemoryFile !== false; // default true const syncMemoryFileExclude = new Set(userConfig.syncMemoryFileExclude || []); @@ -565,6 +573,83 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { return true; } + type SessionTrackingContext = { + sessionKey?: string; + workspaceDir?: string; + channelId?: string; + conversationId?: string; + }; + + function getSessionAliases(ctx: SessionTrackingContext): string[] { + const aliases = new Set(); + for (const rawKey of [ctx.sessionKey, ctx.conversationId, ctx.channelId]) { + const key = typeof rawKey === "string" ? rawKey.trim() : ""; + if (key) aliases.add(key); + } + if (aliases.size === 0) aliases.add("default"); + return Array.from(aliases); + } + + function rememberSessionContext(ctx: SessionTrackingContext): { canonicalKey: string; contentSessionId: string } { + const aliases = getSessionAliases(ctx); + let canonicalKey = aliases.find((alias) => canonicalSessionKeys.has(alias)); + canonicalKey = canonicalKey ? canonicalSessionKeys.get(canonicalKey)! 
: aliases[0]; + let aliasSet = sessionAliasesByCanonicalKey.get(canonicalKey); + if (!aliasSet) { + aliasSet = new Set([canonicalKey]); + sessionAliasesByCanonicalKey.set(canonicalKey, aliasSet); + } + for (const alias of aliases) { + aliasSet.add(alias); + canonicalSessionKeys.set(alias, canonicalKey); + } + const contentSessionId = getContentSessionId(canonicalKey); + for (const alias of aliasSet) { + sessionIds.set(alias, contentSessionId); + } + return { canonicalKey, contentSessionId }; + } + + function shouldSkipDuplicatePromptInit(contentSessionId: string, project: string, prompt: string): boolean { + const now = Date.now(); + for (const [key, timestamp] of recentPromptInits) { + if (now - timestamp > 2000) recentPromptInits.delete(key); + } + const cacheKey = `${contentSessionId}::${project}::${prompt}`; + const lastSeenAt = recentPromptInits.get(cacheKey); + // Note: cache is set unconditionally before return. If workerPost fails + // after this check, a retry within 2s would be incorrectly skipped. + // Acceptable because before_agent_start is not retried by the runtime. 
+ recentPromptInits.set(cacheKey, now); + return typeof lastSeenAt === "number" && now - lastSeenAt <= 2000; + } + + function clearSessionContext(ctx: SessionTrackingContext): void { + const aliases = getSessionAliases(ctx); + const canonicalKey = aliases + .map((alias) => canonicalSessionKeys.get(alias)) + .find(Boolean) || aliases[0]; + const knownAliases = sessionAliasesByCanonicalKey.get(canonicalKey) || new Set([canonicalKey, ...aliases]); + for (const alias of knownAliases) { + canonicalSessionKeys.delete(alias); + sessionIds.delete(alias); + } + sessionAliasesByCanonicalKey.delete(canonicalKey); + sessionIds.delete(canonicalKey); + } + + function scheduleSessionComplete(contentSessionId: string): void { + const existingTimer = pendingCompletionTimers.get(contentSessionId); + if (existingTimer) clearTimeout(existingTimer); + const timer = setTimeout(() => { + pendingCompletionTimers.delete(contentSessionId); + workerPostFireAndForget(workerPort, "/api/sessions/complete", { + contentSessionId, + }, api.logger); + }, completionDelayMs); + pendingCompletionTimers.set(contentSessionId, timer); + } + // TTL cache for context injection to avoid re-fetching on every LLM turn. // before_prompt_build fires on every turn; caching for 60s keeps the worker // load manageable while still picking up new observations reasonably quickly. @@ -600,61 +685,54 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { } // ------------------------------------------------------------------ - // Event: session_start — init claude-mem session (fires on /new, /reset) + // Event: session_start — track session (fires on /new, /reset) + // Init is deferred to before_agent_start to avoid duplicate prompt records. 
// ------------------------------------------------------------------ api.on("session_start", async (_event, ctx) => { - const contentSessionId = getContentSessionId(ctx.sessionKey); - - await workerPost(workerPort, "/api/sessions/init", { - contentSessionId, - project: getProjectName(ctx), - prompt: "", - }, api.logger); - - api.logger.info(`[claude-mem] Session initialized: ${contentSessionId}`); + const { contentSessionId } = rememberSessionContext(ctx); + api.logger.info(`[claude-mem] Session tracking initialized: ${contentSessionId}`); }); // ------------------------------------------------------------------ - // Event: message_received — capture inbound user prompts from channels + // Event: message_received — alias tracking only; init deferred to before_agent_start // ------------------------------------------------------------------ api.on("message_received", async (event, ctx) => { - const sessionKey = ctx.conversationId || ctx.channelId || "default"; - const contentSessionId = getContentSessionId(sessionKey); - - await workerPost(workerPort, "/api/sessions/init", { - contentSessionId, - project: baseProjectName, - prompt: event.content || "[media prompt]", - }, api.logger); + const { canonicalKey, contentSessionId } = rememberSessionContext(ctx); + api.logger.info(`[claude-mem] Message received — prompt capture deferred to before_agent_start: session=${canonicalKey} contentSessionId=${contentSessionId} hasContent=${Boolean(event.content)}`); }); // ------------------------------------------------------------------ - // Event: after_compaction — re-init session after context compaction + // Event: after_compaction — preserve session tracking after context compaction. + // Re-init is intentionally NOT called here; the worker retains session state + // independently and re-initializing would create duplicate prompt records. 
// ------------------------------------------------------------------ api.on("after_compaction", async (_event, ctx) => { - const contentSessionId = getContentSessionId(ctx.sessionKey); - - await workerPost(workerPort, "/api/sessions/init", { - contentSessionId, - project: getProjectName(ctx), - prompt: "", - }, api.logger); - - api.logger.info(`[claude-mem] Session re-initialized after compaction: ${contentSessionId}`); + const { contentSessionId } = rememberSessionContext(ctx); + api.logger.info(`[claude-mem] Session preserved after compaction: ${contentSessionId}`); }); // ------------------------------------------------------------------ - // Event: before_agent_start — init session + // Event: before_agent_start — single init point with dedup guard // ------------------------------------------------------------------ api.on("before_agent_start", async (event, ctx) => { + const { contentSessionId } = rememberSessionContext(ctx); + const projectName = getProjectName(ctx); + const promptText = event.prompt || "agent run"; + + if (shouldSkipDuplicatePromptInit(contentSessionId, projectName, promptText)) { + api.logger.info(`[claude-mem] Skipping duplicate prompt init: contentSessionId=${contentSessionId} project=${projectName}`); + return; + } + // Initialize session in the worker so observations are not skipped // (the privacy check requires a stored user prompt to exist) - const contentSessionId = getContentSessionId(ctx.sessionKey); await workerPost(workerPort, "/api/sessions/init", { contentSessionId, - project: getProjectName(ctx), - prompt: event.prompt || "agent run", + project: projectName, + prompt: promptText, }, api.logger); + + api.logger.info(`[claude-mem] Session initialized via before_agent_start: contentSessionId=${contentSessionId} project=${projectName}`); }); // ------------------------------------------------------------------ @@ -686,7 +764,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { // Skip memory_ tools to 
prevent recursive observation loops if (toolName.startsWith("memory_")) return; - const contentSessionId = getContentSessionId(ctx.sessionKey); + const { canonicalKey, contentSessionId } = rememberSessionContext(ctx); // Extract result text from all content blocks let toolResponseText = ""; @@ -704,13 +782,23 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { toolResponseText = toolResponseText.slice(0, MAX_TOOL_RESPONSE_LENGTH); } + // Use the workspaceDir from the hook context; there is no further fallback. + // Empty cwd causes worker-side observation queueing failures, + // so we drop the observation rather than sending cwd: "". + const workspaceDir = ctx.workspaceDir; + + if (!workspaceDir) { + api.logger.warn(`[claude-mem] Skipping observation persist because workspaceDir is unavailable: session=${canonicalKey} tool=${toolName}`); + return; + } + // Fire-and-forget: send observation to worker workerPostFireAndForget(workerPort, "/api/sessions/observations", { contentSessionId, tool_name: toolName, tool_input: event.params || {}, tool_response: toolResponseText, - cwd: "", + cwd: workspaceDir, }, api.logger); }); @@ -718,7 +806,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { // Event: agent_end — summarize and complete session // ------------------------------------------------------------------ api.on("agent_end", async (event, ctx) => { - const contentSessionId = getContentSessionId(ctx.sessionKey); + const { contentSessionId } = rememberSessionContext(ctx); // Extract last assistant message for summarization let lastAssistantMessage = ""; @@ -747,17 +835,16 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { last_assistant_message: lastAssistantMessage, }, api.logger); - workerPostFireAndForget(workerPort, "/api/sessions/complete", { - contentSessionId, - }, api.logger); + api.logger.info(`[claude-mem] Scheduling session complete in ${completionDelayMs}ms: ${contentSessionId}`); + 
scheduleSessionComplete(contentSessionId); }); // ------------------------------------------------------------------ // Event: session_end — clean up session tracking to prevent unbounded growth // ------------------------------------------------------------------ api.on("session_end", async (_event, ctx) => { - const key = ctx.sessionKey || "default"; - sessionIds.delete(key); + clearSessionContext(ctx); + api.logger.info(`[claude-mem] Session tracking cleaned up`); }); // ------------------------------------------------------------------ @@ -766,6 +853,13 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void { api.on("gateway_start", async () => { sessionIds.clear(); contextCache.clear(); + recentPromptInits.clear(); + canonicalSessionKeys.clear(); + sessionAliasesByCanonicalKey.clear(); + for (const timer of pendingCompletionTimers.values()) { + clearTimeout(timer); + } + pendingCompletionTimers.clear(); api.logger.info("[claude-mem] Gateway started — session tracking reset"); });