Merge remote-tracking branch 'origin/main' into openclaw-installer

# Conflicts:
#	plugin/scripts/mcp-server.cjs
#	plugin/scripts/worker-service.cjs
This commit is contained in:
Alex Newman
2026-02-12 22:04:03 -05:00
12 changed files with 50 additions and 523 deletions
@@ -9,7 +9,6 @@
*/
import http from 'http';
import { execFileSync } from 'child_process';
import { logger } from '../../utils/logger.js';
import {
getChildProcesses,
@@ -77,21 +76,6 @@ export async function performGracefulShutdown(config: GracefulShutdownConfig): P
await config.dbManager.close();
}
// STEP 5.5: Kill any chroma-mcp children that survived transport.close() (Unix only)
// On Unix, getChildProcesses() returns [] (Windows-only), so chroma-mcp
// subprocesses spawned via StdioClientTransport may escape STEP 5 cleanup
if (process.platform !== 'win32') {
try {
execFileSync('pkill', ['-P', String(process.pid), '-f', 'chroma-mcp'], {
timeout: 3000,
stdio: 'ignore'
});
logger.info('SYSTEM', 'Killed chroma-mcp child processes');
} catch {
// pkill returns exit code 1 if no processes matched — that's fine
}
}
// STEP 6: Force kill any remaining child processes (Windows zombie port fix)
if (childPids.length > 0) {
logger.info('SYSTEM', 'Force killing remaining children');
@@ -339,77 +339,6 @@ export async function cleanupOrphanedProcesses(): Promise<void> {
logger.info('SYSTEM', 'Orphaned processes cleaned up', { count: pidsToKill.length });
}
/**
 * Cap the number of running chroma-mcp processes by count rather than by age.
 *
 * Lists every process whose `ps` line mentions "chroma-mcp", keeps the
 * `maxAllowed` youngest ones (smallest elapsed time) and sends SIGTERM to the
 * remainder. Because the decision is count-based, it also catches bursts of
 * freshly spawned processes that an age-based cleanup would leave alone.
 *
 * @param maxAllowed Number of chroma-mcp processes allowed to survive.
 * @returns How many processes were successfully signalled; 0 on Windows,
 *          when nothing exceeds the limit, or when enumeration fails.
 */
export async function cleanupExcessChromaProcesses(maxAllowed: number = 2): Promise<number> {
  // Windows: Chroma is disabled entirely, no cleanup needed
  if (process.platform === 'win32') {
    return 0;
  }

  try {
    const psResult = await execAsync(
      'ps -eo pid,etime,command | grep -E "chroma-mcp" | grep -v grep || true'
    );
    const listing = psResult.stdout.trim();
    if (!listing) {
      return 0;
    }

    const candidates: Array<{ pid: number; ageMinutes: number }> = [];
    for (const rawLine of listing.split('\n')) {
      const row = rawLine.trim();
      if (!row) {
        continue;
      }

      // Columns: <pid> <etime> <command...>
      const fields = row.match(/^(\d+)\s+(\S+)\s+(.*)$/);
      if (!fields) {
        continue;
      }

      const pid = parseInt(fields[1], 10);
      const elapsed = fields[2];

      // Never consider an invalid PID or the current process itself
      const usablePid = Number.isInteger(pid) && pid > 0 && pid !== process.pid;
      if (!usablePid) {
        continue;
      }

      const ageMinutes = parseElapsedTime(elapsed);
      // A negative age means the etime column could not be parsed; including
      // such an entry would corrupt the ordering below, so drop it.
      if (ageMinutes < 0) {
        continue;
      }

      candidates.push({ pid, ageMinutes });
    }

    if (candidates.length <= maxAllowed) {
      return 0;
    }

    // Youngest first; everything past the first `maxAllowed` entries is surplus
    candidates.sort((left, right) => left.ageMinutes - right.ageMinutes);
    const surplus = candidates.slice(maxAllowed);

    let killed = 0;
    for (const victim of surplus) {
      try {
        process.kill(victim.pid, 'SIGTERM');
        killed += 1;
        logger.info('SYSTEM', 'Killed excess chroma-mcp process', { pid: victim.pid });
      } catch {
        // Process may already be dead
      }
    }

    if (killed > 0) {
      logger.warn('SYSTEM', 'Cleaned up excess chroma-mcp processes by count', {
        found: candidates.length,
        killed,
        maxAllowed
      });
    }

    return killed;
  } catch (error) {
    logger.debug('SYSTEM', 'Failed to enumerate chroma-mcp processes', {}, error as Error);
    return 0;
  }
}
/**
* Spawn a detached daemon process
* Returns the child PID or undefined if spawn failed
+20 -156
View File
@@ -18,16 +18,12 @@ import { USER_SETTINGS_PATH } from '../../shared/paths.js';
import path from 'path';
import os from 'os';
import fs from 'fs';
import { execSync, execFileSync } from 'child_process';
import { parseElapsedTime } from '../infrastructure/ProcessManager.js';
import { execSync } from 'child_process';
// Version injected at build time by esbuild define
declare const __DEFAULT_PACKAGE_VERSION__: string;
const packageVersion = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined' ? __DEFAULT_PACKAGE_VERSION__ : '0.0.0-dev';
// Maximum allowed chroma-mcp processes before pre-spawn guard kills excess
const MAX_CHROMA_PROCESSES = 2; // 1 active + 1 starting
interface ChromaDocument {
id: string;
document: string;
@@ -94,16 +90,6 @@ export class ChromaSync {
// MCP SDK's StdioClientTransport uses shell:false and no detached flag, so console is inherited.
private readonly disabled: boolean = false;
// Layer 0: Connection mutex — coalesces concurrent callers onto single spawn
private connectionPromise: Promise<void> | null = null;
// Layer 4: Circuit breaker — stops retry storms after repeated failures
private consecutiveFailures: number = 0;
private lastFailureTime: number = 0;
private static readonly MAX_FAILURES = 3;
private static readonly BACKOFF_BASE_MS = 2000;
private static readonly CIRCUIT_OPEN_MS = 60000; // 1 minute cooldown
constructor(project: string) {
this.project = project;
this.collectionName = `cm__${project}`;
@@ -192,114 +178,14 @@ export class ChromaSync {
}
/**
* Ensure MCP client is connected to Chroma server (mutex wrapper).
* Coalesces concurrent callers onto a single connection attempt.
* This prevents N concurrent calls from each spawning a chroma-mcp subprocess.
* Ensure MCP client is connected to Chroma server
* Throws error if connection fails
*/
private async ensureConnection(): Promise<void> {
if (this.connected && this.client) return;
// Layer 0: Coalesce concurrent callers onto a single connection attempt
if (this.connectionPromise) {
return this.connectionPromise;
if (this.connected && this.client) {
return;
}
this.connectionPromise = this._doConnect();
try {
await this.connectionPromise;
} finally {
this.connectionPromise = null;
}
}
/**
 * Layer 4: Circuit breaker gate, evaluated before attempting a connection.
 *
 * Once `consecutiveFailures` has reached `ChromaSync.MAX_FAILURES`, this
 * throws for as long as less than `ChromaSync.CIRCUIT_OPEN_MS` has passed
 * since `lastFailureTime`. After that window it logs that the cooldown has
 * expired and returns normally so the caller may retry. Below the failure
 * threshold it does nothing.
 *
 * @throws Error while the breaker is open; the message includes the failure
 *         count and the remaining wait in whole seconds.
 */
private checkCircuitBreaker(): void {
  const failures = this.consecutiveFailures;
  if (failures < ChromaSync.MAX_FAILURES) {
    return;
  }

  const sinceLastFailureMs = Date.now() - this.lastFailureTime;
  if (sinceLastFailureMs < ChromaSync.CIRCUIT_OPEN_MS) {
    const retryInSeconds = Math.ceil((ChromaSync.CIRCUIT_OPEN_MS - sinceLastFailureMs) / 1000);
    throw new Error(
      `Chroma circuit breaker open: ${failures} consecutive failures. ` +
        `Retry in ${retryInSeconds}s`
    );
  }

  // Cooldown expired, allow retry
  logger.info('CHROMA_SYNC', 'Circuit breaker cooldown expired, allowing retry', {
    consecutiveFailures: failures,
    cooldownMs: ChromaSync.CIRCUIT_OPEN_MS
  });
}
/**
 * Layer 1: Pre-spawn process count guard.
 *
 * Runs `ps` directly (no shell) and picks out chroma-mcp entries in
 * JavaScript. When at least MAX_CHROMA_PROCESSES are already running, it
 * keeps the youngest MAX_CHROMA_PROCESSES - 1 and sends SIGTERM to the rest,
 * leaving room for the process about to be spawned. Any failure while
 * listing processes is ignored so that connecting is never blocked.
 */
private killExcessChromaProcesses(): void {
  // Windows has Chroma disabled entirely
  if (process.platform === 'win32') {
    return;
  }

  try {
    // etime is included because PID order is not a reliable indicator of age
    const psOutput = execFileSync('ps', ['-eo', 'pid,etime,command'], {
      encoding: 'utf8',
      timeout: 5000,
      stdio: ['pipe', 'pipe', 'pipe']
    });

    const running: Array<{ pid: number; ageMinutes: number }> = [];
    for (const line of psOutput.split('\n')) {
      if (!line.includes('chroma-mcp')) {
        continue;
      }

      const columns = line.trim().split(/\s+/);
      const pid = parseInt(columns[0], 10);
      const ageMinutes = parseElapsedTime(columns[1] || '');

      // Drop unparseable rows and never target the current process
      if (pid > 0 && pid !== process.pid && ageMinutes >= 0) {
        running.push({ pid, ageMinutes });
      }
    }

    // Ascending: newest (lowest age) first
    running.sort((left, right) => left.ageMinutes - right.ageMinutes);

    if (running.length < MAX_CHROMA_PROCESSES) {
      return;
    }

    // Keep newest MAX_CHROMA_PROCESSES - 1 (making room for the one we're about to spawn)
    const surplus = running.slice(MAX_CHROMA_PROCESSES - 1);
    for (const victim of surplus) {
      try {
        process.kill(victim.pid, 'SIGTERM');
      } catch {
        // Process may already be dead
      }
    }

    if (surplus.length > 0) {
      logger.warn('CHROMA_SYNC', 'Killed excess chroma-mcp processes before spawning', {
        found: running.length,
        killed: surplus.length,
        maxAllowed: MAX_CHROMA_PROCESSES
      });
    }
  } catch {
    // ps may fail — don't block connection
  }
}
/**
* Internal connection logic — called only via ensureConnection() mutex.
* Implements circuit breaker (Layer 4), pre-spawn guard (Layer 1),
* and actual connection setup.
*/
private async _doConnect(): Promise<void> {
// Layer 4: Circuit breaker check — refuse if too many recent failures
this.checkCircuitBreaker();
// Layer 1: Kill excess processes before spawning a new one
this.killExcessChromaProcesses();
logger.info('CHROMA_SYNC', 'Connecting to Chroma MCP server...', { project: this.project });
try {
@@ -352,20 +238,9 @@ export class ChromaSync {
await this.client.connect(this.transport);
this.connected = true;
// Layer 4: Reset circuit breaker on success
this.consecutiveFailures = 0;
logger.info('CHROMA_SYNC', 'Connected to Chroma MCP server', { project: this.project });
} catch (error) {
// Layer 4: Track failure for circuit breaker
this.consecutiveFailures++;
this.lastFailureTime = Date.now();
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', {
project: this.project,
consecutiveFailures: this.consecutiveFailures,
circuitBreakerThreshold: ChromaSync.MAX_FAILURES
}, error as Error);
logger.error('CHROMA_SYNC', 'Failed to connect to Chroma MCP server', { project: this.project }, error as Error);
throw new Error(`Chroma connection failed: ${error instanceof Error ? error.message : String(error)}`);
}
}
@@ -416,7 +291,6 @@ export class ChromaSync {
this.connected = false;
this.client = null;
this.transport = null;
this.connectionPromise = null;
logger.error('CHROMA_SYNC', 'Connection lost during collection check',
{ collection: this.collectionName }, error as Error);
throw new Error(`Chroma connection lost: ${errorMessage}`);
@@ -1086,7 +960,6 @@ export class ChromaSync {
this.connected = false;
this.client = null;
this.transport = null;
this.connectionPromise = null;
logger.error('CHROMA_SYNC', 'Connection lost during query',
{ project: this.project, query }, error as Error);
throw new Error(`Chroma query failed - connection lost: ${errorMessage}`);
@@ -1144,37 +1017,28 @@ export class ChromaSync {
}
/**
* Close the Chroma client connection and cleanup subprocess.
* Uses try-finally to guarantee state reset even if close() throws.
* Individual close calls use .catch() to prevent one failure from
* blocking the other (e.g., client.close() failing shouldn't prevent
* transport.close() from killing the subprocess).
* Close the Chroma client connection and cleanup subprocess
*/
async close(): Promise<void> {
if (!this.connected && !this.client && !this.transport) {
return;
}
try {
// Close client first, then transport — catch individual errors
if (this.client) {
await this.client.close().catch((err: Error) => {
logger.debug('CHROMA_SYNC', 'Client close error (may already be disconnected)', {}, err);
});
}
if (this.transport) {
await this.transport.close().catch((err: Error) => {
logger.debug('CHROMA_SYNC', 'Transport close error (may already be dead)', {}, err);
});
}
} finally {
// Always reset state, even if close throws
this.connected = false;
this.client = null;
this.transport = null;
this.connectionPromise = null;
// Close client first
if (this.client) {
await this.client.close();
}
// Explicitly close transport to kill subprocess
if (this.transport) {
await this.transport.close();
}
logger.info('CHROMA_SYNC', 'Chroma client and subprocess closed', { project: this.project });
// Always reset state
this.connected = false;
this.client = null;
this.transport = null;
}
}
-2
View File
@@ -68,7 +68,6 @@ import {
removePidFile,
getPlatformTimeout,
cleanupOrphanedProcesses,
cleanupExcessChromaProcesses,
cleanStalePidFile,
spawnDaemon,
createSignalHandler
@@ -361,7 +360,6 @@ export class WorkerService {
private async initializeBackground(): Promise<void> {
try {
await cleanupOrphanedProcesses();
await cleanupExcessChromaProcesses();
// Load mode configuration
const { ModeManager } = await import('./domain/ModeManager.js');
+1 -5
View File
@@ -19,7 +19,6 @@
import { spawn, exec, ChildProcess } from 'child_process';
import { promisify } from 'util';
import { logger } from '../../utils/logger.js';
import { cleanupExcessChromaProcesses } from '../infrastructure/ProcessManager.js';
const execAsync = promisify(exec);
@@ -213,7 +212,7 @@ async function killSystemOrphans(): Promise<number> {
try {
const { stdout } = await execAsync(
'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format|chroma-mcp" | grep -v grep'
'ps -eo pid,ppid,args 2>/dev/null | grep -E "claude.*haiku|claude.*output-format" | grep -v grep'
);
let killed = 0;
@@ -263,9 +262,6 @@ export async function reapOrphanedProcesses(activeSessionIds: Set<number>): Prom
// Daemon children: find idle SDK processes that didn't terminate
killed += await killIdleDaemonChildren();
// Count-based: kill excess chroma-mcp processes regardless of age
killed += await cleanupExcessChromaProcesses();
return killed;
}