Compare commits

...

9 Commits

Author SHA1 Message Date
Alex Newman 149f548667 chore: bump version to 10.2.5
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:17:08 -05:00
Alex Newman b88251bc8b fix: self-healing claimNextMessage prevents stuck processing messages (#1159)
* fix: self-healing claimNextMessage prevents stuck processing messages

claimAndDelete → claimNextMessage with atomic self-healing: resets stale
processing messages (>60s) back to pending before claiming. Eliminates
stuck messages from generator crashes without external timers. Removes
redundant idle-timeout reset in worker-service.ts. Adds QUEUE to logger
Component type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: update stale comments in SessionQueueProcessor to reflect claim-confirm pattern

Comments still referenced the old claim-and-delete pattern after the
claimNextMessage rename. Updated to accurately describe the current
lifecycle where messages are marked as processing and stay in DB until
confirmProcessed() is called.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move Date.now() inside transaction and extract stale threshold constant

- Move Date.now() inside claimNextMessage transaction closure so timestamp
  is fresh if WAL contention causes retry
- Extract STALE_PROCESSING_THRESHOLD_MS to module-level constant
- Add comment clarifying strict < boundary semantics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:15:46 -05:00
Alex Newman b2e3a7e668 docs: update CHANGELOG.md for v10.2.4
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:49:42 -05:00
Alex Newman b1cfc85333 chore: bump version to 10.2.4
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:49:10 -05:00
Alex Newman ca8421611c fix: backfill Chroma vector DB for all projects on startup (#1154)
* fix: backfill all Chroma projects on worker startup

ChromaSync.ensureBackfilled() existed but was never called. After
v10.2.2's bun cache clear destroyed the ONNX model cache, Chroma only
had ~2 days of embeddings while SQLite had 49k+ observations.

- Add static backfillAllProjects() to ChromaSync — iterates all projects
  in SQLite, creates temporary ChromaSync per project, runs smart diff
- Call backfillAllProjects() fire-and-forget on worker startup
- Add 'CHROMA_SYNC' to logger Component type (pre-existing gap)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: sanitize project names for Chroma collection naming

Replace characters outside [a-zA-Z0-9._-] with underscores so projects
like "YC Stuff" map to collection "cm__YC_Stuff" instead of failing
Chroma's collection name validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: route backfill to shared cm__claude-mem collection, harden sanitization

- Use single ChromaSync('claude-mem') in backfillAllProjects() instead of
  per-project instances, matching how DatabaseManager and SearchManager
  operate — fixes critical bug where backfilled data landed in orphaned
  collections that no search path reads from
- Strip trailing non-alphanumeric chars from sanitized collection names
  to satisfy Chroma's end-character constraint
- Guard backfill behind Chroma server readiness to avoid N spurious error
  logs when Chroma failed to start
- Use CHROMA_SYNC log component consistently for backfill messages

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: pass project as parameter to ensureBackfilled instead of mutating instance state

Eliminates shared mutable state in backfillAllProjects() loop. Project
scoping is now passed explicitly via parameter to both ensureBackfilled()
and getExistingChromaIds(), keeping a single Chroma connection while
avoiding fragile instance property mutation across iterations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:47:46 -05:00
Alex Newman eea4f599c0 docs: update CHANGELOG.md for v10.2.3
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:55:51 -05:00
Alex Newman b446f2630e chore: bump version to 10.2.3
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:55:09 -05:00
Alex Newman 224567f980 fix: prevent ONNX model cache corruption from bun cache clears
Remove nuclear `bun pm cache rm` from smart-install.js and
sync-marketplace.cjs (only needed for removed sharp dependency).
Add `bun install` in cache version directory after sync so worker
can resolve dependencies. Move HuggingFace model cache to
~/.claude-mem/models/ so reinstalls don't corrupt it. Add self-healing
retry for Protobuf parsing failures.

Fixes recurring issues #1104, #1105, #1110.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:54:17 -05:00
Alex Newman 2b31792f06 docs: update CHANGELOG.md for v10.2.2
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:22:09 -05:00
18 changed files with 768 additions and 442 deletions
+1 -1
View File
@@ -10,7 +10,7 @@
"plugins": [
{
"name": "claude-mem",
"version": "10.2.2",
"version": "10.2.5",
"source": "./plugin",
"description": "Persistent memory system for Claude Code - context compression across sessions"
}
+51 -35
View File
@@ -2,6 +2,57 @@
All notable changes to claude-mem.
## [v10.2.4] - 2026-02-18
## Chroma Vector DB Backfill Fix
Fixes the Chroma backfill system to correctly sync all SQLite observations into the vector database on worker startup.
### Bug Fixes
- **Backfill all projects on startup** — `backfillAllProjects()` now runs on worker startup, iterating all projects in SQLite and syncing missing observations to Chroma. Previously `ensureBackfilled()` existed but was never called, leaving Chroma with incomplete data after cache clears.
- **Fixed critical collection routing bug** — Backfill now uses the shared `cm__claude-mem` collection (matching how DatabaseManager and SearchManager operate) instead of creating per-project orphan collections that no search path reads from.
- **Hardened collection name sanitization** — Project names with special characters (e.g., "YC Stuff") are sanitized for Chroma's naming constraints, including stripping trailing non-alphanumeric characters.
- **Eliminated shared mutable state** — `ensureBackfilled()` and `getExistingChromaIds()` now accept project as a parameter instead of mutating instance state, keeping a single Chroma connection while avoiding fragile property mutation across iterations.
- **Chroma readiness guard** — Backfill waits for Chroma server readiness before running, preventing spurious error logs when Chroma fails to start.
### Changed Files
- `src/services/sync/ChromaSync.ts` — Core backfill logic, sanitization, parameter passing
- `src/services/worker-service.ts` — Startup backfill trigger + readiness guard
- `src/utils/logger.ts` — Added `CHROMA_SYNC` log component
## [v10.2.3] - 2026-02-17
## Fix Chroma ONNX Model Cache Corruption
Addresses the persistent embedding pipeline failures reported across #1104, #1105, #1110, and subsequent sessions. Three root causes identified and fixed:
### Changes
- **Removed nuclear `bun pm cache rm`** from both `smart-install.js` and `sync-marketplace.cjs`. This was added in v10.2.2 for the now-removed sharp dependency but destroyed all cached packages, breaking the ONNX resolution chain.
- **Added `bun install` in plugin cache directory** after marketplace sync. The cache directory had a `package.json` with `@chroma-core/default-embed` as a dependency but never ran install, so the worker couldn't resolve it at runtime.
- **Moved HuggingFace model cache to `~/.claude-mem/models/`** outside `node_modules`. The ~23MB ONNX model was stored inside `node_modules/@huggingface/transformers/.cache/`, so any reinstall or cache clear corrupted it.
- **Added self-healing retry** for Protobuf parsing failures. If the downloaded model is corrupted, the cache is cleared and re-downloaded automatically on next use.
### Files Changed
- `scripts/smart-install.js` — removed `bun pm cache rm`
- `scripts/sync-marketplace.cjs` — removed `bun pm cache rm`, added `bun install` in cache dir
- `src/services/sync/ChromaSync.ts` — moved model cache, added corruption recovery
## [v10.2.2] - 2026-02-17
## Bug Fixes
- **Removed `node-addon-api` dev dependency** — was only needed for `sharp`, which was already removed in v10.2.1
- **Simplified native module cache clearing** in `smart-install.js` and `sync-marketplace.cjs` — replaced targeted `@img/sharp` directory deletion and lockfile removal with `bun pm cache rm`
- Reduced ~30 lines of brittle file system manipulation to a clean Bun CLI command
## [v10.2.1] - 2026-02-16
## Bug Fixes
@@ -1388,38 +1439,3 @@ This patch release addresses several issues discovered after the session continu
Full changelog: https://github.com/thedotmack/claude-mem/compare/v8.2.4...v8.2.5
## [v8.2.4] - 2025-12-28
Patch release v8.2.4
## [v8.2.3] - 2025-12-27
## Bug Fixes
- Fix worker port environment variable in smart-install script
- Implement file-based locking mechanism for worker operations to prevent race conditions
- Fix restart command references in documentation (changed from `claude-mem restart` to `npm run worker:restart`)
## [v8.2.2] - 2025-12-27
## What's Changed
### Features
- Add OpenRouter provider settings and documentation
- Add modal footer with save button and status indicators
- Implement self-spawn pattern for background worker execution
### Bug Fixes
- Resolve critical error handling issues in worker lifecycle
- Handle Windows/Unix kill errors in orphaned process cleanup
- Validate spawn pid before writing PID file
- Handle process exit in waitForProcessesExit filter
- Use readiness endpoint for health checks instead of port check
- Add missing OpenRouter and Gemini settings to settingKeys array
### Other Changes
- Enhance error handling and validation in agents and routes
- Delete obsolete process management files (ProcessManager, worker-wrapper, worker-cli)
- Update hooks.json to use worker-service.cjs CLI
- Add comprehensive tests for hook constants and worker spawn functionality
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.2",
"version": "10.2.5",
"description": "Memory compression system for Claude Code - persist context across sessions",
"keywords": [
"claude",
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.2",
"version": "10.2.5",
"description": "Persistent memory system for Claude Code - seamlessly preserve context across sessions",
"author": {
"name": "Alex Newman"
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "claude-mem-plugin",
"version": "10.2.2",
"version": "10.2.5",
"private": true,
"description": "Runtime dependencies for claude-mem bundled hooks",
"type": "module",
+6 -6
View File
@@ -6,7 +6,7 @@ ${o.stack}`:` ${o.message}`:this.getLevel()===0&&typeof o=="object"?m=`
`,"utf8")}catch(E){process.stderr.write(`[LOGGER] Failed to write to log file: ${E}
`)}else process.stderr.write(T+`
`)}debug(e,t,s,n){this.log(0,e,t,s,n)}info(e,t,s,n){this.log(1,e,t,s,n)}warn(e,t,s,n){this.log(2,e,t,s,n)}error(e,t,s,n){this.log(3,e,t,s,n)}dataIn(e,t,s,n){this.info(e,`\u2192 ${t}`,s,n)}dataOut(e,t,s,n){this.info(e,`\u2190 ${t}`,s,n)}success(e,t,s,n){this.info(e,`\u2713 ${t}`,s,n)}failure(e,t,s,n){this.error(e,`\u2717 ${t}`,s,n)}timing(e,t,s,n){this.info(e,`\u23F1 ${t}`,n,{duration:`${s}ms`})}happyPathError(e,t,s,n,o=""){let c=((new Error().stack||"").split(`
`)[2]||"").match(/at\s+(?:.*\s+)?\(?([^:]+):(\d+):(\d+)\)?/),m=c?`${c[1].split("/").pop()}:${c[2]}`:"unknown",l={...s,location:m};return this.warn(e,`[HAPPY-PATH] ${t}`,l,n),o}},_=new W;var Ot={};function bt(){return typeof __dirname<"u"?__dirname:(0,b.dirname)((0,ce.fileURLToPath)(Ot.url))}var ht=bt(),N=I.get("CLAUDE_MEM_DATA_DIR"),y=process.env.CLAUDE_CONFIG_DIR||(0,b.join)((0,de.homedir)(),".claude"),Bt=(0,b.join)(y,"plugins","marketplaces","thedotmack"),Ht=(0,b.join)(N,"archives"),Wt=(0,b.join)(N,"logs"),Yt=(0,b.join)(N,"trash"),Vt=(0,b.join)(N,"backups"),qt=(0,b.join)(N,"modes"),Kt=(0,b.join)(N,"settings.json"),_e=(0,b.join)(N,"claude-mem.db"),Jt=(0,b.join)(N,"vector-db"),zt=(0,b.join)(N,"observer-sessions"),Qt=(0,b.join)(y,"settings.json"),Zt=(0,b.join)(y,"commands"),es=(0,b.join)(y,"CLAUDE.md");function me(r){(0,pe.mkdirSync)(r,{recursive:!0})}function le(){return(0,b.join)(ht,"..")}var $=class{db;constructor(e=_e){e!==":memory:"&&me(N),this.db=new ue.Database(e),this.db.run("PRAGMA journal_mode = WAL"),this.db.run("PRAGMA synchronous = NORMAL"),this.db.run("PRAGMA foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable(),this.ensureDiscoveryTokensColumn(),this.createPendingMessagesTable(),this.renameSessionIdColumns(),this.repairSessionIdColumnRename(),this.addFailedAtEpochColumn(),this.addOnUpdateCascadeToForeignKeys()}initializeSchema(){this.db.run(`
`)[2]||"").match(/at\s+(?:.*\s+)?\(?([^:]+):(\d+):(\d+)\)?/),m=c?`${c[1].split("/").pop()}:${c[2]}`:"unknown",l={...s,location:m};return this.warn(e,`[HAPPY-PATH] ${t}`,l,n),o}},_=new W;var Ot={};function bt(){return typeof __dirname<"u"?__dirname:(0,b.dirname)((0,ce.fileURLToPath)(Ot.url))}var ht=bt(),N=I.get("CLAUDE_MEM_DATA_DIR"),y=process.env.CLAUDE_CONFIG_DIR||(0,b.join)((0,de.homedir)(),".claude"),Bt=(0,b.join)(y,"plugins","marketplaces","thedotmack"),Ht=(0,b.join)(N,"archives"),Wt=(0,b.join)(N,"logs"),Yt=(0,b.join)(N,"trash"),Vt=(0,b.join)(N,"backups"),qt=(0,b.join)(N,"modes"),Kt=(0,b.join)(N,"settings.json"),_e=(0,b.join)(N,"claude-mem.db"),Jt=(0,b.join)(N,"vector-db"),Qt=(0,b.join)(N,"observer-sessions"),zt=(0,b.join)(y,"settings.json"),Zt=(0,b.join)(y,"commands"),es=(0,b.join)(y,"CLAUDE.md");function me(r){(0,pe.mkdirSync)(r,{recursive:!0})}function le(){return(0,b.join)(ht,"..")}var $=class{db;constructor(e=_e){e!==":memory:"&&me(N),this.db=new ue.Database(e),this.db.run("PRAGMA journal_mode = WAL"),this.db.run("PRAGMA synchronous = NORMAL"),this.db.run("PRAGMA foreign_keys = ON"),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable(),this.ensureDiscoveryTokensColumn(),this.createPendingMessagesTable(),this.renameSessionIdColumns(),this.repairSessionIdColumnRename(),this.addFailedAtEpochColumn(),this.addOnUpdateCascadeToForeignKeys()}initializeSchema(){this.db.run(`
CREATE TABLE IF NOT EXISTS schema_versions (
id INTEGER PRIMARY KEY,
version INTEGER UNIQUE NOT NULL,
@@ -611,7 +611,7 @@ ${o.stack}`:` ${o.message}`:this.getLevel()===0&&typeof o=="object"?m=`
)
ORDER BY created_at_epoch DESC
LIMIT ?
`).all(e,...s,...o,t.totalObservationCount)}function z(r,e,t){return r.db.prepare(`
`).all(e,...s,...o,t.totalObservationCount)}function Q(r,e,t){return r.db.prepare(`
SELECT id, memory_session_id, request, investigated, learned, completed, next_steps, created_at, created_at_epoch
FROM session_summaries
WHERE project = ?
@@ -638,13 +638,13 @@ ${o.stack}`:` ${o.message}`:this.getLevel()===0&&typeof o=="object"?m=`
ORDER BY created_at_epoch DESC
LIMIT ?
`).all(...e,t.sessionCount+V)}function Nt(r){return r.replace(/\//g,"-")}function Ct(r){try{if(!(0,X.existsSync)(r))return{userMessage:"",assistantMessage:""};let e=(0,X.readFileSync)(r,"utf-8").trim();if(!e)return{userMessage:"",assistantMessage:""};let t=e.split(`
`).filter(n=>n.trim()),s="";for(let n=t.length-1;n>=0;n--)try{let o=t[n];if(!o.includes('"type":"assistant"'))continue;let i=JSON.parse(o);if(i.type==="assistant"&&i.message?.content&&Array.isArray(i.message.content)){let a="";for(let d of i.message.content)d.type==="text"&&(a+=d.text);if(a=a.replace(/<system-reminder>[\s\S]*?<\/system-reminder>/g,"").trim(),a){s=a;break}}}catch(o){_.debug("PARSER","Skipping malformed transcript line",{lineIndex:n},o);continue}return{userMessage:"",assistantMessage:s}}catch(e){return _.failure("WORKER","Failed to extract prior messages from transcript",{transcriptPath:r},e),{userMessage:"",assistantMessage:""}}}function Q(r,e,t,s){if(!e.showLastMessage||r.length===0)return{userMessage:"",assistantMessage:""};let n=r.find(d=>d.memory_session_id!==t);if(!n)return{userMessage:"",assistantMessage:""};let o=n.memory_session_id,i=Nt(s),a=be.default.join(y,"projects",i,`${o}.jsonl`);return Ct(a)}function Re(r,e){let t=e[0]?.id;return r.map((s,n)=>{let o=n===0?null:e[n+1];return{...s,displayEpoch:o?o.created_at_epoch:s.created_at_epoch,displayTime:o?o.created_at:s.created_at,shouldShowLink:s.id!==t}})}function Z(r,e){let t=[...r.map(s=>({type:"observation",data:s})),...e.map(s=>({type:"summary",data:s}))];return t.sort((s,n)=>{let o=s.type==="observation"?s.data.created_at_epoch:s.data.displayEpoch,i=n.type==="observation"?n.data.created_at_epoch:n.data.displayEpoch;return o-i}),t}function Ne(r,e){return new Set(r.slice(0,e).map(t=>t.id))}function Ce(){let r=new Date,e=r.toLocaleDateString("en-CA"),t=r.toLocaleTimeString("en-US",{hour:"numeric",minute:"2-digit",hour12:!0}).toLowerCase().replace(" ",""),s=r.toLocaleTimeString("en-US",{timeZoneName:"short"}).split(" ").pop();return`${e} ${t} ${s}`}function Ae(r){return[`# [${r}] recent context, ${Ce()}`,""]}function Ie(){return[`**Legend:** session-request | ${O.getInstance().getActiveMode().observation_types.map(t=>`${t.emoji} ${t.id}`).join(" | ")}`,""]}function ye(){return["**Column Key**:","- **Read**: Tokens to read this observation (cost to learn it now)","- **Work**: Tokens spent on work that produced this record ( research, building, deciding)",""]}function Me(){return["**Context Index:** This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.","","When you need implementation details, rationale, or debugging context:","- Use MCP tools (search, get_observations) to fetch full observations on-demand","- Critical types ( bugfix, decision) often need detailed fetching","- Trust this index over re-reading code for past decisions and learnings",""]}function Le(r,e){let t=[];if(t.push("**Context Economics**:"),t.push(`- Loading: ${r.totalObservations} observations (${r.totalReadTokens.toLocaleString()} tokens to read)`),t.push(`- Work investment: ${r.totalDiscoveryTokens.toLocaleString()} tokens spent on research, building, and decisions`),r.totalDiscoveryTokens>0&&(e.showSavingsAmount||e.showSavingsPercent)){let s="- Your savings: ";e.showSavingsAmount&&e.showSavingsPercent?s+=`${r.savings.toLocaleString()} tokens (${r.savingsPercent}% reduction from reuse)`:e.showSavingsAmount?s+=`${r.savings.toLocaleString()} tokens`:s+=`${r.savingsPercent}% reduction from reuse`,t.push(s)}return t.push(""),t}function ve(r){return[`### ${r}`,""]}function De(r){return[`**${r}**`,"| ID | Time | T | Title | Read | Work |","|----|------|---|-------|------|------|"]}function xe(r,e,t){let s=r.title||"Untitled",n=O.getInstance().getTypeIcon(r.type),{readTokens:o,discoveryDisplay:i}=M(r,t),a=t.showReadTokens?`~${o}`:"",d=t.showWorkTokens?i:"";return`| #${r.id} | ${e||'"'} | ${n} | ${s} | ${a} | ${d} |`}function Ue(r,e,t,s){let n=[],o=r.title||"Untitled",i=O.getInstance().getTypeIcon(r.type),{readTokens:a,discoveryDisplay:d}=M(r,s);n.push(`**#${r.id}** ${e||'"'} ${i} **${o}**`),t&&(n.push(""),n.push(t),n.push(""));let c=[];return s.showReadTokens&&c.push(`Read: ~${a}`),s.showWorkTokens&&c.push(`Work: ${d}`),c.length>0&&n.push(c.join(", ")),n.push(""),n}function ke(r,e){let t=`${r.request||"Session started"} (${e})`;return[`**#S${r.id}** ${t}`,""]}function x(r,e){return e?[`**${r}**: ${e}`,""]:[]}function we(r){return r.assistantMessage?["","---","","**Previously**","",`A: ${r.assistantMessage}`,""]:[]}function $e(r,e){return["",`Access ${Math.round(r/1e3)}k tokens of past research & decisions for just ${e.toLocaleString()}t. Use MCP search tools to access memories by ID.`]}function Fe(r){return`# [${r}] recent context, ${Ce()}
`).filter(n=>n.trim()),s="";for(let n=t.length-1;n>=0;n--)try{let o=t[n];if(!o.includes('"type":"assistant"'))continue;let i=JSON.parse(o);if(i.type==="assistant"&&i.message?.content&&Array.isArray(i.message.content)){let a="";for(let d of i.message.content)d.type==="text"&&(a+=d.text);if(a=a.replace(/<system-reminder>[\s\S]*?<\/system-reminder>/g,"").trim(),a){s=a;break}}}catch(o){_.debug("PARSER","Skipping malformed transcript line",{lineIndex:n},o);continue}return{userMessage:"",assistantMessage:s}}catch(e){return _.failure("WORKER","Failed to extract prior messages from transcript",{transcriptPath:r},e),{userMessage:"",assistantMessage:""}}}function z(r,e,t,s){if(!e.showLastMessage||r.length===0)return{userMessage:"",assistantMessage:""};let n=r.find(d=>d.memory_session_id!==t);if(!n)return{userMessage:"",assistantMessage:""};let o=n.memory_session_id,i=Nt(s),a=be.default.join(y,"projects",i,`${o}.jsonl`);return Ct(a)}function Re(r,e){let t=e[0]?.id;return r.map((s,n)=>{let o=n===0?null:e[n+1];return{...s,displayEpoch:o?o.created_at_epoch:s.created_at_epoch,displayTime:o?o.created_at:s.created_at,shouldShowLink:s.id!==t}})}function Z(r,e){let t=[...r.map(s=>({type:"observation",data:s})),...e.map(s=>({type:"summary",data:s}))];return t.sort((s,n)=>{let o=s.type==="observation"?s.data.created_at_epoch:s.data.displayEpoch,i=n.type==="observation"?n.data.created_at_epoch:n.data.displayEpoch;return o-i}),t}function Ne(r,e){return new Set(r.slice(0,e).map(t=>t.id))}function Ce(){let r=new Date,e=r.toLocaleDateString("en-CA"),t=r.toLocaleTimeString("en-US",{hour:"numeric",minute:"2-digit",hour12:!0}).toLowerCase().replace(" ",""),s=r.toLocaleTimeString("en-US",{timeZoneName:"short"}).split(" ").pop();return`${e} ${t} ${s}`}function Ae(r){return[`# [${r}] recent context, ${Ce()}`,""]}function Ie(){return[`**Legend:** session-request | ${O.getInstance().getActiveMode().observation_types.map(t=>`${t.emoji} ${t.id}`).join(" | ")}`,""]}function ye(){return["**Column Key**:","- **Read**: Tokens to read this observation (cost to learn it now)","- **Work**: Tokens spent on work that produced this record ( research, building, deciding)",""]}function Me(){return["**Context Index:** This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.","","When you need implementation details, rationale, or debugging context:","- Use MCP tools (search, get_observations) to fetch full observations on-demand","- Critical types ( bugfix, decision) often need detailed fetching","- Trust this index over re-reading code for past decisions and learnings",""]}function Le(r,e){let t=[];if(t.push("**Context Economics**:"),t.push(`- Loading: ${r.totalObservations} observations (${r.totalReadTokens.toLocaleString()} tokens to read)`),t.push(`- Work investment: ${r.totalDiscoveryTokens.toLocaleString()} tokens spent on research, building, and decisions`),r.totalDiscoveryTokens>0&&(e.showSavingsAmount||e.showSavingsPercent)){let s="- Your savings: ";e.showSavingsAmount&&e.showSavingsPercent?s+=`${r.savings.toLocaleString()} tokens (${r.savingsPercent}% reduction from reuse)`:e.showSavingsAmount?s+=`${r.savings.toLocaleString()} tokens`:s+=`${r.savingsPercent}% reduction from reuse`,t.push(s)}return t.push(""),t}function ve(r){return[`### ${r}`,""]}function De(r){return[`**${r}**`,"| ID | Time | T | Title | Read | Work |","|----|------|---|-------|------|------|"]}function xe(r,e,t){let s=r.title||"Untitled",n=O.getInstance().getTypeIcon(r.type),{readTokens:o,discoveryDisplay:i}=M(r,t),a=t.showReadTokens?`~${o}`:"",d=t.showWorkTokens?i:"";return`| #${r.id} | ${e||'"'} | ${n} | ${s} | ${a} | ${d} |`}function Ue(r,e,t,s){let n=[],o=r.title||"Untitled",i=O.getInstance().getTypeIcon(r.type),{readTokens:a,discoveryDisplay:d}=M(r,s);n.push(`**#${r.id}** ${e||'"'} ${i} **${o}**`),t&&(n.push(""),n.push(t),n.push(""));let c=[];return s.showReadTokens&&c.push(`Read: ~${a}`),s.showWorkTokens&&c.push(`Work: ${d}`),c.length>0&&n.push(c.join(", ")),n.push(""),n}function ke(r,e){let t=`${r.request||"Session started"} (${e})`;return[`**#S${r.id}** ${t}`,""]}function x(r,e){return e?[`**${r}**: ${e}`,""]:[]}function we(r){return r.assistantMessage?["","---","","**Previously**","",`A: ${r.assistantMessage}`,""]:[]}function $e(r,e){return["",`Access ${Math.round(r/1e3)}k tokens of past research & decisions for just ${e.toLocaleString()}t. Use MCP search tools to access memories by ID.`]}function Fe(r){return`# [${r}] recent context, ${Ce()}
No previous sessions found for this project yet.`}function Pe(){let r=new Date,e=r.toLocaleDateString("en-CA"),t=r.toLocaleTimeString("en-US",{hour:"numeric",minute:"2-digit",hour12:!0}).toLowerCase().replace(" ",""),s=r.toLocaleTimeString("en-US",{timeZoneName:"short"}).split(" ").pop();return`${e} ${t} ${s}`}function Xe(r){return["",`${p.bright}${p.cyan}[${r}] recent context, ${Pe()}${p.reset}`,`${p.gray}${"\u2500".repeat(60)}${p.reset}`,""]}function je(){let e=O.getInstance().getActiveMode().observation_types.map(t=>`${t.emoji} ${t.id}`).join(" | ");return[`${p.dim}Legend: session-request | ${e}${p.reset}`,""]}function Ge(){return[`${p.bright}Column Key${p.reset}`,`${p.dim} Read: Tokens to read this observation (cost to learn it now)${p.reset}`,`${p.dim} Work: Tokens spent on work that produced this record ( research, building, deciding)${p.reset}`,""]}function Be(){return[`${p.dim}Context Index: This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.${p.reset}`,"",`${p.dim}When you need implementation details, rationale, or debugging context:${p.reset}`,`${p.dim} - Use MCP tools (search, get_observations) to fetch full observations on-demand${p.reset}`,`${p.dim} - Critical types ( bugfix, decision) often need detailed fetching${p.reset}`,`${p.dim} - Trust this index over re-reading code for past decisions and learnings${p.reset}`,""]}function He(r,e){let t=[];if(t.push(`${p.bright}${p.cyan}Context Economics${p.reset}`),t.push(`${p.dim} Loading: ${r.totalObservations} observations (${r.totalReadTokens.toLocaleString()} tokens to read)${p.reset}`),t.push(`${p.dim} Work investment: ${r.totalDiscoveryTokens.toLocaleString()} tokens spent on research, building, and decisions${p.reset}`),r.totalDiscoveryTokens>0&&(e.showSavingsAmount||e.showSavingsPercent)){let s=" Your savings: ";e.showSavingsAmount&&e.showSavingsPercent?s+=`${r.savings.toLocaleString()} tokens (${r.savingsPercent}% reduction from reuse)`:e.showSavingsAmount?s+=`${r.savings.toLocaleString()} tokens`:s+=`${r.savingsPercent}% reduction from reuse`,t.push(`${p.green}${s}${p.reset}`)}return t.push(""),t}function We(r){return[`${p.bright}${p.cyan}${r}${p.reset}`,""]}function Ye(r){return[`${p.dim}${r}${p.reset}`]}function Ve(r,e,t,s){let n=r.title||"Untitled",o=O.getInstance().getTypeIcon(r.type),{readTokens:i,discoveryTokens:a,workEmoji:d}=M(r,s),c=t?`${p.dim}${e}${p.reset}`:" ".repeat(e.length),m=s.showReadTokens&&i>0?`${p.dim}(~${i}t)${p.reset}`:"",l=s.showWorkTokens&&a>0?`${p.dim}(${d} ${a.toLocaleString()}t)${p.reset}`:"";return` ${p.dim}#${r.id}${p.reset} ${c} ${o} ${n} ${m} ${l}`}function qe(r,e,t,s,n){let o=[],i=r.title||"Untitled",a=O.getInstance().getTypeIcon(r.type),{readTokens:d,discoveryTokens:c,workEmoji:m}=M(r,n),l=t?`${p.dim}${e}${p.reset}`:" ".repeat(e.length),T=n.showReadTokens&&d>0?`${p.dim}(~${d}t)${p.reset}`:"",E=n.showWorkTokens&&c>0?`${p.dim}(${m} ${c.toLocaleString()}t)${p.reset}`:"";return o.push(` ${p.dim}#${r.id}${p.reset} ${l} ${a} ${p.bright}${i}${p.reset}`),s&&o.push(` ${p.dim}${s}${p.reset}`),(T||E)&&o.push(` ${T} ${E}`),o.push(""),o}function Ke(r,e){let t=`${r.request||"Session started"} (${e})`;return[`${p.yellow}#S${r.id}${p.reset} ${t}`,""]}function U(r,e,t){return e?[`${t}${r}:${p.reset} ${e}`,""]:[]}function Je(r){return r.assistantMessage?["","---","",`${p.bright}${p.magenta}Previously${p.reset}`,"",`${p.dim}A: ${r.assistantMessage}${p.reset}`,""]:[]}function ze(r,e){let t=Math.round(r/1e3);return["",`${p.dim}Access ${t}k tokens of past research & decisions for just ${e.toLocaleString()}t. Use MCP search tools to access memories by ID.${p.reset}`]}function Qe(r){return`
No previous sessions found for this project yet.`}function Pe(){let r=new Date,e=r.toLocaleDateString("en-CA"),t=r.toLocaleTimeString("en-US",{hour:"numeric",minute:"2-digit",hour12:!0}).toLowerCase().replace(" ",""),s=r.toLocaleTimeString("en-US",{timeZoneName:"short"}).split(" ").pop();return`${e} ${t} ${s}`}function Xe(r){return["",`${p.bright}${p.cyan}[${r}] recent context, ${Pe()}${p.reset}`,`${p.gray}${"\u2500".repeat(60)}${p.reset}`,""]}function je(){let e=O.getInstance().getActiveMode().observation_types.map(t=>`${t.emoji} ${t.id}`).join(" | ");return[`${p.dim}Legend: session-request | ${e}${p.reset}`,""]}function Ge(){return[`${p.bright}Column Key${p.reset}`,`${p.dim} Read: Tokens to read this observation (cost to learn it now)${p.reset}`,`${p.dim} Work: Tokens spent on work that produced this record ( research, building, deciding)${p.reset}`,""]}function Be(){return[`${p.dim}Context Index: This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.${p.reset}`,"",`${p.dim}When you need implementation details, rationale, or debugging context:${p.reset}`,`${p.dim} - Use MCP tools (search, get_observations) to fetch full observations on-demand${p.reset}`,`${p.dim} - Critical types ( bugfix, decision) often need detailed fetching${p.reset}`,`${p.dim} - Trust this index over re-reading code for past decisions and learnings${p.reset}`,""]}function He(r,e){let t=[];if(t.push(`${p.bright}${p.cyan}Context Economics${p.reset}`),t.push(`${p.dim} Loading: ${r.totalObservations} observations (${r.totalReadTokens.toLocaleString()} tokens to read)${p.reset}`),t.push(`${p.dim} Work investment: ${r.totalDiscoveryTokens.toLocaleString()} tokens spent on research, building, and decisions${p.reset}`),r.totalDiscoveryTokens>0&&(e.showSavingsAmount||e.showSavingsPercent)){let s=" Your savings: ";e.showSavingsAmount&&e.showSavingsPercent?s+=`${r.savings.toLocaleString()} tokens (${r.savingsPercent}% reduction from reuse)`:e.showSavingsAmount?s+=`${r.savings.toLocaleString()} tokens`:s+=`${r.savingsPercent}% reduction from reuse`,t.push(`${p.green}${s}${p.reset}`)}return t.push(""),t}function We(r){return[`${p.bright}${p.cyan}${r}${p.reset}`,""]}function Ye(r){return[`${p.dim}${r}${p.reset}`]}function Ve(r,e,t,s){let n=r.title||"Untitled",o=O.getInstance().getTypeIcon(r.type),{readTokens:i,discoveryTokens:a,workEmoji:d}=M(r,s),c=t?`${p.dim}${e}${p.reset}`:" ".repeat(e.length),m=s.showReadTokens&&i>0?`${p.dim}(~${i}t)${p.reset}`:"",l=s.showWorkTokens&&a>0?`${p.dim}(${d} ${a.toLocaleString()}t)${p.reset}`:"";return` ${p.dim}#${r.id}${p.reset} ${c} ${o} ${n} ${m} ${l}`}function qe(r,e,t,s,n){let o=[],i=r.title||"Untitled",a=O.getInstance().getTypeIcon(r.type),{readTokens:d,discoveryTokens:c,workEmoji:m}=M(r,n),l=t?`${p.dim}${e}${p.reset}`:" ".repeat(e.length),T=n.showReadTokens&&d>0?`${p.dim}(~${d}t)${p.reset}`:"",E=n.showWorkTokens&&c>0?`${p.dim}(${m} ${c.toLocaleString()}t)${p.reset}`:"";return o.push(` ${p.dim}#${r.id}${p.reset} ${l} ${a} ${p.bright}${i}${p.reset}`),s&&o.push(` ${p.dim}${s}${p.reset}`),(T||E)&&o.push(` ${T} ${E}`),o.push(""),o}function Ke(r,e){let t=`${r.request||"Session started"} (${e})`;return[`${p.yellow}#S${r.id}${p.reset} ${t}`,""]}function U(r,e,t){return e?[`${t}${r}:${p.reset} ${e}`,""]:[]}function Je(r){return r.assistantMessage?["","---","",`${p.bright}${p.magenta}Previously${p.reset}`,"",`${p.dim}A: ${r.assistantMessage}${p.reset}`,""]:[]}function Qe(r,e){let t=Math.round(r/1e3);return["",`${p.dim}Access ${t}k tokens of past research & decisions for just ${e.toLocaleString()}t. Use MCP search tools to access memories by ID.${p.reset}`]}function ze(r){return`
${p.bright}${p.cyan}[${r}] recent context, ${Pe()}${p.reset}
${p.gray}${"\u2500".repeat(60)}${p.reset}
${p.dim}No previous sessions found for this project yet.${p.reset}
`}function Ze(r,e,t,s){let n=[];return s?n.push(...Xe(r)):n.push(...Ae(r)),s?n.push(...je()):n.push(...Ie()),s?n.push(...Ge()):n.push(...ye()),s?n.push(...Be()):n.push(...Me()),P(t)&&(s?n.push(...He(e,t)):n.push(...Le(e,t))),n}var ee=L(require("path"),1);function B(r){if(!r)return[];try{let e=JSON.parse(r);return Array.isArray(e)?e:[]}catch(e){return _.debug("PARSER","Failed to parse JSON array, using empty fallback",{preview:r?.substring(0,50)},e),[]}}function tt(r){return new Date(r).toLocaleString("en-US",{month:"short",day:"numeric",hour:"numeric",minute:"2-digit",hour12:!0})}function st(r){return new Date(r).toLocaleString("en-US",{hour:"numeric",minute:"2-digit",hour12:!0})}function rt(r){return new Date(r).toLocaleString("en-US",{month:"short",day:"numeric",year:"numeric"})}function et(r,e){return ee.default.isAbsolute(r)?ee.default.relative(e,r):r}function nt(r,e,t){let s=B(r);if(s.length>0)return et(s[0],e);if(t){let n=B(t);if(n.length>0)return et(n[0],e)}return"General"}function At(r){let e=new Map;for(let s of r){let n=s.type==="observation"?s.data.created_at:s.data.displayTime,o=rt(n);e.has(o)||e.set(o,[]),e.get(o).push(s)}let t=Array.from(e.entries()).sort((s,n)=>{let o=new Date(s[0]).getTime(),i=new Date(n[0]).getTime();return o-i});return new Map(t)}function It(r,e){return e.fullObservationField==="narrative"?r.narrative:r.facts?B(r.facts).join(`
`):null}function yt(r,e,t,s,n,o){let i=[];o?i.push(...We(r)):i.push(...ve(r));let a=null,d="",c=!1;for(let m of e)if(m.type==="summary"){c&&(i.push(""),c=!1,a=null,d="");let l=m.data,T=tt(l.displayTime);o?i.push(...Ke(l,T)):i.push(...ke(l,T))}else{let l=m.data,T=nt(l.files_modified,n,l.files_read),E=st(l.created_at),g=E!==d,h=g?E:"";d=E;let u=t.has(l.id);if(T!==a&&(c&&i.push(""),o?i.push(...Ye(T)):i.push(...De(T)),a=T,c=!0),u){let S=It(l,s);o?i.push(...qe(l,E,g,S,s)):(c&&!o&&(i.push(""),c=!1),i.push(...Ue(l,h,S,s)),a=null)}else o?i.push(Ve(l,E,g,s)):i.push(xe(l,h,s))}return c&&i.push(""),i}function ot(r,e,t,s,n){let o=[],i=At(r);for(let[a,d]of i)o.push(...yt(a,d,e,t,s,n));return o}function it(r,e,t){return!(!r.showLastSummary||!e||!!!(e.investigated||e.learned||e.completed||e.next_steps)||t&&e.created_at_epoch<=t.created_at_epoch)}function at(r,e){let t=[];return e?(t.push(...U("Investigated",r.investigated,p.blue)),t.push(...U("Learned",r.learned,p.yellow)),t.push(...U("Completed",r.completed,p.green)),t.push(...U("Next Steps",r.next_steps,p.magenta))):(t.push(...x("Investigated",r.investigated)),t.push(...x("Learned",r.learned)),t.push(...x("Completed",r.completed)),t.push(...x("Next Steps",r.next_steps))),t}function dt(r,e){return e?Je(r):we(r)}function pt(r,e,t){return!P(e)||r.totalDiscoveryTokens<=0||r.savings<=0?[]:t?ze(r.totalDiscoveryTokens,r.totalReadTokens):$e(r.totalDiscoveryTokens,r.totalReadTokens)}var Mt=ct.default.join((0,_t.homedir)(),".claude","plugins","marketplaces","thedotmack","plugin",".install-version");function Lt(){try{return new $}catch(r){if(r.code==="ERR_DLOPEN_FAILED"){try{(0,mt.unlinkSync)(Mt)}catch(e){_.debug("SYSTEM","Marker file cleanup failed (may not exist)",{},e)}return _.error("SYSTEM","Native module rebuild needed - restart Claude Code to auto-fix"),null}throw r}}function vt(r,e){return e?Qe(r):Fe(r)}function Dt(r,e,t,s,n,o,i){let a=[],d=K(e);a.push(...Ze(r,d,s,i));let c=t.slice(0,s.sessionCount),m=Re(c,t),l=Z(e,m),T=Ne(e,s.fullObservationCount);a.push(...ot(l,T,s,n,i));let E=t[0],g=e[0];it(s,E,g)&&a.push(...at(E,i));let h=Q(e,s,o,n);return a.push(...dt(h,i)),a.push(...pt(d,s,i)),a.join(`
`).trimEnd()}async function te(r,e=!1){let t=Y(),s=r?.cwd??process.cwd(),n=Te(s),o=r?.projects||[n],i=Lt();if(!i)return"";try{let a=o.length>1?he(i,o,t):J(i,n,t),d=o.length>1?Oe(i,o,t):z(i,n,t);return a.length===0&&d.length===0?vt(n,e):Dt(n,a,d,t,s,r?.session_id,e)}finally{i.close()}}0&&(module.exports={generateContext});
`):null}function yt(r,e,t,s,n,o){let i=[];o?i.push(...We(r)):i.push(...ve(r));let a=null,d="",c=!1;for(let m of e)if(m.type==="summary"){c&&(i.push(""),c=!1,a=null,d="");let l=m.data,T=tt(l.displayTime);o?i.push(...Ke(l,T)):i.push(...ke(l,T))}else{let l=m.data,T=nt(l.files_modified,n,l.files_read),E=st(l.created_at),g=E!==d,h=g?E:"";d=E;let u=t.has(l.id);if(T!==a&&(c&&i.push(""),o?i.push(...Ye(T)):i.push(...De(T)),a=T,c=!0),u){let S=It(l,s);o?i.push(...qe(l,E,g,S,s)):(c&&!o&&(i.push(""),c=!1),i.push(...Ue(l,h,S,s)),a=null)}else o?i.push(Ve(l,E,g,s)):i.push(xe(l,h,s))}return c&&i.push(""),i}function ot(r,e,t,s,n){let o=[],i=At(r);for(let[a,d]of i)o.push(...yt(a,d,e,t,s,n));return o}function it(r,e,t){return!(!r.showLastSummary||!e||!!!(e.investigated||e.learned||e.completed||e.next_steps)||t&&e.created_at_epoch<=t.created_at_epoch)}function at(r,e){let t=[];return e?(t.push(...U("Investigated",r.investigated,p.blue)),t.push(...U("Learned",r.learned,p.yellow)),t.push(...U("Completed",r.completed,p.green)),t.push(...U("Next Steps",r.next_steps,p.magenta))):(t.push(...x("Investigated",r.investigated)),t.push(...x("Learned",r.learned)),t.push(...x("Completed",r.completed)),t.push(...x("Next Steps",r.next_steps))),t}function dt(r,e){return e?Je(r):we(r)}function pt(r,e,t){return!P(e)||r.totalDiscoveryTokens<=0||r.savings<=0?[]:t?Qe(r.totalDiscoveryTokens,r.totalReadTokens):$e(r.totalDiscoveryTokens,r.totalReadTokens)}var Mt=ct.default.join((0,_t.homedir)(),".claude","plugins","marketplaces","thedotmack","plugin",".install-version");function Lt(){try{return new $}catch(r){if(r.code==="ERR_DLOPEN_FAILED"){try{(0,mt.unlinkSync)(Mt)}catch(e){_.debug("SYSTEM","Marker file cleanup failed (may not exist)",{},e)}return _.error("SYSTEM","Native module rebuild needed - restart Claude Code to auto-fix"),null}throw r}}function vt(r,e){return e?ze(r):Fe(r)}function Dt(r,e,t,s,n,o,i){let a=[],d=K(e);a.push(...Ze(r,d,s,i));let c=t.slice(0,s.sessionCount),m=Re(c,t),l=Z(e,m),T=Ne(e,s.fullObservationCount);a.push(...ot(l,T,s,n,i));let E=t[0],g=e[0];it(s,E,g)&&a.push(...at(E,i));let h=z(e,s,o,n);return a.push(...dt(h,i)),a.push(...pt(d,s,i)),a.join(`
`).trimEnd()}async function te(r,e=!1){let t=Y(),s=r?.cwd??process.cwd(),n=Te(s),o=r?.projects||[n],i=Lt();if(!i)return"";try{let a=o.length>1?he(i,o,t):J(i,n,t),d=o.length>1?Oe(i,o,t):Q(i,n,t);return a.length===0&&d.length===0?vt(n,e):Dt(n,a,d,t,s,r?.session_id,e)}finally{i.close()}}0&&(module.exports={generateContext});
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
-8
View File
@@ -263,14 +263,6 @@ function installDeps() {
// Quote path for Windows paths with spaces
const bunCmd = IS_WINDOWS && bunPath.includes(' ') ? `"${bunPath}"` : bunPath;
// Clear Bun's package cache to prevent stale native module artifacts
try {
execSync(`${bunCmd} pm cache rm`, { cwd: ROOT, stdio: 'pipe', shell: IS_WINDOWS });
console.error(' Cleared Bun package cache');
} catch {
// Cache may not exist yet on first install
}
execSync(`${bunCmd} install`, { cwd: ROOT, stdio: 'inherit', shell: IS_WINDOWS });
// Write version marker
+4 -8
View File
@@ -80,14 +80,6 @@ try {
{ stdio: 'inherit' }
);
// Clear Bun's package cache to prevent stale native module artifacts
try {
execSync('bun pm cache rm', { cwd: INSTALLED_PATH, stdio: 'pipe' });
console.log('Cleared Bun package cache');
} catch {
// Cache may not exist yet on first install
}
console.log('Running bun install in marketplace...');
execSync(
'cd ~/.claude/plugins/marketplaces/thedotmack/ && bun install',
@@ -107,6 +99,10 @@ try {
{ stdio: 'inherit' }
);
// Install dependencies in cache directory so worker can resolve them
console.log(`Running bun install in cache folder (version ${version})...`);
execSync(`bun install`, { cwd: CACHE_VERSION_PATH, stdio: 'inherit' });
console.log('\x1b[32m%s\x1b[0m', 'Sync complete!');
// Trigger worker restart after file sync
+7 -6
View File
@@ -20,8 +20,9 @@ export class SessionQueueProcessor {
/**
* Create an async iterator that yields messages as they become available.
* Uses atomic claim-and-delete to prevent duplicates.
* The queue is a pure buffer: claim it, delete it, process in memory.
* Uses atomic claim-confirm to prevent duplicates.
* Messages are claimed (marked processing) and stay in DB until confirmProcessed().
* Self-heals stale processing messages before each claim.
* Waits for 'message' event when queue is empty.
*
* CRITICAL: Calls onIdleTimeout callback after 3 minutes of inactivity.
@@ -34,14 +35,14 @@ export class SessionQueueProcessor {
while (!signal.aborted) {
try {
// Atomically claim AND DELETE next message from DB
// Message is now in memory only - no "processing" state tracking needed
const persistentMessage = this.store.claimAndDelete(sessionDbId);
// Atomically claim next pending message (marks as 'processing')
// Self-heals any stale processing messages before claiming
const persistentMessage = this.store.claimNextMessage(sessionDbId);
if (persistentMessage) {
// Reset activity time when we successfully yield a message
lastActivityTime = Date.now();
// Yield the message for processing (it's already deleted from queue)
// Yield the message for processing (it's marked as 'processing' in DB)
yield this.toPendingMessageWithId(persistentMessage);
} else {
// Queue empty - wait for wake-up event or timeout
+30 -6
View File
@@ -2,6 +2,9 @@ import { Database } from './sqlite-compat.js';
import type { PendingMessage } from '../worker-types.js';
import { logger } from '../../utils/logger.js';
/** Messages processing longer than this are considered stale and reset to pending by self-healing */
const STALE_PROCESSING_THRESHOLD_MS = 60_000;
/**
* Persistent pending message record from database
*/
@@ -26,12 +29,17 @@ export interface PersistentPendingMessage {
/**
* PendingMessageStore - Persistent work queue for SDK messages
*
* Messages are persisted before processing using a claim-and-delete pattern.
* Messages are persisted before processing using a claim-confirm pattern.
* This simplifies the lifecycle and eliminates duplicate processing bugs.
*
* Lifecycle:
* 1. enqueue() - Message persisted with status 'pending'
* 2. claimAndDelete() - Atomically claims and deletes message (process in memory)
* 2. claimNextMessage() - Atomically claims next pending message (marks as 'processing')
* 3. confirmProcessed() - Deletes message after successful processing
*
* Self-healing:
* - claimNextMessage() resets stale 'processing' messages (>60s) back to 'pending' before claiming
* - This eliminates stuck messages from generator crashes without external timers
*
* Recovery:
* - getSessionsWithPendingMessages() - Find sessions that need recovery on startup
@@ -78,13 +86,29 @@ export class PendingMessageStore {
/**
* Atomically claim the next pending message by marking it as 'processing'.
* CRITICAL FIX: Does NOT delete - message stays in DB until confirmProcessed() is called.
* This prevents message loss if the generator crashes mid-processing.
* Self-healing: resets any stale 'processing' messages (>60s) back to 'pending' first.
* Message stays in DB until confirmProcessed() is called.
* Uses a transaction to prevent race conditions.
*/
claimAndDelete(sessionDbId: number): PersistentPendingMessage | null {
const now = Date.now();
claimNextMessage(sessionDbId: number): PersistentPendingMessage | null {
const claimTx = this.db.transaction((sessionId: number) => {
// Capture time inside transaction so it's fresh if WAL contention causes retry
const now = Date.now();
// Self-healing: reset stale 'processing' messages back to 'pending'
// This recovers from generator crashes without external timers
// Note: strict < means messages must be OLDER than threshold to be reset
const staleCutoff = now - STALE_PROCESSING_THRESHOLD_MS;
const resetStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE session_db_id = ? AND status = 'processing'
AND started_processing_at_epoch < ?
`);
const resetResult = resetStmt.run(sessionId, staleCutoff);
if (resetResult.changes > 0) {
logger.info('QUEUE', `SELF_HEAL | sessionDbId=${sessionId} | recovered ${resetResult.changes} stale processing message(s)`);
}
const peekStmt = this.db.prepare(`
SELECT * FROM pending_messages
WHERE session_db_id = ? AND status = 'pending'
+86 -26
View File
@@ -81,10 +81,16 @@ export class ChromaSync {
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private readonly BATCH_SIZE = 100;
private modelCacheCorruptionRetried = false;
constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
// Chroma collection names only allow [a-zA-Z0-9._-], 3-512 chars,
// must start/end with [a-zA-Z0-9]
const sanitized = project
.replace(/[^a-zA-Z0-9._-]/g, '_')
.replace(/[^a-zA-Z0-9]+$/, ''); // strip trailing non-alphanumeric
this.collectionName = `cm__${sanitized || 'unknown'}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
}
@@ -189,6 +195,10 @@ export class ChromaSync {
}
try {
// Store model cache outside node_modules so reinstalls don't corrupt it
const { env } = await import('@huggingface/transformers');
env.cacheDir = path.join(os.homedir(), '.claude-mem', 'models');
// Use WASM backend to avoid native ONNX binary issues (#1104, #1105, #1110).
// Same model (all-MiniLM-L6-v2), same embeddings, but runs in WASM —
// no native binary loading, no segfaults, no ENOENT errors.
@@ -204,8 +214,22 @@ export class ChromaSync {
collection: this.collectionName
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Self-heal: corrupted model cache → clear and retry once
if (errorMessage.includes('Protobuf parsing failed') && !this.modelCacheCorruptionRetried) {
this.modelCacheCorruptionRetried = true;
logger.warn('CHROMA_SYNC', 'Corrupted model cache detected, clearing and retrying...');
const modelCacheDir = path.join(os.homedir(), '.claude-mem', 'models');
const fs = await import('fs');
if (fs.existsSync(modelCacheDir)) {
fs.rmSync(modelCacheDir, { recursive: true, force: true });
}
return this.ensureCollection(); // retry once
}
logger.error('CHROMA_SYNC', 'Failed to get/create collection', { collection: this.collectionName }, error as Error);
throw new Error(`Collection setup failed: ${error instanceof Error ? error.message : String(error)}`);
throw new Error(`Collection setup failed: ${errorMessage}`);
}
}
@@ -524,17 +548,18 @@ export class ChromaSync {
* Fetch all existing document IDs from Chroma collection
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(): Promise<{
private async getExistingChromaIds(projectOverride?: string): Promise<{
observations: Set<number>;
summaries: Set<number>;
prompts: Set<number>;
}> {
const targetProject = projectOverride ?? this.project;
await this.ensureCollection();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
` Project: ${targetProject}`
);
}
@@ -545,14 +570,14 @@ export class ChromaSync {
let offset = 0;
const limit = 1000; // Large batches, metadata only = fast
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: this.project });
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: targetProject });
while (true) {
try {
const result = await this.collection.get({
limit,
offset,
where: { project: this.project },
where: { project: targetProject },
include: ['metadatas']
});
@@ -579,18 +604,18 @@ export class ChromaSync {
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: this.project,
project: targetProject,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: this.project }, error as Error);
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: targetProject }, error as Error);
throw error;
}
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
project: this.project,
project: targetProject,
observations: observationIds.size,
summaries: summaryIds.size,
prompts: promptIds.size
@@ -602,15 +627,18 @@ export class ChromaSync {
/**
* Backfill: Sync all observations missing from Chroma
* Reads from SQLite and syncs in batches
* @param projectOverride - If provided, backfill this project instead of this.project.
* Used by backfillAllProjects() to iterate projects without mutating instance state.
* Throws error if backfill fails
*/
async ensureBackfilled(): Promise<void> {
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: this.project });
async ensureBackfilled(projectOverride?: string): Promise<void> {
const backfillProject = projectOverride ?? this.project;
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: backfillProject });
await this.ensureCollection();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds();
const existing = await this.getExistingChromaIds(backfillProject);
const db = new SessionStore();
@@ -626,14 +654,14 @@ export class ChromaSync {
SELECT * FROM observations
WHERE project = ? ${obsExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredObservation[];
`).all(backfillProject) as StoredObservation[];
const totalObsCount = db.db.prepare(`
SELECT COUNT(*) as count FROM observations WHERE project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling observations', {
project: this.project,
project: backfillProject,
missing: observations.length,
existing: existing.observations.size,
total: totalObsCount.count
@@ -651,7 +679,7 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
});
}
@@ -667,14 +695,14 @@ export class ChromaSync {
SELECT * FROM session_summaries
WHERE project = ? ${summaryExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredSummary[];
`).all(backfillProject) as StoredSummary[];
const totalSummaryCount = db.db.prepare(`
SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
project: this.project,
project: backfillProject,
missing: summaries.length,
existing: existing.summaries.size,
total: totalSummaryCount.count
@@ -692,7 +720,7 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
});
}
@@ -713,17 +741,17 @@ export class ChromaSync {
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE s.project = ? ${promptExclusionClause}
ORDER BY up.id ASC
`).all(this.project) as StoredUserPrompt[];
`).all(backfillProject) as StoredUserPrompt[];
const totalPromptCount = db.db.prepare(`
SELECT COUNT(*) as count
FROM user_prompts up
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE s.project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
project: this.project,
project: backfillProject,
missing: prompts.length,
existing: existing.prompts.size,
total: totalPromptCount.count
@@ -741,13 +769,13 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, promptDocs.length)}/${promptDocs.length}`
});
}
logger.info('CHROMA_SYNC', 'Smart backfill complete', {
project: this.project,
project: backfillProject,
synced: {
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length,
@@ -761,7 +789,7 @@ export class ChromaSync {
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Backfill failed', { project: this.project }, error as Error);
logger.error('CHROMA_SYNC', 'Backfill failed', { project: backfillProject }, error as Error);
throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
} finally {
db.close();
@@ -848,6 +876,38 @@ export class ChromaSync {
}
}
/**
* Backfill all projects that have observations in SQLite but may be missing from Chroma.
* Uses a single shared ChromaSync('claude-mem') instance and Chroma connection.
* Per-project scoping is passed as a parameter to ensureBackfilled(), avoiding
* instance state mutation. All documents land in the cm__claude-mem collection
* with project scoped via metadata, matching how DatabaseManager and SearchManager operate.
* Designed to be called fire-and-forget on worker startup.
*/
static async backfillAllProjects(): Promise<void> {
const db = new SessionStore();
const sync = new ChromaSync('claude-mem');
try {
const projects = db.db.prepare(
'SELECT DISTINCT project FROM observations WHERE project IS NOT NULL AND project != ?'
).all('') as { project: string }[];
logger.info('CHROMA_SYNC', `Backfill check for ${projects.length} projects`);
for (const { project } of projects) {
try {
await sync.ensureBackfilled(project);
} catch (error) {
logger.error('CHROMA_SYNC', `Backfill failed for project: ${project}`, {}, error as Error);
// Continue to next project — don't let one failure stop others
}
}
} finally {
await sync.close();
db.close();
}
}
/**
* Close the Chroma client connection
* Server lifecycle is managed by ChromaServerManager, not here
+12 -3
View File
@@ -19,6 +19,7 @@ import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
import { getAuthMethodDescription } from '../shared/EnvManager.js';
import { logger } from '../utils/logger.js';
import { ChromaServerManager } from './sync/ChromaServerManager.js';
import { ChromaSync } from './sync/ChromaSync.js';
// Windows: avoid repeated spawn popups when startup fails (issue #921)
const WINDOWS_SPAWN_COOLDOWN_MS = 2 * 60 * 1000;
@@ -423,6 +424,15 @@ export class WorkerService {
this.server.registerRoutes(this.searchRoutes);
logger.info('WORKER', 'SearchManager initialized and search routes registered');
// Auto-backfill Chroma for all projects if out of sync with SQLite (fire-and-forget)
if (this.chromaServer !== null || chromaMode !== 'local') {
ChromaSync.backfillAllProjects().then(() => {
logger.info('CHROMA_SYNC', 'Backfill check complete for all projects');
}).catch(error => {
logger.error('CHROMA_SYNC', 'Backfill failed (non-blocking)', {}, error as Error);
});
}
// Connect to MCP server
const mcpServerPath = path.join(__dirname, 'mcp-server.cjs');
const transport = new StdioClientTransport({
@@ -604,17 +614,16 @@ export class WorkerService {
return;
}
// Shared store for idle-reset and pending-count checks below
// Store for pending-count check below
const { PendingMessageStore } = require('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
// Idle timeout means no new work arrived for 3 minutes - don't restart
// No need to reset stale processing messages here — claimNextMessage() self-heals
if (session.idleTimedOut) {
logger.info('SYSTEM', 'Generator exited due to idle timeout, not restarting', {
sessionId: session.sessionDbId
});
// Reset stale processing messages so they can be picked up later
pendingStore.resetStaleProcessingMessages(0, session.sessionDbId); // Reset this session's messages only
session.idleTimedOut = false; // Reset flag
this.broadcastProcessingStatus();
return;
+1 -1
View File
@@ -15,7 +15,7 @@ export enum LogLevel {
SILENT = 4
}
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'FOLDER_INDEX' | 'CLAUDE_MD';
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
interface LogContext {
sessionId?: number;
@@ -5,11 +5,11 @@ import type { PendingMessageStore, PersistentPendingMessage } from '../../../src
/**
* Mock PendingMessageStore that returns null (empty queue) by default.
* Individual tests can override claimAndDelete behavior.
* Individual tests can override claimNextMessage behavior.
*/
function createMockStore(): PendingMessageStore {
return {
claimAndDelete: mock(() => null),
claimNextMessage: mock(() => null),
toPendingMessage: mock((msg: PersistentPendingMessage) => ({
type: msg.message_type,
tool_name: msg.tool_name || undefined,
@@ -140,7 +140,7 @@ describe('SessionQueueProcessor', () => {
let callCount = 0;
// Return a message on first call, then null
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
return createMockMessage({ id: 1 });
@@ -170,7 +170,7 @@ describe('SessionQueueProcessor', () => {
expect(results).toHaveLength(1);
expect(results[0]._persistentId).toBe(1);
// Store's claimAndDelete should have been called at least twice
// Store's claimNextMessage should have been called at least twice
// (once returning message, once returning null)
expect(callCount).toBeGreaterThanOrEqual(1);
});
@@ -206,7 +206,7 @@ describe('SessionQueueProcessor', () => {
const onIdleTimeout = mock(() => {});
// Return null to trigger wait
(store.claimAndDelete as any) = mock(() => null);
(store.claimNextMessage as any) = mock(() => null);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -242,7 +242,7 @@ describe('SessionQueueProcessor', () => {
// First call: return null (queue empty)
// After message event: return message
// Then return null again
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
// First check - queue empty, will wait
@@ -312,7 +312,7 @@ describe('SessionQueueProcessor', () => {
it('should clean up event listeners when message received', async () => {
// Return a message immediately
(store.claimAndDelete as any) = mock(() => createMockMessage({ id: 1 }));
(store.claimNextMessage as any) = mock(() => createMockMessage({ id: 1 }));
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -344,7 +344,7 @@ describe('SessionQueueProcessor', () => {
it('should continue after store error with backoff', async () => {
let callCount = 0;
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
throw new Error('Database error');
@@ -377,7 +377,7 @@ describe('SessionQueueProcessor', () => {
});
it('should exit cleanly if aborted during error backoff', async () => {
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
throw new Error('Database error');
});
@@ -413,7 +413,7 @@ describe('SessionQueueProcessor', () => {
created_at_epoch: 1704067200000
});
(store.claimAndDelete as any) = mock(() => mockPersistentMessage);
(store.claimNextMessage as any) = mock(() => mockPersistentMessage);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -0,0 +1,146 @@
import { describe, test, expect, beforeEach, afterEach } from 'bun:test';
import { ClaudeMemDatabase } from '../../../src/services/sqlite/Database.js';
import { PendingMessageStore } from '../../../src/services/sqlite/PendingMessageStore.js';
import { createSDKSession } from '../../../src/services/sqlite/Sessions.js';
import type { PendingMessage } from '../../../src/services/worker-types.js';
import type { Database } from 'bun:sqlite';
describe('PendingMessageStore - Self-Healing claimNextMessage', () => {
let db: Database;
let store: PendingMessageStore;
let sessionDbId: number;
const CONTENT_SESSION_ID = 'test-self-heal';
beforeEach(() => {
db = new ClaudeMemDatabase(':memory:').db;
store = new PendingMessageStore(db, 3);
sessionDbId = createSDKSession(db, CONTENT_SESSION_ID, 'test-project', 'Test prompt');
});
afterEach(() => {
db.close();
});
function enqueueMessage(overrides: Partial<PendingMessage> = {}): number {
const message: PendingMessage = {
type: 'observation',
tool_name: 'TestTool',
tool_input: { test: 'input' },
tool_response: { test: 'response' },
prompt_number: 1,
...overrides,
};
return store.enqueue(sessionDbId, CONTENT_SESSION_ID, message);
}
/**
* Helper to simulate a stuck processing message by directly updating the DB
* to set started_processing_at_epoch to a time in the past (>60s ago)
*/
function makeMessageStaleProcessing(messageId: number): void {
const staleTimestamp = Date.now() - 120_000; // 2 minutes ago (well past 60s threshold)
db.run(
`UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
[staleTimestamp, messageId]
);
}
test('stuck processing messages are recovered on next claim', () => {
// Enqueue a message and make it stuck in processing
const msgId = enqueueMessage();
makeMessageStaleProcessing(msgId);
// Verify it's stuck (status = processing)
const beforeClaim = db.query('SELECT status FROM pending_messages WHERE id = ?').get(msgId) as { status: string };
expect(beforeClaim.status).toBe('processing');
// claimNextMessage should self-heal: reset the stuck message, then claim it
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(msgId);
// It should now be in 'processing' status again (freshly claimed)
const afterClaim = db.query('SELECT status FROM pending_messages WHERE id = ?').get(msgId) as { status: string };
expect(afterClaim.status).toBe('processing');
});
test('actively processing messages are NOT recovered', () => {
// Enqueue two messages
const activeId = enqueueMessage();
const pendingId = enqueueMessage();
// Make the first one actively processing (recent timestamp, NOT stale)
const recentTimestamp = Date.now() - 5_000; // 5 seconds ago (well within 60s threshold)
db.run(
`UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
[recentTimestamp, activeId]
);
// claimNextMessage should NOT reset the active one — should claim the pending one instead
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(pendingId);
// The active message should still be processing
const activeMsg = db.query('SELECT status FROM pending_messages WHERE id = ?').get(activeId) as { status: string };
expect(activeMsg.status).toBe('processing');
});
test('recovery and claim is atomic within single call', () => {
// Enqueue three messages
const stuckId = enqueueMessage();
const pendingId1 = enqueueMessage();
const pendingId2 = enqueueMessage();
// Make the first one stuck
makeMessageStaleProcessing(stuckId);
// Single claimNextMessage should reset stuck AND claim oldest pending (which is the reset stuck one)
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
// The stuck message was reset to pending, and being oldest, it gets claimed
expect(claimed!.id).toBe(stuckId);
// The other two should still be pending
const msg1 = db.query('SELECT status FROM pending_messages WHERE id = ?').get(pendingId1) as { status: string };
const msg2 = db.query('SELECT status FROM pending_messages WHERE id = ?').get(pendingId2) as { status: string };
expect(msg1.status).toBe('pending');
expect(msg2.status).toBe('pending');
});
test('no messages returns null without error', () => {
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).toBeNull();
});
test('self-healing only affects the specified session', () => {
// Create a second session
const session2Id = createSDKSession(db, 'other-session', 'test-project', 'Test');
// Enqueue and make stuck in session 1
const stuckInSession1 = enqueueMessage();
makeMessageStaleProcessing(stuckInSession1);
// Enqueue in session 2
const msg: PendingMessage = {
type: 'observation',
tool_name: 'TestTool',
tool_input: { test: 'input' },
tool_response: { test: 'response' },
prompt_number: 1,
};
const session2MsgId = store.enqueue(session2Id, 'other-session', msg);
makeMessageStaleProcessing(session2MsgId);
// Claim for session 2 — should only heal session 2's stuck message
const claimed = store.claimNextMessage(session2Id);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(session2MsgId);
// Session 1's stuck message should still be stuck (not healed by session 2's claim)
const session1Msg = db.query('SELECT status FROM pending_messages WHERE id = ?').get(stuckInSession1) as { status: string };
expect(session1Msg.status).toBe('processing');
});
});
+34 -4
View File
@@ -192,8 +192,8 @@ describe('Zombie Agent Prevention', () => {
// hasAnyPendingWork should return true
expect(pendingStore.hasAnyPendingWork()).toBe(true);
// CLAIM-CONFIRM pattern: claimAndDelete marks as 'processing' (not deleted)
const claimed = pendingStore.claimAndDelete(sessionId);
// CLAIM-CONFIRM pattern: claimNextMessage marks as 'processing' (not deleted)
const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed?.id).toBe(msgId1);
@@ -206,11 +206,11 @@ describe('Zombie Agent Prevention', () => {
expect(pendingStore.getPendingCount(sessionId)).toBe(2);
// Claim and confirm remaining messages
const msg2 = pendingStore.claimAndDelete(sessionId);
const msg2 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg2!.id);
expect(pendingStore.getPendingCount(sessionId)).toBe(1);
const msg3 = pendingStore.claimAndDelete(sessionId);
const msg3 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg3!.id);
// Should be empty now
@@ -266,6 +266,36 @@ describe('Zombie Agent Prevention', () => {
expect(session.abortController.signal.aborted).toBe(false);
});
// Test: Stuck processing messages are recovered by claimNextMessage self-healing
test('should recover stuck processing messages via claimNextMessage self-healing', async () => {
const sessionId = createDbSession('content-stuck-recovery');
// Enqueue and claim a message (transitions to 'processing')
const msgId = enqueueTestMessage(sessionId, 'content-stuck-recovery');
const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(msgId);
// Simulate crash: message stuck in 'processing' with stale timestamp
const staleTimestamp = Date.now() - 120_000; // 2 minutes ago
db.run(
`UPDATE pending_messages SET started_processing_at_epoch = ? WHERE id = ?`,
[staleTimestamp, msgId]
);
// Verify it's stuck
expect(pendingStore.getPendingCount(sessionId)).toBe(1); // processing counts as pending work
// Next claimNextMessage should self-heal: reset stuck message and re-claim it
const recovered = pendingStore.claimNextMessage(sessionId);
expect(recovered).not.toBeNull();
expect(recovered!.id).toBe(msgId);
// Confirm it can be processed successfully
pendingStore.confirmProcessed(msgId);
expect(pendingStore.getPendingCount(sessionId)).toBe(0);
});
// Test: Generator cleanup on session delete
test('should properly cleanup generator promise on session delete', async () => {
const session = createMockSession(1);