fix: prevent chroma-mcp spawn storm with 5-layer defense (641 processes → max 2)
During SIGHUP testing with 6+ active sessions, ChromaSync.ensureConnection()
had no mutex — concurrent fire-and-forget syncObservation() calls each spawned
a chroma-mcp subprocess via StdioClientTransport, creating 641 orphans in ~5min.
Error-driven reconnection formed a positive feedback loop amplifying the storm.
Defense layers:
- Layer 0: Connection mutex via promise memoization (prevents concurrent spawns)
- Layer 1: Pre-spawn process count guard using execFileSync('ps') (kills excess)
- Layer 2: Hardened close() with try-finally + Unix pkill in GracefulShutdown
- Layer 3: Count-based orphan reaper in ProcessManager (not age-based)
- Layer 4: Circuit breaker stops retries after 3 consecutive failures for 60s
Closes #1063, closes #695
Relates to #1010, #707
This commit is contained in:
+148
-20
@@ -18,12 +18,15 @@ import { USER_SETTINGS_PATH } from '../../shared/paths.js';
|
||||
import path from 'path';
|
||||
import os from 'os';
|
||||
import fs from 'fs';
|
||||
import { execSync } from 'child_process';
|
||||
import { execSync, execFileSync } from 'child_process';
|
||||
|
||||
// Version injected at build time by esbuild define
|
||||
declare const __DEFAULT_PACKAGE_VERSION__: string;
|
||||
const packageVersion = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined' ? __DEFAULT_PACKAGE_VERSION__ : '0.0.0-dev';
|
||||
|
||||
// Maximum allowed chroma-mcp processes before pre-spawn guard kills excess
|
||||
const MAX_CHROMA_PROCESSES = 2; // 1 active + 1 starting
|
||||
|
||||
interface ChromaDocument {
|
||||
id: string;
|
||||
document: string;
|
||||
@@ -90,6 +93,16 @@ export class ChromaSync {
|
||||
// MCP SDK's StdioClientTransport uses shell:false and no detached flag, so console is inherited.
|
||||
private readonly disabled: boolean = false;
|
||||
|
||||
// Layer 0: Connection mutex — coalesces concurrent callers onto single spawn
|
||||
private connectionPromise: Promise<void> | null = null;
|
||||
|
||||
// Layer 4: Circuit breaker — stops retry storms after repeated failures
|
||||
private consecutiveFailures: number = 0;
|
||||
private lastFailureTime: number = 0;
|
||||
private static readonly MAX_FAILURES = 3;
|
||||
private static readonly BACKOFF_BASE_MS = 2000;
|
||||
private static readonly CIRCUIT_OPEN_MS = 60000; // 1 minute cooldown
|
||||
|
||||
constructor(project: string) {
|
||||
this.project = project;
|
||||
this.collectionName = `cm__${project}`;
|
||||
@@ -178,14 +191,107 @@ export class ChromaSync {
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure MCP client is connected to Chroma server
|
||||
* Throws error if connection fails
|
||||
* Ensure MCP client is connected to Chroma server (mutex wrapper).
|
||||
* Coalesces concurrent callers onto a single connection attempt.
|
||||
* This prevents N concurrent calls from each spawning a chroma-mcp subprocess.
|
||||
*/
|
||||
private async ensureConnection(): Promise<void> {
|
||||
if (this.connected && this.client) {
|
||||
return;
|
||||
if (this.connected && this.client) return;
|
||||
|
||||
// Layer 0: Coalesce concurrent callers onto a single connection attempt
|
||||
if (this.connectionPromise) {
|
||||
return this.connectionPromise;
|
||||
}
|
||||
|
||||
this.connectionPromise = this._doConnect();
|
||||
try {
|
||||
await this.connectionPromise;
|
||||
} finally {
|
||||
this.connectionPromise = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Layer 4: Circuit breaker — refuse to spawn after repeated failures.
|
||||
* After MAX_FAILURES consecutive connection failures, stops all spawn
|
||||
* attempts for CIRCUIT_OPEN_MS to prevent process accumulation storms.
|
||||
*/
|
||||
private checkCircuitBreaker(): void {
|
||||
if (this.consecutiveFailures >= ChromaSync.MAX_FAILURES) {
|
||||
const elapsed = Date.now() - this.lastFailureTime;
|
||||
if (elapsed < ChromaSync.CIRCUIT_OPEN_MS) {
|
||||
throw new Error(
|
||||
`Chroma circuit breaker open: ${this.consecutiveFailures} consecutive failures. ` +
|
||||
`Retry in ${Math.ceil((ChromaSync.CIRCUIT_OPEN_MS - elapsed) / 1000)}s`
|
||||
);
|
||||
}
|
||||
// Cooldown expired, allow retry
|
||||
logger.info('CHROMA_SYNC', 'Circuit breaker cooldown expired, allowing retry', {
|
||||
consecutiveFailures: this.consecutiveFailures,
|
||||
cooldownMs: ChromaSync.CIRCUIT_OPEN_MS
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Layer 1: Pre-spawn process count guard.
|
||||
* Kills excess chroma-mcp processes before spawning a new one.
|
||||
* Uses execFileSync (no shell) to list processes, filters in JavaScript.
|
||||
*/
|
||||
private killExcessChromaProcesses(): void {
|
||||
if (process.platform === 'win32') return; // Windows has Chroma disabled entirely
|
||||
|
||||
try {
|
||||
// Use execFileSync to avoid shell injection — filter in JavaScript
|
||||
const output = execFileSync('ps', ['-eo', 'pid,command'], {
|
||||
encoding: 'utf8',
|
||||
timeout: 5000,
|
||||
stdio: ['pipe', 'pipe', 'pipe']
|
||||
});
|
||||
|
||||
// Filter for chroma-mcp processes in JavaScript (no shell grep needed)
|
||||
const pids = output.split('\n')
|
||||
.filter(l => l.includes('chroma-mcp'))
|
||||
.map(l => parseInt(l.trim().split(/\s+/)[0], 10))
|
||||
.filter(pid => pid > 0 && pid !== process.pid)
|
||||
.sort((a, b) => b - a); // Descending: newest (highest PID) first
|
||||
|
||||
if (pids.length < MAX_CHROMA_PROCESSES) return;
|
||||
|
||||
// Keep newest MAX_CHROMA_PROCESSES - 1 (making room for the one we're about to spawn)
|
||||
const toKill = pids.slice(MAX_CHROMA_PROCESSES - 1);
|
||||
for (const pid of toKill) {
|
||||
try {
|
||||
process.kill(pid, 'SIGTERM');
|
||||
} catch {
|
||||
// Process may already be dead
|
||||
}
|
||||
}
|
||||
|
||||
if (toKill.length > 0) {
|
||||
logger.warn('CHROMA_SYNC', 'Killed excess chroma-mcp processes before spawning', {
|
||||
found: pids.length,
|
||||
killed: toKill.length,
|
||||
maxAllowed: MAX_CHROMA_PROCESSES
|
||||
});
|
||||
}
|
||||
} catch {
|
||||
// ps may fail — don't block connection
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal connection logic — called only via ensureConnection() mutex.
|
||||
* Implements circuit breaker (Layer 4), pre-spawn guard (Layer 1),
|
||||
* and actual connection setup.
|
||||
*/
|
||||
private async _doConnect(): Promise<void> {
|
||||
// Layer 4: Circuit breaker check — refuse if too many recent failures
|
||||
this.checkCircuitBreaker();
|
||||
|
||||
// Layer 1: Kill excess processes before spawning a new one
|
||||
this.killExcessChromaProcesses();
|
||||
|
||||
logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });
|
||||
|
||||
try {
|
||||
@@ -238,9 +344,20 @@ export class ChromaSync {
|
||||
await this.client.connect(this.transport);
|
||||
this.connected = true;
|
||||
|
||||
// Layer 4: Reset circuit breaker on success
|
||||
this.consecutiveFailures = 0;
|
||||
|
||||
logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
|
||||
} catch (error) {
|
||||
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
|
||||
// Layer 4: Track failure for circuit breaker
|
||||
this.consecutiveFailures++;
|
||||
this.lastFailureTime = Date.now();
|
||||
|
||||
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', {
|
||||
project: this.project,
|
||||
consecutiveFailures: this.consecutiveFailures,
|
||||
circuitBreakerThreshold: ChromaSync.MAX_FAILURES
|
||||
}, error as Error);
|
||||
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
}
|
||||
}
|
||||
@@ -291,6 +408,7 @@ export class ChromaSync {
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connectionPromise = null;
|
||||
logger.error('CHROMA_SYNC', 'Connection lost during collection check',
|
||||
{ collection: this.collectionName }, error as Error);
|
||||
throw new Error(`Chroma connection lost: ${errorMessage}`);
|
||||
@@ -960,6 +1078,7 @@ export class ChromaSync {
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connectionPromise = null;
|
||||
logger.error('CHROMA_SYNC', 'Connection lost during query',
|
||||
{ project: this.project, query }, error as Error);
|
||||
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
|
||||
@@ -1017,28 +1136,37 @@ export class ChromaSync {
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the Chroma client connection and cleanup subprocess
|
||||
* Close the Chroma client connection and cleanup subprocess.
|
||||
* Uses try-finally to guarantee state reset even if close() throws.
|
||||
* Individual close calls use .catch() to prevent one failure from
|
||||
* blocking the other (e.g., client.close() failing shouldn't prevent
|
||||
* transport.close() from killing the subprocess).
|
||||
*/
|
||||
async close(): Promise<void> {
|
||||
if (!this.connected && !this.client && !this.transport) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Close client first
|
||||
if (this.client) {
|
||||
await this.client.close();
|
||||
}
|
||||
|
||||
// Explicitly close transport to kill subprocess
|
||||
if (this.transport) {
|
||||
await this.transport.close();
|
||||
try {
|
||||
// Close client first, then transport — catch individual errors
|
||||
if (this.client) {
|
||||
await this.client.close().catch((err: Error) => {
|
||||
logger.debug('CHROMA_SYNC', 'Client close error (may already be disconnected)', {}, err);
|
||||
});
|
||||
}
|
||||
if (this.transport) {
|
||||
await this.transport.close().catch((err: Error) => {
|
||||
logger.debug('CHROMA_SYNC', 'Transport close error (may already be dead)', {}, err);
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
// Always reset state, even if close throws
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
this.connectionPromise = null;
|
||||
}
|
||||
|
||||
logger.info('CHROMA_SYNC', 'Chroma client and subprocess closed', { project: this.project });
|
||||
|
||||
// Always reset state
|
||||
this.connected = false;
|
||||
this.client = null;
|
||||
this.transport = null;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user