feat: Implement user prompt syncing to Chroma and enhance timeline querying

- Added `getObservationById` method to retrieve observations by ID in SessionStore.
- Introduced `getSessionSummariesByIds` and `getUserPromptsByIds` methods for fetching session summaries and user prompts by IDs.
- Developed `getTimelineAroundTimestamp` and `getTimelineAroundObservation` methods to provide a unified timeline of observations, sessions, and prompts around a specified anchor point.
- Enhanced ChromaSync to format and sync user prompts, including a new `syncUserPrompt` method.
- Updated WorkerService to sync the latest user prompt to Chroma after updating the worker port.
- Created tests for timeline querying and MCP handler logic to ensure functionality.
- Documented the implementation plan for user prompts and timeline context tool in the Chroma search completion plan.
This commit is contained in:
Alex Newman
2025-11-03 16:55:33 -05:00
parent c6bf72ca72
commit 633f89a5fb
18 changed files with 2152 additions and 229 deletions
+231
View File
@@ -616,6 +616,19 @@ export class SessionStore {
return stmt.all(sdkSessionId) as any[];
}
/**
* Get a single observation by ID
*/
getObservationById(id: number): any | null {
const stmt = this.db.prepare(`
SELECT *
FROM observations
WHERE id = ?
`);
return stmt.get(id) as any || null;
}
/**
* Get observations by array of IDs with ordering and limit
*/
@@ -1115,6 +1128,224 @@ export class SessionStore {
return result.changes;
}
/**
* Get session summaries by IDs (for hybrid Chroma search)
* Returns summaries in specified temporal order
*/
getSessionSummariesByIds(
ids: number[],
options: { orderBy?: 'date_desc' | 'date_asc'; limit?: number } = {}
): any[] {
if (ids.length === 0) return [];
const { orderBy = 'date_desc', limit } = options;
const orderClause = orderBy === 'date_asc' ? 'ASC' : 'DESC';
const limitClause = limit ? `LIMIT ${limit}` : '';
const placeholders = ids.map(() => '?').join(',');
const stmt = this.db.prepare(`
SELECT * FROM session_summaries
WHERE id IN (${placeholders})
ORDER BY created_at_epoch ${orderClause}
${limitClause}
`);
return stmt.all(...ids) as any[];
}
/**
* Get user prompts by IDs (for hybrid Chroma search)
* Returns prompts in specified temporal order
*/
getUserPromptsByIds(
ids: number[],
options: { orderBy?: 'date_desc' | 'date_asc'; limit?: number } = {}
): any[] {
if (ids.length === 0) return [];
const { orderBy = 'date_desc', limit } = options;
const orderClause = orderBy === 'date_asc' ? 'ASC' : 'DESC';
const limitClause = limit ? `LIMIT ${limit}` : '';
const placeholders = ids.map(() => '?').join(',');
const stmt = this.db.prepare(`
SELECT
up.*,
s.project,
s.sdk_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE up.id IN (${placeholders})
ORDER BY up.created_at_epoch ${orderClause}
${limitClause}
`);
return stmt.all(...ids) as any[];
}
/**
* Get a unified timeline of all records (observations, sessions, prompts) around an anchor point
* @param anchorEpoch The anchor timestamp (epoch milliseconds)
* @param depthBefore Number of records to retrieve before anchor (any type)
* @param depthAfter Number of records to retrieve after anchor (any type)
* @param project Optional project filter
* @returns Object containing observations, sessions, and prompts for the specified window
*/
getTimelineAroundTimestamp(
anchorEpoch: number,
depthBefore: number = 10,
depthAfter: number = 10,
project?: string
): {
observations: any[];
sessions: any[];
prompts: any[];
} {
return this.getTimelineAroundObservation(null, anchorEpoch, depthBefore, depthAfter, project);
}
/**
* Get timeline around a specific observation ID
* Uses observation ID offsets to determine time boundaries, then fetches all record types in that window
*/
getTimelineAroundObservation(
anchorObservationId: number | null,
anchorEpoch: number,
depthBefore: number = 10,
depthAfter: number = 10,
project?: string
): {
observations: any[];
sessions: any[];
prompts: any[];
} {
const projectFilter = project ? 'AND project = ?' : '';
const projectParams = project ? [project] : [];
let startEpoch: number;
let endEpoch: number;
if (anchorObservationId !== null) {
// Get boundary observations by ID offset
const beforeQuery = `
SELECT id, created_at_epoch
FROM observations
WHERE id <= ? ${projectFilter}
ORDER BY id DESC
LIMIT ?
`;
const afterQuery = `
SELECT id, created_at_epoch
FROM observations
WHERE id >= ? ${projectFilter}
ORDER BY id ASC
LIMIT ?
`;
try {
const beforeRecords = this.db.prepare(beforeQuery).all(anchorObservationId, ...projectParams, depthBefore + 1) as any[];
const afterRecords = this.db.prepare(afterQuery).all(anchorObservationId, ...projectParams, depthAfter + 1) as any[];
// Get the earliest and latest timestamps from boundary observations
if (beforeRecords.length === 0 && afterRecords.length === 0) {
return { observations: [], sessions: [], prompts: [] };
}
startEpoch = beforeRecords.length > 0 ? beforeRecords[beforeRecords.length - 1].created_at_epoch : anchorEpoch;
endEpoch = afterRecords.length > 0 ? afterRecords[afterRecords.length - 1].created_at_epoch : anchorEpoch;
} catch (err: any) {
console.error('[SessionStore] Error getting boundary observations:', err.message);
return { observations: [], sessions: [], prompts: [] };
}
} else {
// For timestamp-based anchors, use time-based boundaries
// Get observations to find the time window
const beforeQuery = `
SELECT created_at_epoch
FROM observations
WHERE created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch DESC
LIMIT ?
`;
const afterQuery = `
SELECT created_at_epoch
FROM observations
WHERE created_at_epoch >= ? ${projectFilter}
ORDER BY created_at_epoch ASC
LIMIT ?
`;
try {
const beforeRecords = this.db.prepare(beforeQuery).all(anchorEpoch, ...projectParams, depthBefore) as any[];
const afterRecords = this.db.prepare(afterQuery).all(anchorEpoch, ...projectParams, depthAfter + 1) as any[];
if (beforeRecords.length === 0 && afterRecords.length === 0) {
return { observations: [], sessions: [], prompts: [] };
}
startEpoch = beforeRecords.length > 0 ? beforeRecords[beforeRecords.length - 1].created_at_epoch : anchorEpoch;
endEpoch = afterRecords.length > 0 ? afterRecords[afterRecords.length - 1].created_at_epoch : anchorEpoch;
} catch (err: any) {
console.error('[SessionStore] Error getting boundary timestamps:', err.message);
return { observations: [], sessions: [], prompts: [] };
}
}
// Now query ALL record types within the time window
const obsQuery = `
SELECT *
FROM observations
WHERE created_at_epoch >= ? AND created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch ASC
`;
const sessQuery = `
SELECT *
FROM session_summaries
WHERE created_at_epoch >= ? AND created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch ASC
`;
const promptQuery = `
SELECT up.*, s.project, s.sdk_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE up.created_at_epoch >= ? AND up.created_at_epoch <= ? ${projectFilter.replace('project', 's.project')}
ORDER BY up.created_at_epoch ASC
`;
try {
const observations = this.db.prepare(obsQuery).all(startEpoch, endEpoch, ...projectParams) as any[];
const sessions = this.db.prepare(sessQuery).all(startEpoch, endEpoch, ...projectParams) as any[];
const prompts = this.db.prepare(promptQuery).all(startEpoch, endEpoch, ...projectParams) as any[];
return {
observations,
sessions: sessions.map(s => ({
id: s.id,
sdk_session_id: s.sdk_session_id,
project: s.project,
request: s.request,
completed: s.completed,
next_steps: s.next_steps,
created_at: s.created_at,
created_at_epoch: s.created_at_epoch
})),
prompts: prompts.map(p => ({
id: p.id,
claude_session_id: p.claude_session_id,
project: p.project,
prompt: p.prompt_text,
created_at: p.created_at,
created_at_epoch: p.created_at_epoch
}))
};
} catch (err: any) {
console.error('[SessionStore] Error querying timeline records:', err.message);
return { observations: [], sessions: [], prompts: [] };
}
}
/**
* Close the database connection
*/
+247 -10
View File
@@ -55,6 +55,17 @@ interface StoredSummary {
created_at_epoch: number;
}
interface StoredUserPrompt {
id: number;
claude_session_id: string;
prompt_number: number;
prompt_text: string;
created_at: string;
created_at_epoch: number;
sdk_session_id: string;
project: string;
}
export class ChromaSync {
private client: Client | null = null;
private connected: boolean = false;
@@ -404,27 +415,182 @@ export class ChromaSync {
await this.addDocuments(documents);
}
/**
* Format user prompt into Chroma document
* Each prompt becomes a single document (unlike observations/summaries which split by field)
*/
private formatUserPromptDoc(prompt: StoredUserPrompt): ChromaDocument {
return {
id: `prompt_${prompt.id}`,
document: prompt.prompt_text,
metadata: {
sqlite_id: prompt.id,
doc_type: 'user_prompt',
sdk_session_id: prompt.sdk_session_id,
project: prompt.project,
created_at_epoch: prompt.created_at_epoch,
prompt_number: prompt.prompt_number
}
};
}
/**
* Sync a single user prompt to Chroma
* Blocks until sync completes, throws on error
*/
async syncUserPrompt(
promptId: number,
sdkSessionId: string,
project: string,
promptText: string,
promptNumber: number,
createdAtEpoch: number
): Promise<void> {
// Create StoredUserPrompt format
const stored: StoredUserPrompt = {
id: promptId,
claude_session_id: '', // Not needed for Chroma sync
prompt_number: promptNumber,
prompt_text: promptText,
created_at: new Date(createdAtEpoch * 1000).toISOString(),
created_at_epoch: createdAtEpoch,
sdk_session_id: sdkSessionId,
project: project
};
const document = this.formatUserPromptDoc(stored);
logger.info('CHROMA_SYNC', 'Syncing user prompt', {
promptId,
project
});
await this.addDocuments([document]);
}
/**
* Fetch all existing document IDs from Chroma collection
* Returns Sets of SQLite IDs for observations, summaries, and prompts
*/
private async getExistingChromaIds(): Promise<{
observations: Set<number>;
summaries: Set<number>;
prompts: Set<number>;
}> {
await this.ensureConnection();
if (!this.client) {
throw new Error('Chroma client not initialized');
}
const observationIds = new Set<number>();
const summaryIds = new Set<number>();
const promptIds = new Set<number>();
let offset = 0;
const limit = 1000; // Large batches, metadata only = fast
logger.info('CHROMA_SYNC', 'Fetching existing Chroma document IDs...', { project: this.project });
while (true) {
try {
const result = await this.client.callTool({
name: 'chroma_get_documents',
arguments: {
collection_name: this.collectionName,
limit,
offset,
where: { project: this.project }, // Filter by project
include: ['metadatas']
}
});
const data = result.content[0];
if (data.type !== 'text') {
throw new Error('Unexpected response type from chroma_get_documents');
}
const parsed = JSON.parse(data.text);
const metadatas = parsed.metadatas || [];
if (metadatas.length === 0) {
break; // No more documents
}
// Extract SQLite IDs from metadata
for (const meta of metadatas) {
if (meta.sqlite_id) {
if (meta.doc_type === 'observation') {
observationIds.add(meta.sqlite_id);
} else if (meta.doc_type === 'summary') {
summaryIds.add(meta.sqlite_id);
} else if (meta.doc_type === 'prompt') {
promptIds.add(meta.sqlite_id);
}
}
}
offset += limit;
logger.debug('CHROMA_SYNC', 'Fetched batch of existing IDs', {
project: this.project,
offset,
batchSize: metadatas.length
});
} catch (error) {
logger.error('CHROMA_SYNC', 'Failed to fetch existing IDs', { project: this.project }, error as Error);
throw error;
}
}
logger.info('CHROMA_SYNC', 'Existing IDs fetched', {
project: this.project,
observations: observationIds.size,
summaries: summaryIds.size,
prompts: promptIds.size
});
return { observations: observationIds, summaries: summaryIds, prompts: promptIds };
}
/**
* Backfill: Sync all observations missing from Chroma
* Reads from SQLite and syncs in batches
* Throws error if backfill fails
*/
async ensureBackfilled(): Promise<void> {
logger.info('CHROMA_SYNC', 'Starting backfill', { project: this.project });
logger.info('CHROMA_SYNC', 'Starting smart backfill', { project: this.project });
await this.ensureCollection();
// Fetch existing IDs from Chroma (fast, metadata only)
const existing = await this.getExistingChromaIds();
const db = new SessionStore();
try {
// Get all observations for this project
// Build exclusion list for observations
const existingObsIds = Array.from(existing.observations);
const obsExclusionClause = existingObsIds.length > 0
? `AND id NOT IN (${existingObsIds.join(',')})`
: '';
// Get only observations missing from Chroma
const observations = db.db.prepare(`
SELECT * FROM observations WHERE project = ? ORDER BY id ASC
SELECT * FROM observations
WHERE project = ? ${obsExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredObservation[];
const totalObsCount = db.db.prepare(`
SELECT COUNT(*) as count FROM observations WHERE project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling observations', {
project: this.project,
count: observations.length
missing: observations.length,
existing: existing.observations.size,
total: totalObsCount.count
});
// Format all observation documents
@@ -444,14 +610,28 @@ export class ChromaSync {
});
}
// Get all summaries for this project
// Build exclusion list for summaries
const existingSummaryIds = Array.from(existing.summaries);
const summaryExclusionClause = existingSummaryIds.length > 0
? `AND id NOT IN (${existingSummaryIds.join(',')})`
: '';
// Get only summaries missing from Chroma
const summaries = db.db.prepare(`
SELECT * FROM session_summaries WHERE project = ? ORDER BY id ASC
SELECT * FROM session_summaries
WHERE project = ? ${summaryExclusionClause}
ORDER BY id ASC
`).all(this.project) as StoredSummary[];
const totalSummaryCount = db.db.prepare(`
SELECT COUNT(*) as count FROM session_summaries WHERE project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling summaries', {
project: this.project,
count: summaries.length
missing: summaries.length,
existing: existing.summaries.size,
total: totalSummaryCount.count
});
// Format all summary documents
@@ -471,10 +651,67 @@ export class ChromaSync {
});
}
logger.info('CHROMA_SYNC', 'Backfill complete', {
// Build exclusion list for prompts
const existingPromptIds = Array.from(existing.prompts);
const promptExclusionClause = existingPromptIds.length > 0
? `AND up.id NOT IN (${existingPromptIds.join(',')})`
: '';
// Get only user prompts missing from Chroma
const prompts = db.db.prepare(`
SELECT
up.*,
s.project,
s.sdk_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE s.project = ? ${promptExclusionClause}
ORDER BY up.id ASC
`).all(this.project) as StoredUserPrompt[];
const totalPromptCount = db.db.prepare(`
SELECT COUNT(*) as count
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE s.project = ?
`).get(this.project) as { count: number };
logger.info('CHROMA_SYNC', 'Backfilling user prompts', {
project: this.project,
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length
missing: prompts.length,
existing: existing.prompts.size,
total: totalPromptCount.count
});
// Format all prompt documents
const promptDocs: ChromaDocument[] = [];
for (const prompt of prompts) {
promptDocs.push(this.formatUserPromptDoc(prompt));
}
// Sync in batches
for (let i = 0; i < promptDocs.length; i += this.BATCH_SIZE) {
const batch = promptDocs.slice(i, i + this.BATCH_SIZE);
await this.addDocuments(batch);
logger.info('CHROMA_SYNC', 'Backfill progress', {
project: this.project,
progress: `${Math.min(i + this.BATCH_SIZE, promptDocs.length)}/${promptDocs.length}`
});
}
logger.info('CHROMA_SYNC', 'Smart backfill complete', {
project: this.project,
synced: {
observationDocs: allDocs.length,
summaryDocs: summaryDocs.length,
promptDocs: promptDocs.length
},
skipped: {
observations: existing.observations.size,
summaries: existing.summaries.size,
prompts: existing.prompts.size
}
});
} catch (error) {
+29
View File
@@ -195,8 +195,37 @@ class WorkerService {
// Update port in database
db.setWorkerPort(sessionDbId, this.port!);
// Get the latest user_prompt for this session to sync to Chroma
const latestPrompt = db.db.prepare(`
SELECT
up.*,
s.sdk_session_id,
s.project
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE up.claude_session_id = ?
ORDER BY up.created_at_epoch DESC
LIMIT 1
`).get(claudeSessionId) as any;
db.close();
// Sync user prompt to Chroma (fire-and-forget, but crash on failure)
if (latestPrompt) {
this.chromaSync.syncUserPrompt(
latestPrompt.id,
latestPrompt.sdk_session_id,
latestPrompt.project,
latestPrompt.prompt_text,
latestPrompt.prompt_number,
latestPrompt.created_at_epoch
).catch(err => {
logger.failure('WORKER', 'Failed to sync user_prompt to Chroma', { promptId: latestPrompt.id }, err);
process.exit(1); // Fail fast - Chroma sync is critical
});
}
// Start SDK agent in background
session.generatorPromise = this.runSDKAgent(session).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);