Compare commits

7 Commits

Author SHA1 Message Date
Alex Newman 91b48a6481 chore: bump version to 10.3.0
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:33:52 -05:00
Alex Newman 40daf8f3fa feat: replace WASM embeddings with persistent chroma-mcp MCP connection (#1176)
* feat: replace WASM embeddings with persistent chroma-mcp MCP connection

Replace ChromaServerManager (npx chroma run + chromadb npm + ONNX/WASM)
with ChromaMcpManager, a singleton stdio MCP client that communicates with
chroma-mcp via uvx. This eliminates native binary issues, segfaults, and
WASM embedding failures that plagued cross-platform installs.

Key changes:
- Add ChromaMcpManager: singleton MCP client with lazy connect, auto-reconnect,
  connection lock, and Zscaler SSL cert support
- Rewrite ChromaSync to use MCP tool calls instead of chromadb npm client
- Handle chroma-mcp's non-JSON responses (plain text success/error messages)
- Treat "collection already exists" as idempotent success
- Wire ChromaMcpManager into GracefulShutdown for clean subprocess teardown
- Delete ChromaServerManager (no longer needed)
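
A minimal sketch of the two response-handling rules listed above (plain-text replies and idempotent collection creation). The helper names `parseToolText`, `isAlreadyExistsError`, and `createCollectionIdempotent` are hypothetical, and the exact wording of the "already exists" error is an assumption rather than documented chroma-mcp output:

```typescript
// Hypothetical helper: chroma-mcp may answer with JSON or with plain text.
// JSON is parsed; anything else is treated as a void-like success (null).
function parseToolText(text: string): unknown {
  try {
    return JSON.parse(text);
  } catch {
    return null;
  }
}

// Assumption: the error message contains the phrase "already exists".
function isAlreadyExistsError(error: unknown): boolean {
  const message = error instanceof Error ? error.message : String(error);
  return message.toLowerCase().includes('already exists');
}

// Runs a create step and reports whether the collection was new.
// Any error other than "already exists" is rethrown unchanged.
function createCollectionIdempotent(create: () => void): 'created' | 'existed' {
  try {
    create();
    return 'created';
  } catch (error) {
    if (isAlreadyExistsError(error)) {
      return 'existed';
    }
    throw error;
  }
}
```

Matching on message text is fragile; it is used here only because the commit message describes the server's replies as plain text.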

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address PR review — connection guard leak, timer leak, async reset

- Clear connecting guard in finally block to prevent permanent reconnection block
- Clear timeout after successful connection to prevent timer leak
- Make reset() async to await stop() before nullifying instance
- Delete obsolete chroma-server-manager test (imports deleted class)
- Update graceful-shutdown test to use chromaMcpManager property name

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: prevent chroma-mcp spawn storm — zombie cleanup, stale onclose guard, reconnect backoff

Three bugs caused chroma-mcp processes to accumulate (92+ observed):

1. Zombie on timeout: failed connections left subprocess alive because
   only the timer was cleared, not the transport. Now catch block
   explicitly closes transport+client before rethrowing.

2. Stale onclose race: old transport's onclose handler captured `this`
   and overwrote the current connection reference after reconnect,
   orphaning the new subprocess. Now guarded with reference check.

3. No backoff: every failure triggered immediate reconnect. With
   backfill doing hundreds of MCP calls, this created rapid-fire
   spawning. Added 10s backoff on both connection failure and
   unexpected process death.
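
The stale onclose guard and the backoff from items 2 and 3 can be sketched in isolation. `FakeTransport` and `ConnectionState` are hypothetical stand-ins, not the MCP SDK's transport class, and the clock is injected so the example is deterministic:

```typescript
// Hypothetical stand-in for a transport with a close callback.
interface FakeTransport {
  onclose?: () => void;
}

const BACKOFF_MS = 10_000; // mirrors the 10s backoff named in the commit message

class ConnectionState {
  transport: FakeTransport | null = null;
  lastFailureAt = 0; // 0 means "no failure recorded yet"

  // A new attempt is allowed only once the backoff window has elapsed.
  canAttempt(now: number): boolean {
    return this.lastFailureAt === 0 || now - this.lastFailureAt >= BACKOFF_MS;
  }

  attach(next: FakeTransport, now: () => number): void {
    this.transport = next;
    next.onclose = () => {
      // Reference check: a handler from an older transport must not
      // clobber the state that belongs to the current transport.
      if (this.transport !== next) return;
      this.transport = null;
      this.lastFailureAt = now();
    };
  }
}

const state = new ConnectionState();
const oldTransport: FakeTransport = {};
const newTransport: FakeTransport = {};
state.attach(oldTransport, () => 1_000);
state.attach(newTransport, () => 2_000);

oldTransport.onclose?.(); // stale handler: ignored
const staleIgnored = state.transport === newTransport;

newTransport.onclose?.(); // current handler: clears state, starts backoff
const blockedDuringBackoff = !state.canAttempt(2_000 + 5_000);
const allowedAfterBackoff = state.canAttempt(2_000 + BACKOFF_MS);
```

Without the reference check, the first `onclose` call would have nulled out the connection that was just established, which is the orphaning described in item 2.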

Also includes ChromaSync fixes from PR review:
- queryChroma deduplication now preserves index-aligned arrays
- SQL injection guard on backfill ID exclusion lists
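
One way such a guard on an ID exclusion list could look is shown below. This is an illustrative sketch, not the code from the PR: `buildExclusionClause` is a hypothetical helper, and it assumes IDs are non-negative integers and that the column name is a trusted constant chosen by the caller:

```typescript
// Hypothetical helper: builds an "AND col NOT IN (...)" fragment.
// Only non-negative safe integers are interpolated; everything else is dropped,
// so a crafted string can never reach the SQL text.
// Assumption: `column` is a hard-coded identifier, never user input.
function buildExclusionClause(column: string, ids: readonly unknown[]): string {
  const safeIds = ids.filter(
    (id): id is number =>
      typeof id === 'number' && Number.isSafeInteger(id) && id >= 0
  );
  if (safeIds.length === 0) {
    return ''; // nothing to exclude
  }
  return `AND ${column} NOT IN (${safeIds.join(',')})`;
}
```

Where the database driver supports it, bound parameters are usually preferable to interpolation; filtering is shown here because exclusion lists can exceed parameter limits.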

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:32:38 -05:00
Alex Newman 7e57b6e02d docs: update CHANGELOG.md for v10.2.6
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:42:24 -05:00
Alex Newman ea683a4e6c chore: bump version to 10.2.6
Publish to npm / publish (push) Has been cancelled
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:41:46 -05:00
Alex Newman 5d79bb7a7a fix: prevent zombie process accumulation by verifying subprocess exit (#1168) (#1175)
Two changes fix the observer process resource leak:

1. Add ensureProcessExit to generator finally blocks in SessionRoutes and
   worker-service, matching the pattern already working in SDKAgent.

2. Add stale session reaper (every 2m) that removes sessions with no active
   generator and no pending work after 15m idle. This unblocks the orphan
   reaper which previously skipped processes for "active" sessions.
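
The reaping rule in item 2 can be sketched as a pure function over an in-memory map. The `SessionInfo` shape and the `reapStale` name are hypothetical; only the 15m idle threshold and the three conditions come from the commit message:

```typescript
// Hypothetical session record; field names are illustrative.
interface SessionInfo {
  hasActiveGenerator: boolean;
  pendingMessages: number;
  lastActivityAt: number; // epoch milliseconds
}

const STALE_AFTER_MS = 15 * 60 * 1000; // 15m idle threshold

// Removes sessions that have no generator, no pending work, and have been
// idle for at least STALE_AFTER_MS. Returns the IDs that were removed.
function reapStale(sessions: Map<string, SessionInfo>, now: number): string[] {
  const reaped: string[] = [];
  sessions.forEach((session, id) => {
    const idle = now - session.lastActivityAt >= STALE_AFTER_MS;
    if (!session.hasActiveGenerator && session.pendingMessages === 0 && idle) {
      sessions.delete(id); // deleting during Map.forEach is safe
      reaped.push(id);
    }
  });
  return reaped;
}
```

In the described design this check would run on a 2m interval; the interval is left out here so the rule itself stays testable.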

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 16:33:23 -05:00
Alex Newman 2180d31ee6 chore: update version to 10.2.5 in plugin.json
2026-02-18 15:26:50 -05:00
Alex Newman 75dd8e3174 docs: update CHANGELOG.md for v10.2.5
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:17:38 -05:00
22 changed files with 941 additions and 1229 deletions
+1 -1
@@ -10,7 +10,7 @@
"plugins": [
{
"name": "claude-mem",
"version": "10.2.5",
"version": "10.3.0",
"source": "./plugin",
"description": "Persistent memory system for Claude Code - context compression across sessions"
}
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "9.0.6",
"version": "10.2.5",
"description": "Persistent memory system for Claude Code - seamlessly preserve context across sessions",
"author": {
"name": "Alex Newman"
+31 -30
@@ -2,6 +2,37 @@
All notable changes to claude-mem.
## [v10.2.6] - 2026-02-18
## Bug Fixes
### Zombie Process Prevention (#1168, #1175)
Observer Claude CLI subprocesses were accumulating as zombies — processes that never exited after their session ended, causing massive resource leaks on long-running systems.
**Root cause:** When observer sessions ended (via idle timeout, abort, or error), the spawned Claude CLI subprocesses were not being reliably killed. The existing `ensureProcessExit()` in `SDKAgent` only covered the happy path; sessions terminated through `SessionRoutes` or `worker-service` bypassed process cleanup entirely.
**Fix — dual-layer approach:**
1. **Immediate cleanup:** Added `ensureProcessExit()` calls to the `finally` blocks in both `SessionRoutes.ts` and `worker-service.ts`, ensuring every session exit path kills its subprocess
2. **Periodic reaping:** Added `reapStaleSessions()` to `SessionManager` — a background interval that scans `~/.claude-mem/observer-sessions/` for stale PID files, verifies the process is still running, and kills any orphans with SIGKILL escalation
This ensures no observer subprocess survives beyond its session lifetime, even in crash scenarios.
## [v10.2.5] - 2026-02-18
### Bug Fixes
- **Self-healing message queue**: Renamed `claimAndDelete` to `claimNextMessage` with atomic self-healing: stale processing messages (>60s) are automatically reset to pending before claiming, which eliminates stuck messages from generator crashes without external timers
- **Removed redundant idle-timeout reset**: The `resetStaleProcessingMessages()` call during idle timeout in worker-service was removed (startup reset kept), since the atomic self-healing in `claimNextMessage` now handles recovery inline
- **TypeScript diagnostic fix**: Added `QUEUE` to logger `Component` type
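
A minimal in-memory sketch of the self-healing claim described in the first bullet above. The `QueueMessage` shape is hypothetical and the per-session scoping is an assumption; in a real store, both steps would need to run inside one transaction to be atomic:

```typescript
type Status = 'pending' | 'processing';

// Hypothetical message record; field names are illustrative.
interface QueueMessage {
  id: number;
  sessionId: string;
  status: Status;
  startedAt: number | null; // epoch ms when processing began
}

const STALE_PROCESSING_MS = 60_000; // the >60s threshold from the changelog

function claimNextMessage(
  queue: QueueMessage[],
  sessionId: string,
  now: number
): QueueMessage | null {
  // Step 1 (self-heal): stale 'processing' messages go back to 'pending'.
  for (const message of queue) {
    if (
      message.sessionId === sessionId &&
      message.status === 'processing' &&
      message.startedAt !== null &&
      now - message.startedAt > STALE_PROCESSING_MS
    ) {
      message.status = 'pending';
      message.startedAt = null;
    }
  }
  // Step 2 (claim): take the first pending message for this session.
  const next = queue.find(
    (message) => message.sessionId === sessionId && message.status === 'pending'
  );
  if (!next) {
    return null;
  }
  next.status = 'processing';
  next.startedAt = now;
  return next;
}
```

Because recovery happens inside the claim itself, a message left in `processing` by a crashed generator is picked up again by the next claim with no separate timer.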
### Tests
- 5 new tests for self-healing behavior (stuck recovery, active protection, atomicity, empty queue, session isolation)
- 1 new integration test for stuck recovery in zombie-prevention suite
- All existing queue tests updated for renamed method
## [v10.2.4] - 2026-02-18
## Chroma Vector DB Backfill Fix
@@ -1409,33 +1440,3 @@ This release significantly reduces the token footprint of the plugin's MCP tools
**Full Changelog**: https://github.com/thedotmack/claude-mem/compare/v8.2.6...v8.2.7
## [v8.2.6] - 2025-12-29
## What's Changed
### Bug Fixes & Improvements
- Session ID semantic renaming for clarity (content_session_id, memory_session_id)
- Queue system simplification with unified processing logic
- Memory session ID capture for agent resume functionality
- Comprehensive test suite for session ID refactoring
**Full Changelog**: https://github.com/thedotmack/claude-mem/compare/v8.2.5...v8.2.6
## [v8.2.5] - 2025-12-28
## Bug Fixes
- **Logger**: Enhanced Error object handling in debug mode to prevent empty JSON serialization
- **ChromaSync**: Refactored DatabaseManager to initialize ChromaSync lazily, removing background backfill on startup
- **SessionManager**: Simplified message handling and removed linger timeout that was blocking completion
## Technical Details
This patch release addresses several issues discovered after the session continuity fix:
1. Logger now properly serializes Error objects with stack traces in debug mode
2. ChromaSync initialization is now lazy to prevent silent failures during startup
3. Session linger timeout removed to eliminate artificial 5-second delays on session completion
Full changelog: https://github.com/thedotmack/claude-mem/compare/v8.2.4...v8.2.5
+1 -3
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.5",
"version": "10.3.0",
"description": "Memory compression system for Claude Code - persist context across sessions",
"keywords": [
"claude",
@@ -98,9 +98,7 @@
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.1.76",
"@modelcontextprotocol/sdk": "^1.25.1",
"@chroma-core/default-embed": "^0.1.9",
"ansi-to-html": "^0.7.2",
"chromadb": "^3.2.2",
"dompurify": "^3.3.1",
"express": "^4.18.2",
"glob": "^11.0.3",
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem",
"version": "10.2.5",
"version": "10.3.0",
"description": "Persistent memory system for Claude Code - seamlessly preserve context across sessions",
"author": {
"name": "Alex Newman"
+1 -1
@@ -8,7 +8,7 @@
{
"type": "command",
"command": "${CLAUDE_PLUGIN_ROOT}/scripts/setup.sh",
"timeout": 120
"timeout": 300
}
]
}
+1 -1
@@ -1,6 +1,6 @@
{
"name": "claude-mem-plugin",
"version": "10.2.5",
"version": "10.3.0",
"private": true,
"description": "Runtime dependencies for claude-mem bundled hooks",
"type": "module",
File diff suppressed because one or more lines are too long (three files)
@@ -30,9 +30,9 @@ export interface CloseableDatabase {
}
/**
* Stoppable service interface for Chroma server
* Stoppable service interface for ChromaMcpManager
*/
export interface StoppableServer {
export interface StoppableService {
stop(): Promise<void>;
}
@@ -44,7 +44,7 @@ export interface GracefulShutdownConfig {
sessionManager: ShutdownableService;
mcpClient?: CloseableClient;
dbManager?: CloseableDatabase;
chromaServer?: StoppableServer;
chromaMcpManager?: StoppableService;
}
/**
@@ -79,11 +79,11 @@ export async function performGracefulShutdown(config: GracefulShutdownConfig): P
logger.info('SYSTEM', 'MCP client closed');
}
// STEP 5: Stop Chroma server (local mode only)
if (config.chromaServer) {
logger.info('SHUTDOWN', 'Stopping Chroma server...');
await config.chromaServer.stop();
logger.info('SHUTDOWN', 'Chroma server stopped');
// STEP 5: Stop Chroma MCP connection
if (config.chromaMcpManager) {
logger.info('SHUTDOWN', 'Stopping Chroma MCP connection...');
await config.chromaMcpManager.stop();
logger.info('SHUTDOWN', 'Chroma MCP connection stopped');
}
// STEP 6: Close database connection (includes ChromaSync cleanup)
+422
@@ -0,0 +1,422 @@
/**
* ChromaMcpManager - Singleton managing a persistent MCP connection to chroma-mcp via uvx
*
* Replaces ChromaServerManager (which spawned `npx chroma run`) with a stdio-based
* MCP client that communicates with chroma-mcp as a subprocess. The chroma-mcp server
* handles its own embedding and persistent storage, eliminating the need for a separate
* HTTP server, chromadb npm package, and ONNX/WASM embedding dependencies.
*
* Lifecycle: lazy-connects on first callTool() use, maintains a single persistent
* connection per worker lifetime, and auto-reconnects if the subprocess dies.
*
* Cross-platform: Linux, macOS, Windows
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { execSync } from 'child_process';
import path from 'path';
import os from 'os';
import fs from 'fs';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
const CHROMA_MCP_CLIENT_NAME = 'claude-mem-chroma';
const CHROMA_MCP_CLIENT_VERSION = '1.0.0';
const MCP_CONNECTION_TIMEOUT_MS = 30_000;
const RECONNECT_BACKOFF_MS = 10_000; // Don't retry connections faster than this after failure
const DEFAULT_CHROMA_DATA_DIR = path.join(os.homedir(), '.claude-mem', 'chroma');
export class ChromaMcpManager {
private static instance: ChromaMcpManager | null = null;
private client: Client | null = null;
private transport: StdioClientTransport | null = null;
private connected: boolean = false;
private lastConnectionFailureTimestamp: number = 0;
private connecting: Promise<void> | null = null;
private constructor() {}
/**
* Get or create the singleton instance
*/
static getInstance(): ChromaMcpManager {
if (!ChromaMcpManager.instance) {
ChromaMcpManager.instance = new ChromaMcpManager();
}
return ChromaMcpManager.instance;
}
/**
* Ensure the MCP client is connected to chroma-mcp.
* Uses a connection lock to prevent concurrent connection attempts.
* If the subprocess has died since the last use, reconnects transparently.
*/
private async ensureConnected(): Promise<void> {
if (this.connected && this.client) {
return;
}
// Backoff: don't retry connections too fast after a failure
const timeSinceLastFailure = Date.now() - this.lastConnectionFailureTimestamp;
if (this.lastConnectionFailureTimestamp > 0 && timeSinceLastFailure < RECONNECT_BACKOFF_MS) {
throw new Error(`chroma-mcp connection in backoff (${Math.ceil((RECONNECT_BACKOFF_MS - timeSinceLastFailure) / 1000)}s remaining)`);
}
// If another caller is already connecting, wait for that attempt
if (this.connecting) {
await this.connecting;
return;
}
this.connecting = this.connectInternal();
try {
await this.connecting;
} catch (error) {
this.lastConnectionFailureTimestamp = Date.now();
throw error;
} finally {
this.connecting = null;
}
}
/**
* Internal connection logic - spawns uvx chroma-mcp and performs MCP handshake.
* Called behind the connection lock to ensure only one connection attempt at a time.
*/
private async connectInternal(): Promise<void> {
// Clean up any stale client/transport from a dead subprocess.
// Close transport first (kills subprocess via SIGTERM) before client
// to avoid hanging on a stuck process.
if (this.transport) {
try { await this.transport.close(); } catch { /* already dead */ }
}
if (this.client) {
try { await this.client.close(); } catch { /* already dead */ }
}
this.client = null;
this.transport = null;
this.connected = false;
const commandArgs = this.buildCommandArgs();
const spawnEnvironment = this.getSpawnEnv();
const isWindows = process.platform === 'win32';
const uvxCommand = isWindows ? 'uvx.cmd' : 'uvx';
logger.info('CHROMA_MCP', 'Connecting to chroma-mcp via MCP stdio', {
command: uvxCommand,
args: commandArgs.join(' ')
});
this.transport = new StdioClientTransport({
command: uvxCommand,
args: commandArgs,
env: spawnEnvironment,
stderr: 'pipe'
});
this.client = new Client(
{ name: CHROMA_MCP_CLIENT_NAME, version: CHROMA_MCP_CLIENT_VERSION },
{ capabilities: {} }
);
const mcpConnectionPromise = this.client.connect(this.transport);
let timeoutId: ReturnType<typeof setTimeout>;
const timeoutPromise = new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error(`MCP connection to chroma-mcp timed out after ${MCP_CONNECTION_TIMEOUT_MS}ms`)),
MCP_CONNECTION_TIMEOUT_MS
);
});
try {
await Promise.race([mcpConnectionPromise, timeoutPromise]);
} catch (connectionError) {
// Connection failed or timed out - kill the subprocess to prevent zombies
clearTimeout(timeoutId!);
logger.warn('CHROMA_MCP', 'Connection failed, killing subprocess to prevent zombie', {
error: connectionError instanceof Error ? connectionError.message : String(connectionError)
});
try { await this.transport.close(); } catch { /* best effort */ }
try { await this.client.close(); } catch { /* best effort */ }
this.client = null;
this.transport = null;
this.connected = false;
throw connectionError;
}
clearTimeout(timeoutId!);
this.connected = true;
logger.info('CHROMA_MCP', 'Connected to chroma-mcp successfully');
// Listen for transport close to mark connection as dead and apply backoff.
// CRITICAL: Guard with reference check to prevent stale onclose handlers from
// previous transports overwriting the current connection (race condition).
const currentTransport = this.transport;
this.transport.onclose = () => {
if (this.transport !== currentTransport) {
logger.debug('CHROMA_MCP', 'Ignoring stale onclose from previous transport');
return;
}
logger.warn('CHROMA_MCP', 'chroma-mcp subprocess closed unexpectedly, applying reconnect backoff');
this.connected = false;
this.client = null;
this.transport = null;
this.lastConnectionFailureTimestamp = Date.now();
};
}
/**
* Build the uvx command arguments based on current settings.
* In local mode: uses persistent client with local data directory.
* In remote mode: uses http client with configured host/port/auth.
*/
private buildCommandArgs(): string[] {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const chromaMode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
if (chromaMode === 'remote') {
const chromaHost = settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1';
const chromaPort = settings.CLAUDE_MEM_CHROMA_PORT || '8000';
const chromaSsl = settings.CLAUDE_MEM_CHROMA_SSL === 'true';
const chromaTenant = settings.CLAUDE_MEM_CHROMA_TENANT || 'default_tenant';
const chromaDatabase = settings.CLAUDE_MEM_CHROMA_DATABASE || 'default_database';
const chromaApiKey = settings.CLAUDE_MEM_CHROMA_API_KEY || '';
const args = [
'chroma-mcp',
'--client-type', 'http',
'--host', chromaHost,
'--port', chromaPort
];
if (chromaSsl) {
args.push('--ssl');
}
if (chromaTenant !== 'default_tenant') {
args.push('--tenant', chromaTenant);
}
if (chromaDatabase !== 'default_database') {
args.push('--database', chromaDatabase);
}
if (chromaApiKey) {
args.push('--api-key', chromaApiKey);
}
return args;
}
// Local mode: persistent client with data directory
return [
'chroma-mcp',
'--client-type', 'persistent',
'--data-dir', DEFAULT_CHROMA_DATA_DIR
];
}
/**
* Call a chroma-mcp tool by name with the given arguments.
* Lazily connects on first call. Reconnects if the subprocess has died.
*
* @param toolName - The chroma-mcp tool name (e.g. 'chroma_query_documents')
* @param toolArguments - The tool arguments as a plain object
* @returns The parsed JSON result from the tool's text output, or null if the output is empty or not valid JSON
*/
async callTool(toolName: string, toolArguments: Record<string, unknown>): Promise<unknown> {
await this.ensureConnected();
logger.debug('CHROMA_MCP', `Calling tool: ${toolName}`, {
arguments: JSON.stringify(toolArguments).slice(0, 200)
});
const result = await this.client!.callTool({
name: toolName,
arguments: toolArguments
});
// MCP tools signal errors via isError flag on the CallToolResult
if (result.isError) {
const errorText = (result.content as Array<{ type: string; text?: string }>)
?.find(item => item.type === 'text')?.text || 'Unknown chroma-mcp error';
throw new Error(`chroma-mcp tool "${toolName}" returned error: ${errorText}`);
}
// Extract text from MCP CallToolResult: { content: Array<{ type, text? }> }
const contentArray = result.content as Array<{ type: string; text?: string }>;
if (!contentArray || contentArray.length === 0) {
return null;
}
const firstTextContent = contentArray.find(item => item.type === 'text' && item.text);
if (!firstTextContent || !firstTextContent.text) {
return null;
}
// chroma-mcp returns JSON for query/get results, but plain text for
// mutating operations (e.g. "Successfully created collection ...").
// Try JSON parse first; if it fails, treat the plain text as a void-like success and return null.
try {
return JSON.parse(firstTextContent.text);
} catch {
// Plain text response (e.g. "Successfully created collection cm__foo")
// Return null for void-like success messages, callers don't need the text
return null;
}
}
/**
* Check if the MCP connection is alive by calling chroma_list_collections.
* Returns true if the connection is healthy, false otherwise.
*/
async isHealthy(): Promise<boolean> {
try {
await this.callTool('chroma_list_collections', { limit: 1 });
return true;
} catch {
return false;
}
}
/**
* Gracefully stop the MCP connection and kill the chroma-mcp subprocess.
* client.close() sends stdin close -> SIGTERM -> SIGKILL to the subprocess.
*/
async stop(): Promise<void> {
if (!this.client) {
logger.debug('CHROMA_MCP', 'No active MCP connection to stop');
return;
}
logger.info('CHROMA_MCP', 'Stopping chroma-mcp MCP connection');
try {
await this.client.close();
} catch (error) {
logger.debug('CHROMA_MCP', 'Error during client close (subprocess may already be dead)', {}, error as Error);
}
this.client = null;
this.transport = null;
this.connected = false;
this.connecting = null;
logger.info('CHROMA_MCP', 'chroma-mcp MCP connection stopped');
}
/**
* Reset the singleton instance (for testing).
* Awaits stop() to prevent dual subprocesses.
*/
static async reset(): Promise<void> {
if (ChromaMcpManager.instance) {
await ChromaMcpManager.instance.stop();
}
ChromaMcpManager.instance = null;
}
/**
* Get or create a combined SSL certificate bundle for Zscaler/corporate proxy environments.
* On macOS, combines the Python certifi CA bundle with any Zscaler certificates from
* the system keychain. Caches the result for 24 hours at ~/.claude-mem/combined_certs.pem.
*
* Returns the path to the combined cert file, or undefined if not needed/available.
*/
private getCombinedCertPath(): string | undefined {
const combinedCertPath = path.join(os.homedir(), '.claude-mem', 'combined_certs.pem');
if (fs.existsSync(combinedCertPath)) {
const stats = fs.statSync(combinedCertPath);
const ageMs = Date.now() - stats.mtimeMs;
if (ageMs < 24 * 60 * 60 * 1000) {
return combinedCertPath;
}
}
if (process.platform !== 'darwin') {
return undefined;
}
try {
let certifiPath: string | undefined;
try {
certifiPath = execSync(
'uvx --with certifi python -c "import certifi; print(certifi.where())"',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 10000 }
).trim();
} catch {
return undefined;
}
if (!certifiPath || !fs.existsSync(certifiPath)) {
return undefined;
}
let zscalerCert = '';
try {
zscalerCert = execSync(
'security find-certificate -a -c "Zscaler" -p /Library/Keychains/System.keychain',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 5000 }
);
} catch {
return undefined;
}
if (!zscalerCert ||
!zscalerCert.includes('-----BEGIN CERTIFICATE-----') ||
!zscalerCert.includes('-----END CERTIFICATE-----')) {
return undefined;
}
const certifiContent = fs.readFileSync(certifiPath, 'utf8');
const tempPath = combinedCertPath + '.tmp';
fs.writeFileSync(tempPath, certifiContent + '\n' + zscalerCert);
fs.renameSync(tempPath, combinedCertPath);
logger.info('CHROMA_MCP', 'Created combined SSL certificate bundle for Zscaler', {
path: combinedCertPath
});
return combinedCertPath;
} catch (error) {
logger.debug('CHROMA_MCP', 'Could not create combined cert bundle', {}, error as Error);
return undefined;
}
}
/**
* Build subprocess environment with SSL certificate overrides for enterprise proxy compatibility.
* If a combined cert bundle exists (Zscaler), injects SSL_CERT_FILE, REQUESTS_CA_BUNDLE, etc.
* Otherwise returns a plain string-keyed copy of process.env.
*/
private getSpawnEnv(): Record<string, string> {
const baseEnv: Record<string, string> = {};
for (const [key, value] of Object.entries(process.env)) {
if (value !== undefined) {
baseEnv[key] = value;
}
}
const combinedCertPath = this.getCombinedCertPath();
if (!combinedCertPath) {
return baseEnv;
}
logger.info('CHROMA_MCP', 'Using combined SSL certificates for enterprise compatibility', {
certPath: combinedCertPath
});
return {
...baseEnv,
SSL_CERT_FILE: combinedCertPath,
REQUESTS_CA_BUNDLE: combinedCertPath,
CURL_CA_BUNDLE: combinedCertPath,
NODE_EXTRA_CA_CERTS: combinedCertPath
};
}
}
-446
@@ -1,446 +0,0 @@
/**
* ChromaServerManager - Singleton managing local Chroma HTTP server lifecycle
*
* Starts a persistent Chroma server via `npx chroma run` at worker startup
* and manages its lifecycle. In 'remote' mode, skips server start and connects
* to an existing server (future cloud support).
*
* Cross-platform: Linux, macOS, Windows
*/
import { spawn, ChildProcess, execSync } from 'child_process';
import path from 'path';
import os from 'os';
import fs, { existsSync } from 'fs';
import { logger } from '../../utils/logger.js';
export interface ChromaServerConfig {
dataDir: string;
host: string;
port: number;
}
export class ChromaServerManager {
private static instance: ChromaServerManager | null = null;
private serverProcess: ChildProcess | null = null;
private config: ChromaServerConfig;
private starting: boolean = false;
private ready: boolean = false;
private startPromise: Promise<boolean> | null = null;
private constructor(config: ChromaServerConfig) {
this.config = config;
}
/**
* Get or create the singleton instance
*/
static getInstance(config?: ChromaServerConfig): ChromaServerManager {
if (!ChromaServerManager.instance) {
const defaultConfig: ChromaServerConfig = {
dataDir: path.join(os.homedir(), '.claude-mem', 'vector-db'),
host: '127.0.0.1',
port: 8000
};
ChromaServerManager.instance = new ChromaServerManager(config || defaultConfig);
}
return ChromaServerManager.instance;
}
/**
* Start the Chroma HTTP server
* Reuses in-flight startup if already starting
* Spawns `npx chroma run` as a background process
* If a server is already running (from previous worker), reuses it
*/
async start(timeoutMs: number = 60000): Promise<boolean> {
if (this.ready) {
logger.debug('CHROMA_SERVER', 'Server already started or starting', {
ready: this.ready,
starting: this.starting
});
return true;
}
if (this.startPromise) {
logger.debug('CHROMA_SERVER', 'Awaiting existing startup', {
host: this.config.host,
port: this.config.port
});
return this.startPromise;
}
this.starting = true;
this.startPromise = this.startInternal(timeoutMs);
try {
return await this.startPromise;
} finally {
this.startPromise = null;
if (!this.ready) {
this.starting = false;
}
}
}
/**
* Internal startup path used behind a single shared startPromise lock
*/
private async startInternal(timeoutMs: number): Promise<boolean> {
// Check if a server is already running (from previous worker or manual start)
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`,
{ signal: AbortSignal.timeout(3000) }
);
if (response.ok) {
logger.info('CHROMA_SERVER', 'Existing server detected, reusing', {
host: this.config.host,
port: this.config.port
});
this.ready = true;
this.starting = false;
return true;
}
} catch {
// No server running, proceed to start one
}
// Cross-platform: use npx.cmd on Windows
const isWindows = process.platform === 'win32';
// Resolve chroma binary absolutely — npx fails when spawned from cache dirs (#1120)
let command: string;
let args: string[];
try {
// chromadb package installs a 'chroma' bin entry
const chromaBinDir = path.dirname(require.resolve('chromadb/package.json'));
// Check project-level .bin first (most common npm/bun installation layout)
const projectBin = path.join(chromaBinDir, '..', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
// Fallback: nested node_modules .bin (rare — pnpm or workspace hoisting)
const nestedBin = path.join(chromaBinDir, 'node_modules', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
if (existsSync(projectBin)) {
command = projectBin;
} else if (existsSync(nestedBin)) {
command = nestedBin;
} else {
// Last resort: npx with explicit cwd
command = isWindows ? 'npx.cmd' : 'npx';
}
} catch {
command = isWindows ? 'npx.cmd' : 'npx';
}
if (command.includes('npx')) {
args = ['chroma', 'run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
} else {
args = ['run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
}
logger.info('CHROMA_SERVER', 'Starting Chroma server', {
command,
args: args.join(' '),
dataDir: this.config.dataDir
});
const spawnEnv = this.getSpawnEnv();
// Resolve cwd for npx fallback — ensures node_modules is findable (#1120)
let spawnCwd: string | undefined;
try {
spawnCwd = path.dirname(require.resolve('chromadb/package.json'));
} catch {
// If chromadb isn't resolvable, omit cwd and let npx handle it
}
this.serverProcess = spawn(command, args, {
stdio: ['ignore', 'pipe', 'pipe'],
detached: !isWindows, // Don't detach on Windows (no process groups)
windowsHide: true, // Hide console window on Windows
env: spawnEnv,
...(spawnCwd && { cwd: spawnCwd })
});
// Log server output for debugging
this.serverProcess.stdout?.on('data', (data) => {
const msg = data.toString().trim();
if (msg) {
logger.debug('CHROMA_SERVER', msg);
}
});
this.serverProcess.stderr?.on('data', (data) => {
const msg = data.toString().trim();
if (msg) {
// Filter out noisy startup messages
if (!msg.includes('Chroma') || msg.includes('error') || msg.includes('Error')) {
logger.debug('CHROMA_SERVER', msg);
}
}
});
this.serverProcess.on('error', (err) => {
logger.error('CHROMA_SERVER', 'Server process error', {}, err);
this.ready = false;
this.starting = false;
});
this.serverProcess.on('exit', (code, signal) => {
logger.info('CHROMA_SERVER', 'Server process exited', { code, signal });
this.ready = false;
this.starting = false;
this.serverProcess = null;
});
return this.waitForReady(timeoutMs);
}
/**
* Wait for the server to become ready
* Polls the heartbeat endpoint until success or timeout
*/
async waitForReady(timeoutMs: number = 60000): Promise<boolean> {
if (this.ready) {
return true;
}
const startTime = Date.now();
const checkInterval = 500;
logger.info('CHROMA_SERVER', 'Waiting for server to be ready', {
host: this.config.host,
port: this.config.port,
timeoutMs
});
while (Date.now() - startTime < timeoutMs) {
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`
);
if (response.ok) {
this.ready = true;
this.starting = false;
logger.info('CHROMA_SERVER', 'Server ready', {
host: this.config.host,
port: this.config.port,
startupTimeMs: Date.now() - startTime
});
return true;
}
} catch {
// Server not ready yet, continue polling
}
await new Promise(resolve => setTimeout(resolve, checkInterval));
}
this.starting = false;
logger.error('CHROMA_SERVER', 'Server failed to start within timeout', {
timeoutMs,
elapsedMs: Date.now() - startTime
});
return false;
}
/**
* Check if the server is running and ready
* Returns true if we manage the process OR if a server is responding
*/
isRunning(): boolean {
return this.ready;
}
/**
* Async check if server is running by pinging heartbeat
* Use this when you need to verify server is actually reachable
*/
async isServerReachable(): Promise<boolean> {
try {
const response = await fetch(
`http://${this.config.host}:${this.config.port}/api/v2/heartbeat`
);
if (response.ok) {
this.ready = true;
return true;
}
} catch {
// Server not reachable
}
return false;
}
/**
* Get the server URL for client connections
*/
getUrl(): string {
return `http://${this.config.host}:${this.config.port}`;
}
/**
* Get the server configuration
*/
getConfig(): ChromaServerConfig {
return { ...this.config };
}
/**
* Stop the Chroma server
* Gracefully terminates the server process
*/
async stop(): Promise<void> {
if (!this.serverProcess) {
logger.debug('CHROMA_SERVER', 'No server process to stop');
return;
}
logger.info('CHROMA_SERVER', 'Stopping server', { pid: this.serverProcess.pid });
return new Promise((resolve) => {
const proc = this.serverProcess!;
const pid = proc.pid;
const cleanup = () => {
this.serverProcess = null;
this.ready = false;
this.starting = false;
this.startPromise = null;
logger.info('CHROMA_SERVER', 'Server stopped', { pid });
resolve();
};
// Set up exit handler
proc.once('exit', cleanup);
// Cross-platform graceful shutdown
if (process.platform === 'win32') {
// Windows: just send SIGTERM
proc.kill('SIGTERM');
} else {
// Unix: kill the process group to ensure all children are killed
if (pid !== undefined) {
try {
process.kill(-pid, 'SIGTERM');
} catch (err) {
// Process group kill failed, try direct kill
proc.kill('SIGTERM');
}
} else {
proc.kill('SIGTERM');
}
}
// Force kill after timeout if still running
setTimeout(() => {
if (this.serverProcess) {
logger.warn('CHROMA_SERVER', 'Force killing server after timeout', { pid });
try {
proc.kill('SIGKILL');
} catch {
// Already dead
}
cleanup();
}
}, 5000);
});
}
/**
* Get or create combined SSL certificate bundle for Zscaler/corporate proxy environments.
* This ports previous MCP SSL handling so local `npx chroma run` works behind enterprise proxies.
*/
private getCombinedCertPath(): string | undefined {
const combinedCertPath = path.join(os.homedir(), '.claude-mem', 'combined_certs.pem');
if (fs.existsSync(combinedCertPath)) {
const stats = fs.statSync(combinedCertPath);
const ageMs = Date.now() - stats.mtimeMs;
if (ageMs < 24 * 60 * 60 * 1000) {
return combinedCertPath;
}
}
if (process.platform !== 'darwin') {
return undefined;
}
try {
let certifiPath: string | undefined;
try {
certifiPath = execSync(
'uvx --with certifi python -c "import certifi; print(certifi.where())"',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 10000 }
).trim();
} catch {
return undefined;
}
if (!certifiPath || !fs.existsSync(certifiPath)) {
return undefined;
}
let zscalerCert = '';
try {
zscalerCert = execSync(
'security find-certificate -a -c "Zscaler" -p /Library/Keychains/System.keychain',
{ encoding: 'utf8', stdio: ['pipe', 'pipe', 'pipe'], timeout: 5000 }
);
} catch {
return undefined;
}
if (!zscalerCert ||
!zscalerCert.includes('-----BEGIN CERTIFICATE-----') ||
!zscalerCert.includes('-----END CERTIFICATE-----')) {
return undefined;
}
const certifiContent = fs.readFileSync(certifiPath, 'utf8');
const tempPath = combinedCertPath + '.tmp';
fs.writeFileSync(tempPath, certifiContent + '\n' + zscalerCert);
fs.renameSync(tempPath, combinedCertPath);
logger.info('CHROMA_SERVER', 'Created combined SSL certificate bundle for Zscaler', {
path: combinedCertPath
});
return combinedCertPath;
} catch (error) {
logger.debug('CHROMA_SERVER', 'Could not create combined cert bundle', {}, error as Error);
return undefined;
}
}
/**
* Build subprocess env and preserve Zscaler compatibility from previous architecture.
*/
private getSpawnEnv(): NodeJS.ProcessEnv {
const combinedCertPath = this.getCombinedCertPath();
if (!combinedCertPath) {
return process.env;
}
logger.info('CHROMA_SERVER', 'Using combined SSL certificates for enterprise compatibility', {
certPath: combinedCertPath
});
return {
...process.env,
SSL_CERT_FILE: combinedCertPath,
REQUESTS_CA_BUNDLE: combinedCertPath,
CURL_CA_BUNDLE: combinedCertPath,
NODE_EXTRA_CA_CERTS: combinedCertPath
};
}
/**
* Reset the singleton instance (for testing)
*/
static reset(): void {
if (ChromaServerManager.instance) {
// Don't await - just trigger stop
ChromaServerManager.instance.stop().catch(() => {});
}
ChromaServerManager.instance = null;
}
}
+120 -245
@@ -1,26 +1,21 @@
/**
* ChromaSync Service
*
* Automatically syncs observations and session summaries to ChromaDB via HTTP.
* Automatically syncs observations and session summaries to ChromaDB via MCP.
* This service provides real-time semantic search capabilities by maintaining
* a vector database synchronized with SQLite.
*
* Uses the chromadb npm package's built-in ChromaClient for HTTP connections.
* Supports both local server (managed by ChromaServerManager) and remote/cloud
* servers for future claude-mem pro features.
* Uses ChromaMcpManager to communicate with chroma-mcp over stdio MCP protocol.
* The chroma-mcp server handles its own embedding and persistent storage,
* eliminating the need for chromadb npm package and ONNX/WASM dependencies.
*
* Design: Fail-fast with no fallbacks - if Chroma is unavailable, syncing fails.
*/
import { ChromaClient, Collection } from 'chromadb';
import { ChromaMcpManager } from './ChromaMcpManager.js';
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
import { SessionStore } from '../sqlite/SessionStore.js';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import { ChromaServerManager } from './ChromaServerManager.js';
import path from 'path';
import os from 'os';
interface ChromaDocument {
id: string;
@@ -75,13 +70,10 @@ interface StoredUserPrompt {
}
export class ChromaSync {
private chromaClient: ChromaClient | null = null;
private collection: Collection | null = null;
private project: string;
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private collectionCreated = false;
private readonly BATCH_SIZE = 100;
private modelCacheCorruptionRetried = false;
constructor(project: string) {
this.project = project;
@@ -91,146 +83,36 @@ export class ChromaSync {
.replace(/[^a-zA-Z0-9._-]/g, '_')
.replace(/[^a-zA-Z0-9]+$/, ''); // strip trailing non-alphanumeric
this.collectionName = `cm__${sanitized || 'unknown'}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
}
/**
* Ensure HTTP client is connected to Chroma server
* In local mode, verifies ChromaServerManager has started the server
* In remote mode, connects directly to configured host
* Throws error if connection fails
* Ensure collection exists in Chroma via MCP.
* Creation is treated as idempotent: an "already exists" error from chroma_create_collection counts as success, so repeated calls are safe.
* Uses collectionCreated flag to avoid redundant calls within a session.
*/
private async ensureConnection(): Promise<void> {
if (this.chromaClient) {
private async ensureCollectionExists(): Promise<void> {
if (this.collectionCreated) {
return;
}
logger.info('CHROMA_SYNC', 'Connecting to Chroma HTTP server...', { project: this.project });
const chromaMcp = ChromaMcpManager.getInstance();
try {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const mode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
const host = settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1';
const port = parseInt(settings.CLAUDE_MEM_CHROMA_PORT || '8000', 10);
const ssl = settings.CLAUDE_MEM_CHROMA_SSL === 'true';
// Multi-tenancy settings (used in remote/pro mode)
const tenant = settings.CLAUDE_MEM_CHROMA_TENANT || 'default_tenant';
const database = settings.CLAUDE_MEM_CHROMA_DATABASE || 'default_database';
const apiKey = settings.CLAUDE_MEM_CHROMA_API_KEY || '';
// In local mode, verify server is reachable
if (mode === 'local') {
const serverManager = ChromaServerManager.getInstance();
const reachable = await serverManager.isServerReachable();
if (!reachable) {
throw new Error('Chroma server not reachable. Ensure worker started correctly.');
}
}
// Create HTTP client
const protocol = ssl ? 'https' : 'http';
const chromaPath = `${protocol}://${host}:${port}`;
// Build client options
const clientOptions: { path: string; tenant?: string; database?: string; headers?: Record<string, string> } = {
path: chromaPath
};
// In remote mode, use tenant isolation for pro users
if (mode === 'remote') {
clientOptions.tenant = tenant;
clientOptions.database = database;
// Add API key header if configured
if (apiKey) {
clientOptions.headers = {
'Authorization': `Bearer ${apiKey}`
};
}
logger.info('CHROMA_SYNC', 'Connecting with tenant isolation', {
tenant,
database,
hasApiKey: !!apiKey
});
}
this.chromaClient = new ChromaClient(clientOptions);
// Verify connection with heartbeat
await this.chromaClient.heartbeat();
logger.info('CHROMA_SYNC', 'Connected to Chroma HTTP server', {
project: this.project,
host,
port,
ssl,
mode,
tenant: mode === 'remote' ? tenant : 'default_tenant'
await chromaMcp.callTool('chroma_create_collection', {
collection_name: this.collectionName
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma HTTP server', { project: this.project }, error as Error);
this.chromaClient = null;
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Ensure collection exists, create if needed
* Throws error if collection creation fails
*/
private async ensureCollection(): Promise<void> {
await this.ensureConnection();
if (this.collection) {
return;
}
if (!this.chromaClient) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
try {
// Store model cache outside node_modules so reinstalls don't corrupt it
const { env } = await import('@huggingface/transformers');
env.cacheDir = path.join(os.homedir(), '.claude-mem', 'models');
// Use WASM backend to avoid native ONNX binary issues (#1104, #1105, #1110).
// Same model (all-MiniLM-L6-v2), same embeddings, but runs in WASM —
// no native binary loading, no segfaults, no ENOENT errors.
const { DefaultEmbeddingFunction } = await import('@chroma-core/default-embed');
const embeddingFunction = new DefaultEmbeddingFunction({ wasm: true });
this.collection = await this.chromaClient.getOrCreateCollection({
name: this.collectionName,
embeddingFunction
});
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Self-heal: corrupted model cache → clear and retry once
if (errorMessage.includes('Protobuf parsing failed') && !this.modelCacheCorruptionRetried) {
this.modelCacheCorruptionRetried = true;
logger.warn('CHROMA_SYNC', 'Corrupted model cache detected, clearing and retrying...');
const modelCacheDir = path.join(os.homedir(), '.claude-mem', 'models');
const fs = await import('fs');
if (fs.existsSync(modelCacheDir)) {
fs.rmSync(modelCacheDir, { recursive: true, force: true });
}
return this.ensureCollection(); // retry once
const message = error instanceof Error ? error.message : String(error);
if (!message.includes('already exists')) {
throw error;
}
logger.error('CHROMA_SYNC', 'Failed to get/create collection', { collection: this.collectionName }, error as Error);
throw new Error(`Collection setup failed: ${errorMessage}`);
// Collection already exists - this is the expected path after first creation
}
this.collectionCreated = true;
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
}
/**
@@ -369,7 +251,7 @@ export class ChromaSync {
}
/**
* Add documents to Chroma in batch
* Add documents to Chroma in batch via MCP
* Throws error if batch add fails
*/
private async addDocuments(documents: ChromaDocument[]): Promise<void> {
@@ -377,33 +259,26 @@ export class ChromaSync {
return;
}
await this.ensureCollection();
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
const chromaMcp = ChromaMcpManager.getInstance();
// Add in batches
for (let i = 0; i < documents.length; i += this.BATCH_SIZE) {
const batch = documents.slice(i, i + this.BATCH_SIZE);
await chromaMcp.callTool('chroma_add_documents', {
collection_name: this.collectionName,
ids: batch.map(d => d.id),
documents: batch.map(d => d.document),
metadatas: batch.map(d => d.metadata)
});
}
try {
await this.collection.add({
ids: documents.map(d => d.id),
documents: documents.map(d => d.document),
metadatas: documents.map(d => d.metadata)
});
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to add documents', {
collection: this.collectionName,
count: documents.length
}, error as Error);
throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
}
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
}
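The batching loop in addDocuments above can be looked at in isolation. The following is a minimal sketch, not code from this change: `chunk` is a hypothetical helper name, and a batch size of 100 would correspond to BATCH_SIZE in ChromaSync.

```typescript
// Hypothetical helper (not part of ChromaSync): split an array into
// consecutive slices of at most `size` items, preserving order.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    // slice() clamps the end index, so the last batch may be shorter.
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```

Each slice could then be passed to one `chroma_add_documents` call, which keeps any single MCP request bounded in size.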
/**
@@ -545,7 +420,7 @@ export class ChromaSync {
}
/**
* Fetch all existing document IDs from Chroma collection
* Fetch all existing document IDs from Chroma collection via MCP
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(projectOverride?: string): Promise<{
@@ -554,14 +429,9 @@ export class ChromaSync {
prompts: Set<number>;
}> {
const targetProject = projectOverride ?? this.project;
await this.ensureCollection();
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${targetProject}`
);
}
const chromaMcp = ChromaMcpManager.getInstance();
const observationIds = new Set<number>();
const summaryIds = new Set<number>();
@@ -573,45 +443,42 @@ export class ChromaSync {
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: targetProject });
while (true) {
try {
const result = await this.collection.get({
limit,
offset,
where: { project: targetProject },
include: ['metadatas']
});
const result = await chromaMcp.callTool('chroma_get_documents', {
collection_name: this.collectionName,
limit: limit,
offset: offset,
where: { project: targetProject },
include: ['metadatas']
}) as any;
const metadatas = result.metadatas || [];
// chroma_get_documents returns flat arrays: { ids, metadatas, documents }
const metadatas = result?.metadatas || [];
if (metadatas.length === 0) {
break; // No more documents
}
if (metadatas.length === 0) {
break; // No more documents
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: targetProject,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: targetProject }, error as Error);
throw error;
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: targetProject,
offset,
batchSize: metadatas.length
});
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
@@ -635,7 +502,7 @@ export class ChromaSync {
const backfillProject = projectOverride ?? this.project;
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: backfillProject });
await this.ensureCollection();
await this.ensureCollectionExists();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds(backfillProject);
@@ -644,7 +511,8 @@ export class ChromaSync {
try {
// Build exclusion list for observations
const existingObsIds = Array.from(existing.observations);
// Filter to validated positive integers before interpolating into SQL
const existingObsIds = Array.from(existing.observations).filter(id => Number.isInteger(id) && id > 0);
const obsExclusionClause = existingObsIds.length > 0
? `AND id NOT IN (${existingObsIds.join(',')})`
: '';
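The ID filtering before SQL interpolation can be sketched as a standalone helper. This is an illustration under stated assumptions, not code from this change: `buildExclusionClause` and its column-name check are hypothetical, and parameterized queries remain preferable where the driver supports them.

```typescript
// Hypothetical helper (not in the diff): build an "AND <column> NOT IN (...)"
// fragment, keeping only positive integers so no other value can reach
// the SQL string.
function buildExclusionClause(column: string, ids: Iterable<unknown>): string {
  // Column names cannot be bound as parameters, so restrict them to a
  // conservative identifier pattern (assumption: names like "id" or "up.id").
  if (!/^[A-Za-z_][A-Za-z0-9_.]*$/.test(column)) {
    throw new Error(`Unsafe column name: ${column}`);
  }
  const safeIds = Array.from(ids).filter(
    (id): id is number => typeof id === 'number' && Number.isInteger(id) && id > 0
  );
  return safeIds.length > 0 ? `AND ${column} NOT IN (${safeIds.join(',')})` : '';
}
```

The filter matters because the IDs come back from Chroma metadata and are only cast to `number`, so their runtime type is not guaranteed.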
@@ -685,7 +553,7 @@ export class ChromaSync {
}
// Build exclusion list for summaries
const existingSummaryIds = Array.from(existing.summaries);
const existingSummaryIds = Array.from(existing.summaries).filter(id => Number.isInteger(id) && id > 0);
const summaryExclusionClause = existingSummaryIds.length > 0
? `AND id NOT IN (${existingSummaryIds.join(',')})`
: '';
@@ -726,7 +594,7 @@ export class ChromaSync {
}
// Build exclusion list for prompts
const existingPromptIds = Array.from(existing.prompts);
const existingPromptIds = Array.from(existing.prompts).filter(id => Number.isInteger(id) && id > 0);
const promptExclusionClause = existingPromptIds.length > 0
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
: '';
@@ -797,7 +665,7 @@ export class ChromaSync {
}
/**
* Query Chroma collection for semantic search
* Query Chroma collection for semantic search via MCP
* Used by SearchManager for vector-based search
*/
async queryChroma(
@@ -805,27 +673,34 @@ export class ChromaSync {
limit: number,
whereFilter?: Record<string, any>
): Promise<{ ids: number[]; distances: number[]; metadatas: any[] }> {
await this.ensureCollection();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
}
await this.ensureCollectionExists();
try {
const results = await this.collection.query({
queryTexts: [query],
nResults: limit,
where: whereFilter,
const chromaMcp = ChromaMcpManager.getInstance();
const results = await chromaMcp.callTool('chroma_query_documents', {
collection_name: this.collectionName,
query_texts: [query],
n_results: limit,
...(whereFilter && { where: whereFilter }),
include: ['documents', 'metadatas', 'distances']
});
}) as any;
// Extract unique SQLite IDs from document IDs
// chroma_query_documents returns nested arrays (one per query text)
// We always pass a single query text, so we access [0]
const ids: number[] = [];
const docIds = results.ids?.[0] || [];
for (const docId of docIds) {
const seen = new Set<number>();
const docIds = results?.ids?.[0] || [];
const rawMetadatas = results?.metadatas?.[0] || [];
const rawDistances = results?.distances?.[0] || [];
// Build deduplicated arrays that stay index-aligned:
// Multiple Chroma docs map to the same SQLite ID (one per field).
// Keep the first (best-ranked) distance and metadata per SQLite ID.
const metadatas: any[] = [];
const distances: number[] = [];
for (let i = 0; i < docIds.length; i++) {
const docId = docIds[i];
// Extract sqlite_id from document ID (supports three formats):
// - obs_{id}_narrative, obs_{id}_fact_0, etc (observations)
// - summary_{id}_request, summary_{id}_learned, etc (session summaries)
@@ -843,16 +718,15 @@ export class ChromaSync {
sqliteId = parseInt(promptMatch[1], 10);
}
if (sqliteId !== null && !ids.includes(sqliteId)) {
if (sqliteId !== null && !seen.has(sqliteId)) {
seen.add(sqliteId);
ids.push(sqliteId);
metadatas.push(rawMetadatas[i] ?? null);
distances.push(rawDistances[i] ?? 0);
}
}
return {
ids,
distances: results.distances?.[0] || [],
metadatas: results.metadatas?.[0] || []
};
return { ids, distances, metadatas };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
@@ -860,12 +734,13 @@ export class ChromaSync {
const isConnectionError =
errorMessage.includes('ECONNREFUSED') ||
errorMessage.includes('ENOTFOUND') ||
errorMessage.includes('fetch failed');
errorMessage.includes('fetch failed') ||
errorMessage.includes('subprocess closed') ||
errorMessage.includes('timed out');
if (isConnectionError) {
// Reset connection state so next call attempts reconnect
this.chromaClient = null;
this.collection = null;
// Reset collection state so the next call re-runs collection setup, which triggers a reconnect
this.collectionCreated = false;
logger.error('CHROMA_SYNC', 'Connection lost during query',
{ project: this.project, query }, error as Error);
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
@@ -909,13 +784,13 @@ export class ChromaSync {
}
/**
* Close the Chroma client connection
* Server lifecycle is managed by ChromaServerManager, not here
* Close the ChromaSync instance
* ChromaMcpManager is a singleton and manages its own lifecycle
* We don't close it here - it's closed during graceful shutdown
*/
async close(): Promise<void> {
// Just clear references - server lifecycle managed by ChromaServerManager
this.chromaClient = null;
this.collection = null;
logger.info('CHROMA_SYNC', 'Chroma client closed', { project: this.project });
// ChromaMcpManager is a singleton and manages its own lifecycle
// We don't close it here - it's closed during graceful shutdown
logger.info('CHROMA_SYNC', 'ChromaSync closed', { project: this.project });
}
}
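The index-aligned deduplication in queryChroma above can be sketched as a pure function. This is a simplified illustration, not the code from this change: `dedupeBySqliteId` is a hypothetical name, and the single regular expression is an assumption based on the ID formats named in the comments (the exact prompt format is not shown in the diff).

```typescript
interface DedupedResults {
  ids: number[];
  distances: number[];
  metadatas: unknown[];
}

// Assumed document ID shapes: obs_{id}_..., summary_{id}_..., prompt_{id}...
const ID_PATTERN = /^(?:obs|summary|prompt)_(\d+)(?:_|$)/;

// Keep the first (best-ranked) hit per SQLite ID and push distance and
// metadata in the same step so all three arrays stay index-aligned.
function dedupeBySqliteId(
  docIds: string[],
  rawDistances: number[],
  rawMetadatas: unknown[]
): DedupedResults {
  const seen = new Set<number>();
  const out: DedupedResults = { ids: [], distances: [], metadatas: [] };
  for (let i = 0; i < docIds.length; i++) {
    const match = ID_PATTERN.exec(docIds[i]);
    if (!match) continue; // unrecognized ID format: skip the row entirely
    const sqliteId = parseInt(match[1], 10);
    if (seen.has(sqliteId)) continue;
    seen.add(sqliteId);
    out.ids.push(sqliteId);
    out.distances.push(rawDistances[i] ?? 0);
    out.metadatas.push(rawMetadatas[i] ?? null);
  }
  return out;
}
```

Using a Set for membership avoids the repeated linear scans of `ids.includes`, and building all three arrays in one loop means `distances[k]` always belongs to `ids[k]`.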
+37 -29
@@ -18,7 +18,7 @@ import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
import { getAuthMethodDescription } from '../shared/EnvManager.js';
import { logger } from '../utils/logger.js';
import { ChromaServerManager } from './sync/ChromaServerManager.js';
import { ChromaMcpManager } from './sync/ChromaMcpManager.js';
import { ChromaSync } from './sync/ChromaSync.js';
// Windows: avoid repeated spawn popups when startup fails (issue #921)
@@ -116,7 +116,7 @@ import { LogsRoutes } from './worker/http/routes/LogsRoutes.js';
import { MemoryRoutes } from './worker/http/routes/MemoryRoutes.js';
// Process management for zombie cleanup (Issue #737)
import { startOrphanReaper, reapOrphanedProcesses } from './worker/ProcessRegistry.js';
import { startOrphanReaper, reapOrphanedProcesses, getProcessBySession, ensureProcessExit } from './worker/ProcessRegistry.js';
/**
* Build JSON status output for hook framework communication.
@@ -166,8 +166,8 @@ export class WorkerService {
// Route handlers
private searchRoutes: SearchRoutes | null = null;
// Chroma server (local mode)
private chromaServer: ChromaServerManager | null = null;
// Chroma MCP manager (lazy - connects on first use)
private chromaMcpManager: ChromaMcpManager | null = null;
// Initialization tracking
private initializationComplete: Promise<void>;
@@ -176,6 +176,9 @@ export class WorkerService {
// Orphan reaper cleanup function (Issue #737)
private stopOrphanReaper: (() => void) | null = null;
// Stale session reaper interval (Issue #1168)
private staleSessionReaperInterval: ReturnType<typeof setInterval> | null = null;
// AI interaction tracking for health endpoint
private lastAiInteraction: {
timestamp: number;
@@ -370,31 +373,12 @@ export class WorkerService {
const { ModeManager } = await import('./domain/ModeManager.js');
const { SettingsDefaultsManager } = await import('../shared/SettingsDefaultsManager.js');
const { USER_SETTINGS_PATH } = await import('../shared/paths.js');
const os = await import('os');
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
// Start Chroma server if in local mode
const chromaMode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
if (chromaMode === 'local') {
logger.info('SYSTEM', 'Starting local Chroma server...');
this.chromaServer = ChromaServerManager.getInstance({
dataDir: path.join(os.homedir(), '.claude-mem', 'vector-db'),
host: settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1',
port: parseInt(settings.CLAUDE_MEM_CHROMA_PORT || '8000', 10)
});
const ready = await this.chromaServer.start(60000);
if (ready) {
logger.success('SYSTEM', 'Chroma server ready');
} else {
logger.warn('SYSTEM', 'Chroma server failed to start - vector search disabled');
this.chromaServer = null;
}
} else {
logger.info('SYSTEM', 'Chroma remote mode - skipping local server');
}
// Initialize ChromaMcpManager (lazy - connects on first use via ChromaSync)
this.chromaMcpManager = ChromaMcpManager.getInstance();
logger.info('SYSTEM', 'ChromaMcpManager initialized (lazy - connects on first use)');
const modeId = settings.CLAUDE_MEM_MODE;
ModeManager.getInstance().loadMode(modeId);
@@ -425,7 +409,7 @@ export class WorkerService {
logger.info('WORKER', 'SearchManager initialized and search routes registered');
// Auto-backfill Chroma for all projects if out of sync with SQLite (fire-and-forget)
if (this.chromaServer !== null || chromaMode !== 'local') {
if (this.chromaMcpManager) {
ChromaSync.backfillAllProjects().then(() => {
logger.info('CHROMA_SYNC', 'Backfill check complete for all projects');
}).catch(error => {
@@ -465,6 +449,18 @@ export class WorkerService {
});
logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)');
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
this.staleSessionReaperInterval = setInterval(async () => {
try {
const reaped = await this.sessionManager.reapStaleSessions();
if (reaped > 0) {
logger.info('SYSTEM', `Reaped ${reaped} stale sessions`);
}
} catch (e) {
logger.error('SYSTEM', 'Stale session reaper error', { error: e instanceof Error ? e.message : String(e) });
}
}, 2 * 60 * 1000);
// Auto-recover orphaned queues (fire-and-forget with error logging)
this.processPendingQueues(50).then(result => {
if (result.sessionsStarted > 0) {
@@ -593,7 +589,13 @@ export class WorkerService {
};
throw error;
})
.finally(() => {
.finally(async () => {
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
const trackedProcess = getProcessBySession(session.sessionDbId);
if (trackedProcess && !trackedProcess.process.killed && trackedProcess.process.exitCode === null) {
await ensureProcessExit(trackedProcess, 5000);
}
session.generatorPromise = null;
// Record successful AI interaction if no error occurred
@@ -823,12 +825,18 @@ export class WorkerService {
this.stopOrphanReaper = null;
}
// Stop stale session reaper (Issue #1168)
if (this.staleSessionReaperInterval) {
clearInterval(this.staleSessionReaperInterval);
this.staleSessionReaperInterval = null;
}
await performGracefulShutdown({
server: this.server.getHttpServer(),
sessionManager: this.sessionManager,
mcpClient: this.mcpClient,
dbManager: this.dbManager,
chromaServer: this.chromaServer || undefined
chromaMcpManager: this.chromaMcpManager || undefined
});
}
+1 -1
@@ -37,7 +37,7 @@ export class DatabaseManager {
* Close database connection and cleanup all resources
*/
async close(): Promise<void> {
// Close ChromaSync first (terminates uvx/python processes)
// Close ChromaSync first (MCP connection lifecycle managed by ChromaMcpManager)
if (this.chromaSync) {
await this.chromaSync.close();
this.chromaSync = null;
+33
@@ -341,6 +341,39 @@ export class SessionManager {
}
}
private static readonly MAX_SESSION_IDLE_MS = 15 * 60 * 1000; // 15 minutes
/**
* Reap sessions with no active generator and no pending work that have been idle too long.
* This unblocks the orphan reaper which skips processes for "active" sessions. (Issue #1168)
*/
async reapStaleSessions(): Promise<number> {
const now = Date.now();
const staleSessionIds: number[] = [];
for (const [sessionDbId, session] of this.sessions) {
// Skip sessions with active generators
if (session.generatorPromise) continue;
// Skip sessions with pending work
const pendingCount = this.getPendingStore().getPendingCount(sessionDbId);
if (pendingCount > 0) continue;
// No generator + no pending work + old enough = stale
const sessionAge = now - session.startTime;
if (sessionAge > SessionManager.MAX_SESSION_IDLE_MS) {
staleSessionIds.push(sessionDbId);
}
}
for (const sessionDbId of staleSessionIds) {
logger.warn('SESSION', `Reaping stale session ${sessionDbId} (no activity for >${Math.round(SessionManager.MAX_SESSION_IDLE_MS / 60000)}m)`, { sessionDbId });
await this.deleteSession(sessionDbId);
}
return staleSessionIds.length;
}
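The selection rule in reapStaleSessions above can be sketched as a pure function, which makes it easy to test without a live SessionManager. This is a minimal illustration, not code from this change: `SessionLike` and `findStaleSessionIds` are hypothetical names, and the pending-count lookup is injected as a callback.

```typescript
// Hypothetical minimal shape; the real session object has more fields.
interface SessionLike {
  startTime: number;
  generatorPromise: Promise<void> | null;
}

// Return IDs of sessions with no active generator, no pending work,
// and a start time older than maxIdleMs relative to `now`.
function findStaleSessionIds(
  sessions: Map<number, SessionLike>,
  getPendingCount: (sessionDbId: number) => number,
  now: number,
  maxIdleMs: number
): number[] {
  const stale: number[] = [];
  sessions.forEach((session, sessionDbId) => {
    if (session.generatorPromise) return; // generator still running
    if (getPendingCount(sessionDbId) > 0) return; // work still queued
    if (now - session.startTime > maxIdleMs) {
      stale.push(sessionDbId);
    }
  });
  return stale;
}
```

As in the method above, the age is measured from `startTime`, so a session counts as stale based on when it started rather than on its most recent activity.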
/**
* Shutdown all active sessions
*/
@@ -21,6 +21,7 @@ import { SessionCompletionHandler } from '../../session/SessionCompletionHandler
import { PrivacyCheckValidator } from '../../validation/PrivacyCheckValidator.js';
import { SettingsDefaultsManager } from '../../../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../../../shared/paths.js';
import { getProcessBySession, ensureProcessExit } from '../../ProcessRegistry.js';
export class SessionRoutes extends BaseRouteHandler {
private completionHandler: SessionCompletionHandler;
@@ -184,7 +185,13 @@ export class SessionRoutes extends BaseRouteHandler {
}, dbError as Error);
}
})
.finally(() => {
.finally(async () => {
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
const tracked = getProcessBySession(session.sessionDbId);
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
await ensureProcessExit(tracked, 5000);
}
const sessionDbId = session.sessionDbId;
this.spawnInProgress.delete(sessionDbId);
const wasAborted = session.abortController.signal.aborted;
+1 -1
@@ -118,7 +118,7 @@ export class SettingsDefaultsManager {
CLAUDE_MEM_EXCLUDED_PROJECTS: '', // Comma-separated glob patterns for excluded project paths
CLAUDE_MEM_FOLDER_MD_EXCLUDE: '[]', // JSON array of folder paths to exclude from CLAUDE.md generation
// Chroma Vector Database Configuration
CLAUDE_MEM_CHROMA_MODE: 'local', // 'local' starts npx chroma run, 'remote' connects to existing server
CLAUDE_MEM_CHROMA_MODE: 'local', // 'local' uses persistent chroma-mcp via uvx, 'remote' connects to existing server
CLAUDE_MEM_CHROMA_HOST: '127.0.0.1',
CLAUDE_MEM_CHROMA_PORT: '8000',
CLAUDE_MEM_CHROMA_SSL: 'false',
+1 -1
@@ -15,7 +15,7 @@ export enum LogLevel {
SILENT = 4
}
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
export type Component = 'HOOK' | 'WORKER' | 'SDK' | 'PARSER' | 'DB' | 'SYSTEM' | 'HTTP' | 'SESSION' | 'CHROMA' | 'CHROMA_MCP' | 'CHROMA_SYNC' | 'FOLDER_INDEX' | 'CLAUDE_MD' | 'QUEUE';
interface LogContext {
sessionId?: number;
@@ -87,9 +87,9 @@ describe('GracefulShutdown', () => {
})
};
const mockChromaServer = {
const mockChromaMcpManager = {
stop: mock(async () => {
callOrder.push('chromaServer.stop');
callOrder.push('chromaMcpManager.stop');
})
};
@@ -102,7 +102,7 @@ describe('GracefulShutdown', () => {
sessionManager: mockSessionManager,
mcpClient: mockMcpClient,
dbManager: mockDbManager,
chromaServer: mockChromaServer
chromaMcpManager: mockChromaMcpManager
};
await performGracefulShutdown(config);
@@ -112,7 +112,7 @@ describe('GracefulShutdown', () => {
expect(callOrder).toContain('serverClose');
expect(callOrder).toContain('sessionManager.shutdownAll');
expect(callOrder).toContain('mcpClient.close');
expect(callOrder).toContain('chromaServer.stop');
expect(callOrder).toContain('chromaMcpManager.stop');
expect(callOrder).toContain('dbManager.close');
// Verify server closes before session manager
@@ -125,7 +125,7 @@ describe('GracefulShutdown', () => {
expect(callOrder.indexOf('mcpClient.close')).toBeLessThan(callOrder.indexOf('dbManager.close'));
// Verify Chroma stops before DB closes
expect(callOrder.indexOf('chromaServer.stop')).toBeLessThan(callOrder.indexOf('dbManager.close'));
expect(callOrder.indexOf('chromaMcpManager.stop')).toBeLessThan(callOrder.indexOf('dbManager.close'));
});
it('should remove PID file during shutdown', async () => {
@@ -216,9 +216,9 @@ describe('GracefulShutdown', () => {
})
};
const mockChromaServer = {
const mockChromaMcpManager = {
stop: mock(async () => {
callOrder.push('chromaServer');
callOrder.push('chromaMcpManager');
})
};
@@ -227,12 +227,12 @@ describe('GracefulShutdown', () => {
sessionManager: mockSessionManager,
mcpClient: mockMcpClient,
dbManager: mockDbManager,
chromaServer: mockChromaServer
chromaMcpManager: mockChromaMcpManager
};
await performGracefulShutdown(config);
expect(callOrder).toEqual(['sessionManager', 'mcpClient', 'chromaServer', 'dbManager']);
expect(callOrder).toEqual(['sessionManager', 'mcpClient', 'chromaMcpManager', 'dbManager']);
});
it('should handle shutdown when PID file does not exist', async () => {
@@ -1,139 +0,0 @@
import { describe, it, expect, beforeEach, afterEach, mock, spyOn } from 'bun:test';
import { EventEmitter } from 'events';
import * as childProcess from 'child_process';
import { ChromaServerManager } from '../../../src/services/sync/ChromaServerManager.js';
function createFakeProcess(pid: number = 4242): childProcess.ChildProcess {
const proc = new EventEmitter() as childProcess.ChildProcess & EventEmitter;
let exited = false;
(proc as any).stdout = new EventEmitter();
(proc as any).stderr = new EventEmitter();
(proc as any).pid = pid;
(proc as any).kill = mock(() => {
if (!exited) {
exited = true;
setTimeout(() => proc.emit('exit', 0, 'SIGTERM'), 0);
}
return true;
});
return proc as childProcess.ChildProcess;
}
describe('ChromaServerManager', () => {
const originalFetch = global.fetch;
const originalPlatform = process.platform;
beforeEach(() => {
mock.restore();
ChromaServerManager.reset();
// Avoid macOS cert bundle shelling in tests; these tests only exercise startup races.
Object.defineProperty(process, 'platform', {
value: 'linux',
writable: true,
configurable: true
});
});
afterEach(() => {
global.fetch = originalFetch;
mock.restore();
ChromaServerManager.reset();
Object.defineProperty(process, 'platform', {
value: originalPlatform,
writable: true,
configurable: true
});
});
it('reuses in-flight startup and only spawns one server process', async () => {
const fetchMock = mock(async () => {
// First call: existing server check fails, second call: waitForReady succeeds.
if (fetchMock.mock.calls.length === 1) {
throw new Error('no server yet');
}
return new Response(null, { status: 200 });
});
global.fetch = fetchMock as typeof fetch;
const spawnSpy = spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const [first, second] = await Promise.all([
manager.start(2000),
manager.start(2000)
]);
expect(first).toBe(true);
expect(second).toBe(true);
expect(spawnSpy).toHaveBeenCalledTimes(1);
});
it('reuses existing reachable server without spawning', async () => {
global.fetch = mock(async () => new Response(null, { status: 200 })) as typeof fetch;
const spawnSpy = spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const ready = await manager.start(2000);
expect(ready).toBe(true);
expect(spawnSpy).not.toHaveBeenCalled();
});
it('waits for ongoing startup instead of returning early', async () => {
let resolveReady: ((value: Response) => void) | null = null;
const delayedReady = new Promise<Response>((resolve) => {
resolveReady = resolve;
});
const fetchMock = mock(async () => {
// 1st: existing server check -> fail, 2nd: waitForReady -> block until we resolve.
if (fetchMock.mock.calls.length === 1) {
throw new Error('no server yet');
}
return delayedReady;
});
global.fetch = fetchMock as typeof fetch;
spyOn(childProcess, 'spawn').mockImplementation(
() => createFakeProcess() as unknown as ReturnType<typeof childProcess.spawn>
);
const manager = ChromaServerManager.getInstance({
dataDir: '/tmp/chroma-test',
host: '127.0.0.1',
port: 8000
});
const firstStart = manager.start(5000);
let secondResolved = false;
const secondStart = manager.start(5000).then((value) => {
secondResolved = true;
return value;
});
await new Promise((resolve) => setTimeout(resolve, 50));
expect(secondResolved).toBe(false);
resolveReady!(new Response(null, { status: 200 }));
expect(await firstStart).toBe(true);
expect(await secondStart).toBe(true);
});
});