Files
claude-mem/src/services/sqlite/PendingMessageStore.ts
T
Alex Newman c6f932988a Fix 30+ root-cause bugs across 10 triage phases (#1214)
* MAESTRO: fix ChromaDB core issues — Python pinning, Windows paths, disable toggle, metadata sanitization, transport errors

- Add --python version pinning to uvx args in both local and remote mode (fixes #1196, #1206, #1208)
- Convert backslash paths to forward slashes for --data-dir on Windows (fixes #1199)
- Add CLAUDE_MEM_CHROMA_ENABLED setting for SQLite-only fallback mode (fixes #707)
- Sanitize metadata in addDocuments() to filter null/undefined/empty values (fixes #1183, #1188)
- Wrap callTool() in try/catch for transport errors with auto-reconnect (fixes #1162)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix data integrity — content-hash deduplication, project name collision, empty project guard, stuck isProcessing

- Add SHA-256 content-hash deduplication to observations INSERT (store.ts, transactions.ts, SessionStore.ts)
- Add content_hash column via migration 22 with backfill and index
- Fix project name collision: getCurrentProjectName() now returns parent/basename
- Guard against empty project string with cwd-derived fallback
- Fix stuck isProcessing: hasAnyPendingWork() resets processing messages older than 5 minutes
- Add 12 new tests covering all four fixes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix hook lifecycle — stderr suppression, output isolation, conversation pollution prevention

- Suppress process.stderr.write in hookCommand() to prevent Claude Code showing diagnostic
  output as error UI (#1181). Restores stderr in finally block for worker-continues case.
- Convert console.error() to logger.warn()/error() in hook-command.ts and handlers/index.ts
  so all diagnostics route to log file instead of stderr.
- Verified all 7 handlers return suppressOutput: true (prevents conversation pollution #598, #784).
- Verified session-complete is a recognized event type (fixes #984).
- Verified unknown event types return no-op handler with exit 0 (graceful degradation).
- Added 10 new tests in tests/hook-lifecycle.test.ts covering event dispatch, adapter defaults,
  stderr suppression, and standard response constants.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix worker lifecycle — restart loop coordination, stale transport retry, ENOENT shutdown race

- Add PID file mtime guard to prevent concurrent restart storms (#1145):
  isPidFileRecent() + touchPidFile() coordinate across sessions
- Add transparent retry in ChromaMcpManager.callTool() on transport
  error — reconnects and retries once instead of failing (#1131)
- Wrap getInstalledPluginVersion() with ENOENT/EBUSY handling (#1042)
- Verified ChromaMcpManager.stop() already called on all shutdown paths

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix Windows platform support — uvx.cmd spawn, PowerShell $_ elimination, windowsHide, FTS5 fallback

- Route uvx spawn through cmd.exe /c on Windows since MCP SDK lacks shell:true (#1190, #1192, #1199)
- Replace all PowerShell Where-Object {$_} pipelines with WQL -Filter server-side filtering (#1024, #1062)
- Add windowsHide: true to all exec/spawn calls missing it to prevent console popups (#1048)
- Add FTS5 runtime probe with graceful fallback when unavailable on Windows (#791)
- Guard FTS5 table creation in migrations, SessionSearch, and SessionStore with try/catch

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix skills/ distribution — build-time verification and regression tests (#1187)

Add post-build verification in build-hooks.js that fails if critical
distribution files (skills, hooks, plugin manifest) are missing. Add
10 regression tests covering skill file presence, YAML frontmatter,
hooks.json integrity, and package.json files field.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix MigrationRunner schema initialization (#979) — version conflict between parallel migration systems

Root cause: old DatabaseManager migrations 1-7 shared schema_versions table with
MigrationRunner's 4-22, causing version number collisions (5=drop tables vs add column,
6=FTS5 vs prompt tracking, 7=discovery_tokens vs remove UNIQUE).  initializeSchema()
was gated behind maxApplied===0, so core tables were never created when old versions
were present.

Fixes:
- initializeSchema() always creates core tables via CREATE TABLE IF NOT EXISTS
- Migrations 5-7 check actual DB state (columns/constraints) not just version tracking
- Crash-safe temp table rebuilds (DROP IF EXISTS _new before CREATE)
- Added missing migration 21 (ON UPDATE CASCADE) to MigrationRunner
- Added ON UPDATE CASCADE to FK definitions in initializeSchema()
- All changes applied to both runner.ts and SessionStore.ts

Tests: 13 new tests in migration-runner.test.ts covering fresh DB, idempotency,
version conflicts, crash recovery, FK constraints, and data integrity.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix 21 test failures — stale mocks, outdated assertions, missing OpenClaw guards

Server tests (12): Added missing workerPath and getAiStatus to ServerOptions
mocks after interface expansion. ChromaSync tests (3): Updated to verify
transport cleanup in ChromaMcpManager after architecture refactor. OpenClaw (2):
Added memory_ tool skipping and response truncation to prevent recursive loops
and oversized payloads. MarkdownFormatter (2): Updated assertions to match
current output. SettingsDefaultsManager (1): Used correct default key for
getBool test. Logger standards (1): Excluded CLI transcript command from
background service check.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix Codex CLI compatibility (#744) — session_id fallbacks, unknown platform tolerance, undefined guard

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix Cursor IDE integration (#838, #1049) — adapter field fallbacks, tolerant session-init validation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix /api/logs OOM (#1203) — tail-read replaces full-file readFileSync

Replace readFileSync (loads entire file into memory) with readLastLines()
that reads only from the end of the file in expanding chunks (64KB → 10MB cap).
Prevents OOM on large log files while preserving the same API response shape.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix Settings CORS error (#1029) — explicit methods and allowedHeaders in CORS config

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: add session custom_title for agent attribution (#1213) — migration 23, endpoint + store support

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: prevent CLAUDE.md/AGENTS.md writes inside .git/ directories (#1165)

Add .git path guard to all 4 write sites to prevent ref corruption when
paths resolve inside .git internals.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix plugin disabled state not respected (#781) — early exit check in all hook entry points

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix UserPromptSubmit context re-injection on every turn (#1079) — contextInjected session flag

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix stale AbortController queue stall (#1099) — lastGeneratorActivity tracking + 30s timeout

Three-layer fix:
1. Added lastGeneratorActivity timestamp to ActiveSession, updated by
   processAgentResponse (all agents), getMessageIterator (queue yields),
   and startGeneratorWithProvider (generator launch)
2. Added stale generator detection in ensureGeneratorRunning — if no
   activity for >30s, aborts stale controller, resets state, restarts
3. Added AbortSignal.timeout(30000) in deleteSession to prevent
   indefinite hang when awaiting a stuck generator promise

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 19:34:35 -05:00

490 lines
17 KiB
TypeScript

import { Database } from './sqlite-compat.js';
import type { PendingMessage } from '../worker-types.js';
import { logger } from '../../utils/logger.js';
/** Messages processing longer than this are considered stale and reset to pending by self-healing */
const STALE_PROCESSING_THRESHOLD_MS = 60_000;
/**
* Persistent pending message record from database
*/
export interface PersistentPendingMessage {
id: number;
session_db_id: number;
content_session_id: string;
message_type: 'observation' | 'summarize';
tool_name: string | null;
tool_input: string | null;
tool_response: string | null;
cwd: string | null;
last_assistant_message: string | null;
prompt_number: number | null;
status: 'pending' | 'processing' | 'processed' | 'failed';
retry_count: number;
created_at_epoch: number;
started_processing_at_epoch: number | null;
completed_at_epoch: number | null;
}
/**
* PendingMessageStore - Persistent work queue for SDK messages
*
* Messages are persisted before processing using a claim-confirm pattern.
* This simplifies the lifecycle and eliminates duplicate processing bugs.
*
* Lifecycle:
* 1. enqueue() - Message persisted with status 'pending'
* 2. claimNextMessage() - Atomically claims next pending message (marks as 'processing')
* 3. confirmProcessed() - Deletes message after successful processing
*
* Self-healing:
* - claimNextMessage() resets stale 'processing' messages (>60s) back to 'pending' before claiming
* - This eliminates stuck messages from generator crashes without external timers
*
* Recovery:
* - getSessionsWithPendingMessages() - Find sessions that need recovery on startup
*/
export class PendingMessageStore {
private db: Database;
private maxRetries: number;
constructor(db: Database, maxRetries: number = 3) {
this.db = db;
this.maxRetries = maxRetries;
}
/**
* Enqueue a new message (persist before processing)
* @returns The database ID of the persisted message
*/
enqueue(sessionDbId: number, contentSessionId: string, message: PendingMessage): number {
const now = Date.now();
const stmt = this.db.prepare(`
INSERT INTO pending_messages (
session_db_id, content_session_id, message_type,
tool_name, tool_input, tool_response, cwd,
last_assistant_message,
prompt_number, status, retry_count, created_at_epoch
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?)
`);
const result = stmt.run(
sessionDbId,
contentSessionId,
message.type,
message.tool_name || null,
message.tool_input ? JSON.stringify(message.tool_input) : null,
message.tool_response ? JSON.stringify(message.tool_response) : null,
message.cwd || null,
message.last_assistant_message || null,
message.prompt_number || null,
now
);
return result.lastInsertRowid as number;
}
/**
* Atomically claim the next pending message by marking it as 'processing'.
* Self-healing: resets any stale 'processing' messages (>60s) back to 'pending' first.
* Message stays in DB until confirmProcessed() is called.
* Uses a transaction to prevent race conditions.
*/
claimNextMessage(sessionDbId: number): PersistentPendingMessage | null {
const claimTx = this.db.transaction((sessionId: number) => {
// Capture time inside transaction so it's fresh if WAL contention causes retry
const now = Date.now();
// Self-healing: reset stale 'processing' messages back to 'pending'
// This recovers from generator crashes without external timers
// Note: strict < means messages must be OLDER than threshold to be reset
const staleCutoff = now - STALE_PROCESSING_THRESHOLD_MS;
const resetStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE session_db_id = ? AND status = 'processing'
AND started_processing_at_epoch < ?
`);
const resetResult = resetStmt.run(sessionId, staleCutoff);
if (resetResult.changes > 0) {
logger.info('QUEUE', `SELF_HEAL | sessionDbId=${sessionId} | recovered ${resetResult.changes} stale processing message(s)`);
}
const peekStmt = this.db.prepare(`
SELECT * FROM pending_messages
WHERE session_db_id = ? AND status = 'pending'
ORDER BY id ASC
LIMIT 1
`);
const msg = peekStmt.get(sessionId) as PersistentPendingMessage | null;
if (msg) {
// CRITICAL FIX: Mark as 'processing' instead of deleting
// Message will be deleted by confirmProcessed() after successful store
const updateStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'processing', started_processing_at_epoch = ?
WHERE id = ?
`);
updateStmt.run(now, msg.id);
// Log claim with minimal info (avoid logging full payload)
logger.info('QUEUE', `CLAIMED | sessionDbId=${sessionId} | messageId=${msg.id} | type=${msg.message_type}`, {
sessionId: sessionId
});
}
return msg;
});
return claimTx(sessionDbId) as PersistentPendingMessage | null;
}
/**
* Confirm a message was successfully processed - DELETE it from the queue.
* CRITICAL: Only call this AFTER the observation/summary has been stored to DB.
* This prevents message loss on generator crash.
*/
confirmProcessed(messageId: number): void {
const stmt = this.db.prepare('DELETE FROM pending_messages WHERE id = ?');
const result = stmt.run(messageId);
if (result.changes > 0) {
logger.debug('QUEUE', `CONFIRMED | messageId=${messageId} | deleted from queue`);
}
}
/**
* Reset stale 'processing' messages back to 'pending' for retry.
* Called on worker startup and periodically to recover from crashes.
* @param thresholdMs Messages processing longer than this are considered stale (default: 5 minutes)
* @returns Number of messages reset
*/
resetStaleProcessingMessages(thresholdMs: number = 5 * 60 * 1000, sessionDbId?: number): number {
const cutoff = Date.now() - thresholdMs;
let stmt;
let result;
if (sessionDbId !== undefined) {
stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ? AND session_db_id = ?
`);
result = stmt.run(cutoff, sessionDbId);
} else {
stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
result = stmt.run(cutoff);
}
if (result.changes > 0) {
logger.info('QUEUE', `RESET_STALE | count=${result.changes} | thresholdMs=${thresholdMs}${sessionDbId !== undefined ? ` | sessionDbId=${sessionDbId}` : ''}`);
}
return result.changes;
}
/**
* Get all pending messages for session (ordered by creation time)
*/
getAllPending(sessionDbId: number): PersistentPendingMessage[] {
const stmt = this.db.prepare(`
SELECT * FROM pending_messages
WHERE session_db_id = ? AND status = 'pending'
ORDER BY id ASC
`);
return stmt.all(sessionDbId) as PersistentPendingMessage[];
}
/**
* Get all queue messages (for UI display)
* Returns pending, processing, and failed messages (not processed - they're deleted)
* Joins with sdk_sessions to get project name
*/
getQueueMessages(): (PersistentPendingMessage & { project: string | null })[] {
const stmt = this.db.prepare(`
SELECT pm.*, ss.project
FROM pending_messages pm
LEFT JOIN sdk_sessions ss ON pm.content_session_id = ss.content_session_id
WHERE pm.status IN ('pending', 'processing', 'failed')
ORDER BY
CASE pm.status
WHEN 'failed' THEN 0
WHEN 'processing' THEN 1
WHEN 'pending' THEN 2
END,
pm.created_at_epoch ASC
`);
return stmt.all() as (PersistentPendingMessage & { project: string | null })[];
}
/**
* Get count of stuck messages (processing longer than threshold)
*/
getStuckCount(thresholdMs: number): number {
const cutoff = Date.now() - thresholdMs;
const stmt = this.db.prepare(`
SELECT COUNT(*) as count FROM pending_messages
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
const result = stmt.get(cutoff) as { count: number };
return result.count;
}
/**
* Retry a specific message (reset to pending)
* Works for pending (re-queue), processing (reset stuck), and failed messages
*/
retryMessage(messageId: number): boolean {
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE id = ? AND status IN ('pending', 'processing', 'failed')
`);
const result = stmt.run(messageId);
return result.changes > 0;
}
/**
* Reset all processing messages for a session to pending
* Used when force-restarting a stuck session
*/
resetProcessingToPending(sessionDbId: number): number {
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE session_db_id = ? AND status = 'processing'
`);
const result = stmt.run(sessionDbId);
return result.changes;
}
/**
* Mark all processing messages for a session as failed
* Used in error recovery when session generator crashes
* @returns Number of messages marked failed
*/
markSessionMessagesFailed(sessionDbId: number): number {
const now = Date.now();
// Atomic update - all processing messages for session → failed
// Note: This bypasses retry logic since generator failures are session-level,
// not message-level. Individual message failures use markFailed() instead.
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?
WHERE session_db_id = ? AND status = 'processing'
`);
const result = stmt.run(now, sessionDbId);
return result.changes;
}
/**
* Mark all pending and processing messages for a session as failed (abandoned).
* Used when SDK session is terminated and no fallback agent is available:
* prevents the session from appearing in getSessionsWithPendingMessages forever.
* @returns Number of messages marked failed
*/
markAllSessionMessagesAbandoned(sessionDbId: number): number {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?
WHERE session_db_id = ? AND status IN ('pending', 'processing')
`);
const result = stmt.run(now, sessionDbId);
return result.changes;
}
/**
* Abort a specific message (delete from queue)
*/
abortMessage(messageId: number): boolean {
const stmt = this.db.prepare('DELETE FROM pending_messages WHERE id = ?');
const result = stmt.run(messageId);
return result.changes > 0;
}
/**
* Retry all stuck messages at once
*/
retryAllStuck(thresholdMs: number): number {
const cutoff = Date.now() - thresholdMs;
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
const result = stmt.run(cutoff);
return result.changes;
}
/**
* Get recently processed messages (for UI feedback)
* Shows messages completed in the last N minutes so users can see their stuck items were processed
*/
getRecentlyProcessed(limit: number = 10, withinMinutes: number = 30): (PersistentPendingMessage & { project: string | null })[] {
const cutoff = Date.now() - (withinMinutes * 60 * 1000);
const stmt = this.db.prepare(`
SELECT pm.*, ss.project
FROM pending_messages pm
LEFT JOIN sdk_sessions ss ON pm.content_session_id = ss.content_session_id
WHERE pm.status = 'processed' AND pm.completed_at_epoch > ?
ORDER BY pm.completed_at_epoch DESC
LIMIT ?
`);
return stmt.all(cutoff, limit) as (PersistentPendingMessage & { project: string | null })[];
}
/**
* Mark message as failed (status: pending -> failed or back to pending for retry)
* If retry_count < maxRetries, moves back to 'pending' for retry
* Otherwise marks as 'failed' permanently
*/
markFailed(messageId: number): void {
const now = Date.now();
// Get current retry count
const msg = this.db.prepare('SELECT retry_count FROM pending_messages WHERE id = ?').get(messageId) as { retry_count: number } | undefined;
if (!msg) return;
if (msg.retry_count < this.maxRetries) {
// Move back to pending for retry
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', retry_count = retry_count + 1, started_processing_at_epoch = NULL
WHERE id = ?
`);
stmt.run(messageId);
} else {
// Max retries exceeded, mark as permanently failed
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'failed', completed_at_epoch = ?
WHERE id = ?
`);
stmt.run(now, messageId);
}
}
/**
* Reset stuck messages (processing -> pending if stuck longer than threshold)
* @param thresholdMs Messages processing longer than this are considered stuck (0 = reset all)
* @returns Number of messages reset
*/
resetStuckMessages(thresholdMs: number): number {
const cutoff = thresholdMs === 0 ? Date.now() : Date.now() - thresholdMs;
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
const result = stmt.run(cutoff);
return result.changes;
}
/**
* Get count of pending messages for a session
*/
getPendingCount(sessionDbId: number): number {
const stmt = this.db.prepare(`
SELECT COUNT(*) as count FROM pending_messages
WHERE session_db_id = ? AND status IN ('pending', 'processing')
`);
const result = stmt.get(sessionDbId) as { count: number };
return result.count;
}
/**
* Check if any session has pending work.
* Excludes 'processing' messages stuck for >5 minutes (resets them to 'pending' as a side effect).
*/
hasAnyPendingWork(): boolean {
// Reset stuck 'processing' messages older than 5 minutes before checking
const stuckCutoff = Date.now() - (5 * 60 * 1000);
const resetStmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'pending', started_processing_at_epoch = NULL
WHERE status = 'processing' AND started_processing_at_epoch < ?
`);
const resetResult = resetStmt.run(stuckCutoff);
if (resetResult.changes > 0) {
logger.info('QUEUE', `STUCK_RESET | hasAnyPendingWork reset ${resetResult.changes} stuck processing message(s) older than 5 minutes`);
}
const stmt = this.db.prepare(`
SELECT COUNT(*) as count FROM pending_messages
WHERE status IN ('pending', 'processing')
`);
const result = stmt.get() as { count: number };
return result.count > 0;
}
/**
* Get all session IDs that have pending messages (for recovery on startup)
*/
getSessionsWithPendingMessages(): number[] {
const stmt = this.db.prepare(`
SELECT DISTINCT session_db_id FROM pending_messages
WHERE status IN ('pending', 'processing')
`);
const results = stmt.all() as { session_db_id: number }[];
return results.map(r => r.session_db_id);
}
/**
* Get session info for a pending message (for recovery)
*/
getSessionInfoForMessage(messageId: number): { sessionDbId: number; contentSessionId: string } | null {
const stmt = this.db.prepare(`
SELECT session_db_id, content_session_id FROM pending_messages WHERE id = ?
`);
const result = stmt.get(messageId) as { session_db_id: number; content_session_id: string } | undefined;
return result ? { sessionDbId: result.session_db_id, contentSessionId: result.content_session_id } : null;
}
/**
* Clear all failed messages from the queue
* @returns Number of messages deleted
*/
clearFailed(): number {
const stmt = this.db.prepare(`
DELETE FROM pending_messages
WHERE status = 'failed'
`);
const result = stmt.run();
return result.changes;
}
/**
* Clear all pending, processing, and failed messages from the queue
* Keeps only processed messages (for history)
* @returns Number of messages deleted
*/
clearAll(): number {
const stmt = this.db.prepare(`
DELETE FROM pending_messages
WHERE status IN ('pending', 'processing', 'failed')
`);
const result = stmt.run();
return result.changes;
}
/**
* Convert a PersistentPendingMessage back to PendingMessage format
*/
toPendingMessage(persistent: PersistentPendingMessage): PendingMessage {
return {
type: persistent.message_type,
tool_name: persistent.tool_name || undefined,
tool_input: persistent.tool_input ? JSON.parse(persistent.tool_input) : undefined,
tool_response: persistent.tool_response ? JSON.parse(persistent.tool_response) : undefined,
prompt_number: persistent.prompt_number || undefined,
cwd: persistent.cwd || undefined,
last_assistant_message: persistent.last_assistant_message || undefined
};
}
}