Files
claude-mem/src/services/sync/ChromaSync.ts
T
Alex Newman bff10d49c9 fix(windows): Windows platform stabilization improvements (#378)
* chore: bump version to 7.3.6 in package.json

* Enhance worker readiness checks and MCP connection handling

- Updated health check endpoint to /api/readiness for better initialization tracking.
- Increased timeout for health checks and worker startup retries, especially for Windows.
- Added initialization flags to track MCP readiness and overall worker initialization status.
- Implemented a timeout guard for MCP connection to prevent hanging.
- Adjusted logging to reflect readiness state and errors more accurately.

* fix(windows): use Bun PATH detection in worker wrapper

Phase 2/8: Fix Bun PATH Detection in Worker Wrapper

- Import getBunPath() in worker-wrapper.ts for Bun detection
- Add Bun path resolution before spawning inner worker process
- Update spawn call to use detected Bun path instead of process.execPath
- Add logging to bun-path.ts when PATH detection succeeds
- Add logging when fallback paths are used
- Add Windows-specific validation for .exe extension
- Log warning with searched paths when Bun not found
- Fail fast with clear error message if Bun cannot be detected

This ensures worker-wrapper uses the correct Bun executable on Windows
even when Bun is not in PATH, fixing issue #371 where users reported
"Bun not in PATH" errors despite Bun being installed.

Addresses: #371

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix(windows): standardize child process spawning with windowsHide

Phase 3/8: Standardize Child Process Spawning (Windows)

Changes:
- Added windowsHide flag to ChromaSync MCP subprocess spawn
- Added Windows-specific process tracking (childPid) in ChromaSync
- Force-kill subprocess on Windows before closing transport to prevent zombie processes
- Updated cleanupOrphanedProcesses() to support Windows using PowerShell Get-CimInstance
- Use taskkill /T /F for proper process tree cleanup on Windows
- Audited BranchManager - confirmed windowsHide already present on all spawn calls

This prevents PowerShell windows from appearing during ChromaSync operations
and ensures proper cleanup of subprocess trees on Windows.

Addresses: #363, #361, #367, #371, #373, #374

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix(windows): enhance socket cleanup with recursive process tree management

Phase 4/8: Enhanced Socket Cleanup & Process Tree Management

Changes:
- Added recursive process tree enumeration in worker-wrapper.ts for Windows
- Enhanced killInner() to enumerate all descendants before killing
- Added fallback individual process kill if taskkill /T fails
- Added 10s timeout to ChromaSync.close() in DatabaseManager to prevent hangs
- Force nullify ChromaSync even on close failure to prevent resource leaks
- Improved logging to show full process tree during cleanup

This ensures complete cleanup of all child processes (ChromaSync MCP subprocess,
Python processes, etc.) preventing socket leaks and CLOSE_WAIT states.

Addresses: #363, #361

* fix(windows): consolidate project name extraction with drive root handling

Phase 5/8: Project Name Extraction Consolidation

- Created shared getProjectName() utility in src/utils/project-name.ts
- Handles edge case: drive roots (C:\, J:\) now return "drive-X" format
- Handles edge case: null/undefined/empty cwd now returns "unknown-project"
- Fixed missing null check bug in new-hook.ts
- Replaced duplicated path.basename(cwd) logic in:
  - src/hooks/context-hook.ts
  - src/hooks/new-hook.ts
  - src/services/context-generator.ts

Addresses: #374

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix(windows): increase timeouts and improve error messages

Phase 6/8: Increase Timeouts & Improve Error Messages

- Enhanced logger.ts with platform prefix (WIN32/DARWIN) and PID in all logs
- Added comprehensive Windows troubleshooting to ProcessManager error messages
- Enhanced Bun detection error message with Windows-specific troubleshooting
- All error messages now include GitHub issue numbers and docs links
- Windows timeout already increased to 2.0x multiplier in previous phases

Changes:
- src/utils/logger.ts: Added platform prefix and PID to all log output
- src/services/process/ProcessManager.ts: Enhanced error messages with troubleshooting steps
- src/utils/bun-path.ts: Added Windows-specific Bun detection error guidance

Addresses: #363, #361, #367, #371, #373, #374

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix(windows): add comprehensive Windows CI testing

Phase 7/8: Add Windows CI Testing

- Create automated Windows testing workflow
- Test worker startup/shutdown cycles
- Verify Bun PATH detection on Windows
- Test rapid restart scenarios
- Validate port cleanup after shutdown
- Check for zombie processes
- Run on all pushes and PRs to main/fix/feature branches

Addresses: #363, #361, #367, #371, #373, #374

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* ci(windows): remove build steps from Windows CI workflow

Build files are already included in the plugin folder, so npm install
and npm run build are unnecessary steps in the CI workflow.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* revert: remove Windows CI workflow

The CI workflow cannot be properly implemented in the current architecture
due to limitations in testing the worker service in CI environments.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* security: add PID validation and improve ChromaSync timeout handling

Address critical security and reliability issues identified in PR review:

**Security Fixes:**
- Add PID validation before all PowerShell/taskkill command execution
- Validate PIDs are positive integers to prevent command injection
- Apply validation in worker-wrapper.ts, worker-service.ts, and ChromaSync.ts

**Reliability Improvements:**
- Add timeout handling to ChromaSync client.close() (10s timeout)
- Add timeout handling to ChromaSync transport.close() (5s timeout)
- Implement force-kill fallback when ChromaSync close operations timeout
- Prevents hanging on shutdown and ensures subprocess cleanup

**Implementation Details:**
- PID validation checks: Number.isInteger(pid) && pid > 0
- Applied before all execSync taskkill calls on Windows
- Applied in process enumeration (Get-CimInstance) PowerShell commands
- ChromaSync.close() uses Promise.race for timeout enforcement
- Graceful degradation with force-kill fallback on timeout

Addresses PR #378 review feedback

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Refactor ChromaSync client and transport closure logic

- Removed timeout handling for closing the Chroma client and transport.
- Simplified error logging for client and transport closure.
- Ensured subprocess cleanup logic is more straightforward.

* fix(worker): streamline Windows process management and cleanup

* revert: remove speculative LLM-generated complexity

Reverts defensive code that was added speculatively without user-reported issues:

- ChromaSync: Remove PID extraction and explicit taskkill (wrapper handles this)
- worker-wrapper: Restore simple taskkill /T /F (validated in v7.3.5)
- DatabaseManager: Remove Promise.race timeout wrapper
- hook-constants: Restore original timeout values
- logger: Remove platform/PID additions to every log line
- bun-path: Remove speculative logging

Keeps only changes that map to actual GitHub issues:
- #374: Drive root project name fix (getProjectName utility)
- #363: Readiness endpoint and Windows orphan cleanup
- #367: windowsHide on ChromaSync transport

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-17 18:44:04 -05:00

878 lines
27 KiB
TypeScript

/**
* ChromaSync Service
*
* Automatically syncs observations and session summaries to ChromaDB via MCP.
* This service provides real-time semantic search capabilities by maintaining
* a vector database synchronized with SQLite.
*
* Design: Fail-fast with no fallbacks - if Chroma is unavailable, syncing fails.
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
import { SessionStore } from '../sqlite/SessionStore.js';
import { logger } from '../../utils/logger.js';
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import path from 'path';
import os from 'os';
interface ChromaDocument {
id: string;
document: string;
metadata: Record<string, string | number>;
}
interface StoredObservation {
id: number;
sdk_session_id: string;
project: string;
text: string | null;
type: string;
title: string | null;
subtitle: string | null;
facts: string | null; // JSON
narrative: string | null;
concepts: string | null; // JSON
files_read: string | null; // JSON
files_modified: string | null; // JSON
prompt_number: number;
discovery_tokens: number; // ROI metrics
created_at: string;
created_at_epoch: number;
}
interface StoredSummary {
id: number;
sdk_session_id: string;
project: string;
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
notes: string | null;
prompt_number: number;
discovery_tokens: number; // ROI metrics
created_at: string;
created_at_epoch: number;
}
interface StoredUserPrompt {
id: number;
claude_session_id: string;
prompt_number: number;
prompt_text: string;
created_at: string;
created_at_epoch: number;
sdk_session_id: string;
project: string;
}
export class ChromaSync {
private client: Client | null = null;
private transport: StdioClientTransport | null = null;
private connected: boolean = false;
private project: string;
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private readonly BATCH_SIZE = 100;
constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
}
/**
* Ensure MCP client is connected to Chroma server
* Throws error if connection fails
*/
private async ensureConnection(): Promise<void> {
if (this.connected && this.client) {
return;
}
logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });
try {
// Use Python 3.13 by default to avoid onnxruntime compatibility issues with Python 3.14+
// See: https://github.com/thedotmack/claude-mem/issues/170 (Python 3.14 incompatibility)
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const pythonVersion = settings.CLAUDE_MEM_PYTHON_VERSION;
const isWindows = process.platform === 'win32';
const transportOptions: any = {
command: 'uvx',
args: [
'--python', pythonVersion,
'chroma-mcp',
'--client-type', 'persistent',
'--data-dir', this.VECTOR_DB_DIR
],
stderr: 'ignore'
};
// CRITICAL: On Windows, try to hide console window to prevent PowerShell popups
// Note: windowsHide may not be supported by MCP SDK's StdioClientTransport
if (isWindows) {
transportOptions.windowsHide = true;
logger.debug('CHROMA_SYNC', 'Windows detected, attempting to hide console window', { project: this.project });
}
this.transport = new StdioClientTransport(transportOptions);
this.client = new Client({
name: 'claude-mem-chroma-sync',
version: '1.0.0'
}, {
capabilities: {}
});
await this.client.connect(this.transport);
this.connected = true;
logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Ensure collection exists, create if needed
* Throws error if collection creation fails
*/
private async ensureCollection(): Promise<void> {
await this.ensureConnection();
if (!this.client) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
try {
// Try to get collection info (will fail if doesn't exist)
await this.client.callTool({
name: 'chroma_get_collection_info',
arguments: {
collection_name: this.collectionName
}
});
logger.debug('CHROMA_SYNC', 'Collection exists', { collection: this.collectionName });
} catch (error) {
// Collection doesn't exist, create it
logger.info('CHROMA_SYNC', 'Creating collection', { collection: this.collectionName });
try {
await this.client.callTool({
name: 'chroma_create_collection',
arguments: {
collection_name: this.collectionName,
embedding_function_name: 'default'
}
});
logger.info('CHROMA_SYNC', 'Collection created', { collection: this.collectionName });
} catch (createError) {
logger.error('CHROMA_SYNC', 'Failed to create collection', { collection: this.collectionName }, createError as Error);
throw new Error(`Collection creation failed: ${createError instanceof Error ? createError.message : String(createError)}`);
}
}
}
/**
* Format observation into Chroma documents (granular approach)
* Each semantic field becomes a separate vector document
*/
private formatObservationDocs(obs: StoredObservation): ChromaDocument[] {
const documents: ChromaDocument[] = [];
// Parse JSON fields
const facts = obs.facts ? JSON.parse(obs.facts) : [];
const concepts = obs.concepts ? JSON.parse(obs.concepts) : [];
const files_read = obs.files_read ? JSON.parse(obs.files_read) : [];
const files_modified = obs.files_modified ? JSON.parse(obs.files_modified) : [];
const baseMetadata: Record<string, string | number> = {
sqlite_id: obs.id,
doc_type: 'observation',
sdk_session_id: obs.sdk_session_id,
project: obs.project,
created_at_epoch: obs.created_at_epoch,
type: obs.type || 'discovery',
title: obs.title || 'Untitled'
};
// Add optional metadata fields
if (obs.subtitle) {
baseMetadata.subtitle = obs.subtitle;
}
if (concepts.length > 0) {
baseMetadata.concepts = concepts.join(',');
}
if (files_read.length > 0) {
baseMetadata.files_read = files_read.join(',');
}
if (files_modified.length > 0) {
baseMetadata.files_modified = files_modified.join(',');
}
// Narrative as separate document
if (obs.narrative) {
documents.push({
id: `obs_${obs.id}_narrative`,
document: obs.narrative,
metadata: { ...baseMetadata, field_type: 'narrative' }
});
}
// Text as separate document (legacy field)
if (obs.text) {
documents.push({
id: `obs_${obs.id}_text`,
document: obs.text,
metadata: { ...baseMetadata, field_type: 'text' }
});
}
// Each fact as separate document
facts.forEach((fact: string, index: number) => {
documents.push({
id: `obs_${obs.id}_fact_${index}`,
document: fact,
metadata: { ...baseMetadata, field_type: 'fact', fact_index: index }
});
});
return documents;
}
/**
* Format summary into Chroma documents (granular approach)
* Each summary field becomes a separate vector document
*/
private formatSummaryDocs(summary: StoredSummary): ChromaDocument[] {
const documents: ChromaDocument[] = [];
const baseMetadata: Record<string, string | number> = {
sqlite_id: summary.id,
doc_type: 'session_summary',
sdk_session_id: summary.sdk_session_id,
project: summary.project,
created_at_epoch: summary.created_at_epoch,
prompt_number: summary.prompt_number || 0
};
// Each field becomes a separate document
if (summary.request) {
documents.push({
id: `summary_${summary.id}_request`,
document: summary.request,
metadata: { ...baseMetadata, field_type: 'request' }
});
}
if (summary.investigated) {
documents.push({
id: `summary_${summary.id}_investigated`,
document: summary.investigated,
metadata: { ...baseMetadata, field_type: 'investigated' }
});
}
if (summary.learned) {
documents.push({
id: `summary_${summary.id}_learned`,
document: summary.learned,
metadata: { ...baseMetadata, field_type: 'learned' }
});
}
if (summary.completed) {
documents.push({
id: `summary_${summary.id}_completed`,
document: summary.completed,
metadata: { ...baseMetadata, field_type: 'completed' }
});
}
if (summary.next_steps) {
documents.push({
id: `summary_${summary.id}_next_steps`,
document: summary.next_steps,
metadata: { ...baseMetadata, field_type: 'next_steps' }
});
}
if (summary.notes) {
documents.push({
id: `summary_${summary.id}_notes`,
document: summary.notes,
metadata: { ...baseMetadata, field_type: 'notes' }
});
}
return documents;
}
/**
* Add documents to Chroma in batch
* Throws error if batch add fails
*/
private async addDocuments(documents: ChromaDocument[]): Promise<void> {
if (documents.length === 0) {
return;
}
await this.ensureCollection();
if (!this.client) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
try {
await this.client.callTool({
name: 'chroma_add_documents',
arguments: {
collection_name: this.collectionName,
documents: documents.map(d => d.document),
ids: documents.map(d => d.id),
metadatas: documents.map(d => d.metadata)
}
});
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to add documents', {
collection: this.collectionName,
count: documents.length
}, error as Error);
throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Sync a single observation to Chroma
* Blocks until sync completes, throws on error
*/
async syncObservation(
observationId: number,
sdkSessionId: string,
project: string,
obs: ParsedObservation,
promptNumber: number,
createdAtEpoch: number,
discoveryTokens: number = 0
): Promise<void> {
// Convert ParsedObservation to StoredObservation format
const stored: StoredObservation = {
id: observationId,
sdk_session_id: sdkSessionId,
project: project,
text: null, // Legacy field, not used
type: obs.type,
title: obs.title,
subtitle: obs.subtitle,
facts: JSON.stringify(obs.facts),
narrative: obs.narrative,
concepts: JSON.stringify(obs.concepts),
files_read: JSON.stringify(obs.files_read),
files_modified: JSON.stringify(obs.files_modified),
prompt_number: promptNumber,
discovery_tokens: discoveryTokens,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch
};
const documents = this.formatObservationDocs(stored);
logger.info('CHROMA_SYNC', 'Syncing observation', {
observationId,
documentCount: documents.length,
project
});
await this.addDocuments(documents);
}
/**
* Sync a single summary to Chroma
* Blocks until sync completes, throws on error
*/
async syncSummary(
summaryId: number,
sdkSessionId: string,
project: string,
summary: ParsedSummary,
promptNumber: number,
createdAtEpoch: number,
discoveryTokens: number = 0
): Promise<void> {
// Convert ParsedSummary to StoredSummary format
const stored: StoredSummary = {
id: summaryId,
sdk_session_id: sdkSessionId,
project: project,
request: summary.request,
investigated: summary.investigated,
learned: summary.learned,
completed: summary.completed,
next_steps: summary.next_steps,
notes: summary.notes,
prompt_number: promptNumber,
discovery_tokens: discoveryTokens,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch
};
const documents = this.formatSummaryDocs(stored);
logger.info('CHROMA_SYNC', 'Syncing summary', {
summaryId,
documentCount: documents.length,
project
});
await this.addDocuments(documents);
}
/**
* Format user prompt into Chroma document
* Each prompt becomes a single document (unlike observations/summaries which split by field)
*/
private formatUserPromptDoc(prompt: StoredUserPrompt): ChromaDocument {
return {
id: `prompt_${prompt.id}`,
document: prompt.prompt_text,
metadata: {
sqlite_id: prompt.id,
doc_type: 'user_prompt',
sdk_session_id: prompt.sdk_session_id,
project: prompt.project,
created_at_epoch: prompt.created_at_epoch,
prompt_number: prompt.prompt_number
}
};
}
/**
* Sync a single user prompt to Chroma
* Blocks until sync completes, throws on error
*/
async syncUserPrompt(
promptId: number,
sdkSessionId: string,
project: string,
promptText: string,
promptNumber: number,
createdAtEpoch: number
): Promise<void> {
// Create StoredUserPrompt format
const stored: StoredUserPrompt = {
id: promptId,
claude_session_id: '', // Not needed for Chroma sync
prompt_number: promptNumber,
prompt_text: promptText,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch,
sdk_session_id: sdkSessionId,
project: project
};
const document = this.formatUserPromptDoc(stored);
logger.info('CHROMA_SYNC', 'Syncing user prompt', {
promptId,
project
});
await this.addDocuments([document]);
}
/**
* Fetch all existing document IDs from Chroma collection
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(): Promise<{
observations: Set<number>;
summaries: Set<number>;
prompts: Set<number>;
}> {
await this.ensureConnection();
if (!this.client) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
const observationIds = new Set<number>();
const summaryIds = new Set<number>();
const promptIds = new Set<number>();
let offset = 0;
const limit = 1000; // Large batches, metadata only = fast
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: this.project });
while (true) {
try {
const result = await this.client.callTool({
name: 'chroma_get_documents',
arguments: {
collection_name: this.collectionName,
limit,
offset,
where: { project: this.project }, // Filter by project
include: ['metadatas']
}
});
const data = result.content[0];
if (data.type !== 'text') {
throw new Error('Unexpected response type from chroma_get_documents');
}
const parsed = JSON.parse(data.text);
const metadatas = parsed.metadatas || [];
if (metadatas.length === 0) {
break; // No more documents
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta.sqlite_id) {
if (meta.doc_type === 'observation') {
observationIds.add(meta.sqlite_id);
} else if (meta.doc_type === 'session_summary') {
summaryIds.add(meta.sqlite_id);
} else if (meta.doc_type === 'user_prompt') {
promptIds.add(meta.sqlite_id);
}
}
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: this.project,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: this.project }, error as Error);
throw error;
}
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
project: this.project,
observations: observationIds.size,
summaries: summaryIds.size,
prompts: promptIds.size
});
return { observations: observationIds, summaries: summaryIds, prompts: promptIds };
}
/**
* Backfill: Sync all observations missing from Chroma
* Reads from SQLite and syncs in batches
* Throws error if backfill fails
*/
async ensureBackfilled(): Promise<void> {
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: this.project });
await this.ensureCollection();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds();
const db = new SessionStore();
try {
// Build exclusion list for observations
const existingObsIds = Array.from(existing.observations);
const obsExclusionClause = existingObsIds.length > 0
? `AND id NOT IN (${existingObsIds.join(',')})`
: '';
// Get only observations missing from Chroma
const observations = db.db.prepare(`
SELECT * FROM observations
WHERE project = ? ${obsExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredObservation[];
const totalObsCount = db.db.prepare(`
SELECT COUNT(*) as count FROM observations WHERE project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling observations', {
project: this.project,
missing: observations.length,
existing: existing.observations.size,
total: totalObsCount.count
});
// Format all observation documents
const allDocs: ChromaDocument[] = [];
for (const obs of observations) {
allDocs.push(...this.formatObservationDocs(obs));
}
// Sync in batches
for (let i = 0; i < allDocs.length; i += this.BATCH_SIZE) {
const batch = allDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
});
}
// Build exclusion list for summaries
const existingSummaryIds = Array.from(existing.summaries);
const summaryExclusionClause = existingSummaryIds.length > 0
? `AND id NOT IN (${existingSummaryIds.join(',')})`
: '';
// Get only summaries missing from Chroma
const summaries = db.db.prepare(`
SELECT * FROM session_summaries
WHERE project = ? ${summaryExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredSummary[];
const totalSummaryCount = db.db.prepare(`
SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
project: this.project,
missing: summaries.length,
existing: existing.summaries.size,
total: totalSummaryCount.count
});
// Format all summary documents
const summaryDocs: ChromaDocument[] = [];
for (const summary of summaries) {
summaryDocs.push(...this.formatSummaryDocs(summary));
}
// Sync in batches
for (let i = 0; i < summaryDocs.length; i += this.BATCH_SIZE) {
const batch = summaryDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
});
}
// Build exclusion list for prompts
const existingPromptIds = Array.from(existing.prompts);
const promptExclusionClause = existingPromptIds.length > 0
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
: '';
// Get only user prompts missing from Chroma
const prompts = db.db.prepare(`
SELECT
up.*,
s.project,
s.sdk_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE s.project = ? ${promptExclusionClause}
ORDER BY up.id ASC
`).all(this.project) as StoredUserPrompt[];
const totalPromptCount = db.db.prepare(`
SELECT COUNT(*) as count
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE s.project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
project: this.project,
missing: prompts.length,
existing: existing.prompts.size,
total: totalPromptCount.count
});
// Format all prompt documents
const promptDocs: ChromaDocument[] = [];
for (const prompt of prompts) {
promptDocs.push(this.formatUserPromptDoc(prompt));
}
// Sync in batches
for (let i = 0; i < promptDocs.length; i += this.BATCH_SIZE) {
const batch = promptDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, promptDocs.length)}/${promptDocs.length}`
});
}
logger.info('CHROMA_SYNC', 'Smart backfill complete', {
project: this.project,
synced: {
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length,
promptDocs: promptDocs.length
},
skipped: {
observations: existing.observations.size,
summaries: existing.summaries.size,
prompts: existing.prompts.size
}
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Backfill failed', { project: this.project }, error as Error);
throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
} finally {
db.close();
}
}
/**
* Query Chroma collection for semantic search
* Used by SearchManager for vector-based search
*/
async queryChroma(
query: string,
limit: number,
whereFilter?: Record<string, any>
): Promise<{ ids: number[]; distances: number[]; metadatas: any[] }> {
await this.ensureConnection();
if (!this.client) {
throw new Error(
'Chroma client not initialized. Call ensureConnection() before using client methods.' +
` Project: ${this.project}`
);
}
const whereStringified = whereFilter ? JSON.stringify(whereFilter) : undefined;
const arguments_obj = {
collection_name: this.collectionName,
query_texts: [query],
n_results: limit,
include: ['documents', 'metadatas', 'distances'],
where: whereStringified
};
const result = await this.client.callTool({
name: 'chroma_query_documents',
arguments: arguments_obj
});
const resultText = logger.happyPathError(
'CHROMA',
'Missing text in MCP chroma_query_documents result',
{ project: this.project },
{ query_text: query },
result.content[0]?.text || ''
);
// Parse JSON response
let parsed: any;
try {
parsed = JSON.parse(resultText);
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to parse Chroma response', { project: this.project }, error as Error);
return { ids: [], distances: [], metadatas: [] };
}
// Extract unique IDs from document IDs
const ids: number[] = [];
const docIds = parsed.ids?.[0] || [];
for (const docId of docIds) {
// Extract sqlite_id from document ID (supports three formats):
// - obs_{id}_narrative, obs_{id}_fact_0, etc (observations)
// - summary_{id}_request, summary_{id}_learned, etc (session summaries)
// - prompt_{id} (user prompts)
const obsMatch = docId.match(/obs_(\d+)_/);
const summaryMatch = docId.match(/summary_(\d+)_/);
const promptMatch = docId.match(/prompt_(\d+)/);
let sqliteId: number | null = null;
if (obsMatch) {
sqliteId = parseInt(obsMatch[1], 10);
} else if (summaryMatch) {
sqliteId = parseInt(summaryMatch[1], 10);
} else if (promptMatch) {
sqliteId = parseInt(promptMatch[1], 10);
}
if (sqliteId !== null && !ids.includes(sqliteId)) {
ids.push(sqliteId);
}
}
const distances = parsed.distances?.[0] || [];
const metadatas = parsed.metadatas?.[0] || [];
return { ids, distances, metadatas };
}
/**
* Close the Chroma client connection and cleanup subprocess
*/
async close(): Promise<void> {
if (!this.connected && !this.client && !this.transport) {
return;
}
try {
// Close client first
if (this.client) {
try {
await this.client.close();
} catch (error) {
logger.warn('CHROMA_SYNC', 'Error closing Chroma client', { project: this.project }, error as Error);
}
}
// Explicitly close transport to kill subprocess
if (this.transport) {
try {
await this.transport.close();
} catch (error) {
logger.warn('CHROMA_SYNC', 'Error closing transport', { project: this.project }, error as Error);
}
}
logger.info('CHROMA_SYNC', 'Chroma client and subprocess closed', { project: this.project });
} finally {
// Always reset state, even if errors occurred
this.connected = false;
this.client = null;
this.transport = null;
}
}
}