fix: deduplicate session init to prevent multiple prompt records
When using the OpenClaw integration, a single user message would produce 3 prompt records because session_start, message_received, after_compaction, and before_agent_start each independently called /api/sessions/init with different session keys. Changes: - Centralize /api/sessions/init to before_agent_start only - Add canonical session key unification (sessionKey, conversationId, channelId mapped to a single contentSessionId) - Add 2-second dedup guard for edge cases - Fix cwd: "" in tool_result_persist (use workspaceDir fallback chain, skip + warn if unavailable) - Add delayed session completion (configurable, default 5s) to avoid race with in-flight observations - Clean up all tracking Maps on session_end and gateway_start Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
+135
-41
@@ -547,6 +547,14 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
// Session tracking for observation I/O
|
||||
// ------------------------------------------------------------------
|
||||
const sessionIds = new Map<string, string>();
|
||||
const canonicalSessionKeys = new Map<string, string>();
|
||||
const sessionAliasesByCanonicalKey = new Map<string, Set<string>>();
|
||||
const pendingCompletionTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
const recentPromptInits = new Map<string, number>();
|
||||
const completionDelayMs = (() => {
|
||||
const val = Number((userConfig as Record<string, unknown>).completionDelayMs);
|
||||
return Number.isFinite(val) ? Math.max(0, val) : 5000;
|
||||
})();
|
||||
const syncMemoryFile = userConfig.syncMemoryFile !== false; // default true
|
||||
const syncMemoryFileExclude = new Set(userConfig.syncMemoryFileExclude || []);
|
||||
|
||||
@@ -565,6 +573,83 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
return true;
|
||||
}
|
||||
|
||||
type SessionTrackingContext = {
|
||||
sessionKey?: string;
|
||||
workspaceDir?: string;
|
||||
channelId?: string;
|
||||
conversationId?: string;
|
||||
};
|
||||
|
||||
function getSessionAliases(ctx: SessionTrackingContext): string[] {
|
||||
const aliases = new Set<string>();
|
||||
for (const rawKey of [ctx.sessionKey, ctx.conversationId, ctx.channelId]) {
|
||||
const key = typeof rawKey === "string" ? rawKey.trim() : "";
|
||||
if (key) aliases.add(key);
|
||||
}
|
||||
if (aliases.size === 0) aliases.add("default");
|
||||
return Array.from(aliases);
|
||||
}
|
||||
|
||||
function rememberSessionContext(ctx: SessionTrackingContext): { canonicalKey: string; contentSessionId: string } {
|
||||
const aliases = getSessionAliases(ctx);
|
||||
let canonicalKey = aliases.find((alias) => canonicalSessionKeys.has(alias));
|
||||
canonicalKey = canonicalKey ? canonicalSessionKeys.get(canonicalKey)! : aliases[0];
|
||||
let aliasSet = sessionAliasesByCanonicalKey.get(canonicalKey);
|
||||
if (!aliasSet) {
|
||||
aliasSet = new Set([canonicalKey]);
|
||||
sessionAliasesByCanonicalKey.set(canonicalKey, aliasSet);
|
||||
}
|
||||
for (const alias of aliases) {
|
||||
aliasSet.add(alias);
|
||||
canonicalSessionKeys.set(alias, canonicalKey);
|
||||
}
|
||||
const contentSessionId = getContentSessionId(canonicalKey);
|
||||
for (const alias of aliasSet) {
|
||||
sessionIds.set(alias, contentSessionId);
|
||||
}
|
||||
return { canonicalKey, contentSessionId };
|
||||
}
|
||||
|
||||
function shouldSkipDuplicatePromptInit(contentSessionId: string, project: string, prompt: string): boolean {
|
||||
const now = Date.now();
|
||||
for (const [key, timestamp] of recentPromptInits) {
|
||||
if (now - timestamp > 2000) recentPromptInits.delete(key);
|
||||
}
|
||||
const cacheKey = `${contentSessionId}::${project}::${prompt}`;
|
||||
const lastSeenAt = recentPromptInits.get(cacheKey);
|
||||
// Note: cache is set unconditionally before return. If workerPost fails
|
||||
// after this check, a retry within 2s would be incorrectly skipped.
|
||||
// Acceptable because before_agent_start is not retried by the runtime.
|
||||
recentPromptInits.set(cacheKey, now);
|
||||
return typeof lastSeenAt === "number" && now - lastSeenAt <= 2000;
|
||||
}
|
||||
|
||||
function clearSessionContext(ctx: SessionTrackingContext): void {
|
||||
const aliases = getSessionAliases(ctx);
|
||||
const canonicalKey = aliases
|
||||
.map((alias) => canonicalSessionKeys.get(alias))
|
||||
.find(Boolean) || aliases[0];
|
||||
const knownAliases = sessionAliasesByCanonicalKey.get(canonicalKey) || new Set([canonicalKey, ...aliases]);
|
||||
for (const alias of knownAliases) {
|
||||
canonicalSessionKeys.delete(alias);
|
||||
sessionIds.delete(alias);
|
||||
}
|
||||
sessionAliasesByCanonicalKey.delete(canonicalKey);
|
||||
sessionIds.delete(canonicalKey);
|
||||
}
|
||||
|
||||
function scheduleSessionComplete(contentSessionId: string): void {
|
||||
const existingTimer = pendingCompletionTimers.get(contentSessionId);
|
||||
if (existingTimer) clearTimeout(existingTimer);
|
||||
const timer = setTimeout(() => {
|
||||
pendingCompletionTimers.delete(contentSessionId);
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/complete", {
|
||||
contentSessionId,
|
||||
}, api.logger);
|
||||
}, completionDelayMs);
|
||||
pendingCompletionTimers.set(contentSessionId, timer);
|
||||
}
|
||||
|
||||
// TTL cache for context injection to avoid re-fetching on every LLM turn.
|
||||
// before_prompt_build fires on every turn; caching for 60s keeps the worker
|
||||
// load manageable while still picking up new observations reasonably quickly.
|
||||
@@ -600,61 +685,54 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: session_start — init claude-mem session (fires on /new, /reset)
|
||||
// Event: session_start — track session (fires on /new, /reset)
|
||||
// Init is deferred to before_agent_start to avoid duplicate prompt records.
|
||||
// ------------------------------------------------------------------
|
||||
api.on("session_start", async (_event, ctx) => {
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
|
||||
await workerPost(workerPort, "/api/sessions/init", {
|
||||
contentSessionId,
|
||||
project: getProjectName(ctx),
|
||||
prompt: "",
|
||||
}, api.logger);
|
||||
|
||||
api.logger.info(`[claude-mem] Session initialized: ${contentSessionId}`);
|
||||
const { contentSessionId } = rememberSessionContext(ctx);
|
||||
api.logger.info(`[claude-mem] Session tracking initialized: ${contentSessionId}`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: message_received — capture inbound user prompts from channels
|
||||
// Event: message_received — alias tracking only; init deferred to before_agent_start
|
||||
// ------------------------------------------------------------------
|
||||
api.on("message_received", async (event, ctx) => {
|
||||
const sessionKey = ctx.conversationId || ctx.channelId || "default";
|
||||
const contentSessionId = getContentSessionId(sessionKey);
|
||||
|
||||
await workerPost(workerPort, "/api/sessions/init", {
|
||||
contentSessionId,
|
||||
project: baseProjectName,
|
||||
prompt: event.content || "[media prompt]",
|
||||
}, api.logger);
|
||||
const { canonicalKey, contentSessionId } = rememberSessionContext(ctx);
|
||||
api.logger.info(`[claude-mem] Message received — prompt capture deferred to before_agent_start: session=${canonicalKey} contentSessionId=${contentSessionId} hasContent=${Boolean(event.content)}`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: after_compaction — re-init session after context compaction
|
||||
// Event: after_compaction — preserve session tracking after context compaction.
|
||||
// Re-init is intentionally NOT called here; the worker retains session state
|
||||
// independently and re-initializing would create duplicate prompt records.
|
||||
// ------------------------------------------------------------------
|
||||
api.on("after_compaction", async (_event, ctx) => {
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
|
||||
await workerPost(workerPort, "/api/sessions/init", {
|
||||
contentSessionId,
|
||||
project: getProjectName(ctx),
|
||||
prompt: "",
|
||||
}, api.logger);
|
||||
|
||||
api.logger.info(`[claude-mem] Session re-initialized after compaction: ${contentSessionId}`);
|
||||
const { contentSessionId } = rememberSessionContext(ctx);
|
||||
api.logger.info(`[claude-mem] Session preserved after compaction: ${contentSessionId}`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: before_agent_start — init session
|
||||
// Event: before_agent_start — single init point with dedup guard
|
||||
// ------------------------------------------------------------------
|
||||
api.on("before_agent_start", async (event, ctx) => {
|
||||
const { contentSessionId } = rememberSessionContext(ctx);
|
||||
const projectName = getProjectName(ctx);
|
||||
const promptText = event.prompt || "agent run";
|
||||
|
||||
if (shouldSkipDuplicatePromptInit(contentSessionId, projectName, promptText)) {
|
||||
api.logger.info(`[claude-mem] Skipping duplicate prompt init: contentSessionId=${contentSessionId} project=${projectName}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Initialize session in the worker so observations are not skipped
|
||||
// (the privacy check requires a stored user prompt to exist)
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
await workerPost(workerPort, "/api/sessions/init", {
|
||||
contentSessionId,
|
||||
project: getProjectName(ctx),
|
||||
prompt: event.prompt || "agent run",
|
||||
project: projectName,
|
||||
prompt: promptText,
|
||||
}, api.logger);
|
||||
|
||||
api.logger.info(`[claude-mem] Session initialized via before_agent_start: contentSessionId=${contentSessionId} project=${projectName}`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
@@ -686,7 +764,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
// Skip memory_ tools to prevent recursive observation loops
|
||||
if (toolName.startsWith("memory_")) return;
|
||||
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
const { canonicalKey, contentSessionId } = rememberSessionContext(ctx);
|
||||
|
||||
// Extract result text from all content blocks
|
||||
let toolResponseText = "";
|
||||
@@ -704,13 +782,23 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
toolResponseText = toolResponseText.slice(0, MAX_TOOL_RESPONSE_LENGTH);
|
||||
}
|
||||
|
||||
// Resolve workspaceDir with fallback chain.
|
||||
// Empty cwd causes worker-side observation queueing failures,
|
||||
// so we drop the observation rather than sending cwd: "".
|
||||
const workspaceDir = ctx.workspaceDir;
|
||||
|
||||
if (!workspaceDir) {
|
||||
api.logger.warn(`[claude-mem] Skipping observation persist because workspaceDir is unavailable: session=${canonicalKey} tool=${toolName}`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Fire-and-forget: send observation to worker
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/observations", {
|
||||
contentSessionId,
|
||||
tool_name: toolName,
|
||||
tool_input: event.params || {},
|
||||
tool_response: toolResponseText,
|
||||
cwd: "",
|
||||
cwd: workspaceDir,
|
||||
}, api.logger);
|
||||
});
|
||||
|
||||
@@ -718,7 +806,7 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
// Event: agent_end — summarize and complete session
|
||||
// ------------------------------------------------------------------
|
||||
api.on("agent_end", async (event, ctx) => {
|
||||
const contentSessionId = getContentSessionId(ctx.sessionKey);
|
||||
const { contentSessionId } = rememberSessionContext(ctx);
|
||||
|
||||
// Extract last assistant message for summarization
|
||||
let lastAssistantMessage = "";
|
||||
@@ -747,17 +835,16 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
last_assistant_message: lastAssistantMessage,
|
||||
}, api.logger);
|
||||
|
||||
workerPostFireAndForget(workerPort, "/api/sessions/complete", {
|
||||
contentSessionId,
|
||||
}, api.logger);
|
||||
api.logger.info(`[claude-mem] Scheduling session complete in ${completionDelayMs}ms: ${contentSessionId}`);
|
||||
scheduleSessionComplete(contentSessionId);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
// Event: session_end — clean up session tracking to prevent unbounded growth
|
||||
// ------------------------------------------------------------------
|
||||
api.on("session_end", async (_event, ctx) => {
|
||||
const key = ctx.sessionKey || "default";
|
||||
sessionIds.delete(key);
|
||||
clearSessionContext(ctx);
|
||||
api.logger.info(`[claude-mem] Session tracking cleaned up`);
|
||||
});
|
||||
|
||||
// ------------------------------------------------------------------
|
||||
@@ -766,6 +853,13 @@ export default function claudeMemPlugin(api: OpenClawPluginApi): void {
|
||||
api.on("gateway_start", async () => {
|
||||
sessionIds.clear();
|
||||
contextCache.clear();
|
||||
recentPromptInits.clear();
|
||||
canonicalSessionKeys.clear();
|
||||
sessionAliasesByCanonicalKey.clear();
|
||||
for (const timer of pendingCompletionTimers.values()) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
pendingCompletionTimers.clear();
|
||||
api.logger.info("[claude-mem] Gateway started — session tracking reset");
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user