diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts
index 02dd4f32..f43e05d7 100644
--- a/src/services/worker-service.ts
+++ b/src/services/worker-service.ts
@@ -470,7 +470,7 @@ export class WorkerService {
         }
         return activeIds;
       });
-      logger.info('SYSTEM', 'Started orphan reaper (runs every 5 minutes)');
+      logger.info('SYSTEM', 'Started orphan reaper (runs every 1 minute)');

       // Reap stale sessions to unblock orphan process cleanup (Issue #1168)
       this.staleSessionReaperInterval = setInterval(async () => {
@@ -618,7 +618,7 @@ export class WorkerService {
       .finally(async () => {
         // CRITICAL: Verify subprocess exit to prevent zombie accumulation (Issue #1168)
         const trackedProcess = getProcessBySession(session.sessionDbId);
-        if (trackedProcess && !trackedProcess.process.killed && trackedProcess.process.exitCode === null) {
+        if (trackedProcess && trackedProcess.process.exitCode === null) {
           await ensureProcessExit(trackedProcess, 5000);
         }

diff --git a/src/services/worker/ProcessRegistry.ts b/src/services/worker/ProcessRegistry.ts
index 01773e31..e5c3ab1a 100644
--- a/src/services/worker/ProcessRegistry.ts
+++ b/src/services/worker/ProcessRegistry.ts
@@ -91,7 +91,14 @@ function notifySlotAvailable(): void {
  * @param maxConcurrent Max number of concurrent agents
  * @param timeoutMs Max time to wait before giving up
  */
 export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
+  // Hard cap: refuse to spawn if too many processes exist regardless of pool accounting.
+  // Function-local so waitForSlot's JSDoc stays attached to the function, not to this constant.
+  const TOTAL_PROCESS_HARD_CAP = 10;
+  if (processRegistry.size >= TOTAL_PROCESS_HARD_CAP) {
+    throw new Error(`Hard cap exceeded: ${processRegistry.size} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
+  }
+
   if (processRegistry.size < maxConcurrent) return;

   logger.info('PROCESS', `Pool limit reached (${processRegistry.size}/${maxConcurrent}), waiting for slot...`);
@@ -136,8 +143,9 @@ export function getActiveProcesses(): Array<{ pid: number; sessionDbId: number;
 export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: number = 5000): Promise<void> {
   const { pid, process: proc } = tracked;

-  // Already exited?
-  if (proc.killed || proc.exitCode !== null) {
+  // Already exited? Trust exitCode/signalCode, NOT proc.killed: killed only means Node sent a
+  // signal (the child may still be alive), while a death by signal leaves exitCode null forever.
+  if (proc.exitCode !== null || proc.signalCode !== null) {
     unregisterProcess(pid);
     return;
   }
@@ -153,8 +161,8 @@ export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: numb

   await Promise.race([exitPromise, timeoutPromise]);

-  // Check if exited gracefully
-  if (proc.killed || proc.exitCode !== null) {
+  // Check if exited gracefully — exitCode (normal exit) or signalCode (signal death), never proc.killed
+  if (proc.exitCode !== null || proc.signalCode !== null) {
     unregisterProcess(pid);
     return;
   }
@@ -167,8 +175,14 @@ export async function ensureProcessExit(tracked: TrackedProcess, timeoutMs: numb
     // Already dead
   }

-  // Brief wait for SIGKILL to take effect
-  await new Promise(resolve => setTimeout(resolve, 200));
+  // Wait for SIGKILL to take effect — use exit event with 1s timeout instead of blind sleep
+  const sigkillExitPromise = new Promise<void>((resolve) => {
+    proc.once('exit', () => resolve());
+  });
+  const sigkillTimeout = new Promise<void>((resolve) => {
+    setTimeout(resolve, 1000);
+  });
+  await Promise.race([sigkillExitPromise, sigkillTimeout]);

   unregisterProcess(pid);
 }
@@ -234,8 +248,8 @@ async function killIdleDaemonChildren(): Promise<void> {
         minutes = parseInt(minMatch[1], 10);
       }

-      // Kill if idle for more than 2 minutes
-      if (minutes >= 2) {
+      // Kill if idle for at least 1 minute
+      if (minutes >= 1) {
         logger.info('PROCESS', `Killing idle daemon child PID ${pid} (idle ${minutes}m)`, { pid, minutes });
         try {
           process.kill(pid, 'SIGKILL');
@@ -393,7 +407,7 @@ export function createPidCapturingSpawn(sessionDbId: number) {
  * Start the orphan reaper interval
  * Returns cleanup function to stop the interval
  */
-export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 5 * 60 * 1000): () => void {
+export function startOrphanReaper(getActiveSessionIds: () => Set<number>, intervalMs: number = 60 * 1000): () => void {
   const interval = setInterval(async () => {
     try {
       const activeIds = getActiveSessionIds();
diff --git a/src/services/worker/SDKAgent.ts b/src/services/worker/SDKAgent.ts
index 45a89b68..e181229b 100644
--- a/src/services/worker/SDKAgent.ts
+++ b/src/services/worker/SDKAgent.ts
@@ -281,7 +281,7 @@ export class SDKAgent {
     } finally {
       // Ensure subprocess is terminated after query completes (or on error)
       const tracked = getProcessBySession(session.sessionDbId);
-      if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
+      if (tracked && tracked.process.exitCode === null) {
         await ensureProcessExit(tracked, 5000);
       }
     }
diff --git a/src/services/worker/SessionManager.ts b/src/services/worker/SessionManager.ts
index 831c3393..24e847df 100644
--- a/src/services/worker/SessionManager.ts
+++ b/src/services/worker/SessionManager.ts
@@ -302,7 +302,7 @@ export class SessionManager {

     // 3. Verify subprocess exit with 5s timeout (Issue #737 fix)
     const tracked = getProcessBySession(sessionDbId);
-    if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
+    if (tracked && tracked.process.exitCode === null) {
       logger.debug('SESSION', `Waiting for subprocess PID ${tracked.pid} to exit`, {
         sessionId: sessionDbId,
         pid: tracked.pid
diff --git a/tests/worker/process-registry.test.ts b/tests/worker/process-registry.test.ts
new file mode 100644
index 00000000..8fd77232
--- /dev/null
+++ b/tests/worker/process-registry.test.ts
@@ -0,0 +1,221 @@
+import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
+import { EventEmitter } from 'events';
+import {
+  registerProcess,
+  unregisterProcess,
+  getProcessBySession,
+  getActiveCount,
+  getActiveProcesses,
+  waitForSlot,
+  ensureProcessExit,
+} from '../../src/services/worker/ProcessRegistry.js';
+
+/**
+ * Create a mock ChildProcess that behaves like a real one for testing.
+ * Supports exitCode, signalCode, killed, kill(), and event emission.
+ */
+function createMockProcess(overrides: { exitCode?: number | null; killed?: boolean } = {}) {
+  const emitter = new EventEmitter();
+  const mock = Object.assign(emitter, {
+    pid: Math.floor(Math.random() * 100000) + 1000,
+    exitCode: overrides.exitCode ?? null,
+    signalCode: null as string | null,
+    killed: overrides.killed ?? false,
+    kill(signal?: string) {
+      mock.killed = true;
+      // Simulate async exit after kill
+      setTimeout(() => {
+        mock.exitCode = signal === 'SIGKILL' ? null : 0;
+        // As with a real ChildProcess, a death by signal leaves exitCode null and sets signalCode
+        mock.signalCode = signal === 'SIGKILL' ? 'SIGKILL' : null;
+        mock.emit('exit', mock.exitCode, signal || 'SIGTERM');
+      }, 10);
+      return true;
+    },
+    stdin: null,
+    stdout: null,
+    stderr: null,
+  });
+  return mock;
+}
+
+// Helper to clear registry between tests by unregistering all
+function clearRegistry() {
+  for (const p of getActiveProcesses()) {
+    unregisterProcess(p.pid);
+  }
+}
+
+describe('ProcessRegistry', () => {
+  beforeEach(() => {
+    clearRegistry();
+  });
+
+  afterEach(() => {
+    clearRegistry();
+  });
+
+  describe('registerProcess / unregisterProcess', () => {
+    it('should register and track a process', () => {
+      const proc = createMockProcess();
+      registerProcess(proc.pid, 1, proc as any);
+      expect(getActiveCount()).toBe(1);
+      expect(getProcessBySession(1)).toBeDefined();
+    });
+
+    it('should unregister a process and free the slot', () => {
+      const proc = createMockProcess();
+      registerProcess(proc.pid, 1, proc as any);
+      unregisterProcess(proc.pid);
+      expect(getActiveCount()).toBe(0);
+      expect(getProcessBySession(1)).toBeUndefined();
+    });
+  });
+
+  describe('getProcessBySession', () => {
+    it('should return undefined for unknown session', () => {
+      expect(getProcessBySession(999)).toBeUndefined();
+    });
+
+    it('should find process by session ID', () => {
+      const proc = createMockProcess();
+      registerProcess(proc.pid, 42, proc as any);
+      const found = getProcessBySession(42);
+      expect(found).toBeDefined();
+      expect(found!.pid).toBe(proc.pid);
+    });
+  });
+
+  describe('waitForSlot', () => {
+    it('should resolve immediately when under limit', async () => {
+      await waitForSlot(2); // 0 processes, limit 2
+    });
+
+    it('should wait until a slot opens', async () => {
+      const proc1 = createMockProcess();
+      const proc2 = createMockProcess();
+      registerProcess(proc1.pid, 1, proc1 as any);
+      registerProcess(proc2.pid, 2, proc2 as any);
+
+      // Start waiting for slot (limit=2, both slots full)
+      const waitPromise = waitForSlot(2, 5000);
+
+      // Free a slot after 50ms
+      setTimeout(() => unregisterProcess(proc1.pid), 50);
+
+      await waitPromise; // Should resolve once slot freed
+      expect(getActiveCount()).toBe(1);
+    });
+
+    it('should throw on timeout when no slot opens', async () => {
+      const proc1 = createMockProcess();
+      const proc2 = createMockProcess();
+      registerProcess(proc1.pid, 1, proc1 as any);
+      registerProcess(proc2.pid, 2, proc2 as any);
+
+      await expect(waitForSlot(2, 100)).rejects.toThrow('Timed out waiting for agent pool slot');
+    });
+
+    it('should throw when hard cap (10) is exceeded', async () => {
+      // Register 10 processes to hit the hard cap
+      const procs = [];
+      for (let i = 0; i < 10; i++) {
+        const proc = createMockProcess();
+        registerProcess(proc.pid, i + 100, proc as any);
+        procs.push(proc);
+      }
+
+      await expect(waitForSlot(20)).rejects.toThrow('Hard cap exceeded');
+    });
+  });
+
+  describe('ensureProcessExit', () => {
+    it('should unregister immediately if exitCode is set', async () => {
+      const proc = createMockProcess({ exitCode: 0 });
+      registerProcess(proc.pid, 1, proc as any);
+
+      await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any });
+      expect(getActiveCount()).toBe(0);
+    });
+
+    it('should unregister immediately if signalCode is set', async () => {
+      // A child terminated by a signal keeps exitCode=null; only signalCode records the death
+      const proc = createMockProcess();
+      proc.signalCode = 'SIGKILL';
+      registerProcess(proc.pid, 1, proc as any);
+
+      const start = Date.now();
+      await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 1000);
+      expect(Date.now() - start).toBeLessThan(500);
+      expect(getActiveCount()).toBe(0);
+    });
+
+    it('should NOT treat proc.killed as exited — must wait for actual exit', async () => {
+      // This is the core bug fix: proc.killed=true but exitCode=null means NOT dead
+      const proc = createMockProcess({ killed: true, exitCode: null });
+      registerProcess(proc.pid, 1, proc as any);
+
+      // Override kill to simulate SIGKILL + delayed exit
+      let killInvoked = false;
+      proc.kill = (signal?: string) => {
+        proc.killed = true;
+        killInvoked = true;
+        setTimeout(() => {
+          proc.exitCode = 0;
+          proc.emit('exit', 0, signal);
+        }, 20);
+        return true;
+      };
+
+      // ensureProcessExit should NOT short-circuit on proc.killed
+      // It should wait for exit event or timeout, then escalate to SIGKILL
+      await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
+      expect(killInvoked).toBe(true); // a short-circuit on proc.killed would never reach kill()
+      expect(getActiveCount()).toBe(0);
+    });
+
+    it('should escalate to SIGKILL after timeout', async () => {
+      const proc = createMockProcess();
+      registerProcess(proc.pid, 1, proc as any);
+
+      // Override kill: only respond to SIGKILL
+      let sigkillSent = false;
+      proc.kill = (signal?: string) => {
+        proc.killed = true;
+        if (signal === 'SIGKILL') {
+          sigkillSent = true;
+          setTimeout(() => {
+            proc.exitCode = -1;
+            proc.emit('exit', -1, 'SIGKILL');
+          }, 10);
+        }
+        // Don't emit exit for non-SIGKILL signals (simulates stuck process)
+        return true;
+      };
+
+      await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
+      expect(sigkillSent).toBe(true);
+      expect(getActiveCount()).toBe(0);
+    });
+
+    it('should unregister even if process ignores SIGKILL (after 1s timeout)', async () => {
+      const proc = createMockProcess();
+      registerProcess(proc.pid, 1, proc as any);
+
+      // Override kill to never emit exit (completely stuck process)
+      proc.kill = () => {
+        proc.killed = true;
+        return true;
+      };
+
+      const start = Date.now();
+      await ensureProcessExit({ pid: proc.pid, sessionDbId: 1, spawnedAt: Date.now(), process: proc as any }, 100);
+      const elapsed = Date.now() - start;
+
+      // Should have waited ~100ms for graceful + ~1000ms for SIGKILL timeout
+      expect(elapsed).toBeGreaterThan(90);
+      // Process is unregistered regardless (safety net)
+      expect(getActiveCount()).toBe(0);
+    });
+  });
+});