refactor: decompose monolithic services into modular architecture (#534)

* docs: add monolith refactor report with system breakdown

Comprehensive analysis of codebase identifying:
- 14 files over 500 lines requiring refactoring
- 3 critical monoliths (SessionStore, SearchManager, worker-service)
- 80% code duplication across agent files
- 5-phase refactoring roadmap with domain-based architecture

* fix: prevent memory_session_id from equaling content_session_id

The bug: memory_session_id was initialized to contentSessionId as a
"placeholder for FK purposes". This caused the SDK resume logic to
inject memory agent messages into the USER's Claude Code transcript,
corrupting their conversation history.

Root cause:
- SessionStore.createSDKSession initialized memory_session_id = contentSessionId
- SDKAgent checked memorySessionId !== contentSessionId but this check
  only worked if the session was fetched fresh from DB

The fix:
- SessionStore: Initialize memory_session_id as NULL, not contentSessionId
- SDKAgent: Simple truthy check !!session.memorySessionId (NULL = fresh start)
- Database migration: Ran UPDATE to set memory_session_id = NULL for 1807
  existing sessions that had the bug
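The corrected resume decision can be sketched as follows. The field names come from this commit; the `SdkSession` shape and `resolveResumeTarget` helper are simplified stand-ins for illustration, not the real SDKAgent code:

```typescript
// Sketch of the resume decision after this fix. memorySessionId starts
// as NULL (not contentSessionId), so a plain truthy check is enough to
// distinguish "fresh start" from "resume".
interface SdkSession {
  contentSessionId: string;
  memorySessionId: string | null; // NULL until captured from first SDK response
}

function resolveResumeTarget(session: SdkSession): string | null {
  // Before the fix: memorySessionId === contentSessionId looked "set",
  // so the agent resumed into the user's own Claude Code transcript.
  // After the fix: null unambiguously means no memory session exists yet.
  return session.memorySessionId ? session.memorySessionId : null;
}
```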

Also adds [ALIGNMENT] logging across the session lifecycle to help debug
session continuity issues:
- Hook entry: contentSessionId + promptNumber
- DB lookup: contentSessionId → memorySessionId mapping proof
- Resume decision: shows which memorySessionId will be used for resume
- Capture: logs when memorySessionId is captured from first SDK response

UI: Added "Alignment" quick filter button in LogsModal to show only
alignment logs for debugging session continuity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor: improve error handling in worker-service.ts

- Fix GENERIC_CATCH anti-patterns by logging full error objects instead of just messages
- Add [ANTI-PATTERN IGNORED] markers for legitimate cases (cleanup, hot paths)
- Simplify error handling comments to be more concise
- Improve httpShutdown() error discrimination for ECONNREFUSED
- Reduce LARGE_TRY_BLOCK issues in initialization code

Part of anti-pattern cleanup plan (132 total issues)

* refactor: improve error logging in SearchManager.ts

- Pass full error objects to logger instead of just error.message
- Fixes PARTIAL_ERROR_LOGGING anti-patterns (10 instances)
- Better debugging visibility when Chroma queries fail

Part of anti-pattern cleanup (133 remaining)

* refactor: improve error logging across SessionStore and mcp-server

- SessionStore.ts: Fix error logging in column rename utility
- mcp-server.ts: Log full error objects instead of just error.message
- Improve error handling in Worker API calls and tool execution

Part of anti-pattern cleanup (133 remaining)

* Refactor hooks to streamline error handling and loading states

- Simplified error handling in useContextPreview by removing try-catch and directly checking response status.
- Refactored usePagination to eliminate try-catch, improving readability and maintaining error handling through response checks.
- Cleaned up useSSE by removing unnecessary try-catch around JSON parsing, ensuring clarity in message handling.
- Enhanced useSettings by streamlining the saving process, removing try-catch, and directly checking the result for success.

* refactor: add error handling back to SearchManager Chroma calls

- Wrap queryChroma calls in try-catch to prevent generator crashes
- Log Chroma errors as warnings and fall back gracefully
- Fixes generator failures when Chroma has issues
- Part of anti-pattern cleanup recovery

* feat: Add generator failure investigation report and observation duplication regression report

- Created a comprehensive investigation report detailing the root cause of generator failures during anti-pattern cleanup, including the impact, investigation process, and implemented fixes.
- Documented the critical regression causing observation duplication due to race conditions in the SDK agent, outlining symptoms, root cause analysis, and proposed fixes.

* fix: address PR #528 review comments - atomic cleanup and detector improvements

This commit addresses critical review feedback from PR #528:

## 1. Atomic Message Cleanup (Fix Race Condition)

**Problem**: SessionRoutes.ts generator error handler had race condition
- Queried messages then marked failed in loop
- If crash during loop → partial marking → inconsistent state

**Solution**:
- Added `markSessionMessagesFailed()` to PendingMessageStore.ts
- Single atomic UPDATE statement replaces loop
- Follows existing pattern from `resetProcessingToPending()`

**Files**:
- src/services/sqlite/PendingMessageStore.ts (new method)
- src/services/worker/http/routes/SessionRoutes.ts (use new method)
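A minimal sketch of the atomic replacement. The method and table names follow this commit; the `Db` interface is a stand-in for the bun:sqlite handle, and the exact column set may differ from the real `markSessionMessagesFailed()`:

```typescript
// Stand-in for the subset of the bun:sqlite API used here.
interface Stmt { run(...params: unknown[]): void }
interface Db { prepare(sql: string): Stmt }

// One atomic UPDATE replaces the query-then-loop pattern: either every
// pending message for the session is marked failed, or none are. A crash
// mid-operation can no longer leave the queue partially marked.
function markSessionMessagesFailed(db: Db, sessionDbId: number, now: number): void {
  db.prepare(`
    UPDATE pending_messages
    SET status = 'failed', failed_at_epoch = ?
    WHERE session_db_id = ? AND status = 'pending'
  `).run(now, sessionDbId);
}
```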

## 2. Anti-Pattern Detector Improvements

**Problem**: Detector didn't recognize logger.failure() method
- Lines 212 & 335 already included "failure"
- Lines 112-113 (PARTIAL_ERROR_LOGGING detection) did not

**Solution**: Updated regex patterns to include "failure" for consistency

**Files**:
- scripts/anti-pattern-test/detect-error-handling-antipatterns.ts

## 3. Documentation

**PR Comment**: Added clarification on memory_session_id fix location
- Points to SessionStore.ts:1155
- Explains why NULL initialization prevents message injection bug

## Review Response

Addresses "Must Address Before Merge" items from review:
- Clarified memory_session_id bug fix location (via PR comment)
- Made generator error handler message cleanup atomic
- Deferred comprehensive test suite to a follow-up PR (keeps this PR focused)

## Testing

- Build passes with no errors
- Anti-pattern detector runs successfully
- Atomic cleanup follows proven pattern from existing methods

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* fix: FOREIGN KEY constraint and missing failed_at_epoch column

Two critical bugs fixed:

1. Missing failed_at_epoch column in pending_messages table
   - Added migration 20 to create the column
   - Fixes error when trying to mark messages as failed

2. FOREIGN KEY constraint failed when storing observations
   - All three agents (SDK, Gemini, OpenRouter) were passing
     session.contentSessionId instead of session.memorySessionId
   - storeObservationsAndMarkComplete expects memorySessionId
   - Added null check and clear error message

However, observations are still not saving - see investigation report.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* Refactor hook input parsing to improve error handling

- Added a nested try-catch block in new-hook.ts, save-hook.ts, and summary-hook.ts to handle JSON parsing errors more gracefully.
- Replaced direct error throwing with logging of the error details using logger.error.
- Ensured that the process exits cleanly after handling input in all three hooks.
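The parsing pattern described above can be sketched as follows. This is an illustrative reduction, not the actual hook code; `parseHookInput` and the `logError` callback are hypothetical names standing in for the hooks' logger:

```typescript
// Nested try-catch around JSON.parse: parse failures are logged with the
// full error object rather than thrown, and the hook can still exit
// cleanly afterwards instead of crashing on malformed stdin.
function parseHookInput(
  raw: string,
  logError: (msg: string, err: unknown) => void
): unknown {
  try {
    return JSON.parse(raw);
  } catch (error) {
    // Log full error details, not just error.message
    logError('Failed to parse hook input', error);
    return null; // caller exits cleanly with no payload
  }
}
```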

* docs: update monolith report post session-logging merge

- SessionStore grew to 2,011 lines (49 methods) - highest priority
- SearchManager reduced to 1,778 lines (improved)
- Agent files reduced by ~45 lines combined
- Added trend indicators and post-merge observations
- Core refactoring proposal remains valid

* refactor(sqlite): decompose SessionStore into modular architecture

Extract the 2011-line SessionStore.ts monolith into focused, single-responsibility
modules following grep-optimized progressive disclosure pattern:

New module structure:
- sessions/ - Session creation and retrieval (create.ts, get.ts, types.ts)
- observations/ - Observation storage and queries (store.ts, get.ts, recent.ts, files.ts, types.ts)
- summaries/ - Summary storage and queries (store.ts, get.ts, recent.ts, types.ts)
- prompts/ - User prompt management (store.ts, get.ts, types.ts)
- timeline/ - Cross-entity timeline queries (queries.ts)
- import/ - Bulk import operations (bulk.ts)
- migrations/ - Database migrations (runner.ts)

New coordinator files:
- Database.ts - ClaudeMemDatabase class with re-exports
- transactions.ts - Atomic cross-entity transactions
- Named re-export facades (Sessions.ts, Observations.ts, etc.)

Key design decisions:
- All functions take `db: Database` as first parameter (functional style)
- Named re-exports instead of index.ts for grep-friendliness
- SessionStore retained as backward-compatible wrapper
- Target file size: 50-150 lines (60% compliance)
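The db-first functional style can be illustrated with a sketch like the one below. `createSDKSession`'s real signature lives in sessions/create.ts and may differ; the `Db` type here is a stand-in for the bun:sqlite handle:

```typescript
// Every module function takes the database handle as its first parameter,
// so there is no hidden singleton and tests can pass an in-memory handle.
type Db = {
  prepare(sql: string): {
    run(...params: unknown[]): { lastInsertRowid: number | bigint };
  };
};

function createSDKSession(
  db: Db,
  contentSessionId: string,
  project: string,
  userPrompt: string
): number {
  // memory_session_id starts as NULL, per the fix earlier in this PR
  const result = db.prepare(`
    INSERT INTO sdk_sessions (content_session_id, project, user_prompt, memory_session_id)
    VALUES (?, ?, ?, NULL)
  `).run(contentSessionId, project, userPrompt);
  return Number(result.lastInsertRowid);
}
```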

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(agents): extract shared logic into modular architecture

Consolidate duplicate code across SDKAgent, GeminiAgent, and OpenRouterAgent
into focused utility modules. Total reduction: 500 lines (29%).

New modules in src/services/worker/agents/:
- ResponseProcessor.ts: Atomic DB transactions, Chroma sync, SSE broadcast
- ObservationBroadcaster.ts: SSE event formatting and dispatch
- SessionCleanupHelper.ts: Session state cleanup and stuck message reset
- FallbackErrorHandler.ts: Provider error detection for fallback logic
- types.ts: Shared interfaces (WorkerRef, SSE payloads, StorageResult)

Bug fix: SDKAgent was incorrectly using obs.files instead of obs.files_read
and hardcoding files_modified to empty array.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(search): extract search strategies into modular architecture

Decompose SearchManager into focused strategy pattern with:
- SearchOrchestrator: Coordinates strategy selection and fallback
- ChromaSearchStrategy: Vector semantic search via ChromaDB
- SQLiteSearchStrategy: Filter-only queries for date/project/type
- HybridSearchStrategy: Metadata filtering + semantic ranking
- ResultFormatter: Markdown table formatting for results
- TimelineBuilder: Chronological timeline construction
- Filter modules: DateFilter, ProjectFilter, TypeFilter

SearchManager now delegates to new infrastructure while maintaining
full backward compatibility with existing public API.
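The orchestrator's selection-and-fallback flow might look like the sketch below. The class names follow this commit, but the method names and query shape are illustrative assumptions:

```typescript
interface SearchQuery { text?: string; filters?: Record<string, string> }

interface SearchStrategy {
  canHandle(query: SearchQuery): boolean;
  search(query: SearchQuery): Promise<string[]>;
}

class SearchOrchestrator {
  constructor(
    private strategies: SearchStrategy[],
    private fallback: SearchStrategy // e.g. the SQLite filter-only strategy
  ) {}

  async search(query: SearchQuery): Promise<string[]> {
    for (const strategy of this.strategies) {
      if (!strategy.canHandle(query)) continue;
      try {
        return await strategy.search(query);
      } catch {
        // e.g. Chroma unavailable: fall through to the next strategy
      }
    }
    return this.fallback.search(query);
  }
}
```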

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(context): decompose context-generator into modular architecture

Extract 660-line monolith into focused components:
- ContextBuilder: Main orchestrator (~160 lines)
- ContextConfigLoader: Configuration loading
- TokenCalculator: Token budget calculations
- ObservationCompiler: Data retrieval and query building
- MarkdownFormatter/ColorFormatter: Output formatting
- Section renderers: Header, Timeline, Summary, Footer

Maintains full backward compatibility - context-generator.ts now
delegates to new ContextBuilder while preserving public API.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(worker): decompose worker-service into modular infrastructure

Split 2000+ line monolith into focused modules:

Infrastructure:
- ProcessManager: PID files, signal handlers, child process cleanup
- HealthMonitor: Port checks, health polling, version matching
- GracefulShutdown: Coordinated cleanup on exit

Server:
- Server: Express app setup, core routes, route registration
- Middleware: Re-exports from existing middleware
- ErrorHandler: Centralized error handling with AppError class

Integrations:
- CursorHooksInstaller: Full Cursor IDE integration (registry, hooks, MCP)

WorkerService now acts as thin coordinator wiring all components together.
Maintains full backward compatibility with existing public API.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Refactor session queue processing and database interactions

- Implement claim-and-delete pattern in SessionQueueProcessor to simplify message handling and eliminate duplicate processing.
- Update PendingMessageStore to support atomic claim-and-delete operations, removing the need for intermediate processing states.
- Introduce storeObservations method in SessionStore for simplified observation and summary storage without message tracking.
- Remove deprecated methods and clean up session state management in worker agents.
- Adjust response processing to accommodate new storage patterns, ensuring atomic transactions for observations and summaries.
- Remove unnecessary reset logic for stuck messages due to the new queue handling approach.
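The claim-and-delete lifecycle reduces the queue to a pure buffer. An in-memory sketch of the idea (the real PendingMessageStore wraps the claim in a SQLite transaction; this class is illustrative only):

```typescript
// Claiming a message removes it immediately, so there is no 'processing'
// state to get stuck in and no path to double-processing the same message.
class ClaimAndDeleteQueue<T> {
  private items: T[] = [];

  enqueue(item: T): void {
    this.items.push(item);
  }

  // Atomically take the oldest item out of the queue; the caller processes
  // it entirely in memory. A crash loses at most the claimed item, but the
  // item can never be delivered twice.
  claimAndDelete(): T | null {
    return this.items.length > 0 ? this.items.shift()! : null;
  }
}
```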

* Add duplicate observation cleanup script

Script to clean up duplicate observations created by the batching bug
where observations were stored once per message ID instead of once per
observation. Includes safety checks to always keep at least one copy.

Usage:
  bun scripts/cleanup-duplicates.ts           # Dry run
  bun scripts/cleanup-duplicates.ts --execute # Delete duplicates
  bun scripts/cleanup-duplicates.ts --aggressive # Ignore time window

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
Commit 2fc4153bef (parent 7748e62387) by Alex Newman, 2026-01-03 21:22:27 -05:00, committed by GitHub
87 changed files with 9933 additions and 3492 deletions
+60 -1
@@ -1,6 +1,7 @@
import { Database } from 'bun:sqlite';
import { DATA_DIR, DB_PATH, ensureDir } from '../../shared/paths.js';
import { logger } from '../../utils/logger.js';
import { MigrationRunner } from './migrations/runner.js';
// SQLite configuration constants
const SQLITE_MMAP_SIZE_BYTES = 256 * 1024 * 1024; // 256MB
@@ -14,8 +15,53 @@ export interface Migration {
let dbInstance: Database | null = null;
/**
* ClaudeMemDatabase - New entry point for the sqlite module
*
* Replaces SessionStore as the database coordinator.
* Sets up bun:sqlite with optimized settings and runs all migrations.
*
* Usage:
* const db = new ClaudeMemDatabase(); // uses default DB_PATH
* const db = new ClaudeMemDatabase('/path/to/db.sqlite');
* const db = new ClaudeMemDatabase(':memory:'); // for tests
*/
export class ClaudeMemDatabase {
public db: Database;
constructor(dbPath: string = DB_PATH) {
// Ensure data directory exists (skip for in-memory databases)
if (dbPath !== ':memory:') {
ensureDir(DATA_DIR);
}
// Create database connection
this.db = new Database(dbPath, { create: true, readwrite: true });
// Apply optimized SQLite settings
this.db.run('PRAGMA journal_mode = WAL');
this.db.run('PRAGMA synchronous = NORMAL');
this.db.run('PRAGMA foreign_keys = ON');
this.db.run('PRAGMA temp_store = memory');
this.db.run(`PRAGMA mmap_size = ${SQLITE_MMAP_SIZE_BYTES}`);
this.db.run(`PRAGMA cache_size = ${SQLITE_CACHE_SIZE_PAGES}`);
// Run all migrations
const migrationRunner = new MigrationRunner(this.db);
migrationRunner.runAllMigrations();
}
/**
* Close the database connection
*/
close(): void {
this.db.close();
}
}
/**
* SQLite Database singleton with migration support and optimized settings
* @deprecated Use ClaudeMemDatabase instead for new code
*/
export class DatabaseManager {
private static instance: DatabaseManager;
@@ -173,4 +219,17 @@ export async function initializeDatabase(): Promise<Database> {
return await manager.initialize();
}
// Re-export bun:sqlite Database type
export { Database };
// Re-export MigrationRunner for external use
export { MigrationRunner } from './migrations/runner.js';
// Re-export all module functions for convenient imports
export * from './Sessions.js';
export * from './Observations.js';
export * from './Summaries.js';
export * from './Prompts.js';
export * from './Timeline.js';
export * from './Import.js';
export * from './transactions.js';
+5
@@ -0,0 +1,5 @@
/**
* Import functions for bulk data import with duplicate checking
*/
export * from './import/bulk.js';
+10
@@ -0,0 +1,10 @@
/**
* Observations module - named re-exports
* Provides all observation-related database operations
*/
export * from './observations/types.js';
export * from './observations/store.js';
export * from './observations/get.js';
export * from './observations/recent.js';
export * from './observations/files.js';
+16 -83
@@ -26,17 +26,14 @@ export interface PersistentPendingMessage {
/**
* PendingMessageStore - Persistent work queue for SDK messages
*
* Messages are persisted before processing and marked complete after success.
* This enables recovery from SDK hangs and worker crashes.
* Messages are persisted before processing using a claim-and-delete pattern.
* This simplifies the lifecycle and eliminates duplicate processing bugs.
*
* Lifecycle:
* 1. enqueue() - Message persisted with status 'pending'
* 2. markProcessing() - Status changes to 'processing' when yielded to SDK
* 3. markProcessed() - Status changes to 'processed' after successful SDK response
* 4. markFailed() - Status changes to 'failed' if max retries exceeded
* 2. claimAndDelete() - Atomically claims and deletes message (process in memory)
*
* Recovery:
* - resetStuckMessages() - Moves 'processing' messages back to 'pending' if stuck
* - getSessionsWithPendingMessages() - Find sessions that need recovery on startup
*/
export class PendingMessageStore {
@@ -80,14 +77,13 @@ export class PendingMessageStore {
}
/**
* Atomically claim the next pending message for processing
* Finds oldest pending -> marks processing -> returns it
* Uses a transaction to prevent race conditions
* Atomically claim and DELETE the next pending message.
* Finds oldest pending -> returns it -> deletes from queue.
* The queue is a pure buffer: claim it, delete it, process in memory.
* Uses a transaction to prevent race conditions.
*/
claimNextMessage(sessionDbId: number): PersistentPendingMessage | null {
const now = Date.now();
const claimTx = this.db.transaction((sessionId: number, timestamp: number) => {
claimAndDelete(sessionDbId: number): PersistentPendingMessage | null {
const claimTx = this.db.transaction((sessionId: number) => {
const peekStmt = this.db.prepare(`
SELECT * FROM pending_messages
WHERE session_db_id = ? AND status = 'pending'
@@ -95,26 +91,16 @@ export class PendingMessageStore {
LIMIT 1
`);
const msg = peekStmt.get(sessionId) as PersistentPendingMessage | null;
if (msg) {
const updateStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'processing', started_processing_at_epoch = ?
WHERE id = ?
`);
updateStmt.run(timestamp, msg.id);
// Return updated object
return {
...msg,
status: 'processing',
started_processing_at_epoch: timestamp
} as PersistentPendingMessage;
// Delete immediately - no "processing" state needed
const deleteStmt = this.db.prepare('DELETE FROM pending_messages WHERE id = ?');
deleteStmt.run(msg.id);
}
return null;
return msg;
});
return claimTx(sessionDbId, now) as PersistentPendingMessage | null;
return claimTx(sessionDbId) as PersistentPendingMessage | null;
}
/**
@@ -254,38 +240,7 @@ export class PendingMessageStore {
}
/**
* Mark message as being processed (status: pending -> processing)
*/
markProcessing(messageId: number): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'processing', started_processing_at_epoch = ?
WHERE id = ? AND status = 'pending'
`);
stmt.run(now, messageId);
}
/**
* Mark message as successfully processed (status: processing -> processed)
* Clears tool_input and tool_response to save space (observations are already saved)
*/
markProcessed(messageId: number): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE pending_messages
SET
status = 'processed',
completed_at_epoch = ?,
tool_input = NULL,
tool_response = NULL
WHERE id = ? AND status = 'processing'
`);
stmt.run(now, messageId);
}
/**
* Mark message as failed (status: processing -> failed or back to pending for retry)
* Mark message as failed (status: pending -> failed or back to pending for retry)
* If retry_count < maxRetries, moves back to 'pending' for retry
* Otherwise marks as 'failed' permanently
*/
@@ -381,28 +336,6 @@ export class PendingMessageStore {
return result ? { sessionDbId: result.session_db_id, contentSessionId: result.content_session_id } : null;
}
/**
* Cleanup old processed messages (retention policy)
* Keeps the most recent N processed messages, deletes the rest
* @param retentionCount Number of processed messages to keep (default: 100)
* @returns Number of messages deleted
*/
cleanupProcessed(retentionCount: number = 100): number {
const stmt = this.db.prepare(`
DELETE FROM pending_messages
WHERE status = 'processed'
AND id NOT IN (
SELECT id FROM pending_messages
WHERE status = 'processed'
ORDER BY completed_at_epoch DESC
LIMIT ?
)
`);
const result = stmt.run(retentionCount);
return result.changes;
}
/**
* Clear all failed messages from the queue
* @returns Number of messages deleted
+10
@@ -0,0 +1,10 @@
/**
* User prompts module - named re-exports
*
* Provides all user prompt database operations as standalone functions.
* Each function takes `db: Database` as first parameter.
*/
export * from './prompts/types.js';
export * from './prompts/store.js';
export * from './prompts/get.js';
+113
@@ -1324,6 +1324,119 @@ export class SessionStore {
}
/**
* ATOMIC: Store observations + summary (no message tracking)
*
* Simplified version for use with claim-and-delete queue pattern.
* Messages are deleted from queue immediately on claim, so there's no
* message completion to track. This just stores observations and summary.
*
* @param memorySessionId - SDK memory session ID
* @param project - Project name
* @param observations - Array of observations to store (can be empty)
* @param summary - Optional summary to store
* @param promptNumber - Optional prompt number
* @param discoveryTokens - Discovery tokens count
* @param overrideTimestampEpoch - Optional override timestamp
* @returns Object with observation IDs, optional summary ID, and timestamp
*/
storeObservations(
memorySessionId: string,
project: string,
observations: Array<{
type: string;
title: string | null;
subtitle: string | null;
facts: string[];
narrative: string | null;
concepts: string[];
files_read: string[];
files_modified: string[];
}>,
summary: {
request: string;
investigated: string;
learned: string;
completed: string;
next_steps: string;
notes: string | null;
} | null,
promptNumber?: number,
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): { observationIds: number[]; summaryId: number | null; createdAtEpoch: number } {
// Use override timestamp if provided
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
// Create transaction that wraps all operations
const storeTx = this.db.transaction(() => {
const observationIds: number[] = [];
// 1. Store all observations
const obsStmt = this.db.prepare(`
INSERT INTO observations
(memory_session_id, project, type, title, subtitle, facts, narrative, concepts,
files_read, files_modified, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const observation of observations) {
const result = obsStmt.run(
memorySessionId,
project,
observation.type,
observation.title,
observation.subtitle,
JSON.stringify(observation.facts),
observation.narrative,
JSON.stringify(observation.concepts),
JSON.stringify(observation.files_read),
JSON.stringify(observation.files_modified),
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
observationIds.push(Number(result.lastInsertRowid));
}
// 2. Store summary if provided
let summaryId: number | null = null;
if (summary) {
const summaryStmt = this.db.prepare(`
INSERT INTO session_summaries
(memory_session_id, project, request, investigated, learned, completed,
next_steps, notes, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = summaryStmt.run(
memorySessionId,
project,
summary.request,
summary.investigated,
summary.learned,
summary.completed,
summary.next_steps,
summary.notes,
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
summaryId = Number(result.lastInsertRowid);
}
return { observationIds, summaryId, createdAtEpoch: timestampEpoch };
});
// Execute the transaction and return results
return storeTx();
}
/**
* @deprecated Use storeObservations instead. This method is kept for backwards compatibility.
*
* ATOMIC: Store observations + summary + mark pending message as processed
*
* This method wraps observation storage, summary storage, and message completion
+11
@@ -0,0 +1,11 @@
/**
* Sessions module - re-exports all session-related functions
*
* Usage:
* import { createSDKSession, getSessionById } from './Sessions.js';
* const sessionId = createSDKSession(db, contentId, project, prompt);
*/
export * from './sessions/types.js';
export * from './sessions/create.js';
export * from './sessions/get.js';
+7
@@ -0,0 +1,7 @@
/**
* Summaries module - Named re-exports for summary-related database operations
*/
export * from './summaries/types.js';
export * from './summaries/store.js';
export * from './summaries/get.js';
export * from './summaries/recent.js';
+8
@@ -0,0 +1,8 @@
/**
* Timeline module re-exports
* Provides time-based context queries for observations, sessions, and prompts
*
* grep-friendly: Timeline, getTimelineAroundTimestamp, getTimelineAroundObservation, getAllProjects
*/
export * from './timeline/queries.js';
+236
@@ -0,0 +1,236 @@
/**
* Bulk import functions for importing data with duplicate checking
*/
import { Database } from 'bun:sqlite';
export interface ImportResult {
imported: boolean;
id: number;
}
/**
* Import SDK session with duplicate checking
* Duplicates are identified by content_session_id
*/
export function importSdkSession(
db: Database,
session: {
content_session_id: string;
memory_session_id: string;
project: string;
user_prompt: string;
started_at: string;
started_at_epoch: number;
completed_at: string | null;
completed_at_epoch: number | null;
status: string;
}
): ImportResult {
// Check if session already exists
const existing = db
.prepare('SELECT id FROM sdk_sessions WHERE content_session_id = ?')
.get(session.content_session_id) as { id: number } | undefined;
if (existing) {
return { imported: false, id: existing.id };
}
const stmt = db.prepare(`
INSERT INTO sdk_sessions (
content_session_id, memory_session_id, project, user_prompt,
started_at, started_at_epoch, completed_at, completed_at_epoch, status
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = stmt.run(
session.content_session_id,
session.memory_session_id,
session.project,
session.user_prompt,
session.started_at,
session.started_at_epoch,
session.completed_at,
session.completed_at_epoch,
session.status
);
return { imported: true, id: result.lastInsertRowid as number };
}
/**
* Import session summary with duplicate checking
* Duplicates are identified by memory_session_id
*/
export function importSessionSummary(
db: Database,
summary: {
memory_session_id: string;
project: string;
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
files_read: string | null;
files_edited: string | null;
notes: string | null;
prompt_number: number | null;
discovery_tokens: number;
created_at: string;
created_at_epoch: number;
}
): ImportResult {
// Check if summary already exists for this session
const existing = db
.prepare('SELECT id FROM session_summaries WHERE memory_session_id = ?')
.get(summary.memory_session_id) as { id: number } | undefined;
if (existing) {
return { imported: false, id: existing.id };
}
const stmt = db.prepare(`
INSERT INTO session_summaries (
memory_session_id, project, request, investigated, learned,
completed, next_steps, files_read, files_edited, notes,
prompt_number, discovery_tokens, created_at, created_at_epoch
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = stmt.run(
summary.memory_session_id,
summary.project,
summary.request,
summary.investigated,
summary.learned,
summary.completed,
summary.next_steps,
summary.files_read,
summary.files_edited,
summary.notes,
summary.prompt_number,
summary.discovery_tokens || 0,
summary.created_at,
summary.created_at_epoch
);
return { imported: true, id: result.lastInsertRowid as number };
}
/**
* Import observation with duplicate checking
* Duplicates are identified by memory_session_id + title + created_at_epoch
*/
export function importObservation(
db: Database,
obs: {
memory_session_id: string;
project: string;
text: string | null;
type: string;
title: string | null;
subtitle: string | null;
facts: string | null;
narrative: string | null;
concepts: string | null;
files_read: string | null;
files_modified: string | null;
prompt_number: number | null;
discovery_tokens: number;
created_at: string;
created_at_epoch: number;
}
): ImportResult {
// Check if observation already exists
const existing = db
.prepare(
`
SELECT id FROM observations
WHERE memory_session_id = ? AND title = ? AND created_at_epoch = ?
`
)
.get(obs.memory_session_id, obs.title, obs.created_at_epoch) as
| { id: number }
| undefined;
if (existing) {
return { imported: false, id: existing.id };
}
const stmt = db.prepare(`
INSERT INTO observations (
memory_session_id, project, text, type, title, subtitle,
facts, narrative, concepts, files_read, files_modified,
prompt_number, discovery_tokens, created_at, created_at_epoch
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = stmt.run(
obs.memory_session_id,
obs.project,
obs.text,
obs.type,
obs.title,
obs.subtitle,
obs.facts,
obs.narrative,
obs.concepts,
obs.files_read,
obs.files_modified,
obs.prompt_number,
obs.discovery_tokens || 0,
obs.created_at,
obs.created_at_epoch
);
return { imported: true, id: result.lastInsertRowid as number };
}
/**
* Import user prompt with duplicate checking
* Duplicates are identified by content_session_id + prompt_number
*/
export function importUserPrompt(
db: Database,
prompt: {
content_session_id: string;
prompt_number: number;
prompt_text: string;
created_at: string;
created_at_epoch: number;
}
): ImportResult {
// Check if prompt already exists
const existing = db
.prepare(
`
SELECT id FROM user_prompts
WHERE content_session_id = ? AND prompt_number = ?
`
)
.get(prompt.content_session_id, prompt.prompt_number) as
| { id: number }
| undefined;
if (existing) {
return { imported: false, id: existing.id };
}
const stmt = db.prepare(`
INSERT INTO user_prompts (
content_session_id, prompt_number, prompt_text,
created_at, created_at_epoch
) VALUES (?, ?, ?, ?, ?)
`);
const result = stmt.run(
prompt.content_session_id,
prompt.prompt_number,
prompt.prompt_text,
prompt.created_at,
prompt.created_at_epoch
);
return { imported: true, id: result.lastInsertRowid as number };
}
+19 -1
@@ -1,7 +1,14 @@
// Export main components
export { DatabaseManager, getDatabase, initializeDatabase } from './Database.js';
export {
ClaudeMemDatabase,
DatabaseManager,
getDatabase,
initializeDatabase,
MigrationRunner
} from './Database.js';
// Export session store (CRUD operations for sessions, observations, summaries)
// @deprecated Use modular functions from Database.ts instead
export { SessionStore } from './SessionStore.js';
// Export session search (FTS5 and structured search)
@@ -12,3 +19,14 @@ export * from './types.js';
// Export migrations
export { migrations } from './migrations.js';
// Export transactions
export { storeObservations, storeObservationsAndMarkComplete } from './transactions.js';
// Re-export all modular functions for convenient access
export * from './Sessions.js';
export * from './Observations.js';
export * from './Summaries.js';
export * from './Prompts.js';
export * from './Timeline.js';
export * from './Import.js';
+3
@@ -1,6 +1,9 @@
import { Database } from 'bun:sqlite';
import { Migration } from './Database.js';
// Re-export MigrationRunner for SessionStore migration extraction
export { MigrationRunner } from './migrations/runner.js';
/**
* Initial schema migration - creates all core tables
*/
@@ -0,0 +1,631 @@
import { Database } from 'bun:sqlite';
import { logger } from '../../../utils/logger.js';
import {
TableColumnInfo,
IndexInfo,
TableNameRow,
SchemaVersion
} from '../../../types/database.js';
/**
* MigrationRunner handles all database schema migrations
* Extracted from SessionStore to separate concerns
*/
export class MigrationRunner {
constructor(private db: Database) {}
/**
* Run all migrations in order
* This is the only public method - all migrations are internal
*/
runAllMigrations(): void {
this.initializeSchema();
this.ensureWorkerPortColumn();
this.ensurePromptTrackingColumns();
this.removeSessionSummariesUniqueConstraint();
this.addObservationHierarchicalFields();
this.makeObservationsTextNullable();
this.createUserPromptsTable();
this.ensureDiscoveryTokensColumn();
this.createPendingMessagesTable();
this.renameSessionIdColumns();
this.repairSessionIdColumnRename();
this.addFailedAtEpochColumn();
}
/**
* Initialize database schema using migrations (migration004)
* This runs the core SDK tables migration if no tables exist
*/
private initializeSchema(): void {
// Create schema_versions table if it doesn't exist
this.db.run(`
CREATE TABLE IF NOT EXISTS schema_versions (
id INTEGER PRIMARY KEY,
version INTEGER UNIQUE NOT NULL,
applied_at TEXT NOT NULL
)
`);
// Get applied migrations
const appliedVersions = this.db.prepare('SELECT version FROM schema_versions ORDER BY version').all() as SchemaVersion[];
const maxApplied = appliedVersions.length > 0 ? Math.max(...appliedVersions.map(v => v.version)) : 0;
// Only run migration004 if no migrations have been applied
// This creates the sdk_sessions, observations, and session_summaries tables
if (maxApplied === 0) {
logger.info('DB', 'Initializing fresh database with migration004');
// Migration004: SDK agent architecture tables
this.db.run(`
CREATE TABLE IF NOT EXISTS sdk_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content_session_id TEXT UNIQUE NOT NULL,
memory_session_id TEXT UNIQUE,
project TEXT NOT NULL,
user_prompt TEXT,
started_at TEXT NOT NULL,
started_at_epoch INTEGER NOT NULL,
completed_at TEXT,
completed_at_epoch INTEGER,
status TEXT CHECK(status IN ('active', 'completed', 'failed')) NOT NULL DEFAULT 'active'
);
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_claude_id ON sdk_sessions(content_session_id);
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_sdk_id ON sdk_sessions(memory_session_id);
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_project ON sdk_sessions(project);
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_status ON sdk_sessions(status);
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_started ON sdk_sessions(started_at_epoch DESC);
CREATE TABLE IF NOT EXISTS observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT NOT NULL,
project TEXT NOT NULL,
text TEXT NOT NULL,
type TEXT NOT NULL CHECK(type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery')),
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_observations_sdk_session ON observations(memory_session_id);
CREATE INDEX IF NOT EXISTS idx_observations_project ON observations(project);
CREATE INDEX IF NOT EXISTS idx_observations_type ON observations(type);
CREATE INDEX IF NOT EXISTS idx_observations_created ON observations(created_at_epoch DESC);
CREATE TABLE IF NOT EXISTS session_summaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT UNIQUE NOT NULL,
project TEXT NOT NULL,
request TEXT,
investigated TEXT,
learned TEXT,
completed TEXT,
next_steps TEXT,
files_read TEXT,
files_edited TEXT,
notes TEXT,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_session_summaries_sdk_session ON session_summaries(memory_session_id);
CREATE INDEX IF NOT EXISTS idx_session_summaries_project ON session_summaries(project);
CREATE INDEX IF NOT EXISTS idx_session_summaries_created ON session_summaries(created_at_epoch DESC);
`);
// Record migration004 as applied
this.db.prepare('INSERT INTO schema_versions (version, applied_at) VALUES (?, ?)').run(4, new Date().toISOString());
logger.info('DB', 'Migration004 applied successfully');
}
}
/**
* Ensure worker_port column exists (migration 5)
*/
private ensureWorkerPortColumn(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(5) as SchemaVersion | undefined;
if (applied) return;
// Check if column exists
const tableInfo = this.db.query('PRAGMA table_info(sdk_sessions)').all() as TableColumnInfo[];
const hasWorkerPort = tableInfo.some(col => col.name === 'worker_port');
if (!hasWorkerPort) {
this.db.run('ALTER TABLE sdk_sessions ADD COLUMN worker_port INTEGER');
logger.info('DB', 'Added worker_port column to sdk_sessions table');
}
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(5, new Date().toISOString());
}
/**
* Ensure prompt tracking columns exist (migration 6)
*/
private ensurePromptTrackingColumns(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(6) as SchemaVersion | undefined;
if (applied) return;
// Check sdk_sessions for prompt_counter
const sessionsInfo = this.db.query('PRAGMA table_info(sdk_sessions)').all() as TableColumnInfo[];
const hasPromptCounter = sessionsInfo.some(col => col.name === 'prompt_counter');
if (!hasPromptCounter) {
this.db.run('ALTER TABLE sdk_sessions ADD COLUMN prompt_counter INTEGER DEFAULT 0');
logger.info('DB', 'Added prompt_counter column to sdk_sessions table');
}
// Check observations for prompt_number
const observationsInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const obsHasPromptNumber = observationsInfo.some(col => col.name === 'prompt_number');
if (!obsHasPromptNumber) {
this.db.run('ALTER TABLE observations ADD COLUMN prompt_number INTEGER');
logger.info('DB', 'Added prompt_number column to observations table');
}
// Check session_summaries for prompt_number
const summariesInfo = this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[];
const sumHasPromptNumber = summariesInfo.some(col => col.name === 'prompt_number');
if (!sumHasPromptNumber) {
this.db.run('ALTER TABLE session_summaries ADD COLUMN prompt_number INTEGER');
logger.info('DB', 'Added prompt_number column to session_summaries table');
}
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(6, new Date().toISOString());
}
/**
* Remove UNIQUE constraint from session_summaries.memory_session_id (migration 7)
*/
private removeSessionSummariesUniqueConstraint(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(7) as SchemaVersion | undefined;
if (applied) return;
// Check if UNIQUE constraint exists
const summariesIndexes = this.db.query('PRAGMA index_list(session_summaries)').all() as IndexInfo[];
const hasUniqueConstraint = summariesIndexes.some(idx => idx.unique === 1);
if (!hasUniqueConstraint) {
// Already migrated (no constraint exists)
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(7, new Date().toISOString());
return;
}
logger.info('DB', 'Removing UNIQUE constraint from session_summaries.memory_session_id');
// Begin transaction
this.db.run('BEGIN TRANSACTION');
// Create new table without UNIQUE constraint
this.db.run(`
CREATE TABLE session_summaries_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT NOT NULL,
project TEXT NOT NULL,
request TEXT,
investigated TEXT,
learned TEXT,
completed TEXT,
next_steps TEXT,
files_read TEXT,
files_edited TEXT,
notes TEXT,
prompt_number INTEGER,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
)
`);
// Copy data from old table
this.db.run(`
INSERT INTO session_summaries_new
SELECT id, memory_session_id, project, request, investigated, learned,
completed, next_steps, files_read, files_edited, notes,
prompt_number, created_at, created_at_epoch
FROM session_summaries
`);
// Drop old table
this.db.run('DROP TABLE session_summaries');
// Rename new table
this.db.run('ALTER TABLE session_summaries_new RENAME TO session_summaries');
// Recreate indexes
this.db.run(`
CREATE INDEX idx_session_summaries_sdk_session ON session_summaries(memory_session_id);
CREATE INDEX idx_session_summaries_project ON session_summaries(project);
CREATE INDEX idx_session_summaries_created ON session_summaries(created_at_epoch DESC);
`);
// Commit transaction
this.db.run('COMMIT');
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(7, new Date().toISOString());
logger.info('DB', 'Successfully removed UNIQUE constraint from session_summaries.memory_session_id');
}
/**
* Add hierarchical fields to observations table (migration 8)
*/
private addObservationHierarchicalFields(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(8) as SchemaVersion | undefined;
if (applied) return;
// Check if new fields already exist
const tableInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const hasTitle = tableInfo.some(col => col.name === 'title');
if (hasTitle) {
// Already migrated
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(8, new Date().toISOString());
return;
}
logger.info('DB', 'Adding hierarchical fields to observations table');
// Add new columns
this.db.run(`
ALTER TABLE observations ADD COLUMN title TEXT;
ALTER TABLE observations ADD COLUMN subtitle TEXT;
ALTER TABLE observations ADD COLUMN facts TEXT;
ALTER TABLE observations ADD COLUMN narrative TEXT;
ALTER TABLE observations ADD COLUMN concepts TEXT;
ALTER TABLE observations ADD COLUMN files_read TEXT;
ALTER TABLE observations ADD COLUMN files_modified TEXT;
`);
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(8, new Date().toISOString());
logger.info('DB', 'Successfully added hierarchical fields to observations table');
}
/**
* Make observations.text nullable (migration 9)
* The text field is deprecated in favor of structured fields (title, subtitle, narrative, etc.)
*/
private makeObservationsTextNullable(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(9) as SchemaVersion | undefined;
if (applied) return;
// Check if text column is already nullable
const tableInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const textColumn = tableInfo.find(col => col.name === 'text');
if (!textColumn || textColumn.notnull === 0) {
// Already migrated or text column doesn't exist
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(9, new Date().toISOString());
return;
}
logger.info('DB', 'Making observations.text nullable');
// Begin transaction
this.db.run('BEGIN TRANSACTION');
// Create new table with text as nullable
this.db.run(`
CREATE TABLE observations_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT NOT NULL,
project TEXT NOT NULL,
text TEXT,
type TEXT NOT NULL CHECK(type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery', 'change')),
title TEXT,
subtitle TEXT,
facts TEXT,
narrative TEXT,
concepts TEXT,
files_read TEXT,
files_modified TEXT,
prompt_number INTEGER,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
)
`);
// Copy data from old table (all existing columns)
this.db.run(`
INSERT INTO observations_new
SELECT id, memory_session_id, project, text, type, title, subtitle, facts,
narrative, concepts, files_read, files_modified, prompt_number,
created_at, created_at_epoch
FROM observations
`);
// Drop old table
this.db.run('DROP TABLE observations');
// Rename new table
this.db.run('ALTER TABLE observations_new RENAME TO observations');
// Recreate indexes
this.db.run(`
CREATE INDEX idx_observations_sdk_session ON observations(memory_session_id);
CREATE INDEX idx_observations_project ON observations(project);
CREATE INDEX idx_observations_type ON observations(type);
CREATE INDEX idx_observations_created ON observations(created_at_epoch DESC);
`);
// Commit transaction
this.db.run('COMMIT');
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(9, new Date().toISOString());
logger.info('DB', 'Successfully made observations.text nullable');
}
/**
* Create user_prompts table with FTS5 support (migration 10)
*/
private createUserPromptsTable(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(10) as SchemaVersion | undefined;
if (applied) return;
// Check if table already exists
const tableInfo = this.db.query('PRAGMA table_info(user_prompts)').all() as TableColumnInfo[];
if (tableInfo.length > 0) {
// Already migrated
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(10, new Date().toISOString());
return;
}
logger.info('DB', 'Creating user_prompts table with FTS5 support');
// Begin transaction
this.db.run('BEGIN TRANSACTION');
// Create main table (using content_session_id since memory_session_id is set asynchronously by worker)
this.db.run(`
CREATE TABLE user_prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content_session_id TEXT NOT NULL,
prompt_number INTEGER NOT NULL,
prompt_text TEXT NOT NULL,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(content_session_id) REFERENCES sdk_sessions(content_session_id) ON DELETE CASCADE
);
CREATE INDEX idx_user_prompts_claude_session ON user_prompts(content_session_id);
CREATE INDEX idx_user_prompts_created ON user_prompts(created_at_epoch DESC);
CREATE INDEX idx_user_prompts_prompt_number ON user_prompts(prompt_number);
CREATE INDEX idx_user_prompts_lookup ON user_prompts(content_session_id, prompt_number);
`);
// Create FTS5 virtual table
this.db.run(`
CREATE VIRTUAL TABLE user_prompts_fts USING fts5(
prompt_text,
content='user_prompts',
content_rowid='id'
);
`);
// Create triggers to sync FTS5
this.db.run(`
CREATE TRIGGER user_prompts_ai AFTER INSERT ON user_prompts BEGIN
INSERT INTO user_prompts_fts(rowid, prompt_text)
VALUES (new.id, new.prompt_text);
END;
CREATE TRIGGER user_prompts_ad AFTER DELETE ON user_prompts BEGIN
INSERT INTO user_prompts_fts(user_prompts_fts, rowid, prompt_text)
VALUES('delete', old.id, old.prompt_text);
END;
CREATE TRIGGER user_prompts_au AFTER UPDATE ON user_prompts BEGIN
INSERT INTO user_prompts_fts(user_prompts_fts, rowid, prompt_text)
VALUES('delete', old.id, old.prompt_text);
INSERT INTO user_prompts_fts(rowid, prompt_text)
VALUES (new.id, new.prompt_text);
END;
`);
// Commit transaction
this.db.run('COMMIT');
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(10, new Date().toISOString());
logger.info('DB', 'Successfully created user_prompts table with FTS5 support');
}
/**
* Ensure discovery_tokens column exists (migration 11)
* CRITICAL: This migration was incorrectly using version 7 (which was already taken by removeSessionSummariesUniqueConstraint)
* The duplicate version number may have caused migration tracking issues in some databases
*/
private ensureDiscoveryTokensColumn(): void {
// Check if migration already applied to avoid unnecessary re-runs
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(11) as SchemaVersion | undefined;
if (applied) return;
// Check if discovery_tokens column exists in observations table
const observationsInfo = this.db.query('PRAGMA table_info(observations)').all() as TableColumnInfo[];
const obsHasDiscoveryTokens = observationsInfo.some(col => col.name === 'discovery_tokens');
if (!obsHasDiscoveryTokens) {
this.db.run('ALTER TABLE observations ADD COLUMN discovery_tokens INTEGER DEFAULT 0');
logger.info('DB', 'Added discovery_tokens column to observations table');
}
// Check if discovery_tokens column exists in session_summaries table
const summariesInfo = this.db.query('PRAGMA table_info(session_summaries)').all() as TableColumnInfo[];
const sumHasDiscoveryTokens = summariesInfo.some(col => col.name === 'discovery_tokens');
if (!sumHasDiscoveryTokens) {
this.db.run('ALTER TABLE session_summaries ADD COLUMN discovery_tokens INTEGER DEFAULT 0');
logger.info('DB', 'Added discovery_tokens column to session_summaries table');
}
// Record migration only after successful column verification/addition
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(11, new Date().toISOString());
}
/**
* Create pending_messages table for persistent work queue (migration 16)
* Messages are persisted before processing and deleted after success.
* Enables recovery from SDK hangs and worker crashes.
*/
private createPendingMessagesTable(): void {
// Check if migration already applied
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(16) as SchemaVersion | undefined;
if (applied) return;
// Check if table already exists
const tables = this.db.query("SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'").all() as TableNameRow[];
if (tables.length > 0) {
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(16, new Date().toISOString());
return;
}
logger.info('DB', 'Creating pending_messages table');
this.db.run(`
CREATE TABLE pending_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_db_id INTEGER NOT NULL,
content_session_id TEXT NOT NULL,
message_type TEXT NOT NULL CHECK(message_type IN ('observation', 'summarize')),
tool_name TEXT,
tool_input TEXT,
tool_response TEXT,
cwd TEXT,
last_user_message TEXT,
last_assistant_message TEXT,
prompt_number INTEGER,
status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending', 'processing', 'processed', 'failed')),
retry_count INTEGER NOT NULL DEFAULT 0,
created_at_epoch INTEGER NOT NULL,
started_processing_at_epoch INTEGER,
completed_at_epoch INTEGER,
FOREIGN KEY (session_db_id) REFERENCES sdk_sessions(id) ON DELETE CASCADE
)
`);
this.db.run('CREATE INDEX IF NOT EXISTS idx_pending_messages_session ON pending_messages(session_db_id)');
this.db.run('CREATE INDEX IF NOT EXISTS idx_pending_messages_status ON pending_messages(status)');
this.db.run('CREATE INDEX IF NOT EXISTS idx_pending_messages_claude_session ON pending_messages(content_session_id)');
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(16, new Date().toISOString());
logger.info('DB', 'pending_messages table created successfully');
}
/**
* Rename session ID columns for semantic clarity (migration 17)
* - claude_session_id -> content_session_id (user's observed session)
* - sdk_session_id -> memory_session_id (memory agent's session for resume)
*
* IDEMPOTENT: Checks each table individually before renaming.
* This handles databases in any intermediate state (partial migration, fresh install, etc.)
*/
private renameSessionIdColumns(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(17) as SchemaVersion | undefined;
if (applied) return;
logger.info('DB', 'Checking session ID columns for semantic clarity rename');
let renamesPerformed = 0;
// Helper to safely rename a column if it exists
const safeRenameColumn = (table: string, oldCol: string, newCol: string): boolean => {
const tableInfo = this.db.query(`PRAGMA table_info(${table})`).all() as TableColumnInfo[];
const hasOldCol = tableInfo.some(col => col.name === oldCol);
const hasNewCol = tableInfo.some(col => col.name === newCol);
if (hasNewCol) {
// Already renamed, nothing to do
return false;
}
if (hasOldCol) {
// SQLite 3.25+ supports ALTER TABLE RENAME COLUMN
this.db.run(`ALTER TABLE ${table} RENAME COLUMN ${oldCol} TO ${newCol}`);
logger.info('DB', `Renamed ${table}.${oldCol} to ${newCol}`);
return true;
}
// Neither column exists - table might not exist or has different schema
logger.warn('DB', `Column ${oldCol} not found in ${table}, skipping rename`);
return false;
};
// Rename in sdk_sessions table
if (safeRenameColumn('sdk_sessions', 'claude_session_id', 'content_session_id')) renamesPerformed++;
if (safeRenameColumn('sdk_sessions', 'sdk_session_id', 'memory_session_id')) renamesPerformed++;
// Rename in pending_messages table
if (safeRenameColumn('pending_messages', 'claude_session_id', 'content_session_id')) renamesPerformed++;
// Rename in observations table
if (safeRenameColumn('observations', 'sdk_session_id', 'memory_session_id')) renamesPerformed++;
// Rename in session_summaries table
if (safeRenameColumn('session_summaries', 'sdk_session_id', 'memory_session_id')) renamesPerformed++;
// Rename in user_prompts table
if (safeRenameColumn('user_prompts', 'claude_session_id', 'content_session_id')) renamesPerformed++;
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(17, new Date().toISOString());
if (renamesPerformed > 0) {
logger.info('DB', `Successfully renamed ${renamesPerformed} session ID columns`);
} else {
logger.info('DB', 'No session ID column renames needed (already up to date)');
}
}
/**
* Repair session ID column renames (migration 19)
* DEPRECATED: Migration 17 is now fully idempotent and handles all cases.
* This migration is kept for backwards compatibility but does nothing.
*/
private repairSessionIdColumnRename(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(19) as SchemaVersion | undefined;
if (applied) return;
// Migration 17 now handles all column rename cases idempotently.
// Just record this migration as applied.
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(19, new Date().toISOString());
}
/**
* Add failed_at_epoch column to pending_messages (migration 20)
* Used by markSessionMessagesFailed() for error recovery tracking
*/
private addFailedAtEpochColumn(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(20) as SchemaVersion | undefined;
if (applied) return;
const tableInfo = this.db.query('PRAGMA table_info(pending_messages)').all() as TableColumnInfo[];
const hasColumn = tableInfo.some(col => col.name === 'failed_at_epoch');
if (!hasColumn) {
this.db.run('ALTER TABLE pending_messages ADD COLUMN failed_at_epoch INTEGER');
logger.info('DB', 'Added failed_at_epoch column to pending_messages table');
}
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(20, new Date().toISOString());
}
}
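Every private migration above repeats the same guard: look up its version in `schema_versions`, apply the change, then record the version only afterwards. A minimal sketch of that pattern, with an in-memory set standing in for the `schema_versions` table (`VersionedRunner` is a hypothetical name, not part of this codebase):

```typescript
// Hypothetical sketch of the version-guard pattern each migration follows.
type MigrationFn = () => void;

class VersionedRunner {
  private applied = new Set<number>(); // stands in for the schema_versions table

  run(version: number, migrate: MigrationFn): boolean {
    if (this.applied.has(version)) return false; // already applied, skip
    migrate();                                   // perform the schema change
    this.applied.add(version);                   // record only after success
    return true;
  }
}
```

Because the version is recorded after the change succeeds, a crashed migration is retried on the next startup rather than silently skipped.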
@@ -0,0 +1,52 @@
/**
* Session file retrieval functions
* Extracted from SessionStore.ts for modular organization
*/
import { Database } from 'bun:sqlite';
import type { SessionFilesResult } from './types.js';
/**
* Get aggregated files from all observations for a session
*/
export function getFilesForSession(
db: Database,
memorySessionId: string
): SessionFilesResult {
const stmt = db.prepare(`
SELECT files_read, files_modified
FROM observations
WHERE memory_session_id = ?
`);
const rows = stmt.all(memorySessionId) as Array<{
files_read: string | null;
files_modified: string | null;
}>;
const filesReadSet = new Set<string>();
const filesModifiedSet = new Set<string>();
for (const row of rows) {
// Parse files_read (skip rows with malformed JSON rather than failing the whole aggregation)
if (row.files_read) {
try {
const files = JSON.parse(row.files_read);
if (Array.isArray(files)) {
files.forEach(f => filesReadSet.add(f));
}
} catch {
// Ignore unparseable rows
}
}
// Parse files_modified (same guard)
if (row.files_modified) {
try {
const files = JSON.parse(row.files_modified);
if (Array.isArray(files)) {
files.forEach(f => filesModifiedSet.add(f));
}
} catch {
// Ignore unparseable rows
}
}
}
return {
filesRead: Array.from(filesReadSet),
filesModified: Array.from(filesModifiedSet)
};
}
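The aggregation step in `getFilesForSession` — parse each JSON column, dedupe through a `Set` — can be isolated as a pure function. This sketch adds a `try/catch` guard as an assumption about desired behavior: one malformed row is skipped instead of throwing.

```typescript
// Hypothetical helper mirroring getFilesForSession's dedup step.
// The try/catch is an assumption: a single malformed JSON column is
// skipped rather than failing the whole aggregation.
function aggregateFiles(columns: Array<string | null>): string[] {
  const out = new Set<string>();
  for (const col of columns) {
    if (!col) continue;
    try {
      const files = JSON.parse(col);
      if (Array.isArray(files)) {
        files.forEach(f => out.add(String(f)));
      }
    } catch {
      // Skip unparseable rows
    }
  }
  return Array.from(out); // insertion order, duplicates removed
}
```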
@@ -0,0 +1,112 @@
/**
* Observation retrieval functions
* Extracted from SessionStore.ts for modular organization
*/
import { Database } from 'bun:sqlite';
import type { ObservationRecord } from '../../../types/database.js';
import type { GetObservationsByIdsOptions, ObservationSessionRow } from './types.js';
/**
* Get a single observation by ID
*/
export function getObservationById(db: Database, id: number): ObservationRecord | null {
const stmt = db.prepare(`
SELECT *
FROM observations
WHERE id = ?
`);
return (stmt.get(id) as ObservationRecord | undefined) ?? null;
}
/**
* Get observations by array of IDs with ordering and limit
*/
export function getObservationsByIds(
db: Database,
ids: number[],
options: GetObservationsByIdsOptions = {}
): ObservationRecord[] {
if (ids.length === 0) return [];
const { orderBy = 'date_desc', limit, project, type, concepts, files } = options;
const orderClause = orderBy === 'date_asc' ? 'ASC' : 'DESC';
const limitClause = limit ? `LIMIT ${limit}` : '';
// Build placeholders for IN clause
const placeholders = ids.map(() => '?').join(',');
const params: any[] = [...ids];
const additionalConditions: string[] = [];
// Apply project filter
if (project) {
additionalConditions.push('project = ?');
params.push(project);
}
// Apply type filter
if (type) {
if (Array.isArray(type)) {
const typePlaceholders = type.map(() => '?').join(',');
additionalConditions.push(`type IN (${typePlaceholders})`);
params.push(...type);
} else {
additionalConditions.push('type = ?');
params.push(type);
}
}
// Apply concepts filter
if (concepts) {
const conceptsList = Array.isArray(concepts) ? concepts : [concepts];
const conceptConditions = conceptsList.map(() =>
'EXISTS (SELECT 1 FROM json_each(concepts) WHERE value = ?)'
);
params.push(...conceptsList);
additionalConditions.push(`(${conceptConditions.join(' OR ')})`);
}
// Apply files filter
if (files) {
const filesList = Array.isArray(files) ? files : [files];
const fileConditions = filesList.map(file => {
params.push(`%${file}%`, `%${file}%`);
return '(EXISTS (SELECT 1 FROM json_each(files_read) WHERE value LIKE ?) OR EXISTS (SELECT 1 FROM json_each(files_modified) WHERE value LIKE ?))';
});
additionalConditions.push(`(${fileConditions.join(' OR ')})`);
}
const whereClause = additionalConditions.length > 0
? `WHERE id IN (${placeholders}) AND ${additionalConditions.join(' AND ')}`
: `WHERE id IN (${placeholders})`;
const stmt = db.prepare(`
SELECT *
FROM observations
${whereClause}
ORDER BY created_at_epoch ${orderClause}
${limitClause}
`);
return stmt.all(...params) as ObservationRecord[];
}
/**
* Get observations for a specific session
*/
export function getObservationsForSession(
db: Database,
memorySessionId: string
): ObservationSessionRow[] {
const stmt = db.prepare(`
SELECT title, subtitle, type, prompt_number
FROM observations
WHERE memory_session_id = ?
ORDER BY created_at_epoch ASC
`);
return stmt.all(memorySessionId) as ObservationSessionRow[];
}
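`getObservationsByIds` builds its WHERE clause by accumulating one `?` placeholder per bound value, so values are never interpolated into the SQL string. A reduced sketch of that assembly (`buildInFilter` is an illustrative name, covering only the id and project filters):

```typescript
// Hypothetical sketch of the WHERE-clause assembly in getObservationsByIds:
// one '?' per bound value keeps the SQL fully parameterized.
function buildInFilter(ids: number[], project?: string): { sql: string; params: unknown[] } {
  const placeholders = ids.map(() => '?').join(',');
  const params: unknown[] = [...ids];
  const conditions = [`id IN (${placeholders})`];
  if (project) {
    conditions.push('project = ?');
    params.push(project);
  }
  return { sql: `WHERE ${conditions.join(' AND ')}`, params };
}
```

The conditions array and the params array grow in lockstep, so every filter added later (type, concepts, files) binds in the same order its placeholders appear.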
@@ -0,0 +1,43 @@
/**
* Recent observation retrieval functions
* Extracted from SessionStore.ts for modular organization
*/
import { Database } from 'bun:sqlite';
import type { RecentObservationRow, AllRecentObservationRow } from './types.js';
/**
* Get recent observations for a project
*/
export function getRecentObservations(
db: Database,
project: string,
limit: number = 20
): RecentObservationRow[] {
const stmt = db.prepare(`
SELECT type, text, prompt_number, created_at
FROM observations
WHERE project = ?
ORDER BY created_at_epoch DESC
LIMIT ?
`);
return stmt.all(project, limit) as RecentObservationRow[];
}
/**
* Get recent observations across all projects (for web UI)
*/
export function getAllRecentObservations(
db: Database,
limit: number = 100
): AllRecentObservationRow[] {
const stmt = db.prepare(`
SELECT id, type, title, subtitle, text, project, prompt_number, created_at, created_at_epoch
FROM observations
ORDER BY created_at_epoch DESC
LIMIT ?
`);
return stmt.all(limit) as AllRecentObservationRow[];
}
@@ -0,0 +1,54 @@
/**
* Store observation function
* Extracted from SessionStore.ts for modular organization
*/
import { Database } from 'bun:sqlite';
import type { ObservationInput, StoreObservationResult } from './types.js';
/**
* Store an observation (from SDK parsing)
* Assumes session already exists (created by hook)
*/
export function storeObservation(
db: Database,
memorySessionId: string,
project: string,
observation: ObservationInput,
promptNumber?: number,
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): StoreObservationResult {
// Use override timestamp if provided (for processing backlog messages with original timestamps)
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
const stmt = db.prepare(`
INSERT INTO observations
(memory_session_id, project, type, title, subtitle, facts, narrative, concepts,
files_read, files_modified, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = stmt.run(
memorySessionId,
project,
observation.type,
observation.title,
observation.subtitle,
JSON.stringify(observation.facts),
observation.narrative,
JSON.stringify(observation.concepts),
JSON.stringify(observation.files_read),
JSON.stringify(observation.files_modified),
promptNumber ?? null, // ?? preserves a legitimate 0, unlike ||
discoveryTokens,
timestampIso,
timestampEpoch
);
return {
id: Number(result.lastInsertRowid),
createdAtEpoch: timestampEpoch
};
}
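`storeObservation` flattens the structured observation into row parameters: array fields become JSON text columns, and the optional override epoch (used when replaying backlog messages with their original timestamps) takes precedence over `Date.now()`. A trimmed sketch under those assumptions (`toRowParams` and `ObsFields` are illustrative names):

```typescript
// Hypothetical reduction of storeObservation's parameter mapping.
// `??` (not `||`) keeps a legitimate epoch of 0 from falling through to Date.now().
interface ObsFields {
  facts: string[];
  concepts: string[];
}

function toRowParams(obs: ObsFields, overrideEpoch?: number) {
  const epoch = overrideEpoch ?? Date.now();
  return {
    facts: JSON.stringify(obs.facts),       // stored as a JSON text column
    concepts: JSON.stringify(obs.concepts), // stored as a JSON text column
    created_at: new Date(epoch).toISOString(),
    created_at_epoch: epoch,
  };
}
```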
@@ -0,0 +1,81 @@
/**
* Type definitions for observation operations
* Extracted from SessionStore.ts for modular organization
*/
/**
* Input type for storeObservation function
*/
export interface ObservationInput {
type: string;
title: string | null;
subtitle: string | null;
facts: string[];
narrative: string | null;
concepts: string[];
files_read: string[];
files_modified: string[];
}
/**
* Result from storing an observation
*/
export interface StoreObservationResult {
id: number;
createdAtEpoch: number;
}
/**
* Options for getObservationsByIds
*/
export interface GetObservationsByIdsOptions {
orderBy?: 'date_desc' | 'date_asc';
limit?: number;
project?: string;
type?: string | string[];
concepts?: string | string[];
files?: string | string[];
}
/**
* Result type for getFilesForSession
*/
export interface SessionFilesResult {
filesRead: string[];
filesModified: string[];
}
/**
* Simple observation row for getObservationsForSession
*/
export interface ObservationSessionRow {
title: string | null;
subtitle: string | null;
type: string;
prompt_number: number | null;
}
/**
* Recent observation row type
*/
export interface RecentObservationRow {
type: string;
text: string | null;
prompt_number: number | null;
created_at: string;
}
/**
* Full recent observation row (for web UI)
*/
export interface AllRecentObservationRow {
id: number;
type: string;
title: string | null;
subtitle: string | null;
text: string | null;
project: string;
prompt_number: number | null;
created_at: string;
created_at_epoch: number;
}
@@ -0,0 +1,168 @@
/**
* User prompt retrieval operations
*/
import type { Database } from 'bun:sqlite';
import type { UserPromptRecord, LatestPromptResult } from '../../../types/database.js';
import type { RecentUserPromptResult, PromptWithProject, GetPromptsByIdsOptions } from './types.js';
/**
* Get user prompt by session ID and prompt number
* @returns The prompt text, or null if not found
*/
export function getUserPrompt(
db: Database,
contentSessionId: string,
promptNumber: number
): string | null {
const stmt = db.prepare(`
SELECT prompt_text
FROM user_prompts
WHERE content_session_id = ? AND prompt_number = ?
LIMIT 1
`);
const result = stmt.get(contentSessionId, promptNumber) as { prompt_text: string } | undefined;
return result?.prompt_text ?? null;
}
/**
* Get current prompt number by counting user_prompts for this session
* Replaces the prompt_counter column, which is no longer maintained
*/
export function getPromptNumberFromUserPrompts(db: Database, contentSessionId: string): number {
const result = db.prepare(`
SELECT COUNT(*) as count FROM user_prompts WHERE content_session_id = ?
`).get(contentSessionId) as { count: number };
return result.count;
}
/**
* Get latest user prompt with session info for a Claude session
* Used for syncing prompts to Chroma during session initialization
*/
export function getLatestUserPrompt(
db: Database,
contentSessionId: string
): LatestPromptResult | undefined {
const stmt = db.prepare(`
SELECT
up.*,
s.memory_session_id,
s.project
FROM user_prompts up
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE up.content_session_id = ?
ORDER BY up.created_at_epoch DESC
LIMIT 1
`);
return stmt.get(contentSessionId) as LatestPromptResult | undefined;
}
/**
* Get recent user prompts across all sessions (for web UI)
*/
export function getAllRecentUserPrompts(
db: Database,
limit: number = 100
): RecentUserPromptResult[] {
const stmt = db.prepare(`
SELECT
up.id,
up.content_session_id,
s.project,
up.prompt_number,
up.prompt_text,
up.created_at,
up.created_at_epoch
FROM user_prompts up
LEFT JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
ORDER BY up.created_at_epoch DESC
LIMIT ?
`);
return stmt.all(limit) as RecentUserPromptResult[];
}
/**
* Get a single user prompt by ID
*/
export function getPromptById(db: Database, id: number): PromptWithProject | null {
const stmt = db.prepare(`
SELECT
p.id,
p.content_session_id,
p.prompt_number,
p.prompt_text,
s.project,
p.created_at,
p.created_at_epoch
FROM user_prompts p
LEFT JOIN sdk_sessions s ON p.content_session_id = s.content_session_id
WHERE p.id = ?
LIMIT 1
`);
return (stmt.get(id) as PromptWithProject | undefined) || null;
}
/**
* Get multiple user prompts by IDs
*/
export function getPromptsByIds(db: Database, ids: number[]): PromptWithProject[] {
if (ids.length === 0) return [];
const placeholders = ids.map(() => '?').join(',');
const stmt = db.prepare(`
SELECT
p.id,
p.content_session_id,
p.prompt_number,
p.prompt_text,
s.project,
p.created_at,
p.created_at_epoch
FROM user_prompts p
LEFT JOIN sdk_sessions s ON p.content_session_id = s.content_session_id
WHERE p.id IN (${placeholders})
ORDER BY p.created_at_epoch DESC
`);
return stmt.all(...ids) as PromptWithProject[];
}
/**
* Get user prompts by IDs (for hybrid Chroma search)
* Returns prompts in specified temporal order with optional project filter
*/
export function getUserPromptsByIds(
db: Database,
ids: number[],
options: GetPromptsByIdsOptions = {}
): UserPromptRecord[] {
if (ids.length === 0) return [];
const { orderBy = 'date_desc', limit, project } = options;
const orderClause = orderBy === 'date_asc' ? 'ASC' : 'DESC';
const limitClause = limit ? `LIMIT ${limit}` : '';
const placeholders = ids.map(() => '?').join(',');
const params: (number | string)[] = [...ids];
const projectFilter = project ? 'AND s.project = ?' : '';
if (project) params.push(project);
const stmt = db.prepare(`
SELECT
up.*,
s.project,
s.memory_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE up.id IN (${placeholders}) ${projectFilter}
ORDER BY up.created_at_epoch ${orderClause}
${limitClause}
`);
return stmt.all(...params) as UserPromptRecord[];
}
@@ -0,0 +1,28 @@
/**
* User prompt storage operations
*/
import type { Database } from 'bun:sqlite';
/**
* Save a user prompt to the database
* @returns The inserted row ID
*/
export function saveUserPrompt(
db: Database,
contentSessionId: string,
promptNumber: number,
promptText: string
): number {
const now = new Date();
const nowEpoch = now.getTime();
const stmt = db.prepare(`
INSERT INTO user_prompts
(content_session_id, prompt_number, prompt_text, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?)
`);
const result = stmt.run(contentSessionId, promptNumber, promptText, now.toISOString(), nowEpoch);
return Number(result.lastInsertRowid); // may be a bigint; normalize like the other modules
}
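`saveUserPrompt` follows the dual-timestamp convention used throughout these modules: `created_at` (ISO string) and `created_at_epoch` (milliseconds) are always derived from the same `Date`, so the two columns can never disagree. A sketch of that convention (`makeTimestamps` is an illustrative name, not a function from the PR):

```typescript
// Dual-timestamp convention: derive the ISO string and the epoch
// milliseconds from a single Date so both columns stay consistent.
function makeTimestamps(date: Date = new Date()): { iso: string; epoch: number } {
  return { iso: date.toISOString(), epoch: date.getTime() };
}

const t = makeTimestamps(new Date(0));
// t.iso is '1970-01-01T00:00:00.000Z'; t.epoch is 0
```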
@@ -0,0 +1,40 @@
/**
* Type definitions for user prompts module
*/
import type { Database } from 'bun:sqlite';
/**
* Result type for getAllRecentUserPrompts
*/
export interface RecentUserPromptResult {
id: number;
content_session_id: string;
project: string;
prompt_number: number;
prompt_text: string;
created_at: string;
created_at_epoch: number;
}
/**
* Result type for getPromptById and getPromptsByIds
*/
export interface PromptWithProject {
id: number;
content_session_id: string;
prompt_number: number;
prompt_text: string;
project: string;
created_at: string;
created_at_epoch: number;
}
/**
* Options for getUserPromptsByIds
*/
export interface GetPromptsByIdsOptions {
orderBy?: 'date_desc' | 'date_asc';
limit?: number;
project?: string;
}
@@ -0,0 +1,62 @@
/**
* Session creation and update functions
* Database-first parameter pattern for functional composition
*/
import type { Database } from 'bun:sqlite';
/**
* Create a new SDK session (idempotent - returns existing session ID if already exists)
*
* IDEMPOTENCY via INSERT OR IGNORE pattern:
* - Prompt #1: session_id not in database -> INSERT creates new row
* - Prompt #2+: session_id exists -> INSERT ignored, fetch existing ID
* - Result: Same database ID returned for all prompts in conversation
*
* WHY THIS MATTERS:
* - NO "does session exist?" checks needed anywhere
* - NO risk of creating duplicate sessions
* - ALL hooks automatically connected via session_id
* - SAVE hook observations go to correct session (same session_id)
* - SDKAgent continuation prompt has correct context (same session_id)
*/
export function createSDKSession(
db: Database,
contentSessionId: string,
project: string,
userPrompt: string
): number {
const now = new Date();
const nowEpoch = now.getTime();
// Pure INSERT OR IGNORE - no updates, no complexity
// NOTE: memory_session_id starts as NULL. It is captured by SDKAgent from the first SDK
// response and stored via updateMemorySessionId(). CRITICAL: memory_session_id must NEVER
// equal contentSessionId - that would inject memory messages into the user's transcript!
db.prepare(`
INSERT OR IGNORE INTO sdk_sessions
(content_session_id, memory_session_id, project, user_prompt, started_at, started_at_epoch, status)
VALUES (?, NULL, ?, ?, ?, ?, 'active')
`).run(contentSessionId, project, userPrompt, now.toISOString(), nowEpoch);
// Return existing or new ID
const row = db.prepare('SELECT id FROM sdk_sessions WHERE content_session_id = ?')
.get(contentSessionId) as { id: number };
return row.id;
}
/**
* Update the memory session ID for a session
* Called by SDKAgent when it captures the session ID from the first SDK message
*/
export function updateMemorySessionId(
db: Database,
sessionDbId: number,
memorySessionId: string
): void {
db.prepare(`
UPDATE sdk_sessions
SET memory_session_id = ?
WHERE id = ?
`).run(memorySessionId, sessionDbId);
}
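The idempotency contract of `createSDKSession` can be illustrated without a database: INSERT OR IGNORE means repeated calls with the same `contentSessionId` always resolve to the same row id. A stand-in sketch (a Map in place of bun:sqlite; `FakeSessionTable` is illustrative only):

```typescript
// Stand-in for the INSERT OR IGNORE pattern: the first call inserts,
// later calls are ignored, and every call returns the same id.
class FakeSessionTable {
  private rows = new Map<string, number>(); // contentSessionId -> id
  private nextId = 1;
  insertOrIgnore(contentSessionId: string): number {
    if (!this.rows.has(contentSessionId)) {
      this.rows.set(contentSessionId, this.nextId++); // INSERT path
    }
    return this.rows.get(contentSessionId)!; // fetch existing or new id
  }
}

const table = new FakeSessionTable();
const first = table.insertOrIgnore('abc');  // prompt #1 -> new row
const second = table.insertOrIgnore('abc'); // prompt #2 -> same row, same id
```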
@@ -0,0 +1,106 @@
/**
* Session retrieval functions
* Database-first parameter pattern for functional composition
*/
import type { Database } from 'bun:sqlite';
import type {
SessionBasic,
SessionFull,
SessionWithStatus,
SessionSummaryDetail,
} from './types.js';
/**
* Get session by ID (basic fields only)
*/
export function getSessionById(db: Database, id: number): SessionBasic | null {
const stmt = db.prepare(`
SELECT id, content_session_id, memory_session_id, project, user_prompt
FROM sdk_sessions
WHERE id = ?
LIMIT 1
`);
return (stmt.get(id) as SessionBasic | undefined) || null;
}
/**
* Get SDK sessions by memory session IDs
* Used for exporting session metadata
*/
export function getSdkSessionsBySessionIds(
db: Database,
memorySessionIds: string[]
): SessionFull[] {
if (memorySessionIds.length === 0) return [];
const placeholders = memorySessionIds.map(() => '?').join(',');
const stmt = db.prepare(`
SELECT id, content_session_id, memory_session_id, project, user_prompt,
started_at, started_at_epoch, completed_at, completed_at_epoch, status
FROM sdk_sessions
WHERE memory_session_id IN (${placeholders})
ORDER BY started_at_epoch DESC
`);
return stmt.all(...memorySessionIds) as SessionFull[];
}
/**
* Get recent sessions with their status and summary info
* Returns sessions ordered oldest-first for display
*/
export function getRecentSessionsWithStatus(
db: Database,
project: string,
limit: number = 3
): SessionWithStatus[] {
const stmt = db.prepare(`
SELECT * FROM (
SELECT
s.memory_session_id,
s.status,
s.started_at,
s.started_at_epoch,
s.user_prompt,
CASE WHEN sum.memory_session_id IS NOT NULL THEN 1 ELSE 0 END as has_summary
FROM sdk_sessions s
LEFT JOIN session_summaries sum ON s.memory_session_id = sum.memory_session_id
WHERE s.project = ? AND s.memory_session_id IS NOT NULL
GROUP BY s.memory_session_id
ORDER BY s.started_at_epoch DESC
LIMIT ?
)
ORDER BY started_at_epoch ASC
`);
return stmt.all(project, limit) as SessionWithStatus[];
}
/**
* Get full session summary by ID (includes request_summary and learned_summary)
*/
export function getSessionSummaryById(
db: Database,
id: number
): SessionSummaryDetail | null {
const stmt = db.prepare(`
SELECT
id,
memory_session_id,
content_session_id,
project,
user_prompt,
request_summary,
learned_summary,
status,
created_at,
created_at_epoch
FROM sdk_sessions
WHERE id = ?
LIMIT 1
`);
return (stmt.get(id) as SessionSummaryDetail | undefined) || null;
}
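`getRecentSessionsWithStatus` uses a nested SELECT to take the newest `limit` sessions, then re-orders them ascending so the UI reads chronologically. A pure-function sketch of that "newest N, displayed oldest-first" pattern (`newestNOldestFirst` is an illustrative name, not part of the PR):

```typescript
// Take the most recent `limit` rows, then reverse so the caller
// receives them oldest-first, mirroring the nested SELECT + ORDER BY.
function newestNOldestFirst<T extends { epoch: number }>(rows: T[], limit: number): T[] {
  return [...rows]
    .sort((a, b) => b.epoch - a.epoch) // newest first
    .slice(0, limit)                   // keep the most recent N
    .reverse();                        // display oldest-first
}

const out = newestNOldestFirst([{ epoch: 1 }, { epoch: 3 }, { epoch: 2 }], 2);
// out has epochs [2, 3]: the two newest rows, oldest-first
```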
@@ -0,0 +1,58 @@
/**
* Session-related type definitions
* Standalone types for session query results
*/
/**
* Basic session info (minimal fields)
*/
export interface SessionBasic {
id: number;
content_session_id: string;
memory_session_id: string | null;
project: string;
user_prompt: string;
}
/**
* Full session record with timestamps
*/
export interface SessionFull {
id: number;
content_session_id: string;
memory_session_id: string;
project: string;
user_prompt: string;
started_at: string;
started_at_epoch: number;
completed_at: string | null;
completed_at_epoch: number | null;
status: string;
}
/**
* Session with summary info for status display
*/
export interface SessionWithStatus {
memory_session_id: string | null;
status: string;
started_at: string;
user_prompt: string | null;
has_summary: number; // 0 or 1 from the SQL CASE expression
}
/**
* Session summary with all detail fields
*/
export interface SessionSummaryDetail {
id: number;
memory_session_id: string | null;
content_session_id: string;
project: string;
user_prompt: string;
request_summary: string | null;
learned_summary: string | null;
status: string;
created_at: string;
created_at_epoch: number;
}
@@ -0,0 +1,86 @@
/**
* Get session summaries from the database
*/
import type { Database } from 'bun:sqlite';
import type { SessionSummaryRecord } from '../../../types/database.js';
import type { SessionSummary, GetByIdsOptions } from './types.js';
/**
* Get summary for a specific session
*
* @param db - Database instance
* @param memorySessionId - SDK memory session ID
* @returns Most recent summary for the session, or null if none exists
*/
export function getSummaryForSession(
db: Database,
memorySessionId: string
): SessionSummary | null {
const stmt = db.prepare(`
SELECT
request, investigated, learned, completed, next_steps,
files_read, files_edited, notes, prompt_number, created_at,
created_at_epoch
FROM session_summaries
WHERE memory_session_id = ?
ORDER BY created_at_epoch DESC
LIMIT 1
`);
return (stmt.get(memorySessionId) as SessionSummary | undefined) || null;
}
/**
* Get a single session summary by ID
*
* @param db - Database instance
* @param id - Summary ID
* @returns Full summary record or null if not found
*/
export function getSummaryById(
db: Database,
id: number
): SessionSummaryRecord | null {
const stmt = db.prepare(`
SELECT * FROM session_summaries WHERE id = ?
`);
return (stmt.get(id) as SessionSummaryRecord | undefined) || null;
}
/**
* Get session summaries by IDs (for hybrid Chroma search)
* Returns summaries in specified temporal order
*
* @param db - Database instance
* @param ids - Array of summary IDs
* @param options - Query options (orderBy, limit, project)
*/
export function getSummariesByIds(
db: Database,
ids: number[],
options: GetByIdsOptions = {}
): SessionSummaryRecord[] {
if (ids.length === 0) return [];
const { orderBy = 'date_desc', limit, project } = options;
const orderClause = orderBy === 'date_asc' ? 'ASC' : 'DESC';
const limitClause = limit ? `LIMIT ${limit}` : '';
const placeholders = ids.map(() => '?').join(',');
const params: (number | string)[] = [...ids];
// Apply project filter
const whereClause = project
? `WHERE id IN (${placeholders}) AND project = ?`
: `WHERE id IN (${placeholders})`;
if (project) params.push(project);
const stmt = db.prepare(`
SELECT * FROM session_summaries
${whereClause}
ORDER BY created_at_epoch ${orderClause}
${limitClause}
`);
return stmt.all(...params) as SessionSummaryRecord[];
}
@@ -0,0 +1,77 @@
/**
* Get recent session summaries from the database
*/
import type { Database } from 'bun:sqlite';
import type { RecentSummary, SummaryWithSessionInfo, FullSummary } from './types.js';
/**
* Get recent session summaries for a project
*
* @param db - Database instance
* @param project - Project name to filter by
* @param limit - Maximum number of summaries to return (default 10)
*/
export function getRecentSummaries(
db: Database,
project: string,
limit: number = 10
): RecentSummary[] {
const stmt = db.prepare(`
SELECT
request, investigated, learned, completed, next_steps,
files_read, files_edited, notes, prompt_number, created_at
FROM session_summaries
WHERE project = ?
ORDER BY created_at_epoch DESC
LIMIT ?
`);
return stmt.all(project, limit) as RecentSummary[];
}
/**
* Get recent summaries with session info for context display
*
* @param db - Database instance
* @param project - Project name to filter by
* @param limit - Maximum number of summaries to return (default 3)
*/
export function getRecentSummariesWithSessionInfo(
db: Database,
project: string,
limit: number = 3
): SummaryWithSessionInfo[] {
const stmt = db.prepare(`
SELECT
memory_session_id, request, learned, completed, next_steps,
prompt_number, created_at
FROM session_summaries
WHERE project = ?
ORDER BY created_at_epoch DESC
LIMIT ?
`);
return stmt.all(project, limit) as SummaryWithSessionInfo[];
}
/**
* Get recent summaries across all projects (for web UI)
*
* @param db - Database instance
* @param limit - Maximum number of summaries to return (default 50)
*/
export function getAllRecentSummaries(
db: Database,
limit: number = 50
): FullSummary[] {
const stmt = db.prepare(`
SELECT id, request, investigated, learned, completed, next_steps,
files_read, files_edited, notes, project, prompt_number,
created_at, created_at_epoch
FROM session_summaries
ORDER BY created_at_epoch DESC
LIMIT ?
`);
return stmt.all(limit) as FullSummary[];
}
@@ -0,0 +1,58 @@
/**
* Store session summaries in the database
*/
import type { Database } from 'bun:sqlite';
import type { SummaryInput, StoreSummaryResult } from './types.js';
/**
* Store a session summary (from SDK parsing)
* Assumes session already exists - will fail with FK error if not
*
* @param db - Database instance
* @param memorySessionId - SDK memory session ID
* @param project - Project name
* @param summary - Summary content from SDK parsing
* @param promptNumber - Optional prompt number
* @param discoveryTokens - Token count for discovery (default 0)
* @param overrideTimestampEpoch - Optional timestamp override for backlog processing
*/
export function storeSummary(
db: Database,
memorySessionId: string,
project: string,
summary: SummaryInput,
promptNumber?: number,
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): StoreSummaryResult {
// Use override timestamp if provided (for processing backlog messages with original timestamps)
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
const stmt = db.prepare(`
INSERT INTO session_summaries
(memory_session_id, project, request, investigated, learned, completed,
next_steps, notes, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = stmt.run(
memorySessionId,
project,
summary.request,
summary.investigated,
summary.learned,
summary.completed,
summary.next_steps,
summary.notes,
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
return {
id: Number(result.lastInsertRowid),
createdAtEpoch: timestampEpoch
};
}
@@ -0,0 +1,97 @@
/**
* Type definitions for summary-related database operations
*/
/**
* Summary input for storage (from SDK parsing)
*/
export interface SummaryInput {
request: string;
investigated: string;
learned: string;
completed: string;
next_steps: string;
notes: string | null;
}
/**
* Result from storing a summary
*/
export interface StoreSummaryResult {
id: number;
createdAtEpoch: number;
}
/**
* Summary for a specific session (minimal fields)
*/
export interface SessionSummary {
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
files_read: string | null;
files_edited: string | null;
notes: string | null;
prompt_number: number | null;
created_at: string;
created_at_epoch: number;
}
/**
* Summary with session info for context display
*/
export interface SummaryWithSessionInfo {
memory_session_id: string;
request: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
prompt_number: number | null;
created_at: string;
}
/**
* Recent summary (for project-scoped queries)
*/
export interface RecentSummary {
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
files_read: string | null;
files_edited: string | null;
notes: string | null;
prompt_number: number | null;
created_at: string;
}
/**
* Full summary with all fields (for web UI)
*/
export interface FullSummary {
id: number;
request: string | null;
investigated: string | null;
learned: string | null;
completed: string | null;
next_steps: string | null;
files_read: string | null;
files_edited: string | null;
notes: string | null;
project: string;
prompt_number: number | null;
created_at: string;
created_at_epoch: number;
}
/**
* Options for getByIds query
*/
export interface GetByIdsOptions {
orderBy?: 'date_desc' | 'date_asc';
limit?: number;
project?: string;
}
@@ -0,0 +1,218 @@
/**
* Timeline query functions
* Provides time-based context queries for observations, sessions, and prompts
*
* grep-friendly: getTimelineAroundTimestamp, getTimelineAroundObservation, getAllProjects
*/
import type { Database } from 'bun:sqlite';
import type { ObservationRecord, SessionSummaryRecord, UserPromptRecord } from '../../../types/database.js';
import { logger } from '../../../utils/logger.js';
/**
* Timeline result containing observations, sessions, and prompts within a time window
*/
export interface TimelineResult {
observations: ObservationRecord[];
sessions: Array<{
id: number;
memory_session_id: string;
project: string;
request: string | null;
completed: string | null;
next_steps: string | null;
created_at: string;
created_at_epoch: number;
}>;
prompts: Array<{
id: number;
content_session_id: string;
prompt_number: number;
prompt_text: string;
project: string | undefined;
created_at: string;
created_at_epoch: number;
}>;
}
/**
* Get timeline around a specific timestamp
* Convenience wrapper that delegates to getTimelineAroundObservation with null anchor
*
* @param db Database connection
* @param anchorEpoch Epoch timestamp to anchor the query around
* @param depthBefore Number of records to retrieve before anchor (any type)
* @param depthAfter Number of records to retrieve after anchor (any type)
* @param project Optional project filter
* @returns Object containing observations, sessions, and prompts for the specified window
*/
export function getTimelineAroundTimestamp(
db: Database,
anchorEpoch: number,
depthBefore: number = 10,
depthAfter: number = 10,
project?: string
): TimelineResult {
return getTimelineAroundObservation(db, null, anchorEpoch, depthBefore, depthAfter, project);
}
/**
* Get timeline around a specific observation ID
* Uses observation ID offsets to determine time boundaries, then fetches all record types in that window
*
* @param db Database connection
* @param anchorObservationId Observation ID to anchor around (null for timestamp-based)
* @param anchorEpoch Epoch timestamp fallback or anchor for timestamp-based queries
* @param depthBefore Number of records to retrieve before anchor
* @param depthAfter Number of records to retrieve after anchor
* @param project Optional project filter
* @returns Object containing observations, sessions, and prompts for the specified window
*/
export function getTimelineAroundObservation(
db: Database,
anchorObservationId: number | null,
anchorEpoch: number,
depthBefore: number = 10,
depthAfter: number = 10,
project?: string
): TimelineResult {
const projectFilter = project ? 'AND project = ?' : '';
const projectParams = project ? [project] : [];
let startEpoch: number;
let endEpoch: number;
if (anchorObservationId !== null) {
// Get boundary observations by ID offset
const beforeQuery = `
SELECT id, created_at_epoch
FROM observations
WHERE id <= ? ${projectFilter}
ORDER BY id DESC
LIMIT ?
`;
const afterQuery = `
SELECT id, created_at_epoch
FROM observations
WHERE id >= ? ${projectFilter}
ORDER BY id ASC
LIMIT ?
`;
try {
const beforeRecords = db.prepare(beforeQuery).all(anchorObservationId, ...projectParams, depthBefore + 1) as Array<{id: number; created_at_epoch: number}>;
const afterRecords = db.prepare(afterQuery).all(anchorObservationId, ...projectParams, depthAfter + 1) as Array<{id: number; created_at_epoch: number}>;
// Get the earliest and latest timestamps from boundary observations
if (beforeRecords.length === 0 && afterRecords.length === 0) {
return { observations: [], sessions: [], prompts: [] };
}
startEpoch = beforeRecords.length > 0 ? beforeRecords[beforeRecords.length - 1].created_at_epoch : anchorEpoch;
endEpoch = afterRecords.length > 0 ? afterRecords[afterRecords.length - 1].created_at_epoch : anchorEpoch;
} catch (err: unknown) {
logger.error('DB', 'Error getting boundary observations', undefined, { error: err, project });
return { observations: [], sessions: [], prompts: [] };
}
} else {
// For timestamp-based anchors, use time-based boundaries
// Get observations to find the time window
const beforeQuery = `
SELECT created_at_epoch
FROM observations
WHERE created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch DESC
LIMIT ?
`;
const afterQuery = `
SELECT created_at_epoch
FROM observations
WHERE created_at_epoch >= ? ${projectFilter}
ORDER BY created_at_epoch ASC
LIMIT ?
`;
try {
const beforeRecords = db.prepare(beforeQuery).all(anchorEpoch, ...projectParams, depthBefore) as Array<{created_at_epoch: number}>;
const afterRecords = db.prepare(afterQuery).all(anchorEpoch, ...projectParams, depthAfter + 1) as Array<{created_at_epoch: number}>;
if (beforeRecords.length === 0 && afterRecords.length === 0) {
return { observations: [], sessions: [], prompts: [] };
}
startEpoch = beforeRecords.length > 0 ? beforeRecords[beforeRecords.length - 1].created_at_epoch : anchorEpoch;
endEpoch = afterRecords.length > 0 ? afterRecords[afterRecords.length - 1].created_at_epoch : anchorEpoch;
} catch (err: unknown) {
logger.error('DB', 'Error getting boundary timestamps', undefined, { error: err, project });
return { observations: [], sessions: [], prompts: [] };
}
}
// Now query ALL record types within the time window
const obsQuery = `
SELECT *
FROM observations
WHERE created_at_epoch >= ? AND created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch ASC
`;
const sessQuery = `
SELECT *
FROM session_summaries
WHERE created_at_epoch >= ? AND created_at_epoch <= ? ${projectFilter}
ORDER BY created_at_epoch ASC
`;
// user_prompts has no project column, so qualify the shared filter as s.project
const promptQuery = `
SELECT up.*, s.project, s.memory_session_id
FROM user_prompts up
JOIN sdk_sessions s ON up.content_session_id = s.content_session_id
WHERE up.created_at_epoch >= ? AND up.created_at_epoch <= ? ${projectFilter.replace('project', 's.project')}
ORDER BY up.created_at_epoch ASC
`;
const observations = db.prepare(obsQuery).all(startEpoch, endEpoch, ...projectParams) as ObservationRecord[];
const sessions = db.prepare(sessQuery).all(startEpoch, endEpoch, ...projectParams) as SessionSummaryRecord[];
const prompts = db.prepare(promptQuery).all(startEpoch, endEpoch, ...projectParams) as UserPromptRecord[];
return {
observations,
sessions: sessions.map(s => ({
id: s.id,
memory_session_id: s.memory_session_id,
project: s.project,
request: s.request,
completed: s.completed,
next_steps: s.next_steps,
created_at: s.created_at,
created_at_epoch: s.created_at_epoch
})),
prompts: prompts.map(p => ({
id: p.id,
content_session_id: p.content_session_id,
prompt_number: p.prompt_number,
prompt_text: p.prompt_text,
project: p.project,
created_at: p.created_at,
created_at_epoch: p.created_at_epoch
}))
};
}
/**
* Get all unique projects from the database (for web UI project filter)
*
* @param db Database connection
* @returns Array of unique project names
*/
export function getAllProjects(db: Database): string[] {
const stmt = db.prepare(`
SELECT DISTINCT project
FROM sdk_sessions
WHERE project IS NOT NULL AND project != ''
ORDER BY project ASC
`);
const rows = stmt.all() as Array<{ project: string }>;
return rows.map(row => row.project);
}
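Both branches of `getTimelineAroundObservation` reduce to the same window computation: the boundary rows fetched before and after the anchor are each ordered outward from it, so the window spans from the last "before" epoch to the last "after" epoch, falling back to the anchor when a side is empty. A pure sketch (`computeWindow` is an illustrative name):

```typescript
// Window computation: boundary epochs are ordered outward from the
// anchor, so the farthest element on each side defines the window edge.
function computeWindow(
  beforeEpochs: number[], // ordered DESC from anchor (farthest last)
  afterEpochs: number[],  // ordered ASC from anchor (farthest last)
  anchorEpoch: number
): { startEpoch: number; endEpoch: number } {
  const startEpoch =
    beforeEpochs.length > 0 ? beforeEpochs[beforeEpochs.length - 1] : anchorEpoch;
  const endEpoch =
    afterEpochs.length > 0 ? afterEpochs[afterEpochs.length - 1] : anchorEpoch;
  return { startEpoch, endEpoch };
}

const w = computeWindow([900, 800], [1100, 1200], 1000);
// w.startEpoch is 800; w.endEpoch is 1200
```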
@@ -0,0 +1,236 @@
/**
* Cross-boundary database transactions
*
* This module contains atomic transactions that span multiple domains
* (observations, summaries, pending messages). These functions ensure
* data consistency across domain boundaries.
*/
import { Database } from 'bun:sqlite';
import type { ObservationInput } from './observations/types.js';
import type { SummaryInput } from './summaries/types.js';
/**
* Result from storeObservations / storeObservationsAndMarkComplete transaction
*/
export interface StoreObservationsResult {
observationIds: number[];
summaryId: number | null;
createdAtEpoch: number;
}
// Legacy alias for backwards compatibility
export type StoreAndMarkCompleteResult = StoreObservationsResult;
/**
* ATOMIC: Store observations + summary + mark pending message as processed
*
* This function wraps observation storage, summary storage, and message completion
* in a single database transaction to prevent race conditions. If the worker crashes
* during processing, either all operations succeed together or all fail together.
*
* This fixes the observation duplication bug where observations were stored but
* the message wasn't marked complete, causing reprocessing on crash recovery.
*
* @param db - Database instance
* @param memorySessionId - SDK memory session ID
* @param project - Project name
* @param observations - Array of observations to store (can be empty)
* @param summary - Optional summary to store
* @param messageId - Pending message ID to mark as processed
* @param promptNumber - Optional prompt number
* @param discoveryTokens - Discovery tokens count
* @param overrideTimestampEpoch - Optional override timestamp
* @returns Object with observation IDs, optional summary ID, and timestamp
*/
export function storeObservationsAndMarkComplete(
db: Database,
memorySessionId: string,
project: string,
observations: ObservationInput[],
summary: SummaryInput | null,
messageId: number,
promptNumber?: number,
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): StoreAndMarkCompleteResult {
// Use override timestamp if provided
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
// Create transaction that wraps all operations
const storeAndMarkTx = db.transaction(() => {
const observationIds: number[] = [];
// 1. Store all observations
const obsStmt = db.prepare(`
INSERT INTO observations
(memory_session_id, project, type, title, subtitle, facts, narrative, concepts,
files_read, files_modified, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const observation of observations) {
const result = obsStmt.run(
memorySessionId,
project,
observation.type,
observation.title,
observation.subtitle,
JSON.stringify(observation.facts),
observation.narrative,
JSON.stringify(observation.concepts),
JSON.stringify(observation.files_read),
JSON.stringify(observation.files_modified),
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
observationIds.push(Number(result.lastInsertRowid));
}
// 2. Store summary if provided
let summaryId: number | null = null;
if (summary) {
const summaryStmt = db.prepare(`
INSERT INTO session_summaries
(memory_session_id, project, request, investigated, learned, completed,
next_steps, notes, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = summaryStmt.run(
memorySessionId,
project,
summary.request,
summary.investigated,
summary.learned,
summary.completed,
summary.next_steps,
summary.notes,
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
summaryId = Number(result.lastInsertRowid);
}
// 3. Mark pending message as processed
// This UPDATE is part of the same transaction, so if it fails,
// observations and summary will be rolled back
const updateStmt = db.prepare(`
UPDATE pending_messages
SET
status = 'processed',
completed_at_epoch = ?,
tool_input = NULL,
tool_response = NULL
WHERE id = ? AND status = 'processing'
`);
updateStmt.run(timestampEpoch, messageId);
return { observationIds, summaryId, createdAtEpoch: timestampEpoch };
});
// Execute the transaction and return results
return storeAndMarkTx();
}
/**
* ATOMIC: Store observations + summary (no message tracking)
*
* Simplified version for use with claim-and-delete queue pattern.
* Messages are deleted from queue immediately on claim, so there's no
* message completion to track. This just stores observations and summary.
*
* @param db - Database instance
* @param memorySessionId - SDK memory session ID
* @param project - Project name
* @param observations - Array of observations to store (can be empty)
* @param summary - Optional summary to store
* @param promptNumber - Optional prompt number
* @param discoveryTokens - Discovery tokens count
* @param overrideTimestampEpoch - Optional override timestamp
* @returns Object with observation IDs, optional summary ID, and timestamp
*/
export function storeObservations(
db: Database,
memorySessionId: string,
project: string,
observations: ObservationInput[],
summary: SummaryInput | null,
promptNumber?: number,
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): StoreObservationsResult {
// Use override timestamp if provided
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
// Create transaction that wraps all operations
const storeTx = db.transaction(() => {
const observationIds: number[] = [];
// 1. Store all observations
const obsStmt = db.prepare(`
INSERT INTO observations
(memory_session_id, project, type, title, subtitle, facts, narrative, concepts,
files_read, files_modified, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
for (const observation of observations) {
const result = obsStmt.run(
memorySessionId,
project,
observation.type,
observation.title,
observation.subtitle,
JSON.stringify(observation.facts),
observation.narrative,
JSON.stringify(observation.concepts),
JSON.stringify(observation.files_read),
JSON.stringify(observation.files_modified),
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
observationIds.push(Number(result.lastInsertRowid));
}
// 2. Store summary if provided
let summaryId: number | null = null;
if (summary) {
const summaryStmt = db.prepare(`
INSERT INTO session_summaries
(memory_session_id, project, request, investigated, learned, completed,
next_steps, notes, prompt_number, discovery_tokens, created_at, created_at_epoch)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
const result = summaryStmt.run(
memorySessionId,
project,
summary.request,
summary.investigated,
summary.learned,
summary.completed,
summary.next_steps,
summary.notes,
promptNumber || null,
discoveryTokens,
timestampIso,
timestampEpoch
);
summaryId = Number(result.lastInsertRowid);
}
return { observationIds, summaryId, createdAtEpoch: timestampEpoch };
});
// Execute the transaction and return results
return storeTx();
}
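The all-or-nothing guarantee these transactions rely on can be illustrated without bun:sqlite: if any step inside the transaction throws, every earlier write is rolled back. A stand-in sketch using an array with a snapshot as the "database" (`runTransaction` is illustrative, not the real `db.transaction` API):

```typescript
// Stand-in transaction: snapshot on entry ("BEGIN"), restore the
// snapshot if the work throws ("ROLLBACK"), keep writes otherwise.
function runTransaction<T>(store: number[], work: (s: number[]) => T): T {
  const snapshot = [...store]; // remember pre-transaction state
  try {
    return work(store);
  } catch (err) {
    store.length = 0;          // roll back: restore the snapshot
    store.push(...snapshot);
    throw err;
  }
}

const rows: number[] = [];
try {
  runTransaction(rows, (s) => {
    s.push(1); // observation insert succeeds...
    throw new Error('summary insert failed'); // ...but a later step fails
  });
} catch {
  // swallowed for the example
}
// rows.length is 0: the earlier insert was rolled back with the rest
```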