perf(chroma): cache backfill watermarks in JSON to skip per-restart Chroma scans
Worker restarts triggered a full Chroma metadata scan for every project on every boot to figure out which sqlite ids were already embedded. With 253 projects and ~92k embeddings, this pegged chroma-mcp at 100-422% CPU on every spawn. Replace the scan with ~/.claude-mem/chroma-sync-state.json — per-project highest synced sqlite_id watermarks for observations/summaries/prompts. Backfill switches from "id NOT IN (huge list)" to "id > watermark"; live syncs bump the watermark on success; one-time bootstrap derives initial watermarks from a single Chroma scan if the state file is missing. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+159
-159
File diff suppressed because one or more lines are too long
+157
-63
@@ -13,6 +13,7 @@
|
||||
*/
|
||||
|
||||
import { ChromaMcpManager } from './ChromaMcpManager.js';
|
||||
import { ChromaSyncState, ProjectWatermarks } from './ChromaSyncState.js';
|
||||
import { ParsedObservation, ParsedSummary } from '../../sdk/parser.js';
|
||||
import { SessionStore } from '../sqlite/SessionStore.js';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
@@ -375,6 +376,7 @@ export class ChromaSync {
|
||||
});
|
||||
|
||||
await this.addDocuments(documents);
|
||||
ChromaSyncState.bump(project, 'observations', observationId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -417,6 +419,7 @@ export class ChromaSync {
|
||||
});
|
||||
|
||||
await this.addDocuments(documents);
|
||||
ChromaSyncState.bump(project, 'summaries', summaryId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -470,6 +473,7 @@ export class ChromaSync {
|
||||
});
|
||||
|
||||
await this.addDocuments([document]);
|
||||
ChromaSyncState.bump(project, 'prompts', promptId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -538,12 +542,36 @@ export class ChromaSync {
|
||||
project: targetProject,
|
||||
observations: observationIds.size,
|
||||
summaries: summaryIds.size,
|
||||
prompts: promptIds.size
|
||||
prompts: promptIds.size,
|
||||
total: observationIds.size + summaryIds.size + promptIds.size
|
||||
});
|
||||
|
||||
return { observations: observationIds, summaries: summaryIds, prompts: promptIds };
|
||||
}
|
||||
|
||||
/**
|
||||
* One-time bootstrap: scan Chroma for a project, derive the highest sqlite_id
|
||||
* per doc_type, and persist as watermarks. After this runs once at install,
|
||||
* the watermark file owns the truth and Chroma is never scanned again.
|
||||
*/
|
||||
async bootstrapWatermarksFromChroma(project: string): Promise<void> {
|
||||
const existing = await this.getExistingChromaIds(project);
|
||||
const max = (set: Set<number>): number => {
|
||||
let m = 0;
|
||||
for (const id of set) if (id > m) m = id;
|
||||
return m;
|
||||
};
|
||||
ChromaSyncState.replace(project, {
|
||||
observations: max(existing.observations),
|
||||
summaries: max(existing.summaries),
|
||||
prompts: max(existing.prompts)
|
||||
});
|
||||
logger.info('CHROMA_SYNC', 'Bootstrapped watermarks from Chroma', {
|
||||
project,
|
||||
watermarks: ChromaSyncState.get(project)
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Backfill: Sync all observations missing from Chroma
|
||||
* Reads from SQLite and syncs in batches
|
||||
@@ -558,13 +586,12 @@ export class ChromaSync {
|
||||
|
||||
await this.ensureCollectionExists();
|
||||
|
||||
// Fetch existing IDs from Chroma (fast, metadata only)
|
||||
const existing = await this.getExistingChromaIds(backfillProject);
|
||||
const watermarks = ChromaSyncState.get(backfillProject);
|
||||
|
||||
const db = storeOverride ?? new SessionStore();
|
||||
|
||||
try {
|
||||
await this.runBackfillPipeline(db, backfillProject, existing);
|
||||
await this.runBackfillPipeline(db, backfillProject, watermarks);
|
||||
} catch (error) {
|
||||
logger.error('CHROMA_SYNC', 'Backfill failed', { project: backfillProject }, error instanceof Error ? error : new Error(String(error)));
|
||||
throw new Error(`Backfill failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
@@ -579,11 +606,11 @@ export class ChromaSync {
|
||||
private async runBackfillPipeline(
|
||||
db: SessionStore,
|
||||
backfillProject: string,
|
||||
existing: { observations: Set<number>; summaries: Set<number>; prompts: Set<number> }
|
||||
watermarks: ProjectWatermarks
|
||||
): Promise<void> {
|
||||
const allDocs = await this.backfillObservations(db, backfillProject, existing.observations);
|
||||
const summaryDocs = await this.backfillSummaries(db, backfillProject, existing.summaries);
|
||||
const promptDocs = await this.backfillPrompts(db, backfillProject, existing.prompts);
|
||||
const allDocs = await this.backfillObservations(db, backfillProject, watermarks.observations);
|
||||
const summaryDocs = await this.backfillSummaries(db, backfillProject, watermarks.summaries);
|
||||
const promptDocs = await this.backfillPrompts(db, backfillProject, watermarks.prompts);
|
||||
|
||||
logger.info('CHROMA_SYNC', 'Smart backfill complete', {
|
||||
project: backfillProject,
|
||||
@@ -592,11 +619,7 @@ export class ChromaSync {
|
||||
summaryDocs: summaryDocs.length,
|
||||
promptDocs: promptDocs.length
|
||||
},
|
||||
skipped: {
|
||||
observations: existing.observations.size,
|
||||
summaries: existing.summaries.size,
|
||||
prompts: existing.prompts.size
|
||||
}
|
||||
watermarks: ChromaSyncState.get(backfillProject)
|
||||
});
|
||||
}
|
||||
|
||||
@@ -607,18 +630,17 @@ export class ChromaSync {
|
||||
private async backfillObservations(
|
||||
db: SessionStore,
|
||||
backfillProject: string,
|
||||
existingObservationIds: Set<number>
|
||||
watermark: number
|
||||
): Promise<ChromaDocument[]> {
|
||||
const existingObsIds = Array.from(existingObservationIds).filter(id => Number.isInteger(id) && id > 0);
|
||||
const obsExclusionClause = existingObsIds.length > 0
|
||||
? `AND id NOT IN (${existingObsIds.join(',')})`
|
||||
: '';
|
||||
|
||||
const observations = db.db.prepare(`
|
||||
SELECT * FROM observations
|
||||
WHERE project = ? ${obsExclusionClause}
|
||||
WHERE project = ? AND id > ?
|
||||
ORDER BY id ASC
|
||||
`).all(backfillProject) as StoredObservation[];
|
||||
`).all(backfillProject, watermark) as StoredObservation[];
|
||||
|
||||
if (observations.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const totalObsCount = db.db.prepare(`
|
||||
SELECT COUNT(*) as count FROM observations WHERE project = ?
|
||||
@@ -627,23 +649,50 @@ export class ChromaSync {
|
||||
logger.info('CHROMA_SYNC', 'Backfilling observations', {
|
||||
project: backfillProject,
|
||||
missing: observations.length,
|
||||
existing: existingObservationIds.size,
|
||||
watermark,
|
||||
total: totalObsCount.count
|
||||
});
|
||||
|
||||
const allDocs: ChromaDocument[] = [];
|
||||
const obsByDocCount: Array<{ obs: StoredObservation; docs: ChromaDocument[] }> = [];
|
||||
for (const obs of observations) {
|
||||
allDocs.push(...this.formatObservationDocs(obs));
|
||||
const docs = this.formatObservationDocs(obs);
|
||||
allDocs.push(...docs);
|
||||
obsByDocCount.push({ obs, docs });
|
||||
}
|
||||
|
||||
for (let i = 0; i < allDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = allDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
// Track how many docs we've successfully written so we can bump the
|
||||
// watermark to the highest fully-synced observation id, even if a later
|
||||
// batch fails.
|
||||
let writtenDocs = 0;
|
||||
let lastSyncedObsIdx = -1;
|
||||
try {
|
||||
for (let i = 0; i < allDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = allDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
writtenDocs += batch.length;
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
|
||||
});
|
||||
// Find which observation the last fully-written doc belongs to.
|
||||
let cursor = 0;
|
||||
for (let j = 0; j < obsByDocCount.length; j++) {
|
||||
cursor += obsByDocCount[j].docs.length;
|
||||
if (cursor <= writtenDocs) {
|
||||
lastSyncedObsIdx = j;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${Math.min(i + this.BATCH_SIZE, allDocs.length)}/${allDocs.length}`
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
if (lastSyncedObsIdx >= 0) {
|
||||
const highestId = obsByDocCount[lastSyncedObsIdx].obs.id;
|
||||
ChromaSyncState.bump(backfillProject, 'observations', highestId);
|
||||
}
|
||||
}
|
||||
|
||||
return allDocs;
|
||||
@@ -656,18 +705,17 @@ export class ChromaSync {
|
||||
private async backfillSummaries(
|
||||
db: SessionStore,
|
||||
backfillProject: string,
|
||||
existingSummaryIdSet: Set<number>
|
||||
watermark: number
|
||||
): Promise<ChromaDocument[]> {
|
||||
const existingSummaryIds = Array.from(existingSummaryIdSet).filter(id => Number.isInteger(id) && id > 0);
|
||||
const summaryExclusionClause = existingSummaryIds.length > 0
|
||||
? `AND id NOT IN (${existingSummaryIds.join(',')})`
|
||||
: '';
|
||||
|
||||
const summaries = db.db.prepare(`
|
||||
SELECT * FROM session_summaries
|
||||
WHERE project = ? ${summaryExclusionClause}
|
||||
WHERE project = ? AND id > ?
|
||||
ORDER BY id ASC
|
||||
`).all(backfillProject) as StoredSummary[];
|
||||
`).all(backfillProject, watermark) as StoredSummary[];
|
||||
|
||||
if (summaries.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const totalSummaryCount = db.db.prepare(`
|
||||
SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
|
||||
@@ -676,23 +724,42 @@ export class ChromaSync {
|
||||
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
|
||||
project: backfillProject,
|
||||
missing: summaries.length,
|
||||
existing: existingSummaryIdSet.size,
|
||||
watermark,
|
||||
total: totalSummaryCount.count
|
||||
});
|
||||
|
||||
const summaryDocs: ChromaDocument[] = [];
|
||||
const summaryByDocCount: Array<{ summary: StoredSummary; docs: ChromaDocument[] }> = [];
|
||||
for (const summary of summaries) {
|
||||
summaryDocs.push(...this.formatSummaryDocs(summary));
|
||||
const docs = this.formatSummaryDocs(summary);
|
||||
summaryDocs.push(...docs);
|
||||
summaryByDocCount.push({ summary, docs });
|
||||
}
|
||||
|
||||
for (let i = 0; i < summaryDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = summaryDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
let writtenDocs = 0;
|
||||
let lastSyncedIdx = -1;
|
||||
try {
|
||||
for (let i = 0; i < summaryDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = summaryDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
writtenDocs += batch.length;
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
|
||||
});
|
||||
let cursor = 0;
|
||||
for (let j = 0; j < summaryByDocCount.length; j++) {
|
||||
cursor += summaryByDocCount[j].docs.length;
|
||||
if (cursor <= writtenDocs) lastSyncedIdx = j;
|
||||
else break;
|
||||
}
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${Math.min(i + this.BATCH_SIZE, summaryDocs.length)}/${summaryDocs.length}`
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
if (lastSyncedIdx >= 0) {
|
||||
ChromaSyncState.bump(backfillProject, 'summaries', summaryByDocCount[lastSyncedIdx].summary.id);
|
||||
}
|
||||
}
|
||||
|
||||
return summaryDocs;
|
||||
@@ -705,13 +772,8 @@ export class ChromaSync {
|
||||
private async backfillPrompts(
|
||||
db: SessionStore,
|
||||
backfillProject: string,
|
||||
existingPromptIdSet: Set<number>
|
||||
watermark: number
|
||||
): Promise<ChromaDocument[]> {
|
||||
const existingPromptIds = Array.from(existingPromptIdSet).filter(id => Number.isInteger(id) && id > 0);
|
||||
const promptExclusionClause = existingPromptIds.length > 0
|
||||
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
|
||||
: '';
|
||||
|
||||
const prompts = db.db.prepare(`
|
||||
SELECT
|
||||
up.*,
|
||||
@@ -719,9 +781,13 @@ export class ChromaSync {
|
||||
s.memory_session_id
|
||||
FROM user_prompts up
|
||||
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
|
||||
WHERE s.project = ? ${promptExclusionClause}
|
||||
WHERE s.project = ? AND up.id > ?
|
||||
ORDER BY up.id ASC
|
||||
`).all(backfillProject) as StoredUserPrompt[];
|
||||
`).all(backfillProject, watermark) as StoredUserPrompt[];
|
||||
|
||||
if (prompts.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const totalPromptCount = db.db.prepare(`
|
||||
SELECT COUNT(*) as count
|
||||
@@ -733,7 +799,7 @@ export class ChromaSync {
|
||||
logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
|
||||
project: backfillProject,
|
||||
missing: prompts.length,
|
||||
existing: existingPromptIdSet.size,
|
||||
watermark,
|
||||
total: totalPromptCount.count
|
||||
});
|
||||
|
||||
@@ -742,14 +808,25 @@ export class ChromaSync {
|
||||
promptDocs.push(this.formatUserPromptDoc(prompt));
|
||||
}
|
||||
|
||||
for (let i = 0; i < promptDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = promptDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
// Prompts are 1 doc each, so the highest fully-synced prompt id moves
|
||||
// forward in lockstep with each batch.
|
||||
let lastSyncedPromptId = 0;
|
||||
try {
|
||||
for (let i = 0; i < promptDocs.length; i += this.BATCH_SIZE) {
|
||||
const batch = promptDocs.slice(i, i + this.BATCH_SIZE);
|
||||
await this.addDocuments(batch);
|
||||
const upTo = Math.min(i + this.BATCH_SIZE, prompts.length);
|
||||
lastSyncedPromptId = prompts[upTo - 1].id;
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${Math.min(i + this.BATCH_SIZE, promptDocs.length)}/${promptDocs.length}`
|
||||
});
|
||||
logger.debug('CHROMA_SYNC', 'Backfill progress', {
|
||||
project: backfillProject,
|
||||
progress: `${upTo}/${promptDocs.length}`
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
if (lastSyncedPromptId > 0) {
|
||||
ChromaSyncState.bump(backfillProject, 'prompts', lastSyncedPromptId);
|
||||
}
|
||||
}
|
||||
|
||||
return promptDocs;
|
||||
@@ -875,6 +952,23 @@ export class ChromaSync {
|
||||
|
||||
logger.info('CHROMA_SYNC', `Backfill check for ${projects.length} projects`);
|
||||
|
||||
// Cold-start bootstrap: if no watermark file exists, derive watermarks
|
||||
// from one Chroma scan per project. This is the slow operation we are
|
||||
// permanently replacing — after this runs once, subsequent worker starts
|
||||
// skip the scan entirely.
|
||||
if (!ChromaSyncState.exists()) {
|
||||
logger.info('CHROMA_SYNC', 'Watermark cache missing — bootstrapping from Chroma (one-time)');
|
||||
for (const { project } of projects) {
|
||||
try {
|
||||
await sync.bootstrapWatermarksFromChroma(project);
|
||||
} catch (error) {
|
||||
logger.error('CHROMA_SYNC', `Bootstrap failed for project: ${project}`,
|
||||
{}, error instanceof Error ? error : new Error(String(error)));
|
||||
}
|
||||
}
|
||||
logger.info('CHROMA_SYNC', 'Bootstrap complete — incremental backfills will use watermarks');
|
||||
}
|
||||
|
||||
for (const { project } of projects) {
|
||||
try {
|
||||
await sync.ensureBackfilled(project, db);
|
||||
|
||||
@@ -0,0 +1,114 @@
|
||||
/**
|
||||
* ChromaSyncState — per-project watermark cache for Chroma backfill.
|
||||
*
|
||||
* Replaces full Chroma metadata scans on every worker start with a tiny JSON file
|
||||
* tracking the highest sqlite_id synced to Chroma for each (project, doc_type).
|
||||
*
|
||||
* File: $CLAUDE_MEM_DATA_DIR/chroma-sync-state.json
|
||||
* Schema: { [project]: { observations: number, summaries: number, prompts: number } }
|
||||
*
|
||||
* Reads/writes are synchronous — the file is small and only touched at startup
|
||||
* and after batched adds. An in-memory cache mirrors the file; writes are
|
||||
* atomic via .tmp + rename.
|
||||
*/
|
||||
import { readFileSync, writeFileSync, renameSync, mkdirSync, existsSync } from 'fs';
|
||||
import { join } from 'path';
|
||||
import { SettingsDefaultsManager } from '../../shared/SettingsDefaultsManager.js';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
|
||||
export type DocKind = 'observations' | 'summaries' | 'prompts';
|
||||
|
||||
export interface ProjectWatermarks {
|
||||
observations: number;
|
||||
summaries: number;
|
||||
prompts: number;
|
||||
}
|
||||
|
||||
const ZERO: ProjectWatermarks = { observations: 0, summaries: 0, prompts: 0 };
|
||||
|
||||
function statePath(): string {
|
||||
const dataDir = SettingsDefaultsManager.get('CLAUDE_MEM_DATA_DIR');
|
||||
return join(dataDir, 'chroma-sync-state.json');
|
||||
}
|
||||
|
||||
let cache: Record<string, ProjectWatermarks> | null = null;
|
||||
let dirty = false;
|
||||
|
||||
function load(): Record<string, ProjectWatermarks> {
|
||||
if (cache) return cache;
|
||||
const path = statePath();
|
||||
if (!existsSync(path)) {
|
||||
cache = {};
|
||||
return cache;
|
||||
}
|
||||
const raw = readFileSync(path, 'utf8');
|
||||
const parsed = JSON.parse(raw) as Record<string, Partial<ProjectWatermarks>>;
|
||||
const normalized: Record<string, ProjectWatermarks> = {};
|
||||
for (const [project, marks] of Object.entries(parsed)) {
|
||||
normalized[project] = {
|
||||
observations: Number.isInteger(marks.observations) ? marks.observations as number : 0,
|
||||
summaries: Number.isInteger(marks.summaries) ? marks.summaries as number : 0,
|
||||
prompts: Number.isInteger(marks.prompts) ? marks.prompts as number : 0
|
||||
};
|
||||
}
|
||||
cache = normalized;
|
||||
return cache;
|
||||
}
|
||||
|
||||
function persist(): void {
|
||||
if (!cache) return;
|
||||
const path = statePath();
|
||||
const dataDir = SettingsDefaultsManager.get('CLAUDE_MEM_DATA_DIR');
|
||||
if (!existsSync(dataDir)) mkdirSync(dataDir, { recursive: true });
|
||||
const tmp = `${path}.tmp`;
|
||||
writeFileSync(tmp, JSON.stringify(cache, null, 2), 'utf8');
|
||||
renameSync(tmp, path);
|
||||
dirty = false;
|
||||
}
|
||||
|
||||
export const ChromaSyncState = {
|
||||
/** Whether the state file exists on disk. Used by callers to detect cold-start. */
|
||||
exists(): boolean {
|
||||
return existsSync(statePath());
|
||||
},
|
||||
|
||||
/** Read current watermarks for a project. Returns zeros if unknown. */
|
||||
get(project: string): ProjectWatermarks {
|
||||
const all = load();
|
||||
return { ...(all[project] ?? ZERO) };
|
||||
},
|
||||
|
||||
/** Bump a single watermark to max(current, id). No-op if id is not greater. */
|
||||
bump(project: string, kind: DocKind, id: number): void {
|
||||
if (!Number.isInteger(id) || id <= 0) return;
|
||||
const all = load();
|
||||
const current = all[project] ?? { ...ZERO };
|
||||
if (id <= current[kind]) return;
|
||||
current[kind] = id;
|
||||
all[project] = current;
|
||||
dirty = true;
|
||||
persist();
|
||||
},
|
||||
|
||||
/**
|
||||
* Replace watermarks for a project wholesale. Used by the bootstrap path
|
||||
* after a one-time Chroma scan derives the initial highest IDs.
|
||||
*/
|
||||
replace(project: string, marks: ProjectWatermarks): void {
|
||||
const all = load();
|
||||
all[project] = { ...marks };
|
||||
dirty = true;
|
||||
persist();
|
||||
},
|
||||
|
||||
/** Persist any pending writes. Defensive — bump/replace flush already. */
|
||||
flush(): void {
|
||||
if (dirty) persist();
|
||||
},
|
||||
|
||||
/** Test/diagnostic helper: drop in-memory cache so the next read re-reads disk. */
|
||||
resetCache(): void {
|
||||
cache = null;
|
||||
dirty = false;
|
||||
}
|
||||
};
|
||||
Reference in New Issue
Block a user