diff --git a/plugin/scripts/worker-service.cjs b/plugin/scripts/worker-service.cjs index 5c3584fd..08fc0dbb 100755 --- a/plugin/scripts/worker-service.cjs +++ b/plugin/scripts/worker-service.cjs @@ -1074,7 +1074,9 @@ Set the \`cycles\` parameter to \`"ref"\` to resolve cyclical schemas with defs. ) `).all(),p=[];for(let m of d){let f=l.get(m.cwd);!f||f.kind==="skip"||m.old_project!==f.project&&p.push({sessionId:m.session_id,memorySessionId:m.memory_session_id,newProject:f.project})}if(p.length===0)y.info("SYSTEM","cwd-remap: no sessions need updating");else{let m=i.prepare("UPDATE sdk_sessions SET project = ? WHERE id = ?"),f=i.prepare("UPDATE observations SET project = ? WHERE memory_session_id = ?"),h=i.prepare("UPDATE session_summaries SET project = ? WHERE memory_session_id = ?"),g=0,v=0,_=0;i.transaction(()=>{for(let S of p)g+=m.run(S.newProject,S.sessionId).changes,S.memorySessionId&&(v+=f.run(S.newProject,S.memorySessionId).changes,_+=h.run(S.newProject,S.memorySessionId).changes)})(),y.info("SYSTEM","cwd-remap applied",{sessions:g,observations:v,summaries:_,backup:c})}(0,it.mkdirSync)(e,{recursive:!0}),(0,it.writeFileSync)(r,new Date().toISOString()),y.info("SYSTEM","cwd-remap marker written",{markerPath:r})}catch(s){y.error("SYSTEM","cwd-remap failed, marker not written (will retry on next startup)",{},s)}finally{i?.close()}}function yg(t,e,r={}){let n=process.platform==="win32";bt().assertCanSpawn("worker daemon");let i=An({...process.env,CLAUDE_MEM_WORKER_PORT:String(e),...r}),s=nee();if(!s){y.error("SYSTEM","Bun runtime not found \u2014 install from https://bun.sh and ensure it is on PATH or set BUN env var. The worker daemon requires Bun because it uses bun:sqlite.");return}if(n){let c=`Start-Process -FilePath '${s.replace(/'/g,"''")}' -ArgumentList @('${t.replace(/'/g,"''")}','--daemon') -WindowStyle Hidden`,u=Buffer.from(c,"utf16le").toString("base64");try{return(0,Si.execSync)(`powershell -NoProfile -EncodedCommand ${u}`,{stdio:"ignore",windowsHide:!0,env:i}),0}catch(l){y.error("SYSTEM","Failed to spawn worker daemon on Windows",{runtimePath:s},l);return}}let o="/usr/bin/setsid";if((0,it.existsSync)(o)){let c=(0,Si.spawn)(o,[s,t,"--daemon"],{detached:!0,stdio:"ignore",env:i});return c.pid===void 0?void 0:(c.unref(),c.pid)}let a=(0,Si.spawn)(s,[t,"--daemon"],{detached:!0,stdio:"ignore",env:i});if(a.pid!==void 0)return a.unref(),a.pid}function SD(t){if(t===0)return!0;if(!Number.isInteger(t)||t<0)return!1;try{return process.kill(t,0),!0}catch(e){return e.code==="EPERM"}}function xD(){try{if(!(0,it.existsSync)(As))return;let t=new Date;(0,it.utimesSync)(As,t,t)}catch{}}function wD(){return JE({logAlive:!1})}var ED=Ie(require("net"),1);Q();It();async function kD(t,e,r="GET"){let n=await fetch(`http://127.0.0.1:${t}${e}`,{method:r}),i="";try{i=await n.text()}catch{}return{ok:n.ok,statusCode:n.status,body:i}}async function Wc(t){if(process.platform==="win32")try{return(await fetch(`http://127.0.0.1:${t}/api/health`)).ok}catch{return!1}return new Promise(e=>{let r=ED.default.createServer();r.once("error",n=>{n.code==="EADDRINUSE"?e(!0):e(!1)}),r.once("listening",()=>{r.close(()=>e(!1))}),r.listen(t,"127.0.0.1")})}async function TD(t,e,r,n){let i=Date.now();for(;Date.now()-isetTimeout(s,500))}return!1}function bo(t,e=3e4){return TD(t,"/api/health",e,"Service not ready yet, will retry")}function sk(t,e=3e4){return TD(t,"/api/readiness",e,"Worker not ready yet, will retry")}async function ok(t,e=1e4){let r=Date.now();for(;Date.now()-rsetTimeout(n,500))}return!1}async function ak(t){try{let e=await kD(t,"/api/admin/shutdown","POST");return e.ok?!0:(y.warn("SYSTEM","Shutdown request returned error",{status:e.statusCode}),!1)}catch(e){return e instanceof Error&&e.message?.includes("ECONNREFUSED")?(y.debug("SYSTEM","Worker already stopped",{},e),!1):(y.error("SYSTEM","Shutdown request failed unexpectedly",{},e),!1)}}var lee=120*1e3;function uk(){return ck.default.join(ge.get("CLAUDE_MEM_DATA_DIR"),".worker-start-attempted")}function dee(){if(process.platform!=="win32")return!1;let t=uk();if(!(0,Xn.existsSync)(t))return!1;try{let e=(0,Xn.statSync)(t).mtimeMs;return Date.now()-esetTimeout(e,500)),await new Promise((e,r)=>{t.close(n=>n?r(n):e())}),process.platform==="win32"&&(await new Promise(e=>setTimeout(e,500)),y.info("SYSTEM","Waited for Windows port cleanup"))}var ep=Ie(require("path"),1),OD=require("os"),wg=require("fs"),CD=require("child_process");Q();Ji();var PD=ep.default.join((0,OD.homedir)(),".claude-mem"),hee=5e3,xg=class extends Error{constructor(){super("dry-run rollback"),this.name="DryRunRollback"}};function dk(t,e){let r=(0,CD.spawnSync)("git",["-C",t,...e],{encoding:"utf8",timeout:hee});return r.status!==0?null:(r.stdout??"").trim()}function AD(t){let e=dk(t,["rev-parse","--path-format=absolute","--git-common-dir"]);if(!e)return null;let r=e.endsWith("/.git")?ep.default.dirname(e):e.replace(/\.git$/,"");return(0,wg.existsSync)(r)?r:null}function gee(t){let e=dk(t,["worktree","list","--porcelain"]);if(!e)return[];let r=[],n={};for(let i of e.split(` `))if(i.startsWith("worktree "))n.path&&r.push({path:n.path,branch:n.branch??null}),n={path:i.slice(9).trim(),branch:null};else if(i.startsWith("branch ")){let s=i.slice(7).trim();n.branch=s.startsWith("refs/heads/")?s.slice(11):s}else i===""&&n.path&&(r.push({path:n.path,branch:n.branch??null}),n={});return n.path&&r.push({path:n.path,branch:n.branch??null}),r}function vee(t){let e=dk(t,["branch","--merged","HEAD","--format=%(refname:short)"]);return e?new Set(e.split(` -`).map(r=>r.trim()).filter(r=>r.length>0)):new Set}async function pk(t={}){let e=t.dataDirectory??PD,r=t.dryRun??!1,n=t.repoPath??process.cwd(),i=AD(n),s=i?ar(i).primary:"",o={repoPath:i??n,parentProject:s,scannedWorktrees:0,mergedBranches:[],adoptedObservations:0,adoptedSummaries:0,chromaUpdates:0,chromaFailed:0,dryRun:r,errors:[]};if(!i)return y.debug("SYSTEM","Worktree adoption skipped (not a git repo)",{startCwd:n}),o;let a=ep.default.join(e,"claude-mem.db");if(!(0,wg.existsSync)(a))return y.debug("SYSTEM","Worktree adoption skipped (no DB yet)",{dbPath:a}),o;let u=gee(i).filter(m=>m.path!==i);if(o.scannedWorktrees=u.length,u.length===0)return o;let l;if(t.onlyBranch)l=u.filter(m=>m.branch===t.onlyBranch);else{let m=vee(i);l=u.filter(f=>f.branch!==null&&m.has(f.branch))}if(o.mergedBranches=l.map(m=>m.branch).filter(m=>m!==null),l.length===0)return o;let d=[],p=null;try{let{Database:m}=require("bun:sqlite");p=new m(a);let f=p.prepare("PRAGMA table_info(observations)").all(),h=p.prepare("PRAGMA table_info(session_summaries)").all(),g=f.some(w=>w.name==="merged_into_project"),v=h.some(w=>w.name==="merged_into_project");if(!g||!v)return y.debug("SYSTEM","Worktree adoption skipped (merged_into_project column missing; will run after migration)",{obsHasColumn:g,sumHasColumn:v}),o;let _=p.prepare("SELECT id FROM observations WHERE project = ? AND merged_into_project IS NULL"),b=p.prepare("UPDATE observations SET merged_into_project = ? WHERE project = ? AND merged_into_project IS NULL"),S=p.prepare("UPDATE session_summaries SET merged_into_project = ? WHERE project = ? AND merged_into_project IS NULL"),x=p.transaction(()=>{for(let w of l)try{let E=ar(w.path).primary,k=_.all(E);for(let A of k)d.push(A.id);let $=b.run(s,E).changes,C=S.run(s,E).changes;o.adoptedObservations+=$,o.adoptedSummaries+=C}catch(E){let k=E instanceof Error?E.message:String(E);y.warn("SYSTEM","Worktree adoption skipped branch",{worktree:w.path,branch:w.branch,error:k}),o.errors.push({worktree:w.path,error:k})}if(r)throw new xg});try{x()}catch(w){if(!(w instanceof xg))throw w}}finally{p?.close()}if(!r&&d.length>0){let m=new _o("claude-mem");try{await m.updateMergedIntoProject(d,s),o.chromaUpdates=d.length}catch(f){y.error("CHROMA_SYNC","Worktree adoption Chroma patch failed (SQL already committed)",{parentProject:s,sqliteIdCount:d.length},f),o.chromaFailed=d.length}finally{await m.close()}}return(o.adoptedObservations>0||o.adoptedSummaries>0||o.chromaUpdates>0||o.errors.length>0)&&y.info("SYSTEM","Worktree adoption applied",{parentProject:s,dryRun:r,scannedWorktrees:o.scannedWorktrees,mergedBranches:o.mergedBranches,adoptedObservations:o.adoptedObservations,adoptedSummaries:o.adoptedSummaries,chromaUpdates:o.chromaUpdates,chromaFailed:o.chromaFailed,errors:o.errors.length}),o}async function ND(t={}){let e=t.dataDirectory??PD,r=ep.default.join(e,"claude-mem.db"),n=[];if(!(0,wg.existsSync)(r))return y.debug("SYSTEM","Worktree adoption skipped (no DB yet)",{dbPath:r}),n;let i=new Set,s=null;try{let{Database:o}=require("bun:sqlite");if(s=new o(r,{readonly:!0}),!s.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'").get())return y.debug("SYSTEM","Worktree adoption skipped (pending_messages table missing)"),n;let c=s.prepare(` +`).map(r=>r.trim()).filter(r=>r.length>0)):new Set}async function pk(t={}){let e=t.dataDirectory??PD,r=t.dryRun??!1,n=t.repoPath??process.cwd(),i=AD(n),s=i?ar(i).primary:"",o={repoPath:i??n,parentProject:s,scannedWorktrees:0,mergedBranches:[],adoptedObservations:0,adoptedSummaries:0,chromaUpdates:0,chromaFailed:0,dryRun:r,errors:[]};if(!i)return y.debug("SYSTEM","Worktree adoption skipped (not a git repo)",{startCwd:n}),o;let a=ep.default.join(e,"claude-mem.db");if(!(0,wg.existsSync)(a))return y.debug("SYSTEM","Worktree adoption skipped (no DB yet)",{dbPath:a}),o;let u=gee(i).filter(m=>m.path!==i);if(o.scannedWorktrees=u.length,u.length===0)return o;let l;if(t.onlyBranch)l=u.filter(m=>m.branch===t.onlyBranch);else{let m=vee(i);l=u.filter(f=>f.branch!==null&&m.has(f.branch))}if(o.mergedBranches=l.map(m=>m.branch).filter(m=>m!==null),l.length===0)return o;let d=[],p=null;try{let{Database:m}=require("bun:sqlite");p=new m(a);let f=p.prepare("PRAGMA table_info(observations)").all(),h=p.prepare("PRAGMA table_info(session_summaries)").all(),g=f.some(w=>w.name==="merged_into_project"),v=h.some(w=>w.name==="merged_into_project");if(!g||!v)return y.debug("SYSTEM","Worktree adoption skipped (merged_into_project column missing; will run after migration)",{obsHasColumn:g,sumHasColumn:v}),o;let _=p.prepare(`SELECT id FROM observations + WHERE project = ? + AND (merged_into_project IS NULL OR merged_into_project = ?)`),b=p.prepare("UPDATE observations SET merged_into_project = ? WHERE project = ? AND merged_into_project IS NULL"),S=p.prepare("UPDATE session_summaries SET merged_into_project = ? WHERE project = ? AND merged_into_project IS NULL"),x=p.transaction(()=>{for(let w of l)try{let E=ar(w.path).primary,k=_.all(E,s);for(let A of k)d.push(A.id);let $=b.run(s,E).changes,C=S.run(s,E).changes;o.adoptedObservations+=$,o.adoptedSummaries+=C}catch(E){let k=E instanceof Error?E.message:String(E);y.warn("SYSTEM","Worktree adoption skipped branch",{worktree:w.path,branch:w.branch,error:k}),o.errors.push({worktree:w.path,error:k})}if(r)throw new xg});try{x()}catch(w){if(!(w instanceof xg))throw w}}finally{p?.close()}if(!r&&d.length>0){let m=new _o("claude-mem");try{await m.updateMergedIntoProject(d,s),o.chromaUpdates=d.length}catch(f){y.error("CHROMA_SYNC","Worktree adoption Chroma patch failed (SQL already committed)",{parentProject:s,sqliteIdCount:d.length},f),o.chromaFailed=d.length}finally{await m.close()}}return(o.adoptedObservations>0||o.adoptedSummaries>0||o.chromaUpdates>0||o.errors.length>0)&&y.info("SYSTEM","Worktree adoption applied",{parentProject:s,dryRun:r,scannedWorktrees:o.scannedWorktrees,mergedBranches:o.mergedBranches,adoptedObservations:o.adoptedObservations,adoptedSummaries:o.adoptedSummaries,chromaUpdates:o.chromaUpdates,chromaFailed:o.chromaFailed,errors:o.errors.length}),o}async function ND(t={}){let e=t.dataDirectory??PD,r=ep.default.join(e,"claude-mem.db"),n=[];if(!(0,wg.existsSync)(r))return y.debug("SYSTEM","Worktree adoption skipped (no DB yet)",{dbPath:r}),n;let i=new Set,s=null;try{let{Database:o}=require("bun:sqlite");if(s=new o(r,{readonly:!0}),!s.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'").get())return y.debug("SYSTEM","Worktree adoption skipped (pending_messages table missing)"),n;let c=s.prepare(` SELECT cwd FROM pending_messages WHERE cwd IS NOT NULL AND cwd != '' GROUP BY cwd diff --git a/src/services/infrastructure/WorktreeAdoption.ts b/src/services/infrastructure/WorktreeAdoption.ts index 1f8e1145..a4a47c1c 100644 --- a/src/services/infrastructure/WorktreeAdoption.ts +++ b/src/services/infrastructure/WorktreeAdoption.ts @@ -127,15 +127,17 @@ function listMergedBranches(mainRepo: string): Set { * Stamp `merged_into_project` on observations and session_summaries for every * worktree of `opts.repoPath` whose branch has been merged into the parent's HEAD. * - * Idempotent: a row is only touched when its `merged_into_project IS NULL`. + * SQL writes are idempotent: an UPDATE only touches rows where + * `merged_into_project IS NULL`. `result.adoptedObservations` / `adoptedSummaries` + * reflect the actual SQL changes on each run. * - * Chroma is patched AFTER SQL commits. Chroma failure does NOT roll back SQL — - * SQL is source of truth. A transient Chroma failure does NOT auto-retry: - * once SQL commits, `merged_into_project IS NULL` no longer matches those rows, - * so the same adoption pass won't rediscover them. If Chroma patching fails, - * `result.chromaFailed` reflects the count — callers should surface this to - * the operator, and re-running adoption after clearing `merged_into_project` - * (or reseeding Chroma) is the recovery path. + * Chroma patches are self-healing: the Chroma id set is built from ALL + * observations whose `project` matches a merged worktree (both unadopted rows + * AND rows previously stamped to this parent), and `updateMergedIntoProject` + * is idempotent, so a transient Chroma failure on an earlier run is retried + * automatically on the next adoption pass. `result.chromaUpdates` therefore + * counts the total Chroma writes performed this pass (which may exceed + * `adoptedObservations` when retries happen). */ export async function adoptMergedWorktrees(opts: { repoPath?: string; @@ -228,8 +230,16 @@ export async function adoptMergedWorktrees(opts: { return result; } - const selectObs = db.prepare( - 'SELECT id FROM observations WHERE project = ? AND merged_into_project IS NULL' + // Select ALL observations for the worktree project (both unadopted rows + // AND rows already stamped to this parent), not just unadopted ones. This + // ensures a transient Chroma failure on a prior run gets retried the next + // time adoption executes: SQL may already be stamped, but we re-include + // those ids in the Chroma patch set (updateMergedIntoProject is idempotent + // — it replays the same metadata write). + const selectObsForPatch = db.prepare( + `SELECT id FROM observations + WHERE project = ? + AND (merged_into_project IS NULL OR merged_into_project = ?)` ); const updateObs = db.prepare( 'UPDATE observations SET merged_into_project = ? WHERE project = ? AND merged_into_project IS NULL' @@ -242,9 +252,14 @@ export async function adoptMergedWorktrees(opts: { for (const wt of targets) { try { const worktreeProject = getProjectContext(wt.path).primary; - const rows = selectObs.all(worktreeProject) as Array<{ id: number }>; + const rows = selectObsForPatch.all( + worktreeProject, + parentProject + ) as Array<{ id: number }>; for (const r of rows) adoptedSqliteIds.push(r.id); + // updateObs/updateSum only touch WHERE merged_into_project IS NULL, + // so .changes reflects only newly-adopted rows (not the re-patched ones). const obsChanges = updateObs.run(parentProject, worktreeProject).changes; const sumChanges = updateSum.run(parentProject, worktreeProject).changes; result.adoptedObservations += obsChanges;