fix: remove agent pool timeout data loss

This commit is contained in:
Alex Newman
2026-05-04 19:43:45 -07:00
parent 39f1102600
commit 8bef7c6a34
4 changed files with 511 additions and 445 deletions
File diff suppressed because one or more lines are too long
+1 -1
View File
@@ -150,7 +150,7 @@ export class ClaudeProvider {
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
const maxConcurrent = parseInt(settings.CLAUDE_MEM_MAX_CONCURRENT_AGENTS, 10) || 2;
await waitForSlot(maxConcurrent, 60_000);
await waitForSlot(maxConcurrent);
const isolatedEnv = sanitizeEnv(await buildIsolatedEnvWithFreshOAuth());
const authMethod = getAuthMethodDescription();
@@ -147,16 +147,16 @@ export class SessionRoutes extends BaseRouteHandler {
const pendingStore = this.sessionManager.getPendingMessageStore();
try {
const cleared = pendingStore.clearPendingForSession(session.sessionDbId);
if (cleared > 0) {
logger.error('SESSION', `Cleared pending messages after generator error`, {
const reset = pendingStore.resetProcessingToPending(session.sessionDbId);
if (reset > 0) {
logger.warn('SESSION', `Reset processing messages after generator error`, {
sessionId: session.sessionDbId,
cleared
reset
});
}
} catch (dbError) {
const normalizedDbError = dbError instanceof Error ? dbError : new Error(String(dbError));
logger.error('HTTP', 'Failed to clear pending messages', {
logger.error('HTTP', 'Failed to reset processing messages after generator error', {
sessionId: session.sessionDbId
}, normalizedDbError);
}
+29 -9
View File
@@ -174,9 +174,11 @@ export class ProcessRegistry {
/**
 * Drops the registry record and the runtime process handle stored under `id`,
 * then persists the registry.
 *
 * When the dropped record was of type `'sdk'`, `notifySlotAvailable()` is
 * called afterwards so a queued slot waiter gets a chance to re-check the pool.
 *
 * @param id - Key of the registry entry to remove.
 */
unregister(id: string): void {
  this.initialize();

  // Capture the entry type before deleting; the record is gone afterwards.
  const wasSdkEntry = this.entries.get(id)?.type === 'sdk';

  this.entries.delete(id);
  this.runtimeProcesses.delete(id);
  this.persist();

  // Notify only after the registry state has been updated and persisted.
  if (wasSdkEntry) {
    notifySlotAvailable();
  }
}
clear(): void {
@@ -213,16 +215,19 @@ export class ProcessRegistry {
this.initialize();
let removed = 0;
let removedSdk = 0;
for (const [id, info] of this.entries) {
if (isPidAlive(info.pid)) continue;
this.entries.delete(id);
this.runtimeProcesses.delete(id);
removed += 1;
if (info.type === 'sdk') removedSdk += 1;
}
if (removed > 0) {
this.persist();
}
for (let i = 0; i < removedSdk; i += 1) notifySlotAvailable();
return removed;
}
@@ -321,6 +326,9 @@ export class ProcessRegistry {
this.runtimeProcesses.delete(record.id);
}
this.persist();
for (const record of sessionRecords) {
if (record.type === 'sdk') notifySlotAvailable();
}
logger.info('SYSTEM', `Reaped ${sessionRecords.length} process(es) for session ${sessionId}`, {
sessionId: sessionIdNum,
@@ -428,6 +436,7 @@ export async function ensureSdkProcessExit(
}
const TOTAL_PROCESS_HARD_CAP = 10;
const SLOT_RECHECK_INTERVAL_MS = 5_000;
const slotWaiters: Array<() => void> = [];
function getActiveSdkCount(): number {
@@ -439,7 +448,8 @@ function notifySlotAvailable(): void {
if (waiter) waiter();
}
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
export async function waitForSlot(maxConcurrent: number): Promise<void> {
getProcessRegistry().pruneDeadEntries();
const activeCount = getActiveSdkCount();
if (activeCount >= TOTAL_PROCESS_HARD_CAP) {
throw new Error(`Hard cap exceeded: ${activeCount} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`);
@@ -450,15 +460,17 @@ export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_
logger.info('PROCESS', `Pool limit reached (${activeCount}/${maxConcurrent}), waiting for slot...`);
return new Promise<void>((resolve, reject) => {
const timeout = setTimeout(() => {
const idx = slotWaiters.indexOf(onSlot);
if (idx >= 0) slotWaiters.splice(idx, 1);
reject(new Error(`Timed out waiting for agent pool slot after ${timeoutMs}ms`));
}, timeoutMs);
let recheckTimer: ReturnType<typeof setInterval> | null = null;
const onSlot = () => {
clearTimeout(timeout);
if (getActiveSdkCount() < maxConcurrent) {
const count = getActiveSdkCount();
if (count >= TOTAL_PROCESS_HARD_CAP) {
if (recheckTimer) clearInterval(recheckTimer);
reject(new Error(`Hard cap exceeded: ${count} processes in registry (cap=${TOTAL_PROCESS_HARD_CAP}). Refusing to spawn more.`));
return;
}
if (count < maxConcurrent) {
if (recheckTimer) clearInterval(recheckTimer);
resolve();
} else {
slotWaiters.push(onSlot);
@@ -466,6 +478,14 @@ export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_
};
slotWaiters.push(onSlot);
recheckTimer = setInterval(() => {
const removed = getProcessRegistry().pruneDeadEntries();
if (removed > 0) {
logger.info('PROCESS', 'Pruned stale process registry entries while waiting for agent slot', { removed });
}
notifySlotAvailable();
}, SLOT_RECHECK_INTERVAL_MS);
recheckTimer.unref?.();
});
}