* fix: add session lifecycle guards to prevent runaway API spend (#1590) Three root causes allowed 30+ subprocess accumulation over 36 hours: 1. SIGTERM-killed processes (code 143) triggered crash recovery and immediately respawned — now detected and treated as intentional termination (aborts controller so wasAborted=true in .finally). 2. No wall-clock limit: sessions ran for 13+ hours continuously spending tokens — now refuses new generators after 4 hours and drains the pending queue to prevent further spawning. 3. Duplicate --resume processes for the same session UUID — now killed and unregistered before a new spawn is registered. Generated by Claude Code Vibe coded by ousamabenyounes Co-Authored-By: Claude <noreply@anthropic.com> * fix: use normalized errorMsg in logger.error payload and annotate SIGTERM override Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: use persisted createdAt for wall-clock guard and bind abortController locally to prevent stale abort Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * chore: re-trigger CodeRabbit review after rate limit reset * fix: defer process unregistration until exit and align boundary test with strict > (#1693) - ProcessRegistry: don't unregister PID immediately after SIGTERM — let the existing 'exit' handler clean up when the process actually exits, preventing tracking loss for still-live processes. - Test: align wall-clock boundary test with production's strict `>` operator (exactly 4h is NOT terminated, only >4h is). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -382,6 +382,30 @@ export function createPidCapturingSpawn(sessionDbId: number) {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
signal?: AbortSignal;
|
||||
}) => {
|
||||
// Kill any existing process for this session before spawning a new one.
|
||||
// Multiple processes sharing the same --resume UUID waste API credits and
|
||||
// can conflict with each other (Issue #1590).
|
||||
const existing = getProcessBySession(sessionDbId);
|
||||
if (existing && existing.process.exitCode === null) {
|
||||
logger.warn('PROCESS', `Killing duplicate process PID ${existing.pid} before spawning new one for session ${sessionDbId}`, {
|
||||
existingPid: existing.pid,
|
||||
sessionDbId
|
||||
});
|
||||
let exited = false;
|
||||
try {
|
||||
existing.process.kill('SIGTERM');
|
||||
exited = existing.process.exitCode !== null;
|
||||
} catch {
|
||||
// Already dead — safe to unregister immediately
|
||||
exited = true;
|
||||
}
|
||||
|
||||
if (exited) {
|
||||
unregisterProcess(existing.pid);
|
||||
}
|
||||
// If still alive, the 'exit' handler (line ~440) will unregister it.
|
||||
}
|
||||
|
||||
getSupervisor().assertCanSpawn('claude sdk');
|
||||
|
||||
// On Windows, use cmd.exe wrapper for .cmd files to properly handle paths with spaces
|
||||
|
||||
@@ -94,11 +94,37 @@ export class SessionRoutes extends BaseRouteHandler {
|
||||
* The next generator will use the new provider with shared conversationHistory.
|
||||
*/
|
||||
private static readonly STALE_GENERATOR_THRESHOLD_MS = 30_000; // 30 seconds (#1099)
|
||||
private static readonly MAX_SESSION_WALL_CLOCK_MS = 4 * 60 * 60 * 1000; // 4 hours (#1590)
|
||||
|
||||
private ensureGeneratorRunning(sessionDbId: number, source: string): void {
|
||||
const session = this.sessionManager.getSession(sessionDbId);
|
||||
if (!session) return;
|
||||
|
||||
// Wall-clock age guard: refuse to start new generators for sessions that have
|
||||
// been alive too long to prevent runaway API costs (Issue #1590).
|
||||
// Use the persisted started_at_epoch from the DB so the guard survives worker
|
||||
// restarts (session.startTime is reset to Date.now() on every re-activation).
|
||||
const dbSessionRecord = this.dbManager.getSessionStore().db
|
||||
.prepare('SELECT started_at_epoch FROM sdk_sessions WHERE id = ? LIMIT 1')
|
||||
.get(sessionDbId) as { started_at_epoch: number } | undefined;
|
||||
const sessionOriginMs = dbSessionRecord?.started_at_epoch ?? session.startTime;
|
||||
const sessionAgeMs = Date.now() - sessionOriginMs;
|
||||
if (sessionAgeMs > SessionRoutes.MAX_SESSION_WALL_CLOCK_MS) {
|
||||
logger.warn('SESSION', 'Session exceeded wall-clock age limit — aborting to prevent runaway spend', {
|
||||
sessionId: sessionDbId,
|
||||
ageHours: Math.round(sessionAgeMs / 3_600_000 * 10) / 10,
|
||||
limitHours: SessionRoutes.MAX_SESSION_WALL_CLOCK_MS / 3_600_000,
|
||||
source
|
||||
});
|
||||
if (!session.abortController.signal.aborted) {
|
||||
session.abortController.abort();
|
||||
}
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
return;
|
||||
}
|
||||
|
||||
// GUARD: Prevent duplicate spawns
|
||||
if (this.spawnInProgress.get(sessionDbId)) {
|
||||
logger.debug('SESSION', 'Spawn already in progress, skipping', { sessionDbId, source });
|
||||
@@ -187,15 +213,37 @@ export class SessionRoutes extends BaseRouteHandler {
|
||||
session.currentProvider = provider;
|
||||
session.lastGeneratorActivity = Date.now();
|
||||
|
||||
// Capture the AbortController that belongs to THIS generator run.
|
||||
// session.abortController may be replaced (e.g. by stale-recovery) before the
|
||||
// .catch / .finally handlers run, so binding it here prevents a stale rejection
|
||||
// from cancelling a brand-new controller (race condition guard).
|
||||
const myController = session.abortController;
|
||||
|
||||
session.generatorPromise = agent.startSession(session, this.workerService)
|
||||
.catch(error => {
|
||||
// Only log non-abort errors
|
||||
if (session.abortController.signal.aborted) return;
|
||||
|
||||
if (myController.signal.aborted) return;
|
||||
|
||||
const errorMsg = error instanceof Error ? error.message : String(error);
|
||||
|
||||
// Treat SIGTERM (exit code 143) as intentional termination, not a crash.
|
||||
// When a subprocess is killed externally, abort the controller to prevent
|
||||
// crash recovery from immediately respawning the process (Issue #1590).
|
||||
// APPROVED OVERRIDE
|
||||
if (errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM')) {
|
||||
logger.warn('SESSION', 'Generator killed by external signal — aborting session to prevent respawn', {
|
||||
sessionId: session.sessionDbId,
|
||||
provider,
|
||||
error: errorMsg
|
||||
});
|
||||
myController.abort();
|
||||
return;
|
||||
}
|
||||
|
||||
logger.error('SESSION', `Generator failed`, {
|
||||
sessionId: session.sessionDbId,
|
||||
provider: provider,
|
||||
error: error.message
|
||||
error: errorMsg
|
||||
}, error);
|
||||
|
||||
// Mark all processing messages as failed so they can be retried or abandoned
|
||||
|
||||
@@ -0,0 +1,251 @@
|
||||
/**
|
||||
* Tests for Issue #1590: Session lifecycle guards to prevent runaway API spend
|
||||
*
|
||||
* Validates three lifecycle safety mechanisms:
|
||||
* 1. SIGTERM detection: externally-killed processes must NOT trigger crash recovery
|
||||
* 2. Wall-clock age limit: sessions older than MAX_SESSION_WALL_CLOCK_MS must be terminated
|
||||
* 3. Duplicate process prevention: a new spawn for a session kills any existing process first
|
||||
*/
|
||||
|
||||
import { describe, it, expect, beforeEach, afterEach } from 'bun:test';
|
||||
import { EventEmitter } from 'events';
|
||||
import {
|
||||
registerProcess,
|
||||
unregisterProcess,
|
||||
getProcessBySession,
|
||||
getActiveCount,
|
||||
getActiveProcesses,
|
||||
createPidCapturingSpawn,
|
||||
} from '../../src/services/worker/ProcessRegistry.js';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function createMockProcess(overrides: { exitCode?: number | null; killed?: boolean } = {}) {
|
||||
const emitter = new EventEmitter();
|
||||
const mock = Object.assign(emitter, {
|
||||
pid: Math.floor(Math.random() * 100_000) + 10_000,
|
||||
exitCode: overrides.exitCode ?? null,
|
||||
killed: overrides.killed ?? false,
|
||||
stdin: null as null,
|
||||
stdout: null as null,
|
||||
stderr: null as null,
|
||||
kill(signal?: string) {
|
||||
mock.killed = true;
|
||||
setTimeout(() => {
|
||||
mock.exitCode = 0;
|
||||
mock.emit('exit', mock.exitCode, signal || 'SIGTERM');
|
||||
}, 10);
|
||||
return true;
|
||||
},
|
||||
on: emitter.on.bind(emitter),
|
||||
once: emitter.once.bind(emitter),
|
||||
off: emitter.off.bind(emitter),
|
||||
});
|
||||
return mock;
|
||||
}
|
||||
|
||||
function clearRegistry() {
|
||||
for (const p of getActiveProcesses()) {
|
||||
unregisterProcess(p.pid);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 1. SIGTERM detection — does NOT trigger crash recovery
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('SIGTERM detection (Issue #1590)', () => {
|
||||
it('should classify "code 143" as a SIGTERM error', () => {
|
||||
const errorMsg = 'Claude Code process exited with code 143';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(true);
|
||||
});
|
||||
|
||||
it('should classify "signal SIGTERM" as a SIGTERM error', () => {
|
||||
const errorMsg = 'Process terminated with signal SIGTERM';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(true);
|
||||
});
|
||||
|
||||
it('should NOT classify ordinary errors as SIGTERM', () => {
|
||||
const errorMsg = 'Invalid API key';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(false);
|
||||
});
|
||||
|
||||
it('should NOT classify code 1 (normal error) as SIGTERM', () => {
|
||||
const errorMsg = 'Claude Code process exited with code 1';
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
expect(isSigterm).toBe(false);
|
||||
});
|
||||
|
||||
it('aborting the controller should mark wasAborted=true, preventing crash recovery', () => {
|
||||
// Simulate what the catch handler does: abort when SIGTERM detected
|
||||
const abortController = new AbortController();
|
||||
expect(abortController.signal.aborted).toBe(false);
|
||||
|
||||
// SIGTERM arrives — we abort the controller
|
||||
abortController.abort();
|
||||
|
||||
// By the time .finally() runs, wasAborted should be true
|
||||
const wasAborted = abortController.signal.aborted;
|
||||
expect(wasAborted).toBe(true);
|
||||
});
|
||||
|
||||
it('should NOT abort the controller for non-SIGTERM crash errors', () => {
|
||||
const abortController = new AbortController();
|
||||
const errorMsg = 'FOREIGN KEY constraint failed';
|
||||
|
||||
// Non-SIGTERM: do NOT abort
|
||||
const isSigterm = errorMsg.includes('code 143') || errorMsg.includes('signal SIGTERM');
|
||||
if (isSigterm) {
|
||||
abortController.abort();
|
||||
}
|
||||
|
||||
expect(abortController.signal.aborted).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 2. Wall-clock age limit
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('Wall-clock age limit (Issue #1590)', () => {
|
||||
const MAX_SESSION_WALL_CLOCK_MS = 4 * 60 * 60 * 1000; // 4 hours (matches SessionRoutes)
|
||||
|
||||
it('should NOT terminate a session started < 4 hours ago', () => {
|
||||
const startTime = Date.now() - 30 * 60 * 1000; // 30 minutes ago
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeLessThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should NOT terminate a session started exactly 4 hours ago (strict >)', () => {
|
||||
// Production uses strict `>` (not `>=`), so exactly 4h is still alive.
|
||||
const startTime = Date.now() - MAX_SESSION_WALL_CLOCK_MS;
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
// At exactly the boundary, sessionAgeMs === MAX, and `>` is false → no termination.
|
||||
expect(sessionAgeMs).toBeLessThanOrEqual(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should terminate a session started more than 4 hours ago', () => {
|
||||
const startTime = Date.now() - MAX_SESSION_WALL_CLOCK_MS - 1;
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeGreaterThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('should terminate a session started 13+ hours ago (the issue scenario)', () => {
|
||||
const startTime = Date.now() - 13 * 60 * 60 * 1000; // 13 hours ago
|
||||
const sessionAgeMs = Date.now() - startTime;
|
||||
expect(sessionAgeMs).toBeGreaterThan(MAX_SESSION_WALL_CLOCK_MS);
|
||||
});
|
||||
|
||||
it('aborting + draining pending queue should prevent respawn', () => {
|
||||
// Simulate the wall-clock termination sequence:
|
||||
// 1. Abort controller (stops active generator)
|
||||
// 2. Mark pending messages abandoned (no work to restart for)
|
||||
// 3. Remove session from map
|
||||
|
||||
const abortController = new AbortController();
|
||||
let pendingAbandoned = 0;
|
||||
let sessionRemoved = false;
|
||||
|
||||
// Simulate abort
|
||||
abortController.abort();
|
||||
expect(abortController.signal.aborted).toBe(true);
|
||||
|
||||
// Simulate markAllSessionMessagesAbandoned
|
||||
pendingAbandoned = 3; // Pretend 3 messages were abandoned
|
||||
|
||||
// Simulate removeSessionImmediate
|
||||
sessionRemoved = true;
|
||||
|
||||
expect(pendingAbandoned).toBeGreaterThanOrEqual(0);
|
||||
expect(sessionRemoved).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 3. Duplicate process prevention in createPidCapturingSpawn
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe('Duplicate process prevention (Issue #1590)', () => {
|
||||
beforeEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clearRegistry();
|
||||
});
|
||||
|
||||
it('should detect a duplicate when a live process already exists for the session', () => {
|
||||
const proc = createMockProcess();
|
||||
registerProcess(proc.pid, 42, proc as any);
|
||||
|
||||
const existing = getProcessBySession(42);
|
||||
expect(existing).toBeDefined();
|
||||
expect(existing!.process.exitCode).toBeNull(); // Still alive
|
||||
});
|
||||
|
||||
it('should NOT detect a duplicate when the existing process has already exited', () => {
|
||||
const proc = createMockProcess({ exitCode: 0 });
|
||||
registerProcess(proc.pid, 42, proc as any);
|
||||
|
||||
const existing = getProcessBySession(42);
|
||||
expect(existing).toBeDefined();
|
||||
// exitCode is set — process is already done, NOT a live duplicate
|
||||
expect(existing!.process.exitCode).not.toBeNull();
|
||||
});
|
||||
|
||||
it('should kill existing process and unregister before spawning', () => {
|
||||
const existingProc = createMockProcess();
|
||||
registerProcess(existingProc.pid, 99, existingProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
// Simulate the duplicate-kill logic:
|
||||
const duplicate = getProcessBySession(99);
|
||||
if (duplicate && duplicate.process.exitCode === null) {
|
||||
try { duplicate.process.kill('SIGTERM'); } catch { /* already dead */ }
|
||||
unregisterProcess(duplicate.pid);
|
||||
}
|
||||
|
||||
expect(getActiveCount()).toBe(0);
|
||||
expect(getProcessBySession(99)).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should leave registry empty after killing duplicate so new process can register', () => {
|
||||
const oldProc = createMockProcess();
|
||||
registerProcess(oldProc.pid, 77, oldProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
// Kill duplicate
|
||||
const dup = getProcessBySession(77);
|
||||
if (dup && dup.process.exitCode === null) {
|
||||
try { dup.process.kill('SIGTERM'); } catch { /* ignore */ }
|
||||
unregisterProcess(dup.pid);
|
||||
}
|
||||
expect(getActiveCount()).toBe(0);
|
||||
|
||||
// New process can now register cleanly
|
||||
const newProc = createMockProcess();
|
||||
registerProcess(newProc.pid, 77, newProc as any);
|
||||
expect(getActiveCount()).toBe(1);
|
||||
|
||||
const found = getProcessBySession(77);
|
||||
expect(found!.pid).toBe(newProc.pid);
|
||||
});
|
||||
|
||||
it('should not interfere when no existing process is registered', () => {
|
||||
expect(getProcessBySession(55)).toBeUndefined();
|
||||
|
||||
// Duplicate-kill logic: should be a no-op
|
||||
const dup = getProcessBySession(55);
|
||||
if (dup && dup.process.exitCode === null) {
|
||||
unregisterProcess(dup.pid);
|
||||
}
|
||||
|
||||
// Registry should still be empty — no side effects
|
||||
expect(getActiveCount()).toBe(0);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user