feat(SessionStore): auto-create session records if missing

- Added functionality to automatically create a session record in the database if it does not exist when storing observations or session summaries.
- Updated `storeObservation` and `storeSummary` methods to include checks for existing session records and insert new records as needed.
- Added logging for auto-created session records for better traceability.
This commit is contained in:
Alex Newman
2025-10-21 22:13:13 -04:00
parent 726f167ebf
commit f7f62ef3f3
10 changed files with 254 additions and 99 deletions
+24 -12
View File
@@ -1,7 +1,7 @@
#!/usr/bin/env node
import H from"better-sqlite3";import{join as p,dirname as w,basename as Q}from"path";import{homedir as h}from"os";import{existsSync as se,mkdirSync as X}from"fs";import{fileURLToPath as M}from"url";function P(){return typeof __dirname<"u"?__dirname:w(M(import.meta.url))}var F=P(),c=process.env.CLAUDE_MEM_DATA_DIR||p(h(),".claude-mem"),S=process.env.CLAUDE_CONFIG_DIR||p(h(),".claude"),re=p(c,"archives"),oe=p(c,"logs"),ne=p(c,"trash"),ie=p(c,"backups"),ae=p(c,"settings.json"),L=p(c,"claude-mem.db"),de=p(S,"settings.json"),pe=p(S,"commands"),ce=p(S,"CLAUDE.md");function A(n){X(n,{recursive:!0})}function v(){return p(F,"..","..")}var g=(o=>(o[o.DEBUG=0]="DEBUG",o[o.INFO=1]="INFO",o[o.WARN=2]="WARN",o[o.ERROR=3]="ERROR",o[o.SILENT=4]="SILENT",o))(g||{}),N=class{level;useColor;constructor(){let e=process.env.CLAUDE_MEM_LOG_LEVEL?.toUpperCase()||"INFO";this.level=g[e]??1,this.useColor=process.stdout.isTTY??!1}correlationId(e,s){return`obs-${e}-${s}`}sessionId(e){return`session-${e}`}formatData(e){if(e==null)return"";if(typeof e=="string")return e;if(typeof e=="number"||typeof e=="boolean")return e.toString();if(typeof e=="object"){if(e instanceof Error)return this.level===0?`${e.message}
${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Object.keys(e);return s.length===0?"{}":s.length<=3?JSON.stringify(e):`{${s.length} keys: ${s.slice(0,3).join(", ")}...}`}return String(e)}formatTool(e,s){if(!s)return e;try{let t=typeof s=="string"?JSON.parse(s):s;if(e==="Bash"&&t.command){let r=t.command.length>50?t.command.substring(0,50)+"...":t.command;return`${e}(${r})`}if(e==="Read"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Edit"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Write"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}return e}catch{return e}}log(e,s,t,r,o){if(e<this.level)return;let a=new Date().toISOString().replace("T"," ").substring(0,23),i=g[e].padEnd(5),d=s.padEnd(6),_="";r?.correlationId?_=`[${r.correlationId}] `:r?.sessionId&&(_=`[session-${r.sessionId}] `);let m="";o!=null&&(this.level===0&&typeof o=="object"?m=`
`+JSON.stringify(o,null,2):m=" "+this.formatData(o));let E="";if(r){let{sessionId:K,sdkSessionId:Y,correlationId:V,...I}=r;Object.keys(I).length>0&&(E=` {${Object.entries(I).map(([x,U])=>`${x}=${U}`).join(", ")}}`)}let l=`[${a}] [${i}] [${d}] ${_}${t}${E}${m}`;e===3?console.error(l):console.log(l)}debug(e,s,t,r){this.log(0,e,s,t,r)}info(e,s,t,r){this.log(1,e,s,t,r)}warn(e,s,t,r){this.log(2,e,s,t,r)}error(e,s,t,r){this.log(3,e,s,t,r)}dataIn(e,s,t,r){this.info(e,`\u2192 ${s}`,t,r)}dataOut(e,s,t,r){this.info(e,`\u2190 ${s}`,t,r)}success(e,s,t,r){this.info(e,`\u2713 ${s}`,t,r)}failure(e,s,t,r){this.error(e,`\u2717 ${s}`,t,r)}timing(e,s,t,r){this.info(e,`\u23F1 ${s}`,r,{duration:`${t}ms`})}},u=new N;var T=class{db;constructor(){A(c),this.db=new H(L),this.db.pragma("journal_mode = WAL"),this.db.pragma("synchronous = NORMAL"),this.db.pragma("foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable()}initializeSchema(){try{this.db.exec(`
import H from"better-sqlite3";import{join as p,dirname as w,basename as Q}from"path";import{homedir as I}from"os";import{existsSync as se,mkdirSync as M}from"fs";import{fileURLToPath as X}from"url";function P(){return typeof __dirname<"u"?__dirname:w(X(import.meta.url))}var F=P(),u=process.env.CLAUDE_MEM_DATA_DIR||p(I(),".claude-mem"),S=process.env.CLAUDE_CONFIG_DIR||p(I(),".claude"),re=p(u,"archives"),oe=p(u,"logs"),ne=p(u,"trash"),ie=p(u,"backups"),ae=p(u,"settings.json"),L=p(u,"claude-mem.db"),de=p(S,"settings.json"),pe=p(S,"commands"),ce=p(S,"CLAUDE.md");function A(n){M(n,{recursive:!0})}function v(){return p(F,"..","..")}var g=(o=>(o[o.DEBUG=0]="DEBUG",o[o.INFO=1]="INFO",o[o.WARN=2]="WARN",o[o.ERROR=3]="ERROR",o[o.SILENT=4]="SILENT",o))(g||{}),N=class{level;useColor;constructor(){let e=process.env.CLAUDE_MEM_LOG_LEVEL?.toUpperCase()||"INFO";this.level=g[e]??1,this.useColor=process.stdout.isTTY??!1}correlationId(e,s){return`obs-${e}-${s}`}sessionId(e){return`session-${e}`}formatData(e){if(e==null)return"";if(typeof e=="string")return e;if(typeof e=="number"||typeof e=="boolean")return e.toString();if(typeof e=="object"){if(e instanceof Error)return this.level===0?`${e.message}
${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Object.keys(e);return s.length===0?"{}":s.length<=3?JSON.stringify(e):`{${s.length} keys: ${s.slice(0,3).join(", ")}...}`}return String(e)}formatTool(e,s){if(!s)return e;try{let t=typeof s=="string"?JSON.parse(s):s;if(e==="Bash"&&t.command){let r=t.command.length>50?t.command.substring(0,50)+"...":t.command;return`${e}(${r})`}if(e==="Read"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Edit"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Write"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}return e}catch{return e}}log(e,s,t,r,o){if(e<this.level)return;let i=new Date().toISOString().replace("T"," ").substring(0,23),a=g[e].padEnd(5),d=s.padEnd(6),c="";r?.correlationId?c=`[${r.correlationId}] `:r?.sessionId&&(c=`[session-${r.sessionId}] `);let E="";o!=null&&(this.level===0&&typeof o=="object"?E=`
`+JSON.stringify(o,null,2):E=" "+this.formatData(o));let _="";if(r){let{sessionId:K,sdkSessionId:Y,correlationId:V,...h}=r;Object.keys(h).length>0&&(_=` {${Object.entries(h).map(([x,U])=>`${x}=${U}`).join(", ")}}`)}let l=`[${i}] [${a}] [${d}] ${c}${t}${_}${E}`;e===3?console.error(l):console.log(l)}debug(e,s,t,r){this.log(0,e,s,t,r)}info(e,s,t,r){this.log(1,e,s,t,r)}warn(e,s,t,r){this.log(2,e,s,t,r)}error(e,s,t,r){this.log(3,e,s,t,r)}dataIn(e,s,t,r){this.info(e,`\u2192 ${s}`,t,r)}dataOut(e,s,t,r){this.info(e,`\u2190 ${s}`,t,r)}success(e,s,t,r){this.info(e,`\u2713 ${s}`,t,r)}failure(e,s,t,r){this.error(e,`\u2717 ${s}`,t,r)}timing(e,s,t,r){this.info(e,`\u23F1 ${s}`,r,{duration:`${t}ms`})}},m=new N;var T=class{db;constructor(){A(u),this.db=new H(L),this.db.pragma("journal_mode = WAL"),this.db.pragma("synchronous = NORMAL"),this.db.pragma("foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable()}initializeSchema(){try{this.db.exec(`
CREATE TABLE IF NOT EXISTS schema_versions (
id INTEGER PRIMARY KEY,
version INTEGER UNIQUE NOT NULL,
@@ -222,7 +222,7 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
SELECT files_read, files_modified
FROM observations
WHERE sdk_session_id = ?
`).all(e),r=new Set,o=new Set;for(let a of t){if(a.files_read)try{let i=JSON.parse(a.files_read);Array.isArray(i)&&i.forEach(d=>r.add(d))}catch{}if(a.files_modified)try{let i=JSON.parse(a.files_modified);Array.isArray(i)&&i.forEach(d=>o.add(d))}catch{}}return{filesRead:Array.from(r),filesModified:Array.from(o)}}getSessionById(e){return this.db.prepare(`
`).all(e),r=new Set,o=new Set;for(let i of t){if(i.files_read)try{let a=JSON.parse(i.files_read);Array.isArray(a)&&a.forEach(d=>r.add(d))}catch{}if(i.files_modified)try{let a=JSON.parse(i.files_modified);Array.isArray(a)&&a.forEach(d=>o.add(d))}catch{}}return{filesRead:Array.from(r),filesModified:Array.from(o)}}getSessionById(e){return this.db.prepare(`
SELECT id, sdk_session_id, project, user_prompt
FROM sdk_sessions
WHERE id = ?
@@ -249,17 +249,17 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
SELECT prompt_counter FROM sdk_sessions WHERE id = ?
`).get(e)?.prompt_counter||1}getPromptCounter(e){return this.db.prepare(`
SELECT prompt_counter FROM sdk_sessions WHERE id = ?
`).get(e)?.prompt_counter||0}createSDKSession(e,s,t){let r=new Date,o=r.getTime(),i=this.db.prepare(`
`).get(e)?.prompt_counter||0}createSDKSession(e,s,t){let r=new Date,o=r.getTime(),a=this.db.prepare(`
INSERT OR IGNORE INTO sdk_sessions
(claude_session_id, project, user_prompt, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, 'active')
`).run(e,s,t,r.toISOString(),o);return i.lastInsertRowid===0||i.changes===0?this.db.prepare(`
`).run(e,s,t,r.toISOString(),o);return a.lastInsertRowid===0||a.changes===0?this.db.prepare(`
SELECT id FROM sdk_sessions WHERE claude_session_id = ? LIMIT 1
`).get(e).id:i.lastInsertRowid}updateSDKSessionId(e,s){return this.db.prepare(`
`).get(e).id:a.lastInsertRowid}updateSDKSessionId(e,s){return this.db.prepare(`
UPDATE sdk_sessions
SET sdk_session_id = ?
WHERE id = ? AND sdk_session_id IS NULL
`).run(s,e).changes===0?(u.debug("DB","sdk_session_id already set, skipping update",{sessionId:e,sdkSessionId:s}),!1):!0}setWorkerPort(e,s){this.db.prepare(`
`).run(s,e).changes===0?(m.debug("DB","sdk_session_id already set, skipping update",{sessionId:e,sdkSessionId:s}),!1):!0}setWorkerPort(e,s){this.db.prepare(`
UPDATE sdk_sessions
SET worker_port = ?
WHERE id = ?
@@ -272,17 +272,29 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
INSERT INTO user_prompts
(claude_session_id, prompt_number, prompt_text, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?)
`).run(e,s,t,r.toISOString(),o).lastInsertRowid}storeObservation(e,s,t,r){let o=new Date,a=o.getTime();this.db.prepare(`
`).run(e,s,t,r.toISOString(),o).lastInsertRowid}storeObservation(e,s,t,r){let o=new Date,i=o.getTime();this.db.prepare(`
SELECT id FROM sdk_sessions WHERE sdk_session_id = ?
`).get(e)||(this.db.prepare(`
INSERT INTO sdk_sessions
(claude_session_id, sdk_session_id, project, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, 'active')
`).run(e,e,s,o.toISOString(),i),console.error(`[SessionStore] Auto-created session record for session_id: ${e}`)),this.db.prepare(`
INSERT INTO observations
(sdk_session_id, project, type, title, subtitle, facts, narrative, concepts,
files_read, files_modified, prompt_number, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(e,s,t.type,t.title,t.subtitle,JSON.stringify(t.facts),t.narrative,JSON.stringify(t.concepts),JSON.stringify(t.files_read),JSON.stringify(t.files_modified),r||null,o.toISOString(),a)}storeSummary(e,s,t,r){let o=new Date,a=o.getTime();this.db.prepare(`
`).run(e,s,t.type,t.title,t.subtitle,JSON.stringify(t.facts),t.narrative,JSON.stringify(t.concepts),JSON.stringify(t.files_read),JSON.stringify(t.files_modified),r||null,o.toISOString(),i)}storeSummary(e,s,t,r){let o=new Date,i=o.getTime();this.db.prepare(`
SELECT id FROM sdk_sessions WHERE sdk_session_id = ?
`).get(e)||(this.db.prepare(`
INSERT INTO sdk_sessions
(claude_session_id, sdk_session_id, project, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, 'active')
`).run(e,e,s,o.toISOString(),i),console.error(`[SessionStore] Auto-created session record for session_id: ${e}`)),this.db.prepare(`
INSERT INTO session_summaries
(sdk_session_id, project, request, investigated, learned, completed,
next_steps, notes, prompt_number, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`).run(e,s,t.request,t.investigated,t.learned,t.completed,t.next_steps,t.notes,r||null,o.toISOString(),a)}markSessionCompleted(e){let s=new Date,t=s.getTime();this.db.prepare(`
`).run(e,s,t.request,t.investigated,t.learned,t.completed,t.next_steps,t.notes,r||null,o.toISOString(),i)}markSessionCompleted(e){let s=new Date,t=s.getTime();this.db.prepare(`
UPDATE sdk_sessions
SET status = 'completed', completed_at = ?, completed_at_epoch = ?
WHERE id = ?
@@ -294,4 +306,4 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
UPDATE sdk_sessions
SET status = 'failed', completed_at = ?, completed_at_epoch = ?
WHERE status = 'active'
`).run(e.toISOString(),s).changes}close(){this.db.close()}};function G(n,e,s){return n==="PreCompact"?e?{continue:!0,suppressOutput:!0}:{continue:!1,stopReason:s.reason||"Pre-compact operation failed",suppressOutput:!0}:n==="SessionStart"?e&&s.context?{continue:!0,suppressOutput:!0,hookSpecificOutput:{hookEventName:"SessionStart",additionalContext:s.context}}:{continue:!0,suppressOutput:!0}:n==="UserPromptSubmit"||n==="PostToolUse"?{continue:!0,suppressOutput:!0}:n==="Stop"?{continue:!0,suppressOutput:!0}:{continue:e,suppressOutput:!0,...s.reason&&!e?{stopReason:s.reason}:{}}}function R(n,e,s={}){let t=G(n,e,s);return JSON.stringify(t)}import b from"path";import{existsSync as f}from"fs";import{spawn as W}from"child_process";var B=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),$=`http://127.0.0.1:${B}/health`;async function C(){try{return(await fetch($,{signal:AbortSignal.timeout(500)})).ok}catch{return!1}}async function k(){try{if(await C())return!0;console.error("[claude-mem] Worker not responding, starting...");let n=v(),e=b.join(n,"plugin","scripts","worker-service.cjs");if(!f(e))return console.error(`[claude-mem] Worker service not found at ${e}`),!1;let s=b.join(n,"ecosystem.config.cjs"),t=b.join(n,"node_modules",".bin","pm2");if(!f(t))throw new Error(`PM2 binary not found at ${t}. This is a bundled dependency - try running: npm install`);if(!f(s))throw new Error(`PM2 ecosystem config not found at ${s}. Plugin installation may be corrupted.`);let r=W(t,["start",s],{detached:!0,stdio:"ignore",cwd:n});r.on("error",o=>{throw new Error(`Failed to spawn PM2: ${o.message}`)}),r.unref(),console.error("[claude-mem] Worker started with PM2");for(let o=0;o<3;o++)if(await new Promise(a=>setTimeout(a,500)),await C())return console.error("[claude-mem] Worker is healthy"),!0;return console.error("[claude-mem] Worker failed to become healthy after startup"),!1}catch(n){return console.error(`[claude-mem] Failed to start worker: ${n.message}`),!1}}var j=new Set(["ListMcpResourcesTool"]);async function D(n){if(!n)throw new Error("saveHook requires input");let{session_id:e,tool_name:s,tool_input:t,tool_output:r}=n;if(j.has(s)){console.log(R("PostToolUse",!0));return}if(!await k())throw new Error("Worker service failed to start or become healthy");let a=new T,i=a.createSDKSession(e,"",""),d=a.getPromptCounter(i);a.close();let _=u.formatTool(s,t),m=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10);u.dataIn("HOOK",`PostToolUse: ${_}`,{sessionId:i,workerPort:m});let E=await fetch(`http://127.0.0.1:${m}/sessions/${i}/observations`,{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({tool_name:s,tool_input:t!==void 0?JSON.stringify(t):"{}",tool_output:r!==void 0?JSON.stringify(r):"{}",prompt_number:d}),signal:AbortSignal.timeout(2e3)});if(!E.ok){let l=await E.text();throw u.failure("HOOK","Failed to send observation",{sessionId:i,status:E.status},l),new Error(`Failed to send observation to worker: ${E.status} ${l}`)}u.debug("HOOK","Observation sent successfully",{sessionId:i,toolName:s}),console.log(R("PostToolUse",!0))}import{stdin as y}from"process";var O="";y.on("data",n=>O+=n);y.on("end",async()=>{let n=O.trim()?JSON.parse(O):void 0;await D(n),process.exit(0)});
`).run(e.toISOString(),s).changes}close(){this.db.close()}};function G(n,e,s){return n==="PreCompact"?e?{continue:!0,suppressOutput:!0}:{continue:!1,stopReason:s.reason||"Pre-compact operation failed",suppressOutput:!0}:n==="SessionStart"?e&&s.context?{continue:!0,suppressOutput:!0,hookSpecificOutput:{hookEventName:"SessionStart",additionalContext:s.context}}:{continue:!0,suppressOutput:!0}:n==="UserPromptSubmit"||n==="PostToolUse"?{continue:!0,suppressOutput:!0}:n==="Stop"?{continue:!0,suppressOutput:!0}:{continue:e,suppressOutput:!0,...s.reason&&!e?{stopReason:s.reason}:{}}}function R(n,e,s={}){let t=G(n,e,s);return JSON.stringify(t)}import b from"path";import{existsSync as f}from"fs";import{spawn as W}from"child_process";var B=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),$=`http://127.0.0.1:${B}/health`;async function C(){try{return(await fetch($,{signal:AbortSignal.timeout(500)})).ok}catch{return!1}}async function k(){try{if(await C())return!0;console.error("[claude-mem] Worker not responding, starting...");let n=v(),e=b.join(n,"plugin","scripts","worker-service.cjs");if(!f(e))return console.error(`[claude-mem] Worker service not found at ${e}`),!1;let s=b.join(n,"ecosystem.config.cjs"),t=b.join(n,"node_modules",".bin","pm2");if(!f(t))throw new Error(`PM2 binary not found at ${t}. This is a bundled dependency - try running: npm install`);if(!f(s))throw new Error(`PM2 ecosystem config not found at ${s}. Plugin installation may be corrupted.`);let r=W(t,["start",s],{detached:!0,stdio:"ignore",cwd:n});r.on("error",o=>{throw new Error(`Failed to spawn PM2: ${o.message}`)}),r.unref(),console.error("[claude-mem] Worker started with PM2");for(let o=0;o<3;o++)if(await new Promise(i=>setTimeout(i,500)),await C())return console.error("[claude-mem] Worker is healthy"),!0;return console.error("[claude-mem] Worker failed to become healthy after startup"),!1}catch(n){return console.error(`[claude-mem] Failed to start worker: ${n.message}`),!1}}var j=new Set(["ListMcpResourcesTool"]);async function D(n){if(!n)throw new Error("saveHook requires input");let{session_id:e,tool_name:s,tool_input:t,tool_output:r}=n;if(j.has(s)){console.log(R("PostToolUse",!0));return}if(!await k())throw new Error("Worker service failed to start or become healthy");let i=new T,a=i.createSDKSession(e,"",""),d=i.getPromptCounter(a);i.close();let c=m.formatTool(s,t),E=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10);m.dataIn("HOOK",`PostToolUse: ${c}`,{sessionId:a,workerPort:E});let _=await fetch(`http://127.0.0.1:${E}/sessions/${a}/observations`,{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({tool_name:s,tool_input:t!==void 0?JSON.stringify(t):"{}",tool_output:r!==void 0?JSON.stringify(r):"{}",prompt_number:d}),signal:AbortSignal.timeout(2e3)});if(!_.ok){let l=await _.text();throw m.failure("HOOK","Failed to send observation",{sessionId:a,status:_.status},l),new Error(`Failed to send observation to worker: ${_.status} ${l}`)}m.debug("HOOK","Observation sent successfully",{sessionId:a,toolName:s}),console.log(R("PostToolUse",!0))}import{stdin as y}from"process";var O="";y.on("data",n=>O+=n);y.on("end",async()=>{let n=O.trim()?JSON.parse(O):void 0;await D(n),process.exit(0)});