feat: replace WASM embeddings with persistent chroma-mcp MCP connection (#1176)

* feat: replace WASM embeddings with persistent chroma-mcp MCP connection

Replace ChromaServerManager (npx chroma run + chromadb npm + ONNX/WASM)
with ChromaMcpManager, a singleton stdio MCP client that communicates with
chroma-mcp via uvx. This eliminates native binary issues, segfaults, and
WASM embedding failures that plagued cross-platform installs.

Key changes:
- Add ChromaMcpManager: singleton MCP client with lazy connect, auto-reconnect,
  connection lock, and Zscaler SSL cert support
- Rewrite ChromaSync to use MCP tool calls instead of chromadb npm client
- Handle chroma-mcp's non-JSON responses (plain text success/error messages)
- Treat "collection already exists" as idempotent success
- Wire ChromaMcpManager into GracefulShutdown for clean subprocess teardown
- Delete ChromaServerManager (no longer needed)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: address PR review — connection guard leak, timer leak, async reset

- Clear connecting guard in finally block to prevent permanent reconnection block
- Clear timeout after successful connection to prevent timer leak
- Make reset() async to await stop() before nullifying instance
- Delete obsolete chroma-server-manager test (imports deleted class)
- Update graceful-shutdown test to use chromaMcpManager property name

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: prevent chroma-mcp spawn storm — zombie cleanup, stale onclose guard, reconnect backoff

Three bugs caused chroma-mcp processes to accumulate (92+ observed):

1. Zombie on timeout: failed connections left subprocess alive because
   only the timer was cleared, not the transport. Now catch block
   explicitly closes transport+client before rethrowing.

2. Stale onclose race: old transport's onclose handler captured `this`
   and overwrote the current connection reference after reconnect,
   orphaning the new subprocess. Now guarded with reference check.

3. No backoff: every failure triggered immediate reconnect. With
   backfill doing hundreds of MCP calls, this created rapid-fire
   spawning. Added 10s backoff on both connection failure and
   unexpected process death.

Also includes ChromaSync fixes from PR review:
- queryChroma deduplication now preserves index-aligned arrays
- SQL injection guard on backfill ID exclusion lists

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-02-18 18:32:38 -05:00
committed by GitHub
parent 7e57b6e02d
commit 40daf8f3fa
13 changed files with 833 additions and 1189 deletions
+120 -245
View File
@@ -1,26 +1,21 @@
/**
* ChromaSync Service
*
* Automatically syncs observations and session summaries to ChromaDB via HTTP.
* Automatically syncs observations and session summaries to ChromaDB via MCP.
* This service provides real-time semantic search capabilities by maintaining
* a vector database synchronized with SQLite.
*
* Uses the chromadb npm package's built-in ChromaClient for HTTP connections.
* Supports both local server (managed by ChromaServerManager) and remote/cloud
* servers for future claude-mem pro features.
* Uses ChromaMcpManager to communicate with chroma-mcp over stdio MCP protocol.
* The chroma-mcp server handles its own embedding and persistent storage,
* eliminating the need for chromadb npm package and ONNX/WASM dependencies.
*
* Design: Fail-fast with no fallbacks - if Chroma is unavailable, syncing fails.
*/
import { ChromaClient, Collection } from 'chromadb';
import { ChromaMcpManager } from './ChromaMcpManager.js';
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
import { SessionStore } from '../sqlite/SessionStore.js';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import { ChromaServerManager } from './ChromaServerManager.js';
import path from 'path';
import os from 'os';
interface ChromaDocument {
id: string;
@@ -75,13 +70,10 @@ interface StoredUserPrompt {
}
export class ChromaSync {
private chromaClient: ChromaClient | null = null;
private collection: Collection | null = null;
private project: string;
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private collectionCreated = false;
private readonly BATCH_SIZE = 100;
private modelCacheCorruptionRetried = false;
constructor(project: string) {
this.project = project;
@@ -91,146 +83,36 @@ export class ChromaSync {
.replace(/[^a-zA-Z0-9._-]/g, '_')
.replace(/[^a-zA-Z0-9]+$/, ''); // strip trailing non-alphanumeric
this.collectionName = `cm__${sanitized || 'unknown'}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
}
/**
* Ensure HTTP client is connected to Chroma server
* In local mode, verifies ChromaServerManager has started the server
* In remote mode, connects directly to configured host
* Throws error if connection fails
* Ensure collection exists in Chroma via MCP.
* chroma_create_collection is idempotent - safe to call multiple times.
* Uses collectionCreated flag to avoid redundant calls within a session.
*/
private async ensureConnection(): Promise<void> {
if (this.chromaClient) {
private async ensureCollectionExists(): Promise<void> {
if (this.collectionCreated) {
return;
}
logger.info('CHROMA_SYNC', 'Connecting to Chroma HTTP server...', { project: this.project });
const chromaMcp = ChromaMcpManager.getInstance();
try {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const mode = settings.CLAUDE_MEM_CHROMA_MODE || 'local';
const host = settings.CLAUDE_MEM_CHROMA_HOST || '127.0.0.1';
const port = parseInt(settings.CLAUDE_MEM_CHROMA_PORT || '8000', 10);
const ssl = settings.CLAUDE_MEM_CHROMA_SSL === 'true';
// Multi-tenancy settings (used in remote/pro mode)
const tenant = settings.CLAUDE_MEM_CHROMA_TENANT || 'default_tenant';
const database = settings.CLAUDE_MEM_CHROMA_DATABASE || 'default_database';
const apiKey = settings.CLAUDE_MEM_CHROMA_API_KEY || '';
// In local mode, verify server is reachable
if (mode === 'local') {
const serverManager = ChromaServerManager.getInstance();
const reachable = await serverManager.isServerReachable();
if (!reachable) {
throw new Error('Chroma server not reachable. Ensure worker started correctly.');
}
}
// Create HTTP client
const protocol = ssl ? 'https' : 'http';
const chromaPath = `${protocol}://${host}:${port}`;
// Build client options
const clientOptions: { path: string; tenant?: string; database?: string; headers?: Record<string, string> } = {
path: chromaPath
};
// In remote mode, use tenant isolation for pro users
if (mode === 'remote') {
clientOptions.tenant = tenant;
clientOptions.database = database;
// Add API key header if configured
if (apiKey) {
clientOptions.headers = {
'Authorization': `Bearer ${apiKey}`
};
}
logger.info('CHROMA_SYNC', 'Connecting with tenant isolation', {
tenant,
database,
hasApiKey: !!apiKey
});
}
this.chromaClient = new ChromaClient(clientOptions);
// Verify connection with heartbeat
await this.chromaClient.heartbeat();
logger.info('CHROMA_SYNC', 'Connected to Chroma HTTP server', {
project: this.project,
host,
port,
ssl,
mode,
tenant: mode === 'remote' ? tenant : 'default_tenant'
await chromaMcp.callTool('chroma_create_collection', {
collection_name: this.collectionName
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma HTTP server', { project: this.project }, error as Error);
this.chromaClient = null;
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Ensure collection exists, create if needed
* Throws error if collection creation fails
*/
private async ensureCollection(): Promise<void> {
await this.ensureConnection();
if (this.collection) {
return;
}
if (!this.chromaClient) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
try {
// Store model cache outside node_modules so reinstalls don't corrupt it
const { env } = await import('@huggingface/transformers');
env.cacheDir = path.join(os.homedir(), '.claude-mem', 'models');
// Use WASM backend to avoid native ONNX binary issues (#1104, #1105, #1110).
// Same model (all-MiniLM-L6-v2), same embeddings, but runs in WASM —
// no native binary loading, no segfaults, no ENOENT errors.
const { DefaultEmbeddingFunction } = await import('@chroma-core/default-embed');
const embeddingFunction = new DefaultEmbeddingFunction({ wasm: true });
this.collection = await this.chromaClient.getOrCreateCollection({
name: this.collectionName,
embeddingFunction
});
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
// Self-heal: corrupted model cache → clear and retry once
if (errorMessage.includes('Protobuf parsing failed') && !this.modelCacheCorruptionRetried) {
this.modelCacheCorruptionRetried = true;
logger.warn('CHROMA_SYNC', 'Corrupted model cache detected, clearing and retrying...');
const modelCacheDir = path.join(os.homedir(), '.claude-mem', 'models');
const fs = await import('fs');
if (fs.existsSync(modelCacheDir)) {
fs.rmSync(modelCacheDir, { recursive: true, force: true });
}
return this.ensureCollection(); // retry once
const message = error instanceof Error ? error.message : String(error);
if (!message.includes('already exists')) {
throw error;
}
logger.error('CHROMA_SYNC', 'Failed to get/create collection', { collection: this.collectionName }, error as Error);
throw new Error(`Collection setup failed: ${errorMessage}`);
// Collection already exists - this is the expected path after first creation
}
this.collectionCreated = true;
logger.debug('CHROMA_SYNC', 'Collection ready', {
collection: this.collectionName
});
}
/**
@@ -369,7 +251,7 @@ export class ChromaSync {
}
/**
* Add documents to Chroma in batch
* Add documents to Chroma in batch via MCP
* Throws error if batch add fails
*/
private async addDocuments(documents: ChromaDocument[]): Promise<void> {
@@ -377,33 +259,26 @@ export class ChromaSync {
return;
}
await this.ensureCollection();
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
const chromaMcp = ChromaMcpManager.getInstance();
// Add in batches
for (let i = 0; i < documents.length; i += this.BATCH_SIZE) {
const batch = documents.slice(i, i + this.BATCH_SIZE);
await chromaMcp.callTool('chroma_add_documents', {
collection_name: this.collectionName,
ids: batch.map(d => d.id),
documents: batch.map(d => d.document),
metadatas: batch.map(d => d.metadata)
});
}
try {
await this.collection.add({
ids: documents.map(d => d.id),
documents: documents.map(d => d.document),
metadatas: documents.map(d => d.metadata)
});
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to add documents', {
collection: this.collectionName,
count: documents.length
}, error as Error);
throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
}
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
}
/**
@@ -545,7 +420,7 @@ export class ChromaSync {
}
/**
* Fetch all existing document IDs from Chroma collection
* Fetch all existing document IDs from Chroma collection via MCP
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(projectOverride?: string): Promise<{
@@ -554,14 +429,9 @@ export class ChromaSync {
prompts: Set<number>;
}> {
const targetProject = projectOverride ?? this.project;
await this.ensureCollection();
await this.ensureCollectionExists();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${targetProject}`
);
}
const chromaMcp = ChromaMcpManager.getInstance();
const observationIds = new Set<number>();
const summaryIds = new Set<number>();
@@ -573,45 +443,42 @@ export class ChromaSync {
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: targetProject });
while (true) {
try {
const result = await this.collection.get({
limit,
offset,
where: { project: targetProject },
include: ['metadatas']
});
const result = await chromaMcp.callTool('chroma_get_documents', {
collection_name: this.collectionName,
limit: limit,
offset: offset,
where: { project: targetProject },
include: ['metadatas']
}) as any;
const metadatas = result.metadatas || [];
// chroma_get_documents returns flat arrays: { ids, metadatas, documents }
const metadatas = result?.metadatas || [];
if (metadatas.length === 0) {
break; // No more documents
}
if (metadatas.length === 0) {
break; // No more documents
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta && meta.sqlite_id) {
const sqliteId = meta.sqlite_id as number;
if (meta.doc_type === 'observation') {
observationIds.add(sqliteId);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(sqliteId);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(sqliteId);
}
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: targetProject,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: targetProject }, error as Error);
throw error;
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: targetProject,
offset,
batchSize: metadatas.length
});
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
@@ -635,7 +502,7 @@ export class ChromaSync {
const backfillProject = projectOverride ?? this.project;
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: backfillProject });
await this.ensureCollection();
await this.ensureCollectionExists();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds(backfillProject);
@@ -644,7 +511,8 @@ export class ChromaSync {
try {
// Build exclusion list for observations
const existingObsIds = Array.from(existing.observations);
// Filter to validated positive integers before interpolating into SQL
const existingObsIds = Array.from(existing.observations).filter(id => Number.isInteger(id) && id > 0);
const obsExclusionClause = existingObsIds.length > 0
? `AND id NOT IN (${existingObsIds.join(',')})`
: '';
@@ -685,7 +553,7 @@ export class ChromaSync {
}
// Build exclusion list for summaries
const existingSummaryIds = Array.from(existing.summaries);
const existingSummaryIds = Array.from(existing.summaries).filter(id => Number.isInteger(id) && id > 0);
const summaryExclusionClause = existingSummaryIds.length > 0
? `AND id NOT IN (${existingSummaryIds.join(',')})`
: '';
@@ -726,7 +594,7 @@ export class ChromaSync {
}
// Build exclusion list for prompts
const existingPromptIds = Array.from(existing.prompts);
const existingPromptIds = Array.from(existing.prompts).filter(id => Number.isInteger(id) && id > 0);
const promptExclusionClause = existingPromptIds.length > 0
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
: '';
@@ -797,7 +665,7 @@ export class ChromaSync {
}
/**
* Query Chroma collection for semantic search
* Query Chroma collection for semantic search via MCP
* Used by SearchManager for vector-based search
*/
async queryChroma(
@@ -805,27 +673,34 @@ export class ChromaSync {
limit: number,
whereFilter?: Record<string, any>
): Promise<{ ids: number[]; distances: number[]; metadatas: any[] }> {
await this.ensureCollection();
if (!this.collection) {
throw new Error(
'Chroma collection not initialized. Call ensureCollection() before using collection methods.' +
` Project: ${this.project}`
);
}
await this.ensureCollectionExists();
try {
const results = await this.collection.query({
queryTexts: [query],
nResults: limit,
where: whereFilter,
const chromaMcp = ChromaMcpManager.getInstance();
const results = await chromaMcp.callTool('chroma_query_documents', {
collection_name: this.collectionName,
query_texts: [query],
n_results: limit,
...(whereFilter && { where: whereFilter }),
include: ['documents', 'metadatas', 'distances']
});
}) as any;
// Extract unique SQLite IDs from document IDs
// chroma_query_documents returns nested arrays (one per query text)
// We always pass a single query text, so we access [0]
const ids: number[] = [];
const docIds = results.ids?.[0] || [];
for (const docId of docIds) {
const seen = new Set<number>();
const docIds = results?.ids?.[0] || [];
const rawMetadatas = results?.metadatas?.[0] || [];
const rawDistances = results?.distances?.[0] || [];
// Build deduplicated arrays that stay index-aligned:
// Multiple Chroma docs map to the same SQLite ID (one per field).
// Keep the first (best-ranked) distance and metadata per SQLite ID.
const metadatas: any[] = [];
const distances: number[] = [];
for (let i = 0; i < docIds.length; i++) {
const docId = docIds[i];
// Extract sqlite_id from document ID (supports three formats):
// - obs_{id}_narrative, obs_{id}_fact_0, etc (observations)
// - summary_{id}_request, summary_{id}_learned, etc (session summaries)
@@ -843,16 +718,15 @@ export class ChromaSync {
sqliteId = parseInt(promptMatch[1], 10);
}
if (sqliteId !== null && !ids.includes(sqliteId)) {
if (sqliteId !== null && !seen.has(sqliteId)) {
seen.add(sqliteId);
ids.push(sqliteId);
metadatas.push(rawMetadatas[i] ?? null);
distances.push(rawDistances[i] ?? 0);
}
}
return {
ids,
distances: results.distances?.[0] || [],
metadatas: results.metadatas?.[0] || []
};
return { ids, distances, metadatas };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
@@ -860,12 +734,13 @@ export class ChromaSync {
const isConnectionError =
errorMessage.includes('ECONNREFUSED') ||
errorMessage.includes('ENOTFOUND') ||
errorMessage.includes('fetch failed');
errorMessage.includes('fetch failed') ||
errorMessage.includes('subprocess closed') ||
errorMessage.includes('timed out');
if (isConnectionError) {
// Reset connection state so next call attempts reconnect
this.chromaClient = null;
this.collection = null;
// Reset collection state so next call attempts reconnect
this.collectionCreated = false;
logger.error('CHROMA_SYNC', 'Connection lost during query',
{ project: this.project, query }, error as Error);
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
@@ -909,13 +784,13 @@ export class ChromaSync {
}
/**
* Close the Chroma client connection
* Server lifecycle is managed by ChromaServerManager, not here
* Close the ChromaSync instance
* ChromaMcpManager is a singleton and manages its own lifecycle
* We don't close it here - it's closed during graceful shutdown
*/
async close(): Promise<void> {
// Just clear references - server lifecycle managed by ChromaServerManager
this.chromaClient = null;
this.collection = null;
logger.info('CHROMA_SYNC', 'Chroma client closed', { project: this.project });
// ChromaMcpManager is a singleton and manages its own lifecycle
// We don't close it here - it's closed during graceful shutdown
logger.info('CHROMA_SYNC', 'ChromaSync closed', { project: this.project });
}
}