feat: Fix observation timestamps, refactor session management, and enhance worker reliability (#437)

* Refactor worker version checks and increase timeout settings

- Updated the default hook timeout from 5000ms to 120000ms for improved stability.
- Modified the worker version check to log a warning instead of restarting the worker on version mismatch.
- Removed legacy PM2 cleanup and worker start logic, simplifying the ensureWorkerRunning function.
- Enhanced polling mechanism for worker readiness with increased retries and reduced interval.

* feat: implement worker queue polling to ensure processing completion before proceeding

* refactor: change worker command from start to restart in hooks configuration

* refactor: remove session management complexity

- Simplify createSDKSession to pure INSERT OR IGNORE
- Remove auto-create logic from storeObservation/storeSummary
- Delete 11 unused session management methods
- Derive prompt_number from user_prompts count
- Keep sdk_sessions table schema unchanged for compatibility

* refactor: simplify session management by removing unused methods and auto-creation logic

* Refactor session prompt number retrieval in SessionRoutes

- Updated the method of obtaining the prompt number from the session.
- Replaced `store.getPromptCounter(sessionDbId)` with `store.getPromptNumberFromUserPrompts(claudeSessionId)` for better clarity and accuracy.
- Adjusted the logic for incrementing the prompt number to derive it from the user prompts count instead of directly incrementing a counter.

* refactor: replace getPromptCounter with getPromptNumberFromUserPrompts in SessionManager

Phase 7 of session management simplification. Updates SessionManager to derive
prompt numbers from user_prompts table count instead of using the deprecated
prompt_counter column.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* refactor: simplify SessionCompletionHandler to use direct SQL query

Phase 8: Remove call to findActiveSDKSession() and replace with direct
database query in SessionCompletionHandler.completeByClaudeId().

This removes dependency on the deleted findActiveSDKSession() method
and simplifies the code by using a straightforward SELECT query.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* refactor: remove markSessionCompleted call from SDKAgent

- Delete call to markSessionCompleted() in SDKAgent.ts
- Session status is no longer tracked or updated
- Part of phase 9: simplifying session management

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* refactor: remove markSessionComplete method (Phase 10)

- Deleted markSessionComplete() method from DatabaseManager
- Removed markSessionComplete call from SessionCompletionHandler
- Session completion status no longer tracked in database
- Part of session management simplification effort

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* refactor: replace deleted updateSDKSessionId calls in import script (Phase 11)

- Replace updateSDKSessionId() calls with direct SQL UPDATE statements
- Method was deleted in Phase 3 as part of session management simplification
- Import script now uses direct database access consistently

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* test: add validation for SQL updates in sdk_sessions table

* refactor: enhance worker-cli to support manual and automated runs

* Remove cleanup hook and associated session completion logic

- Deleted the cleanup-hook implementation from the hooks directory.
- Removed the session completion endpoint that was used by the cleanup hook.
- Updated the SessionCompletionHandler to eliminate the completeByClaudeId method and its dependencies.
- Adjusted the SessionRoutes to reflect the removal of the session completion route.

* fix: update worker-cli command to use bun for consistency

* feat: Implement timestamp fix for observations and enhance processing logic

- Added `earliestPendingTimestamp` to `ActiveSession` to track the original timestamp of the earliest pending message.
- Updated `SDKAgent` to capture and utilize the earliest pending timestamp during response processing.
- Modified `SessionManager` to track the earliest timestamp when yielding messages.
- Created scripts for fixing corrupted timestamps, validating fixes, and investigating timestamp issues.
- Verified that all corrupted observations have been repaired and logic for future processing is sound.
- Ensured orphan processing can be safely re-enabled after validation.

* feat: Enhance SessionStore to support custom database paths and add timestamp fields for observations and summaries

* Refactor pending queue processing and add management endpoints

- Disabled automatic recovery of orphaned queues on startup; users must now use the new /api/pending-queue/process endpoint.
- Updated processOrphanedQueues method to processPendingQueues with improved session handling and return detailed results.
- Added new API endpoints for managing pending queues: GET /api/pending-queue and POST /api/pending-queue/process.
- Introduced a new script (check-pending-queue.ts) for checking and processing pending observation queues interactively or automatically.
- Enhanced logging and error handling for better monitoring of session processing.

* updated agent sdk

* feat: Add manual recovery guide and queue management endpoints to documentation

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2025-12-25 15:36:46 -05:00
committed by GitHub
parent 4cf2c1bdb1
commit 266c746d50
47 changed files with 3417 additions and 1026 deletions
+5 -4
View File
@@ -227,19 +227,20 @@ function main() {
claudeSessionToSdkSession.set(sessionMeta.sessionId, existing.sdk_session_id);
} else if (existing && !existing.sdk_session_id) {
// Session exists but sdk_session_id is NULL, update it
const dbId = (db['db'].prepare('SELECT id FROM sdk_sessions WHERE claude_session_id = ?').get(sessionMeta.sessionId) as { id: number }).id;
db.updateSDKSessionId(dbId, syntheticSdkSessionId);
db['db'].prepare('UPDATE sdk_sessions SET sdk_session_id = ? WHERE claude_session_id = ?')
.run(syntheticSdkSessionId, sessionMeta.sessionId);
claudeSessionToSdkSession.set(sessionMeta.sessionId, syntheticSdkSessionId);
} else {
// Create new SDK session
const dbId = db.createSDKSession(
db.createSDKSession(
sessionMeta.sessionId,
sessionMeta.project,
'Imported from transcript XML'
);
// Update with synthetic SDK session ID
db.updateSDKSessionId(dbId, syntheticSdkSessionId);
db['db'].prepare('UPDATE sdk_sessions SET sdk_session_id = ? WHERE claude_session_id = ?')
.run(syntheticSdkSessionId, sessionMeta.sessionId);
claudeSessionToSdkSession.set(sessionMeta.sessionId, syntheticSdkSessionId);
}
+31 -13
View File
@@ -1,52 +1,70 @@
import { ProcessManager } from '../services/process/ProcessManager.js';
import { getWorkerPort } from '../shared/worker-utils.js';
import { stdin } from 'process';
const command = process.argv[2];
const port = getWorkerPort();
const HOOK_STANDARD_RESPONSE = '{"continue": true, "suppressOutput": true}';
const isManualRun = stdin.isTTY;
async function main() {
switch (command) {
case 'start': {
const result = await ProcessManager.start(port);
if (result.success) {
console.log(`Worker started (PID: ${result.pid})`);
const date = new Date().toISOString().slice(0, 10);
console.log(`Logs: ~/.claude-mem/logs/worker-${date}.log`);
if (isManualRun) {
console.log(`Worker started (PID: ${result.pid})`);
const date = new Date().toISOString().slice(0, 10);
console.log(`Logs: ~/.claude-mem/logs/worker-${date}.log`);
} else {
console.log(HOOK_STANDARD_RESPONSE);
}
process.exit(0);
} else {
console.error(`Failed to start: ${result.error}`);
process.exit(1);
}
break;
}
case 'stop': {
await ProcessManager.stop();
console.log('Worker stopped');
if (isManualRun) {
console.log('Worker stopped');
} else {
console.log(HOOK_STANDARD_RESPONSE);
}
process.exit(0);
}
case 'restart': {
const result = await ProcessManager.restart(port);
if (result.success) {
console.log(`Worker restarted (PID: ${result.pid})`);
if (isManualRun) {
console.log(`Worker restarted (PID: ${result.pid})`);
} else {
console.log(HOOK_STANDARD_RESPONSE);
}
process.exit(0);
} else {
console.error(`Failed to restart: ${result.error}`);
process.exit(1);
}
break;
}
case 'status': {
const status = await ProcessManager.status();
if (status.running) {
console.log('Worker is running');
console.log(` PID: ${status.pid}`);
console.log(` Port: ${status.port}`);
console.log(` Uptime: ${status.uptime}`);
if (isManualRun) {
if (status.running) {
console.log('Worker is running');
console.log(` PID: ${status.pid}`);
console.log(` Port: ${status.port}`);
console.log(` Uptime: ${status.uptime}`);
} else {
console.log('Worker is not running');
}
} else {
console.log('Worker is not running');
console.log(HOOK_STANDARD_RESPONSE);
}
process.exit(0);
}
-68
View File
@@ -1,68 +0,0 @@
/**
* Cleanup Hook - SessionEnd
*
* Pure HTTP client - sends data to worker, worker handles all database operations.
* This allows the hook to run under any runtime (Node.js or Bun) since it has no
* native module dependencies.
*/
import { stdin } from 'process';
import { ensureWorkerRunning, getWorkerPort } from '../shared/worker-utils.js';
import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
export interface SessionEndInput {
session_id: string;
reason: 'exit' | 'clear' | 'logout' | 'prompt_input_exit' | 'other';
}
/**
* Cleanup Hook Main Logic - Fire-and-forget HTTP client
*/
async function cleanupHook(input?: SessionEndInput): Promise<void> {
// Ensure worker is running before any other logic
await ensureWorkerRunning();
if (!input) {
throw new Error('cleanup-hook requires input from Claude Code');
}
const { session_id, reason } = input;
const port = getWorkerPort();
// Send to worker - worker handles finding session, marking complete, and stopping spinner
const response = await fetch(`http://127.0.0.1:${port}/api/sessions/complete`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
claudeSessionId: session_id,
reason
}),
signal: AbortSignal.timeout(HOOK_TIMEOUTS.DEFAULT)
});
if (!response.ok) {
throw new Error(`Session cleanup failed: ${response.status}`);
}
console.log('{"continue": true, "suppressOutput": true}');
process.exit(0);
}
// Entry Point
if (stdin.isTTY) {
// Running manually
cleanupHook(undefined);
} else {
let input = '';
stdin.on('data', (chunk) => input += chunk);
stdin.on('end', async () => {
let parsed: SessionEndInput | undefined;
try {
parsed = input ? JSON.parse(input) : undefined;
} catch (error) {
throw new Error(`Failed to parse hook input: ${error instanceof Error ? error.message : String(error)}`);
}
await cleanupHook(parsed);
});
}
+1
View File
@@ -65,6 +65,7 @@ async function summaryHook(input?: StopInput): Promise<void> {
});
if (!response.ok) {
console.log(STANDARD_HOOK_RESPONSE);
throw new Error(`Summary generation failed: ${response.status}`);
}
+42 -243
View File
@@ -20,9 +20,11 @@ import {
export class SessionStore {
public db: Database;
constructor() {
ensureDir(DATA_DIR);
this.db = new Database(DB_PATH);
constructor(dbPath: string = DB_PATH) {
if (dbPath !== ':memory:') {
ensureDir(DATA_DIR);
}
this.db = new Database(dbPath);
// Ensure optimized settings
this.db.run('PRAGMA journal_mode = WAL');
@@ -928,11 +930,13 @@ export class SessionStore {
notes: string | null;
prompt_number: number | null;
created_at: string;
created_at_epoch: number;
} | null {
const stmt = this.db.prepare(`
SELECT
request, investigated, learned, completed, next_steps,
files_read, files_edited, notes, prompt_number, created_at
files_read, files_edited, notes, prompt_number, created_at,
created_at_epoch
FROM session_summaries
WHERE sdk_session_id = ?
ORDER BY created_at_epoch DESC
@@ -1037,80 +1041,20 @@ export class SessionStore {
return stmt.all(...sdkSessionIds) as any[];
}
/**
* Find active SDK session for a Claude session
*/
findActiveSDKSession(claudeSessionId: string): {
id: number;
sdk_session_id: string | null;
project: string;
worker_port: number | null;
} | null {
const stmt = this.db.prepare(`
SELECT id, sdk_session_id, project, worker_port
FROM sdk_sessions
WHERE claude_session_id = ? AND status = 'active'
LIMIT 1
`);
return stmt.get(claudeSessionId) || null;
}
/**
* Find any SDK session for a Claude session (active, failed, or completed)
* Get current prompt number by counting user_prompts for this session
* Replaces the prompt_counter column which is no longer maintained
*/
findAnySDKSession(claudeSessionId: string): { id: number } | null {
const stmt = this.db.prepare(`
SELECT id
FROM sdk_sessions
WHERE claude_session_id = ?
LIMIT 1
`);
return stmt.get(claudeSessionId) || null;
}
/**
* Reactivate an existing session
*/
reactivateSession(id: number, userPrompt: string): void {
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET status = 'active', user_prompt = ?, worker_port = NULL
WHERE id = ?
`);
stmt.run(userPrompt, id);
}
/**
* Increment prompt counter and return new value
*/
incrementPromptCounter(id: number): number {
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET prompt_counter = COALESCE(prompt_counter, 0) + 1
WHERE id = ?
`);
stmt.run(id);
getPromptNumberFromUserPrompts(claudeSessionId: string): number {
const result = this.db.prepare(`
SELECT prompt_counter FROM sdk_sessions WHERE id = ?
`).get(id) as { prompt_counter: number } | undefined;
return result?.prompt_counter || 1;
}
/**
* Get current prompt counter for a session
*/
getPromptCounter(id: number): number {
const result = this.db.prepare(`
SELECT prompt_counter FROM sdk_sessions WHERE id = ?
`).get(id) as { prompt_counter: number | null } | undefined;
return result?.prompt_counter || 0;
SELECT COUNT(*) as count FROM user_prompts WHERE claude_session_id = ?
`).get(claudeSessionId) as { count: number };
return result.count;
}
/**
@@ -1143,94 +1087,21 @@ export class SessionStore {
const now = new Date();
const nowEpoch = now.getTime();
// CRITICAL: INSERT OR IGNORE makes this idempotent
// First call (prompt #1): Creates new row
// Subsequent calls (prompt #2+): Ignored, returns existing ID
const stmt = this.db.prepare(`
// Pure INSERT OR IGNORE - no updates, no complexity
this.db.prepare(`
INSERT OR IGNORE INTO sdk_sessions
(claude_session_id, sdk_session_id, project, user_prompt, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, ?, 'active')
`);
`).run(claudeSessionId, claudeSessionId, project, userPrompt, now.toISOString(), nowEpoch);
const result = stmt.run(claudeSessionId, claudeSessionId, project, userPrompt, now.toISOString(), nowEpoch);
// If lastInsertRowid is 0, insert was ignored (session exists), so fetch existing ID
if (result.lastInsertRowid === 0 || result.changes === 0) {
// Session exists - UPDATE project and user_prompt if we have non-empty values
// This fixes the bug where SAVE hook creates session with empty project,
// then NEW hook can't update it because INSERT OR IGNORE skips the insert
if (project && project.trim() !== '') {
this.db.prepare(`
UPDATE sdk_sessions
SET project = ?, user_prompt = ?
WHERE claude_session_id = ?
`).run(project, userPrompt, claudeSessionId);
}
const selectStmt = this.db.prepare(`
SELECT id FROM sdk_sessions WHERE claude_session_id = ? LIMIT 1
`);
const existing = selectStmt.get(claudeSessionId) as { id: number } | undefined;
return existing!.id;
}
return result.lastInsertRowid as number;
// Return existing or new ID
const row = this.db.prepare('SELECT id FROM sdk_sessions WHERE claude_session_id = ?')
.get(claudeSessionId) as { id: number };
return row.id;
}
/**
* Update SDK session ID (captured from init message)
* Only updates if current sdk_session_id is NULL to avoid breaking foreign keys
* Returns true if update succeeded, false if skipped
*/
updateSDKSessionId(id: number, sdkSessionId: string): boolean {
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET sdk_session_id = ?
WHERE id = ? AND sdk_session_id IS NULL
`);
const result = stmt.run(sdkSessionId, id);
if (result.changes === 0) {
// This is expected behavior - sdk_session_id is already set
// Only log at debug level to avoid noise
logger.debug('DB', 'sdk_session_id already set, skipping update', {
sessionId: id,
sdkSessionId
});
return false;
}
return true;
}
/**
* Set worker port for a session
*/
setWorkerPort(id: number, port: number): void {
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET worker_port = ?
WHERE id = ?
`);
stmt.run(port, id);
}
/**
* Get worker port for a session
*/
getWorkerPort(id: number): number | null {
const stmt = this.db.prepare(`
SELECT worker_port
FROM sdk_sessions
WHERE id = ?
LIMIT 1
`);
const result = stmt.get(id) as { worker_port: number | null } | undefined;
return result?.worker_port || null;
}
/**
* Save a user prompt
@@ -1267,7 +1138,7 @@ export class SessionStore {
/**
* Store an observation (from SDK parsing)
* Auto-creates session record if it doesn't exist in the index
* Assumes session already exists (created by hook)
*/
storeObservation(
sdkSessionId: string,
@@ -1283,33 +1154,12 @@ export class SessionStore {
files_modified: string[];
},
promptNumber?: number,
discoveryTokens: number = 0
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): { id: number; createdAtEpoch: number } {
const now = new Date();
const nowEpoch = now.getTime();
// Ensure session record exists in the index (auto-create if missing)
const checkStmt = this.db.prepare(`
SELECT id FROM sdk_sessions WHERE sdk_session_id = ?
`);
const existingSession = checkStmt.get(sdkSessionId) as { id: number } | undefined;
if (!existingSession) {
// Auto-create session record if it doesn't exist
const insertSession = this.db.prepare(`
INSERT INTO sdk_sessions
(claude_session_id, sdk_session_id, project, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, 'active')
`);
insertSession.run(
sdkSessionId, // claude_session_id and sdk_session_id are the same
sdkSessionId,
project,
now.toISOString(),
nowEpoch
);
console.log(`[SessionStore] Auto-created session record for session_id: ${sdkSessionId}`);
}
// Use override timestamp if provided (for processing backlog messages with original timestamps)
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
const stmt = this.db.prepare(`
INSERT INTO observations
@@ -1331,19 +1181,19 @@ export class SessionStore {
JSON.stringify(observation.files_modified),
promptNumber || null,
discoveryTokens,
now.toISOString(),
nowEpoch
timestampIso,
timestampEpoch
);
return {
id: Number(result.lastInsertRowid),
createdAtEpoch: nowEpoch
createdAtEpoch: timestampEpoch
};
}
/**
* Store a session summary (from SDK parsing)
* Auto-creates session record if it doesn't exist in the index
* Assumes session already exists - will fail with FK error if not
*/
storeSummary(
sdkSessionId: string,
@@ -1357,33 +1207,12 @@ export class SessionStore {
notes: string | null;
},
promptNumber?: number,
discoveryTokens: number = 0
discoveryTokens: number = 0,
overrideTimestampEpoch?: number
): { id: number; createdAtEpoch: number } {
const now = new Date();
const nowEpoch = now.getTime();
// Ensure session record exists in the index (auto-create if missing)
const checkStmt = this.db.prepare(`
SELECT id FROM sdk_sessions WHERE sdk_session_id = ?
`);
const existingSession = checkStmt.get(sdkSessionId) as { id: number } | undefined;
if (!existingSession) {
// Auto-create session record if it doesn't exist
const insertSession = this.db.prepare(`
INSERT INTO sdk_sessions
(claude_session_id, sdk_session_id, project, started_at, started_at_epoch, status)
VALUES (?, ?, ?, ?, ?, 'active')
`);
insertSession.run(
sdkSessionId, // claude_session_id and sdk_session_id are the same
sdkSessionId,
project,
now.toISOString(),
nowEpoch
);
console.log(`[SessionStore] Auto-created session record for session_id: ${sdkSessionId}`);
}
// Use override timestamp if provided (for processing backlog messages with original timestamps)
const timestampEpoch = overrideTimestampEpoch ?? Date.now();
const timestampIso = new Date(timestampEpoch).toISOString();
const stmt = this.db.prepare(`
INSERT INTO session_summaries
@@ -1403,47 +1232,17 @@ export class SessionStore {
summary.notes,
promptNumber || null,
discoveryTokens,
now.toISOString(),
nowEpoch
timestampIso,
timestampEpoch
);
return {
id: Number(result.lastInsertRowid),
createdAtEpoch: nowEpoch
createdAtEpoch: timestampEpoch
};
}
/**
* Mark SDK session as completed
*/
markSessionCompleted(id: number): void {
const now = new Date();
const nowEpoch = now.getTime();
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET status = 'completed', completed_at = ?, completed_at_epoch = ?
WHERE id = ?
`);
stmt.run(now.toISOString(), nowEpoch, id);
}
/**
* Mark SDK session as failed
*/
markSessionFailed(id: number): void {
const now = new Date();
const nowEpoch = now.getTime();
const stmt = this.db.prepare(`
UPDATE sdk_sessions
SET status = 'failed', completed_at = ?, completed_at_epoch = ?
WHERE id = ?
`);
stmt.run(now.toISOString(), nowEpoch, id);
}
// REMOVED: cleanupOrphanedSessions - violates "EVERYTHING SHOULD SAVE ALWAYS"
// There's no such thing as an "orphaned" session. Sessions are created by hooks
+84
View File
@@ -427,6 +427,16 @@ export class WorkerService {
// Initialize database (once, stays open)
await this.dbManager.initialize();
// Recover stuck messages from previous crashes
// Messages stuck in 'processing' state are reset to 'pending' for reprocessing
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const STUCK_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
if (resetCount > 0) {
logger.info('SYSTEM', `Recovered ${resetCount} stuck messages from previous session`, { thresholdMinutes: 5 });
}
// Initialize search services (requires initialized database)
const formattingService = new FormattingService();
const timelineService = new TimelineService();
@@ -464,6 +474,8 @@ export class WorkerService {
this.initializationCompleteFlag = true;
this.resolveInitialization();
logger.info('SYSTEM', 'Background initialization complete');
// Note: Auto-recovery of orphaned queues disabled - use /api/pending-queue/process endpoint instead
} catch (error) {
logger.error('SYSTEM', 'Background initialization failed', {}, error as Error);
// Don't resolve - let the promise remain pending so readiness check continues to fail
@@ -471,6 +483,78 @@ export class WorkerService {
}
}
/**
* Process pending session queues
* Starts SDK agents for sessions that have pending messages but no active processor
* @param sessionLimit Maximum number of sessions to start processing (default: 10)
* @returns Info about what was started
*/
async processPendingQueues(sessionLimit: number = 10): Promise<{
totalPendingSessions: number;
sessionsStarted: number;
sessionsSkipped: number;
startedSessionIds: number[];
}> {
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();
const result = {
totalPendingSessions: orphanedSessionIds.length,
sessionsStarted: 0,
sessionsSkipped: 0,
startedSessionIds: [] as number[]
};
if (orphanedSessionIds.length === 0) {
return result;
}
logger.info('SYSTEM', `Processing up to ${sessionLimit} of ${orphanedSessionIds.length} pending session queues`);
// Process each session sequentially up to the limit
for (const sessionDbId of orphanedSessionIds) {
if (result.sessionsStarted >= sessionLimit) {
break;
}
try {
// Skip if session already has an active generator
const existingSession = this.sessionManager.getSession(sessionDbId);
if (existingSession?.generatorPromise) {
result.sessionsSkipped++;
continue;
}
// Initialize session and start SDK agent
const session = this.sessionManager.initializeSession(sessionDbId);
logger.info('SYSTEM', `Starting processor for session ${sessionDbId}`, {
project: session.project,
pendingCount: pendingStore.getPendingCount(sessionDbId)
});
// Start SDK agent (non-blocking)
session.generatorPromise = this.sdkAgent.startSession(session, this)
.finally(() => {
session.generatorPromise = null;
this.broadcastProcessingStatus();
});
result.sessionsStarted++;
result.startedSessionIds.push(sessionDbId);
// Small delay between sessions to avoid rate limiting
await new Promise(resolve => setTimeout(resolve, 100));
} catch (error) {
logger.warn('SYSTEM', `Failed to process session ${sessionDbId}`, {}, error as Error);
result.sessionsSkipped++;
}
}
return result;
}
/**
* Extract a specific section from instruction content
* Used by /api/instructions endpoint for progressive instruction loading
+1
View File
@@ -22,6 +22,7 @@ export interface ActiveSession {
cumulativeInputTokens: number; // Track input tokens for discovery cost
cumulativeOutputTokens: number; // Track output tokens for discovery cost
pendingProcessingIds: Set<number>; // Track ALL message IDs yielded but not yet processed
earliestPendingTimestamp: number | null; // Original timestamp of earliest pending message (for accurate observation timestamps)
}
export interface PendingMessage {
-6
View File
@@ -110,10 +110,4 @@ export class DatabaseManager {
return session;
}
/**
* Mark session as completed
*/
markSessionComplete(sessionDbId: number): void {
this.getSessionStore().markSessionCompleted(sessionDbId);
}
}
+16 -9
View File
@@ -115,6 +115,9 @@ export class SDKAgent {
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
// Process response (empty or not) and mark messages as processed
// Capture earliest timestamp BEFORE processing (will be cleared after)
const originalTimestamp = session.earliestPendingTimestamp;
if (responseSize > 0) {
const truncatedResponse = responseSize > 100
? textContent.substring(0, 100) + '...'
@@ -124,8 +127,8 @@ export class SDKAgent {
promptNumber: session.lastPromptNumber
}, truncatedResponse);
// Parse and process response with discovery token delta
await this.processSDKResponse(session, textContent, worker, discoveryTokens);
// Parse and process response with discovery token delta and original timestamp
await this.processSDKResponse(session, textContent, worker, discoveryTokens, originalTimestamp);
} else {
// Empty response - still need to mark pending messages as processed
await this.markMessagesProcessed(session, worker);
@@ -145,8 +148,6 @@ export class SDKAgent {
duration: `${(sessionDuration / 1000).toFixed(1)}s`
});
this.dbManager.getSessionStore().markSessionCompleted(session.sessionDbId);
} catch (error: any) {
if (error.name === 'AbortError') {
logger.warn('SDK', 'Agent aborted', { sessionId: session.sessionDbId });
@@ -254,19 +255,21 @@ export class SDKAgent {
/**
* Process SDK response text (parse XML, save to database, sync to Chroma)
* @param discoveryTokens - Token cost for discovering this response (delta, not cumulative)
* @param originalTimestamp - Original epoch when message was queued (for backlog processing accuracy)
*/
private async processSDKResponse(session: ActiveSession, text: string, worker: any | undefined, discoveryTokens: number): Promise<void> {
private async processSDKResponse(session: ActiveSession, text: string, worker: any | undefined, discoveryTokens: number, originalTimestamp: number | null): Promise<void> {
// Parse observations
const observations = parseObservations(text, session.claudeSessionId);
// Store observations
// Store observations with original timestamp (if processing backlog) or current time
for (const obs of observations) {
const { id: obsId, createdAtEpoch } = this.dbManager.getSessionStore().storeObservation(
session.claudeSessionId,
session.project,
obs,
session.lastPromptNumber,
discoveryTokens
discoveryTokens,
originalTimestamp ?? undefined
);
// Log observation details
@@ -336,14 +339,15 @@ export class SDKAgent {
// Parse summary
const summary = parseSummary(text, session.sessionDbId);
// Store summary
// Store summary with original timestamp (if processing backlog) or current time
if (summary) {
const { id: summaryId, createdAtEpoch } = this.dbManager.getSessionStore().storeSummary(
session.claudeSessionId,
session.project,
summary,
session.lastPromptNumber,
discoveryTokens
discoveryTokens,
originalTimestamp ?? undefined
);
// Log summary details
@@ -422,6 +426,9 @@ export class SDKAgent {
});
session.pendingProcessingIds.clear();
// Clear timestamp for next batch (will be set fresh from next message)
session.earliestPendingTimestamp = null;
// Clean up old processed messages (keep last 100 for UI display)
const deletedCount = pendingMessageStore.cleanupProcessed(100);
if (deletedCount > 0) {
+11 -2
View File
@@ -113,11 +113,12 @@ export class SessionManager {
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptCounter(sessionDbId),
lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptNumberFromUserPrompts(dbSession.claude_session_id),
startTime: Date.now(),
cumulativeInputTokens: 0,
cumulativeOutputTokens: 0,
pendingProcessingIds: new Set()
pendingProcessingIds: new Set(),
earliestPendingTimestamp: null
};
this.sessions.set(sessionDbId, session);
@@ -445,6 +446,14 @@ export class SessionManager {
// Track this message ID for completion marking
session.pendingProcessingIds.add(persistentMessage.id);
// Track earliest timestamp for accurate observation timestamps
// This ensures backlog messages get their original timestamps, not current time
if (session.earliestPendingTimestamp === null) {
session.earliestPendingTimestamp = persistentMessage.created_at_epoch;
} else {
session.earliestPendingTimestamp = Math.min(session.earliestPendingTimestamp, persistentMessage.created_at_epoch);
}
// Convert to PendingMessageWithId and yield
// Include original timestamp for accurate observation timestamps (survives stuck processing)
const message: PendingMessageWithId = {
@@ -51,6 +51,10 @@ export class DataRoutes extends BaseRouteHandler {
app.get('/api/processing-status', this.handleGetProcessingStatus.bind(this));
app.post('/api/processing', this.handleSetProcessing.bind(this));
// Pending queue management endpoints
app.get('/api/pending-queue', this.handleGetPendingQueue.bind(this));
app.post('/api/pending-queue/process', this.handleProcessPendingQueue.bind(this));
// Import endpoint
app.post('/api/import', this.handleImport.bind(this));
}
@@ -364,4 +368,58 @@ export class DataRoutes extends BaseRouteHandler {
stats
});
});
/**
* Get pending queue contents
* GET /api/pending-queue
* Returns all pending, processing, and failed messages with optional recently processed
*/
private handleGetPendingQueue = this.wrapHandler((req: Request, res: Response): void => {
const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
// Get queue contents (pending, processing, failed)
const queueMessages = pendingStore.getQueueMessages();
// Get recently processed (last 30 min, up to 20)
const recentlyProcessed = pendingStore.getRecentlyProcessed(20, 30);
// Get stuck message count (processing > 5 min)
const stuckCount = pendingStore.getStuckCount(5 * 60 * 1000);
// Get sessions with pending work
const sessionsWithPending = pendingStore.getSessionsWithPendingMessages();
res.json({
queue: {
messages: queueMessages,
totalPending: queueMessages.filter((m: { status: string }) => m.status === 'pending').length,
totalProcessing: queueMessages.filter((m: { status: string }) => m.status === 'processing').length,
totalFailed: queueMessages.filter((m: { status: string }) => m.status === 'failed').length,
stuckCount
},
recentlyProcessed,
sessionsWithPendingWork: sessionsWithPending
});
});
/**
* Process pending queue
* POST /api/pending-queue/process
* Body: { sessionLimit?: number } - defaults to 10
* Starts SDK agents for sessions with pending messages
*/
private handleProcessPendingQueue = this.wrapHandler(async (req: Request, res: Response): Promise<void> => {
const sessionLimit = Math.min(
Math.max(parseInt(req.body.sessionLimit, 10) || 10, 1),
100 // Max 100 sessions at once
);
const result = await this.workerService.processPendingQueues(sessionLimit);
res.json({
success: true,
...result
});
});
}
@@ -33,7 +33,6 @@ export class SessionRoutes extends BaseRouteHandler {
super();
this.completionHandler = new SessionCompletionHandler(
sessionManager,
dbManager,
eventBroadcaster
);
}
@@ -72,7 +71,6 @@ export class SessionRoutes extends BaseRouteHandler {
app.post('/api/sessions/init', this.handleSessionInitByClaudeId.bind(this));
app.post('/api/sessions/observations', this.handleObservationsByClaudeId.bind(this));
app.post('/api/sessions/summarize', this.handleSummarizeByClaudeId.bind(this));
app.post('/api/sessions/complete', this.handleSessionCompleteByClaudeId.bind(this));
}
/**
@@ -286,7 +284,7 @@ export class SessionRoutes extends BaseRouteHandler {
// Get or create session
const sessionDbId = store.createSDKSession(claudeSessionId, '', '');
const promptNumber = store.getPromptCounter(sessionDbId);
const promptNumber = store.getPromptNumberFromUserPrompts(claudeSessionId);
// Privacy check: skip if user prompt was entirely private
const userPrompt = PrivacyCheckValidator.checkUserPromptPrivacy(
@@ -353,7 +351,7 @@ export class SessionRoutes extends BaseRouteHandler {
// Get or create session
const sessionDbId = store.createSDKSession(claudeSessionId, '', '');
const promptNumber = store.getPromptCounter(sessionDbId);
const promptNumber = store.getPromptNumberFromUserPrompts(claudeSessionId);
// Privacy check: skip if user prompt was entirely private
const userPrompt = PrivacyCheckValidator.checkUserPromptPrivacy(
@@ -390,31 +388,6 @@ export class SessionRoutes extends BaseRouteHandler {
res.json({ status: 'queued' });
});
/**
* Complete session by claudeSessionId (cleanup-hook uses this)
* POST /api/sessions/complete
* Body: { claudeSessionId }
*
* Marks session complete, stops SDK agent, broadcasts status
*/
private handleSessionCompleteByClaudeId = this.wrapHandler(async (req: Request, res: Response): Promise<void> => {
const { claudeSessionId } = req.body;
if (!claudeSessionId) {
return this.badRequest(res, 'Missing claudeSessionId');
}
const found = await this.completionHandler.completeByClaudeId(claudeSessionId);
if (!found) {
// No active session - nothing to clean up (may have already been completed)
res.json({ success: true, message: 'No active session found' });
return;
}
res.json({ success: true });
});
/**
* Initialize session by claudeSessionId (new-hook uses this)
* POST /api/sessions/init
@@ -440,8 +413,9 @@ export class SessionRoutes extends BaseRouteHandler {
// Step 1: Create/get SDK session (idempotent INSERT OR IGNORE)
const sessionDbId = store.createSDKSession(claudeSessionId, project, prompt);
// Step 2: Increment prompt counter
const promptNumber = store.incrementPromptCounter(sessionDbId);
// Step 2: Get next prompt number from user_prompts count
const currentCount = store.getPromptNumberFromUserPrompts(claudeSessionId);
const promptNumber = currentCount + 1;
// Step 3: Strip privacy tags from prompt
const cleanedPrompt = stripMemoryTagsFromPrompt(prompt);
@@ -1,23 +1,20 @@
/**
* Session Completion Handler
*
* Consolidates session completion logic to eliminate duplication across
* three different completion endpoints (DELETE, POST by DB ID, POST by Claude ID).
* Consolidates session completion logic for manual session deletion/completion.
* Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete endpoints.
*
* All completion flows follow the same pattern:
* 1. Delete session from SessionManager (aborts SDK agent)
* 2. Mark session complete in database
* 3. Broadcast session completed event
* Completion flow:
* 1. Delete session from SessionManager (aborts SDK agent, cleans up in-memory state)
* 2. Broadcast session completed event (updates UI spinner)
*/
import { SessionManager } from '../SessionManager.js';
import { DatabaseManager } from '../DatabaseManager.js';
import { SessionEventBroadcaster } from '../events/SessionEventBroadcaster.js';
export class SessionCompletionHandler {
constructor(
private sessionManager: SessionManager,
private dbManager: DatabaseManager,
private eventBroadcaster: SessionEventBroadcaster
) {}
@@ -29,34 +26,7 @@ export class SessionCompletionHandler {
// Delete from session manager (aborts SDK agent)
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast session completed event
this.eventBroadcaster.broadcastSessionCompleted(sessionDbId);
}
/**
* Complete session by Claude session ID
* Used by POST /api/sessions/complete (cleanup-hook endpoint)
*
* @returns true if session was found and completed, false if no active session found
*/
async completeByClaudeId(claudeSessionId: string): Promise<boolean> {
const store = this.dbManager.getSessionStore();
// Find session by claudeSessionId
const session = store.findActiveSDKSession(claudeSessionId);
if (!session) {
// No active session - nothing to clean up (may have already been completed)
return false;
}
const sessionDbId = session.id;
// Complete using standard flow
await this.completeByDbId(sessionDbId);
return true;
}
}
+1 -1
View File
@@ -1,5 +1,5 @@
export const HOOK_TIMEOUTS = {
DEFAULT: 5000, // Standard HTTP timeout (up from 2000ms)
DEFAULT: 120000, // Standard HTTP timeout (up from 2000ms)
HEALTH_CHECK: 1000, // Worker health check (up from 500ms)
WORKER_STARTUP_WAIT: 1000,
WORKER_STARTUP_RETRIES: 15,
+18 -99
View File
@@ -1,10 +1,8 @@
import path from "path";
import { homedir } from "os";
import { spawnSync } from "child_process";
import { existsSync, writeFileSync, readFileSync, mkdirSync } from "fs";
import { readFileSync } from "fs";
import { logger } from "../utils/logger.js";
import { HOOK_TIMEOUTS, getTimeout } from "./hook-constants.js";
import { ProcessManager } from "../services/process/ProcessManager.js";
import { SettingsDefaultsManager } from "./SettingsDefaultsManager.js";
import { getWorkerRestartInstructions } from "../utils/error-messages.js";
@@ -96,123 +94,44 @@ async function getWorkerVersion(): Promise<string> {
/**
* Check if worker version matches plugin version
* If mismatch detected, restart the worker automatically
* Logs a warning if mismatch is detected
*/
async function ensureWorkerVersionMatches(): Promise<void> {
async function checkWorkerVersion(): Promise<void> {
const pluginVersion = getPluginVersion();
const workerVersion = await getWorkerVersion();
if (pluginVersion !== workerVersion) {
logger.info('SYSTEM', 'Worker version mismatch detected - restarting worker', {
logger.warn('SYSTEM', 'Worker version mismatch', {
pluginVersion,
workerVersion
workerVersion,
hint: 'Restart worker with: claude-mem worker restart'
});
// Give files time to sync before restart
await new Promise(resolve => setTimeout(resolve, getTimeout(HOOK_TIMEOUTS.PRE_RESTART_SETTLE_DELAY)));
// Restart the worker
await ProcessManager.restart(getWorkerPort());
// Give it a moment to start
await new Promise(resolve => setTimeout(resolve, 1000));
// Verify it's healthy
if (!await isWorkerHealthy()) {
throw new Error(`Worker failed to restart after version mismatch. Expected ${pluginVersion}, was running ${workerVersion}`);
}
}
}
/**
* Start the worker service using ProcessManager
* Handles both Unix (Bun) and Windows (compiled exe) platforms
*/
async function startWorker(): Promise<boolean> {
// Clean up legacy PM2 (one-time migration)
const dataDir = SettingsDefaultsManager.get('CLAUDE_MEM_DATA_DIR');
const pm2MigratedMarker = path.join(dataDir, '.pm2-migrated');
// Ensure data directory exists (may not exist on fresh install)
mkdirSync(dataDir, { recursive: true });
if (!existsSync(pm2MigratedMarker)) {
spawnSync('pm2', ['delete', 'claude-mem-worker'], { stdio: 'ignore' });
// Mark migration as complete
writeFileSync(pm2MigratedMarker, new Date().toISOString(), 'utf-8');
logger.debug('SYSTEM', 'PM2 cleanup completed and marked');
}
const port = getWorkerPort();
const result = await ProcessManager.start(port);
if (!result.success) {
logger.error('SYSTEM', 'Failed to start worker', {
platform: process.platform,
port,
error: result.error,
marketplaceRoot: MARKETPLACE_ROOT
});
}
return result.success;
}
/**
* Ensure worker service is running
* Checks health and auto-starts if not running
* Also ensures worker version matches plugin version
* Polls until worker is ready (assumes worker-cli.js start was called by hooks.json)
*/
export async function ensureWorkerRunning(): Promise<void> {
// Check if already healthy (will throw on fetch errors)
let healthy = false;
try {
healthy = await isWorkerHealthy();
} catch (error) {
// Worker not running or unreachable - continue to start it
healthy = false;
}
const maxRetries = 25; // 5 seconds total
const pollInterval = 200;
if (healthy) {
// Worker is healthy, but check if version matches
await ensureWorkerVersionMatches();
return;
}
// Try to start the worker
const started = await startWorker();
if (!started) {
const port = getWorkerPort();
throw new Error(
getWorkerRestartInstructions({
port,
customPrefix: `Worker service failed to start on port ${port}.`
})
);
}
// Wait for worker to become responsive after starting
// Try up to 5 times with 500ms delays (2.5 seconds total)
for (let i = 0; i < 5; i++) {
await new Promise(resolve => setTimeout(resolve, 500));
for (let i = 0; i < maxRetries; i++) {
try {
if (await isWorkerHealthy()) {
await ensureWorkerVersionMatches();
await checkWorkerVersion(); // logs warning on mismatch, doesn't restart
return;
}
} catch (error) {
// Continue trying
} catch {
// Continue polling
}
await new Promise(r => setTimeout(r, pollInterval));
}
// Worker started but isn't responding
const port = getWorkerPort();
logger.error('SYSTEM', 'Worker started but not responding to health checks');
throw new Error(
getWorkerRestartInstructions({
port,
customPrefix: `Worker service started but is not responding on port ${port}.`
})
);
throw new Error(getWorkerRestartInstructions({
port: getWorkerPort(),
customPrefix: 'Worker did not become ready within 5 seconds.'
}));
}