fix: prevent zombie process accumulation via PID registry and signal propagation (Issue #737) (#806)
* Fix zombie process accumulation (Issue #737) Problem: Claude haiku subprocesses spawned by the SDK weren't terminating properly, causing zombie process accumulation (user reported 155 processes consuming 51GB RAM). Root causes: 1. SDK's SpawnedProcess interface hides subprocess PIDs 2. deleteSession() didn't verify subprocess exit 3. abort() was fire-and-forget with no confirmation 4. No mechanism to track or clean up orphaned processes Solution: - Add ProcessRegistry module to track spawned Claude subprocesses - Use SDK's spawnClaudeCodeProcess option to capture PIDs via custom spawn - Pass signal parameter to enable AbortController integration - Wait for subprocess exit in deleteSession() with 5s timeout - Escalate to SIGKILL if graceful exit fails - Add orphan reaper running every 5 minutes as safety net Files changed: - src/services/worker/ProcessRegistry.ts (new): PID registry and reaper - src/services/worker/SDKAgent.ts: Use custom spawn to capture PIDs - src/services/worker/SessionManager.ts: Verify subprocess exit on delete - src/services/worker-service.ts: Start/stop orphan reaper Fixes #737 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * fix: address code review feedback - Replace busy-wait polling with event-based proc.once('exit') - Detect and warn about multiple processes per session (race condition) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: bigphoot <bigphoot@gmail.com> Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -69,6 +69,9 @@ import { SearchRoutes } from './worker/http/routes/SearchRoutes.js';
|
||||
import { SettingsRoutes } from './worker/http/routes/SettingsRoutes.js';
|
||||
import { LogsRoutes } from './worker/http/routes/LogsRoutes.js';
|
||||
|
||||
// Process management for zombie cleanup (Issue #737)
|
||||
import { startOrphanReaper, reapOrphanedProcesses } from './worker/ProcessRegistry.js';
|
||||
|
||||
/**
|
||||
* Build JSON status output for hook framework communication.
|
||||
* This is a pure function extracted for testability.
|
||||
@@ -121,6 +124,9 @@ export class WorkerService {
|
||||
private initializationComplete: Promise<void>;
|
||||
private resolveInitialization!: () => void;
|
||||
|
||||
// Orphan reaper cleanup function (Issue #737)
|
||||
private stopOrphanReaper: (() => void) | null = null;
|
||||
|
||||
constructor() {
|
||||
// Initialize the promise that will resolve when background initialization completes
|
||||
this.initializationComplete = new Promise((resolve) => {
|
||||
@@ -303,6 +309,16 @@ export class WorkerService {
|
||||
this.resolveInitialization();
|
||||
logger.info('SYSTEM', 'Background initialization complete');
|
||||
|
||||
// Start orphan reaper to clean up zombie processes (Issue #737)
|
||||
this.stopOrphanReaper = startOrphanReaper(() => {
|
||||
const activeIds = new Set<number>();
|
||||
for (const [id] of this.sessionManager['sessions']) {
|
||||
activeIds.add(id);
|
||||
}
|
||||
return activeIds;
|
||||
});
|
||||
logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)');
|
||||
|
||||
// Auto-recover orphaned queues (fire-and-forget with error logging)
|
||||
this.processPendingQueues(50).then(result => {
|
||||
if (result.sessionsStarted > 0) {
|
||||
@@ -404,6 +420,12 @@ export class WorkerService {
|
||||
* Shutdown the worker service
|
||||
*/
|
||||
async shutdown(): Promise<void> {
|
||||
// Stop orphan reaper before shutdown (Issue #737)
|
||||
if (this.stopOrphanReaper) {
|
||||
this.stopOrphanReaper();
|
||||
this.stopOrphanReaper = null;
|
||||
}
|
||||
|
||||
await performGracefulShutdown({
|
||||
server: this.server.getHttpServer(),
|
||||
sessionManager: this.sessionManager,
|
||||
|
||||
@@ -0,0 +1,252 @@
|
||||
/**
|
||||
* ProcessRegistry: Track spawned Claude subprocesses
|
||||
*
|
||||
* Fixes Issue #737: Claude haiku subprocesses don't terminate properly,
|
||||
* causing zombie process accumulation (user reported 155 processes / 51GB RAM).
|
||||
*
|
||||
* Root causes:
|
||||
* 1. SDK's SpawnedProcess interface hides subprocess PIDs
|
||||
* 2. deleteSession() doesn't verify subprocess exit before cleanup
|
||||
* 3. abort() is fire-and-forget with no confirmation
|
||||
*
|
||||
* Solution:
|
||||
* - Use SDK's spawnClaudeCodeProcess option to capture PIDs
|
||||
* - Track all spawned processes with session association
|
||||
* - Verify exit on session deletion with timeout + SIGKILL escalation
|
||||
* - Safety net orphan reaper runs every 5 minutes
|
||||
*/
|
||||
|
||||
import { spawn, exec, ChildProcess } from 'child_process';
|
||||
import { promisify } from 'util';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
|
||||
const execAsync = promisify(exec);
|
||||
|
||||
interface TrackedProcess {
|
||||
pid: number;
|
||||
sessionDbId: number;
|
||||
spawnedAt: number;
|
||||
process: ChildProcess;
|
||||
}
|
||||
|
||||
// PID Registry - tracks spawned Claude subprocesses
|
||||
const processRegistry = new Map<number, TrackedProcess>();
|
||||
|
||||
/**
|
||||
* Register a spawned process in the registry
|
||||
*/
|
||||
export function registerProcess(pid: number, sessionDbId: number, process: ChildProcess): void {
|
||||
processRegistry.set(pid, { pid, sessionDbId, spawnedAt: Date.now(), process });
|
||||
logger.info('PROCESS', `Registered PID ${pid} for session ${sessionDbId}`, { pid, sessionDbId });
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister a process from the registry
|
||||
*/
|
||||
export function unregisterProcess(pid: number): void {
|
||||
processRegistry.delete(pid);
|
||||
logger.debug('PROCESS', `Unregistered PID ${pid}`, { pid });
|
||||
}
|
||||
|
||||
/**
|
||||
* Get process info by session ID
|
||||
* Warns if multiple processes found (indicates race condition)
|
||||
*/
|
||||
export function getProcessBySession(sessionDbId: number): TrackedProcess | undefined {
|
||||
const matches: TrackedProcess[] = [];
|
||||
for (const [, info] of processRegistry) {
|
||||
if (info.sessionDbId === sessionDbId) matches.push(info);
|
||||
}
|
||||
if (matches.length > 1) {
|
||||
logger.warn('PROCESS', `Multiple processes found for session ${sessionDbId}`, {
|
||||
count: matches.length,
|
||||
pids: matches.map(m => m.pid)
|
||||
});
|
||||
}
|
||||
return matches[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all active PIDs (for debugging)
|
||||
*/
|
||||
export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number; ageMs: number }> {
|
||||
const now = Date.now();
|
||||
return Array.from(processRegistry.values()).map(info => ({
|
||||
pid: info.pid,
|
||||
sessionDbId: info.sessionDbId,
|
||||
ageMs: now - info.spawnedAt
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a process to exit with timeout, escalating to SIGKILL if needed
|
||||
* Uses event-based waiting instead of polling to avoid CPU overhead
|
||||
*/
|
||||
export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise<void> {
|
||||
const { pid, process: proc } = tracked;
|
||||
|
||||
// Already exited?
|
||||
if (proc.killed || proc.exitCode !== null) {
|
||||
unregisterProcess(pid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wait for graceful exit with timeout using event-based approach
|
||||
const exitPromise = new Promise<void>((resolve) => {
|
||||
proc.once('exit', () => resolve());
|
||||
});
|
||||
|
||||
const timeoutPromise = new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, timeoutMs);
|
||||
});
|
||||
|
||||
await Promise.race([exitPromise, timeoutPromise]);
|
||||
|
||||
// Check if exited gracefully
|
||||
if (proc.killed || proc.exitCode !== null) {
|
||||
unregisterProcess(pid);
|
||||
return;
|
||||
}
|
||||
|
||||
// Timeout: escalate to SIGKILL
|
||||
logger.warn('PROCESS', `PID ${pid} did not exit after ${timeoutMs}ms, sending SIGKILL`, { pid, timeoutMs });
|
||||
try {
|
||||
proc.kill('SIGKILL');
|
||||
} catch {
|
||||
// Already dead
|
||||
}
|
||||
|
||||
// Brief wait for SIGKILL to take effect
|
||||
await new Promise(resolve => setTimeout(resolve, 200));
|
||||
unregisterProcess(pid);
|
||||
}
|
||||
|
||||
/**
|
||||
* Kill system-level orphans (ppid=1 on Unix)
|
||||
* These are Claude processes whose parent died unexpectedly
|
||||
*/
|
||||
async function killSystemOrphans(): Promise<number> {
|
||||
if (process.platform === 'win32') {
|
||||
return 0; // Windows doesn't have ppid=1 orphan concept
|
||||
}
|
||||
|
||||
try {
|
||||
const { stdout } = await execAsync(
|
||||
'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format" | grep -v grep'
|
||||
);
|
||||
|
||||
let killed = 0;
|
||||
for (const line of stdout.trim().split('\n')) {
|
||||
if (!line) continue;
|
||||
const match = line.trim().match(/^(\d+)\s+(\d+)/);
|
||||
if (match && parseInt(match[2]) === 1) { // ppid=1 = orphan
|
||||
const orphanPid = parseInt(match[1]);
|
||||
logger.warn('PROCESS', `Killing system orphan PID ${orphanPid}`, { pid: orphanPid });
|
||||
try {
|
||||
process.kill(orphanPid, 'SIGKILL');
|
||||
killed++;
|
||||
} catch {
|
||||
// Already dead or permission denied
|
||||
}
|
||||
}
|
||||
}
|
||||
return killed;
|
||||
} catch {
|
||||
return 0; // No matches or error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reap orphaned processes - both registry-tracked and system-level
|
||||
*/
|
||||
export async function reapOrphanedProcesses(activeSessionIds: Set<number>): Promise<number> {
|
||||
let killed = 0;
|
||||
|
||||
// Registry-based: kill processes for dead sessions
|
||||
for (const [pid, info] of processRegistry) {
|
||||
if (activeSessionIds.has(info.sessionDbId)) continue; // Active = safe
|
||||
|
||||
logger.warn('PROCESS', `Killing orphan PID ${pid} (session ${info.sessionDbId} gone)`, { pid, sessionDbId: info.sessionDbId });
|
||||
try {
|
||||
info.process.kill('SIGKILL');
|
||||
killed++;
|
||||
} catch {
|
||||
// Already dead
|
||||
}
|
||||
unregisterProcess(pid);
|
||||
}
|
||||
|
||||
// System-level: find ppid=1 orphans
|
||||
killed += await killSystemOrphans();
|
||||
|
||||
return killed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a custom spawn function for SDK that captures PIDs
|
||||
*
|
||||
* The SDK's spawnClaudeCodeProcess option allows us to intercept subprocess
|
||||
* creation and capture the PID before the SDK hides it.
|
||||
*/
|
||||
export function createPidCapturingSpawn(sessionDbId: number) {
|
||||
return (spawnOptions: {
|
||||
command: string;
|
||||
args: string[];
|
||||
cwd?: string;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
signal?: AbortSignal;
|
||||
}) => {
|
||||
const child = spawn(spawnOptions.command, spawnOptions.args, {
|
||||
cwd: spawnOptions.cwd,
|
||||
env: spawnOptions.env,
|
||||
stdio: ['pipe', 'pipe', 'pipe'],
|
||||
signal: spawnOptions.signal, // CRITICAL: Pass signal for AbortController integration
|
||||
windowsHide: true
|
||||
});
|
||||
|
||||
// Register PID
|
||||
if (child.pid) {
|
||||
registerProcess(child.pid, sessionDbId, child);
|
||||
|
||||
// Auto-unregister on exit
|
||||
child.on('exit', () => {
|
||||
if (child.pid) {
|
||||
unregisterProcess(child.pid);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Return SDK-compatible interface
|
||||
return {
|
||||
stdin: child.stdin,
|
||||
stdout: child.stdout,
|
||||
get killed() { return child.killed; },
|
||||
get exitCode() { return child.exitCode; },
|
||||
kill: child.kill.bind(child),
|
||||
on: child.on.bind(child),
|
||||
once: child.once.bind(child),
|
||||
off: child.off.bind(child)
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the orphan reaper interval
|
||||
* Returns cleanup function to stop the interval
|
||||
*/
|
||||
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 5 * 60 * 1000): () => void {
|
||||
const interval = setInterval(async () => {
|
||||
try {
|
||||
const activeIds = getActiveSessionIds();
|
||||
const killed = await reapOrphanedProcesses(activeIds);
|
||||
if (killed > 0) {
|
||||
logger.info('PROCESS', `Reaper cleaned up ${killed} orphaned processes`, { killed });
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('PROCESS', 'Reaper error', {}, error as Error);
|
||||
}
|
||||
}, intervalMs);
|
||||
|
||||
// Return cleanup function
|
||||
return () => clearInterval(interval);
|
||||
}
|
||||
@@ -20,6 +20,7 @@ import { USER_SETTINGS_PATH } from '../../shared/paths.js';
|
||||
import type { ActiveSession, SDKUserMessage } from '../worker-types.js';
|
||||
import { ModeManager } from '../domain/ModeManager.js';
|
||||
import { processAgentResponse, type WorkerRef } from './agents/index.js';
|
||||
import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
|
||||
|
||||
// Import Agent SDK (assumes it's installed)
|
||||
// @ts-ignore - Agent SDK types may not be available
|
||||
@@ -99,6 +100,7 @@ export class SDKAgent {
|
||||
|
||||
// Run Agent SDK query loop
|
||||
// Only resume if we have a captured memory session ID
|
||||
// Use custom spawn to capture PIDs for zombie process cleanup (Issue #737)
|
||||
const queryResult = query({
|
||||
prompt: messageGenerator,
|
||||
options: {
|
||||
@@ -109,7 +111,9 @@ export class SDKAgent {
|
||||
...(hasRealMemorySessionId && session.lastPromptNumber > 1 && { resume: session.memorySessionId }),
|
||||
disallowedTools,
|
||||
abortController: session.abortController,
|
||||
pathToClaudeCodeExecutable: claudePath
|
||||
pathToClaudeCodeExecutable: claudePath,
|
||||
// Custom spawn function captures PIDs to fix zombie process accumulation
|
||||
spawnClaudeCodeProcess: createPidCapturingSpawn(session.sessionDbId)
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ import { logger } from '../../utils/logger.js';
|
||||
import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationData } from '../worker-types.js';
|
||||
import { PendingMessageStore } from '../sqlite/PendingMessageStore.js';
|
||||
import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js';
|
||||
import { getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
|
||||
|
||||
export class SessionManager {
|
||||
private dbManager: DatabaseManager;
|
||||
@@ -256,6 +257,7 @@ export class SessionManager {
|
||||
|
||||
/**
|
||||
* Delete a session (abort SDK agent and cleanup)
|
||||
* Verifies subprocess exit to prevent zombie process accumulation (Issue #737)
|
||||
*/
|
||||
async deleteSession(sessionDbId: number): Promise<void> {
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
@@ -265,17 +267,27 @@ export class SessionManager {
|
||||
|
||||
const sessionDuration = Date.now() - session.startTime;
|
||||
|
||||
// Abort the SDK agent
|
||||
// 1. Abort the SDK agent
|
||||
session.abortController.abort();
|
||||
|
||||
// Wait for generator to finish
|
||||
// 2. Wait for generator to finish
|
||||
if (session.generatorPromise) {
|
||||
await session.generatorPromise.catch(error => {
|
||||
await session.generatorPromise.catch(() => {
|
||||
logger.debug('SYSTEM', 'Generator already failed, cleaning up', { sessionId: session.sessionDbId });
|
||||
});
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
// 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
|
||||
const tracked = getProcessBySession(sessionDbId);
|
||||
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
|
||||
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
|
||||
sessionId: sessionDbId,
|
||||
pid: tracked.pid
|
||||
});
|
||||
await ensureProcessExit(tracked, 5000);
|
||||
}
|
||||
|
||||
// 4. Cleanup
|
||||
this.sessions.delete(sessionDbId);
|
||||
this.sessionQueues.delete(sessionDbId);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user