fix: Move ensureWorkerRunning to start of hooks to prevent race condition
All hooks now call ensureWorkerRunning() BEFORE querying the database. This ensures the worker's orphaned session cleanup runs before hooks check for active sessions, preventing 404 errors when hooks try to use sessions that don't exist in worker memory after a restart. Hook order now: 1. ensureWorkerRunning() - starts worker, runs cleanup 2. Query DB - cleanup already marked orphaned sessions as failed 3. Use session - only valid sessions are processed Fixed in: - new-hook: Line 26, before DB queries - save-hook: Line 37, before DB queries - summary-hook: Line 24, before DB queries - cleanup-hook: Line 50, before DB queries This prevents the race condition where hooks would read session status before cleanup ran, then get 404 from worker after cleanup marked sessions failed.
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
#!/usr/bin/env node
|
||||
import K from"path";import W from"better-sqlite3";import{join as d,dirname as P,basename as z}from"path";import{homedir as R}from"os";import{existsSync as te,mkdirSync as M}from"fs";import{fileURLToPath as F}from"url";function H(){return typeof __dirname<"u"?__dirname:P(F(import.meta.url))}var $=H(),m=process.env.CLAUDE_MEM_DATA_DIR||d(R(),".claude-mem"),T=process.env.CLAUDE_CONFIG_DIR||d(R(),".claude"),ne=d(m,"archives"),oe=d(m,"logs"),ie=d(m,"trash"),ae=d(m,"backups"),de=d(m,"settings.json"),O=d(m,"claude-mem.db"),pe=d(T,"settings.json"),ce=d(T,"commands"),ue=d(T,"CLAUDE.md");function I(o){M(o,{recursive:!0})}function L(){return d($,"..","..")}var g=(n=>(n[n.DEBUG=0]="DEBUG",n[n.INFO=1]="INFO",n[n.WARN=2]="WARN",n[n.ERROR=3]="ERROR",n[n.SILENT=4]="SILENT",n))(g||{}),S=class{level;useColor;constructor(){let e=process.env.CLAUDE_MEM_LOG_LEVEL?.toUpperCase()||"INFO";this.level=g[e]??1,this.useColor=process.stdout.isTTY??!1}correlationId(e,s){return`obs-${e}-${s}`}sessionId(e){return`session-${e}`}formatData(e){if(e==null)return"";if(typeof e=="string")return e;if(typeof e=="number"||typeof e=="boolean")return e.toString();if(typeof e=="object"){if(e instanceof Error)return this.level===0?`${e.message}
|
||||
${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Object.keys(e);return s.length===0?"{}":s.length<=3?JSON.stringify(e):`{${s.length} keys: ${s.slice(0,3).join(", ")}...}`}return String(e)}formatTool(e,s){if(!s)return e;try{let t=typeof s=="string"?JSON.parse(s):s;if(e==="Bash"&&t.command){let r=t.command.length>50?t.command.substring(0,50)+"...":t.command;return`${e}(${r})`}if(e==="Read"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Edit"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Write"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}return e}catch{return e}}log(e,s,t,r,n){if(e<this.level)return;let a=new Date().toISOString().replace("T"," ").substring(0,23),i=g[e].padEnd(5),p=s.padEnd(6),l="";r?.correlationId?l=`[${r.correlationId}] `:r?.sessionId&&(l=`[session-${r.sessionId}] `);let E="";n!=null&&(this.level===0&&typeof n=="object"?E=`
|
||||
`+JSON.stringify(n,null,2):E=" "+this.formatData(n));let c="";if(r){let{sessionId:Y,sdkSessionId:q,correlationId:V,...h}=r;Object.keys(h).length>0&&(c=` {${Object.entries(h).map(([w,X])=>`${w}=${X}`).join(", ")}}`)}let u=`[${a}] [${i}] [${p}] ${l}${t}${c}${E}`;e===3?console.error(u):console.log(u)}debug(e,s,t,r){this.log(0,e,s,t,r)}info(e,s,t,r){this.log(1,e,s,t,r)}warn(e,s,t,r){this.log(2,e,s,t,r)}error(e,s,t,r){this.log(3,e,s,t,r)}dataIn(e,s,t,r){this.info(e,`\u2192 ${s}`,t,r)}dataOut(e,s,t,r){this.info(e,`\u2190 ${s}`,t,r)}success(e,s,t,r){this.info(e,`\u2713 ${s}`,t,r)}failure(e,s,t,r){this.error(e,`\u2717 ${s}`,t,r)}timing(e,s,t,r){this.info(e,`\u23F1 ${s}`,r,{duration:`${t}ms`})}},v=new S;var _=class{db;constructor(){I(m),this.db=new W(O),this.db.pragma("journal_mode = WAL"),this.db.pragma("synchronous = NORMAL"),this.db.pragma("foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable()}initializeSchema(){try{this.db.exec(`
|
||||
${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Object.keys(e);return s.length===0?"{}":s.length<=3?JSON.stringify(e):`{${s.length} keys: ${s.slice(0,3).join(", ")}...}`}return String(e)}formatTool(e,s){if(!s)return e;try{let t=typeof s=="string"?JSON.parse(s):s;if(e==="Bash"&&t.command){let r=t.command.length>50?t.command.substring(0,50)+"...":t.command;return`${e}(${r})`}if(e==="Read"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Edit"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}if(e==="Write"&&t.file_path){let r=t.file_path.split("/").pop()||t.file_path;return`${e}(${r})`}return e}catch{return e}}log(e,s,t,r,n){if(e<this.level)return;let a=new Date().toISOString().replace("T"," ").substring(0,23),c=g[e].padEnd(5),i=s.padEnd(6),E="";r?.correlationId?E=`[${r.correlationId}] `:r?.sessionId&&(E=`[session-${r.sessionId}] `);let l="";n!=null&&(this.level===0&&typeof n=="object"?l=`
|
||||
`+JSON.stringify(n,null,2):l=" "+this.formatData(n));let p="";if(r){let{sessionId:Y,sdkSessionId:q,correlationId:V,...h}=r;Object.keys(h).length>0&&(p=` {${Object.entries(h).map(([w,X])=>`${w}=${X}`).join(", ")}}`)}let u=`[${a}] [${c}] [${i}] ${E}${t}${p}${l}`;e===3?console.error(u):console.log(u)}debug(e,s,t,r){this.log(0,e,s,t,r)}info(e,s,t,r){this.log(1,e,s,t,r)}warn(e,s,t,r){this.log(2,e,s,t,r)}error(e,s,t,r){this.log(3,e,s,t,r)}dataIn(e,s,t,r){this.info(e,`\u2192 ${s}`,t,r)}dataOut(e,s,t,r){this.info(e,`\u2190 ${s}`,t,r)}success(e,s,t,r){this.info(e,`\u2713 ${s}`,t,r)}failure(e,s,t,r){this.error(e,`\u2717 ${s}`,t,r)}timing(e,s,t,r){this.info(e,`\u23F1 ${s}`,r,{duration:`${t}ms`})}},v=new S;var _=class{db;constructor(){I(m),this.db=new W(O),this.db.pragma("journal_mode = WAL"),this.db.pragma("synchronous = NORMAL"),this.db.pragma("foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable()}initializeSchema(){try{this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS schema_versions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
version INTEGER UNIQUE NOT NULL,
|
||||
@@ -63,7 +63,7 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_sdk_session ON session_summaries(sdk_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_project ON session_summaries(project);
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_created ON session_summaries(created_at_epoch DESC);
|
||||
`),this.db.prepare("INSERT INTO schema_versions (version, applied_at) VALUES (?, ?)").run(4,new Date().toISOString()),console.error("[SessionStore] Migration004 applied successfully"))}catch(e){throw console.error("[SessionStore] Schema initialization error:",e.message),e}}ensureWorkerPortColumn(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(5))return;this.db.pragma("table_info(sdk_sessions)").some(r=>r.name==="worker_port")||(this.db.exec("ALTER TABLE sdk_sessions ADD COLUMN worker_port INTEGER"),console.error("[SessionStore] Added worker_port column to sdk_sessions table")),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(5,new Date().toISOString())}catch(e){console.error("[SessionStore] Migration error:",e.message)}}ensurePromptTrackingColumns(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(6))return;this.db.pragma("table_info(sdk_sessions)").some(p=>p.name==="prompt_counter")||(this.db.exec("ALTER TABLE sdk_sessions ADD COLUMN prompt_counter INTEGER DEFAULT 0"),console.error("[SessionStore] Added prompt_counter column to sdk_sessions table")),this.db.pragma("table_info(observations)").some(p=>p.name==="prompt_number")||(this.db.exec("ALTER TABLE observations ADD COLUMN prompt_number INTEGER"),console.error("[SessionStore] Added prompt_number column to observations table")),this.db.pragma("table_info(session_summaries)").some(p=>p.name==="prompt_number")||(this.db.exec("ALTER TABLE session_summaries ADD COLUMN prompt_number INTEGER"),console.error("[SessionStore] Added prompt_number column to session_summaries table")),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(6,new Date().toISOString())}catch(e){console.error("[SessionStore] Prompt tracking migration error:",e.message)}}removeSessionSummariesUniqueConstraint(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(7))return;if(!this.db.pragma("index_list(session_summaries)").some(r=>r.unique===1)){this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(7,new Date().toISOString());return}console.error("[SessionStore] Removing UNIQUE constraint from session_summaries.sdk_session_id..."),this.db.exec("BEGIN TRANSACTION");try{this.db.exec(`
|
||||
`),this.db.prepare("INSERT INTO schema_versions (version, applied_at) VALUES (?, ?)").run(4,new Date().toISOString()),console.error("[SessionStore] Migration004 applied successfully"))}catch(e){throw console.error("[SessionStore] Schema initialization error:",e.message),e}}ensureWorkerPortColumn(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(5))return;this.db.pragma("table_info(sdk_sessions)").some(r=>r.name==="worker_port")||(this.db.exec("ALTER TABLE sdk_sessions ADD COLUMN worker_port INTEGER"),console.error("[SessionStore] Added worker_port column to sdk_sessions table")),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(5,new Date().toISOString())}catch(e){console.error("[SessionStore] Migration error:",e.message)}}ensurePromptTrackingColumns(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(6))return;this.db.pragma("table_info(sdk_sessions)").some(i=>i.name==="prompt_counter")||(this.db.exec("ALTER TABLE sdk_sessions ADD COLUMN prompt_counter INTEGER DEFAULT 0"),console.error("[SessionStore] Added prompt_counter column to sdk_sessions table")),this.db.pragma("table_info(observations)").some(i=>i.name==="prompt_number")||(this.db.exec("ALTER TABLE observations ADD COLUMN prompt_number INTEGER"),console.error("[SessionStore] Added prompt_number column to observations table")),this.db.pragma("table_info(session_summaries)").some(i=>i.name==="prompt_number")||(this.db.exec("ALTER TABLE session_summaries ADD COLUMN prompt_number INTEGER"),console.error("[SessionStore] Added prompt_number column to session_summaries table")),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(6,new Date().toISOString())}catch(e){console.error("[SessionStore] Prompt tracking migration error:",e.message)}}removeSessionSummariesUniqueConstraint(){try{if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(7))return;if(!this.db.pragma("index_list(session_summaries)").some(r=>r.unique===1)){this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(7,new Date().toISOString());return}console.error("[SessionStore] Removing UNIQUE constraint from session_summaries.sdk_session_id..."),this.db.exec("BEGIN TRANSACTION");try{this.db.exec(`
|
||||
CREATE TABLE session_summaries_new (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
sdk_session_id TEXT NOT NULL,
|
||||
@@ -239,4 +239,4 @@ ${e.stack}`:e.message;if(Array.isArray(e))return`[${e.length} items]`;let s=Obje
|
||||
UPDATE sdk_sessions
|
||||
SET status = 'failed', completed_at = ?, completed_at_epoch = ?
|
||||
WHERE status = 'active'
|
||||
`).run(e.toISOString(),s).changes}close(){this.db.close()}};function j(o,e,s){return o==="PreCompact"?e?{continue:!0,suppressOutput:!0}:{continue:!1,stopReason:s.reason||"Pre-compact operation failed",suppressOutput:!0}:o==="SessionStart"?e&&s.context?{continue:!0,suppressOutput:!0,hookSpecificOutput:{hookEventName:"SessionStart",additionalContext:s.context}}:{continue:!0,suppressOutput:!0}:o==="UserPromptSubmit"||o==="PostToolUse"?{continue:!0,suppressOutput:!0}:o==="Stop"?{continue:!0,suppressOutput:!0}:{continue:e,suppressOutput:!0,...s.reason&&!e?{stopReason:s.reason}:{}}}function A(o,e,s={}){let t=j(o,e,s);return JSON.stringify(t)}import b from"path";import{existsSync as N}from"fs";import{spawn as B}from"child_process";var C=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),G=`http://127.0.0.1:${C}/health`;async function k(){try{return(await fetch(G,{signal:AbortSignal.timeout(500)})).ok}catch{return!1}}async function D(){try{if(await k())return!0;console.error("[claude-mem] Worker not responding, starting...");let o=L(),e=b.join(o,"plugin","scripts","worker-service.cjs");if(!N(e))return console.error(`[claude-mem] Worker service not found at ${e}`),!1;let s=b.join(o,"ecosystem.config.cjs"),t=b.join(o,"node_modules",".bin","pm2");if(!N(t))throw new Error(`PM2 binary not found at ${t}. This is a bundled dependency - try running: npm install`);if(!N(s))throw new Error(`PM2 ecosystem config not found at ${s}. Plugin installation may be corrupted.`);let r=B(t,["start",s],{detached:!0,stdio:"ignore",cwd:o});r.on("error",n=>{throw new Error(`Failed to spawn PM2: ${n.message}`)}),r.unref(),console.error("[claude-mem] Worker started with PM2");for(let n=0;n<3;n++)if(await new Promise(a=>setTimeout(a,500)),await k())return console.error("[claude-mem] Worker is healthy"),!0;return console.error("[claude-mem] Worker failed to become healthy after startup"),!1}catch(o){return console.error(`[claude-mem] Failed to start worker: ${o.message}`),!1}}function y(){return C}async function x(o){if(!o)throw new Error("newHook requires input");let{session_id:e,cwd:s,prompt:t}=o,r=K.basename(s),n=new _;try{let a=n.findActiveSDKSession(e),i,p=!1;if(a){i=a.id;let c=n.incrementPromptCounter(i);console.error(`[new-hook] Continuing session ${i}, prompt #${c}`)}else{let c=n.findAnySDKSession(e);if(c){i=c.id,n.reactivateSession(i,t);let u=n.incrementPromptCounter(i);p=!0,console.error(`[new-hook] Reactivated session ${i}, prompt #${u}`)}else{i=n.createSDKSession(e,r,t);let u=n.incrementPromptCounter(i);p=!0,console.error(`[new-hook] Created new session ${i}, prompt #${u}`)}}if(!await D())throw new Error("Worker service failed to start or become healthy");let E=y();if(p){let c=await fetch(`http://127.0.0.1:${E}/sessions/${i}/init`,{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({project:r,userPrompt:t}),signal:AbortSignal.timeout(5e3)});if(!c.ok){let u=await c.text();throw new Error(`Failed to initialize session: ${c.status} ${u}`)}}console.log(A("UserPromptSubmit",!0))}finally{n.close()}}import{stdin as U}from"process";var f="";U.on("data",o=>f+=o);U.on("end",async()=>{let o=f.trim()?JSON.parse(f):void 0;await x(o),process.exit(0)});
|
||||
`).run(e.toISOString(),s).changes}close(){this.db.close()}};function j(o,e,s){return o==="PreCompact"?e?{continue:!0,suppressOutput:!0}:{continue:!1,stopReason:s.reason||"Pre-compact operation failed",suppressOutput:!0}:o==="SessionStart"?e&&s.context?{continue:!0,suppressOutput:!0,hookSpecificOutput:{hookEventName:"SessionStart",additionalContext:s.context}}:{continue:!0,suppressOutput:!0}:o==="UserPromptSubmit"||o==="PostToolUse"?{continue:!0,suppressOutput:!0}:o==="Stop"?{continue:!0,suppressOutput:!0}:{continue:e,suppressOutput:!0,...s.reason&&!e?{stopReason:s.reason}:{}}}function A(o,e,s={}){let t=j(o,e,s);return JSON.stringify(t)}import b from"path";import{existsSync as N}from"fs";import{spawn as B}from"child_process";var C=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),G=`http://127.0.0.1:${C}/health`;async function k(){try{return(await fetch(G,{signal:AbortSignal.timeout(500)})).ok}catch{return!1}}async function D(){try{if(await k())return!0;console.error("[claude-mem] Worker not responding, starting...");let o=L(),e=b.join(o,"plugin","scripts","worker-service.cjs");if(!N(e))return console.error(`[claude-mem] Worker service not found at ${e}`),!1;let s=b.join(o,"ecosystem.config.cjs"),t=b.join(o,"node_modules",".bin","pm2");if(!N(t))throw new Error(`PM2 binary not found at ${t}. This is a bundled dependency - try running: npm install`);if(!N(s))throw new Error(`PM2 ecosystem config not found at ${s}. Plugin installation may be corrupted.`);let r=B(t,["start",s],{detached:!0,stdio:"ignore",cwd:o});r.on("error",n=>{throw new Error(`Failed to spawn PM2: ${n.message}`)}),r.unref(),console.error("[claude-mem] Worker started with PM2");for(let n=0;n<3;n++)if(await new Promise(a=>setTimeout(a,500)),await k())return console.error("[claude-mem] Worker is healthy"),!0;return console.error("[claude-mem] Worker failed to become healthy after startup"),!1}catch(o){return console.error(`[claude-mem] Failed to start worker: ${o.message}`),!1}}function y(){return C}async function x(o){if(!o)throw new Error("newHook requires input");let{session_id:e,cwd:s,prompt:t}=o,r=K.basename(s);if(!await D())throw new Error("Worker service failed to start or become healthy");let a=new _;try{let c=a.findActiveSDKSession(e),i,E=!1;if(c){i=c.id;let p=a.incrementPromptCounter(i);console.error(`[new-hook] Continuing session ${i}, prompt #${p}`)}else{let p=a.findAnySDKSession(e);if(p){i=p.id,a.reactivateSession(i,t);let u=a.incrementPromptCounter(i);E=!0,console.error(`[new-hook] Reactivated session ${i}, prompt #${u}`)}else{i=a.createSDKSession(e,r,t);let u=a.incrementPromptCounter(i);E=!0,console.error(`[new-hook] Created new session ${i}, prompt #${u}`)}}let l=y();if(E){let p=await fetch(`http://127.0.0.1:${l}/sessions/${i}/init`,{method:"POST",headers:{"Content-Type":"application/json"},body:JSON.stringify({project:r,userPrompt:t}),signal:AbortSignal.timeout(5e3)});if(!p.ok){let u=await p.text();throw new Error(`Failed to initialize session: ${p.status} ${u}`)}}console.log(A("UserPromptSubmit",!0))}finally{a.close()}}import{stdin as U}from"process";var f="";U.on("data",o=>f+=o);U.on("end",async()=>{let o=f.trim()?JSON.parse(f):void 0;await x(o),process.exit(0)});
|
||||
|
||||
Reference in New Issue
Block a user