Files
claude-mem/src/services/worker/SessionManager.ts
T
Alex Newman 80a8c90a1a feat: add embedded Process Supervisor for unified process lifecycle (#1370)
* feat: add embedded Process Supervisor for unified process lifecycle management

Consolidates scattered process management (ProcessManager, GracefulShutdown,
HealthMonitor, ProcessRegistry) into a unified src/supervisor/ module.

New: ProcessRegistry with JSON persistence, env sanitizer (strips CLAUDECODE_*
vars), graceful shutdown cascade (SIGTERM → 5s wait → SIGKILL with tree-kill
on Windows), PID file liveness validation, and singleton Supervisor API.

Fixes #1352 (worker inherits CLAUDECODE env causing nested sessions)
Fixes #1356 (zombie TCP socket after Windows reboot)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add session-scoped process reaping to supervisor

Adds reapSession(sessionId) to ProcessRegistry for killing session-tagged
processes on session end. SessionManager.deleteSession() now triggers reaping.
Tightens orphan reaper interval from 60s to 30s.

Fixes #1351 (MCP server processes leak on session end)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add Unix domain socket support for worker communication

Introduces socket-manager.ts for UDS-based worker communication, eliminating
port 37777 collisions between concurrent sessions. Worker listens on
~/.claude-mem/sockets/worker.sock by default with TCP fallback.

All hook handlers, MCP server, health checks, and admin commands updated to
use socket-aware workerHttpRequest(). Backwards compatible — settings can
force TCP mode via CLAUDE_MEM_WORKER_TRANSPORT=tcp.

Fixes #1346 (port 37777 collision across concurrent sessions)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: remove in-process worker fallback from hook command

Removes the fallback path where hook scripts started WorkerService in-process,
making the worker a grandchild of Claude Code (killed by sandbox). Hooks now
always delegate to ensureWorkerStarted() which spawns a fully detached daemon.

Fixes #1249 (grandchild process killed by sandbox)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add health checker and /api/admin/doctor endpoint

Adds 30-second periodic health sweep that prunes dead processes from the
supervisor registry and cleans stale socket files. Adds /api/admin/doctor
endpoint exposing supervisor state, process liveness, and environment health.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: add comprehensive supervisor test suite

64 tests covering all supervisor modules: process registry (18 tests),
env sanitizer (8), shutdown cascade (10), socket manager (15), health
checker (5), and supervisor API (6). Includes persistence, isolation,
edge cases, and cross-module integration scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: revert Unix domain socket transport, restore TCP on port 37777

The socket-manager introduced UDS as default transport, but this broke
the HTTP server's TCP accessibility (viewer UI, curl, external monitoring).
Since there's only ever one worker process handling all sessions, the
port collision rationale for UDS doesn't apply. Reverts to TCP-only,
removing ~900 lines of unnecessary complexity.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: remove dead code found in pre-landing review

Remove unused `acceptingSpawns` field from Supervisor class (written but
never read — assertCanSpawn uses stopPromise instead) and unused
`buildWorkerUrl` import from context handler.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* updated gitignore

* fix: address PR review feedback - downgrade HTTP logging, clean up gitignore, harden supervisor

- Downgrade request/response HTTP logging from info to debug to reduce noise
- Remove unused getWorkerPort imports, use buildWorkerUrl helper
- Export ENV_PREFIXES/ENV_EXACT_MATCHES from env-sanitizer, reuse in Server.ts
- Fix isPidAlive(0) returning true (should be false)
- Add shutdownInitiated flag to prevent signal handler race condition
- Make validateWorkerPidFile testable with pidFilePath option
- Remove unused dataDir from ShutdownCascadeOptions
- Upgrade reapSession log from debug to warn
- Rename zombiePidFiles to deadProcessPids (returns actual PIDs)
- Clean up gitignore: remove duplicate datasets/, stale ~*/ and http*/ patterns
- Fix tests to use temp directories instead of relying on real PID file

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 14:49:23 -07:00

504 lines
18 KiB
TypeScript

/**
* SessionManager: Event-driven session lifecycle
*
* Responsibility:
* - Manage active session lifecycle
* - Handle event-driven message queues
* - Coordinate between HTTP requests and SDK agent
* - Zero-latency event notification (no polling)
*/
import { EventEmitter } from 'events';
import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationData } from '../worker-types.js';
import { PendingMessageStore } from '../sqlite/PendingMessageStore.js';
import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js';
import { getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
import { getSupervisor } from '../../supervisor/index.js';
export class SessionManager {
private dbManager: DatabaseManager;
private sessions: Map<number, ActiveSession> = new Map();
private sessionQueues: Map<number, EventEmitter> = new Map();
private onSessionDeletedCallback?: () => void;
private pendingStore: PendingMessageStore | null = null;
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Get or create PendingMessageStore (lazy initialization to avoid circular dependency)
*/
private getPendingStore(): PendingMessageStore {
if (!this.pendingStore) {
const sessionStore = this.dbManager.getSessionStore();
this.pendingStore = new PendingMessageStore(sessionStore.db, 3);
}
return this.pendingStore;
}
/**
* Set callback to be called when a session is deleted (for broadcasting status)
*/
setOnSessionDeleted(callback: () => void): void {
this.onSessionDeletedCallback = callback;
}
/**
* Initialize a new session or return existing one
*/
initializeSession(sessionDbId: number, currentUserPrompt?: string, promptNumber?: number): ActiveSession {
logger.debug('SESSION', 'initializeSession called', {
sessionDbId,
promptNumber,
has_currentUserPrompt: !!currentUserPrompt
});
// Check if already active
let session = this.sessions.get(sessionDbId);
if (session) {
logger.debug('SESSION', 'Returning cached session', {
sessionDbId,
contentSessionId: session.contentSessionId,
lastPromptNumber: session.lastPromptNumber
});
// Refresh project from database in case it was updated by new-hook
// This fixes the bug where sessions created with empty project get updated
// in the database but the in-memory session still has the stale empty value
const dbSession = this.dbManager.getSessionById(sessionDbId);
if (dbSession.project && dbSession.project !== session.project) {
logger.debug('SESSION', 'Updating project from database', {
sessionDbId,
oldProject: session.project,
newProject: dbSession.project
});
session.project = dbSession.project;
}
// Update userPrompt for continuation prompts
if (currentUserPrompt) {
logger.debug('SESSION', 'Updating userPrompt for continuation', {
sessionDbId,
promptNumber,
oldPrompt: session.userPrompt.substring(0, 80),
newPrompt: currentUserPrompt.substring(0, 80)
});
session.userPrompt = currentUserPrompt;
session.lastPromptNumber = promptNumber || session.lastPromptNumber;
} else {
logger.debug('SESSION', 'No currentUserPrompt provided for existing session', {
sessionDbId,
promptNumber,
usingCachedPrompt: session.userPrompt.substring(0, 80)
});
}
return session;
}
// Fetch from database
const dbSession = this.dbManager.getSessionById(sessionDbId);
logger.debug('SESSION', 'Fetched session from database', {
sessionDbId,
content_session_id: dbSession.content_session_id,
memory_session_id: dbSession.memory_session_id
});
// Log warning if we're discarding a stale memory_session_id (Issue #817)
if (dbSession.memory_session_id) {
logger.warn('SESSION', `Discarding stale memory_session_id from previous worker instance (Issue #817)`, {
sessionDbId,
staleMemorySessionId: dbSession.memory_session_id,
reason: 'SDK context lost on worker restart - will capture new ID'
});
}
// Use currentUserPrompt if provided, otherwise fall back to database (first prompt)
const userPrompt = currentUserPrompt || dbSession.user_prompt;
if (!currentUserPrompt) {
logger.debug('SESSION', 'No currentUserPrompt provided for new session, using database', {
sessionDbId,
promptNumber,
dbPrompt: dbSession.user_prompt.substring(0, 80)
});
} else {
logger.debug('SESSION', 'Initializing session with fresh userPrompt', {
sessionDbId,
promptNumber,
userPrompt: currentUserPrompt.substring(0, 80)
});
}
// Create active session
// CRITICAL: Do NOT load memorySessionId from database here (Issue #817)
// When creating a new in-memory session, any database memory_session_id is STALE
// because the SDK context was lost when the worker restarted. The SDK agent will
// capture a new memorySessionId on the first response and persist it.
// Loading stale memory_session_id causes "No conversation found" crashes on resume.
session = {
sessionDbId,
contentSessionId: dbSession.content_session_id,
memorySessionId: null, // Always start fresh - SDK will capture new ID
project: dbSession.project,
userPrompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptNumberFromUserPrompts(dbSession.content_session_id),
startTime: Date.now(),
cumulativeInputTokens: 0,
cumulativeOutputTokens: 0,
earliestPendingTimestamp: null,
conversationHistory: [], // Initialize empty - will be populated by agents
currentProvider: null, // Will be set when generator starts
consecutiveRestarts: 0, // Track consecutive restart attempts to prevent infinite loops
processingMessageIds: [], // CLAIM-CONFIRM: Track message IDs for confirmProcessed()
lastGeneratorActivity: Date.now() // Initialize for stale detection (Issue #1099)
};
logger.debug('SESSION', 'Creating new session object (memorySessionId cleared to prevent stale resume)', {
sessionDbId,
contentSessionId: dbSession.content_session_id,
dbMemorySessionId: dbSession.memory_session_id || '(none in DB)',
memorySessionId: '(cleared - will capture fresh from SDK)',
lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptNumberFromUserPrompts(dbSession.content_session_id)
});
this.sessions.set(sessionDbId, session);
// Create event emitter for queue notifications
const emitter = new EventEmitter();
this.sessionQueues.set(sessionDbId, emitter);
logger.info('SESSION', 'Session initialized', {
sessionId: sessionDbId,
project: session.project,
contentSessionId: session.contentSessionId,
queueDepth: 0,
hasGenerator: false
});
return session;
}
/**
* Get active session by ID
*/
getSession(sessionDbId: number): ActiveSession | undefined {
return this.sessions.get(sessionDbId);
}
/**
* Queue an observation for processing (zero-latency notification)
* Auto-initializes session if not in memory but exists in database
*
* CRITICAL: Persists to database FIRST before adding to in-memory queue.
* This ensures observations survive worker crashes.
*/
queueObservation(sessionDbId: number, data: ObservationData): void {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
// CRITICAL: Persist to database FIRST
const message: PendingMessage = {
type: 'observation',
tool_name: data.tool_name,
tool_input: data.tool_input,
tool_response: data.tool_response,
prompt_number: data.prompt_number,
cwd: data.cwd
};
try {
const messageId = this.getPendingStore().enqueue(sessionDbId, session.contentSessionId, message);
const queueDepth = this.getPendingStore().getPendingCount(sessionDbId);
const toolSummary = logger.formatTool(data.tool_name, data.tool_input);
logger.info('QUEUE', `ENQUEUED | sessionDbId=${sessionDbId} | messageId=${messageId} | type=observation | tool=${toolSummary} | depth=${queueDepth}`, {
sessionId: sessionDbId
});
} catch (error) {
logger.error('SESSION', 'Failed to persist observation to DB', {
sessionId: sessionDbId,
tool: data.tool_name
}, error);
throw error; // Don't continue if we can't persist
}
// Notify generator immediately (zero latency)
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
}
/**
* Queue a summarize request (zero-latency notification)
* Auto-initializes session if not in memory but exists in database
*
* CRITICAL: Persists to database FIRST before adding to in-memory queue.
* This ensures summarize requests survive worker crashes.
*/
queueSummarize(sessionDbId: number, lastAssistantMessage?: string): void {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
// CRITICAL: Persist to database FIRST
const message: PendingMessage = {
type: 'summarize',
last_assistant_message: lastAssistantMessage
};
try {
const messageId = this.getPendingStore().enqueue(sessionDbId, session.contentSessionId, message);
const queueDepth = this.getPendingStore().getPendingCount(sessionDbId);
logger.info('QUEUE', `ENQUEUED | sessionDbId=${sessionDbId} | messageId=${messageId} | type=summarize | depth=${queueDepth}`, {
sessionId: sessionDbId
});
} catch (error) {
logger.error('SESSION', 'Failed to persist summarize to DB', {
sessionId: sessionDbId
}, error);
throw error; // Don't continue if we can't persist
}
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
}
/**
* Delete a session (abort SDK agent and cleanup)
* Verifies subprocess exit to prevent zombie process accumulation (Issue #737)
*/
async deleteSession(sessionDbId: number): Promise<void> {
const session = this.sessions.get(sessionDbId);
if (!session) {
return; // Already deleted
}
const sessionDuration = Date.now() - session.startTime;
// 1. Abort the SDK agent
session.abortController.abort();
// 2. Wait for generator to finish (with 30s timeout to prevent stale stall, Issue #1099)
if (session.generatorPromise) {
const generatorDone = session.generatorPromise.catch(() => {
logger.debug('SYSTEM', 'Generator already failed, cleaning up', { sessionId: session.sessionDbId });
});
const timeoutDone = new Promise<void>(resolve => {
AbortSignal.timeout(30_000).addEventListener('abort', () => resolve(), { once: true });
});
await Promise.race([generatorDone, timeoutDone]).then(() => {}, () => {
logger.warn('SESSION', 'Generator did not exit within 30s after abort, forcing cleanup (#1099)', { sessionDbId });
});
}
// 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
const tracked = getProcessBySession(sessionDbId);
if (tracked && tracked.process.exitCode === null) {
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
sessionId: sessionDbId,
pid: tracked.pid
});
await ensureProcessExit(tracked, 5000);
}
// 3b. Reap all supervisor-tracked processes for this session (#1351)
// This catches MCP servers and other child processes not tracked by the
// in-memory ProcessRegistry (e.g. processes registered only in supervisor.json).
try {
await getSupervisor().getRegistry().reapSession(sessionDbId);
} catch (error) {
logger.warn('SESSION', 'Supervisor reapSession failed (non-blocking)', {
sessionId: sessionDbId
}, error as Error);
}
// 4. Cleanup
this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId);
logger.info('SESSION', 'Session deleted', {
sessionId: sessionDbId,
duration: `${(sessionDuration / 1000).toFixed(1)}s`,
project: session.project
});
// Trigger callback to broadcast status update (spinner may need to stop)
if (this.onSessionDeletedCallback) {
this.onSessionDeletedCallback();
}
}
/**
* Remove session from in-memory maps and notify without awaiting generator.
* Used when SDK resume fails and we give up (no fallback): avoids deadlock
* from deleteSession() awaiting the same generator promise we're inside.
*/
removeSessionImmediate(sessionDbId: number): void {
const session = this.sessions.get(sessionDbId);
if (!session) return;
this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId);
logger.info('SESSION', 'Session removed (orphaned after SDK termination)', {
sessionId: sessionDbId,
project: session.project
});
if (this.onSessionDeletedCallback) {
this.onSessionDeletedCallback();
}
}
private static readonly MAX_SESSION_IDLE_MS = 15 * 60 * 1000; // 15 minutes
/**
* Reap sessions with no active generator and no pending work that have been idle too long.
* This unblocks the orphan reaper which skips processes for "active" sessions. (Issue #1168)
*/
async reapStaleSessions(): Promise<number> {
const now = Date.now();
const staleSessionIds: number[] = [];
for (const [sessionDbId, session] of this.sessions) {
// Skip sessions with active generators
if (session.generatorPromise) continue;
// Skip sessions with pending work
const pendingCount = this.getPendingStore().getPendingCount(sessionDbId);
if (pendingCount > 0) continue;
// No generator + no pending work + old enough = stale
const sessionAge = now - session.startTime;
if (sessionAge > SessionManager.MAX_SESSION_IDLE_MS) {
staleSessionIds.push(sessionDbId);
}
}
for (const sessionDbId of staleSessionIds) {
logger.warn('SESSION', `Reaping stale session ${sessionDbId} (no activity for >${Math.round(SessionManager.MAX_SESSION_IDLE_MS / 60000)}m)`, { sessionDbId });
await this.deleteSession(sessionDbId);
}
return staleSessionIds.length;
}
/**
* Shutdown all active sessions
*/
async shutdownAll(): Promise<void> {
const sessionIds = Array.from(this.sessions.keys());
await Promise.all(sessionIds.map(id => this.deleteSession(id)));
}
/**
* Check if any session has pending messages (for spinner tracking)
*/
hasPendingMessages(): boolean {
return this.getPendingStore().hasAnyPendingWork();
}
/**
* Get number of active sessions (for stats)
*/
getActiveSessionCount(): number {
return this.sessions.size;
}
/**
* Get total queue depth across all sessions (for activity indicator)
*/
getTotalQueueDepth(): number {
let total = 0;
// We can iterate over active sessions to get their pending count
for (const session of this.sessions.values()) {
total += this.getPendingStore().getPendingCount(session.sessionDbId);
}
return total;
}
/**
* Get total active work (queued + currently processing)
* Counts both pending messages and items actively being processed by SDK agents
*/
getTotalActiveWork(): number {
// getPendingCount includes 'processing' status, so this IS the total active work
return this.getTotalQueueDepth();
}
/**
* Check if any session is actively processing (has pending messages OR active generator)
* Used for activity indicator to prevent spinner from stopping while SDK is processing
*/
isAnySessionProcessing(): boolean {
// hasAnyPendingWork checks for 'pending' OR 'processing'
return this.getPendingStore().hasAnyPendingWork();
}
/**
* Get message iterator for SDKAgent to consume (event-driven, no polling)
* Auto-initializes session if not in memory but exists in database
*
* CRITICAL: Uses PendingMessageStore for crash-safe message persistence.
* Messages are marked as 'processing' when yielded and must be marked 'processed'
* by the SDK agent after successful completion.
*/
async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessageWithId> {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
const emitter = this.sessionQueues.get(sessionDbId);
if (!emitter) {
throw new Error(`No emitter for session ${sessionDbId}`);
}
const processor = new SessionQueueProcessor(this.getPendingStore(), emitter);
// Use the robust iterator - messages are deleted on claim (no tracking needed)
// CRITICAL: Pass onIdleTimeout callback that triggers abort to kill the subprocess
// Without this, the iterator returns but the Claude subprocess stays alive as a zombie
for await (const message of processor.createIterator({
sessionDbId,
signal: session.abortController.signal,
onIdleTimeout: () => {
logger.info('SESSION', 'Triggering abort due to idle timeout to kill subprocess', { sessionDbId });
session.idleTimedOut = true;
session.abortController.abort();
}
})) {
// Track earliest timestamp for accurate observation timestamps
// This ensures backlog messages get their original timestamps, not current time
if (session.earliestPendingTimestamp === null) {
session.earliestPendingTimestamp = message._originalTimestamp;
} else {
session.earliestPendingTimestamp = Math.min(session.earliestPendingTimestamp, message._originalTimestamp);
}
// Update generator activity for stale detection (Issue #1099)
session.lastGeneratorActivity = Date.now();
yield message;
}
}
/**
* Get the PendingMessageStore (for SDKAgent to mark messages as processed)
*/
getPendingMessageStore(): PendingMessageStore {
return this.getPendingStore();
}
}