close to win

This commit is contained in:
Alex Newman
2025-11-01 21:33:08 -04:00
parent 65c89ea2f0
commit 02130c49d1
12 changed files with 761 additions and 142 deletions
+499
View File
@@ -0,0 +1,499 @@
/**
* ChromaSync Service
*
* Automatically syncs observations and session summaries to ChromaDB via MCP.
* This service provides real-time semantic search capabilities by maintaining
* a vector database synchronized with SQLite.
*
* Design: Fail-fast with no fallbacks - if Chroma is unavailable, syncing fails.
*/
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
import { SessionStore } from '../sqlite/SessionStore.js';
import { logger } from '../../utils/logger.js';
import path from 'path';
import os from 'os';
interface ChromaDocument {
id: string;
document: string;
metadata: Record<string, string | number>;
}
interface StoredObservation {
id: number;
sdk_session_id: string;
project: string;
text: string | null;
type: string;
title: string | null;
subtitle: string | null;
facts: string | null; // JSON
narrative: string | null;
concepts: string | null; // JSON
files_read: string | null; // JSON
files_modified: string | null; // JSON
prompt_number: number;
created_at: string;
created_at_epoch: number;
}
interface StoredSummary {
id: number;
sdk_session_id: string;
project: string;
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
notes: string | null;
prompt_number: number;
created_at: string;
created_at_epoch: number;
}
export class ChromaSync {
private client: Client | null = null;
private connected: boolean = false;
private project: string;
private collectionName: string;
private readonly VECTOR_DB_DIR: string;
private readonly BATCH_SIZE = 100;
constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
}
/**
* Ensure MCP client is connected to Chroma server
* Throws error if connection fails
*/
private async ensureConnection(): Promise<void> {
if (this.connected && this.client) {
return;
}
logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });
try {
const transport = new StdioClientTransport({
command: 'uvx',
args: [
'chroma-mcp',
'--client-type', 'persistent',
'--data-dir', this.VECTOR_DB_DIR
]
});
this.client = new Client({
name: 'claude-mem-chroma-sync',
version: '1.0.0'
}, {
capabilities: {}
});
await this.client.connect(transport);
this.connected = true;
logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Ensure collection exists, create if needed
* Throws error if collection creation fails
*/
private async ensureCollection(): Promise<void> {
await this.ensureConnection();
if (!this.client) {
throw new Error('Chroma client not initialized');
}
try {
// Try to get collection info (will fail if doesn't exist)
await this.client.callTool({
name: 'chroma_get_collection_info',
arguments: {
collection_name: this.collectionName
}
});
logger.debug('CHROMA_SYNC', 'Collection exists', { collection: this.collectionName });
} catch (error) {
// Collection doesn't exist, create it
logger.info('CHROMA_SYNC', 'Creating collection', { collection: this.collectionName });
try {
await this.client.callTool({
name: 'chroma_create_collection',
arguments: {
collection_name: this.collectionName,
embedding_function_name: 'default'
}
});
logger.info('CHROMA_SYNC', 'Collection created', { collection: this.collectionName });
} catch (createError) {
logger.error('CHROMA_SYNC', 'Failed to create collection', { collection: this.collectionName }, createError as Error);
throw new Error(`Collection creation failed: ${createError instanceof Error ? createError.message : String(createError)}`);
}
}
}
/**
* Format observation into Chroma documents (granular approach)
* Each semantic field becomes a separate vector document
*/
private formatObservationDocs(obs: StoredObservation): ChromaDocument[] {
const documents: ChromaDocument[] = [];
// Parse JSON fields
const facts = obs.facts ? JSON.parse(obs.facts) : [];
const concepts = obs.concepts ? JSON.parse(obs.concepts) : [];
const files_read = obs.files_read ? JSON.parse(obs.files_read) : [];
const files_modified = obs.files_modified ? JSON.parse(obs.files_modified) : [];
const baseMetadata: Record<string, string | number> = {
sqlite_id: obs.id,
doc_type: 'observation',
sdk_session_id: obs.sdk_session_id,
project: obs.project,
created_at_epoch: obs.created_at_epoch,
type: obs.type || 'discovery',
title: obs.title || 'Untitled'
};
// Add optional metadata fields
if (obs.subtitle) {
baseMetadata.subtitle = obs.subtitle;
}
if (concepts.length > 0) {
baseMetadata.concepts = concepts.join(',');
}
if (files_read.length > 0) {
baseMetadata.files_read = files_read.join(',');
}
if (files_modified.length > 0) {
baseMetadata.files_modified = files_modified.join(',');
}
// Narrative as separate document
if (obs.narrative) {
documents.push({
id: `obs_${obs.id}_narrative`,
document: obs.narrative,
metadata: { ...baseMetadata, field_type: 'narrative' }
});
}
// Text as separate document (legacy field)
if (obs.text) {
documents.push({
id: `obs_${obs.id}_text`,
document: obs.text,
metadata: { ...baseMetadata, field_type: 'text' }
});
}
// Each fact as separate document
facts.forEach((fact: string, index: number) => {
documents.push({
id: `obs_${obs.id}_fact_${index}`,
document: fact,
metadata: { ...baseMetadata, field_type: 'fact', fact_index: index }
});
});
return documents;
}
/**
* Format summary into Chroma documents (granular approach)
* Each summary field becomes a separate vector document
*/
private formatSummaryDocs(summary: StoredSummary): ChromaDocument[] {
const documents: ChromaDocument[] = [];
const baseMetadata: Record<string, string | number> = {
sqlite_id: summary.id,
doc_type: 'session_summary',
sdk_session_id: summary.sdk_session_id,
project: summary.project,
created_at_epoch: summary.created_at_epoch,
prompt_number: summary.prompt_number || 0
};
// Each field becomes a separate document
if (summary.request) {
documents.push({
id: `summary_${summary.id}_request`,
document: summary.request,
metadata: { ...baseMetadata, field_type: 'request' }
});
}
if (summary.investigated) {
documents.push({
id: `summary_${summary.id}_investigated`,
document: summary.investigated,
metadata: { ...baseMetadata, field_type: 'investigated' }
});
}
if (summary.learned) {
documents.push({
id: `summary_${summary.id}_learned`,
document: summary.learned,
metadata: { ...baseMetadata, field_type: 'learned' }
});
}
if (summary.completed) {
documents.push({
id: `summary_${summary.id}_completed`,
document: summary.completed,
metadata: { ...baseMetadata, field_type: 'completed' }
});
}
if (summary.next_steps) {
documents.push({
id: `summary_${summary.id}_next_steps`,
document: summary.next_steps,
metadata: { ...baseMetadata, field_type: 'next_steps' }
});
}
if (summary.notes) {
documents.push({
id: `summary_${summary.id}_notes`,
document: summary.notes,
metadata: { ...baseMetadata, field_type: 'notes' }
});
}
return documents;
}
/**
* Add documents to Chroma in batch
* Throws error if batch add fails
*/
private async addDocuments(documents: ChromaDocument[]): Promise<void> {
if (documents.length === 0) {
return;
}
await this.ensureCollection();
if (!this.client) {
throw new Error('Chroma client not initialized');
}
try {
await this.client.callTool({
name: 'chroma_add_documents',
arguments: {
collection_name: this.collectionName,
documents: documents.map(d => d.document),
ids: documents.map(d => d.id),
metadatas: documents.map(d => d.metadata)
}
});
logger.debug('CHROMA_SYNC', 'Documents added', {
collection: this.collectionName,
count: documents.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to add documents', {
collection: this.collectionName,
count: documents.length
}, error as Error);
throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
/**
* Sync a single observation to Chroma
* Blocks until sync completes, throws on error
*/
async syncObservation(
observationId: number,
sdkSessionId: string,
project: string,
obs: ParsedObservation,
promptNumber: number,
createdAtEpoch: number
): Promise<void> {
// Convert ParsedObservation to StoredObservation format
const stored: StoredObservation = {
id: observationId,
sdk_session_id: sdkSessionId,
project: project,
text: null, // Legacy field, not used
type: obs.type,
title: obs.title,
subtitle: obs.subtitle,
facts: JSON.stringify(obs.facts),
narrative: obs.narrative,
concepts: JSON.stringify(obs.concepts),
files_read: JSON.stringify(obs.files_read),
files_modified: JSON.stringify(obs.files_modified),
prompt_number: promptNumber,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch
};
const documents = this.formatObservationDocs(stored);
logger.info('CHROMA_SYNC', 'Syncing observation', {
observationId,
documentCount: documents.length,
project
});
await this.addDocuments(documents);
}
/**
* Sync a single summary to Chroma
* Blocks until sync completes, throws on error
*/
async syncSummary(
summaryId: number,
sdkSessionId: string,
project: string,
summary: ParsedSummary,
promptNumber: number,
createdAtEpoch: number
): Promise<void> {
// Convert ParsedSummary to StoredSummary format
const stored: StoredSummary = {
id: summaryId,
sdk_session_id: sdkSessionId,
project: project,
request: summary.request,
investigated: summary.investigated,
learned: summary.learned,
completed: summary.completed,
next_steps: summary.next_steps,
notes: summary.notes,
prompt_number: promptNumber,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch
};
const documents = this.formatSummaryDocs(stored);
logger.info('CHROMA_SYNC', 'Syncing summary', {
summaryId,
documentCount: documents.length,
project
});
await this.addDocuments(documents);
}
/**
* Backfill: Sync all observations missing from Chroma
* Reads from SQLite and syncs in batches
* Throws error if backfill fails
*/
async ensureBackfilled(): Promise<void> {
logger.info('CHROMA_SYNC', 'Starting backfill', { project: this.project });
await this.ensureCollection();
const db = new SessionStore();
try {
// Get all observations for this project
const observations = db.db.prepare(`
SELECT * FROM observations WHERE project = ? ORDER BY id ASC
`).all(this.project) as StoredObservation[];
logger.info('CHROMA_SYNC', 'Backfilling observations', {
project: this.project,
count: observations.length
});
// Format all observation documents
const allDocs: ChromaDocument[] = [];
for (const obs of observations) {
allDocs.push(...this.formatObservationDocs(obs));
}
// Sync in batches
for (let i = 0; i < allDocs.length; i += this.BATCH_SIZE) {
const batch = allDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
});
}
// Get all summaries for this project
const summaries = db.db.prepare(`
SELECT * FROM session_summaries WHERE project = ? ORDER BY id ASC
`).all(this.project) as StoredSummary[];
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
project: this.project,
count: summaries.length
});
// Format all summary documents
const summaryDocs: ChromaDocument[] = [];
for (const summary of summaries) {
summaryDocs.push(...this.formatSummaryDocs(summary));
}
// Sync in batches
for (let i = 0; i < summaryDocs.length; i += this.BATCH_SIZE) {
const batch = summaryDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
});
}
logger.info('CHROMA_SYNC', 'Backfill complete', {
project: this.project,
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Backfill failed', { project: this.project }, error as Error);
throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
} finally {
db.close();
}
}
/**
* Close the Chroma client connection
*/
async close(): Promise<void> {
if (this.client && this.connected) {
await this.client.close();
this.connected = false;
this.client = null;
logger.info('CHROMA_SYNC', 'Chroma client closed', { project: this.project });
}
}
}