diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index b18a520c..330e39c9 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -69,6 +69,9 @@ import { SearchRoutes } from './worker/http/routes/SearchRoutes.js'; import { SettingsRoutes } from './worker/http/routes/SettingsRoutes.js'; import { LogsRoutes } from './worker/http/routes/LogsRoutes.js'; +// Process management for zombie cleanup (Issue #737) +import { startOrphanReaper, reapOrphanedProcesses } from './worker/ProcessRegistry.js'; + /** * Build JSON status output for hook framework communication. * This is a pure function extracted for testability. @@ -121,6 +124,9 @@ export class WorkerService { private initializationComplete: Promise; private resolveInitialization!: () => void; + // Orphan reaper cleanup function (Issue #737) + private stopOrphanReaper: (() => void) | null = null; + constructor() { // Initialize the promise that will resolve when background initialization completes this.initializationComplete = new Promise((resolve) => { @@ -303,6 +309,16 @@ export class WorkerService { this.resolveInitialization(); logger.info('SYSTEM', 'Background initialization complete'); + // Start orphan reaper to clean up zombie processes (Issue #737) + this.stopOrphanReaper = startOrphanReaper(() => { + const activeIds = new Set(); + for (const [id] of this.sessionManager['sessions']) { + activeIds.add(id); + } + return activeIds; + }); + logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)'); + // Auto-recover orphaned queues (fire-and-forget with error logging) this.processPendingQueues(50).then(result => { if (result.sessionsStarted > 0) { @@ -404,6 +420,12 @@ export class WorkerService { * Shutdown the worker service */ async shutdown(): Promise { + // Stop orphan reaper before shutdown (Issue #737) + if (this.stopOrphanReaper) { + this.stopOrphanReaper(); + this.stopOrphanReaper = null; + } + await performGracefulShutdown({ server: this.server.getHttpServer(), sessionManager: this.sessionManager, diff --git a/src/services/worker/ProcessRegistry.ts b/src/services/worker/ProcessRegistry.ts new file mode 100644 index 00000000..ea09afa6 --- /dev/null +++ b/src/services/worker/ProcessRegistry.ts @@ -0,0 +1,252 @@ +/** + * ProcessRegistry: Track spawned Claude subprocesses + * + * Fixes Issue #737: Claude haiku subprocesses don't terminate properly, + * causing zombie process accumulation (user reported 155 processes / 51GB RAM). + * + * Root causes: + * 1. SDK's SpawnedProcess interface hides subprocess PIDs + * 2. deleteSession() doesn't verify subprocess exit before cleanup + * 3. abort() is fire-and-forget with no confirmation + * + * Solution: + * - Use SDK's spawnClaudeCodeProcess option to capture PIDs + * - Track all spawned processes with session association + * - Verify exit on session deletion with timeout + SIGKILL escalation + * - Safety net orphan reaper runs every 5 minutes + */ + +import { spawn, exec, ChildProcess } from 'child_process'; +import { promisify } from 'util'; +import { logger } from '../../utils/logger.js'; + +const execAsync = promisify(exec); + +interface TrackedProcess { + pid: number; + sessionDbId: number; + spawnedAt: number; + process: ChildProcess; +} + +// PID Registry - tracks spawned Claude subprocesses +const processRegistry = new Map(); + +/** + * Register a spawned process in the registry + */ +export function registerProcess(pid: number, sessionDbId: number, process: ChildProcess): void { + processRegistry.set(pid, { pid, sessionDbId, spawnedAt: Date.now(), process }); + logger.info('PROCESS', `Registered PID ${pid} for session ${sessionDbId}`, { pid, sessionDbId }); +} + +/** + * Unregister a process from the registry + */ +export function unregisterProcess(pid: number): void { + processRegistry.delete(pid); + logger.debug('PROCESS', `Unregistered PID ${pid}`, { pid }); +} + +/** + * Get process info by session ID + * Warns if multiple processes found (indicates race condition) + */ +export function getProcessBySession(sessionDbId: number): TrackedProcess | undefined { + const matches: TrackedProcess[] = []; + for (const [, info] of processRegistry) { + if (info.sessionDbId === sessionDbId) matches.push(info); + } + if (matches.length > 1) { + logger.warn('PROCESS', `Multiple processes found for session ${sessionDbId}`, { + count: matches.length, + pids: matches.map(m => m.pid) + }); + } + return matches[0]; +} + +/** + * Get all active PIDs (for debugging) + */ +export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number; ageMs: number }> { + const now = Date.now(); + return Array.from(processRegistry.values()).map(info => ({ + pid: info.pid, + sessionDbId: info.sessionDbId, + ageMs: now - info.spawnedAt + })); +} + +/** + * Wait for a process to exit with timeout, escalating to SIGKILL if needed + * Uses event-based waiting instead of polling to avoid CPU overhead + */ +export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise { + const { pid, process: proc } = tracked; + + // Already exited? + if (proc.killed || proc.exitCode !== null) { + unregisterProcess(pid); + return; + } + + // Wait for graceful exit with timeout using event-based approach + const exitPromise = new Promise((resolve) => { + proc.once('exit', () => resolve()); + }); + + const timeoutPromise = new Promise((resolve) => { + setTimeout(resolve, timeoutMs); + }); + + await Promise.race([exitPromise, timeoutPromise]); + + // Check if exited gracefully + if (proc.killed || proc.exitCode !== null) { + unregisterProcess(pid); + return; + } + + // Timeout: escalate to SIGKILL + logger.warn('PROCESS', `PID ${pid} did not exit after ${timeoutMs}ms, sending SIGKILL`, { pid, timeoutMs }); + try { + proc.kill('SIGKILL'); + } catch { + // Already dead + } + + // Brief wait for SIGKILL to take effect + await new Promise(resolve => setTimeout(resolve, 200)); + unregisterProcess(pid); +} + +/** + * Kill system-level orphans (ppid=1 on Unix) + * These are Claude processes whose parent died unexpectedly + */ +async function killSystemOrphans(): Promise { + if (process.platform === 'win32') { + return 0; // Windows doesn't have ppid=1 orphan concept + } + + try { + const { stdout } = await execAsync( + 'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format" | grep -v grep' + ); + + let killed = 0; + for (const line of stdout.trim().split('\n')) { + if (!line) continue; + const match = line.trim().match(/^(\d+)\s+(\d+)/); + if (match && parseInt(match[2]) === 1) { // ppid=1 = orphan + const orphanPid = parseInt(match[1]); + logger.warn('PROCESS', `Killing system orphan PID ${orphanPid}`, { pid: orphanPid }); + try { + process.kill(orphanPid, 'SIGKILL'); + killed++; + } catch { + // Already dead or permission denied + } + } + } + return killed; + } catch { + return 0; // No matches or error + } +} + +/** + * Reap orphaned processes - both registry-tracked and system-level + */ +export async function reapOrphanedProcesses(activeSessionIds: Set): Promise { + let killed = 0; + + // Registry-based: kill processes for dead sessions + for (const [pid, info] of processRegistry) { + if (activeSessionIds.has(info.sessionDbId)) continue; // Active = safe + + logger.warn('PROCESS', `Killing orphan PID ${pid} (session ${info.sessionDbId} gone)`, { pid, sessionDbId: info.sessionDbId }); + try { + info.process.kill('SIGKILL'); + killed++; + } catch { + // Already dead + } + unregisterProcess(pid); + } + + // System-level: find ppid=1 orphans + killed += await killSystemOrphans(); + + return killed; +} + +/** + * Create a custom spawn function for SDK that captures PIDs + * + * The SDK's spawnClaudeCodeProcess option allows us to intercept subprocess + * creation and capture the PID before the SDK hides it. + */ +export function createPidCapturingSpawn(sessionDbId: number) { + return (spawnOptions: { + command: string; + args: string[]; + cwd?: string; + env?: NodeJS.ProcessEnv; + signal?: AbortSignal; + }) => { + const child = spawn(spawnOptions.command, spawnOptions.args, { + cwd: spawnOptions.cwd, + env: spawnOptions.env, + stdio: ['pipe', 'pipe', 'pipe'], + signal: spawnOptions.signal, // CRITICAL: Pass signal for AbortController integration + windowsHide: true + }); + + // Register PID + if (child.pid) { + registerProcess(child.pid, sessionDbId, child); + + // Auto-unregister on exit + child.on('exit', () => { + if (child.pid) { + unregisterProcess(child.pid); + } + }); + } + + // Return SDK-compatible interface + return { + stdin: child.stdin, + stdout: child.stdout, + get killed() { return child.killed; }, + get exitCode() { return child.exitCode; }, + kill: child.kill.bind(child), + on: child.on.bind(child), + once: child.once.bind(child), + off: child.off.bind(child) + }; + }; +} + +/** + * Start the orphan reaper interval + * Returns cleanup function to stop the interval + */ +export function startOrphanReaper(getActiveSessionIds: () => Set, intervalMs: number = 5 * 60 * 1000): () => void { + const interval = setInterval(async () => { + try { + const activeIds = getActiveSessionIds(); + const killed = await reapOrphanedProcesses(activeIds); + if (killed > 0) { + logger.info('PROCESS', `Reaper cleaned up ${killed} orphaned processes`, { killed }); + } + } catch (error) { + logger.error('PROCESS', 'Reaper error', {}, error as Error); + } + }, intervalMs); + + // Return cleanup function + return () => clearInterval(interval); +} diff --git a/src/services/worker/SDKAgent.ts b/src/services/worker/SDKAgent.ts index 48c0918c..78b7f18b 100644 --- a/src/services/worker/SDKAgent.ts +++ b/src/services/worker/SDKAgent.ts @@ -20,6 +20,7 @@ import { USER_SETTINGS_PATH } from '../../shared/paths.js'; import type { ActiveSession, SDKUserMessage } from '../worker-types.js'; import { ModeManager } from '../domain/ModeManager.js'; import { processAgentResponse, type WorkerRef } from './agents/index.js'; +import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit } from './ProcessRegistry.js'; // Import Agent SDK (assumes it's installed) // @ts-ignore - Agent SDK types may not be available @@ -99,6 +100,7 @@ export class SDKAgent { // Run Agent SDK query loop // Only resume if we have a captured memory session ID + // Use custom spawn to capture PIDs for zombie process cleanup (Issue #737) const queryResult = query({ prompt: messageGenerator, options: { @@ -109,7 +111,9 @@ export class SDKAgent { ...(hasRealMemorySessionId && session.lastPromptNumber > 1 && { resume: session.memorySessionId }), disallowedTools, abortController: session.abortController, - pathToClaudeCodeExecutable: claudePath + pathToClaudeCodeExecutable: claudePath, + // Custom spawn function captures PIDs to fix zombie process accumulation + spawnClaudeCodeProcess: createPidCapturingSpawn(session.sessionDbId) } }); diff --git a/src/services/worker/SessionManager.ts b/src/services/worker/SessionManager.ts index 0c200710..8915d6ed 100644 --- a/src/services/worker/SessionManager.ts +++ b/src/services/worker/SessionManager.ts @@ -14,6 +14,7 @@ import { logger } from '../../utils/logger.js'; import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationData } from '../worker-types.js'; import { PendingMessageStore } from '../sqlite/PendingMessageStore.js'; import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js'; +import { getProcessBySession, ensureProcessExit } from './ProcessRegistry.js'; export class SessionManager { private dbManager: DatabaseManager; @@ -256,6 +257,7 @@ export class SessionManager { /** * Delete a session (abort SDK agent and cleanup) + * Verifies subprocess exit to prevent zombie process accumulation (Issue #737) */ async deleteSession(sessionDbId: number): Promise { const session = this.sessions.get(sessionDbId); @@ -265,17 +267,27 @@ export class SessionManager { const sessionDuration = Date.now() - session.startTime; - // Abort the SDK agent + // 1. Abort the SDK agent session.abortController.abort(); - // Wait for generator to finish + // 2. Wait for generator to finish if (session.generatorPromise) { - await session.generatorPromise.catch(error => { + await session.generatorPromise.catch(() => { logger.debug('SYSTEM', 'Generator already failed, cleaning up', { sessionId: session.sessionDbId }); }); } - // Cleanup + // 3. Verify subprocess exit with 5s timeout (Issue #737 fix) + const tracked = getProcessBySession(sessionDbId); + if (tracked && !tracked.process.killed && tracked.process.exitCode === null) { + logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, { + sessionId: sessionDbId, + pid: tracked.pid + }); + await ensureProcessExit(tracked, 5000); + } + + // 4. Cleanup this.sessions.delete(sessionDbId); this.sessionQueues.delete(sessionDbId);