ec41cfac67
Breaking Changes: - Python dependency for optimal performance (semantic search) - Search behavior prioritizes semantic relevance with Chroma - Worker service now initializes ChromaSync on startup Major Features: - Hybrid Search Architecture combining ChromaDB semantic search with SQLite temporal filtering - ChromaSync Service for automatic vector database synchronization (738 lines) - get_timeline_by_query tool with auto/interactive modes - Enhanced MCP tools with hybrid semantic + keyword search capabilities Technical Changes: - New: src/services/sync/ChromaSync.ts (vector database sync) - Modified: src/servers/search-server.ts (+995 lines for hybrid search) - Modified: src/services/worker-service.ts (+136 lines for ChromaSync integration) - Modified: src/services/sqlite/SessionStore.ts (+276 lines for timeline queries) - Validation: 1,390 observations → 8,279 vector documents - Performance: Semantic search with 90-day window <200ms Documentation: - Updated CLAUDE.md with hybrid search architecture - Updated CHANGELOG.md with comprehensive v5.0.0 entry - Removed usage tracking documentation - Version bumped across all manifest files 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
739 lines
22 KiB
TypeScript
739 lines
22 KiB
TypeScript
/**
 * ChromaSync Service
 *
 * Automatically syncs observations, session summaries, and user prompts to
 * ChromaDB via MCP. This service provides real-time semantic search
 * capabilities by maintaining a vector database synchronized with SQLite.
 *
 * Design: fail-fast throws - the worker handles failures with graceful degradation.
 * If Chroma/Python is unavailable, sync operations throw but the worker continues
 * without semantic search.
 */
|
|
|
|
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
|
|
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
|
|
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
|
|
import { SessionStore } from '../sqlite/SessionStore.js';
|
|
import { logger } from '../../utils/logger.js';
|
|
import path from 'path';
|
|
import os from 'os';
|
|
|
|
/**
 * One vector document in the shape the Chroma add-documents call is fed:
 * the three parallel arrays (`ids`, `documents`, `metadatas`) are built by
 * mapping over a list of these.
 */
interface ChromaDocument {
  /** Chroma document id, e.g. `obs_<sqliteId>_narrative` or `prompt_<sqliteId>`. */
  id: string;
  /** The text that gets embedded. */
  document: string;
  /** Flat scalar metadata stored alongside the document (strings and numbers only). */
  metadata: Record<string, string | number>;
}
|
|
|
|
/**
 * An observation row as read from the SQLite `observations` table
 * (`SELECT * FROM observations`), or assembled in memory by `syncObservation`.
 */
interface StoredObservation {
  /** SQLite row id; embedded in every derived Chroma document id. */
  id: number;
  sdk_session_id: string;
  project: string;
  /** Legacy free-text field; `syncObservation` always sets it to null. */
  text: string | null;
  type: string;
  title: string | null;
  subtitle: string | null;
  /** JSON text; parsed and iterated as an array, one Chroma document per entry. */
  facts: string | null;
  narrative: string | null;
  /** JSON text; parsed as an array and comma-joined into metadata. */
  concepts: string | null;
  /** JSON text; parsed as an array and comma-joined into metadata. */
  files_read: string | null;
  /** JSON text; parsed as an array and comma-joined into metadata. */
  files_modified: string | null;
  prompt_number: number;
  created_at: string;
  created_at_epoch: number;
}
|
|
|
|
/**
 * A session summary row as read from the SQLite `session_summaries` table,
 * or assembled in memory by `syncSummary`. Each non-empty text field
 * (`request` … `notes`) becomes its own Chroma document.
 */
interface StoredSummary {
  /** SQLite row id; embedded in every derived Chroma document id. */
  id: number;
  sdk_session_id: string;
  project: string;
  request: string | null;
  investigated: string | null;
  learned: string | null;
  completed: string | null;
  next_steps: string | null;
  notes: string | null;
  prompt_number: number;
  created_at: string;
  created_at_epoch: number;
}
|
|
|
|
/**
 * A user prompt row. During backfill it comes from `user_prompts` joined
 * with `sdk_sessions` (which contributes `project` and `sdk_session_id`);
 * `syncUserPrompt` assembles the same shape in memory.
 */
