proc.killed only means Node sent a signal — the process can still be alive. This caused premature pool slot release, allowing unbounded process spawning. - ensureProcessExit: remove proc.killed from early-exit checks, only trust exitCode - Fix 3 call-site guards that skipped cleanup for signaled-but-alive processes - Add TOTAL_PROCESS_HARD_CAP=10 safety net in waitForSlot() - After SIGKILL, wait up to 1s via exit event instead of blind 200ms sleep - Reduce reaper interval from 5min to 1min, idle threshold from 2min to 1min Closes #1226 Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -470,7 +470,7 @@ export class WorkerService {
|
|||||||
}
|
}
|
||||||
return activeIds;
|
return activeIds;
|
||||||
});
|
});
|
||||||
logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)');
|
logger.info('SYSTEM', 'Started orphan reaper (runs every 1 minute)');
|
||||||
|
|
||||||
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
|
// Reap stale sessions to unblock orphan process cleanup (Issue #1168)
|
||||||
this.staleSessionReaperInterval = setInterval(async () => {
|
this.staleSessionReaperInterval = setInterval(async () => {
|
||||||
@@ -618,7 +618,7 @@ export class WorkerService {
|
|||||||
.finally(async () => {
|
.finally(async () => {
|
||||||
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
|
// CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
|
||||||
const trackedProcess = getProcessBySession(session.sessionDbId);
|
const trackedProcess = getProcessBySession(session.sessionDbId);
|
||||||
if (trackedProcess && !trackedProcess.process.killed && trackedProcess.process.exitCode === null) {
|
if (trackedProcess && trackedProcess.process.exitCode === null) {
|
||||||
await ensureProcessExit(trackedProcess, 5000);
|
await ensureProcessExit(trackedProcess, 5000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,7 +91,14 @@ function notifySlotAvailable(): void {
|
|||||||
* @param maxConcurrent Max number of concurrent agents
|
* @param maxConcurrent Max number of concurrent agents
|
||||||
* @param timeoutMs Max time to wait before giving up
|
* @param timeoutMs Max time to wait before giving up
|
||||||
*/
|
*/
|
||||||
|
const TOTAL_PROCESS_HARD_CAP = 10;
|
||||||
|
|
||||||
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
|
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
|
||||||
|
// Hard cap: refuse to spawn if too many processes exist regardless of pool accounting
|
||||||
|
if (processRegistry.size >= TOTAL_PROCESS_HARD_CAP) {
|
||||||
|
throw new Error(`Hard cap exceeded: ${processRegistry.size} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
|
||||||
|
}
|
||||||
|
|
||||||
if (processRegistry.size < maxConcurrent) return;
|
if (processRegistry.size < maxConcurrent) return;
|
||||||
|
|
||||||
logger.info('PROCESS', `Pool limit reached (${processRegistry.size}/${maxConcurrent}), waiting for slot...`);
|
logger.info('PROCESS', `Pool limit reached (${processRegistry.size}/${maxConcurrent}), waiting for slot...`);
|
||||||
@@ -136,8 +143,9 @@ export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number;
|
|||||||
export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise<void> {
|
export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise<void> {
|
||||||
const { pid, process: proc } = tracked;
|
const { pid, process: proc } = tracked;
|
||||||
|
|
||||||
// Already exited?
|
// Already exited? Only trust exitCode, NOT proc.killed
|
||||||
if (proc.killed || proc.exitCode !== null) {
|
// proc.killed only means Node sent a signal — the process can still be alive
|
||||||
|
if (proc.exitCode !== null) {
|
||||||
unregisterProcess(pid);
|
unregisterProcess(pid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -153,8 +161,8 @@ export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: numb
|
|||||||
|
|
||||||
await Promise.race([exitPromise, timeoutPromise]);
|
await Promise.race([exitPromise, timeoutPromise]);
|
||||||
|
|
||||||
// Check if exited gracefully
|
// Check if exited gracefully — only trust exitCode
|
||||||
if (proc.killed || proc.exitCode !== null) {
|
if (proc.exitCode !== null) {
|
||||||
unregisterProcess(pid);
|
unregisterProcess(pid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -167,8 +175,14 @@ export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: numb
|
|||||||
// Already dead
|
// Already dead
|
||||||
}
|
}
|
||||||
|
|
||||||
// Brief wait for SIGKILL to take effect
|
// Wait for SIGKILL to take effect — use exit event with 1s timeout instead of blind sleep
|
||||||
await new Promise(resolve => setTimeout(resolve, 200));
|
const sigkillExitPromise = new Promise<void>((resolve) => {
|
||||||
|
proc.once('exit', () => resolve());
|
||||||
|
});
|
||||||
|
const sigkillTimeout = new Promise<void>((resolve) => {
|
||||||
|
setTimeout(resolve, 1000);
|
||||||
|
});
|
||||||
|
await Promise.race([sigkillExitPromise, sigkillTimeout]);
|
||||||
unregisterProcess(pid);
|
unregisterProcess(pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -234,8 +248,8 @@ async function killIdleDaemonChildren(): Promise<number> {
|
|||||||
minutes = parseInt(minMatch[1], 10);
|
minutes = parseInt(minMatch[1], 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kill if idle for more than 2 minutes
|
// Kill if idle for more than 1 minute
|
||||||
if (minutes >= 2) {
|
if (minutes >= 1) {
|
||||||
logger.info('PROCESS', `Killing idle daemon child PID ${pid} (idle ${minutes}m)`, { pid, minutes });
|
logger.info('PROCESS', `Killing idle daemon child PID ${pid} (idle ${minutes}m)`, { pid, minutes });
|
||||||
try {
|
try {
|
||||||
process.kill(pid, 'SIGKILL');
|
process.kill(pid, 'SIGKILL');
|
||||||
@@ -393,7 +407,7 @@ export function createPidCapturingSpawn(sessionDbId: number) {
|
|||||||
* Start the orphan reaper interval
|
* Start the orphan reaper interval
|
||||||
* Returns cleanup function to stop the interval
|
* Returns cleanup function to stop the interval
|
||||||
*/
|
*/
|
||||||
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 5 * 60 * 1000): () => void {
|
export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 60 * 1000): () => void {
|
||||||
const interval = setInterval(async () => {
|
const interval = setInterval(async () => {
|
||||||
try {
|
try {
|
||||||
const activeIds = getActiveSessionIds();
|
const activeIds = getActiveSessionIds();
|
||||||
|
|||||||
@@ -281,7 +281,7 @@ export class SDKAgent {
|
|||||||
} finally {
|
} finally {
|
||||||
// Ensure subprocess is terminated after query completes (or on error)
|
// Ensure subprocess is terminated after query completes (or on error)
|
||||||
const tracked = getProcessBySession(session.sessionDbId);
|
const tracked = getProcessBySession(session.sessionDbId);
|
||||||
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
|
if (tracked && tracked.process.exitCode === null) {
|
||||||
await ensureProcessExit(tracked, 5000);
|
await ensureProcessExit(tracked, 5000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -302,7 +302,7 @@ export class SessionManager {
|
|||||||
|
|
||||||
// 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
|
// 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
|
||||||
const tracked = getProcessBySession(sessionDbId);
|
const tracked = getProcessBySession(sessionDbId);
|
||||||
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
|
if (tracked && tracked.process.exitCode === null) {
|
||||||
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
|
logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
|
||||||
sessionId: sessionDbId,
|
sessionId: sessionDbId,
|
||||||
pid: tracked.pid
|
pid: tracked.pid
|
||||||
|
|||||||
@@ -0,0 +1,204 @@
|
|||||||
|
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||||
|
import { EventEmitter } from 'events';
|
||||||
|
import {
|
||||||
|
registerProcess,
|
||||||
|
unregisterProcess,
|
||||||
|
getProcessBySession,
|
||||||
|
getActiveCount,
|
||||||
|
getActiveProcesses,
|
||||||
|
waitForSlot,
|
||||||
|
ensureProcessExit,
|
||||||
|
} from '../../src/services/worker/ProcessRegistry.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a mock ChildProcess that behaves like a real one for testing.
|
||||||
|
* Supports exitCode, killed, kill(), and event emission.
|
||||||
|
*/
|
||||||
|
function createMockProcess(overrides: { exitCode?: number | null; killed?: boolean } = {}) {
|
||||||
|
const emitter = new EventEmitter();
|
||||||
|
const mock = Object.assign(emitter, {
|
||||||
|
pid: Math.floor(Math.random() * 100000) + 1000,
|
||||||
|
exitCode: overrides.exitCode ?? null,
|
||||||
|
killed: overrides.killed ?? false,
|
||||||
|
kill(signal?: string) {
|
||||||
|
mock.killed = true;
|
||||||
|
// Simulate async exit after kill
|
||||||
|
setTimeout(() => {
|
||||||
|
mock.exitCode = signal === 'SIGKILL' ? null : 0;
|
||||||
|
mock.emit('exit', mock.exitCode, signal || 'SIGTERM');
|
||||||
|
}, 10);
|
||||||
|
return true;
|
||||||
|
},
|
||||||
|
stdin: null,
|
||||||
|
stdout: null,
|
||||||
|
stderr: null,
|
||||||
|
});
|
||||||
|
return mock;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper to clear registry between tests by unregistering all
|
||||||
|
function clearRegistry() {
|
||||||
|
for (const p of getActiveProcesses()) {
|
||||||
|
unregisterProcess(p.pid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('ProcessRegistry', () => {
|
||||||
|
beforeEach(() => {
|
||||||
|
clearRegistry();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
clearRegistry();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('registerProcess / unregisterProcess', () => {
|
||||||
|
it('should register and track a process', () => {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
expect(getActiveCount()).toBe(1);
|
||||||
|
expect(getProcessBySession(1)).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should unregister a process and free the slot', () => {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
unregisterProcess(proc.pid);
|
||||||
|
expect(getActiveCount()).toBe(0);
|
||||||
|
expect(getProcessBySession(1)).toBeUndefined();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('getProcessBySession', () => {
|
||||||
|
it('should return undefined for unknown session', () => {
|
||||||
|
expect(getProcessBySession(999)).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should find process by session ID', () => {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, 42, proc as any);
|
||||||
|
const found = getProcessBySession(42);
|
||||||
|
expect(found).toBeDefined();
|
||||||
|
expect(found!.pid).toBe(proc.pid);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('waitForSlot', () => {
|
||||||
|
it('should resolve immediately when under limit', async () => {
|
||||||
|
await waitForSlot(2); // 0 processes, limit 2
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should wait until a slot opens', async () => {
|
||||||
|
const proc1 = createMockProcess();
|
||||||
|
const proc2 = createMockProcess();
|
||||||
|
registerProcess(proc1.pid, 1, proc1 as any);
|
||||||
|
registerProcess(proc2.pid, 2, proc2 as any);
|
||||||
|
|
||||||
|
// Start waiting for slot (limit=2, both slots full)
|
||||||
|
const waitPromise = waitForSlot(2, 5000);
|
||||||
|
|
||||||
|
// Free a slot after 50ms
|
||||||
|
setTimeout(() => unregisterProcess(proc1.pid), 50);
|
||||||
|
|
||||||
|
await waitPromise; // Should resolve once slot freed
|
||||||
|
expect(getActiveCount()).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw on timeout when no slot opens', async () => {
|
||||||
|
const proc1 = createMockProcess();
|
||||||
|
const proc2 = createMockProcess();
|
||||||
|
registerProcess(proc1.pid, 1, proc1 as any);
|
||||||
|
registerProcess(proc2.pid, 2, proc2 as any);
|
||||||
|
|
||||||
|
await expect(waitForSlot(2, 100)).rejects.toThrow('Timed out waiting for agent pool slot');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should throw when hard cap (10) is exceeded', async () => {
|
||||||
|
// Register 10 processes to hit the hard cap
|
||||||
|
const procs = [];
|
||||||
|
for (let i = 0; i < 10; i++) {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, i + 100, proc as any);
|
||||||
|
procs.push(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
await expect(waitForSlot(20)).rejects.toThrow('Hard cap exceeded');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('ensureProcessExit', () => {
|
||||||
|
it('should unregister immediately if exitCode is set', async () => {
|
||||||
|
const proc = createMockProcess({ exitCode: 0 });
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
|
||||||
|
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any });
|
||||||
|
expect(getActiveCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should NOT treat proc.killed as exited — must wait for actual exit', async () => {
|
||||||
|
// This is the core bug fix: proc.killed=true but exitCode=null means NOT dead
|
||||||
|
const proc = createMockProcess({ killed: true, exitCode: null });
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
|
||||||
|
// Override kill to simulate SIGKILL + delayed exit
|
||||||
|
proc.kill = (signal?: string) => {
|
||||||
|
proc.killed = true;
|
||||||
|
setTimeout(() => {
|
||||||
|
proc.exitCode = 0;
|
||||||
|
proc.emit('exit', 0, signal);
|
||||||
|
}, 20);
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
// ensureProcessExit should NOT short-circuit on proc.killed
|
||||||
|
// It should wait for exit event or timeout, then escalate to SIGKILL
|
||||||
|
const start = Date.now();
|
||||||
|
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||||
|
expect(getActiveCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should escalate to SIGKILL after timeout', async () => {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
|
||||||
|
// Override kill: only respond to SIGKILL
|
||||||
|
let sigkillSent = false;
|
||||||
|
proc.kill = (signal?: string) => {
|
||||||
|
proc.killed = true;
|
||||||
|
if (signal === 'SIGKILL') {
|
||||||
|
sigkillSent = true;
|
||||||
|
setTimeout(() => {
|
||||||
|
proc.exitCode = -1;
|
||||||
|
proc.emit('exit', -1, 'SIGKILL');
|
||||||
|
}, 10);
|
||||||
|
}
|
||||||
|
// Don't emit exit for non-SIGKILL signals (simulates stuck process)
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||||
|
expect(sigkillSent).toBe(true);
|
||||||
|
expect(getActiveCount()).toBe(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should unregister even if process ignores SIGKILL (after 1s timeout)', async () => {
|
||||||
|
const proc = createMockProcess();
|
||||||
|
registerProcess(proc.pid, 1, proc as any);
|
||||||
|
|
||||||
|
// Override kill to never emit exit (completely stuck process)
|
||||||
|
proc.kill = () => {
|
||||||
|
proc.killed = true;
|
||||||
|
return true;
|
||||||
|
};
|
||||||
|
|
||||||
|
const start = Date.now();
|
||||||
|
await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
|
||||||
|
const elapsed = Date.now() - start;
|
||||||
|
|
||||||
|
// Should have waited ~100ms for graceful + ~1000ms for SIGKILL timeout
|
||||||
|
expect(elapsed).toBeGreaterThan(90);
|
||||||
|
// Process is unregistered regardless (safety net)
|
||||||
|
expect(getActiveCount()).toBe(0);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user