fix(worktree): address PR review — schema guard, startup adoption, query parity

Addresses six CodeRabbit/Greptile findings on PR #2052:

- Schema guard in adoptMergedWorktrees probes for merged_into_project
  columns before preparing statements; returns early when absent so first
  boot after upgrade (pre-migration) doesn't silently fail.

- Startup adoption now iterates distinct cwds from pending_messages and
  dedupes via resolveMainRepoPath — the worker daemon runs with
  cwd=plugin scripts dir, so process.cwd() fallback was a no-op.

- ObservationCompiler single-project queries (queryObservations /
  querySummaries) OR merged_into_project into WHERE so injected context
  surfaces adopted worktree rows, matching the Multi variants.

- SessionStore constructor now calls ensureMergedIntoProjectColumns so
  bundled artifacts (context-generator.cjs) that embed SessionStore get
  the merged_into_project column on DBs that only went through the
  bundled migration chain.

- OBSERVER_SESSIONS_PROJECT constant is now derived from
  basename(OBSERVER_SESSIONS_DIR) and used across PaginationHelper,
  SessionStore, and timeline queries instead of hardcoded strings.

- Corrected misleading Chroma retry docstring in WorktreeAdoption to
  match actual behavior (no auto-retry once SQL commits).
This commit is contained in:
Alex Newman
2026-04-16 21:31:30 -07:00
parent d1601123fd
commit 7a66cb310f
10 changed files with 408 additions and 253 deletions
+4 -3
View File
@@ -52,7 +52,7 @@ export function queryObservations(
o.created_at_epoch
FROM observations o
LEFT JOIN sdk_sessions s ON o.memory_session_id = s.memory_session_id
WHERE o.project = ?
WHERE (o.project = ? OR o.merged_into_project = ?)
AND type IN (${typePlaceholders})
AND EXISTS (
SELECT 1 FROM json_each(o.concepts)
@@ -62,6 +62,7 @@ export function queryObservations(
ORDER BY o.created_at_epoch DESC
LIMIT ?
`).all(
project,
project,
...typeArray,
...conceptArray,
@@ -93,12 +94,12 @@ export function querySummaries(
ss.created_at_epoch
FROM session_summaries ss
LEFT JOIN sdk_sessions s ON ss.memory_session_id = s.memory_session_id
WHERE ss.project = ?
WHERE (ss.project = ? OR ss.merged_into_project = ?)
${platformSource ? "AND COALESCE(s.platform_source, 'claude') = ?" : ''}
ORDER BY ss.created_at_epoch DESC
LIMIT ?
`).all(
...[project, ...(platformSource ? [platformSource] : []), config.sessionCount + SUMMARY_LOOKAHEAD]
...[project, project, ...(platformSource ? [platformSource] : []), config.sessionCount + SUMMARY_LOOKAHEAD]
) as SessionSummary[];
}
+109 -2
View File
@@ -130,8 +130,12 @@ function listMergedBranches(mainRepo: string): Set<string> {
* Idempotent: a row is only touched when its `merged_into_project IS NULL`.
*
* Chroma is patched AFTER SQL commits. Chroma failure does NOT roll back SQL —
* SQL is source of truth; a subsequent run will retry the Chroma patch because
* the filter in `updateMergedIntoProject` keys on `sqlite_id`.
* SQL is source of truth. A transient Chroma failure does NOT auto-retry:
* once SQL commits, `merged_into_project IS NULL` no longer matches those rows,
* so the same adoption pass won't rediscover them. If Chroma patching fails,
* `result.chromaFailed` reflects the count — callers should surface this to
* the operator, and re-running adoption after clearing `merged_into_project`
* (or reseeding Chroma) is the recovery path.
*/
export async function adoptMergedWorktrees(opts: {
repoPath?: string;
@@ -201,6 +205,29 @@ export async function adoptMergedWorktrees(opts: {
const { Database } = require('bun:sqlite') as typeof import('bun:sqlite');
db = new Database(dbPath);
// Schema guard: adoption may be invoked on worker startup before
// DatabaseManager runs migrations. If the `merged_into_project` column
// isn't present yet, prepared statements below will fail with
// "no such column", silently skipping adoption until the next restart.
// Return early so the next boot (post-migration) picks this up.
interface ColumnInfo { name: string }
const obsColumns = db
.prepare('PRAGMA table_info(observations)')
.all() as ColumnInfo[];
const sumColumns = db
.prepare('PRAGMA table_info(session_summaries)')
.all() as ColumnInfo[];
const obsHasColumn = obsColumns.some(c => c.name === 'merged_into_project');
const sumHasColumn = sumColumns.some(c => c.name === 'merged_into_project');
if (!obsHasColumn || !sumHasColumn) {
logger.debug(
'SYSTEM',
'Worktree adoption skipped (merged_into_project column missing; will run after migration)',
{ obsHasColumn, sumHasColumn }
);
return result;
}
const selectObs = db.prepare(
'SELECT id FROM observations WHERE project = ? AND merged_into_project IS NULL'
);
@@ -290,3 +317,83 @@ export async function adoptMergedWorktrees(opts: {
return result;
}
/**
* Run adoption once per distinct parent repo referenced by recorded cwds.
*
* Worker startup adoption cannot use `process.cwd()` as a seed — the daemon is
* spawned with cwd=marketplace-plugin-dir, which isn't a git repo. Instead, we
* derive candidate parent repos from `pending_messages.cwd` (the user's actual
* working directories), dedupe via `resolveMainRepoPath`, and run adoption
* against each. Failures on individual repos are logged but don't short-circuit
* the others.
*
* Safe to call before `dbManager.initialize()`: opens its own short-lived DB
* handle (readonly) to enumerate cwds, then delegates to `adoptMergedWorktrees`
* which opens its own writable handle.
*/
export async function adoptMergedWorktreesForAllKnownRepos(opts: {
dataDirectory?: string;
dryRun?: boolean;
} = {}): Promise<AdoptionResult[]> {
const dataDirectory = opts.dataDirectory ?? DEFAULT_DATA_DIR;
const dbPath = path.join(dataDirectory, 'claude-mem.db');
const results: AdoptionResult[] = [];
if (!existsSync(dbPath)) {
logger.debug('SYSTEM', 'Worktree adoption skipped (no DB yet)', { dbPath });
return results;
}
const uniqueParents = new Set<string>();
let db: import('bun:sqlite').Database | null = null;
try {
const { Database } = require('bun:sqlite') as typeof import('bun:sqlite');
db = new Database(dbPath, { readonly: true });
const hasPending = db.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'"
).get() as { name: string } | undefined;
if (!hasPending) {
logger.debug('SYSTEM', 'Worktree adoption skipped (pending_messages table missing)');
return results;
}
const cwdRows = db.prepare(`
SELECT cwd FROM pending_messages
WHERE cwd IS NOT NULL AND cwd != ''
GROUP BY cwd
`).all() as Array<{ cwd: string }>;
for (const { cwd } of cwdRows) {
const mainRepo = resolveMainRepoPath(cwd);
if (mainRepo) uniqueParents.add(mainRepo);
}
} finally {
db?.close();
}
if (uniqueParents.size === 0) {
logger.debug('SYSTEM', 'Worktree adoption found no known parent repos');
return results;
}
for (const repoPath of uniqueParents) {
try {
const result = await adoptMergedWorktrees({
repoPath,
dataDirectory,
dryRun: opts.dryRun
});
results.push(result);
} catch (err) {
logger.warn(
'SYSTEM',
'Worktree adoption failed for parent repo (continuing)',
{ repoPath, error: err instanceof Error ? err.message : String(err) }
);
}
}
return results;
}
+36 -5
View File
@@ -1,5 +1,5 @@
import { Database } from 'bun:sqlite';
import { DATA_DIR, DB_PATH, ensureDir } from '../../shared/paths.js';
import { DATA_DIR, DB_PATH, ensureDir, OBSERVER_SESSIONS_PROJECT } from '../../shared/paths.js';
import { logger } from '../../utils/logger.js';
import {
TableColumnInfo,
@@ -65,6 +65,7 @@ export class SessionStore {
this.addSessionCustomTitleColumn();
this.addSessionPlatformSourceColumn();
this.addObservationModelColumns();
this.ensureMergedIntoProjectColumns();
}
/**
@@ -944,6 +945,36 @@ export class SessionStore {
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(26, new Date().toISOString());
}
/**
* Ensure merged_into_project columns + indices exist on observations and session_summaries.
*
* Self-idempotent via PRAGMA table_info guard — does NOT bump schema_versions.
* Mirrors MigrationRunner.ensureMergedIntoProjectColumns so bundled artifacts
* that embed SessionStore (e.g. context-generator.cjs) stay schema-consistent
* with the standalone migration path.
*/
private ensureMergedIntoProjectColumns(): void {
const obsCols = this.db
.query('PRAGMA table_info(observations)')
.all() as TableColumnInfo[];
if (!obsCols.some(c => c.name === 'merged_into_project')) {
this.db.run('ALTER TABLE observations ADD COLUMN merged_into_project TEXT');
}
this.db.run(
'CREATE INDEX IF NOT EXISTS idx_observations_merged_into ON observations(merged_into_project)'
);
const sumCols = this.db
.query('PRAGMA table_info(session_summaries)')
.all() as TableColumnInfo[];
if (!sumCols.some(c => c.name === 'merged_into_project')) {
this.db.run('ALTER TABLE session_summaries ADD COLUMN merged_into_project TEXT');
}
this.db.run(
'CREATE INDEX IF NOT EXISTS idx_summaries_merged_into ON session_summaries(merged_into_project)'
);
}
/**
* Update the memory session ID for a session
* Called by SDKAgent when it captures the session ID from the first SDK message
@@ -1192,9 +1223,9 @@ export class SessionStore {
SELECT DISTINCT project
FROM sdk_sessions
WHERE project IS NOT NULL AND project != ''
AND project != 'observer-sessions'
AND project != ?
`;
const params: unknown[] = [];
const params: unknown[] = [OBSERVER_SESSIONS_PROJECT];
if (normalizedPlatformSource) {
query += ' AND COALESCE(platform_source, ?) = ?';
@@ -1219,10 +1250,10 @@ export class SessionStore {
MAX(started_at_epoch) as latest_epoch
FROM sdk_sessions
WHERE project IS NOT NULL AND project != ''
AND project != 'observer-sessions'
AND project != ?
GROUP BY COALESCE(platform_source, '${DEFAULT_PLATFORM_SOURCE}'), project
ORDER BY latest_epoch DESC
`).all() as Array<{ platform_source: string; project: string; latest_epoch: number }>;
`).all(OBSERVER_SESSIONS_PROJECT) as Array<{ platform_source: string; project: string; latest_epoch: number }>;
const projects: string[] = [];
const seenProjects = new Set<string>();
+3 -2
View File
@@ -8,6 +8,7 @@
import type { Database } from 'bun:sqlite';
import type { ObservationRecord, SessionSummaryRecord, UserPromptRecord } from '../../../types/database.js';
import { logger } from '../../../utils/logger.js';
import { OBSERVER_SESSIONS_PROJECT } from '../../../shared/paths.js';
/**
* Timeline result containing observations, sessions, and prompts within a time window
@@ -210,10 +211,10 @@ export function getAllProjects(db: Database): string[] {
SELECT DISTINCT project
FROM sdk_sessions
WHERE project IS NOT NULL AND project != ''
AND project != 'observer-sessions'
AND project != ?
ORDER BY project ASC
`);
const rows = stmt.all() as Array<{ project: string }>;
const rows = stmt.all(OBSERVER_SESSIONS_PROJECT) as Array<{ project: string }>;
return rows.map(row => row.project);
}
+16 -7
View File
@@ -59,7 +59,7 @@ import {
httpShutdown
} from './infrastructure/HealthMonitor.js';
import { performGracefulShutdown } from './infrastructure/GracefulShutdown.js';
import { adoptMergedWorktrees } from './infrastructure/WorktreeAdoption.js';
import { adoptMergedWorktrees, adoptMergedWorktreesForAllKnownRepos } from './infrastructure/WorktreeAdoption.js';
// Server imports
import { Server } from './server/Server.js';
@@ -368,13 +368,22 @@ export class WorkerService {
// Stamp merged worktrees so their observations surface under the parent
// project. Runs every startup (not marker-gated) because git state evolves
// and the engine is fully idempotent. Must also precede dbManager.initialize().
//
// The worker daemon is spawned with cwd=marketplace-plugin-dir (not a git
// repo), so we can't seed adoption with process.cwd(). Instead, discover
// parent repos from recorded pending_messages.cwd values.
try {
const adoption = await adoptMergedWorktrees({});
if (adoption.adoptedObservations > 0 || adoption.adoptedSummaries > 0 || adoption.chromaUpdates > 0) {
logger.info('SYSTEM', 'Merged worktrees adopted on startup', adoption);
}
if (adoption.errors.length > 0) {
logger.warn('SYSTEM', 'Worktree adoption had per-branch errors', { errors: adoption.errors });
const adoptions = await adoptMergedWorktreesForAllKnownRepos({});
for (const adoption of adoptions) {
if (adoption.adoptedObservations > 0 || adoption.adoptedSummaries > 0 || adoption.chromaUpdates > 0) {
logger.info('SYSTEM', 'Merged worktrees adopted on startup', adoption);
}
if (adoption.errors.length > 0) {
logger.warn('SYSTEM', 'Worktree adoption had per-branch errors', {
repoPath: adoption.repoPath,
errors: adoption.errors
});
}
}
} catch (err) {
logger.error('SYSTEM', 'Worktree adoption failed (non-fatal)', {}, err as Error);
+3 -1
View File
@@ -9,6 +9,7 @@
import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import { OBSERVER_SESSIONS_PROJECT } from '../../shared/paths.js';
import type { PaginatedResult, Observation, Summary, UserPrompt } from '../worker-types.js';
export class PaginationHelper {
@@ -107,7 +108,8 @@ export class PaginationHelper {
params.push(project, project);
} else {
// Hide internal observer-session rows from the unfiltered UI list.
conditions.push("o.project != 'observer-sessions'");
conditions.push('o.project != ?');
params.push(OBSERVER_SESSIONS_PROJECT);
}
if (platformSource) {
conditions.push(`COALESCE(s.platform_source, 'claude') = ?`);
+1 -1
View File
@@ -77,7 +77,7 @@ export const OBSERVER_SESSIONS_DIR = join(DATA_DIR, 'observer-sessions');
// Project name assigned to observer sessions (basename of OBSERVER_SESSIONS_DIR).
// UI queries filter this out so internal worker sessions don't pollute project lists.
export const OBSERVER_SESSIONS_PROJECT = 'observer-sessions';
export const OBSERVER_SESSIONS_PROJECT = basename(OBSERVER_SESSIONS_DIR);
// Claude integration paths
export const CLAUDE_SETTINGS_PATH = join(CLAUDE_CONFIG_DIR, 'settings.json');