interface StoredUserPrompt {
  /** SQLite row id; the Chroma document id is `prompt_<id>`. */
  id: number;
  /** Join key to `sdk_sessions`; `syncUserPrompt` leaves it as an empty string. */
  claude_session_id: string;
  prompt_number: number;
  /** The prompt text, embedded as a single Chroma document. */
  prompt_text: string;
  created_at: string;
  created_at_epoch: number;
  sdk_session_id: string;
  project: string;
}
|
|
|
|
/**
 * Mirrors one project's observations, session summaries and user prompts
 * from SQLite into a Chroma collection named `cm__<project>`, talking to a
 * `chroma-mcp` server over the MCP stdio transport.
 *
 * All public methods throw on failure; the caller decides how to degrade.
 */
export class ChromaSync {
  /** MCP client for the Chroma server; null until a connection succeeds. */
  private client: Client | null = null;
  private connected: boolean = false;
  private project: string;
  private collectionName: string;
  private readonly VECTOR_DB_DIR: string;
  /** Maximum number of documents sent per add call during backfill. */
  private readonly BATCH_SIZE = 100;

  /**
   * @param project Project name; used as the collection suffix and as the
   *   filter for every SQLite and Chroma query issued by this instance.
   */
  constructor(project: string) {
    this.project = project;
    this.collectionName = `cm__${project}`;
    this.VECTOR_DB_DIR = path.join(os.homedir(), '.claude-mem', 'vector-db');
  }

  /**
   * Return the connected client or throw.
   * @throws Error('Chroma client not initialized') when no client is set.
   */
  private requireClient(): Client {
    if (!this.client) {
      throw new Error('Chroma client not initialized');
    }
    return this.client;
  }

  /**
   * Pull the text payload out of the first content item of a tool result.
   * Returns undefined when the result does not have that shape, so callers
   * can report a clear error instead of hitting a TypeError on a bad index.
   */
  private static extractToolText(result: unknown): string | undefined {
    if (typeof result !== 'object' || result === null) {
      return undefined;
    }
    const content = (result as { content?: unknown }).content;
    if (!Array.isArray(content)) {
      return undefined;
    }
    const first: unknown = content[0];
    if (typeof first !== 'object' || first === null) {
      return undefined;
    }
    const { type, text } = first as { type?: unknown; text?: unknown };
    return type === 'text' && typeof text === 'string' ? text : undefined;
  }

  /**
   * Invoke a Chroma MCP tool and treat a result flagged `isError` as a
   * failure. MCP reports tool-execution errors inside the result rather
   * than by rejecting the request, so without this check a failed call
   * would be indistinguishable from a successful one.
   *
   * @throws Error when no client is set or the tool reports an error.
   */
  private async callChromaTool(name: string, args: Record<string, unknown>): Promise<unknown> {
    const client = this.requireClient();
    const result: unknown = await client.callTool({ name, arguments: args });

    const flaggedAsError =
      typeof result === 'object' &&
      result !== null &&
      (result as { isError?: unknown }).isError === true;

    if (flaggedAsError) {
      const detail = ChromaSync.extractToolText(result) ?? 'no error details returned';
      throw new Error(`Chroma tool ${name} reported an error: ${detail}`);
    }

    return result;
  }

  /**
   * Parse a stored JSON column that is expected to hold an array of strings.
   * Malformed JSON or a non-array value is logged and treated as empty so a
   * single bad row cannot abort a whole sync; non-string entries are dropped.
   */
  private static parseJsonStringArray(
    raw: string | null,
    context: { observationId: number; field: string }
  ): string[] {
    if (!raw) {
      return [];
    }
    try {
      const parsed: unknown = JSON.parse(raw);
      if (!Array.isArray(parsed)) {
        throw new Error('expected a JSON array');
      }
      return parsed.filter((item): item is string => typeof item === 'string');
    } catch (error) {
      logger.error('CHROMA_SYNC', 'Ignoring malformed JSON array column', context, error as Error);
      return [];
    }
  }

