Compare commits

25 Commits

Author SHA1 Message Date
Alex Newman 2db9d0e383 chore: bump version to 10.3.3
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 03:47:48 -05:00
Alex Newman 0a26bb18bf fix: update footer text to reference claude-mem skill instead of MCP search tools
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 03:47:02 -05:00
Alex Newman bd11ccf12e docs: update CHANGELOG.md for v10.3.2
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 03:32:51 -05:00
Alex Newman c2c3e3069c chore: bump version to 10.3.2
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 03:32:22 -05:00
Alex Newman 7966c6cba9 fix: rename save_memory and fix MCP search instructions + startup hook (#1210)
* fix: rename save_memory to save_observation and fix MCP search instructions

Stop the primary agent from proactively saving memories by renaming
save_memory to save_observation with a neutral description. Remove
"Saving Memories" section from SKILL.md. Update context formatters
and output styles to reference the mem-search skill instead of raw
MCP tool names.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: split SessionStart hooks so smart-install failure doesn't block worker start

smart-install.js and worker-start were in the same hook group, so if
smart-install exited non-zero the worker never started. Split into
separate hook groups so they run independently.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: worker startup waits for readiness before hooks fire

Move initializationCompleteFlag to set after DB/search init (not MCP),
add waitForReadiness() polling /api/readiness, and extract shared
pollEndpointUntilOk helper to DRY up health/readiness checks.
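A rough sketch of how a shared polling helper such as pollEndpointUntilOk could serve both the health and readiness checks. The signature, the injected `probe` callback, and the interval value are assumptions for illustration; the real helper's code is not shown in this diff.

```typescript
// Hypothetical sketch: the actual pollEndpointUntilOk signature is not visible here.
type Probe = () => Promise<boolean>;

async function pollEndpointUntilOk(
  probe: Probe,
  timeoutMs: number,
  intervalMs: number,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    try {
      if (await probe()) return true; // endpoint answered OK
    } catch {
      // Connection refused or similar: the worker is not up yet, keep polling.
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  return false; // gave up after timeoutMs
}

// Health and readiness checks then differ only in the probe they pass in.
// The fetch-based probe and the 250 ms interval are assumptions.
const waitForReadiness = (timeoutMs: number): Promise<boolean> =>
  pollEndpointUntilOk(
    async () => (await fetch("http://127.0.0.1:37777/api/readiness")).ok,
    timeoutMs,
    250,
  );
```

Injecting the probe keeps the loop independent of HTTP, which also makes it easy to exercise with a fake probe.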

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 03:30:31 -05:00
Alex Newman e4e735d3ff fix: add rewrite rule so install.cmem.ai root serves install.sh
Without this, curl https://install.cmem.ai returns 404 because
Vercel has no index file mapping for the root path.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 22:39:36 -05:00
Alex Newman 780cc3894e fix: serve installer JS from install.cmem.ai instead of GitHub raw
Copied compiled installer to install/public/installer.js so Vercel
serves it at install.cmem.ai/installer.js. Updated install.sh to
fetch from same domain instead of raw.githubusercontent.com.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 22:08:43 -05:00
Alex Newman 8d46c00dd8 fix: add compiled installer dist so CLI installation works
The bootstrap script (install.sh) fetches installer/dist/index.js from
main, but it was never committed due to the global dist/ gitignore rule.
Added negation rule and the compiled installer bundle.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 22:06:05 -05:00
Alex Newman 4ab601fc9f docs: update CHANGELOG.md for v10.3.1
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 20:12:46 -05:00
Alex Newman 097035de6c chore: bump version to 10.3.1
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 20:12:17 -05:00
Alex Newman e788fd3676 fix: prevent duplicate worker daemons and zombie processes (#1178)
* fix: prevent duplicate worker daemons and zombie processes

Three root causes of chroma-mcp timeouts:

1. HTTP shutdown (POST /api/admin/shutdown) closed resources but never
   called process.exit(). Zombie workers stayed alive, background tasks
   reconnected to chroma-mcp, spawning duplicate subprocesses that all
   contended for the same persistent data directory.

2. No guard against concurrent daemon startup. When hooks fired
   simultaneously, multiple daemons started before either wrote a PID
   file. The loser got EADDRINUSE but stayed alive because signal
   handlers registered in the constructor prevented exit.

3. Corrupt 147GB HNSW index file caused all chroma queries to timeout
   (MCP error -32001). Data fix: deleted corrupt collection, backfill
   rebuilds from SQLite.

Code fixes:
- Add PID-based guard in daemon startup: exit if PID file process alive
- Add port-based guard in daemon startup: exit if port already bound
  (runs before WorkerService constructor registers keepalive handlers)
- Add process.exit(0) after HTTP shutdown/restart completes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: aggressive startup cleanup and one-time chroma wipe for upgrade

Kill orphaned worker-service.cjs and chroma-mcp processes immediately
at startup (no age gate) while keeping 30-min threshold for mcp-server.
Wipe corrupt chroma data once on upgrade from pre-v10.3 versions —
backfill rebuilds from SQLite automatically.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: wrap shutdown handlers in try/finally to guarantee process.exit

If onShutdown() or onRestart() threw, process.exit(0) was never reached,
leaving the daemon alive as a zombie. Also removed redundant require('fs')
calls in process-manager tests where ESM imports already existed.
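The try/finally pattern described above can be sketched as follows. `shutdownAndExit` and its injected `exit` parameter are hypothetical names used so the sketch can run outside a daemon; in the worker the exit function would presumably be `process.exit`.

```typescript
// Hypothetical wrapper: names are illustrative, not taken from the repository.
async function shutdownAndExit(
  onShutdown: () => Promise<void>,
  exit: (code: number) => void,
): Promise<void> {
  try {
    await onShutdown(); // may throw while closing resources
  } finally {
    exit(0); // runs whether onShutdown resolved or threw
  }
}
```

With a plain `await onShutdown(); exit(0);` sequence, a throwing handler skips the exit call, which is the zombie scenario the commit describes.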

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 20:10:28 -05:00
Alex Newman 44cdbec173 docs: update CHANGELOG.md for v10.3.0
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:34:33 -05:00
Alex Newman 91b48a6481 chore: bump version to 10.3.0
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:33:52 -05:00
Alex Newman 40daf8f3fa feat: replace WASM embeddings with persistent chroma-mcp MCP connection (#1176)
* feat: replace WASM embeddings with persistent chroma-mcp MCP connection

Replace ChromaServerManager (npx chroma run + chromadb npm + ONNX/WASM)
with ChromaMcpManager, a singleton stdio MCP client that communicates with
chroma-mcp via uvx. This eliminates native binary issues, segfaults, and
WASM embedding failures that plagued cross-platform installs.

Key changes:
- Add ChromaMcpManager: singleton MCP client with lazy connect, auto-reconnect,
  connection lock, and Zscaler SSL cert support
- Rewrite ChromaSync to use MCP tool calls instead of chromadb npm client
- Handle chroma-mcp's non-JSON responses (plain text success/error messages)
- Treat "collection already exists" as idempotent success
- Wire ChromaMcpManager into GracefulShutdown for clean subprocess teardown
- Delete ChromaServerManager (no longer needed)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address PR review — connection guard leak, timer leak, async reset

- Clear connecting guard in finally block to prevent permanent reconnection block
- Clear timeout after successful connection to prevent timer leak
- Make reset() async to await stop() before nullifying instance
- Delete obsolete chroma-server-manager test (imports deleted class)
- Update graceful-shutdown test to use chromaMcpManager property name

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: prevent chroma-mcp spawn storm — zombie cleanup, stale onclose guard, reconnect backoff

Three bugs caused chroma-mcp processes to accumulate (92+ observed):

1. Zombie on timeout: failed connections left subprocess alive because
   only the timer was cleared, not the transport. Now catch block
   explicitly closes transport+client before rethrowing.

2. Stale onclose race: old transport's onclose handler captured `this`
   and overwrote the current connection reference after reconnect,
   orphaning the new subprocess. Now guarded with reference check.

3. No backoff: every failure triggered immediate reconnect. With
   backfill doing hundreds of MCP calls, this created rapid-fire
   spawning. Added 10s backoff on both connection failure and
   unexpected process death.
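A minimal sketch of the backoff in item 3, assuming a small gate object consulted before each reconnect attempt. The class and method names are invented for illustration; only the 10s figure comes from the commit message.

```typescript
// Hypothetical sketch: ReconnectGate is not a class from ChromaMcpManager.
const RECONNECT_BACKOFF_MS = 10_000; // the 10s backoff named above

class ReconnectGate {
  private lastFailureAt: number | null = null;
  private readonly now: () => number;

  constructor(now: () => number = Date.now) {
    this.now = now; // injectable clock, mainly for tests
  }

  // Call on connection failure or unexpected subprocess death.
  recordFailure(): void {
    this.lastFailureAt = this.now();
  }

  // Call after a successful connection.
  recordSuccess(): void {
    this.lastFailureAt = null;
  }

  // True when a new connection attempt is allowed.
  canAttempt(): boolean {
    if (this.lastFailureAt === null) return true;
    return this.now() - this.lastFailureAt >= RECONNECT_BACKOFF_MS;
  }
}
```

A caller doing hundreds of MCP calls during backfill would check `canAttempt()` first and fail fast instead of spawning another subprocess.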

Also includes ChromaSync fixes from PR review:
- queryChroma deduplication now preserves index-aligned arrays
- SQL injection guard on backfill ID exclusion lists

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:32:38 -05:00
Alex Newman 7e57b6e02d docs: update CHANGELOG.md for v10.2.6
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:42:24 -05:00
Alex Newman ea683a4e6c chore: bump version to 10.2.6
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:41:46 -05:00
Alex Newman 5d79bb7a7a fix: prevent zombie process accumulation by verifying subprocess exit (#1168) (#1175)
Two changes fix the observer process resource leak:

1. Add ensureProcessExit to generator finally blocks in SessionRoutes and
   worker-service, matching the pattern already working in SDKAgent.

2. Add stale session reaper (every 2m) that removes sessions with no active
   generator and no pending work after 15m idle. This unblocks the orphan
   reaper which previously skipped processes for "active" sessions.
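The selection rule in item 2 could be sketched as a pure function like the one below. The `SessionInfo` shape and field names are assumptions, and treating exactly 15 minutes as already stale is a guess; only the three conditions and the 15m threshold come from the commit message.

```typescript
// Hypothetical session shape: field names are not from the real SessionManager.
interface SessionInfo {
  id: string;
  hasActiveGenerator: boolean;
  pendingCount: number;
  lastActivityAt: number; // epoch milliseconds
}

const STALE_SESSION_IDLE_MS = 15 * 60 * 1000; // 15m idle threshold

// Returns the ids of sessions the reaper would remove on this pass.
function findStaleSessions(sessions: SessionInfo[], now: number): string[] {
  return sessions
    .filter(
      (s) =>
        !s.hasActiveGenerator &&
        s.pendingCount === 0 &&
        now - s.lastActivityAt >= STALE_SESSION_IDLE_MS,
    )
    .map((s) => s.id);
}
```

A timer firing every two minutes would call this with the current time and drop the returned sessions, so the orphan reaper no longer sees them as active.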

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:33:23 -05:00
Alex Newman 2180d31ee6 chore: update version to 10.2.5 in plugin.json
2026-02-18 15:26:50 -05:00
Alex Newman 75dd8e3174 docs: update CHANGELOG.md for v10.2.5
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:17:38 -05:00
Alex Newman 149f548667 chore: bump version to 10.2.5
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:17:08 -05:00
Alex Newman b88251bc8b fix: self-healing claimNextMessage prevents stuck processing messages (#1159)
* fix: self-healing claimNextMessage prevents stuck processing messages

claimAndDelete → claimNextMessage with atomic self-healing: resets stale
processing messages (>60s) back to pending before claiming. Eliminates
stuck messages from generator crashes without external timers. Removes
redundant idle-timeout reset in worker-service.ts. Adds QUEUE to logger
Component type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: update stale comments in SessionQueueProcessor to reflect claim-confirm pattern

Comments still referenced the old claim-and-delete pattern after the
claimNextMessage rename. Updated to accurately describe the current
lifecycle where messages are marked as processing and stay in DB until
confirmProcessed() is called.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move Date.now() inside transaction and extract stale threshold constant

- Move Date.now() inside claimNextMessage transaction closure so timestamp
  is fresh if WAL contention causes retry
- Extract STALE_PROCESSING_THRESHOLD_MS to module-level constant
- Add comment clarifying strict < boundary semantics
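To make the claim-confirm lifecycle concrete, here is an in-memory sketch of the self-healing claim. It stands in for the SQLite transaction with a plain array; the `QueueMessage` shape and the choice to compare the start time against the cutoff are assumptions. Only the names claimNextMessage and STALE_PROCESSING_THRESHOLD_MS, the 60s threshold, and the strict `<` boundary come from the commit messages.

```typescript
// In-memory stand-in for the database-backed queue; illustrative only.
const STALE_PROCESSING_THRESHOLD_MS = 60_000;

interface QueueMessage {
  id: number;
  status: "pending" | "processing";
  startedAt: number | null; // epoch ms when processing began
}

function claimNextMessage(
  queue: QueueMessage[],
  now: () => number = Date.now,
): QueueMessage | undefined {
  // Read the clock inside the "transaction" so a retry sees a fresh timestamp.
  const ts = now();
  const cutoff = ts - STALE_PROCESSING_THRESHOLD_MS;

  // Self-heal: a message that started processing strictly before the cutoff
  // is presumed abandoned and goes back to pending.
  for (const m of queue) {
    if (m.status === "processing" && m.startedAt !== null && m.startedAt < cutoff) {
      m.status = "pending";
      m.startedAt = null;
    }
  }

  // Claim the first pending message; it stays queued until confirmed.
  const next = queue.find((m) => m.status === "pending");
  if (next) {
    next.status = "processing";
    next.startedAt = ts;
  }
  return next;
}
```

Because recovery happens inside the claim itself, a crashed generator's message becomes claimable again on the next call without any external timer.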

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:15:46 -05:00
Alex Newman b2e3a7e668 docs: update CHANGELOG.md for v10.2.4
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:49:42 -05:00
Alex Newman b1cfc85333 chore: bump version to 10.2.4
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:49:10 -05:00
Alex Newman ca8421611c fix: backfill Chroma vector DB for all projects on startup (#1154)
* fix: backfill all Chroma projects on worker startup

ChromaSync.ensureBackfilled() existed but was never called. After
v10.2.2's bun cache clear destroyed the ONNX model cache, Chroma only
had ~2 days of embeddings while SQLite had 49k+ observations.

- Add static backfillAllProjects() to ChromaSync — iterates all projects
  in SQLite, creates temporary ChromaSync per project, runs smart diff
- Call backfillAllProjects() fire-and-forget on worker startup
- Add 'CHROMA_SYNC' to logger Component type (pre-existing gap)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: sanitize project names for Chroma collection naming

Replace characters outside [a-zA-Z0-9._-] with underscores so projects
like "YC Stuff" map to collection "cm__YC_Stuff" instead of failing
Chroma's collection name validation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: route backfill to shared cm__claude-mem collection, harden sanitization

- Use single ChromaSync('claude-mem') in backfillAllProjects() instead of
  per-project instances, matching how DatabaseManager and SearchManager
  operate — fixes critical bug where backfilled data landed in orphaned
  collections that no search path reads from
- Strip trailing non-alphanumeric chars from sanitized collection names
  to satisfy Chroma's end-character constraint
- Guard backfill behind Chroma server readiness to avoid N spurious error
  logs when Chroma failed to start
- Use CHROMA_SYNC log component consistently for backfill messages
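The two sanitization rules from these commits (replace disallowed characters, then strip trailing non-alphanumerics) could be combined roughly as below. `sanitizeCollectionName` is a hypothetical name; the `cm__` prefix and the "YC Stuff" mapping come from the commit messages, and handling of names that sanitize to an empty string is deliberately left out.

```typescript
// Hypothetical helper: not the actual ChromaSync implementation.
function sanitizeCollectionName(project: string): string {
  // Replace anything outside [a-zA-Z0-9._-] with an underscore.
  const replaced = project.replace(/[^a-zA-Z0-9._-]/g, "_");
  // Drop trailing non-alphanumeric characters to satisfy the end-character rule.
  const trimmed = replaced.replace(/[^a-zA-Z0-9]+$/, "");
  return `cm__${trimmed}`;
}
```

For example, `sanitizeCollectionName("YC Stuff")` yields `cm__YC_Stuff`, and the shared project name `claude-mem` maps to `cm__claude-mem` unchanged.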

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* refactor: pass project as parameter to ensureBackfilled instead of mutating instance state

Eliminates shared mutable state in backfillAllProjects() loop. Project
scoping is now passed explicitly via parameter to both ensureBackfilled()
and getExistingChromaIds(), keeping a single Chroma connection while
avoiding fragile instance property mutation across iterations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 22:47:46 -05:00
Alex Newman eea4f599c0 docs: update CHANGELOG.md for v10.2.3
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 19:55:51 -05:00
44 changed files with 6057 additions and 1494 deletions
+1 -1
@@ -10,7 +10,7 @@
"plugins": [
{
"name": "claude-mem",
"version": "10.2.3",
"version": "10.3.3",
"source": "./plugin",
"description": "Persistent memory system for Claude Code - context compression across sessions"
}
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "9.0.6",
"version": "10.2.5",
"description": "Persistent memory system for Claude Code - seamlessly preserve context across sessions",
"author": {
"name": "Alex Newman"
+1
@@ -1,6 +1,7 @@
datasets/
node_modules/
dist/
!installer/dist/
*.log
.DS_Store
.env
+125 -87
@@ -2,6 +2,131 @@
All notable changes to claude-mem.
## [v10.3.2] - 2026-02-23
## Bug Fixes
- **Worker startup readiness**: Worker startup hook now waits for full DB/search readiness before proceeding, fixing the race condition where hooks would fire before the worker was initialized on first start (#1210)
- **MCP tool naming**: Renamed `save_memory` to `save_observation` for consistency with the observation-based data model (#1210)
- **MCP search instructions**: Updated MCP server tool descriptions to accurately reflect the 3-layer search workflow (#1210)
- **Installer hosting**: Serve installer JS from install.cmem.ai instead of GitHub raw URLs for reliability
- **Installer routing**: Added rewrite rule so install.cmem.ai root path correctly serves the install script
- **Installer build**: Added compiled installer dist so CLI installation works out of the box
## [v10.3.1] - 2026-02-19
## Fix: Prevent Duplicate Worker Daemons and Zombie Processes
Three root causes of chroma-mcp timeouts identified and fixed:
### PID-based daemon guard
Exit immediately on startup if PID file points to a live process. Prevents the race condition where hooks firing simultaneously could start multiple daemons before either wrote a PID file.
### Port-based daemon guard
Exit if port 37777 is already bound — runs before WorkerService constructor registers keepalive signal handlers that previously prevented exit on EADDRINUSE.
### Guaranteed process.exit() after HTTP shutdown
HTTP shutdown (POST /api/admin/shutdown) now calls `process.exit(0)` in a `try/finally` block. Previously, zombie workers stayed alive after shutdown, and background tasks reconnected to chroma-mcp, spawning duplicate subprocesses contending for the same data directory.
## [v10.3.0] - 2026-02-18
## Replace WASM Embeddings with Persistent chroma-mcp MCP Connection
### Highlights
- **New: ChromaMcpManager** — Singleton stdio MCP client communicating with chroma-mcp via `uvx`, replacing the previous ChromaServerManager (`npx chroma run` + `chromadb` npm + ONNX/WASM)
- **Eliminates native binary issues** — No more segfaults, WASM embedding failures, or cross-platform install headaches
- **Graceful subprocess lifecycle** — Wired into GracefulShutdown for clean teardown; zombie process prevention with kill-on-failure and stale `onclose` handler guards
- **Connection backoff** — 10-second reconnect backoff prevents chroma-mcp spawn storms
- **SQL injection guards** — Added parameterization to ChromaSync ID exclusion queries
- **Simplified ChromaSync** — Reduced complexity by delegating embedding concerns to chroma-mcp
### Breaking Changes
None — backward compatible. ChromaDB data is preserved; only the connection mechanism changed.
### Files Changed
- `src/services/sync/ChromaMcpManager.ts` (new) — MCP client singleton
- `src/services/sync/ChromaServerManager.ts` (deleted) — Old WASM/native approach
- `src/services/sync/ChromaSync.ts` — Simplified to use MCP client
- `src/services/worker-service.ts` — Updated startup sequence
- `src/services/infrastructure/GracefulShutdown.ts` — Subprocess cleanup integration
## [v10.2.6] - 2026-02-18
## Bug Fixes
### Zombie Process Prevention (#1168, #1175)
Observer Claude CLI subprocesses were accumulating as zombies — processes that never exited after their session ended, causing massive resource leaks on long-running systems.
**Root cause:** When observer sessions ended (via idle timeout, abort, or error), the spawned Claude CLI subprocesses were not being reliably killed. The existing `ensureProcessExit()` in `SDKAgent` only covered the happy path; sessions terminated through `SessionRoutes` or `worker-service` bypassed process cleanup entirely.
**Fix — dual-layer approach:**
1. **Immediate cleanup:** Added `ensureProcessExit()` calls to the `finally` blocks in both `SessionRoutes.ts` and `worker-service.ts`, ensuring every session exit path kills its subprocess
2. **Periodic reaping:** Added `reapStaleSessions()` to `SessionManager` — a background interval that scans `~/.claude-mem/observer-sessions/` for stale PID files, verifies the process is still running, and kills any orphans with SIGKILL escalation
This ensures no observer subprocess survives beyond its session lifetime, even in crash scenarios.
## [v10.2.5] - 2026-02-18
### Bug Fixes
- **Self-healing message queue**: Renamed `claimAndDelete` → `claimNextMessage` with atomic self-healing — automatically resets stale processing messages (>60s) back to pending before claiming, eliminating stuck messages from generator crashes without external timers
- **Removed redundant idle-timeout reset**: The `resetStaleProcessingMessages()` call during idle timeout in worker-service was removed (startup reset kept), since the atomic self-healing in `claimNextMessage` now handles recovery inline
- **TypeScript diagnostic fix**: Added `QUEUE` to logger `Component` type
### Tests
- 5 new tests for self-healing behavior (stuck recovery, active protection, atomicity, empty queue, session isolation)
- 1 new integration test for stuck recovery in zombie-prevention suite
- All existing queue tests updated for renamed method
## [v10.2.4] - 2026-02-18
## Chroma Vector DB Backfill Fix
Fixes the Chroma backfill system to correctly sync all SQLite observations into the vector database on worker startup.
### Bug Fixes
- **Backfill all projects on startup** — `backfillAllProjects()` now runs on worker startup, iterating all projects in SQLite and syncing missing observations to Chroma. Previously `ensureBackfilled()` existed but was never called, leaving Chroma with incomplete data after cache clears.
- **Fixed critical collection routing bug** — Backfill now uses the shared `cm__claude-mem` collection (matching how DatabaseManager and SearchManager operate) instead of creating per-project orphan collections that no search path reads from.
- **Hardened collection name sanitization** — Project names with special characters (e.g., "YC Stuff") are sanitized for Chroma's naming constraints, including stripping trailing non-alphanumeric characters.
- **Eliminated shared mutable state** — `ensureBackfilled()` and `getExistingChromaIds()` now accept project as a parameter instead of mutating instance state, keeping a single Chroma connection while avoiding fragile property mutation across iterations.
- **Chroma readiness guard** — Backfill waits for Chroma server readiness before running, preventing spurious error logs when Chroma fails to start.
### Changed Files
- `src/services/sync/ChromaSync.ts` — Core backfill logic, sanitization, parameter passing
- `src/services/worker-service.ts` — Startup backfill trigger + readiness guard
- `src/utils/logger.ts` — Added `CHROMA_SYNC` log component
## [v10.2.3] - 2026-02-17
## Fix Chroma ONNX Model Cache Corruption
Addresses the persistent embedding pipeline failures reported across #1104, #1105, #1110, and subsequent sessions. Three root causes identified and fixed:
### Changes
- **Removed nuclear `bun pm cache rm`** from both `smart-install.js` and `sync-marketplace.cjs`. This was added in v10.2.2 for the now-removed sharp dependency but destroyed all cached packages, breaking the ONNX resolution chain.
- **Added `bun install` in plugin cache directory** after marketplace sync. The cache directory had a `package.json` with `@chroma-core/default-embed` as a dependency but never ran install, so the worker couldn't resolve it at runtime.
- **Moved HuggingFace model cache to `~/.claude-mem/models/`** outside `node_modules`. The ~23MB ONNX model was stored inside `node_modules/@huggingface/transformers/.cache/`, so any reinstall or cache clear corrupted it.
- **Added self-healing retry** for Protobuf parsing failures. If the downloaded model is corrupted, the cache is cleared and re-downloaded automatically on next use.
### Files Changed
- `scripts/smart-install.js` — removed `bun pm cache rm`
- `scripts/sync-marketplace.cjs` — removed `bun pm cache rm`, added `bun install` in cache dir
- `src/services/sync/ChromaSync.ts` — moved model cache, added corruption recovery
## [v10.2.2] - 2026-02-17
## Bug Fixes
@@ -1321,90 +1446,3 @@ claude-mem cursor install
Thanks @yungweng for the detailed bug report!
## [v8.2.9] - 2025-12-29
## Bug Fixes
- **Worker Service**: Remove file-based locking and improve Windows stability
- Replaced file-based locking with health-check-first approach for cleaner mutual exclusion
- Removed AbortSignal.timeout() calls to reduce Bun libuv assertion errors on Windows
- Added 500ms shutdown delays on Windows to prevent zombie ports
- Reduced hook timeout values for improved responsiveness
- Increased worker readiness polling duration from 5s to 15s
## Internal Changes
- Updated worker CLI scripts to reference worker-service.cjs directly
- Simplified hook command configurations
## [v8.2.8] - 2025-12-29
## Bug Fixes
- Fixed orphaned chroma-mcp processes during shutdown (#489)
- Added graceful shutdown handling with signal handlers registered early in WorkerService lifecycle
- Ensures ChromaSync subprocess cleanup even when interrupted during initialization
- Removes PID file during shutdown to prevent stale process tracking
## Technical Details
This patch release addresses a race condition where SIGTERM/SIGINT signals arriving during ChromaSync initialization could leave orphaned chroma-mcp processes. The fix moves signal handler registration from the start() method to the constructor, ensuring cleanup handlers exist throughout the entire initialization lifecycle.
**Full Changelog**: https://github.com/thedotmack/claude-mem/compare/v8.2.7...v8.2.8
## [v8.2.7] - 2025-12-29
## What's Changed
### Token Optimizations
- Simplified MCP server tool definitions for reduced token usage
- Removed outdated troubleshooting and mem-search skill documentation
- Enhanced search parameter descriptions for better clarity
- Streamlined MCP workflows for improved efficiency
This release significantly reduces the token footprint of the plugin's MCP tools and documentation.
**Full Changelog**: https://github.com/thedotmack/claude-mem/compare/v8.2.6...v8.2.7
## [v8.2.6] - 2025-12-29
## What's Changed
### Bug Fixes & Improvements
- Session ID semantic renaming for clarity (content_session_id, memory_session_id)
- Queue system simplification with unified processing logic
- Memory session ID capture for agent resume functionality
- Comprehensive test suite for session ID refactoring
**Full Changelog**: https://github.com/thedotmack/claude-mem/compare/v8.2.5...v8.2.6
## [v8.2.5] - 2025-12-28
## Bug Fixes
- **Logger**: Enhanced Error object handling in debug mode to prevent empty JSON serialization
- **ChromaSync**: Refactored DatabaseManager to initialize ChromaSync lazily, removing background backfill on startup
- **SessionManager**: Simplified message handling and removed linger timeout that was blocking completion
## Technical Details
This patch release addresses several issues discovered after the session continuity fix:
1. Logger now properly serializes Error objects with stack traces in debug mode
2. ChromaSync initialization is now lazy to prevent silent failures during startup
3. Session linger timeout removed to eliminate artificial 5-second delays on session completion
Full changelog: https://github.com/thedotmack/claude-mem/compare/v8.2.4...v8.2.5
## [v8.2.4] - 2025-12-28
Patch release v8.2.4
## [v8.2.3] - 2025-12-27
## Bug Fixes
- Fix worker port environment variable in smart-install script
- Implement file-based locking mechanism for worker operations to prevent race conditions
- Fix restart command references in documentation (changed from `claude-mem restart` to `npm run worker:restart`)
+1 -7
@@ -198,7 +198,7 @@ See [Architecture Overview](https://docs.claude-mem.ai/architecture/overview) fo
## MCP Search Tools
Claude-Mem provides intelligent memory search through **5 MCP tools** following a token-efficient **3-layer workflow pattern**:
Claude-Mem provides intelligent memory search through **4 MCP tools** following a token-efficient **3-layer workflow pattern**:
**The 3-Layer Workflow:**
@@ -211,7 +211,6 @@ Claude-Mem provides intelligent memory search through **5 MCP tools** following
- Start with `search` to get an index of results
- Use `timeline` to see what was happening around specific observations
- Use `get_observations` to fetch full details for relevant IDs
- Use `save_memory` to manually store important information
- **~10x token savings** by filtering before fetching details
**Available MCP Tools:**
@@ -219,8 +218,6 @@ Claude-Mem provides intelligent memory search through **5 MCP tools** following
1. **`search`** - Search memory index with full-text queries, filters by type/date/project
2. **`timeline`** - Get chronological context around a specific observation or query
3. **`get_observations`** - Fetch full observation details by IDs (always batch multiple IDs)
4. **`save_memory`** - Manually save a memory/observation for semantic search
5. **`__IMPORTANT`** - Workflow documentation (always visible to Claude)
**Example Usage:**
@@ -232,9 +229,6 @@ search(query="authentication bug", type="bugfix", limit=10)
// Step 3: Fetch full details
get_observations(ids=[123, 456])
// Save important information manually
save_memory(text="API requires auth header X-API-Key", title="API Auth")
```
See [Search Tools Guide](https://docs.claude-mem.ai/usage/search-tools) for detailed examples.
+1 -1
@@ -5,7 +5,7 @@ set -euo pipefail
# Usage: curl -fsSL https://install.cmem.ai | bash
# or: curl -fsSL https://install.cmem.ai | bash -s -- --provider=gemini --api-key=YOUR_KEY
INSTALLER_URL="https://raw.githubusercontent.com/thedotmack/claude-mem/main/installer/dist/index.js"
INSTALLER_URL="https://install.cmem.ai/installer.js"
# Colors
RED='\033[0;31m'
File diff suppressed because it is too large
+3
@@ -1,5 +1,8 @@
{
"$schema": "https://openapi.vercel.sh/vercel.json",
"rewrites": [
{ "source": "/", "destination": "/install.sh" }
],
"headers": [
{
"source": "/(.*)\\.sh",
+2107
File diff suppressed because it is too large
+1 -3
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.3",
"version": "10.3.3",
"description": "Memory compression system for Claude Code - persist context across sessions",
"keywords": [
"claude",
@@ -98,9 +98,7 @@
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.1.76",
"@modelcontextprotocol/sdk": "^1.25.1",
"@chroma-core/default-embed": "^0.1.9",
"ansi-to-html": "^0.7.2",
"chromadb": "^3.2.2",
"dompurify": "^3.3.1",
"express": "^4.18.2",
"glob": "^11.0.3",
+52
@@ -0,0 +1,52 @@
# Fix: SessionStart Hook "startup hook error" — Worker Not Waiting
## Root Cause
The **installed plugin** (`~/.claude/plugins/marketplaces/thedotmack/`) is version **10.2.5** and has **none** of the recent fixes:
| Fix | Repo Status | Installed Status |
|-----|-------------|-----------------|
| Hook group split (smart-install isolated from worker start) | In `plugin/hooks/hooks.json` | **Missing** — all 3 hooks in one group, smart-install failure blocks worker |
| `waitForReadiness()` after spawn | In `src/services/infrastructure/HealthMonitor.ts` | **Missing** — 0 occurrences in installed `worker-service.cjs` |
| Early `initializationCompleteFlag` (after DB+search, not MCP) | In `src/services/worker-service.ts` | **Missing** — flag set after MCP connection (5+ minute wait) |
The changes exist in source code but were **never built and synced** to the installed location.
---
## Phase 1: Build and Sync
```bash
npm run build-and-sync
```
### Verification
```bash
# 1. Confirm waitForReadiness exists in installed build
grep -c "waitForReadiness" ~/.claude/plugins/marketplaces/thedotmack/plugin/scripts/worker-service.cjs
# Expected: > 0
# 2. Confirm hooks.json has two SessionStart groups (the split)
python3 -c "import json; d=json.load(open('$(echo $HOME)/.claude/plugins/marketplaces/thedotmack/plugin/hooks/hooks.json')); print('SessionStart groups:', len(d['hooks']['SessionStart']))"
# Expected: 2
# 3. Confirm initializationCompleteFlag is set before MCP connection
grep -n "Core initialization complete" ~/.claude/plugins/marketplaces/thedotmack/plugin/scripts/worker-service.cjs | head -1
# Expected: appears BEFORE "MCP server connected"
```
## Phase 2: Restart Worker and Test
```bash
# Stop existing worker
bun plugin/scripts/worker-service.cjs stop
# Verify stopped
curl -s http://127.0.0.1:37777/api/health && echo "STILL RUNNING" || echo "STOPPED"
```
Then start a new Claude Code session and verify:
- No "SessionStart:startup hook error" messages
- Worker is running: `curl http://127.0.0.1:37777/api/health`
- Readiness endpoint works: `curl http://127.0.0.1:37777/api/readiness`
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.3",
"version": "10.3.3",
"description": "Persistent memory system for Claude Code - seamlessly preserve context across sessions",
"author": {
"name": "Alex Newman"
+7 -2
@@ -8,7 +8,7 @@
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/scripts/setup.sh",
"timeout": 120
"timeout": 300
}
]
}
@@ -21,7 +21,12 @@
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/smart-install.js\"",
"timeout": 300
},
}
]
},
{
"matcher": "startup|clear|compact",
"hooks": [
{
"type": "command",
"command": "node \"${CLAUDE_PLUGIN_ROOT}/scripts/bun-runner.js\" \"${CLAUDE_PLUGIN_ROOT}/scripts/worker-service.cjs\" start",
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem-plugin",
"version": "10.2.3",
"version": "10.3.3",
"private": true,
"description": "Runtime dependencies for claude-mem bundled hooks",
"type": "module",
BIN
Binary file not shown.
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
-14
@@ -93,20 +93,6 @@ get_observations(ids=[11131, 10942])
**Returns:** Complete observation objects with title, subtitle, narrative, facts, concepts, files (~500-1000 tokens each)
-## Saving Memories
-Use the `save_memory` MCP tool to store manual observations:
-```
-save_memory(text="Important discovery about the auth system", title="Auth Architecture", project="my-project")
-```
-**Parameters:**
-- `text` (string, required) - Content to remember
-- `title` (string, optional) - Short title, auto-generated if omitted
-- `project` (string, optional) - Project name, defaults to "claude-mem"
## Examples
**Find recent bug fixes:**
+2 -2
@@ -235,8 +235,8 @@ NEVER fetch full details without filtering first. 10x token savings.`,
}
},
{
-name: 'save_memory',
-description: 'Save a manual memory/observation for semantic search. Use this to remember important information.',
+name: 'save_observation',
+description: 'Save an observation to the database. Params: text (required), title, project',
inputSchema: {
type: 'object',
properties: {
@@ -74,8 +74,8 @@ export function renderColorContextIndex(): string[] {
`${colors.dim}Context Index: This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.${colors.reset}`,
'',
`${colors.dim}When you need implementation details, rationale, or debugging context:${colors.reset}`,
-`${colors.dim} - Use MCP tools (search, get_observations) to fetch full observations on-demand${colors.reset}`,
-`${colors.dim} - Critical types ( bugfix, decision) often need detailed fetching${colors.reset}`,
+`${colors.dim} - Fetch by ID: get_observations([IDs]) for observations visible in this index${colors.reset}`,
+`${colors.dim} - Search history: Use the mem-search skill for past decisions, bugs, and deeper research${colors.reset}`,
`${colors.dim} - Trust this index over re-reading code for past decisions and learnings${colors.reset}`,
''
];
@@ -226,7 +226,7 @@ export function renderColorFooter(totalDiscoveryTokens: number, totalReadTokens:
const workTokensK = Math.round(totalDiscoveryTokens / 1000);
return [
'',
-`${colors.dim}Access ${workTokensK}k tokens of past research & decisions for just ${totalReadTokens.toLocaleString()}t. Use MCP search tools to access memories by ID.${colors.reset}`
+`${colors.dim}Access ${workTokensK}k tokens of past research & decisions for just ${totalReadTokens.toLocaleString()}t. Use the claude-mem skill to access memories by ID.${colors.reset}`
];
}
@@ -72,8 +72,8 @@ export function renderMarkdownContextIndex(): string[] {
`**Context Index:** This semantic index (titles, types, files, tokens) is usually sufficient to understand past work.`,
'',
`When you need implementation details, rationale, or debugging context:`,
-`- Use MCP tools (search, get_observations) to fetch full observations on-demand`,
-`- Critical types ( bugfix, decision) often need detailed fetching`,
+`- Fetch by ID: get_observations([IDs]) for observations visible in this index`,
+`- Search history: Use the mem-search skill for past decisions, bugs, and deeper research`,
`- Trust this index over re-reading code for past decisions and learnings`,
''
];
@@ -229,7 +229,7 @@ export function renderMarkdownFooter(totalDiscoveryTokens: number, totalReadToke
const workTokensK = Math.round(totalDiscoveryTokens / 1000);
return [
'',
-`Access ${workTokensK}k tokens of past research & decisions for just ${totalReadTokens.toLocaleString()}t. Use MCP search tools to access memories by ID.`
+`Access ${workTokensK}k tokens of past research & decisions for just ${totalReadTokens.toLocaleString()}t. Use the claude-mem skill to access memories by ID.`
];
}
@@ -30,9 +30,9 @@ export interface CloseableDatabase {
}
/**
-* Stoppable service interface for Chroma server
+* Stoppable service interface for ChromaMcpManager
*/
-export interface StoppableServer {
+export interface StoppableService {
stop(): Promise<void>;
}
@@ -44,7 +44,7 @@ export interface GracefulShutdownConfig {
sessionManager: ShutdownableService;
mcpClient?: CloseableClient;
dbManager?: CloseableDatabase;
-chromaServer?: StoppableServer;
+chromaMcpManager?: StoppableService;
}
/**
@@ -79,11 +79,11 @@ export async function performGracefulShutdown(config: GracefulShutdownConfig): P
logger.info('SYSTEM', 'MCP client closed');
}
-// STEP 5: Stop Chroma server (local mode only)
-if (config.chromaServer) {
-logger.info('SHUTDOWN', 'Stopping Chroma server...');
-await config.chromaServer.stop();
-logger.info('SHUTDOWN', 'Chroma server stopped');
+// STEP 5: Stop Chroma MCP connection
+if (config.chromaMcpManager) {
+logger.info('SHUTDOWN', 'Stopping Chroma MCP connection...');
+await config.chromaMcpManager.stop();
+logger.info('SHUTDOWN', 'Chroma MCP connection stopped');
}
// STEP 6: Close database connection (includes ChromaSync cleanup)
+29 -11
@@ -29,31 +29,49 @@ export async function isPortInUse(port: number): Promise<boolean> {
}
/**
-* Wait for the worker HTTP server to become responsive (liveness check)
-* Uses /api/health instead of /api/readiness because:
-* - /api/health returns 200 as soon as HTTP server is listening
-* - /api/readiness waits for full initialization (MCP connection can take 5+ minutes)
-* See: https://github.com/thedotmack/claude-mem/issues/811
-* @param port Worker port to check
-* @param timeoutMs Maximum time to wait in milliseconds
-* @returns true if worker became responsive, false if timeout
+* Poll a localhost endpoint until it returns 200 OK or timeout.
+* Shared implementation for liveness and readiness checks.
*/
-export async function waitForHealth(port: number, timeoutMs: number = 30000): Promise<boolean> {
+async function pollEndpointUntilOk(
+port: number,
+endpointPath: string,
+timeoutMs: number,
+retryLogMessage: string
+): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
-const response = await fetch(`http://127.0.0.1:${port}/api/health`);
+const response = await fetch(`http://127.0.0.1:${port}${endpointPath}`);
if (response.ok) return true;
} catch (error) {
// [ANTI-PATTERN IGNORED]: Retry loop - expected failures during startup, will retry
-logger.debug('SYSTEM', 'Service not ready yet, will retry', { port }, error as Error);
+logger.debug('SYSTEM', retryLogMessage, { port }, error as Error);
}
await new Promise(r => setTimeout(r, 500));
}
return false;
}
+/**
+* Wait for the worker HTTP server to become responsive (liveness check).
+* Uses /api/health which returns 200 as soon as the HTTP server is listening.
+* For full initialization (DB + search), use waitForReadiness() instead.
+*/
+export function waitForHealth(port: number, timeoutMs: number = 30000): Promise<boolean> {
+return pollEndpointUntilOk(port, '/api/health', timeoutMs, 'Service not ready yet, will retry');
+}
+/**
+* Wait for the worker to be fully initialized (DB + search ready).
+* Uses /api/readiness which returns 200 only after core initialization completes.
+* Now that initializationCompleteFlag is set after DB/search init (not MCP),
+* this typically completes in a few seconds.
+*/
+export function waitForReadiness(port: number, timeoutMs: number = 30000): Promise<boolean> {
+return pollEndpointUntilOk(port, '/api/readiness', timeoutMs, 'Worker not ready yet, will retry');
+}
/**
* Wait for a port to become free (no longer responding to health checks)
* Used after shutdown to confirm the port is available for restart
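The refactor above routes both the liveness and the readiness check through one polling loop. A minimal sketch of that pattern follows, with the HTTP probe injected so it can run without a worker. `ProbeFn`, `pollUntilOk`, `waitUntilReady`, and the `intervalMs` parameter are hypothetical names for illustration, not part of the project.

```typescript
// Hypothetical probe type: resolves to true when the endpoint answered 200 OK.
type ProbeFn = (url: string) => Promise<boolean>;

// Poll one URL until the probe succeeds or the timeout elapses.
async function pollUntilOk(
  probe: ProbeFn,
  url: string,
  timeoutMs: number,
  intervalMs: number = 500
): Promise<boolean> {
  const start = Date.now();
  while (Date.now() - start < timeoutMs) {
    try {
      if (await probe(url)) return true;
    } catch {
      // Connection errors are expected while the server is still starting.
    }
    await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
  }
  return false;
}

// Liveness first, then readiness, in the spirit of waitForHealth / waitForReadiness.
async function waitUntilReady(
  probe: ProbeFn,
  port: number,
  timeoutMs: number,
  intervalMs: number = 500
): Promise<boolean> {
  const base = `http://127.0.0.1:${port}`;
  if (!(await pollUntilOk(probe, `${base}/api/health`, timeoutMs, intervalMs))) return false;
  return pollUntilOk(probe, `${base}/api/readiness`, timeoutMs, intervalMs);
}
```

In real use the probe could be something like `async url => (await fetch(url)).ok`; the injection only exists so the loop can be exercised in isolation.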
+177 -1
@@ -10,7 +10,7 @@
import path from 'path';
import { homedir } from 'os';
-import { existsSync, writeFileSync, readFileSync, unlinkSync, mkdirSync } from 'fs';
+import { existsSync, writeFileSync, readFileSync, unlinkSync, mkdirSync, rmSync } from 'fs';
import { exec, execSync, spawn } from 'child_process';
import { promisify } from 'util';
import { logger } from '../../utils/logger.js';
@@ -426,6 +426,182 @@ export async function cleanupOrphanedProcesses(): Promise<void> {
logger.info('SYSTEM', 'Orphaned processes cleaned up', { count: pidsToKill.length });
}
// Patterns that should be killed immediately at startup (no age gate)
// These are child processes that should not outlive their parent worker
const AGGRESSIVE_CLEANUP_PATTERNS = ['worker-service.cjs', 'chroma-mcp'];
// Patterns that keep the age-gated threshold (may be legitimately running)
const AGE_GATED_CLEANUP_PATTERNS = ['mcp-server.cjs'];
/**
* Aggressive startup cleanup for orphaned claude-mem processes.
*
* Unlike cleanupOrphanedProcesses() which age-gates everything at 30 minutes,
* this function kills worker-service.cjs and chroma-mcp processes immediately
* (they should not outlive their parent worker). Only mcp-server.cjs keeps
* the age threshold since it may be legitimately running.
*
* Called once at daemon startup.
*/
export async function aggressiveStartupCleanup(): Promise<void> {
const isWindows = process.platform === 'win32';
const currentPid = process.pid;
const pidsToKill: number[] = [];
const allPatterns = [...AGGRESSIVE_CLEANUP_PATTERNS, ...AGE_GATED_CLEANUP_PATTERNS];
try {
if (isWindows) {
const patternConditions = allPatterns
.map(p => `$_.CommandLine -like '*${p}*'`)
.join(' -or ');
const cmd = `powershell -NoProfile -NonInteractive -Command "Get-CimInstance Win32_Process | Where-Object { (${patternConditions}) -and $_.ProcessId -ne ${currentPid} } | Select-Object ProcessId, CommandLine, CreationDate | ConvertTo-Json"`;
const { stdout } = await execAsync(cmd, { timeout: HOOK_TIMEOUTS.POWERSHELL_COMMAND });
if (!stdout.trim() || stdout.trim() === 'null') {
logger.debug('SYSTEM', 'No orphaned claude-mem processes found (Windows)');
return;
}
const processes = JSON.parse(stdout);
const processList = Array.isArray(processes) ? processes : [processes];
const now = Date.now();
for (const proc of processList) {
const pid = proc.ProcessId;
if (!Number.isInteger(pid) || pid <= 0 || pid === currentPid) continue;
const commandLine = proc.CommandLine || '';
const isAggressive = AGGRESSIVE_CLEANUP_PATTERNS.some(p => commandLine.includes(p));
if (isAggressive) {
// Kill immediately — no age check
pidsToKill.push(pid);
logger.debug('SYSTEM', 'Found orphaned process (aggressive)', { pid, commandLine: commandLine.substring(0, 80) });
} else {
// Age-gated: only kill if older than threshold
const creationMatch = proc.CreationDate?.match(/\/Date\((\d+)\)\//);
if (creationMatch) {
const creationTime = parseInt(creationMatch[1], 10);
const ageMinutes = (now - creationTime) / (1000 * 60);
if (ageMinutes >= ORPHAN_MAX_AGE_MINUTES) {
pidsToKill.push(pid);
logger.debug('SYSTEM', 'Found orphaned process (age-gated)', { pid, ageMinutes: Math.round(ageMinutes) });
}
}
}
}
} else {
// Unix: Use ps with elapsed time
const patternRegex = allPatterns.join('|');
const { stdout } = await execAsync(
`ps -eo pid,etime,command | grep -E "${patternRegex}" | grep -v grep || true`
);
if (!stdout.trim()) {
logger.debug('SYSTEM', 'No orphaned claude-mem processes found (Unix)');
return;
}
const lines = stdout.trim().split('\n');
for (const line of lines) {
const match = line.trim().match(/^(\d+)\s+(\S+)\s+(.*)$/);
if (!match) continue;
const pid = parseInt(match[1], 10);
const etime = match[2];
const command = match[3];
if (!Number.isInteger(pid) || pid <= 0 || pid === currentPid) continue;
const isAggressive = AGGRESSIVE_CLEANUP_PATTERNS.some(p => command.includes(p));
if (isAggressive) {
// Kill immediately — no age check
pidsToKill.push(pid);
logger.debug('SYSTEM', 'Found orphaned process (aggressive)', { pid, command: command.substring(0, 80) });
} else {
// Age-gated: only kill if older than threshold
const ageMinutes = parseElapsedTime(etime);
if (ageMinutes >= ORPHAN_MAX_AGE_MINUTES) {
pidsToKill.push(pid);
logger.debug('SYSTEM', 'Found orphaned process (age-gated)', { pid, ageMinutes, command: command.substring(0, 80) });
}
}
}
}
} catch (error) {
logger.error('SYSTEM', 'Failed to enumerate orphaned processes during aggressive cleanup', {}, error as Error);
return;
}
if (pidsToKill.length === 0) {
return;
}
logger.info('SYSTEM', 'Aggressive startup cleanup: killing orphaned processes', {
platform: isWindows ? 'Windows' : 'Unix',
count: pidsToKill.length,
pids: pidsToKill
});
if (isWindows) {
for (const pid of pidsToKill) {
if (!Number.isInteger(pid) || pid <= 0) continue;
try {
execSync(`taskkill /PID ${pid} /T /F`, { timeout: HOOK_TIMEOUTS.POWERSHELL_COMMAND, stdio: 'ignore' });
} catch (error) {
logger.debug('SYSTEM', 'Failed to kill process, may have already exited', { pid }, error as Error);
}
}
} else {
for (const pid of pidsToKill) {
try {
process.kill(pid, 'SIGKILL');
} catch (error) {
logger.debug('SYSTEM', 'Process already exited', { pid }, error as Error);
}
}
}
logger.info('SYSTEM', 'Aggressive startup cleanup complete', { count: pidsToKill.length });
}
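The kill decision in the function above is the same on Windows and Unix: aggressive patterns are killed regardless of age, and everything else only once it is older than the threshold. As a rough sketch, that rule can be isolated into a pure function. `shouldKill` and `MAX_AGE_MINUTES` are hypothetical names, and the value 30 is an assumption taken from the "30 minutes" mentioned in the doc comment rather than from the actual constant.

```typescript
const AGGRESSIVE = ['worker-service.cjs', 'chroma-mcp'];
const AGE_GATED = ['mcp-server.cjs'];
const MAX_AGE_MINUTES = 30; // assumption: mirrors the 30 minutes named in the doc comment

// Decide whether a process found at startup should be killed.
function shouldKill(command: string, ageMinutes: number): boolean {
  // Child processes that should never outlive their worker: no age check.
  if (AGGRESSIVE.some(p => command.includes(p))) return true;
  // Possibly legitimate processes: only kill once they exceed the threshold.
  if (AGE_GATED.some(p => command.includes(p))) return ageMinutes >= MAX_AGE_MINUTES;
  // Anything else is not ours to touch.
  return false;
}
```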
const CHROMA_MIGRATION_MARKER_FILENAME = '.chroma-cleaned-v10.3';
/**
* One-time chroma data wipe for users upgrading from versions with duplicate
* worker bugs that could corrupt chroma data. Since chroma is always rebuildable
* from SQLite (via backfillAllProjects), this is safe.
*
* Checks for a marker file. If absent, wipes ~/.claude-mem/chroma/ and writes
* the marker. If present, skips. Idempotent.
*
* @param dataDirectory - Override for DATA_DIR (used in tests)
*/
export function runOneTimeChromaMigration(dataDirectory?: string): void {
const effectiveDataDir = dataDirectory ?? DATA_DIR;
const markerPath = path.join(effectiveDataDir, CHROMA_MIGRATION_MARKER_FILENAME);
const chromaDir = path.join(effectiveDataDir, 'chroma');
if (existsSync(markerPath)) {
logger.debug('SYSTEM', 'Chroma migration marker exists, skipping wipe');
return;
}
logger.warn('SYSTEM', 'Running one-time chroma data wipe (upgrade from pre-v10.3)', { chromaDir });
if (existsSync(chromaDir)) {
rmSync(chromaDir, { recursive: true, force: true });
logger.info('SYSTEM', 'Chroma data directory removed', { chromaDir });
}
// Write marker file to prevent future wipes
mkdirSync(effectiveDataDir, { recursive: true });
writeFileSync(markerPath, new Date().toISOString());
logger.info('SYSTEM', 'Chroma migration marker written', { markerPath });
}
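The migration above relies on a marker file to make a destructive step run at most once. A small sketch of that idempotency pattern, run against a temporary directory, is shown here. `wipeOnce` and the `.cleaned` marker name are hypothetical, and the sketch omits the logging done by the real function.

```typescript
import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: removes <dataDir>/<subdir> once, guarded by a marker file.
// Returns true if the wipe ran, false if the marker was already present.
function wipeOnce(dataDir: string, subdir: string, markerName: string): boolean {
  const markerPath = join(dataDir, markerName);
  if (existsSync(markerPath)) return false;
  // force: true makes this a no-op when the directory does not exist.
  rmSync(join(dataDir, subdir), { recursive: true, force: true });
  mkdirSync(dataDir, { recursive: true });
  writeFileSync(markerPath, new Date().toISOString());
  return true;
}

// Usage against a throwaway directory.
const demoDir = mkdtempSync(join(tmpdir(), 'marker-demo-'));
mkdirSync(join(demoDir, 'chroma'));
const firstRun = wipeOnce(demoDir, 'chroma', '.cleaned');  // wipes and writes the marker
mkdirSync(join(demoDir, 'chroma'));                        // data rebuilt later
const secondRun = wipeOnce(demoDir, 'chroma', '.cleaned'); // marker present, data kept
const dataSurvived = existsSync(join(demoDir, 'chroma'));
rmSync(demoDir, { recursive: true, force: true });
```

Writing the marker only after the removal succeeds means a crash mid-wipe simply causes the wipe to be retried on the next start.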
/**
* Spawn a detached daemon process
* Returns the child PID or undefined if spawn failed
+7 -6
@@ -20,8 +20,9 @@ export class SessionQueueProcessor {
/**
* Create an async iterator that yields messages as they become available.
-* Uses atomic claim-and-delete to prevent duplicates.
-* The queue is a pure buffer: claim it, delete it, process in memory.
+* Uses atomic claim-confirm to prevent duplicates.
+* Messages are claimed (marked processing) and stay in DB until confirmProcessed().
+* Self-heals stale processing messages before each claim.
* Waits for 'message' event when queue is empty.
*
* CRITICAL: Calls onIdleTimeout callback after 3 minutes of inactivity.
@@ -34,14 +35,14 @@ export class SessionQueueProcessor {
while (!signal.aborted) {
try {
-// Atomically claim AND DELETE next message from DB
-// Message is now in memory only - no "processing" state tracking needed
-const persistentMessage = this.store.claimAndDelete(sessionDbId);
+// Atomically claim next pending message (marks as 'processing')
+// Self-heals any stale processing messages before claiming
+const persistentMessage = this.store.claimNextMessage(sessionDbId);
if (persistentMessage) {
// Reset activity time when we successfully yield a message
lastActivityTime = Date.now();
-// Yield the message for processing (it's already deleted from queue)
+// Yield the message for processing (it's marked as 'processing' in DB)
yield this.toPendingMessageWithId(persistentMessage);
} else {
// Queue empty - wait for wake-up event or timeout
+15 -2
@@ -248,8 +248,14 @@ export class Server {
process.send!({ type: 'restart' });
} else {
// Unix or standalone Windows - handle restart ourselves
+// The spawner (ensureWorkerStarted/restart command) handles spawning the new daemon.
+// This process just needs to shut down and exit.
setTimeout(async () => {
-await this.options.onRestart();
+try {
+await this.options.onRestart();
+} finally {
+process.exit(0);
+}
}, 100);
}
});
@@ -268,7 +274,14 @@ export class Server {
} else {
// Unix or standalone Windows - handle shutdown ourselves
setTimeout(async () => {
-await this.options.onShutdown();
+try {
+await this.options.onShutdown();
+} finally {
+// CRITICAL: Exit the process after shutdown completes (or fails).
+// Without this, the daemon stays alive as a zombie — background tasks
+// (backfill, reconnects) keep running and respawn chroma-mcp subprocesses.
+process.exit(0);
+}
}, 100);
}
});
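Both handlers above wrap the cleanup in `try`/`finally` so the process exits even when the cleanup throws. A minimal sketch of that guarantee, with the exit function injected so it can be observed in a test, might look like this. `shutdownThenExit` is a hypothetical name.

```typescript
// Hypothetical wrapper: run an async cleanup, then call `exit` no matter what happened.
async function shutdownThenExit(
  cleanup: () => Promise<void>,
  exit: (code: number) => void
): Promise<void> {
  try {
    await cleanup();
  } finally {
    // Runs on success and on failure alike, so no zombie daemon is left behind.
    exit(0);
  }
}
```

With a stubbed `exit` the cleanup error still propagates after the `finally` block; with the real `process.exit` the process terminates inside `finally`, so the error is never observed by a caller.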
+30 -6
@@ -2,6 +2,9 @@ import { Database } from './sqlite-compat.js';
import type { PendingMessage } from '../worker-types.js';
import { logger } from '../../utils/logger.js';
/** Messages processing longer than this are considered stale and reset to pending by self-healing */
const STALE_PROCESSING_THRESHOLD_MS = 60_000;
/**
* Persistent pending message record from database
*/
@@ -26,12 +29,17 @@ export interface PersistentPendingMessage {
/**
* PendingMessageStore - Persistent work queue for SDK messages
*
-* Messages are persisted before processing using a claim-and-delete pattern.
+* Messages are persisted before processing using a claim-confirm pattern.
* This simplifies the lifecycle and eliminates duplicate processing bugs.
*
* Lifecycle:
* 1. enqueue() - Message persisted with status 'pending'
-* 2. claimAndDelete() - Atomically claims and deletes message (process in memory)
+* 2. claimNextMessage() - Atomically claims next pending message (marks as 'processing')
+* 3. confirmProcessed() - Deletes message after successful processing
+*
+* Self-healing:
+* - claimNextMessage() resets stale 'processing' messages (>60s) back to 'pending' before claiming
+* - This eliminates stuck messages from generator crashes without external timers
*
* Recovery:
* - getSessionsWithPendingMessages() - Find sessions that need recovery on startup
@@ -78,13 +86,29 @@ export class PendingMessageStore {
/**
* Atomically claim the next pending message by marking it as 'processing'.
-* CRITICAL FIX: Does NOT delete - message stays in DB until confirmProcessed() is called.
-* This prevents message loss if the generator crashes mid-processing.
+* Self-healing: resets any stale 'processing' messages (>60s) back to 'pending' first.
+* Message stays in DB until confirmProcessed() is called.
* Uses a transaction to prevent race conditions.
*/
-claimAndDelete(sessionDbId: number): PersistentPendingMessage | null {
-const now = Date.now();
+claimNextMessage(sessionDbId: number): PersistentPendingMessage | null {
const claimTx = this.db.transaction((sessionId: number) => {
+// Capture time inside transaction so it's fresh if WAL contention causes retry
+const now = Date.now();
+// Self-healing: reset stale 'processing' messages back to 'pending'
+// This recovers from generator crashes without external timers
+// Note: strict < means messages must be OLDER than threshold to be reset
+const staleCutoff = now - STALE_PROCESSING_THRESHOLD_MS;
+const resetStmt = this.db.prepare(`
+UPDATE pending_messages
+SET status = 'pending', started_processing_at_epoch = NULL
+WHERE session_db_id = ? AND status = 'processing'
+AND started_processing_at_epoch < ?
+`);
+const resetResult = resetStmt.run(sessionId, staleCutoff);
+if (resetResult.changes > 0) {
+logger.info('QUEUE', `SELF_HEAL | sessionDbId=${sessionId} | recovered ${resetResult.changes} stale processing message(s)`);
+}
const peekStmt = this.db.prepare(`
SELECT * FROM pending_messages
WHERE session_db_id = ? AND status = 'pending'
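The claim-confirm lifecycle with self-healing described in this file can be illustrated without a database. The sketch below keeps messages in an array and takes the current time as a parameter, so the 60 second staleness rule can be tested deterministically. `InMemoryQueue`, `QueuedMessage`, and `STALE_MS` are hypothetical names, and the sketch ignores sessions and transactions entirely.

```typescript
type Status = 'pending' | 'processing';

interface QueuedMessage {
  id: number;
  status: Status;
  startedAt: number | null;
}

const STALE_MS = 60_000; // same value as STALE_PROCESSING_THRESHOLD_MS in the diff

class InMemoryQueue {
  private messages: QueuedMessage[] = [];
  private nextId = 1;

  enqueue(): number {
    const id = this.nextId++;
    this.messages.push({ id, status: 'pending', startedAt: null });
    return id;
  }

  // Self-heal stale 'processing' entries, then claim the oldest pending one.
  claimNext(now: number): QueuedMessage | null {
    const staleCutoff = now - STALE_MS;
    for (const m of this.messages) {
      // Strict <: an entry must be older than the threshold to be reset.
      if (m.status === 'processing' && m.startedAt !== null && m.startedAt < staleCutoff) {
        m.status = 'pending';
        m.startedAt = null;
      }
    }
    const next = this.messages.find(m => m.status === 'pending');
    if (!next) return null;
    next.status = 'processing';
    next.startedAt = now;
    return next;
  }

  // Delete only after the caller has finished processing.
  confirmProcessed(id: number): void {
    this.messages = this.messages.filter(m => m.id !== id);
  }
}
```

If a consumer crashes between `claimNext` and `confirmProcessed`, the entry stays in the queue and becomes claimable again once it is older than the threshold, which is the property the claim-and-delete version lacked.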
+422
@@ -0,0 +1,422 @@
/**
* ChromaMcpManager - Singleton managing a persistent MCP connection to chroma-mcp via uvx
*
* Replaces ChromaServerManager (which spawned `npx chroma run`) with a stdio-based
* MCP client that communicates with chroma-mcp as a subprocess. The chroma-mcp server
* handles its own embedding and persistent storage, eliminating the need for a separate
* HTTP server, chromadb npm package, and ONNX/WASM embedding dependencies.
*
* Lifecycle: lazy-connects on first callTool() use, maintains a single persistent
* connection per worker lifetime, and auto-reconnects if the subprocess dies.
*
* Cross-platform: Linux, macOS, Windows
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { execSync } from 'child_process';
import path from 'path';
import os from 'os';
import fs from 'fs';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
const CHROMA_MCP_CLIENT_NAME = 'claude-mem-chroma';
const CHROMA_MCP_CLIENT_VERSION = '1.0.0';
const MCP_CONNECTION_TIMEOUT_MS = 30_000;
const RECONNECT_BACKOFF_MS = 10_000; // Don't retry connections faster than this after failure
const DEFAULT_CHROMA_DATA_DIR = path.join(os.homedir(), '.claude-mem', 'chroma');
export class ChromaMcpManager {
private static instance: ChromaMcpManager | null = null;
private client: Client | null = null;
private transport: StdioClientTransport | null = null;
private connected: boolean = false;
private lastConnectionFailureTimestamp: number = 0;
private connecting: Promise<void> | null = null;
private constructor() {}
/**
* Get or create the singleton instance
*/
static getInstance(): ChromaMcpManager {
if (!ChromaMcpManager.instance) {
ChromaMcpManager.instance = new ChromaMcpManager();
}
return ChromaMcpManager.instance;
}
/**
* Ensure the MCP client is connected to chroma-mcp.
* Uses a connection lock to prevent concurrent connection attempts.
* If the subprocess has died since the last use, reconnects transparently.
*/
private async ensureConnected(): Promise<void> {
if (this.connected && this.client) {
return;
}
// Backoff: don't retry connections too fast after a failure
const timeSinceLastFailure = Date.now() - this.lastConnectionFailureTimestamp;
if (this.lastConnectionFailureTimestamp > 0 && timeSinceLastFailure < RECONNECT_BACKOFF_MS) {
throw new Error(`chroma-mcp connection in backoff (${Math.ceil((RECONNECT_BACKOFF_MS - timeSinceLastFailure) / 1000)}s remaining)`);
}
// If another caller is already connecting, wait for that attempt
if (this.connecting) {
await this.connecting;
return;
}
this.connecting = this.connectInternal();
try {
await this.connecting;
} catch (error) {
this.lastConnectionFailureTimestamp = Date.now();
throw error;
} finally {
this.connecting = null;
}
}
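`ensureConnected()` combines two ideas: concurrent callers share a single in-flight connection attempt, and a recent failure blocks new attempts for a backoff window. A stripped-down sketch of that combination, with the connect function and the clock injected for testing, could look like the following. `SingleFlightConnector` and its members are hypothetical names; this is not the manager's actual code.

```typescript
class SingleFlightConnector {
  private connected = false;
  private inFlight: Promise<void> | null = null;
  private lastFailureAt = 0;
  private readonly connect: () => Promise<void>;
  private readonly backoffMs: number;
  private readonly now: () => number;

  constructor(connect: () => Promise<void>, backoffMs: number, now: () => number = () => Date.now()) {
    this.connect = connect;
    this.backoffMs = backoffMs;
    this.now = now;
  }

  async ensureConnected(): Promise<void> {
    if (this.connected) return;

    // Backoff: refuse to retry too soon after a failed attempt.
    const sinceFailure = this.now() - this.lastFailureAt;
    if (this.lastFailureAt > 0 && sinceFailure < this.backoffMs) {
      throw new Error('connection in backoff');
    }

    // Single flight: later callers wait on the attempt already in progress.
    if (this.inFlight) {
      await this.inFlight;
      return;
    }

    this.inFlight = this.connect().then(() => { this.connected = true; });
    try {
      await this.inFlight;
    } catch (error) {
      this.lastFailureAt = this.now();
      throw error;
    } finally {
      this.inFlight = null;
    }
  }
}
```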
/**
* Internal connection logic - spawns uvx chroma-mcp and performs MCP handshake.
* Called behind the connection lock to ensure only one connection attempt at a time.
*/
private async connectInternal(): Promise<void> {
// Clean up any stale client/transport from a dead subprocess.
// Close transport first (kills subprocess via SIGTERM) before client
// to avoid hanging on a stuck process.
if (this.transport) {
try { await this.transport.close(); } catch { /* already dead */ }
}
if (this.client) {
try { await this.client.close(); } catch { /* already dead */ }
}
this.client = null;
this.transport = null;
this.connected = false;
const commandArgs = this.buildCommandArgs();
const spawnEnvironment = this.getSpawnEnv();
const isWindows = process.platform === 'win32';
const uvxCommand = isWindows ? 'uvx.cmd' : 'uvx';
logger.info('CHROMA_MCP', 'Connecting to chroma-mcp via MCP stdio', {
command: uvxCommand,
args: commandArgs.join(' ')
});
this.transport = new StdioClientTransport({
command: uvxCommand,
args: commandArgs,
env: spawnEnvironment,
stderr: 'pipe'
});
this.client = new Client(
{ name: CHROMA_MCP_CLIENT_NAME, version: CHROMA_MCP_CLIENT_VERSION },
{ capabilities: {} }
);
const mcpConnectionPromise = this.client.connect(this.transport);
let timeoutId: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error(`MCP connection to chroma-mcp timed out after ${MCP_CONNECTION_TIMEOUT_MS}ms`)),
MCP_CONNECTION_TIMEOUT_MS
);
});
try {
await Promise.race([mcpConnectionPromise, timeoutPromise]);
} catch (connectionError) {
// Connection failed or timed out - kill the subprocess to prevent zombies
clearTimeout(timeoutId!);
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess to prevent zombie', {
error: connectionError instanceof Error ? connectionError.message : String(connectionError)
});
try { await this.transport.close(); } catch { /* best effort */ }
try { await this.client.close(); } catch { /* best effort */ }
this.client = null;
this.transport = null;
this.connected = false;
throw connectionError;
}
clearTimeout(timeoutId!);
this.connected = true;
logger.info('CHROMA_MCP', 'Connected to chroma-mcp successfully');
// Listen for transport close to mark connection as dead and apply backoff.
// CRITICAL: Guard with reference check to prevent stale onclose handlers from
// previous transports overwriting the current connection (race condition).
const currentTransport = this.transport;
this.transport.onclose = () => {
if (this.transport !== currentTransport) {
logger.debug('CHROMA_MCP', 'Ignoring stale onclose from previous transport');
return;
}
logger.warn('CHROMA_MCP', 'chroma-mcp subprocess closed unexpectedly, applying reconnect backoff');
this.connected = false;
this.client = null;
this.transport = null;
this.lastConnectionFailureTimestamp = Date.now();
};
}
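The connection step above bounds the MCP handshake by racing it against a timer and clearing the timer on both outcomes. A generic sketch of that technique is below. `withTimeout` is a hypothetical helper name; the real method additionally closes the transport on failure so the subprocess does not linger, which this sketch leaves out.

```typescript
// Race a promise against a timer; always clear the timer afterwards.
async function withTimeout<T>(work: Promise<T>, timeoutMs: number, label: string): Promise<T> {
  let timeoutId: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timeoutId = setTimeout(
      () => reject(new Error(`${label} timed out after ${timeoutMs}ms`)),
      timeoutMs
    );
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    // Without this, a pending timer would keep the event loop alive after success.
    if (timeoutId !== undefined) clearTimeout(timeoutId);
  }
}
```

Note that the race only stops waiting; it does not cancel `work`, which is why the caller still has to tear down whatever the slow operation started.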
/**
* Build the uvx command arguments based on current settings.
* In local mode: uses persistent client with local data directory.
* In remote mode: uses http client with configured host/port/auth.
*/
private buildCommandArgs(): string[] {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const chromaMode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
if (chromaMode === 'remote') {
const chromaHost = settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1';
const chromaPort = settings.CLAUDE_MEM_CHROMA_PORT || '8000';
const chromaSsl = settings.CLAUDE_MEM_CHROMA_SSL === 'true';
const chromaTenant = settings.CLAUDE_MEM_CHROMA_TENANT || 'default_tenant';
const chromaDatabase = settings.CLAUDE_MEM_CHROMA_DATABASE || 'default_database';
const chromaApiKey = settings.CLAUDE_MEM_CHROMA_API_KEY || '';
const args = [
'chroma-mcp',
'--client-type', 'http',
'--host', chromaHost,
'--port', chromaPort
];
if (chromaSsl) {
args.push('--ssl');
}
if (chromaTenant !== 'default_tenant') {
args.push('--tenant', chromaTenant);
}
if (chromaDatabase !== 'default_database') {
args.push('--database', chromaDatabase);
}
if (chromaApiKey) {
args.push('--api-key', chromaApiKey);
}
return args;
}
// Local mode: persistent client with data directory
return [
'chroma-mcp',
'--client-type', 'persistent',
'--data-dir', DEFAULT_CHROMA_DATA_DIR
];
}
/**
* Call a chroma-mcp tool by name with the given arguments.
* Lazily connects on first call. Reconnects if the subprocess has died.
*
* @param toolName - The chroma-mcp tool name (e.g. 'chroma_query_documents')
* @param toolArguments - The tool arguments as a plain object
* @returns The parsed JSON result from the tool's text output, or null when the result is empty or plain text
*/
async callTool(toolName: string, toolArguments: Record<string, unknown>): Promise<unknown> {
await this.ensureConnected();
logger.debug('CHROMA_MCP', `Calling tool: ${toolName}`, {
arguments: JSON.stringify(toolArguments).slice(0, 200)
});
const result = await this.client!.callTool({
name: toolName,
arguments: toolArguments
});
// MCP tools signal errors via isError flag on the CallToolResult
if (result.isError) {
const errorText = (result.content as Array<{ type: string; text?: string }>)
?.find(item => item.type === 'text')?.text || 'Unknown chroma-mcp error';
throw new Error(`chroma-mcp tool "${toolName}" returned error: ${errorText}`);
}
// Extract text from MCP CallToolResult: { content: Array<{ type, text? }> }
const contentArray = result.content as Array<{ type: string; text?: string }>;
if (!contentArray || contentArray.length === 0) {
return null;
}
const firstTextContent = contentArray.find(item => item.type === 'text' && item.text);
if (!firstTextContent || !firstTextContent.text) {
return null;
}
// chroma-mcp returns JSON for query/get results, but plain text for
// mutating operations (e.g. "Successfully created collection ...").
// Try JSON parse first; if it fails, treat the text as a void-like success and return null.
try {
return JSON.parse(firstTextContent.text);
} catch {
// Plain text response (e.g. "Successfully created collection cm__foo")
// Return null for void-like success messages, callers don't need the text
return null;
}
}
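The result handling in `callTool()` reduces to: take the first non-empty text item, try to parse it as JSON, and return null otherwise. A self-contained sketch of just that step follows. `ContentItem` and `parseToolText` are hypothetical names, and the `isError` branch of the real method is left out.

```typescript
interface ContentItem {
  type: string;
  text?: string;
}

// Hypothetical helper: first text item parsed as JSON, or null for empty / plain-text results.
function parseToolText(content: ContentItem[] | undefined): unknown {
  const first = content?.find(item => item.type === 'text' && item.text);
  if (!first || !first.text) return null;
  try {
    return JSON.parse(first.text);
  } catch {
    // Plain-text success messages are treated as "no value".
    return null;
  }
}
```

One consequence of this design is that a plain-text reply which happens to be valid JSON, such as `true` or a bare number, is returned as a parsed value rather than null.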
/**
* Check if the MCP connection is alive by calling chroma_list_collections.
* Returns true if the connection is healthy, false otherwise.
*/
async isHealthy(): Promise<boolean> {
try {
await this.callTool('chroma_list_collections', { limit: 1 });
return true;
} catch {
return false;
}
}
/**
* Gracefully stop the MCP connection and kill the chroma-mcp subprocess.
* client.close() sends stdin close -> SIGTERM -> SIGKILL to the subprocess.
*/
async stop(): Promise<void> {
if (!this.client) {
logger.debug('CHROMA_MCP', 'No active MCP connection to stop');
return;
}
logger.info('CHROMA_MCP', 'Stopping chroma-mcp MCP connection');
try {
await this.client.close();
} catch (error) {
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', {}, error as Error);
}
this.client = null;
this.transport = null;
this.connected = false;
this.connecting = null;
logger.info('CHROMA_MCP', 'chroma-mcp MCP connection stopped');
}
/**
* Reset the singleton instance (for testing).
* Awaits stop() to prevent dual subprocesses.
*/
static async reset(): Promise<void> {
if (ChromaMcpManager.instance) {
await ChromaMcpManager.instance.stop();
}
ChromaMcpManager.instance = null;
}
/**
* Get or create a combined SSL certificate bundle for Zscaler/corporate proxy environments.
* On macOS, combines the Python certifi CA bundle with any Zscaler certificates from
* the system keychain. Caches the result for 24 hours at ~/.claude-mem/combined_certs.pem.
*
* Returns the path to the combined cert file, or undefined if not needed/available.
*/
private getCombinedCertPath(): string | undefined {
const combinedCertPath = path.join(os.homedir(), '.claude-mem', 'combined_certs.pem');
if (fs.existsSync(combinedCertPath)) {
const stats = fs.statSync(combinedCertPath);
const ageMs = Date.now() - stats.mtimeMs;
if (ageMs < 24 * 60 * 60 * 1000) {
return combinedCertPath;
}
}
if (process.platform !== 'darwin') {
return undefined;
}
try {
let certifiPath: string | undefined;
try {
certifiPath = execSync(
'uvx --with certifi python -c "import certifi; print(certifi.where())"',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 10000 }
).trim();
} catch {
return undefined;
}
if (!certifiPath || !fs.existsSync(certifiPath)) {
return undefined;
}
let zscalerCert = '';
try {
zscalerCert = execSync(
'security find-certificate -a -c "Zscaler" -p /Library/Keychains/System.keychain',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 5000 }
);
} catch {
return undefined;
}
if (!zscalerCert ||
!zscalerCert.includes('-----BEGIN CERTIFICATE-----') ||
!zscalerCert.includes('-----END CERTIFICATE-----')) {
return undefined;
}
const certifiContent = fs.readFileSync(certifiPath, 'utf8');
const tempPath = combinedCertPath + '.tmp';
fs.writeFileSync(tempPath, certifiContent + '\n' + zscalerCert);
fs.renameSync(tempPath, combinedCertPath);
logger.info('CHROMA_MCP', 'Created combined SSL certificate bundle for Zscaler', {
path: combinedCertPath
});
return combinedCertPath;
} catch (error) {
logger.debug('CHROMA_MCP', 'Could not create combined cert bundle', {}, error as Error);
return undefined;
}
}
/**
* Build subprocess environment with SSL certificate overrides for enterprise proxy compatibility.
* If a combined cert bundle exists (Zscaler), injects SSL_CERT_FILE, REQUESTS_CA_BUNDLE, etc.
* Otherwise returns a plain string-keyed copy of process.env.
*/
private getSpawnEnv(): Record<string, string> {
const baseEnv: Record<string, string> = {};
for (const [key, value] of Object.entries(process.env)) {
if (value !== undefined) {
baseEnv[key] = value;
}
}
const combinedCertPath = this.getCombinedCertPath();
if (!combinedCertPath) {
return baseEnv;
}
logger.info('CHROMA_MCP', 'Using combined SSL certificates for enterprise compatibility', {
certPath: combinedCertPath
});
return {
...baseEnv,
SSL_CERT_FILE: combinedCertPath,
REQUESTS_CA_BUNDLE: combinedCertPath,
CURL_CA_BUNDLE: combinedCertPath,
NODE_EXTRA_CA_CERTS: combinedCertPath
};
}
}
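The two helpers above boil down to a cache-freshness check and an environment overlay. A minimal sketch of that logic as pure functions follows, assuming hypothetical names (`isBundleFresh`, `withCertOverrides`, `CERT_MAX_AGE_MS`) that do not appear in the source. It performs no filesystem or keychain access, so it only illustrates the decision logic, not the bundle creation itself.

```typescript
// Hypothetical names for illustration only; not part of the source.
const CERT_MAX_AGE_MS = 24 * 60 * 60 * 1000; // 24-hour cache window, as in the source

// True when a bundle last modified at mtimeMs is still inside the cache window.
function isBundleFresh(
  mtimeMs: number,
  nowMs: number,
  maxAgeMs: number = CERT_MAX_AGE_MS
): boolean {
  return nowMs - mtimeMs < maxAgeMs;
}

// Copy env without undefined values, then overlay the CA-bundle variables
// when a combined certificate path is available.
function withCertOverrides(
  env: Record<string, string | undefined>,
  certPath: string | undefined
): Record<string, string> {
  const base: Record<string, string> = {};
  for (const key of Object.keys(env)) {
    const value = env[key];
    if (value !== undefined) {
      base[key] = value;
    }
  }
  if (!certPath) {
    return base;
  }
  return {
    ...base,
    SSL_CERT_FILE: certPath,
    REQUESTS_CA_BUNDLE: certPath,
    CURL_CA_BUNDLE: certPath,
    NODE_EXTRA_CA_CERTS: certPath
  };
}
```

Keeping the overlay separate from the certificate discovery makes the environment-building step testable without a macOS keychain or `uvx` on the machine.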
-446
@@ -1,446 +0,0 @@
/**
* ChromaServerManager - Singleton managing local Chroma HTTP server lifecycle
*
* Starts a persistent Chroma server via `npx chroma run` at worker startup
* and manages its lifecycle. In 'remote' mode, skips server start and connects
* to an existing server (future cloud support).
*
* Cross-platform: Linux, macOS, Windows
*/
import { spawn, ChildProcess, execSync } from 'child_process';
import path from 'path';
import os from 'os';
import fs, { existsSync } from 'fs';
import { logger } from '../../utils/logger.js';
export interface ChromaServerConfig {
dataDir: string;
host: string;
port: number;
}
export class ChromaServerManager {
private static instance: ChromaServerManager | null = null;
private serverProcess: ChildProcess | null = null;
private config: ChromaServerConfig;
private starting: boolean = false;
private ready: boolean = false;
private startPromise: Promise<boolean> | null = null;
private constructor(config: ChromaServerConfig) {
this.config = config;
}
/**
* Get or create the singleton instance
*/
static getInstance(config?: ChromaServerConfig): ChromaServerManager {
if (!ChromaServerManager.instance) {
const defaultConfig: ChromaServerConfig = {
dataDir: path.join(os.homedir(), '.claude-mem', 'vector-db'),
host: '127.0.0.1',
port: 8000
};
ChromaServerManager.instance = new ChromaServerManager(config || defaultConfig);
}
return ChromaServerManager.instance;
}
/**
* Start the Chroma HTTP server
* Reuses in-flight startup if already starting
* Spawns `npx chroma run` as a background process
* If a server is already running (from previous worker), reuses it
*/
async start(timeoutMs: number = 60000): Promise<boolean> {
if (this.ready) {
logger.debug('CHROMA_SERVER', 'Server already running', {
ready: this.ready,
starting: this.starting
});
return true;
}
if (this.startPromise) {
logger.debug('CHROMA_SERVER', 'Awaiting existing startup', {
host: this.config.host,
port: this.config.port
});
return this.startPromise;
}
this.starting = true;
this.startPromise = this.startInternal(timeoutMs);
try {
return await this.startPromise;
} finally {
this.startPromise = null;
if (!this.ready) {
this.starting = false;
}
}
}
/**
* Internal startup path used behind a single shared startPromise lock
*/
private async startInternal(timeoutMs: number): Promise<boolean> {
// Check if a server is already running (from previous worker or manual start)
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`,
{ signal: AbortSignal.timeout(3000) }
);
if (response.ok) {
logger.info('CHROMA_SERVER', 'Existing server detected, reusing', {
host: this.config.host,
port: this.config.port
});
this.ready = true;
this.starting = false;
return true;
}
} catch {
// No server running, proceed to start one
}
// Cross-platform: use npx.cmd on Windows
const isWindows = process.platform === 'win32';
// Resolve chroma binary absolutely — npx fails when spawned from cache dirs (#1120)
let command: string;
let args: string[];
try {
// chromadb package installs a 'chroma' bin entry
const chromaBinDir = path.dirname(require.resolve('chromadb/package.json'));
// Check project-level .bin first (most common npm/bun installation layout)
const projectBin = path.join(chromaBinDir, '..', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
// Fallback: nested node_modules .bin (rare — pnpm or workspace hoisting)
const nestedBin = path.join(chromaBinDir, 'node_modules', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
if (existsSync(projectBin)) {
command = projectBin;
} else if (existsSync(nestedBin)) {
command = nestedBin;
} else {
// Last resort: npx with explicit cwd
command = isWindows ? 'npx.cmd' : 'npx';
}
} catch {
command = isWindows ? 'npx.cmd' : 'npx';
}
if (command.includes('npx')) {
args = ['chroma', 'run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
} else {
args = ['run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
}
logger.info('CHROMA_SERVER', 'Starting Chroma server', {
command,
args: args.join(' '),
dataDir: this.config.dataDir
});
const spawnEnv = this.getSpawnEnv();
// Resolve cwd for npx fallback — ensures node_modules is findable (#1120)
let spawnCwd: string | undefined;
try {
spawnCwd = path.dirname(require.resolve('chromadb/package.json'));
} catch {
// If chromadb isn't resolvable, omit cwd and let npx handle it
}
this.serverProcess = spawn(command, args, {
stdio: ['ignore', 'pipe', 'pipe'],
detached: !isWindows, // Don't detach on Windows (no process groups)
windowsHide: true, // Hide console window on Windows
env: spawnEnv,
...(spawnCwd && { cwd: spawnCwd })
});
// Log server output for debugging
this.serverProcess.stdout?.on('data', (data) => {
const msg = data.toString().trim();
if (msg) {
logger.debug('CHROMA_SERVER', msg);
}
});
this.serverProcess.stderr?.on('data', (data) => {
const msg = data.toString().trim();
if (msg) {
// Filter out noisy startup messages
if (!msg.includes('Chroma') || msg.includes('error') || msg.includes('Error')) {
logger.debug('CHROMA_SERVER', msg);
}
}
});
this.serverProcess.on('error', (err) => {
logger.error('CHROMA_SERVER', 'Server process error', {}, err);
this.ready = false;
this.starting = false;
});
this.serverProcess.on('exit', (code, signal) => {
logger.info('CHROMA_SERVER', 'Server process exited', { code, signal });
this.ready = false;
this.starting = false;
this.serverProcess = null;
});
return this.waitForReady(timeoutMs);
}
/**
* Wait for the server to become ready
* Polls the heartbeat endpoint until success or timeout
*/
async waitForReady(timeoutMs: number = 60000): Promise<boolean> {
if (this.ready) {
return true;
}
const startTime = Date.now();
const checkInterval = 500;
logger.info('CHROMA_SERVER', 'Waiting for server to be ready', {
host: this.config.host,
port: this.config.port,
timeoutMs
});
while (Date.now() - startTime < timeoutMs) {
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`
);
if (response.ok) {
this.ready = true;
this.starting = false;
logger.info('CHROMA_SERVER', 'Server ready', {
host: this.config.host,
port: this.config.port,
startupTimeMs: Date.now() - startTime
});
return true;
}
} catch {
// Server not ready yet, continue polling
}
await new Promise(resolve => setTimeout(resolve, checkInterval));
}
this.starting = false;
logger.error('CHROMA_SERVER', 'Server failed to start within timeout', {
timeoutMs,
elapsedMs: Date.now() - startTime
});
return false;
}
/**
* Report the cached readiness flag without contacting the server
* Returns true once a heartbeat has succeeded (spawned or reused server) and the managed process has not since exited
*/
isRunning(): boolean {
return this.ready;
}
/**
* Async check if server is running by pinging heartbeat
* Use this when you need to verify server is actually reachable
*/
async isServerReachable(): Promise<boolean> {
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`
);
if (response.ok) {
this.ready = true;
return true;
}
} catch {
// Server not reachable
}
return false;
}
/**
* Get the server URL for client connections
*/
getUrl(): string {
return `http://${this.config.host}:${this.config.port}`;
}
/**
* Get the server configuration
*/
getConfig(): ChromaServerConfig {
return { ...this.config };
}
/**
* Stop the Chroma server
* Gracefully terminates the server process
*/
async stop(): Promise<void> {
if (!this.serverProcess) {
logger.debug('CHROMA_SERVER', 'No server process to stop');
return;
}
logger.info('CHROMA_SERVER', 'Stopping server', { pid: this.serverProcess.pid });
return new Promise((resolve) => {
const proc = this.serverProcess!;
const pid = proc.pid;
const cleanup = () => {
this.serverProcess = null;
this.ready = false;
this.starting = false;
this.startPromise = null;
logger.info('CHROMA_SERVER', 'Server stopped', { pid });
resolve();
};
// Set up exit handler
proc.once('exit', cleanup);
// Cross-platform graceful shutdown
if (process.platform === 'win32') {
// Windows: no POSIX signals or process groups; kill() terminates the process directly
proc.kill('SIGTERM');
} else {
// Unix: kill the process group to ensure all children are killed
if (pid !== undefined) {
try {
process.kill(-pid, 'SIGTERM');
} catch (err) {
// Process group kill failed, try direct kill
proc.kill('SIGTERM');
}
} else {
proc.kill('SIGTERM');
}
}
// Force kill after timeout if still running
setTimeout(() => {
if (this.serverProcess) {
logger.warn('CHROMA_SERVER', 'Force killing server after timeout', { pid });
try {
proc.kill('SIGKILL');
} catch {
// Already dead
}
cleanup();
}
}, 5000);
});
}
/**
* Get or create combined SSL certificate bundle for Zscaler/corporate proxy environments.
* This ports previous MCP SSL handling so local `npx chroma run` works behind enterprise proxies.
*/
private getCombinedCertPath(): string | undefined {
const combinedCertPath = path.join(os.homedir(), '.claude-mem', 'combined_certs.pem');
if (fs.existsSync(combinedCertPath)) {
const stats = fs.statSync(combinedCertPath);
const ageMs = Date.now() - stats.mtimeMs;
if (ageMs < 24 * 60 * 60 * 1000) {
return combinedCertPath;
}
}
if (process.platform !== 'darwin') {
return undefined;
}
try {
let certifiPath: string | undefined;
try {
certifiPath = execSync(
'uvx --with certifi python -c "import certifi; print(certifi.where())"',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 10000 }
).trim();
} catch {
return undefined;
}
if (!certifiPath || !fs.existsSync(certifiPath)) {
return undefined;
}
let zscalerCert = '';
try {
zscalerCert = execSync(
'security find-certificate -a -c "Zscaler" -p /Library/Keychains/System.keychain',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 5000 }
);
} catch {
return undefined;
}
if (!zscalerCert ||
!zscalerCert.includes('-----BEGIN CERTIFICATE-----') ||
!zscalerCert.includes('-----END CERTIFICATE-----')) {
return undefined;
}
const certifiContent = fs.readFileSync(certifiPath, 'utf8');
const tempPath = combinedCertPath + '.tmp';
fs.writeFileSync(tempPath, certifiContent + '\n' + zscalerCert);
fs.renameSync(tempPath, combinedCertPath);
logger.info('CHROMA_SERVER', 'Created combined SSL certificate bundle for Zscaler', {
path: combinedCertPath
});
return combinedCertPath;
} catch (error) {
logger.debug('CHROMA_SERVER', 'Could not create combined cert bundle', {}, error as Error);
return undefined;
}
}
/**
* Build subprocess env and preserve Zscaler compatibility from previous architecture.
*/
private getSpawnEnv(): NodeJS.ProcessEnv {
const combinedCertPath = this.getCombinedCertPath();
if (!combinedCertPath) {
return process.env;
}
logger.info('CHROMA_SERVER', 'Using combined SSL certificates for enterprise compatibility', {
certPath: combinedCertPath
});
return {
...process.env,
SSL_CERT_FILE: combinedCertPath,
REQUESTS_CA_BUNDLE: combinedCertPath,
CURL_CA_BUNDLE: combinedCertPath,
NODE_EXTRA_CA_CERTS: combinedCertPath
};
}
/**
* Reset the singleton instance (for testing)
*/
static reset(): void {
if (ChromaServerManager.instance) {
// Don't await - just trigger stop
ChromaServerManager.instance.stop().catch(() => {});
}
ChromaServerManager.instance = null;
}
}
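The `start()` method in the removed ChromaServerManager relies on a shared `startPromise` so that concurrent callers await one startup instead of spawning several servers. A minimal sketch of that single-flight pattern follows, assuming a hypothetical generic `SingleFlight` class and a stand-in `fakeStart` task. Neither name comes from the source, and the real method also tracks the `ready` and `starting` flags, which this sketch omits.

```typescript
// Hypothetical helper for illustration; the source inlines this logic in start().
class SingleFlight<T> {
  private inFlight: Promise<T> | null = null;

  // Run task unless one is already pending; concurrent callers share the same promise.
  run(task: () => Promise<T>): Promise<T> {
    if (this.inFlight) {
      return this.inFlight;
    }
    // Defer the task to a microtask so a synchronous throw becomes a rejection.
    const pending: Promise<T> = Promise.resolve().then(task);
    this.inFlight = pending;
    const clear = (): void => {
      // Only clear the slot if it still holds this attempt.
      if (this.inFlight === pending) {
        this.inFlight = null;
      }
    };
    pending.then(clear, clear);
    return pending;
  }
}

// Usage: two overlapping calls trigger a single (simulated) server start.
let spawnCount = 0;
const startup = new SingleFlight<boolean>();
const fakeStart = async (): Promise<boolean> => {
  spawnCount += 1; // stands in for spawning the server and polling the heartbeat
  return true;
};
const first = startup.run(fakeStart);
const second = startup.run(fakeStart);
```

Clearing the slot on both success and failure mirrors the `finally` block in `start()`: a failed startup does not leave later callers awaiting a stale rejected promise.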
+182 -266
@@ -1,26 +1,21 @@
/**
* ChromaSync Service
*
* Automatically syncs observations and session summaries to ChromaDB via HTTP.
* Automatically syncs observations and session summaries to ChromaDB via MCP.
* This service provides real-time semantic search capabilities by maintaining
* a vector database synchronized with SQLite.
*
* Uses the chromadb npm package's built-in ChromaClient for HTTP connections.
* Supports both local server (managed by ChromaServerManager) and remote/cloud
* servers for future claude-mem pro features.
* Uses ChromaMcpManager to communicate with chroma-mcp over stdio MCP protocol.
* The chroma-mcp server handles its own embedding and persistent storage,
* eliminating the need for chromadb npm package and ONNX/WASM dependencies.
*
* Design: Fail-fast with no fallbacks - if Chroma is unavailable, syncing fails.
*/
import { ChromaClient, Collection } from 'chromadb';
import { ChromaMcpManager } from './ChromaMcpManager.js';
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
import { SessionStore } from '../sqlite/SessionStore.js';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import { ChromaServerManager } from './ChromaServerManager.js';
import path from 'path';
import os from 'os';
interface ChromaDocument {
id: string;
@@ -75,157 +70,49 @@ interface StoredUserPrompt {
}
export class ChromaSync {
private chromaClient: ChromaClient | null = null;
private collection: Collection | null = null;
private project: string;
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private collectionCreated = false;
private readonly BATCH_SIZE = 100;
private modelCacheCorruptionRetried = false;
constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
// Chroma collection names only allow [a-zA-Z0-9._-], 3-512 chars,
// must start/end with [a-zA-Z0-9]
const sanitized = project
.replace(/[^a-zA-Z0-9._-]/g, '_')
.replace(/[^a-zA-Z0-9]+$/, ''); // strip trailing non-alphanumeric
this.collectionName = `cm__${sanitized || 'unknown'}`;
}
/**
* Ensure HTTP client is connected to Chroma server
* In local mode, verifies ChromaServerManager has started the server
* In remote mode, connects directly to configured host
* Throws error if connection fails
* Ensure collection exists in Chroma via MCP.
* Safe to call multiple times: an 'already exists' error from chroma_create_collection is ignored.
* Uses collectionCreated flag to avoid redundant calls within a session.
*/
private async ensureConnection(): Promise<void> {
if (this.chromaClient) {
private async ensureCollectionExists(): Promise<void> {
if (this.collectionCreated) {
return;
}
logger.info('CHROMA_SYNC', 'Connecting to Chroma HTTP server...', { project: this.project });
const chromaMcp = ChromaMcpManager.getInstance();
try {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const mode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
const host = settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1';
const port = parseInt(settings.CLAUDE_MEM_CHROMA_PORT || '8000', 10);
const ssl = settings.CLAUDE_MEM_CHROMA_SSL === 'true';
// Multi-tenancy settings (used in remote/pro mode)
const tenant = settings.CLAUDE_MEM_CHROMA_TENANT || 'default_tenant';
const database = settings.CLAUDE_MEM_CHROMA_DATABASE || 'default_database';
const apiKey = settings.CLAUDE_MEM_CHROMA_API_KEY || '';
// In local mode, verify server is reachable
if (mode === 'local') {
const serverManager = ChromaServerManager.getInstance();
const reachable = await serverManager.isServerReachable();
if (!reachable) {
throw new Error('Chroma server not reachable. Ensure worker started correctly.');
}
}
// Create HTTP client
const protocol = ssl ? 'https' : 'http';
const chromaPath = `${protocol}://${host}:${port}`;
// Build client options
const clientOptions: { path: string; tenant?: string; database?: string; headers?: Record<string, string> } = {
path: chromaPath
};
// In remote mode, use tenant isolation for pro users
if (mode === 'remote') {
clientOptions.tenant = tenant;
clientOptions.database = database;
// Add API key header if configured
if (apiKey) {
clientOptions.headers = {
'Authorization': `Bearer ${apiKey}`
};
}
logger.info('CHROMA_SYNC', 'Connecting with tenant isolation', {
tenant,
database,
hasApiKey: !!apiKey
});
}
this.chromaClient = new ChromaClient(clientOptions);
// Verify connection with heartbeat
await this.chromaClient.heartbeat();
logger.info('CHROMA_SYNC', 'Connected to Chroma HTTP server', {
project: this.project,
host,
port,
ssl,
mode,
tenant: mode === 'remote' ? tenant : 'default_tenant'
await chromaMcp.callTool('chroma_create_collection', {
collection_name: this.collectionName
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma HTTP server', { project: this.project }, error as Error);
this.chromaClient = null;
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Ensure collection exists, create if needed
* Throws error if collection creation fails
*/
private async ensureCollection(): Promise<void> {
await this.ensureConnection();
if (this.collection) {
return;
}
if (!this.chromaClient) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
try {
// Store model cache outside node_modules so reinstalls don't corrupt it
const { env } = await import('@huggingface/transformers');
env.cacheDir = path.join(os.homedir(), '.claude-mem', 'models');
// Use WASM backend to avoid native ONNX binary issues (#1104, #1105, #1110).
// Same model (all-MiniLM-L6-v2), same embeddings, but runs in WASM —
// no native binary loading, no segfaults, no ENOENT errors.
const { DefaultEmbeddingFunction } = await import('@chroma-core/default-embed');
const embeddingFunction = new DefaultEmbeddingFunction({ wasm: true });
this.collection = await this.chromaClient.getOrCreateCollection({
name: this.collectionName,
embeddingFunction
});
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Self-heal: corrupted model cache → clear and retry once
if (errorMessage.includes('Protobuf parsing failed') && !this.modelCacheCorruptionRetried) {
this.modelCacheCorruptionRetried = true;
logger.warn('CHROMA_SYNC', 'Corrupted model cache detected, clearing and retrying...');
const modelCacheDir = path.join(os.homedir(), '.claude-mem', 'models');
const fs = await import('fs');
if (fs.existsSync(modelCacheDir)) {
fs.rmSync(modelCacheDir, { recursive: true, force: true });
}
return this.ensureCollection(); // retry once
const message = error instanceof Error ? error.message : String(error);
if (!message.includes('already exists')) {
throw error;
}
logger.error('CHROMA_SYNC', 'Failed to get/create collection', { collection: this.collectionName }, error as Error);
throw new Error(`Collection setup failed: ${errorMessage}`);
// Collection already exists - this is the expected path after first creation
}
this.collectionCreated = true;
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
}
/**
@@ -364,7 +251,7 @@ export class ChromaSync {
}
/**
* Add documents to Chroma in batch
* Add documents to Chroma in batch via MCP
* Throws error if batch add fails
*/
private async addDocuments(documents: ChromaDocument[]): Promise<void> {
@@ -372,33 +259,26 @@ export class ChromaSync {
return;
}
await this.ensureCollection();
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
const chromaMcp = ChromaMcpManager.getInstance();
// Add in batches
for (let i = 0; i < documents.length; i += this.BATCH_SIZE) {
const batch = documents.slice(i, i + this.BATCH_SIZE);
await chromaMcp.callTool('chroma_add_documents', {
collection_name: this.collectionName,
ids: batch.map(d => d.id),
documents: batch.map(d => d.document),
metadatas: batch.map(d => d.metadata)
});
}
try {
await this.collection.add({
ids: documents.map(d => d.id),
documents: documents.map(d => d.document),
metadatas: documents.map(d => d.metadata)
});
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to add documents', {
collection: this.collectionName,
count: documents.length
}, error as Error);
throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
}
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
}
/**
@@ -540,22 +420,18 @@ export class ChromaSync {
}
/**
* Fetch all existing document IDs from Chroma collection
* Fetch all existing document IDs from Chroma collection via MCP
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(): Promise<{
private async getExistingChromaIds(projectOverride?: string): Promise<{
observations: Set<number>;
summaries: Set<number>;
prompts: Set<number>;
}> {
await this.ensureCollection();
const targetProject = projectOverride ?? this.project;
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
}
const chromaMcp = ChromaMcpManager.getInstance();
const observationIds = new Set<number>();
const summaryIds = new Set<number>();
@@ -564,52 +440,49 @@ export class ChromaSync {
let offset = 0;
const limit = 1000; // Large batches, metadata only = fast
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: this.project });
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: targetProject });
while (true) {
try {
const result = await this.collection.get({
limit,
offset,
where: { project: this.project },
include: ['metadatas']
});
const result = await chromaMcp.callTool('chroma_get_documents', {
collection_name: this.collectionName,
limit: limit,
offset: offset,
where: { project: targetProject },
include: ['metadatas']
}) as any;
const metadatas = result.metadatas || [];
// chroma_get_documents returns flat arrays: { ids, metadatas, documents }
const metadatas = result?.metadatas || [];
if (metadatas.length === 0) {
break; // No more documents
}
if (metadatas.length === 0) {
break; // No more documents
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: this.project,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: this.project }, error as Error);
throw error;
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: targetProject,
offset,
batchSize: metadatas.length
});
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
project: this.project,
project: targetProject,
observations: observationIds.size,
summaries: summaryIds.size,
prompts: promptIds.size
@@ -621,21 +494,25 @@ export class ChromaSync {
/**
* Backfill: Sync all observations missing from Chroma
* Reads from SQLite and syncs in batches
* @param projectOverride - If provided, backfill this project instead of this.project.
* Used by backfillAllProjects() to iterate projects without mutating instance state.
* Throws error if backfill fails
*/
async ensureBackfilled(): Promise<void> {
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: this.project });
async ensureBackfilled(projectOverride?: string): Promise<void> {
const backfillProject = projectOverride ?? this.project;
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: backfillProject });
await this.ensureCollection();
await this.ensureCollectionExists();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds();
const existing = await this.getExistingChromaIds(backfillProject);
const db = new SessionStore();
try {
// Build exclusion list for observations
const existingObsIds = Array.from(existing.observations);
// Filter to validated positive integers before interpolating into SQL
const existingObsIds = Array.from(existing.observations).filter(id => Number.isInteger(id) && id > 0);
const obsExclusionClause = existingObsIds.length > 0
? `AND id NOT IN (${existingObsIds.join(',')})`
: '';
@@ -645,14 +522,14 @@ export class ChromaSync {
SELECT * FROM observations
WHERE project = ? ${obsExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredObservation[];
`).all(backfillProject) as StoredObservation[];
const totalObsCount = db.db.prepare(`
SELECT COUNT(*) as count FROM observations WHERE project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling observations', {
project: this.project,
project: backfillProject,
missing: observations.length,
existing: existing.observations.size,
total: totalObsCount.count
@@ -670,13 +547,13 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
});
}
// Build exclusion list for summaries
const existingSummaryIds = Array.from(existing.summaries);
const existingSummaryIds = Array.from(existing.summaries).filter(id => Number.isInteger(id) && id > 0);
const summaryExclusionClause = existingSummaryIds.length > 0
? `AND id NOT IN (${existingSummaryIds.join(',')})`
: '';
@@ -686,14 +563,14 @@ export class ChromaSync {
SELECT * FROM session_summaries
WHERE project = ? ${summaryExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredSummary[];
`).all(backfillProject) as StoredSummary[];
const totalSummaryCount = db.db.prepare(`
SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
project: this.project,
project: backfillProject,
missing: summaries.length,
existing: existing.summaries.size,
total: totalSummaryCount.count
@@ -711,13 +588,13 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
});
}
// Build exclusion list for prompts
const existingPromptIds = Array.from(existing.prompts);
const existingPromptIds = Array.from(existing.prompts).filter(id => Number.isInteger(id) && id > 0);
const promptExclusionClause = existingPromptIds.length > 0
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
: '';
@@ -732,17 +609,17 @@ export class ChromaSync {
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE s.project = ? ${promptExclusionClause}
ORDER BY up.id ASC
`).all(this.project) as StoredUserPrompt[];
`).all(backfillProject) as StoredUserPrompt[];
const totalPromptCount = db.db.prepare(`
SELECT COUNT(*) as count
FROM user_prompts up
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE s.project = ?
`).get(this.project) as { count: number };
`).get(backfillProject) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
project: this.project,
project: backfillProject,
missing: prompts.length,
existing: existing.prompts.size,
total: totalPromptCount.count
@@ -760,13 +637,13 @@ export class ChromaSync {
await this.addDocuments(batch);
logger.debug('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
project: backfillProject,
progress: `${Math.min(i + this.BATCH_SIZE, promptDocs.length)}/${promptDocs.length}`
});
}
logger.info('CHROMA_SYNC', 'Smart backfill complete', {
project: this.project,
project: backfillProject,
synced: {
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length,
@@ -780,7 +657,7 @@ export class ChromaSync {
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Backfill failed', { project: this.project }, error as Error);
logger.error('CHROMA_SYNC', 'Backfill failed', { project: backfillProject }, error as Error);
throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
} finally {
db.close();
@@ -788,7 +665,7 @@ export class ChromaSync {
}
/**
* Query Chroma collection for semantic search
* Query Chroma collection for semantic search via MCP
* Used by SearchManager for vector-based search
*/
async queryChroma(
@@ -796,27 +673,34 @@ export class ChromaSync {
limit: number,
whereFilter?: Record<string, any>
): Promise<{ ids: number[]; distances: number[]; metadatas: any[] }> {
await this.ensureCollection();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
}
await this.ensureCollectionExists();
try {
const results = await this.collection.query({
queryTexts: [query],
nResults: limit,
where: whereFilter,
const chromaMcp = ChromaMcpManager.getInstance();
const results = await chromaMcp.callTool('chroma_query_documents', {
collection_name: this.collectionName,
query_texts: [query],
n_results: limit,
...(whereFilter && { where: whereFilter }),
include: ['documents', 'metadatas', 'distances']
});
}) as any;
// Extract unique SQLite IDs from document IDs
// chroma_query_documents returns nested arrays (one per query text)
// We always pass a single query text, so we access [0]
const ids: number[] = [];
const docIds = results.ids?.[0] || [];
for (const docId of docIds) {
const seen = new Set<number>();
const docIds = results?.ids?.[0] || [];
const rawMetadatas = results?.metadatas?.[0] || [];
const rawDistances = results?.distances?.[0] || [];
// Build deduplicated arrays that stay index-aligned:
// Multiple Chroma docs map to the same SQLite ID (one per field).
// Keep the first (best-ranked) distance and metadata per SQLite ID.
const metadatas: any[] = [];
const distances: number[] = [];
for (let i = 0; i < docIds.length; i++) {
const docId = docIds[i];
// Extract sqlite_id from document ID (supports three formats):
// - obs_{id}_narrative, obs_{id}_fact_0, etc (observations)
// - summary_{id}_request, summary_{id}_learned, etc (session summaries)
@@ -834,16 +718,15 @@ export class ChromaSync {
sqliteId = parseInt(promptMatch[1], 10);
}
if (sqliteId !== null && !ids.includes(sqliteId)) {
if (sqliteId !== null && !seen.has(sqliteId)) {
seen.add(sqliteId);
ids.push(sqliteId);
metadatas.push(rawMetadatas[i] ?? null);
distances.push(rawDistances[i] ?? 0);
}
}
return {
ids,
distances: results.distances?.[0] || [],
metadatas: results.metadatas?.[0] || []
};
return { ids, distances, metadatas };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
@@ -851,12 +734,13 @@ export class ChromaSync {
const isConnectionError =
errorMessage.includes('ECONNREFUSED') ||
errorMessage.includes('ENOTFOUND') ||
errorMessage.includes('fetch failed');
errorMessage.includes('fetch failed') ||
errorMessage.includes('subprocess closed') ||
errorMessage.includes('timed out');
if (isConnectionError) {
// Reset connection state so next call attempts reconnect
this.chromaClient = null;
this.collection = null;
// Reset collection state so next call attempts reconnect
this.collectionCreated = false;
logger.error('CHROMA_SYNC', 'Connection lost during query',
{ project: this.project, query }, error as Error);
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
@@ -868,13 +752,45 @@ export class ChromaSync {
}
/**
* Close the Chroma client connection
* Server lifecycle is managed by ChromaServerManager, not here
* Backfill all projects that have observations in SQLite but may be missing from Chroma.
* Uses a single shared ChromaSync('claude-mem') instance and Chroma connection.
* Per-project scoping is passed as a parameter to ensureBackfilled(), avoiding
* instance state mutation. All documents land in the cm__claude-mem collection
* with project scoped via metadata, matching how DatabaseManager and SearchManager operate.
* Designed to be called fire-and-forget on worker startup.
*/
static async backfillAllProjects(): Promise<void> {
const db = new SessionStore();
const sync = new ChromaSync('claude-mem');
try {
const projects = db.db.prepare(
'SELECT DISTINCT project FROM observations WHERE project IS NOT NULL AND project != ?'
).all('') as { project: string }[];
logger.info('CHROMA_SYNC', `Backfill check for ${projects.length} projects`);
for (const { project } of projects) {
try {
await sync.ensureBackfilled(project);
} catch (error) {
logger.error('CHROMA_SYNC', `Backfill failed for project: ${project}`, {}, error as Error);
// Continue to next project — don't let one failure stop others
}
}
} finally {
await sync.close();
db.close();
}
}
/**
* Close the ChromaSync instance
* ChromaMcpManager is a singleton and manages its own lifecycle
* We don't close it here - it's closed during graceful shutdown
*/
async close(): Promise<void> {
// Just clear references - server lifecycle managed by ChromaServerManager
this.chromaClient = null;
this.collection = null;
logger.info('CHROMA_SYNC', 'Chroma client closed', { project: this.project });
// ChromaMcpManager is a singleton and manages its own lifecycle
// We don't close it here - it's closed during graceful shutdown
logger.info('CHROMA_SYNC', 'ChromaSync closed', { project: this.project });
}
}
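The control flow of `backfillAllProjects()` (a per-project try/catch inside an outer try/finally) can be sketched without SQLite or Chroma. The interfaces `ProjectSource` and `Backfiller` and the function `backfillAll` below are hypothetical stand-ins for `SessionStore` and `ChromaSync`; only the error-isolation pattern is taken from the diff.

```typescript
// Hypothetical stand-ins; only the control flow mirrors the diff.
interface ProjectSource {
  listProjects(): string[];
  close(): void;
}

interface Backfiller {
  ensureBackfilled(project: string): Promise<void>;
  close(): Promise<void>;
}

interface BackfillReport {
  ok: string[];
  failed: string[];
}

async function backfillAll(source: ProjectSource, sync: Backfiller): Promise<BackfillReport> {
  const report: BackfillReport = { ok: [], failed: [] };
  try {
    for (const project of source.listProjects()) {
      try {
        await sync.ensureBackfilled(project);
        report.ok.push(project);
      } catch {
        // One failing project must not stop the remaining ones.
        report.failed.push(project);
      }
    }
  } finally {
    // Cleanup runs even if listing projects throws.
    await sync.close();
    source.close();
  }
  return report;
}
```

A caller that wants fire-and-forget behavior, as on worker startup, would call `backfillAll(...)` without awaiting it and attach `.catch()` for logging.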
+95 -37
@@ -18,7 +18,8 @@ import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
import { getAuthMethodDescription } from '../shared/EnvManager.js';
import { logger } from '../utils/logger.js';
import { ChromaServerManager } from './sync/ChromaServerManager.js';
import { ChromaMcpManager } from './sync/ChromaMcpManager.js';
import { ChromaSync } from './sync/ChromaSync.js';
// Windows: avoid repeated spawn popups when startup fails (issue #921)
const WINDOWS_SPAWN_COOLDOWN_MS = 2 * 60 * 1000;
@@ -68,14 +69,17 @@ import {
readPidFile,
removePidFile,
getPlatformTimeout,
cleanupOrphanedProcesses,
aggressiveStartupCleanup,
runOneTimeChromaMigration,
cleanStalePidFile,
isProcessAlive,
spawnDaemon,
createSignalHandler
} from './infrastructure/ProcessManager.js';
import {
isPortInUse,
waitForHealth,
waitForReadiness,
waitForPortFree,
httpShutdown,
checkVersionMatch
@@ -115,7 +119,7 @@ import { LogsRoutes } from './worker/http/routes/LogsRoutes.js';
import { MemoryRoutes } from './worker/http/routes/MemoryRoutes.js';
// Process management for zombie cleanup (Issue #737)
import { startOrphanReaper, reapOrphanedProcesses } from './worker/ProcessRegistry.js';
import { startOrphanReaper, reapOrphanedProcesses, getProcessBySession, ensureProcessExit } from './worker/ProcessRegistry.js';
/**
* Build JSON status output for hook framework communication.
@@ -165,8 +169,8 @@ export class WorkerService {
// Route handlers
private searchRoutes: SearchRoutes | null = null;
// Chroma server (local mode)
private chromaServer: ChromaServerManager | null = null;
// Chroma MCP manager (lazy - connects on first use)
private chromaMcpManager: ChromaMcpManager | null = null;
// Initialization tracking
private initializationComplete: Promise<void>;
@@ -175,6 +179,9 @@ export class WorkerService {
// Orphan reaper cleanup function (Issue #737)
private stopOrphanReaper: (() => void) | null = null;
// Stale session reaper interval (Issue #1168)
private staleSessionReaperInterval: ReturnType<typeof setInterval> | null = null;
// AI interaction tracking for health endpoint
private lastAiInteraction: {
timestamp: number;
@@ -363,38 +370,25 @@ export class WorkerService {
*/
private async initializeBackground(): Promise<void> {
try {
await cleanupOrphanedProcesses();
await aggressiveStartupCleanup();
// Load mode configuration
const { ModeManager } = await import('./domain/ModeManager.js');
const { SettingsDefaultsManager } = await import('../shared/SettingsDefaultsManager.js');
const { USER_SETTINGS_PATH } = await import('../shared/paths.js');
const os = await import('os');
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
// Start Chroma server if in local mode
const chromaMode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
if (chromaMode === 'local') {
logger.info('SYSTEM', 'Starting local Chroma server...');
this.chromaServer = ChromaServerManager.getInstance({
dataDir: path.join(os.homedir(), '.claude-mem', 'vector-db'),
host: settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1',
port: parseInt(settings.CLAUDE_MEM_CHROMA_PORT || '8000', 10)
});
const ready = await this.chromaServer.start(60000);
if (ready) {
logger.success('SYSTEM', 'Chroma server ready');
} else {
logger.warn('SYSTEM', 'Chroma server failed to start - vector search disabled');
this.chromaServer = null;
}
} else {
logger.info('SYSTEM', 'Chroma remote mode - skipping local server');
// One-time chroma wipe for users upgrading from versions with duplicate worker bugs.
// Only runs in local mode (chroma is local-only). Backfill at line ~414 rebuilds from SQLite.
if (settings.CLAUDE_MEM_MODE === 'local' || !settings.CLAUDE_MEM_MODE) {
runOneTimeChromaMigration();
}
// Initialize ChromaMcpManager (lazy - connects on first use via ChromaSync)
this.chromaMcpManager = ChromaMcpManager.getInstance();
logger.info('SYSTEM', 'ChromaMcpManager initialized (lazy - connects on first use)');
const modeId = settings.CLAUDE_MEM_MODE;
ModeManager.getInstance().loadMode(modeId);
logger.info('SYSTEM', `Mode loaded: ${modeId}`);
@@ -423,6 +417,22 @@ export class WorkerService {
this.server.registerRoutes(this.searchRoutes);
logger.info('WORKER', 'SearchManager initialized and search routes registered');
// DB and search are ready — mark initialization complete so hooks can proceed.
// MCP connection is tracked separately via mcpReady and is NOT required for
// the worker to serve context/search requests.
this.initializationCompleteFlag = true;
this.resolveInitialization();
logger.info('SYSTEM', 'Core initialization complete (DB + search ready)');
// Auto-backfill Chroma for all projects if out of sync with SQLite (fire-and-forget)
if (this.chromaMcpManager) {
ChromaSync.backfillAllProjects().then(() => {
logger.info('CHROMA_SYNC', 'Backfill check complete for all projects');
}).catch(error => {
logger.error('CHROMA_SYNC', 'Backfill failed (non-blocking)', {}, error as Error);
});
}
// Connect to MCP server
const mcpServerPath = path.join(__dirname, 'mcp-server.cjs');
const transport = new StdioClientTransport({
@@ -439,11 +449,7 @@ export class WorkerService {
await Promise.race([mcpConnectionPromise, timeoutPromise]);
this.mcpReady = true;
logger.success('WORKER', 'Connected to MCP server');
this.initializationCompleteFlag = true;
this.resolveInitialization();
logger.info('SYSTEM', 'Background initialization complete');
logger.success('WORKER', 'MCP server connected');
// Start orphan reaper to clean up zombie processes (Issue #737)
this.stopOrphanReaper = startOrphanReaper(() => {
@@ -455,6 +461,18 @@ export class WorkerService {
});
logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)');
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
this.staleSessionReaperInterval = setInterval(async () => {
try {
const reaped = await this.sessionManager.reapStaleSessions();
if (reaped > 0) {
logger.info('SYSTEM', `Reaped ${reaped} stale sessions`);
}
} catch (e) {
logger.error('SYSTEM', 'Stale session reaper error', { error: e instanceof Error ? e.message : String(e) });
}
}, 2 * 60 * 1000);
// Auto-recover orphaned queues (fire-and-forget with error logging)
this.processPendingQueues(50).then(result => {
if (result.sessionsStarted > 0) {
@@ -583,7 +601,13 @@ export class WorkerService {
};
throw error;
})
.finally(() => {
.finally(async () => {
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
const trackedProcess = getProcessBySession(session.sessionDbId);
if (trackedProcess && !trackedProcess.process.killed && trackedProcess.process.exitCode === null) {
await ensureProcessExit(trackedProcess, 5000);
}
session.generatorPromise = null;
// Record successful AI interaction if no error occurred
@@ -604,17 +628,16 @@ export class WorkerService {
return;
}
// Shared store for idle-reset and pending-count checks below
// Store for pending-count check below
const { PendingMessageStore } = require('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
// Idle timeout means no new work arrived for 3 minutes - don't restart
// No need to reset stale processing messages here — claimNextMessage() self-heals
if (session.idleTimedOut) {
logger.info('SYSTEM', 'Generator exited due to idle timeout, not restarting', {
sessionId: session.sessionDbId
});
// Reset stale processing messages so they can be picked up later
pendingStore.resetStaleProcessingMessages(0, session.sessionDbId); // Reset this session's messages only
session.idleTimedOut = false; // Reset flag
this.broadcastProcessingStatus();
return;
@@ -814,12 +837,18 @@ export class WorkerService {
this.stopOrphanReaper = null;
}
// Stop stale session reaper (Issue #1168)
if (this.staleSessionReaperInterval) {
clearInterval(this.staleSessionReaperInterval);
this.staleSessionReaperInterval = null;
}
await performGracefulShutdown({
server: this.server.getHttpServer(),
sessionManager: this.sessionManager,
mcpClient: this.mcpClient,
dbManager: this.dbManager,
chromaServer: this.chromaServer || undefined
chromaMcpManager: this.chromaMcpManager || undefined
});
}
@@ -920,6 +949,13 @@ async function ensureWorkerStarted(port: number): Promise<boolean> {
return false;
}
// Health passed (HTTP listening). Now wait for DB + search initialization
// so hooks that run immediately after can actually use the worker.
const ready = await waitForReadiness(port, getPlatformTimeout(HOOK_TIMEOUTS.READINESS_WAIT));
if (!ready) {
logger.warn('SYSTEM', 'Worker is alive but readiness timed out — proceeding anyway');
}
clearWorkerSpawnAttempted();
logger.info('SYSTEM', 'Worker started successfully');
return true;
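The readiness wait added above follows the health check with a second poll. The commit message mentions a shared `pollEndpointUntilOk` helper, but its signature is not shown in this diff, so the sketch below is an assumption: `pollUntilOk` is a hypothetical name, and it takes an injected `probe` function instead of a URL so that it runs without a worker listening.

```typescript
// Hypothetical helper; the real pollEndpointUntilOk signature is not shown in the diff.
async function pollUntilOk(
  probe: () => Promise<boolean>,
  timeoutMs: number,
  intervalMs: number = 100
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    try {
      if (await probe()) return true;
    } catch {
      // A throwing probe (e.g. connection refused) just means "not ready yet".
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  return false;
}

// Example probe for an HTTP endpoint (port and path are placeholders):
// const probe = async () => (await fetch('http://127.0.0.1:PORT/api/readiness')).ok;
```

Returning `false` on timeout, instead of throwing, matches how the caller in the diff treats a readiness timeout as a warning and proceeds anyway.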
@@ -1080,6 +1116,28 @@ async function main() {
case '--daemon':
default: {
// GUARD 1: Refuse to start if another worker is already alive (PID check).
// Instant check (kill -0) — no HTTP dependency.
const existingPidInfo = readPidFile();
if (existingPidInfo && isProcessAlive(existingPidInfo.pid)) {
logger.info('SYSTEM', 'Worker already running (PID alive), refusing to start duplicate', {
existingPid: existingPidInfo.pid,
existingPort: existingPidInfo.port,
startedAt: existingPidInfo.startedAt
});
process.exit(0);
}
// GUARD 2: Refuse to start if the port is already bound.
// Catches the race where two daemons start simultaneously before
// either writes a PID file. Must run BEFORE constructing WorkerService
// because the constructor registers signal handlers and timers that
// prevent the process from exiting even if listen() fails later.
if (await isPortInUse(port)) {
logger.info('SYSTEM', 'Port already in use, refusing to start duplicate', { port });
process.exit(0);
}
// Prevent daemon from dying silently on unhandled errors.
// The HTTP server can continue serving even if a background task throws.
process.on('unhandledRejection', (reason) => {
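The two startup guards above can be sketched as a small decision function. This is an illustration under assumptions: `decideDaemonStart` and `StartDecision` are hypothetical names, the `PidInfo` shape is inferred from the fields logged in the diff, and `isProcessAlive` here is a plausible implementation of the "kill -0" check, not necessarily the one in ProcessManager.

```typescript
// Shape inferred from the logged fields; the real PidInfo may differ.
interface PidInfo {
  pid: number;
  port: number;
  startedAt: string;
}

// Signal 0 performs an existence/permission check without delivering a signal.
function isProcessAlive(pid: number): boolean {
  if (!Number.isInteger(pid) || pid <= 0) return false;
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    // EPERM means the process exists but is owned by another user.
    return (err as { code?: string }).code === 'EPERM';
  }
}

type StartDecision = 'start' | 'skip-pid-alive' | 'skip-port-in-use';

function decideDaemonStart(
  pidInfo: PidInfo | null,
  portInUse: boolean,
  alive: (pid: number) => boolean = isProcessAlive
): StartDecision {
  // Guard 1: a live PID from the PID file means a worker is already running.
  if (pidInfo && alive(pidInfo.pid)) return 'skip-pid-alive';
  // Guard 2: covers the race where no PID file has been written yet.
  if (portInUse) return 'skip-port-in-use';
  return 'start';
}
```

Keeping the decision pure and passing `portInUse` in as a value makes the ordering of the guards easy to test without binding a real port.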
+1 -1
@@ -37,7 +37,7 @@ export class DatabaseManager {
* Close database connection and cleanup all resources
*/
async close(): Promise<void> {
// Close ChromaSync first (terminates uvx/python processes)
// Close ChromaSync first (MCP connection lifecycle managed by ChromaMcpManager)
if (this.chromaSync) {
await this.chromaSync.close();
this.chromaSync = null;
+33
@@ -341,6 +341,39 @@ export class SessionManager {
}
}
private static readonly MAX_SESSION_IDLE_MS = 15 * 60 * 1000; // 15 minutes
/**
* Reap sessions with no active generator and no pending work that have been idle too long.
* This unblocks the orphan reaper which skips processes for "active" sessions. (Issue #1168)
*/
async reapStaleSessions(): Promise<number> {
const now = Date.now();
const staleSessionIds: number[] = [];
for (const [sessionDbId, session] of this.sessions) {
// Skip sessions with active generators
if (session.generatorPromise) continue;
// Skip sessions with pending work
const pendingCount = this.getPendingStore().getPendingCount(sessionDbId);
if (pendingCount > 0) continue;
// No generator + no pending work + old enough = stale
const sessionAge = now - session.startTime;
if (sessionAge > SessionManager.MAX_SESSION_IDLE_MS) {
staleSessionIds.push(sessionDbId);
}
}
for (const sessionDbId of staleSessionIds) {
logger.warn('SESSION', `Reaping stale session ${sessionDbId} (no activity for >${Math.round(SessionManager.MAX_SESSION_IDLE_MS / 60000)}m)`, { sessionDbId });
await this.deleteSession(sessionDbId);
}
return staleSessionIds.length;
}
/**
* Shutdown all active sessions
*/
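The selection rule inside `reapStaleSessions()` can be expressed as a pure function. The sketch below is hypothetical (`SessionSnapshot` and `findStaleSessions` do not appear in the diff) and flattens the session state into plain fields. As in the diff, age is measured from `startTime`, so "idle" here effectively means time since the session started.

```typescript
// Hypothetical flattened view of a session; not the SessionManager's real type.
interface SessionSnapshot {
  sessionDbId: number;
  hasActiveGenerator: boolean;
  pendingCount: number;
  startTime: number; // epoch milliseconds
}

const MAX_SESSION_IDLE_MS = 15 * 60 * 1000; // 15 minutes, as in the diff

function findStaleSessions(
  sessions: SessionSnapshot[],
  now: number,
  maxIdleMs: number = MAX_SESSION_IDLE_MS
): number[] {
  return sessions
    .filter((s) => !s.hasActiveGenerator)      // skip sessions with an active generator
    .filter((s) => !(s.pendingCount > 0))      // skip sessions with pending work
    .filter((s) => now - s.startTime > maxIdleMs) // keep only those old enough
    .map((s) => s.sessionDbId);
}
```

Collecting IDs first and deleting afterwards, as the diff does, avoids mutating the session map while iterating over it.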
@@ -21,6 +21,7 @@ import { SessionCompletionHandler } from '../../session/SessionCompletionHandler
import { PrivacyCheckValidator } from '../../validation/PrivacyCheckValidator.js';
import { SettingsDefaultsManager } from '../../../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../../../shared/paths.js';
import { getProcessBySession, ensureProcessExit } from '../../ProcessRegistry.js';
export class SessionRoutes extends BaseRouteHandler {
private completionHandler: SessionCompletionHandler;
@@ -184,7 +185,13 @@ export class SessionRoutes extends BaseRouteHandler {
}, dbError as Error);
}
})
.finally(() => {
.finally(async () => {
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
const tracked = getProcessBySession(session.sessionDbId);
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
await ensureProcessExit(tracked, 5000);
}
const sessionDbId = session.sessionDbId;
this.spawnInProgress.delete(sessionDbId);
const wasAborted = session.abortController.signal.aborted;
+1 -1
@@ -118,7 +118,7 @@ export class SettingsDefaultsManager {
CLAUDE_MEM_EXCLUDED_PROJECTS: '', // Comma-separated glob patterns for excluded project paths
CLAUDE_MEM_FOLDER_MD_EXCLUDE: '[]', // JSON array of folder paths to exclude from CLAUDE.md generation
// Chroma Vector Database Configuration
CLAUDE_MEM_CHROMA_MODE: 'local', // 'local' starts npx chroma run, 'remote' connects to existing server
CLAUDE_MEM_CHROMA_MODE: 'local', // 'local' uses persistent chroma-mcp via uvx, 'remote' connects to existing server
CLAUDE_MEM_CHROMA_HOST: '127.0.0.1',
CLAUDE_MEM_CHROMA_PORT: '8000',
CLAUDE_MEM_CHROMA_SSL: 'false',
+1
@@ -2,6 +2,7 @@ export const HOOK_TIMEOUTS = {
DEFAULT: 300000, // Standard HTTP timeout (5 min for slow systems)
HEALTH_CHECK: 3000, // Worker health check (3s — healthy worker responds in <100ms)
POST_SPAWN_WAIT: 5000, // Wait for daemon to start after spawn (starts in <1s on Linux)
READINESS_WAIT: 30000, // Wait for DB + search init after spawn (typically <5s)
PORT_IN_USE_WAIT: 3000, // Wait when port occupied but health failing
WORKER_STARTUP_WAIT: 1000,
PRE_RESTART_SETTLE_DELAY: 2000, // Give files time to sync before restart
+1 -1
@@ -15,7 +15,7 @@ export enum LogLevel {
SILENT = 4
}
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'FOLDER_INDEX' | 'CLAUDE_MD';
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
interface LogContext {
sessionId?: number;
@@ -87,9 +87,9 @@ describe('GracefulShutdown', () => {
})
};
const mockChromaServer = {
const mockChromaMcpManager = {
stop: mock(async () => {
callOrder.push('chromaServer.stop');
callOrder.push('chromaMcpManager.stop');
})
};
@@ -102,7 +102,7 @@ describe('GracefulShutdown', () => {
sessionManager: mockSessionManager,
mcpClient: mockMcpClient,
dbManager: mockDbManager,
chromaServer: mockChromaServer
chromaMcpManager: mockChromaMcpManager
};
await performGracefulShutdown(config);
@@ -112,7 +112,7 @@ describe('GracefulShutdown', () => {
expect(callOrder).toContain('serverClose');
expect(callOrder).toContain('sessionManager.shutdownAll');
expect(callOrder).toContain('mcpClient.close');
expect(callOrder).toContain('chromaServer.stop');
expect(callOrder).toContain('chromaMcpManager.stop');
expect(callOrder).toContain('dbManager.close');
// Verify server closes before session manager
@@ -125,7 +125,7 @@ describe('GracefulShutdown', () => {
expect(callOrder.indexOf('mcpClient.close')).toBeLessThan(callOrder.indexOf('dbManager.close'));
// Verify Chroma stops before DB closes
expect(callOrder.indexOf('chromaServer.stop')).toBeLessThan(callOrder.indexOf('dbManager.close'));
expect(callOrder.indexOf('chromaMcpManager.stop')).toBeLessThan(callOrder.indexOf('dbManager.close'));
});
it('should remove PID file during shutdown', async () => {
@@ -216,9 +216,9 @@ describe('GracefulShutdown', () => {
})
};
const mockChromaServer = {
const mockChromaMcpManager = {
stop: mock(async () => {
callOrder.push('chromaServer');
callOrder.push('chromaMcpManager');
})
};
@@ -227,12 +227,12 @@ describe('GracefulShutdown', () => {
sessionManager: mockSessionManager,
mcpClient: mockMcpClient,
dbManager: mockDbManager,
chromaServer: mockChromaServer
chromaMcpManager: mockChromaMcpManager
};
await performGracefulShutdown(config);
expect(callOrder).toEqual(['sessionManager', 'mcpClient', 'chromaServer', 'dbManager']);
expect(callOrder).toEqual(['sessionManager', 'mcpClient', 'chromaMcpManager', 'dbManager']);
});
it('should handle shutdown when PID file does not exist', async () => {
+52 -3
@@ -1,6 +1,7 @@
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
import { existsSync, readFileSync } from 'fs';
import { existsSync, readFileSync, mkdirSync, writeFileSync, rmSync } from 'fs';
import { homedir } from 'os';
import { tmpdir } from 'os';
import path from 'path';
import {
writePidFile,
@@ -12,6 +13,7 @@ import {
cleanStalePidFile,
spawnDaemon,
resolveWorkerRuntimePath,
runOneTimeChromaMigration,
type PidInfo
} from '../../src/services/infrastructure/index.js';
@@ -32,7 +34,6 @@ describe('ProcessManager', () => {
afterEach(() => {
// Restore original PID file or remove test one
if (originalPidContent !== null) {
const { writeFileSync } = require('fs');
writeFileSync(PID_FILE, originalPidContent);
originalPidContent = null;
} else {
@@ -105,7 +106,6 @@ describe('ProcessManager', () => {
});
it('should return null for corrupted JSON', () => {
const { writeFileSync } = require('fs');
writeFileSync(PID_FILE, 'not valid json {{{');
const result = readPidFile();
@@ -415,4 +415,53 @@ describe('ProcessManager', () => {
// This is a logic verification test — actual signal delivery is tested manually
});
});
describe('runOneTimeChromaMigration', () => {
let testDataDir: string;
beforeEach(() => {
testDataDir = path.join(tmpdir(), `claude-mem-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
mkdirSync(testDataDir, { recursive: true });
});
afterEach(() => {
rmSync(testDataDir, { recursive: true, force: true });
});
it('should wipe chroma directory and write marker file', () => {
// Create a fake chroma directory with data
const chromaDir = path.join(testDataDir, 'chroma');
mkdirSync(chromaDir, { recursive: true });
writeFileSync(path.join(chromaDir, 'test-data.bin'), 'fake chroma data');
runOneTimeChromaMigration(testDataDir);
// Chroma dir should be gone
expect(existsSync(chromaDir)).toBe(false);
// Marker file should exist
expect(existsSync(path.join(testDataDir, '.chroma-cleaned-v10.3'))).toBe(true);
});
it('should skip when marker file already exists (idempotent)', () => {
// Write marker file first
writeFileSync(path.join(testDataDir, '.chroma-cleaned-v10.3'), 'already done');
// Create a chroma directory that should NOT be wiped
const chromaDir = path.join(testDataDir, 'chroma');
mkdirSync(chromaDir, { recursive: true });
writeFileSync(path.join(chromaDir, 'important.bin'), 'should survive');
runOneTimeChromaMigration(testDataDir);
// Chroma dir should still exist (migration was skipped)
expect(existsSync(chromaDir)).toBe(true);
expect(existsSync(path.join(chromaDir, 'important.bin'))).toBe(true);
});
it('should handle missing chroma directory gracefully', () => {
// No chroma dir exists — should just write marker without error
expect(() => runOneTimeChromaMigration(testDataDir)).not.toThrow();
expect(existsSync(path.join(testDataDir, '.chroma-cleaned-v10.3'))).toBe(true);
});
});
});
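The three tests above pin down a marker-file pattern for one-time migrations. The implementation of `runOneTimeChromaMigration` is not part of this diff, so the sketch below is only one way to satisfy those tests: `runOneTimeCleanup` is a hypothetical name, and the `chroma` subdirectory and `.chroma-cleaned-v10.3` marker are taken from the test expectations.

```typescript
import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import * as path from 'node:path';

const MARKER_FILE = '.chroma-cleaned-v10.3'; // name taken from the tests above

// Returns true if the cleanup ran, false if the marker showed it already had.
function runOneTimeCleanup(dataDir: string): boolean {
  const marker = path.join(dataDir, MARKER_FILE);
  if (existsSync(marker)) return false; // idempotent: never wipe twice

  // force: true makes a missing directory a no-op instead of an error.
  rmSync(path.join(dataDir, 'chroma'), { recursive: true, force: true });

  mkdirSync(dataDir, { recursive: true });
  writeFileSync(marker, new Date().toISOString());
  return true;
}

// Usage against a throwaway directory:
const demoDir = mkdtempSync(path.join(tmpdir(), 'one-time-cleanup-'));
console.log(runOneTimeCleanup(demoDir)); // first call performs the cleanup
console.log(runOneTimeCleanup(demoDir)); // second call is skipped
rmSync(demoDir, { recursive: true, force: true });
```

Writing the marker only after the wipe means a crash mid-wipe causes a retry on the next start, which is safe here because the vector data is rebuilt from SQLite.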
@@ -5,11 +5,11 @@ import type { PendingMessageStore, PersistentPendingMessage } from '../../../src
/**
* Mock PendingMessageStore that returns null (empty queue) by default.
* Individual tests can override claimAndDelete behavior.
* Individual tests can override claimNextMessage behavior.
*/
function createMockStore(): PendingMessageStore {
return {
claimAndDelete: mock(() => null),
claimNextMessage: mock(() => null),
toPendingMessage: mock((msg: PersistentPendingMessage) => ({
type: msg.message_type,
tool_name: msg.tool_name || undefined,
@@ -140,7 +140,7 @@ describe('SessionQueueProcessor', () => {
let callCount = 0;
// Return a message on first call, then null
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
return createMockMessage({ id: 1 });
@@ -170,7 +170,7 @@ describe('SessionQueueProcessor', () => {
expect(results).toHaveLength(1);
expect(results[0]._persistentId).toBe(1);
// Store's claimAndDelete should have been called at least twice
// Store's claimNextMessage should have been called at least twice
// (once returning message, once returning null)
expect(callCount).toBeGreaterThanOrEqual(1);
});
@@ -206,7 +206,7 @@ describe('SessionQueueProcessor', () => {
const onIdleTimeout = mock(() => {});
// Return null to trigger wait
(store.claimAndDelete as any) = mock(() => null);
(store.claimNextMessage as any) = mock(() => null);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -242,7 +242,7 @@ describe('SessionQueueProcessor', () => {
// First call: return null (queue empty)
// After message event: return message
// Then return null again
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
// First check - queue empty, will wait
@@ -312,7 +312,7 @@ describe('SessionQueueProcessor', () => {
it('should clean up event listeners when message received', async () => {
// Return a message immediately
(store.claimAndDelete as any) = mock(() => createMockMessage({ id: 1 }));
(store.claimNextMessage as any) = mock(() => createMockMessage({ id: 1 }));
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -344,7 +344,7 @@ describe('SessionQueueProcessor', () => {
it('should continue after store error with backoff', async () => {
let callCount = 0;
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
throw new Error('Database error');
@@ -377,7 +377,7 @@ describe('SessionQueueProcessor', () => {
});
it('should exit cleanly if aborted during error backoff', async () => {
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
throw new Error('Database error');
});
@@ -413,7 +413,7 @@ describe('SessionQueueProcessor', () => {
created_at_epoch: 1704067200000
});
(store.claimAndDelete as any) = mock(() => mockPersistentMessage);
(store.claimNextMessage as any) = mock(() => mockPersistentMessage);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -0,0 +1,146 @@
import { describe, test, expect, beforeEach, afterEach } from 'bun:test';
import { ClaudeMemDatabase } from '../../../src/services/sqlite/Database.js';
import { PendingMessageStore } from '../../../src/services/sqlite/PendingMessageStore.js';
import { createSDKSession } from '../../../src/services/sqlite/Sessions.js';
import type { PendingMessage } from '../../../src/services/worker-types.js';
import type { Database } from 'bun:sqlite';
describe('PendingMessageStore - Self-Healing claimNextMessage', () => {
let db: Database;
let store: PendingMessageStore;
let sessionDbId: number;
const CONTENT_SESSION_ID = 'test-self-heal';
beforeEach(() => {
db = new ClaudeMemDatabase(':memory:').db;
store = new PendingMessageStore(db, 3);
sessionDbId = createSDKSession(db, CONTENT_SESSION_ID, 'test-project', 'Test prompt');
});
afterEach(() => {
db.close();
});
function enqueueMessage(overrides: Partial<PendingMessage> = {}): number {
const message: PendingMessage = {
type: 'observation',
tool_name: 'TestTool',
tool_input: { test: 'input' },
tool_response: { test: 'response' },
prompt_number: 1,
...overrides,
};
return store.enqueue(sessionDbId, CONTENT_SESSION_ID, message);
}
/**
* Helper to simulate a stuck processing message by directly updating the DB
* to set started_processing_at_epoch to a time in the past (>60s ago)
*/
function makeMessageStaleProcessing(messageId: number): void {
const staleTimestamp = Date.now() - 120_000; // 2 minutes ago (well past 60s threshold)
db.run(
`UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
[staleTimestamp, messageId]
);
}
test('stuck processing messages are recovered on next claim', () => {
// Enqueue a message and make it stuck in processing
const msgId = enqueueMessage();
makeMessageStaleProcessing(msgId);
// Verify it's stuck (status = processing)
const beforeClaim = db.query('SELECT status FROM pending_messages WHERE id = ?').get(msgId) as { status: string };
expect(beforeClaim.status).toBe('processing');
// claimNextMessage should self-heal: reset the stuck message, then claim it
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(msgId);
// It should now be in 'processing' status again (freshly claimed)
const afterClaim = db.query('SELECT status FROM pending_messages WHERE id = ?').get(msgId) as { status: string };
expect(afterClaim.status).toBe('processing');
});
test('actively processing messages are NOT recovered', () => {
// Enqueue two messages
const activeId = enqueueMessage();
const pendingId = enqueueMessage();
// Make the first one actively processing (recent timestamp, NOT stale)
const recentTimestamp = Date.now() - 5_000; // 5 seconds ago (well within 60s threshold)
db.run(
`UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
[recentTimestamp, activeId]
);
// claimNextMessage should NOT reset the active one — should claim the pending one instead
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(pendingId);
// The active message should still be processing
const activeMsg = db.query('SELECT status FROM pending_messages WHERE id = ?').get(activeId) as { status: string };
expect(activeMsg.status).toBe('processing');
});
test('recovery and claim is atomic within single call', () => {
// Enqueue three messages
const stuckId = enqueueMessage();
const pendingId1 = enqueueMessage();
const pendingId2 = enqueueMessage();
// Make the first one stuck
makeMessageStaleProcessing(stuckId);
// Single claimNextMessage should reset stuck AND claim oldest pending (which is the reset stuck one)
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).not.toBeNull();
// The stuck message was reset to pending, and being oldest, it gets claimed
expect(claimed!.id).toBe(stuckId);
// The other two should still be pending
const msg1 = db.query('SELECT status FROM pending_messages WHERE id = ?').get(pendingId1) as { status: string };
const msg2 = db.query('SELECT status FROM pending_messages WHERE id = ?').get(pendingId2) as { status: string };
expect(msg1.status).toBe('pending');
expect(msg2.status).toBe('pending');
});
test('no messages returns null without error', () => {
const claimed = store.claimNextMessage(sessionDbId);
expect(claimed).toBeNull();
});
test('self-healing only affects the specified session', () => {
// Create a second session
const session2Id = createSDKSession(db, 'other-session', 'test-project', 'Test');
// Enqueue and make stuck in session 1
const stuckInSession1 = enqueueMessage();
makeMessageStaleProcessing(stuckInSession1);
// Enqueue in session 2
const msg: PendingMessage = {
type: 'observation',
tool_name: 'TestTool',
tool_input: { test: 'input' },
tool_response: { test: 'response' },
prompt_number: 1,
};
const session2MsgId = store.enqueue(session2Id, 'other-session', msg);
makeMessageStaleProcessing(session2MsgId);
// Claim for session 2 — should only heal session 2's stuck message
const claimed = store.claimNextMessage(session2Id);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(session2MsgId);
// Session 1's stuck message should still be stuck (not healed by session 2's claim)
const session1Msg = db.query('SELECT status FROM pending_messages WHERE id = ?').get(stuckInSession1) as { status: string };
expect(session1Msg.status).toBe('processing');
});
});
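The behavior these tests describe (reset stale `processing` rows for one session, then claim the oldest pending row, in a single call) can be modeled in memory. This is a sketch, not the `PendingMessageStore` implementation, which is SQLite-backed and not shown in this diff; `QueuedMessage` is a hypothetical type and the 60-second threshold is taken from the test comments.

```typescript
type Status = 'pending' | 'processing';

// Hypothetical in-memory row; the real store persists these in SQLite.
interface QueuedMessage {
  id: number;
  sessionDbId: number;
  status: Status;
  startedProcessingAt: number | null; // epoch milliseconds
}

const STALE_THRESHOLD_MS = 60_000; // per the test comments

function claimNextMessage(
  queue: QueuedMessage[],
  sessionDbId: number,
  now: number = Date.now()
): QueuedMessage | null {
  // Step 1: self-heal. Only this session's rows, and only those stuck too long.
  for (const m of queue) {
    const stuck =
      m.sessionDbId === sessionDbId &&
      m.status === 'processing' &&
      m.startedProcessingAt !== null &&
      now - m.startedProcessingAt > STALE_THRESHOLD_MS;
    if (stuck) {
      m.status = 'pending';
      m.startedProcessingAt = null;
    }
  }

  // Step 2: claim the oldest pending row (lowest id) for this session.
  const candidates = queue
    .filter((m) => m.sessionDbId === sessionDbId && m.status === 'pending')
    .sort((a, b) => a.id - b.id);
  if (candidates.length === 0) return null;

  const next = candidates[0];
  next.status = 'processing';
  next.startedProcessingAt = now;
  return next;
}
```

In a database-backed store both steps would presumably run inside one transaction so that no other claimer can observe the state between the reset and the claim; the synchronous in-memory version gets that atomicity for free.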
@@ -1,139 +0,0 @@
import { describe, it, expect, beforeEach, afterEach, mock, spyOn } from 'bun:test';
import { EventEmitter } from 'events';
import * as childProcess from 'child_process';
import { ChromaServerManager } from '../../../src/services/sync/ChromaServerManager.js';
function createFakeProcess(pid: number = 4242): childProcess.ChildProcess {
const proc = new EventEmitter() as childProcess.ChildProcess & EventEmitter;
let exited = false;
(proc as any).stdout = new EventEmitter();
(proc as any).stderr = new EventEmitter();
(proc as any).pid = pid;
(proc as any).kill = mock(() => {
if (!exited) {
exited = true;
setTimeout(() => proc.emit('exit', 0, 'SIGTERM'), 0);
}
return true;
});
return proc as childProcess.ChildProcess;
}
describe('ChromaServerManager', () => {
const originalFetch = global.fetch;
const originalPlatform = process.platform;
beforeEach(() => {
mock.restore();
ChromaServerManager.reset();
// Avoid macOS cert bundle shelling in tests; these tests only exercise startup races.
Object.defineProperty(process, 'platform', {
value: 'linux',
writable: true,
configurable: true
});
});
afterEach(() => {
global.fetch = originalFetch;
mock.restore();
ChromaServerManager.reset();
Object.defineProperty(process, 'platform', {
value: originalPlatform,
writable: true,
configurable: true
});
});
it('reuses in-flight startup and only spawns one server process', async () => {
const fetchMock = mock(async () => {
// First call: existing server check fails, second call: waitForReady succeeds.
if (fetchMock.mock.calls.length === 1) {
throw new Error('no server yet');
}
return new Response(null, { status: 200 });
});
global.fetch = fetchMock as typeof fetch;
const spawnSpy = spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const [first, second] = await Promise.all([
manager.start(2000),
manager.start(2000)
]);
expect(first).toBe(true);
expect(second).toBe(true);
expect(spawnSpy).toHaveBeenCalledTimes(1);
});
it('reuses existing reachable server without spawning', async () => {
global.fetch = mock(async () => new Response(null, { status: 200 })) as typeof fetch;
const spawnSpy = spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const ready = await manager.start(2000);
expect(ready).toBe(true);
expect(spawnSpy).not.toHaveBeenCalled();
});
it('waits for ongoing startup instead of returning early', async () => {
let resolveReady: ((value: Response) => void) | null = null;
const delayedReady = new Promise<Response>((resolve) => {
resolveReady = resolve;
});
const fetchMock = mock(async () => {
// 1st: existing server check -> fail, 2nd: waitForReady -> block until we resolve.
if (fetchMock.mock.calls.length === 1) {
throw new Error('no server yet');
}
return delayedReady;
});
global.fetch = fetchMock as typeof fetch;
spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const firstStart = manager.start(5000);
let secondResolved = false;
const secondStart = manager.start(5000).then((value) => {
secondResolved = true;
return value;
});
await new Promise((resolve) => setTimeout(resolve, 50));
expect(secondResolved).toBe(false);
resolveReady!(new Response(null, { status: 200 }));
expect(await firstStart).toBe(true);
expect(await secondStart).toBe(true);
});
});
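Reviewer note: the three removed tests above all exercise one idea, namely that concurrent `start()` calls share a single in-flight startup rather than each spawning a server. A minimal sketch of that pattern follows, assuming a caller-supplied launch function. `SingleFlightStarter` and `launch` are hypothetical names for illustration and are not taken from `ChromaServerManager`.

```typescript
// Hypothetical single-flight wrapper: concurrent callers share one startup promise.
type Launcher = () => Promise<boolean>;

class SingleFlightStarter {
  private inFlight: Promise<boolean> | null = null;
  private ready = false;
  private launch: Launcher;

  constructor(launch: Launcher) {
    this.launch = launch;
  }

  start(): Promise<boolean> {
    // Already started: nothing to launch.
    if (this.ready) return Promise.resolve(true);
    // Startup already running: wait on it instead of returning early or relaunching.
    if (this.inFlight) return this.inFlight;

    this.inFlight = this.launch()
      .then((ok) => {
        this.ready = ok;
        return ok;
      })
      .finally(() => {
        // Clear the slot so a failed startup can be retried later.
        this.inFlight = null;
      });
    return this.inFlight;
  }
}

let launches = 0;
const starter = new SingleFlightStarter(async () => {
  launches += 1; // stands in for spawning the server process
  await Promise.resolve(); // stands in for polling until the server is ready
  return true;
});

// Two overlapping calls receive the same promise and trigger one launch.
const first = starter.start();
const second = starter.start();
```

Returning the stored promise itself, rather than wrapping `start()` in `async`, is what lets both callers observe the same settlement, which is the behaviour the "waits for ongoing startup" test checks for.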
+34 -4
@@ -192,8 +192,8 @@ describe('Zombie Agent Prevention', () => {
// hasAnyPendingWork should return true
expect(pendingStore.hasAnyPendingWork()).toBe(true);
-// CLAIM-CONFIRM pattern: claimAndDelete marks as 'processing' (not deleted)
-const claimed = pendingStore.claimAndDelete(sessionId);
+// CLAIM-CONFIRM pattern: claimNextMessage marks as 'processing' (not deleted)
+const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed?.id).toBe(msgId1);
@@ -206,11 +206,11 @@ describe('Zombie Agent Prevention', () => {
expect(pendingStore.getPendingCount(sessionId)).toBe(2);
// Claim and confirm remaining messages
-const msg2 = pendingStore.claimAndDelete(sessionId);
+const msg2 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg2!.id);
expect(pendingStore.getPendingCount(sessionId)).toBe(1);
-const msg3 = pendingStore.claimAndDelete(sessionId);
+const msg3 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg3!.id);
// Should be empty now
@@ -266,6 +266,36 @@ describe('Zombie Agent Prevention', () => {
expect(session.abortController.signal.aborted).toBe(false);
});
// Test: Stuck processing messages are recovered by claimNextMessage self-healing
test('should recover stuck processing messages via claimNextMessage self-healing', async () => {
const sessionId = createDbSession('content-stuck-recovery');
// Enqueue and claim a message (transitions to 'processing')
const msgId = enqueueTestMessage(sessionId, 'content-stuck-recovery');
const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed!.id).toBe(msgId);
// Simulate crash: message stuck in 'processing' with stale timestamp
const staleTimestamp = Date.now() - 120_000; // 2 minutes ago
db.run(
`UPDATE pending_messages SET started_processing_at_epoch = ? WHERE id = ?`,
[staleTimestamp, msgId]
);
// Verify it's stuck
expect(pendingStore.getPendingCount(sessionId)).toBe(1); // processing counts as pending work
// Next claimNextMessage should self-heal: reset stuck message and re-claim it
const recovered = pendingStore.claimNextMessage(sessionId);
expect(recovered).not.toBeNull();
expect(recovered!.id).toBe(msgId);
// Confirm it can be processed successfully
pendingStore.confirmProcessed(msgId);
expect(pendingStore.getPendingCount(sessionId)).toBe(0);
});
// Test: Generator cleanup on session delete
test('should properly cleanup generator promise on session delete', async () => {
const session = createMockSession(1);