7a66cb310f
Addresses six CodeRabbit/Greptile findings on PR #2052: - Schema guard in adoptMergedWorktrees probes for merged_into_project columns before preparing statements; returns early when absent so first boot after upgrade (pre-migration) doesn't silently fail. - Startup adoption now iterates distinct cwds from pending_messages and dedupes via resolveMainRepoPath — the worker daemon runs with cwd=plugin scripts dir, so process.cwd() fallback was a no-op. - ObservationCompiler single-project queries (queryObservations / querySummaries) OR merged_into_project into WHERE so injected context surfaces adopted worktree rows, matching the Multi variants. - SessionStore constructor now calls ensureMergedIntoProjectColumns so bundled artifacts (context-generator.cjs) that embed SessionStore get the merged_into_project column on DBs that only went through the bundled migration chain. - OBSERVER_SESSIONS_PROJECT constant is now derived from basename(OBSERVER_SESSIONS_DIR) and used across PaginationHelper, SessionStore, and timeline queries instead of hardcoded strings. - Corrected misleading Chroma retry docstring in WorktreeAdoption to match actual behavior (no auto-retry once SQL commits).
338 lines
10 KiB
TypeScript
338 lines
10 KiB
TypeScript
/**
|
|
* ObservationCompiler - Query building and data retrieval for context
|
|
*
|
|
* Handles database queries for observations and summaries, plus transcript extraction.
|
|
*/
|
|
|
|
import path from 'path';
|
|
import { existsSync, readFileSync } from 'fs';
|
|
import { SessionStore } from '../sqlite/SessionStore.js';
|
|
import { logger } from '../../utils/logger.js';
|
|
import { SYSTEM_REMINDER_REGEX } from '../../utils/tag-stripping.js';
|
|
import { CLAUDE_CONFIG_DIR } from '../../shared/paths.js';
|
|
import type {
|
|
ContextConfig,
|
|
Observation,
|
|
SessionSummary,
|
|
SummaryTimelineItem,
|
|
TimelineItem,
|
|
PriorMessages,
|
|
} from './types.js';
|
|
import { SUMMARY_LOOKAHEAD } from './types.js';
|
|
|
|
/**
|
|
* Query observations from database with type and concept filtering
|
|
*/
|
|
export function queryObservations(
|
|
db: SessionStore,
|
|
project: string,
|
|
config: ContextConfig,
|
|
platformSource?: string
|
|
): Observation[] {
|
|
const typeArray = Array.from(config.observationTypes);
|
|
const typePlaceholders = typeArray.map(() => '?').join(',');
|
|
const conceptArray = Array.from(config.observationConcepts);
|
|
const conceptPlaceholders = conceptArray.map(() => '?').join(',');
|
|
|
|
return db.db.prepare(`
|
|
SELECT
|
|
o.id,
|
|
o.memory_session_id,
|
|
COALESCE(s.platform_source, 'claude') as platform_source,
|
|
o.type,
|
|
o.title,
|
|
o.subtitle,
|
|
o.narrative,
|
|
o.facts,
|
|
o.concepts,
|
|
o.files_read,
|
|
o.files_modified,
|
|
o.discovery_tokens,
|
|
o.created_at,
|
|
o.created_at_epoch
|
|
FROM observations o
|
|
LEFT JOIN sdk_sessions s ON o.memory_session_id = s.memory_session_id
|
|
WHERE (o.project = ? OR o.merged_into_project = ?)
|
|
AND type IN (${typePlaceholders})
|
|
AND EXISTS (
|
|
SELECT 1 FROM json_each(o.concepts)
|
|
WHERE value IN (${conceptPlaceholders})
|
|
)
|
|
${platformSource ? "AND COALESCE(s.platform_source, 'claude') = ?" : ''}
|
|
ORDER BY o.created_at_epoch DESC
|
|
LIMIT ?
|
|
`).all(
|
|
project,
|
|
project,
|
|
...typeArray,
|
|
...conceptArray,
|
|
...(platformSource ? [platformSource] : []),
|
|
config.totalObservationCount
|
|
) as Observation[];
|
|
}
|
|
|
|
/**
|
|
* Query recent session summaries from database
|
|
*/
|
|
export function querySummaries(
|
|
db: SessionStore,
|
|
project: string,
|
|
config: ContextConfig,
|
|
platformSource?: string
|
|
): SessionSummary[] {
|
|
return db.db.prepare(`
|
|
SELECT
|
|
ss.id,
|
|
ss.memory_session_id,
|
|
COALESCE(s.platform_source, 'claude') as platform_source,
|
|
ss.request,
|
|
ss.investigated,
|
|
ss.learned,
|
|
ss.completed,
|
|
ss.next_steps,
|
|
ss.created_at,
|
|
ss.created_at_epoch
|
|
FROM session_summaries ss
|
|
LEFT JOIN sdk_sessions s ON ss.memory_session_id = s.memory_session_id
|
|
WHERE (ss.project = ? OR ss.merged_into_project = ?)
|
|
${platformSource ? "AND COALESCE(s.platform_source, 'claude') = ?" : ''}
|
|
ORDER BY ss.created_at_epoch DESC
|
|
LIMIT ?
|
|
`).all(
|
|
...[project, project, ...(platformSource ? [platformSource] : []), config.sessionCount + SUMMARY_LOOKAHEAD]
|
|
) as SessionSummary[];
|
|
}
|
|
|
|
/**
|
|
* Query observations from multiple projects (for worktree support)
|
|
*
|
|
* Returns observations from all specified projects, interleaved chronologically.
|
|
* Used when running in a worktree to show both parent repo and worktree observations.
|
|
*/
|
|
export function queryObservationsMulti(
|
|
db: SessionStore,
|
|
projects: string[],
|
|
config: ContextConfig,
|
|
platformSource?: string
|
|
): Observation[] {
|
|
const typeArray = Array.from(config.observationTypes);
|
|
const typePlaceholders = typeArray.map(() => '?').join(',');
|
|
const conceptArray = Array.from(config.observationConcepts);
|
|
const conceptPlaceholders = conceptArray.map(() => '?').join(',');
|
|
|
|
// Build IN clause for projects
|
|
const projectPlaceholders = projects.map(() => '?').join(',');
|
|
|
|
return db.db.prepare(`
|
|
SELECT
|
|
o.id,
|
|
o.memory_session_id,
|
|
COALESCE(s.platform_source, 'claude') as platform_source,
|
|
o.type,
|
|
o.title,
|
|
o.subtitle,
|
|
o.narrative,
|
|
o.facts,
|
|
o.concepts,
|
|
o.files_read,
|
|
o.files_modified,
|
|
o.discovery_tokens,
|
|
o.created_at,
|
|
o.created_at_epoch,
|
|
o.project
|
|
FROM observations o
|
|
LEFT JOIN sdk_sessions s ON o.memory_session_id = s.memory_session_id
|
|
WHERE (o.project IN (${projectPlaceholders})
|
|
OR o.merged_into_project IN (${projectPlaceholders}))
|
|
AND type IN (${typePlaceholders})
|
|
AND EXISTS (
|
|
SELECT 1 FROM json_each(o.concepts)
|
|
WHERE value IN (${conceptPlaceholders})
|
|
)
|
|
${platformSource ? "AND COALESCE(s.platform_source, 'claude') = ?" : ''}
|
|
ORDER BY o.created_at_epoch DESC
|
|
LIMIT ?
|
|
`).all(
|
|
...projects,
|
|
...projects,
|
|
...typeArray,
|
|
...conceptArray,
|
|
...(platformSource ? [platformSource] : []),
|
|
config.totalObservationCount
|
|
) as Observation[];
|
|
}
|
|
|
|
/**
|
|
* Query session summaries from multiple projects (for worktree support)
|
|
*
|
|
* Returns summaries from all specified projects, interleaved chronologically.
|
|
* Used when running in a worktree to show both parent repo and worktree summaries.
|
|
*/
|
|
export function querySummariesMulti(
|
|
db: SessionStore,
|
|
projects: string[],
|
|
config: ContextConfig,
|
|
platformSource?: string
|
|
): SessionSummary[] {
|
|
// Build IN clause for projects
|
|
const projectPlaceholders = projects.map(() => '?').join(',');
|
|
|
|
return db.db.prepare(`
|
|
SELECT
|
|
ss.id,
|
|
ss.memory_session_id,
|
|
COALESCE(s.platform_source, 'claude') as platform_source,
|
|
ss.request,
|
|
ss.investigated,
|
|
ss.learned,
|
|
ss.completed,
|
|
ss.next_steps,
|
|
ss.created_at,
|
|
ss.created_at_epoch,
|
|
ss.project
|
|
FROM session_summaries ss
|
|
LEFT JOIN sdk_sessions s ON ss.memory_session_id = s.memory_session_id
|
|
WHERE (ss.project IN (${projectPlaceholders})
|
|
OR ss.merged_into_project IN (${projectPlaceholders}))
|
|
${platformSource ? "AND COALESCE(s.platform_source, 'claude') = ?" : ''}
|
|
ORDER BY ss.created_at_epoch DESC
|
|
LIMIT ?
|
|
`).all(...projects, ...projects, ...(platformSource ? [platformSource] : []), config.sessionCount + SUMMARY_LOOKAHEAD) as SessionSummary[];
|
|
}
|
|
|
|
/**
|
|
* Convert cwd path to dashed format for transcript lookup
|
|
*/
|
|
function cwdToDashed(cwd: string): string {
|
|
return cwd.replace(/\//g, '-');
|
|
}
|
|
|
|
/**
|
|
* Extract prior messages from transcript file
|
|
*/
|
|
export function extractPriorMessages(transcriptPath: string): PriorMessages {
|
|
try {
|
|
if (!existsSync(transcriptPath)) {
|
|
return { userMessage: '', assistantMessage: '' };
|
|
}
|
|
|
|
const content = readFileSync(transcriptPath, 'utf-8').trim();
|
|
if (!content) {
|
|
return { userMessage: '', assistantMessage: '' };
|
|
}
|
|
|
|
const lines = content.split('\n').filter(line => line.trim());
|
|
let lastAssistantMessage = '';
|
|
|
|
for (let i = lines.length - 1; i >= 0; i--) {
|
|
try {
|
|
const line = lines[i];
|
|
if (!line.includes('"type":"assistant"')) {
|
|
continue;
|
|
}
|
|
|
|
const entry = JSON.parse(line);
|
|
if (entry.type === 'assistant' && entry.message?.content && Array.isArray(entry.message.content)) {
|
|
let text = '';
|
|
for (const block of entry.message.content) {
|
|
if (block.type === 'text') {
|
|
text += block.text;
|
|
}
|
|
}
|
|
text = text.replace(SYSTEM_REMINDER_REGEX, '').trim();
|
|
if (text) {
|
|
lastAssistantMessage = text;
|
|
break;
|
|
}
|
|
}
|
|
} catch (parseError) {
|
|
logger.debug('PARSER', 'Skipping malformed transcript line', { lineIndex: i }, parseError as Error);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
return { userMessage: '', assistantMessage: lastAssistantMessage };
|
|
} catch (error) {
|
|
logger.failure('WORKER', `Failed to extract prior messages from transcript`, { transcriptPath }, error as Error);
|
|
return { userMessage: '', assistantMessage: '' };
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Get prior session messages if enabled
|
|
*/
|
|
export function getPriorSessionMessages(
|
|
observations: Observation[],
|
|
config: ContextConfig,
|
|
currentSessionId: string | undefined,
|
|
cwd: string
|
|
): PriorMessages {
|
|
if (!config.showLastMessage || observations.length === 0) {
|
|
return { userMessage: '', assistantMessage: '' };
|
|
}
|
|
|
|
const priorSessionObs = observations.find(obs => obs.memory_session_id !== currentSessionId);
|
|
if (!priorSessionObs) {
|
|
return { userMessage: '', assistantMessage: '' };
|
|
}
|
|
|
|
const priorSessionId = priorSessionObs.memory_session_id;
|
|
const dashedCwd = cwdToDashed(cwd);
|
|
// Use CLAUDE_CONFIG_DIR to support custom Claude config directories
|
|
const transcriptPath = path.join(CLAUDE_CONFIG_DIR, 'projects', dashedCwd, `${priorSessionId}.jsonl`);
|
|
return extractPriorMessages(transcriptPath);
|
|
}
|
|
|
|
/**
|
|
* Prepare summaries for timeline display
|
|
*/
|
|
export function prepareSummariesForTimeline(
|
|
displaySummaries: SessionSummary[],
|
|
allSummaries: SessionSummary[]
|
|
): SummaryTimelineItem[] {
|
|
const mostRecentSummaryId = allSummaries[0]?.id;
|
|
|
|
return displaySummaries.map((summary, i) => {
|
|
const olderSummary = i === 0 ? null : allSummaries[i + 1];
|
|
return {
|
|
...summary,
|
|
displayEpoch: olderSummary ? olderSummary.created_at_epoch : summary.created_at_epoch,
|
|
displayTime: olderSummary ? olderSummary.created_at : summary.created_at,
|
|
shouldShowLink: summary.id !== mostRecentSummaryId
|
|
};
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Build unified timeline from observations and summaries
|
|
*/
|
|
export function buildTimeline(
|
|
observations: Observation[],
|
|
summaries: SummaryTimelineItem[]
|
|
): TimelineItem[] {
|
|
const timeline: TimelineItem[] = [
|
|
...observations.map(obs => ({ type: 'observation' as const, data: obs })),
|
|
...summaries.map(summary => ({ type: 'summary' as const, data: summary }))
|
|
];
|
|
|
|
// Sort chronologically
|
|
timeline.sort((a, b) => {
|
|
const aEpoch = a.type === 'observation' ? a.data.created_at_epoch : a.data.displayEpoch;
|
|
const bEpoch = b.type === 'observation' ? b.data.created_at_epoch : b.data.displayEpoch;
|
|
return aEpoch - bEpoch;
|
|
});
|
|
|
|
return timeline;
|
|
}
|
|
|
|
/**
|
|
* Get set of observation IDs that should show full details
|
|
*/
|
|
export function getFullObservationIds(observations: Observation[], count: number): Set<number> {
|
|
return new Set(
|
|
observations
|
|
.slice(0, count)
|
|
.map(obs => obs.id)
|
|
);
|
|
}
|