  /**
   * Convert a metadata value to a SQLite id, or undefined when it is not a
   * non-zero safe integer (given as a number or a numeric string).
   */
  private static toSqliteId(value: unknown): number | undefined {
    let candidate = NaN;
    if (typeof value === 'number') {
      candidate = value;
    } else if (typeof value === 'string' && value.trim() !== '') {
      candidate = Number(value);
    }
    return Number.isSafeInteger(candidate) && candidate !== 0 ? candidate : undefined;
  }

  /**
   * Build the `AND <column> NOT IN (...)` fragment used by the backfill
   * queries. Only safe integers are interpolated, because the ids originate
   * from Chroma metadata and end up inside a SQL string. Returns '' when
   * there is nothing to exclude.
   */
  private static exclusionClause(column: string, ids: Iterable<number>): string {
    const safeIds = Array.from(ids).filter((id) => Number.isSafeInteger(id));
    return safeIds.length > 0 ? `AND ${column} NOT IN (${safeIds.join(',')})` : '';
  }

  /**
   * Ensure MCP client is connected to Chroma server.
   * Launches `uvx chroma-mcp` in persistent mode against VECTOR_DB_DIR via
   * the stdio transport. No-op when already connected.
   *
   * @throws Error prefixed `Chroma connection failed:` if connecting fails.
   */
  private async ensureConnection(): Promise<void> {
    if (this.connected && this.client) {
      return;
    }

    logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });

    // Held locally until connect() succeeds so a failed attempt never leaves
    // a half-initialised client on the instance.
    let pending: Client | null = null;

    try {
      const transport = new StdioClientTransport({
        command: 'uvx',
        args: [
          'chroma-mcp',
          '--client-type', 'persistent',
          '--data-dir', this.VECTOR_DB_DIR
        ],
        stderr: 'ignore'
      });

      pending = new Client({
        name: 'claude-mem-chroma-sync',
        version: '1.0.0'
      }, {
        capabilities: {}
      });

      await pending.connect(transport);
      this.client = pending;
      this.connected = true;

      logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
    } catch (error) {
      // Best-effort cleanup of whatever the failed attempt started.
      if (pending) {
        try {
          await pending.close();
        } catch {
          // Nothing useful to do; the original connection error is reported below.
        }
      }
      this.client = null;
      this.connected = false;

      logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
      throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
    }
  }

  /**
   * Ensure collection exists, create if needed.
   *
   * @throws Error prefixed `Collection creation failed:` if creation fails.
   */
  private async ensureCollection(): Promise<void> {
    await this.ensureConnection();
    this.requireClient();

    try {
      // Fails (throws or reports isError) when the collection does not exist.
      await this.callChromaTool('chroma_get_collection_info', {
        collection_name: this.collectionName
      });

      logger.debug('CHROMA_SYNC', 'Collection exists', { collection: this.collectionName });
      return;
    } catch (error) {
      // Any failure of the lookup is treated as "collection missing"; a real
      // outage will surface from the create call below.
      logger.info('CHROMA_SYNC', 'Creating collection', { collection: this.collectionName });
    }

    try {
      await this.callChromaTool('chroma_create_collection', {
        collection_name: this.collectionName,
        embedding_function_name: 'default'
      });

      logger.info('CHROMA_SYNC', 'Collection created', { collection: this.collectionName });
    } catch (createError) {
      logger.error('CHROMA_SYNC', 'Failed to create collection', { collection: this.collectionName }, createError as Error);
      throw new Error(`Collection creation failed: ${createError instanceof Error ? createError.message : String(createError)}`);
    }
  }

  /**
   * Format observation into Chroma documents (granular approach).
   * The narrative, the legacy text and each individual fact become separate
   * vector documents that share the same base metadata.
   */
  private formatObservationDocs(obs: StoredObservation): ChromaDocument[] {
    const parseColumn = (raw: string | null, field: string): string[] =>
      ChromaSync.parseJsonStringArray(raw, { observationId: obs.id, field });

    const facts = parseColumn(obs.facts, 'facts');
    const concepts = parseColumn(obs.concepts, 'concepts');
    const filesRead = parseColumn(obs.files_read, 'files_read');
    const filesModified = parseColumn(obs.files_modified, 'files_modified');

    const baseMetadata: Record<string, string | number> = {
      sqlite_id: obs.id,
      doc_type: 'observation',
      sdk_session_id: obs.sdk_session_id,
      project: obs.project,
      created_at_epoch: obs.created_at_epoch,
      type: obs.type || 'discovery',
      title: obs.title || 'Untitled'
    };

    // Optional metadata is only attached when it carries a value.
    if (obs.subtitle) {
      baseMetadata.subtitle = obs.subtitle;
    }
    if (concepts.length > 0) {
      baseMetadata.concepts = concepts.join(',');
    }
    if (filesRead.length > 0) {
      baseMetadata.files_read = filesRead.join(',');
    }
    if (filesModified.length > 0) {
      baseMetadata.files_modified = filesModified.join(',');
    }

    const documents: ChromaDocument[] = [];

    if (obs.narrative) {
      documents.push({
        id: `obs_${obs.id}_narrative`,
        document: obs.narrative,
        metadata: { ...baseMetadata, field_type: 'narrative' }
      });
    }

    // Legacy field
    if (obs.text) {
      documents.push({
        id: `obs_${obs.id}_text`,
        document: obs.text,
        metadata: { ...baseMetadata, field_type: 'text' }
      });
    }

    facts.forEach((fact, index) => {
      documents.push({
        id: `obs_${obs.id}_fact_${index}`,
        document: fact,
        metadata: { ...baseMetadata, field_type: 'fact', fact_index: index }
      });
    });

    return documents;
  }

  /**
   * Format summary into Chroma documents (granular approach).
   * Each non-empty summary field becomes a separate vector document.
   */
  private formatSummaryDocs(summary: StoredSummary): ChromaDocument[] {
    const baseMetadata: Record<string, string | number> = {
      sqlite_id: summary.id,
      doc_type: 'session_summary',
      sdk_session_id: summary.sdk_session_id,
      project: summary.project,
      created_at_epoch: summary.created_at_epoch,
      prompt_number: summary.prompt_number || 0
    };

    // Field name doubles as the id suffix and the field_type metadata value.
    const fields: Array<[string, string | null]> = [
      ['request', summary.request],
      ['investigated', summary.investigated],
      ['learned', summary.learned],
      ['completed', summary.completed],
      ['next_steps', summary.next_steps],
      ['notes', summary.notes]
    ];

    const documents: ChromaDocument[] = [];
    for (const [fieldType, value] of fields) {
      if (value) {
        documents.push({
          id: `summary_${summary.id}_${fieldType}`,
          document: value,
          metadata: { ...baseMetadata, field_type: fieldType }
        });
      }
    }

    return documents;
  }

  /**
   * Add documents to Chroma in a single call. No-op for an empty list.
   *
   * @throws Error prefixed `Document add failed:` if the add call fails.
   */
  private async addDocuments(documents: ChromaDocument[]): Promise<void> {
    if (documents.length === 0) {
      return;
    }

    await this.ensureCollection();
    this.requireClient();

    try {
      await this.callChromaTool('chroma_add_documents', {
        collection_name: this.collectionName,
        documents: documents.map(d => d.document),
        ids: documents.map(d => d.id),
        metadatas: documents.map(d => d.metadata)
      });

      logger.debug('CHROMA_SYNC', 'Documents added', {
        collection: this.collectionName,
        count: documents.length
      });
    } catch (error) {
      logger.error('CHROMA_SYNC', 'Failed to add documents', {
        collection: this.collectionName,
        count: documents.length
      }, error as Error);
      throw new Error(`Document add failed: ${error instanceof Error ? error.message : String(error)}`);
    }
  }

  /**
   * Send documents to Chroma in BATCH_SIZE slices, logging progress after
   * each slice. Stops at the first failing batch (the error propagates).
   */
  private async addInBatches(documents: ChromaDocument[]): Promise<void> {
    for (let start = 0; start < documents.length; start += this.BATCH_SIZE) {
      await this.addDocuments(documents.slice(start, start + this.BATCH_SIZE));

      logger.info('CHROMA_SYNC', 'Backfill progress', {
        project: this.project,
        progress: `${Math.min(start + this.BATCH_SIZE, documents.length)}/${documents.length}`
      });
    }
  }

  /**
   * Sync a single observation to Chroma.
   * Blocks until sync completes, throws on error.
   */
  async syncObservation(
    observationId: number,
    sdkSessionId: string,
    project: string,
    obs: ParsedObservation,
    promptNumber: number,
    createdAtEpoch: number
  ): Promise<void> {
    // Convert ParsedObservation to StoredObservation format
    const stored: StoredObservation = {
      id: observationId,
      sdk_session_id: sdkSessionId,
      project: project,
      text: null, // Legacy field, not used
      type: obs.type,
      title: obs.title,
      subtitle: obs.subtitle,
      facts: JSON.stringify(obs.facts),
      narrative: obs.narrative,
      concepts: JSON.stringify(obs.concepts),
      files_read: JSON.stringify(obs.files_read),
      files_modified: JSON.stringify(obs.files_modified),
      prompt_number: promptNumber,
      // NOTE(review): assumes createdAtEpoch is in seconds — confirm against the
      // caller. The formatter does not read created_at, so this does not reach Chroma.
      created_at: new Date(createdAtEpoch * 1000).toISOString(),
      created_at_epoch: createdAtEpoch
    };

    const documents = this.formatObservationDocs(stored);

    logger.info('CHROMA_SYNC', 'Syncing observation', {
      observationId,
      documentCount: documents.length,
      project
    });

    await this.addDocuments(documents);
  }

  /**
   * Sync a single summary to Chroma.
   * Blocks until sync completes, throws on error.
   */
  async syncSummary(
    summaryId: number,
    sdkSessionId: string,
    project: string,
    summary: ParsedSummary,
    promptNumber: number,
    createdAtEpoch: number
  ): Promise<void> {
    // Convert ParsedSummary to StoredSummary format
    const stored: StoredSummary = {
      id: summaryId,
      sdk_session_id: sdkSessionId,
      project: project,
      request: summary.request,
      investigated: summary.investigated,
      learned: summary.learned,
      completed: summary.completed,
      next_steps: summary.next_steps,
      notes: summary.notes,
      prompt_number: promptNumber,
      // NOTE(review): assumes createdAtEpoch is in seconds — confirm against the
      // caller. The formatter does not read created_at.
      created_at: new Date(createdAtEpoch * 1000).toISOString(),
      created_at_epoch: createdAtEpoch
    };

    const documents = this.formatSummaryDocs(stored);

    logger.info('CHROMA_SYNC', 'Syncing summary', {
      summaryId,
      documentCount: documents.length,
      project
    });

    await this.addDocuments(documents);
  }

  /**
   * Format user prompt into Chroma document.
   * Each prompt becomes a single document (unlike observations/summaries,
   * which are split by field).
   */
  private formatUserPromptDoc(prompt: StoredUserPrompt): ChromaDocument {
    return {
      id: `prompt_${prompt.id}`,
      document: prompt.prompt_text,
      metadata: {
        sqlite_id: prompt.id,
        doc_type: 'user_prompt',
        sdk_session_id: prompt.sdk_session_id,
        project: prompt.project,
        created_at_epoch: prompt.created_at_epoch,
        prompt_number: prompt.prompt_number
      }
    };
  }

  /**
   * Sync a single user prompt to Chroma.
   * Blocks until sync completes, throws on error.
   */
  async syncUserPrompt(
    promptId: number,
    sdkSessionId: string,
    project: string,
    promptText: string,
    promptNumber: number,
    createdAtEpoch: number
  ): Promise<void> {
    // Create StoredUserPrompt format
    const stored: StoredUserPrompt = {
      id: promptId,
      claude_session_id: '', // Not needed for Chroma sync
      prompt_number: promptNumber,
      prompt_text: promptText,
      // NOTE(review): assumes createdAtEpoch is in seconds — confirm against the
      // caller. The formatter does not read created_at.
      created_at: new Date(createdAtEpoch * 1000).toISOString(),
      created_at_epoch: createdAtEpoch,
      sdk_session_id: sdkSessionId,
      project: project
    };

    const document = this.formatUserPromptDoc(stored);

    logger.info('CHROMA_SYNC', 'Syncing user prompt', {
      promptId,
      project
    });

    await this.addDocuments([document]);
  }

  /**
   * Fetch the SQLite ids already represented in the Chroma collection.
   * Pages through document metadata for this project and buckets each valid
   * `sqlite_id` by its `doc_type`.
   *
   * Note: a row counts as present as soon as any one of its documents is
   * found, so a row whose documents were only partially added is not
   * detected as missing.
   *
   * @returns Sets of SQLite ids for observations, summaries, and prompts.
   * @throws If the query fails or the response is not a text payload.
   */
  private async getExistingChromaIds(): Promise<{
    observations: Set<number>;
    summaries: Set<number>;
    prompts: Set<number>;
  }> {
    await this.ensureConnection();
    this.requireClient();

    const observationIds = new Set<number>();
    const summaryIds = new Set<number>();
    const promptIds = new Set<number>();

    // Map (not a plain object) so an unexpected doc_type such as
    // "constructor" can never resolve to an inherited property.
    const bucketByDocType = new Map<unknown, Set<number>>([
      ['observation', observationIds],
      ['session_summary', summaryIds],
      ['user_prompt', promptIds]
    ]);

    let offset = 0;
    const limit = 1000; // Large batches, metadata only = fast

    logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: this.project });

    while (true) {
      try {
        const result = await this.callChromaTool('chroma_get_documents', {
          collection_name: this.collectionName,
          limit,
          offset,
          where: { project: this.project }, // Filter by project
          include: ['metadatas']
        });

        const text = ChromaSync.extractToolText(result);
        if (text === undefined) {
          throw new Error('Unexpected response type from chroma_get_documents');
        }

        const parsed: unknown = JSON.parse(text);
        const rawMetadatas = (parsed as { metadatas?: unknown } | null)?.metadatas;
        const metadatas: unknown[] = Array.isArray(rawMetadatas) ? rawMetadatas : [];

        if (metadatas.length === 0) {
          break; // No more documents
        }

        for (const meta of metadatas) {
          if (typeof meta !== 'object' || meta === null) {
            continue;
          }
          const { sqlite_id, doc_type } = meta as { sqlite_id?: unknown; doc_type?: unknown };

          // Only genuine integer ids are kept: they are later interpolated
          // into SQL by the backfill.
          const id = ChromaSync.toSqliteId(sqlite_id);
          if (id !== undefined) {
            bucketByDocType.get(doc_type)?.add(id);
          }
        }

        offset += limit;

        logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
          project: this.project,
          offset,
          batchSize: metadatas.length
        });
      } catch (error) {
        logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: this.project }, error as Error);
        throw error;
      }
    }

    logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
      project: this.project,
      observations: observationIds.size,
      summaries: summaryIds.size,
      prompts: promptIds.size
    });

    return { observations: observationIds, summaries: summaryIds, prompts: promptIds };
  }

  /**
   * Backfill: sync every observation, summary and user prompt of this
   * project that is missing from Chroma. Reads from SQLite, skips rows whose
   * ids Chroma already reports, and adds the rest in batches.
   *
   * @throws Error prefixed `Backfill failed:` if any query or batch fails.
   */
  async ensureBackfilled(): Promise<void> {
    logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: this.project });

    await this.ensureCollection();

    // Fetch existing IDs from Chroma (fast, metadata only)
    const existing = await this.getExistingChromaIds();

    const db = new SessionStore();

    try {
      // --- Observations ---
      const obsExclusionClause = ChromaSync.exclusionClause('id', existing.observations);

      const observations = db.db.prepare(`
        SELECT * FROM observations
        WHERE project = ? ${obsExclusionClause}
        ORDER BY id ASC
      `).all(this.project) as StoredObservation[];

      const totalObsCount = db.db.prepare(`
        SELECT COUNT(*) as count FROM observations WHERE project = ?
      `).get(this.project) as { count: number };

      logger.info('CHROMA_SYNC', 'Backfilling observations', {
        project: this.project,
        missing: observations.length,
        existing: existing.observations.size,
        total: totalObsCount.count
      });

      const allDocs: ChromaDocument[] = [];
      for (const obs of observations) {
        allDocs.push(...this.formatObservationDocs(obs));
      }

      await this.addInBatches(allDocs);

      // --- Summaries ---
      const summaryExclusionClause = ChromaSync.exclusionClause('id', existing.summaries);

      const summaries = db.db.prepare(`
        SELECT * FROM session_summaries
        WHERE project = ? ${summaryExclusionClause}
        ORDER BY id ASC
      `).all(this.project) as StoredSummary[];

      const totalSummaryCount = db.db.prepare(`
        SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
      `).get(this.project) as { count: number };

      logger.info('CHROMA_SYNC', 'Backfilling summaries', {
        project: this.project,
        missing: summaries.length,
        existing: existing.summaries.size,
        total: totalSummaryCount.count
      });

      const summaryDocs: ChromaDocument[] = [];
      for (const summary of summaries) {
        summaryDocs.push(...this.formatSummaryDocs(summary));
      }

      await this.addInBatches(summaryDocs);

      // --- User prompts ---
      const promptExclusionClause = ChromaSync.exclusionClause('up.id', existing.prompts);

      const prompts = db.db.prepare(`
        SELECT
          up.*,
          s.project,
          s.sdk_session_id
        FROM user_prompts up
        JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
        WHERE s.project = ? ${promptExclusionClause}
        ORDER BY up.id ASC
      `).all(this.project) as StoredUserPrompt[];

      const totalPromptCount = db.db.prepare(`
        SELECT COUNT(*) as count
        FROM user_prompts up
        JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
        WHERE s.project = ?
      `).get(this.project) as { count: number };

      logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
        project: this.project,
        missing: prompts.length,
        existing: existing.prompts.size,
        total: totalPromptCount.count
      });

      const promptDocs: ChromaDocument[] = [];
      for (const prompt of prompts) {
        promptDocs.push(this.formatUserPromptDoc(prompt));
      }

      await this.addInBatches(promptDocs);

      logger.info('CHROMA_SYNC', 'Smart backfill complete', {
        project: this.project,
        synced: {
          observationDocs: allDocs.length,
          summaryDocs: summaryDocs.length,
          promptDocs: promptDocs.length
        },
        skipped: {
          observations: existing.observations.size,
          summaries: existing.summaries.size,
          prompts: existing.prompts.size
        }
      });
    } catch (error) {
      logger.error('CHROMA_SYNC', 'Backfill failed', { project: this.project }, error as Error);
      throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
    } finally {
      db.close();
    }
  }

  /**
   * Close the Chroma client connection. No-op when not connected.
   * Connection state is reset even if closing throws, so a later call
   * reconnects instead of reusing a dead client; the error still propagates.
   */
  async close(): Promise<void> {
    if (!this.client || !this.connected) {
      return;
    }

    const client = this.client;
    try {
      await client.close();
    } finally {
      this.connected = false;
      this.client = null;
    }

    logger.info('CHROMA_SYNC', 'Chroma client closed', { project: this.project });
  }
}
|