feat: configurable subprocess pool limit for SDK agents (#995)
* feat: configurable subprocess pool limit for SDK agents Prevents runaway accumulation of Claude SDK agent subprocesses by enforcing a configurable concurrency limit. - New CLAUDE_MEM_MAX_CONCURRENT_AGENTS setting (default: 2) - Promise-based waitForSlot() in ProcessRegistry (not polling per review feedback on #830) - Waiters are notified via unregisterProcess when a slot frees up - SDKAgent.startSession() waits for a slot before spawning - 60s timeout prevents indefinite waits Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering> * fix: remove unused originalUnregister const and getActiveCount import Cleanup from Greptile review: - Remove dead `originalUnregister` variable in ProcessRegistry - Remove unused `getActiveCount` import in SDKAgent Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering> --------- Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Happy <yesreply@happy.engineering>
This commit is contained in:
@@ -41,11 +41,13 @@ export function registerProcess(pid: number, sessionDbId: number, process: Child
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
 * Drop a tracked process from the registry, then wake one queued pool waiter.
 *
 * @param pid OS process id of the subprocess that is no longer tracked
 */
export function unregisterProcess(pid: number): void {
  processRegistry.delete(pid);
  logger.debug('PROCESS', `Unregistered PID ${pid}`, { pid });

  // The registry just shrank, so a caller blocked on the pool limit may proceed.
  notifySlotAvailable();
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -66,6 +68,55 @@ export function getProcessBySession(sessionDbId: number): TrackedProcess | undef
|
|||||||
return matches[0];
|
return matches[0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Report how many processes are currently held in the registry.
 *
 * @returns The number of entries in `processRegistry`
 */
export function getActiveCount(): number {
  const { size } = processRegistry;
  return size;
}
|
||||||
|
|
||||||
|
// Waiters for pool slots - resolved when a process exits and frees a slot
|
||||||
|
const slotWaiters: Array<() => void> = [];
|
||||||
|
|
||||||
|
/**
 * Wake the oldest queued waiter, if any, so it can re-check pool capacity.
 * At most one waiter is invoked per call.
 */
function notifySlotAvailable(): void {
  const next = slotWaiters.shift();
  if (next === undefined) {
    return;
  }
  next();
}
|
||||||
|
|
||||||
|
/**
 * Wait for a pool slot to become available (promise-based, not polling).
 *
 * Resolves immediately when the registry holds fewer than `maxConcurrent`
 * processes. Otherwise the caller is queued in `slotWaiters` and is invoked
 * by `notifySlotAvailable()` when a process is unregistered.
 *
 * The timeout covers the whole wait: it stays armed while the waiter is
 * re-queued and is only cleared once a slot is actually granted.
 *
 * @param maxConcurrent Max number of concurrent agents
 * @param timeoutMs Max time to wait before giving up (default: 60 000 ms)
 * @returns A promise that resolves once the registry size is below `maxConcurrent`
 * @throws Error (as a rejection) if no slot frees up within `timeoutMs`
 */
export async function waitForSlot(maxConcurrent: number, timeoutMs: number = 60_000): Promise<void> {
  if (processRegistry.size < maxConcurrent) return;

  logger.info('PROCESS', `Pool limit reached (${processRegistry.size}/${maxConcurrent}), waiting for slot...`);

  return new Promise<void>((resolve, reject) => {
    const timeout = setTimeout(() => {
      // Remove ourselves from the queue so a later notification is not
      // wasted on a waiter that has already given up.
      const idx = slotWaiters.indexOf(onSlot);
      if (idx >= 0) slotWaiters.splice(idx, 1);
      reject(new Error(`Timed out waiting for agent pool slot after ${timeoutMs}ms`));
    }, timeoutMs);

    const onSlot = (): void => {
      if (processRegistry.size < maxConcurrent) {
        // Only disarm the timer when the slot is really ours. Clearing it
        // before this check would leave a re-queued waiter with no timeout,
        // so it could wait indefinitely.
        clearTimeout(timeout);
        resolve();
      } else {
        // Still full, re-queue (timer stays armed)
        slotWaiters.push(onSlot);
      }
    };

    slotWaiters.push(onSlot);
  });
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all active PIDs (for debugging)
|
* Get all active PIDs (for debugging)
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ import { buildIsolatedEnv, getAuthMethodDescription } from '../../shared/EnvMana
|
|||||||
import type { ActiveSession, SDKUserMessage } from '../worker-types.js';
|
import type { ActiveSession, SDKUserMessage } from '../worker-types.js';
|
||||||
import { ModeManager } from '../domain/ModeManager.js';
|
import { ModeManager } from '../domain/ModeManager.js';
|
||||||
import { processAgentResponse, type WorkerRef } from './agents/index.js';
|
import { processAgentResponse, type WorkerRef } from './agents/index.js';
|
||||||
import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit } from './ProcessRegistry.js';
|
import { createPidCapturingSpawn, getProcessBySession, ensureProcessExit, waitForSlot } from './ProcessRegistry.js';
|
||||||
|
|
||||||
// Import Agent SDK (assumes it's installed)
|
// Import Agent SDK (assumes it's installed)
|
||||||
// @ts-ignore - Agent SDK types may not be available
|
// @ts-ignore - Agent SDK types may not be available
|
||||||
@@ -88,6 +88,11 @@ export class SDKAgent {
|
|||||||
session.forceInit = false;
|
session.forceInit = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wait for agent pool slot (configurable via CLAUDE_MEM_MAX_CONCURRENT_AGENTS)
|
||||||
|
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
|
||||||
|
const maxConcurrent = parseInt(settings.CLAUDE_MEM_MAX_CONCURRENT_AGENTS, 10) || 2;
|
||||||
|
await waitForSlot(maxConcurrent);
|
||||||
|
|
||||||
// Build isolated environment from ~/.claude-mem/.env
|
// Build isolated environment from ~/.claude-mem/.env
|
||||||
// This prevents Issue #733: random ANTHROPIC_API_KEY from project .env files
|
// This prevents Issue #733: random ANTHROPIC_API_KEY from project .env files
|
||||||
// being used instead of the configured auth method (CLI subscription or explicit API key)
|
// being used instead of the configured auth method (CLI subscription or explicit API key)
|
||||||
|
|||||||
@@ -52,6 +52,8 @@ export interface SettingsDefaults {
|
|||||||
CLAUDE_MEM_CONTEXT_SHOW_LAST_SUMMARY: string;
|
CLAUDE_MEM_CONTEXT_SHOW_LAST_SUMMARY: string;
|
||||||
CLAUDE_MEM_CONTEXT_SHOW_LAST_MESSAGE: string;
|
CLAUDE_MEM_CONTEXT_SHOW_LAST_MESSAGE: string;
|
||||||
CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED: string;
|
CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED: string;
|
||||||
|
// Process Management
|
||||||
|
CLAUDE_MEM_MAX_CONCURRENT_AGENTS: string; // Max concurrent Claude SDK agent subprocesses (default: 2)
|
||||||
// Exclusion Settings
|
// Exclusion Settings
|
||||||
CLAUDE_MEM_EXCLUDED_PROJECTS: string; // Comma-separated glob patterns for excluded project paths
|
CLAUDE_MEM_EXCLUDED_PROJECTS: string; // Comma-separated glob patterns for excluded project paths
|
||||||
CLAUDE_MEM_FOLDER_MD_EXCLUDE: string; // JSON array of folder paths to exclude from CLAUDE.md generation
|
CLAUDE_MEM_FOLDER_MD_EXCLUDE: string; // JSON array of folder paths to exclude from CLAUDE.md generation
|
||||||
@@ -110,6 +112,8 @@ export class SettingsDefaultsManager {
|
|||||||
CLAUDE_MEM_CONTEXT_SHOW_LAST_SUMMARY: 'true',
|
CLAUDE_MEM_CONTEXT_SHOW_LAST_SUMMARY: 'true',
|
||||||
CLAUDE_MEM_CONTEXT_SHOW_LAST_MESSAGE: 'false',
|
CLAUDE_MEM_CONTEXT_SHOW_LAST_MESSAGE: 'false',
|
||||||
CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED: 'false',
|
CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED: 'false',
|
||||||
|
// Process Management
|
||||||
|
CLAUDE_MEM_MAX_CONCURRENT_AGENTS: '2', // Max concurrent Claude SDK agent subprocesses
|
||||||
// Exclusion Settings
|
// Exclusion Settings
|
||||||
CLAUDE_MEM_EXCLUDED_PROJECTS: '', // Comma-separated glob patterns for excluded project paths
|
CLAUDE_MEM_EXCLUDED_PROJECTS: '', // Comma-separated glob patterns for excluded project paths
|
||||||
CLAUDE_MEM_FOLDER_MD_EXCLUDE: '[]', // JSON array of folder paths to exclude from CLAUDE.md generation
|
CLAUDE_MEM_FOLDER_MD_EXCLUDE: '[]', // JSON array of folder paths to exclude from CLAUDE.md generation
|
||||||
|
|||||||
Reference in New Issue
Block a user