MAESTRO: fix(db): prevent FK constraint failures on worker restart

Cherry-picked source changes from PR #889 by @Et9797. Fixes #846.

Key changes:
- Add ensureMemorySessionIdRegistered() guard in SessionStore.ts
- Add ON UPDATE CASCADE migration (schema v21) for observations and session_summaries FK constraints
- Change message queue from claim-and-delete to claim-confirm pattern (PendingMessageStore.ts)
- Add spawn deduplication and unrecoverable error detection in SessionRoutes.ts and worker-service.ts
- Add forceInit flag to SDKAgent for stale session recovery

Build artifacts skipped (pre-existing dompurify dep issue). Path fixes (HealthMonitor.ts, worker-utils.ts)
already merged via PR #634.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-02-06 03:16:17 -05:00
parent 7ed1e576b2
commit da1d2cd36a
20 changed files with 1136 additions and 150 deletions
+45 -6
View File
@@ -77,12 +77,13 @@ export class PendingMessageStore {
}
/**
* Atomically claim and DELETE the next pending message.
* Finds oldest pending -> returns it -> deletes from queue.
* The queue is a pure buffer: claim it, delete it, process in memory.
* Atomically claim the next pending message by marking it as 'processing'.
* CRITICAL FIX: Does NOT delete - message stays in DB until confirmProcessed() is called.
* This prevents message loss if the generator crashes mid-processing.
* Uses a transaction to prevent race conditions.
*/
claimAndDelete(sessionDbId: number): PersistentPendingMessage | null {
const now = Date.now();
const claimTx = this.db.transaction((sessionId: number) => {
const peekStmt = this.db.prepare(`
SELECT * FROM pending_messages
@@ -93,9 +94,14 @@ export class PendingMessageStore {
const msg = peekStmt.get(sessionId) as PersistentPendingMessage | null;
if (msg) {
// Delete immediately - no "processing" state needed
const deleteStmt = this.db.prepare('DELETE FROM pending_messages WHERE id = ?');
deleteStmt.run(msg.id);
// CRITICAL FIX: Mark as 'processing' instead of deleting
// Message will be deleted by confirmProcessed() after successful store
const updateStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'processing', started_processing_at_epoch = ?
WHERE id = ?
`);
updateStmt.run(now, msg.id);
// Log claim with minimal info (avoid logging full payload)
logger.info('QUEUE', `CLAIMED | sessionDbId=${sessionId} | messageId=${msg.id} | type=${msg.message_type}`, {
@@ -108,6 +114,39 @@ export class PendingMessageStore {
return claimTx(sessionDbId) as PersistentPendingMessage | null;
}
/**
* Confirm a message was successfully processed - DELETE it from the queue.
* CRITICAL: Only call this AFTER the observation/summary has been stored to DB.
* This prevents message loss on generator crash.
*/
confirmProcessed(messageId: number): void {
const stmt = this.db.prepare('DELETE FROM pending_messages WHERE id = ?');
const result = stmt.run(messageId);
if (result.changes > 0) {
logger.debug('QUEUE', `CONFIRMED | messageId=${messageId} | deleted from queue`);
}
}
/**
* Reset stale 'processing' messages back to 'pending' for retry.
* Called on worker startup and periodically to recover from crashes.
* @param thresholdMs Messages processing longer than this are considered stale (default: 5 minutes)
* @returns Number of messages reset
*/
resetStaleProcessingMessages(thresholdMs: number = 5 * 60 * 1000): number {
const cutoff = Date.now() - thresholdMs;
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
const result = stmt.run(cutoff);
if (result.changes > 0) {
logger.info('QUEUE', `RESET_STALE | count=${result.changes} | thresholdMs=${thresholdMs}`);
}
return result.changes;
}
/**
* Get all pending messages for session (ordered by creation time)
*/
+234 -25
View File
@@ -47,6 +47,7 @@ export class SessionStore {
this.renameSessionIdColumns();
this.repairSessionIdColumnRename();
this.addFailedAtEpochColumn();
this.addOnUpdateCascadeToForeignKeys();
}
/**
@@ -101,7 +102,7 @@ export class SessionStore {
type TEXT NOT NULL CHECK(type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery')),
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_observations_sdk_session ON observations(memory_session_id);
@@ -123,7 +124,7 @@ export class SessionStore {
notes TEXT,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_session_summaries_sdk_session ON session_summaries(memory_session_id);
@@ -645,11 +646,187 @@ export class SessionStore {
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(20, new Date().toISOString());
}
/**
* Add ON UPDATE CASCADE to FK constraints on observations and session_summaries (migration 21)
*
* Both tables have FK(memory_session_id) -> sdk_sessions(memory_session_id) with ON DELETE CASCADE
* but missing ON UPDATE CASCADE. This causes FK constraint violations when code updates
* sdk_sessions.memory_session_id while child rows still reference the old value.
*
* SQLite doesn't support ALTER TABLE for FK changes, so we recreate both tables.
*/
private addOnUpdateCascadeToForeignKeys(): void {
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(21) as SchemaVersion | undefined;
if (applied) return;
logger.debug('DB', 'Adding ON UPDATE CASCADE to FK constraints on observations and session_summaries');
this.db.run('BEGIN TRANSACTION');
try {
// ==========================================
// 1. Recreate observations table
// ==========================================
// Drop FTS triggers first (they reference the observations table)
this.db.run('DROP TRIGGER IF EXISTS observations_ai');
this.db.run('DROP TRIGGER IF EXISTS observations_ad');
this.db.run('DROP TRIGGER IF EXISTS observations_au');
this.db.run(`
CREATE TABLE observations_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT NOT NULL,
project TEXT NOT NULL,
text TEXT,
type TEXT NOT NULL CHECK(type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery', 'change')),
title TEXT,
subtitle TEXT,
facts TEXT,
narrative TEXT,
concepts TEXT,
files_read TEXT,
files_modified TEXT,
prompt_number INTEGER,
discovery_tokens INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE ON UPDATE CASCADE
)
`);
this.db.run(`
INSERT INTO observations_new
SELECT id, memory_session_id, project, text, type, title, subtitle, facts,
narrative, concepts, files_read, files_modified, prompt_number,
discovery_tokens, created_at, created_at_epoch
FROM observations
`);
this.db.run('DROP TABLE observations');
this.db.run('ALTER TABLE observations_new RENAME TO observations');
// Recreate indexes
this.db.run(`
CREATE INDEX idx_observations_sdk_session ON observations(memory_session_id);
CREATE INDEX idx_observations_project ON observations(project);
CREATE INDEX idx_observations_type ON observations(type);
CREATE INDEX idx_observations_created ON observations(created_at_epoch DESC);
`);
// Recreate FTS triggers only if observations_fts exists
// (SessionSearch.ensureFTSTables creates it on first use with IF NOT EXISTS)
const hasFTS = (this.db.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='observations_fts'").all() as { name: string }[]).length > 0;
if (hasFTS) {
this.db.run(`
CREATE TRIGGER IF NOT EXISTS observations_ai AFTER INSERT ON observations BEGIN
INSERT INTO observations_fts(rowid, title, subtitle, narrative, text, facts, concepts)
VALUES (new.id, new.title, new.subtitle, new.narrative, new.text, new.facts, new.concepts);
END;
CREATE TRIGGER IF NOT EXISTS observations_ad AFTER DELETE ON observations BEGIN
INSERT INTO observations_fts(observations_fts, rowid, title, subtitle, narrative, text, facts, concepts)
VALUES('delete', old.id, old.title, old.subtitle, old.narrative, old.text, old.facts, old.concepts);
END;
CREATE TRIGGER IF NOT EXISTS observations_au AFTER UPDATE ON observations BEGIN
INSERT INTO observations_fts(observations_fts, rowid, title, subtitle, narrative, text, facts, concepts)
VALUES('delete', old.id, old.title, old.subtitle, old.narrative, old.text, old.facts, old.concepts);
INSERT INTO observations_fts(rowid, title, subtitle, narrative, text, facts, concepts)
VALUES (new.id, new.title, new.subtitle, new.narrative, new.text, new.facts, new.concepts);
END;
`);
}
// ==========================================
// 2. Recreate session_summaries table
// ==========================================
this.db.run(`
CREATE TABLE session_summaries_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_session_id TEXT NOT NULL,
project TEXT NOT NULL,
request TEXT,
investigated TEXT,
learned TEXT,
completed TEXT,
next_steps TEXT,
files_read TEXT,
files_edited TEXT,
notes TEXT,
prompt_number INTEGER,
discovery_tokens INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
FOREIGN KEY(memory_session_id) REFERENCES sdk_sessions(memory_session_id) ON DELETE CASCADE ON UPDATE CASCADE
)
`);
this.db.run(`
INSERT INTO session_summaries_new
SELECT id, memory_session_id, project, request, investigated, learned,
completed, next_steps, files_read, files_edited, notes,
prompt_number, discovery_tokens, created_at, created_at_epoch
FROM session_summaries
`);
// Drop session_summaries FTS triggers before dropping the table
this.db.run('DROP TRIGGER IF EXISTS session_summaries_ai');
this.db.run('DROP TRIGGER IF EXISTS session_summaries_ad');
this.db.run('DROP TRIGGER IF EXISTS session_summaries_au');
this.db.run('DROP TABLE session_summaries');
this.db.run('ALTER TABLE session_summaries_new RENAME TO session_summaries');
// Recreate indexes
this.db.run(`
CREATE INDEX idx_session_summaries_sdk_session ON session_summaries(memory_session_id);
CREATE INDEX idx_session_summaries_project ON session_summaries(project);
CREATE INDEX idx_session_summaries_created ON session_summaries(created_at_epoch DESC);
`);
// Recreate session_summaries FTS triggers if FTS table exists
const hasSummariesFTS = (this.db.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='session_summaries_fts'").all() as { name: string }[]).length > 0;
if (hasSummariesFTS) {
this.db.run(`
CREATE TRIGGER IF NOT EXISTS session_summaries_ai AFTER INSERT ON session_summaries BEGIN
INSERT INTO session_summaries_fts(rowid, request, investigated, learned, completed, next_steps, notes)
VALUES (new.id, new.request, new.investigated, new.learned, new.completed, new.next_steps, new.notes);
END;
CREATE TRIGGER IF NOT EXISTS session_summaries_ad AFTER DELETE ON session_summaries BEGIN
INSERT INTO session_summaries_fts(session_summaries_fts, rowid, request, investigated, learned, completed, next_steps, notes)
VALUES('delete', old.id, old.request, old.investigated, old.learned, old.completed, old.next_steps, old.notes);
END;
CREATE TRIGGER IF NOT EXISTS session_summaries_au AFTER UPDATE ON session_summaries BEGIN
INSERT INTO session_summaries_fts(session_summaries_fts, rowid, request, investigated, learned, completed, next_steps, notes)
VALUES('delete', old.id, old.request, old.investigated, old.learned, old.completed, old.next_steps, old.notes);
INSERT INTO session_summaries_fts(rowid, request, investigated, learned, completed, next_steps, notes)
VALUES (new.id, new.request, new.investigated, new.learned, new.completed, new.next_steps, new.notes);
END;
`);
}
// Record migration
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(21, new Date().toISOString());
this.db.run('COMMIT');
logger.debug('DB', 'Successfully added ON UPDATE CASCADE to FK constraints');
} catch (error) {
this.db.run('ROLLBACK');
throw error;
}
}
/**
* Update the memory session ID for a session
* Called by SDKAgent when it captures the session ID from the first SDK message
* Also used to RESET to null on stale resume failures (worker-service.ts)
*/
updateMemorySessionId(sessionDbId: number, memorySessionId: string): void {
updateMemorySessionId(sessionDbId: number, memorySessionId: string | null): void {
this.db.prepare(`
UPDATE sdk_sessions
SET memory_session_id = ?
@@ -657,6 +834,37 @@ export class SessionStore {
`).run(memorySessionId, sessionDbId);
}
/**
* Ensures memory_session_id is registered in sdk_sessions before FK-constrained INSERT.
* This fixes Issue #846 where observations fail after worker restart because the
* SDK generates a new memory_session_id but it's not registered in the parent table
* before child records try to reference it.
*
* @param sessionDbId - The database ID of the session
* @param memorySessionId - The memory session ID to ensure is registered
*/
ensureMemorySessionIdRegistered(sessionDbId: number, memorySessionId: string): void {
const session = this.db.prepare(`
SELECT id, memory_session_id FROM sdk_sessions WHERE id = ?
`).get(sessionDbId) as { id: number; memory_session_id: string | null } | undefined;
if (!session) {
throw new Error(`Session ${sessionDbId} not found in sdk_sessions`);
}
if (session.memory_session_id !== memorySessionId) {
this.db.prepare(`
UPDATE sdk_sessions SET memory_session_id = ? WHERE id = ?
`).run(memorySessionId, sessionDbId);
logger.info('DB', 'Registered memory_session_id before storage (FK fix)', {
sessionDbId,
oldId: session.memory_session_id,
newId: memorySessionId
});
}
}
/**
* Get recent session summaries for a project
*/
@@ -1151,39 +1359,40 @@ export class SessionStore {
* - Prompt #2+: session_id exists → INSERT ignored, fetch existing ID
* - Result: Same database ID returned for all prompts in conversation
*
* WHY THIS MATTERS:
* - NO "does session exist?" checks needed anywhere
* - NO risk of creating duplicate sessions
* - ALL hooks automatically connected via session_id
* - SAVE hook observations go to correct session (same session_id)
* - SDKAgent continuation prompt has correct context (same session_id)
*
* This is KISS in action: Trust the database UNIQUE constraint and
* INSERT OR IGNORE to handle both creation and lookup elegantly.
* Pure get-or-create: never modifies memory_session_id.
* Multi-terminal isolation is handled by ON UPDATE CASCADE at the schema level.
*/
createSDKSession(contentSessionId: string, project: string, userPrompt: string): number {
const now = new Date();
const nowEpoch = now.getTime();
// INSERT OR IGNORE to create session, then backfill project if it was created empty
// Session reuse: Return existing session ID if already created for this contentSessionId.
const existing = this.db.prepare(`
SELECT id FROM sdk_sessions WHERE content_session_id = ?
`).get(contentSessionId) as { id: number } | undefined;
if (existing) {
// Backfill project if session was created by another hook with empty project
if (project) {
this.db.prepare(`
UPDATE sdk_sessions SET project = ?
WHERE content_session_id = ? AND (project IS NULL OR project = '')
`).run(project, contentSessionId);
}
return existing.id;
}
// New session - insert fresh row
// NOTE: memory_session_id starts as NULL. It is captured by SDKAgent from the first SDK
// response and stored via updateMemorySessionId(). CRITICAL: memory_session_id must NEVER
// equal contentSessionId - that would inject memory messages into the user's transcript!
// response and stored via ensureMemorySessionIdRegistered(). CRITICAL: memory_session_id
// must NEVER equal contentSessionId - that would inject memory messages into the user's transcript!
this.db.prepare(`
INSERT OR IGNORE INTO sdk_sessions
INSERT INTO sdk_sessions
(content_session_id, memory_session_id, project, user_prompt, started_at, started_at_epoch, status)
VALUES (?, NULL, ?, ?, ?, ?, 'active')
`).run(contentSessionId, project, userPrompt, now.toISOString(), nowEpoch);
// Backfill project if session was created by another hook with empty project
if (project) {
this.db.prepare(`
UPDATE sdk_sessions SET project = ?
WHERE content_session_id = ? AND (project IS NULL OR project = '')
`).run(project, contentSessionId);
}
// Return existing or new ID
// Return new ID
const row = this.db.prepare('SELECT id FROM sdk_sessions WHERE content_session_id = ?')
.get(contentSessionId) as { id: number };
return row.id;
+25 -20
View File
@@ -14,12 +14,8 @@ import { logger } from '../../../utils/logger.js';
* - Prompt #2+: session_id exists -> INSERT ignored, fetch existing ID
* - Result: Same database ID returned for all prompts in conversation
*
* WHY THIS MATTERS:
* - NO "does session exist?" checks needed anywhere
* - NO risk of creating duplicate sessions
* - ALL hooks automatically connected via session_id
* - SAVE hook observations go to correct session (same session_id)
* - SDKAgent continuation prompt has correct context (same session_id)
* Pure get-or-create: never modifies memory_session_id.
* Multi-terminal isolation is handled by ON UPDATE CASCADE at the schema level.
*/
export function createSDKSession(
db: Database,
@@ -30,25 +26,33 @@ export function createSDKSession(
const now = new Date();
const nowEpoch = now.getTime();
// INSERT OR IGNORE to create session, then backfill project if it was created empty
// Check for existing session
const existing = db.prepare(`
SELECT id FROM sdk_sessions WHERE content_session_id = ?
`).get(contentSessionId) as { id: number } | undefined;
if (existing) {
// Backfill project if session was created by another hook with empty project
if (project) {
db.prepare(`
UPDATE sdk_sessions SET project = ?
WHERE content_session_id = ? AND (project IS NULL OR project = '')
`).run(project, contentSessionId);
}
return existing.id;
}
// New session - insert fresh row
// NOTE: memory_session_id starts as NULL. It is captured by SDKAgent from the first SDK
// response and stored via updateMemorySessionId(). CRITICAL: memory_session_id must NEVER
// equal contentSessionId - that would inject memory messages into the user's transcript!
// response and stored via ensureMemorySessionIdRegistered(). CRITICAL: memory_session_id
// must NEVER equal contentSessionId - that would inject memory messages into the user's transcript!
db.prepare(`
INSERT OR IGNORE INTO sdk_sessions
INSERT INTO sdk_sessions
(content_session_id, memory_session_id, project, user_prompt, started_at, started_at_epoch, status)
VALUES (?, NULL, ?, ?, ?, ?, 'active')
`).run(contentSessionId, project, userPrompt, now.toISOString(), nowEpoch);
// Backfill project if session was created by another hook with empty project
if (project) {
db.prepare(`
UPDATE sdk_sessions SET project = ?
WHERE content_session_id = ? AND (project IS NULL OR project = '')
`).run(project, contentSessionId);
}
// Return existing or new ID
// Return new ID
const row = db.prepare('SELECT id FROM sdk_sessions WHERE content_session_id = ?')
.get(contentSessionId) as { id: number };
return row.id;
@@ -57,11 +61,12 @@ export function createSDKSession(
/**
* Update the memory session ID for a session
* Called by SDKAgent when it captures the session ID from the first SDK message
* Also used to RESET to null on stale resume failures (worker-service.ts)
*/
export function updateMemorySessionId(
db: Database,
sessionDbId: number,
memorySessionId: string
memorySessionId: string | null
): void {
db.prepare(`
UPDATE sdk_sessions
+77 -4
View File
@@ -190,6 +190,7 @@ export class WorkerService {
this.broadcastProcessingStatus();
});
// Initialize MCP client
// Empty capabilities object: this client only calls tools, doesn't expose any
this.mcpClient = new Client({
@@ -319,13 +320,12 @@ export class WorkerService {
await this.dbManager.initialize();
// Recover stuck messages from previous crashes
// Reset any messages that were processing when worker died
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const STUCK_THRESHOLD_MS = 5 * 60 * 1000;
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
const resetCount = pendingStore.resetStaleProcessingMessages(0); // 0 = reset ALL processing
if (resetCount > 0) {
logger.info('SYSTEM', `Recovered ${resetCount} stuck messages from previous session`, { thresholdMinutes: 5 });
logger.info('SYSTEM', `Reset ${resetCount} stale processing messages to pending`);
}
// Initialize search services
@@ -421,10 +421,43 @@ export class WorkerService {
const agent = this.getActiveAgent();
const providerName = agent.constructor.name;
// Before starting generator, check if AbortController is already aborted
// This can happen after a previous generator was aborted but the session still has pending work
if (session.abortController.signal.aborted) {
logger.debug('SYSTEM', 'Replacing aborted AbortController before starting generator', {
sessionId: session.sessionDbId
});
session.abortController = new AbortController();
}
// Track whether generator failed with an unrecoverable error to prevent infinite restart loops
let hadUnrecoverableError = false;
logger.info('SYSTEM', `Starting generator (${source}) using ${providerName}`, { sessionId: sid });
session.generatorPromise = agent.startSession(session, this)
.catch(async (error: unknown) => {
const errorMessage = (error as Error)?.message || '';
// Detect unrecoverable errors that should NOT trigger restart
// These errors will fail immediately on retry, causing infinite loops
const unrecoverablePatterns = [
'Claude executable not found',
'CLAUDE_CODE_PATH',
'ENOENT',
'spawn',
];
if (unrecoverablePatterns.some(pattern => errorMessage.includes(pattern))) {
hadUnrecoverableError = true;
logger.error('SDK', 'Unrecoverable generator error - will NOT restart', {
sessionId: session.sessionDbId,
project: session.project,
errorMessage
});
return;
}
// Fallback for terminated SDK sessions (provider abstraction)
if (this.isSessionTerminatedError(error)) {
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
sessionId: session.sessionDbId,
@@ -433,6 +466,20 @@ export class WorkerService {
});
return this.runFallbackForTerminatedSession(session, error);
}
// Detect stale resume failures - SDK session context was lost
if ((errorMessage.includes('aborted by user') || errorMessage.includes('No conversation found'))
&& session.memorySessionId) {
logger.warn('SDK', 'Detected stale resume failure, clearing memorySessionId for fresh start', {
sessionId: session.sessionDbId,
memorySessionId: session.memorySessionId,
errorMessage
});
// Clear stale memorySessionId and force fresh init on next attempt
this.dbManager.getSessionStore().updateMemorySessionId(session.sessionDbId, null);
session.memorySessionId = null;
session.forceInit = true;
}
logger.error('SDK', 'Session generator failed', {
sessionId: session.sessionDbId,
project: session.project,
@@ -442,6 +489,32 @@ export class WorkerService {
})
.finally(() => {
session.generatorPromise = null;
// Do NOT restart after unrecoverable errors - prevents infinite loops
if (hadUnrecoverableError) {
logger.warn('SYSTEM', 'Skipping restart due to unrecoverable error', {
sessionId: session.sessionDbId
});
this.broadcastProcessingStatus();
return;
}
// Check if there's pending work that needs processing with a fresh AbortController
const { PendingMessageStore } = require('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const pendingCount = pendingStore.getPendingCount(session.sessionDbId);
if (pendingCount > 0) {
logger.info('SYSTEM', 'Pending work remains after generator exit, restarting with fresh AbortController', {
sessionId: session.sessionDbId,
pendingCount
});
// Reset AbortController for restart
session.abortController = new AbortController();
// Restart processor
this.startSessionProcessor(session, 'pending-work-restart');
}
this.broadcastProcessingStatus();
});
}
+4
View File
@@ -34,6 +34,10 @@ export interface ActiveSession {
conversationHistory: ConversationMessage[]; // Shared conversation history for provider switching
currentProvider: 'claude' | 'gemini' | 'openrouter' | null; // Track which provider is currently running
consecutiveRestarts: number; // Track consecutive restart attempts to prevent infinite loops
forceInit?: boolean; // Force fresh SDK session (skip resume)
// CLAIM-CONFIRM FIX: Track IDs of messages currently being processed
// These IDs will be confirmed (deleted) after successful storage
processingMessageIds: number[];
}
export interface PendingMessage {
+4
View File
@@ -186,6 +186,10 @@ export class GeminiAgent {
let lastCwd: string | undefined;
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
// CLAIM-CONFIRM: Track message ID for confirmProcessed() after successful storage
// The message is now in 'processing' status in DB until ResponseProcessor calls confirmProcessed()
session.processingMessageIds.push(message._persistentId);
// Capture cwd from each message for worktree support
if (message.cwd) {
lastCwd = message.cwd;
+4
View File
@@ -145,6 +145,10 @@ export class OpenRouterAgent {
// Process pending messages
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
// CLAIM-CONFIRM: Track message ID for confirmProcessed() after successful storage
// The message is now in 'processing' status in DB until ResponseProcessor calls confirmProcessed()
session.processingMessageIds.push(message._persistentId);
// Capture cwd from messages for proper worktree support
if (message.cwd) {
lastCwd = message.cwd;
+50 -15
View File
@@ -72,10 +72,21 @@ export class SDKAgent {
// CRITICAL: Only resume if:
// 1. memorySessionId exists (was captured from a previous SDK response)
// 2. lastPromptNumber > 1 (this is a continuation within the same SDK session)
// 3. forceInit is NOT set (stale session recovery clears this)
// On worker restart or crash recovery, memorySessionId may exist from a previous
// SDK session but we must NOT resume because the SDK context was lost.
// NEVER use contentSessionId for resume - that would inject messages into the user's transcript!
const hasRealMemorySessionId = !!session.memorySessionId;
const shouldResume = hasRealMemorySessionId && session.lastPromptNumber > 1 && !session.forceInit;
// Clear forceInit after using it
if (session.forceInit) {
logger.info('SDK', 'forceInit flag set, starting fresh SDK session', {
sessionDbId: session.sessionDbId,
previousMemorySessionId: session.memorySessionId
});
session.forceInit = false;
}
// Build isolated environment from ~/.claude-mem/.env
// This prevents Issue #733: random ANTHROPIC_API_KEY from project .env files
@@ -88,15 +99,15 @@ export class SDKAgent {
contentSessionId: session.contentSessionId,
memorySessionId: session.memorySessionId,
hasRealMemorySessionId,
resume_parameter: hasRealMemorySessionId ? session.memorySessionId : '(none - fresh start)',
shouldResume,
resume_parameter: shouldResume ? session.memorySessionId : '(none - fresh start)',
lastPromptNumber: session.lastPromptNumber,
authMethod
});
// Debug-level alignment logs for detailed tracing
if (session.lastPromptNumber > 1) {
const willResume = hasRealMemorySessionId;
logger.debug('SDK', `[ALIGNMENT] Resume Decision | contentSessionId=${session.contentSessionId} | memorySessionId=${session.memorySessionId} | prompt#=${session.lastPromptNumber} | hasRealMemorySessionId=${hasRealMemorySessionId} | willResume=${willResume} | resumeWith=${willResume ? session.memorySessionId : 'NONE'}`);
logger.debug('SDK', `[ALIGNMENT] Resume Decision | contentSessionId=${session.contentSessionId} | memorySessionId=${session.memorySessionId} | prompt#=${session.lastPromptNumber} | hasRealMemorySessionId=${hasRealMemorySessionId} | shouldResume=${shouldResume} | resumeWith=${shouldResume ? session.memorySessionId : 'NONE'}`);
} else {
// INIT prompt - never resume even if memorySessionId exists (stale from previous session)
const hasStaleMemoryId = hasRealMemorySessionId;
@@ -119,10 +130,8 @@ export class SDKAgent {
// Isolate observer sessions - they'll appear under project "observer-sessions"
// instead of polluting user's actual project resume lists
cwd: OBSERVER_SESSIONS_DIR,
// Only resume if BOTH: (1) we have a memorySessionId AND (2) this isn't the first prompt
// On worker restart, memorySessionId may exist from a previous SDK session but we
// need to start fresh since the SDK context was lost
...(hasRealMemorySessionId && session.lastPromptNumber > 1 && { resume: session.memorySessionId }),
// Only resume if shouldResume is true (memorySessionId exists, not first prompt, not forceInit)
...(shouldResume && { resume: session.memorySessionId }),
disallowedTools,
abortController: session.abortController,
pathToClaudeCodeExecutable: claudePath,
@@ -134,21 +143,35 @@ export class SDKAgent {
// Process SDK messages
for await (const message of queryResult) {
// Capture memory session ID from first SDK message (any type has session_id)
// This enables resume for subsequent generator starts within the same user session
if (!session.memorySessionId && message.session_id) {
// Capture or update memory session ID from SDK message
// IMPORTANT: The SDK may return a DIFFERENT session_id on resume than what we sent!
// We must always sync the DB to match what the SDK actually uses.
//
// MULTI-TERMINAL COLLISION FIX (FK constraint bug):
// Use ensureMemorySessionIdRegistered() instead of updateMemorySessionId() because:
// 1. It's idempotent - safe to call multiple times
// 2. It verifies the update happened (SELECT before UPDATE)
// 3. Consistent with ResponseProcessor's usage pattern
// This ensures FK constraint compliance BEFORE any observations are stored.
if (message.session_id && message.session_id !== session.memorySessionId) {
const previousId = session.memorySessionId;
session.memorySessionId = message.session_id;
// Persist to database for cross-restart recovery
this.dbManager.getSessionStore().updateMemorySessionId(
// Persist to database IMMEDIATELY for FK constraint compliance
// This must happen BEFORE any observations referencing this ID are stored
this.dbManager.getSessionStore().ensureMemorySessionIdRegistered(
session.sessionDbId,
message.session_id
);
// Verify the update by reading back from DB
const verification = this.dbManager.getSessionStore().getSessionById(session.sessionDbId);
const dbVerified = verification?.memory_session_id === message.session_id;
logger.info('SESSION', `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`, {
const logMessage = previousId
? `MEMORY_ID_CHANGED | sessionDbId=${session.sessionDbId} | from=${previousId} | to=${message.session_id} | dbVerified=${dbVerified}`
: `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`;
logger.info('SESSION', logMessage, {
sessionId: session.sessionDbId,
memorySessionId: message.session_id
memorySessionId: message.session_id,
previousId
});
if (!dbVerified) {
logger.error('SESSION', `MEMORY_ID_MISMATCH | sessionDbId=${session.sessionDbId} | expected=${message.session_id} | got=${verification?.memory_session_id}`, {
@@ -156,7 +179,7 @@ export class SDKAgent {
});
}
// Debug-level alignment log for detailed tracing
logger.debug('SDK', `[ALIGNMENT] Captured | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
logger.debug('SDK', `[ALIGNMENT] ${previousId ? 'Updated' : 'Captured'} | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
}
// Handle assistant messages
@@ -166,6 +189,14 @@ export class SDKAgent {
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
: typeof content === 'string' ? content : '';
// Check for context overflow - prevents infinite retry loops
if (textContent.includes('prompt is too long') ||
textContent.includes('context window')) {
logger.error('SDK', 'Context overflow detected - terminating session');
session.abortController.abort();
return;
}
const responseSize = textContent.length;
// Capture token state BEFORE updating (for delta calculation)
@@ -317,6 +348,10 @@ export class SDKAgent {
// Consume pending messages from SessionManager (event-driven, no polling)
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
// CLAIM-CONFIRM: Track message ID for confirmProcessed() after successful storage
// The message is now in 'processing' status in DB until ResponseProcessor calls confirmProcessed()
session.processingMessageIds.push(message._persistentId);
// Capture cwd from each message for worktree support
if (message.cwd) {
cwdTracker.lastCwd = message.cwd;
+2 -1
View File
@@ -154,7 +154,8 @@ export class SessionManager {
earliestPendingTimestamp: null,
conversationHistory: [], // Initialize empty - will be populated by agents
currentProvider: null, // Will be set when generator starts
consecutiveRestarts: 0 // Track consecutive restart attempts to prevent infinite loops
consecutiveRestarts: 0, // Track consecutive restart attempts to prevent infinite loops
processingMessageIds: [] // CLAIM-CONFIRM: Track message IDs for confirmProcessed()
};
logger.debug('SESSION', 'Creating new session object (memorySessionId cleared to prevent stale resume)', {
@@ -76,6 +76,14 @@ export async function processAgentResponse(
throw new Error('Cannot store observations: memorySessionId not yet captured');
}
// SAFETY NET (Issue #846 / Multi-terminal FK fix):
// The PRIMARY fix is in SDKAgent.ts where ensureMemorySessionIdRegistered() is called
// immediately when the SDK returns a memory_session_id. This call is a defensive safety net
// in case the DB was somehow not updated (race condition, crash, etc.).
// In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL
// for each new generator, ensuring clean isolation.
sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId);
// Log pre-storage with session ID chain for verification
logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!summaryForStore}`, {
sessionId: session.sessionDbId,
@@ -100,6 +108,18 @@ export async function processAgentResponse(
memorySessionId: session.memorySessionId
});
// CLAIM-CONFIRM: Now that storage succeeded, confirm all processing messages (delete from queue)
// This is the critical step that prevents message loss on generator crash
const pendingStore = sessionManager.getPendingMessageStore();
for (const messageId of session.processingMessageIds) {
pendingStore.confirmProcessed(messageId);
}
if (session.processingMessageIds.length > 0) {
logger.debug('QUEUE', `CONFIRMED_BATCH | sessionDbId=${session.sessionDbId} | count=${session.processingMessageIds.length} | ids=[${session.processingMessageIds.join(',')}]`);
}
// Clear the tracking array after confirmation
session.processingMessageIds = [];
// AFTER transaction commits - async operations (can fail safely without data loss)
await syncAndBroadcastObservations(
observations,
@@ -24,6 +24,8 @@ import { USER_SETTINGS_PATH } from '../../../../shared/paths.js';
export class SessionRoutes extends BaseRouteHandler {
private completionHandler: SessionCompletionHandler;
private spawnInProgress = new Map<number, boolean>();
private crashRecoveryScheduled = new Set<number>();
constructor(
private sessionManager: SessionManager,
@@ -91,10 +93,17 @@ export class SessionRoutes extends BaseRouteHandler {
const session = this.sessionManager.getSession(sessionDbId);
if (!session) return;
// GUARD: Prevent duplicate spawns
if (this.spawnInProgress.get(sessionDbId)) {
logger.debug('SESSION', 'Spawn already in progress, skipping', { sessionDbId, source });
return;
}
const selectedProvider = this.getSelectedProvider();
// Start generator if not running
if (!session.generatorPromise) {
this.spawnInProgress.set(sessionDbId, true);
this.startGeneratorWithProvider(session, selectedProvider, source);
return;
}
@@ -135,9 +144,13 @@ export class SessionRoutes extends BaseRouteHandler {
const agent = provider === 'openrouter' ? this.openRouterAgent : (provider === 'gemini' ? this.geminiAgent : this.sdkAgent);
const agentName = provider === 'openrouter' ? 'OpenRouter' : (provider === 'gemini' ? 'Gemini' : 'Claude SDK');
// Use database count for accurate telemetry (in-memory array is always empty due to FK constraint fix)
const pendingStore = this.sessionManager.getPendingMessageStore();
const actualQueueDepth = pendingStore.getPendingCount(session.sessionDbId);
logger.info('SESSION', `Generator auto-starting (${source}) using ${agentName}`, {
sessionId: session.sessionDbId,
queueDepth: session.pendingMessages.length,
queueDepth: actualQueueDepth,
historyLength: session.conversationHistory.length
});
@@ -173,6 +186,7 @@ export class SessionRoutes extends BaseRouteHandler {
})
.finally(() => {
const sessionDbId = session.sessionDbId;
this.spawnInProgress.delete(sessionDbId);
const wasAborted = session.abortController.signal.aborted;
if (wasAborted) {
@@ -196,6 +210,12 @@ export class SessionRoutes extends BaseRouteHandler {
const MAX_CONSECUTIVE_RESTARTS = 3;
if (pendingCount > 0) {
// GUARD: Prevent duplicate crash recovery spawns
if (this.crashRecoveryScheduled.has(sessionDbId)) {
logger.debug('SESSION', 'Crash recovery already scheduled', { sessionDbId });
return;
}
session.consecutiveRestarts = (session.consecutiveRestarts || 0) + 1;
if (session.consecutiveRestarts > MAX_CONSECUTIVE_RESTARTS) {
@@ -223,11 +243,14 @@ export class SessionRoutes extends BaseRouteHandler {
session.abortController = new AbortController();
oldController.abort();
this.crashRecoveryScheduled.add(sessionDbId);
// Exponential backoff: 1s, 2s, 4s for subsequent restarts
const backoffMs = Math.min(1000 * Math.pow(2, session.consecutiveRestarts - 1), 8000);
// Delay before restart with exponential backoff
setTimeout(() => {
this.crashRecoveryScheduled.delete(sessionDbId);
const stillExists = this.sessionManager.getSession(sessionDbId);
if (stillExists && !stillExists.generatorPromise) {
this.startGeneratorWithProvider(stillExists, this.getSelectedProvider(), 'crash-recovery');
@@ -398,11 +421,15 @@ export class SessionRoutes extends BaseRouteHandler {
return;
}
// Use database count for accurate queue length (in-memory array is always empty due to FK constraint fix)
const pendingStore = this.sessionManager.getPendingMessageStore();
const queueLength = pendingStore.getPendingCount(sessionDbId);
res.json({
status: 'active',
sessionDbId,
project: session.project,
queueLength: session.pendingMessages.length,
queueLength,
uptime: Date.now() - session.startTime
});
});