80a8c90a1a
* feat: add embedded Process Supervisor for unified process lifecycle management Consolidates scattered process management (ProcessManager, GracefulShutdown, HealthMonitor, ProcessRegistry) into a unified src/supervisor/ module. New: ProcessRegistry with JSON persistence, env sanitizer (strips CLAUDECODE_* vars), graceful shutdown cascade (SIGTERM → 5s wait → SIGKILL with tree-kill on Windows), PID file liveness validation, and singleton Supervisor API. Fixes #1352 (worker inherits CLAUDECODE env causing nested sessions) Fixes #1356 (zombie TCP socket after Windows reboot) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add session-scoped process reaping to supervisor Adds reapSession(sessionId) to ProcessRegistry for killing session-tagged processes on session end. SessionManager.deleteSession() now triggers reaping. Tightens orphan reaper interval from 60s to 30s. Fixes #1351 (MCP server processes leak on session end) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add Unix domain socket support for worker communication Introduces socket-manager.ts for UDS-based worker communication, eliminating port 37777 collisions between concurrent sessions. Worker listens on ~/.claude-mem/sockets/worker.sock by default with TCP fallback. All hook handlers, MCP server, health checks, and admin commands updated to use socket-aware workerHttpRequest(). Backwards compatible — settings can force TCP mode via CLAUDE_MEM_WORKER_TRANSPORT=tcp. Fixes #1346 (port 37777 collision across concurrent sessions) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: remove in-process worker fallback from hook command Removes the fallback path where hook scripts started WorkerService in-process, making the worker a grandchild of Claude Code (killed by sandbox). Hooks now always delegate to ensureWorkerStarted() which spawns a fully detached daemon. Fixes #1249 (grandchild process killed by sandbox) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat: add health checker and /api/admin/doctor endpoint Adds 30-second periodic health sweep that prunes dead processes from the supervisor registry and cleans stale socket files. Adds /api/admin/doctor endpoint exposing supervisor state, process liveness, and environment health. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: add comprehensive supervisor test suite 64 tests covering all supervisor modules: process registry (18 tests), env sanitizer (8), shutdown cascade (10), socket manager (15), health checker (5), and supervisor API (6). Includes persistence, isolation, edge cases, and cross-module integration scenarios. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: revert Unix domain socket transport, restore TCP on port 37777 The socket-manager introduced UDS as default transport, but this broke the HTTP server's TCP accessibility (viewer UI, curl, external monitoring). Since there's only ever one worker process handling all sessions, the port collision rationale for UDS doesn't apply. Reverts to TCP-only, removing ~900 lines of unnecessary complexity. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: remove dead code found in pre-landing review Remove unused `acceptingSpawns` field from Supervisor class (written but never read — assertCanSpawn uses stopPromise instead) and unused `buildWorkerUrl` import from context handler. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * updated gitignore * fix: address PR review feedback - downgrade HTTP logging, clean up gitignore, harden supervisor - Downgrade request/response HTTP logging from info to debug to reduce noise - Remove unused getWorkerPort imports, use buildWorkerUrl helper - Export ENV_PREFIXES/ENV_EXACT_MATCHES from env-sanitizer, reuse in Server.ts - Fix isPidAlive(0) returning true (should be false) - Add shutdownInitiated flag to prevent signal handler race condition - Make validateWorkerPidFile testable with pidFilePath option - Remove unused dataDir from ShutdownCascadeOptions - Upgrade reapSession log from debug to warn - Rename zombiePidFiles to deadProcessPids (returns actual PIDs) - Clean up gitignore: remove duplicate datasets/, stale ~*/ and http*/ patterns - Fix tests to use temp directories instead of relying on real PID file Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
364 lines
12 KiB
TypeScript
364 lines
12 KiB
TypeScript
/**
|
|
* Server - Express app setup and route registration
|
|
*
|
|
* Extracted from worker-service.ts monolith to provide centralized HTTP server management.
|
|
* Handles:
|
|
* - Express app creation and configuration
|
|
* - Middleware registration
|
|
* - Route registration (delegates to route handlers)
|
|
* - Core system endpoints (health, readiness, version, admin)
|
|
*/
|
|
|
|
import express, { Request, Response, Application } from 'express';
|
|
import http from 'http';
|
|
import * as fs from 'fs';
|
|
import path from 'path';
|
|
import { ALLOWED_OPERATIONS, ALLOWED_TOPICS } from './allowed-constants.js';
|
|
import { logger } from '../../utils/logger.js';
|
|
import { createMiddleware, summarizeRequestBody, requireLocalhost } from './Middleware.js';
|
|
import { errorHandler, notFoundHandler } from './ErrorHandler.js';
|
|
import { getSupervisor } from '../../supervisor/index.js';
|
|
import { isPidAlive } from '../../supervisor/process-registry.js';
|
|
import { ENV_PREFIXES, ENV_EXACT_MATCHES } from '../../supervisor/env-sanitizer.js';
|
|
|
|
// Build-time injected version constant (set by esbuild define)
|
|
declare const __DEFAULT_PACKAGE_VERSION__: string;
|
|
const BUILT_IN_VERSION = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined'
|
|
? __DEFAULT_PACKAGE_VERSION__
|
|
: 'development';
|
|
|
|
/**
|
|
* Interface for route handlers that can be registered with the server
|
|
*/
|
|
export interface RouteHandler {
|
|
setupRoutes(app: Application): void;
|
|
}
|
|
|
|
/**
|
|
* AI provider status for health endpoint
|
|
*/
|
|
export interface AiStatus {
|
|
provider: string;
|
|
authMethod: string;
|
|
lastInteraction: {
|
|
timestamp: number;
|
|
success: boolean;
|
|
error?: string;
|
|
} | null;
|
|
}
|
|
|
|
/**
|
|
* Options for initializing the server
|
|
*/
|
|
export interface ServerOptions {
|
|
/** Whether initialization is complete (for readiness check) */
|
|
getInitializationComplete: () => boolean;
|
|
/** Whether MCP is ready (for health/readiness info) */
|
|
getMcpReady: () => boolean;
|
|
/** Shutdown function for admin endpoints */
|
|
onShutdown: () => Promise<void>;
|
|
/** Restart function for admin endpoints */
|
|
onRestart: () => Promise<void>;
|
|
/** Filesystem path to the worker entry point */
|
|
workerPath: string;
|
|
/** Callback to get current AI provider status */
|
|
getAiStatus: () => AiStatus;
|
|
}
|
|
|
|
/**
|
|
* Express application and HTTP server wrapper
|
|
* Provides centralized setup for middleware and routes
|
|
*/
|
|
export class Server {
|
|
readonly app: Application;
|
|
private server: http.Server | null = null;
|
|
private readonly options: ServerOptions;
|
|
private readonly startTime: number = Date.now();
|
|
|
|
constructor(options: ServerOptions) {
|
|
this.options = options;
|
|
this.app = express();
|
|
this.setupMiddleware();
|
|
this.setupCoreRoutes();
|
|
}
|
|
|
|
/**
|
|
* Get the underlying HTTP server
|
|
*/
|
|
getHttpServer(): http.Server | null {
|
|
return this.server;
|
|
}
|
|
|
|
/**
|
|
* Start listening on the specified host and port
|
|
*/
|
|
async listen(port: number, host: string): Promise<void> {
|
|
return new Promise<void>((resolve, reject) => {
|
|
this.server = this.app.listen(port, host, () => {
|
|
logger.info('SYSTEM', 'HTTP server started', { host, port, pid: process.pid });
|
|
resolve();
|
|
});
|
|
this.server.on('error', reject);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Close the HTTP server
|
|
*/
|
|
async close(): Promise<void> {
|
|
if (!this.server) return;
|
|
|
|
// Close all active connections
|
|
this.server.closeAllConnections();
|
|
|
|
// Give Windows time to close connections before closing server
|
|
if (process.platform === 'win32') {
|
|
await new Promise(r => setTimeout(r, 500));
|
|
}
|
|
|
|
// Close the server
|
|
await new Promise<void>((resolve, reject) => {
|
|
this.server!.close(err => err ? reject(err) : resolve());
|
|
});
|
|
|
|
// Extra delay on Windows to ensure port is fully released
|
|
if (process.platform === 'win32') {
|
|
await new Promise(r => setTimeout(r, 500));
|
|
}
|
|
|
|
this.server = null;
|
|
logger.info('SYSTEM', 'HTTP server closed');
|
|
}
|
|
|
|
/**
|
|
* Register a route handler
|
|
*/
|
|
registerRoutes(handler: RouteHandler): void {
|
|
handler.setupRoutes(this.app);
|
|
}
|
|
|
|
/**
|
|
* Finalize route setup by adding error handlers
|
|
* Call this after all routes have been registered
|
|
*/
|
|
finalizeRoutes(): void {
|
|
// 404 handler for unmatched routes
|
|
this.app.use(notFoundHandler);
|
|
|
|
// Global error handler (must be last)
|
|
this.app.use(errorHandler);
|
|
}
|
|
|
|
/**
|
|
* Setup Express middleware
|
|
*/
|
|
private setupMiddleware(): void {
|
|
const middlewares = createMiddleware(summarizeRequestBody);
|
|
middlewares.forEach(mw => this.app.use(mw));
|
|
}
|
|
|
|
/**
|
|
* Setup core system routes (health, readiness, version, admin)
|
|
*/
|
|
private setupCoreRoutes(): void {
|
|
// Health check endpoint - always responds, even during initialization
|
|
this.app.get('/api/health', (_req: Request, res: Response) => {
|
|
res.status(200).json({
|
|
status: 'ok',
|
|
version: BUILT_IN_VERSION,
|
|
workerPath: this.options.workerPath,
|
|
uptime: Date.now() - this.startTime,
|
|
managed: process.env.CLAUDE_MEM_MANAGED === 'true',
|
|
hasIpc: typeof process.send === 'function',
|
|
platform: process.platform,
|
|
pid: process.pid,
|
|
initialized: this.options.getInitializationComplete(),
|
|
mcpReady: this.options.getMcpReady(),
|
|
ai: this.options.getAiStatus(),
|
|
});
|
|
});
|
|
|
|
// Readiness check endpoint - returns 503 until full initialization completes
|
|
this.app.get('/api/readiness', (_req: Request, res: Response) => {
|
|
if (this.options.getInitializationComplete()) {
|
|
res.status(200).json({
|
|
status: 'ready',
|
|
mcpReady: this.options.getMcpReady(),
|
|
});
|
|
} else {
|
|
res.status(503).json({
|
|
status: 'initializing',
|
|
message: 'Worker is still initializing, please retry',
|
|
});
|
|
}
|
|
});
|
|
|
|
// Version endpoint - returns the worker's built-in version
|
|
this.app.get('/api/version', (_req: Request, res: Response) => {
|
|
res.status(200).json({ version: BUILT_IN_VERSION });
|
|
});
|
|
|
|
// Instructions endpoint - loads SKILL.md sections on-demand
|
|
this.app.get('/api/instructions', async (req: Request, res: Response) => {
|
|
const topic = (req.query.topic as string) || 'all';
|
|
const operation = req.query.operation as string | undefined;
|
|
|
|
// Validate topic
|
|
if (topic && !ALLOWED_TOPICS.includes(topic)) {
|
|
return res.status(400).json({ error: 'Invalid topic' });
|
|
}
|
|
|
|
try {
|
|
let content: string;
|
|
|
|
if (operation) {
|
|
// Validate operation
|
|
if (!ALLOWED_OPERATIONS.includes(operation)) {
|
|
return res.status(400).json({ error: 'Invalid operation' });
|
|
}
|
|
// Path boundary check
|
|
const OPERATIONS_BASE_DIR = path.resolve(__dirname, '../skills/mem-search/operations');
|
|
const operationPath = path.resolve(OPERATIONS_BASE_DIR, `${operation}.md`);
|
|
if (!operationPath.startsWith(OPERATIONS_BASE_DIR + path.sep)) {
|
|
return res.status(400).json({ error: 'Invalid request' });
|
|
}
|
|
content = await fs.promises.readFile(operationPath, 'utf-8');
|
|
} else {
|
|
const skillPath = path.join(__dirname, '../skills/mem-search/SKILL.md');
|
|
const fullContent = await fs.promises.readFile(skillPath, 'utf-8');
|
|
content = this.extractInstructionSection(fullContent, topic);
|
|
}
|
|
|
|
res.json({
|
|
content: [{ type: 'text', text: content }]
|
|
});
|
|
} catch (error) {
|
|
res.status(404).json({ error: 'Instruction not found' });
|
|
}
|
|
});
|
|
|
|
// Admin endpoints for process management (localhost-only)
|
|
this.app.post('/api/admin/restart', requireLocalhost, async (_req: Request, res: Response) => {
|
|
res.json({ status: 'restarting' });
|
|
|
|
// Handle Windows managed mode via IPC
|
|
const isWindowsManaged = process.platform === 'win32' &&
|
|
process.env.CLAUDE_MEM_MANAGED === 'true' &&
|
|
process.send;
|
|
|
|
if (isWindowsManaged) {
|
|
logger.info('SYSTEM', 'Sending restart request to wrapper');
|
|
process.send!({ type: 'restart' });
|
|
} else {
|
|
// Unix or standalone Windows - handle restart ourselves
|
|
// The spawner (ensureWorkerStarted/restart command) handles spawning the new daemon.
|
|
// This process just needs to shut down and exit.
|
|
setTimeout(async () => {
|
|
try {
|
|
await this.options.onRestart();
|
|
} finally {
|
|
process.exit(0);
|
|
}
|
|
}, 100);
|
|
}
|
|
});
|
|
|
|
this.app.post('/api/admin/shutdown', requireLocalhost, async (_req: Request, res: Response) => {
|
|
res.json({ status: 'shutting_down' });
|
|
|
|
// Handle Windows managed mode via IPC
|
|
const isWindowsManaged = process.platform === 'win32' &&
|
|
process.env.CLAUDE_MEM_MANAGED === 'true' &&
|
|
process.send;
|
|
|
|
if (isWindowsManaged) {
|
|
logger.info('SYSTEM', 'Sending shutdown request to wrapper');
|
|
process.send!({ type: 'shutdown' });
|
|
} else {
|
|
// Unix or standalone Windows - handle shutdown ourselves
|
|
setTimeout(async () => {
|
|
try {
|
|
await this.options.onShutdown();
|
|
} finally {
|
|
// CRITICAL: Exit the process after shutdown completes (or fails).
|
|
// Without this, the daemon stays alive as a zombie — background tasks
|
|
// (backfill, reconnects) keep running and respawn chroma-mcp subprocesses.
|
|
process.exit(0);
|
|
}
|
|
}, 100);
|
|
}
|
|
});
|
|
|
|
// Doctor endpoint - diagnostic view of supervisor, processes, and health
|
|
this.app.get('/api/admin/doctor', requireLocalhost, (_req: Request, res: Response) => {
|
|
const supervisor = getSupervisor();
|
|
const registry = supervisor.getRegistry();
|
|
const allRecords = registry.getAll();
|
|
|
|
// Check each process liveness
|
|
const processes = allRecords.map(record => ({
|
|
id: record.id,
|
|
pid: record.pid,
|
|
type: record.type,
|
|
status: isPidAlive(record.pid) ? 'alive' as const : 'dead' as const,
|
|
startedAt: record.startedAt,
|
|
}));
|
|
|
|
// Check for dead processes still in registry
|
|
const deadProcessPids = processes.filter(p => p.status === 'dead').map(p => p.pid);
|
|
|
|
// Check if CLAUDECODE_* env vars are leaking into this process
|
|
const envClean = !Object.keys(process.env).some(key =>
|
|
ENV_EXACT_MATCHES.has(key) || ENV_PREFIXES.some(prefix => key.startsWith(prefix))
|
|
);
|
|
|
|
// Format uptime
|
|
const uptimeMs = Date.now() - this.startTime;
|
|
const uptimeSeconds = Math.floor(uptimeMs / 1000);
|
|
const hours = Math.floor(uptimeSeconds / 3600);
|
|
const minutes = Math.floor((uptimeSeconds % 3600) / 60);
|
|
const formattedUptime = hours > 0 ? `${hours}h ${minutes}m` : `${minutes}m`;
|
|
|
|
res.json({
|
|
supervisor: {
|
|
running: true,
|
|
pid: process.pid,
|
|
uptime: formattedUptime,
|
|
},
|
|
processes,
|
|
health: {
|
|
deadProcessPids,
|
|
envClean,
|
|
},
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Extract a specific section from instruction content
|
|
*/
|
|
private extractInstructionSection(content: string, topic: string): string {
|
|
const sections: Record<string, string> = {
|
|
'workflow': this.extractBetween(content, '## The Workflow', '## Search Parameters'),
|
|
'search_params': this.extractBetween(content, '## Search Parameters', '## Examples'),
|
|
'examples': this.extractBetween(content, '## Examples', '## Why This Workflow'),
|
|
'all': content
|
|
};
|
|
|
|
return sections[topic] || sections['all'];
|
|
}
|
|
|
|
/**
|
|
* Extract text between two markers
|
|
*/
|
|
private extractBetween(content: string, startMarker: string, endMarker: string): string {
|
|
const startIdx = content.indexOf(startMarker);
|
|
const endIdx = content.indexOf(endMarker);
|
|
|
|
if (startIdx === -1) return content;
|
|
if (endIdx === -1) return content.substring(startIdx);
|
|
|
|
return content.substring(startIdx, endIdx).trim();
|
|
}
|
|
}
|