Files
claude-mem/src/services/server/Server.ts
T
Alex Newman e788fd3676 fix: prevent duplicate worker daemons and zombie processes (#1178)
* fix: prevent duplicate worker daemons and zombie processes

Three root causes of chroma-mcp timeouts:

1. HTTP shutdown (POST /api/admin/shutdown) closed resources but never
   called process.exit(). Zombie workers stayed alive, background tasks
   reconnected to chroma-mcp, spawning duplicate subprocesses that all
   contended for the same persistent data directory.

2. No guard against concurrent daemon startup. When hooks fired
   simultaneously, multiple daemons started before either wrote a PID
   file. The loser got EADDRINUSE but stayed alive because signal
   handlers registered in the constructor prevented exit.

3. Corrupt 147GB HNSW index file caused all chroma queries to timeout
   (MCP error -32001). Data fix: deleted corrupt collection, backfill
   rebuilds from SQLite.

Code fixes:
- Add PID-based guard in daemon startup: exit if PID file process alive
- Add port-based guard in daemon startup: exit if port already bound
  (runs before WorkerService constructor registers keepalive handlers)
- Add process.exit(0) after HTTP shutdown/restart completes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: aggressive startup cleanup and one-time chroma wipe for upgrade

Kill orphaned worker-service.cjs and chroma-mcp processes immediately
at startup (no age gate) while keeping 30-min threshold for mcp-server.
Wipe corrupt chroma data once on upgrade from pre-v10.3 versions —
backfill rebuilds from SQLite automatically.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: wrap shutdown handlers in try/finally to guarantee process.exit

If onShutdown() or onRestart() threw, process.exit(0) was never reached,
leaving the daemon alive as a zombie. Also removed redundant require('fs')
calls in process-manager tests where ESM imports already existed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 20:10:28 -05:00

317 lines
10 KiB
TypeScript

/**
* Server - Express app setup and route registration
*
* Extracted from worker-service.ts monolith to provide centralized HTTP server management.
* Handles:
* - Express app creation and configuration
* - Middleware registration
* - Route registration (delegates to route handlers)
* - Core system endpoints (health, readiness, version, admin)
*/
import express, { Request, Response, Application } from 'express';
import http from 'http';
import * as fs from 'fs';
import path from 'path';
import { ALLOWED_OPERATIONS, ALLOWED_TOPICS } from './allowed-constants.js';
import { logger } from '../../utils/logger.js';
import { createMiddleware, summarizeRequestBody, requireLocalhost } from './Middleware.js';
import { errorHandler, notFoundHandler } from './ErrorHandler.js';
// Build-time injected version constant (set by esbuild define)
declare const __DEFAULT_PACKAGE_VERSION__: string;
const BUILT_IN_VERSION = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined'
? __DEFAULT_PACKAGE_VERSION__
: 'development';
/**
* Interface for route handlers that can be registered with the server
*/
export interface RouteHandler {
setupRoutes(app: Application): void;
}
/**
* AI provider status for health endpoint
*/
export interface AiStatus {
provider: string;
authMethod: string;
lastInteraction: {
timestamp: number;
success: boolean;
error?: string;
} | null;
}
/**
* Options for initializing the server
*/
export interface ServerOptions {
/** Whether initialization is complete (for readiness check) */
getInitializationComplete: () => boolean;
/** Whether MCP is ready (for health/readiness info) */
getMcpReady: () => boolean;
/** Shutdown function for admin endpoints */
onShutdown: () => Promise<void>;
/** Restart function for admin endpoints */
onRestart: () => Promise<void>;
/** Filesystem path to the worker entry point */
workerPath: string;
/** Callback to get current AI provider status */
getAiStatus: () => AiStatus;
}
/**
* Express application and HTTP server wrapper
* Provides centralized setup for middleware and routes
*/
export class Server {
readonly app: Application;
private server: http.Server | null = null;
private readonly options: ServerOptions;
private readonly startTime: number = Date.now();
constructor(options: ServerOptions) {
this.options = options;
this.app = express();
this.setupMiddleware();
this.setupCoreRoutes();
}
/**
* Get the underlying HTTP server
*/
getHttpServer(): http.Server | null {
return this.server;
}
/**
* Start listening on the specified host and port
*/
async listen(port: number, host: string): Promise<void> {
return new Promise<void>((resolve, reject) => {
this.server = this.app.listen(port, host, () => {
logger.info('SYSTEM', 'HTTP server started', { host, port, pid: process.pid });
resolve();
});
this.server.on('error', reject);
});
}
/**
* Close the HTTP server
*/
async close(): Promise<void> {
if (!this.server) return;
// Close all active connections
this.server.closeAllConnections();
// Give Windows time to close connections before closing server
if (process.platform === 'win32') {
await new Promise(r => setTimeout(r, 500));
}
// Close the server
await new Promise<void>((resolve, reject) => {
this.server!.close(err => err ? reject(err) : resolve());
});
// Extra delay on Windows to ensure port is fully released
if (process.platform === 'win32') {
await new Promise(r => setTimeout(r, 500));
}
this.server = null;
logger.info('SYSTEM', 'HTTP server closed');
}
/**
* Register a route handler
*/
registerRoutes(handler: RouteHandler): void {
handler.setupRoutes(this.app);
}
/**
* Finalize route setup by adding error handlers
* Call this after all routes have been registered
*/
finalizeRoutes(): void {
// 404 handler for unmatched routes
this.app.use(notFoundHandler);
// Global error handler (must be last)
this.app.use(errorHandler);
}
/**
* Setup Express middleware
*/
private setupMiddleware(): void {
const middlewares = createMiddleware(summarizeRequestBody);
middlewares.forEach(mw => this.app.use(mw));
}
/**
* Setup core system routes (health, readiness, version, admin)
*/
private setupCoreRoutes(): void {
// Health check endpoint - always responds, even during initialization
this.app.get('/api/health', (_req: Request, res: Response) => {
res.status(200).json({
status: 'ok',
version: BUILT_IN_VERSION,
workerPath: this.options.workerPath,
uptime: Date.now() - this.startTime,
managed: process.env.CLAUDE_MEM_MANAGED === 'true',
hasIpc: typeof process.send === 'function',
platform: process.platform,
pid: process.pid,
initialized: this.options.getInitializationComplete(),
mcpReady: this.options.getMcpReady(),
ai: this.options.getAiStatus(),
});
});
// Readiness check endpoint - returns 503 until full initialization completes
this.app.get('/api/readiness', (_req: Request, res: Response) => {
if (this.options.getInitializationComplete()) {
res.status(200).json({
status: 'ready',
mcpReady: this.options.getMcpReady(),
});
} else {
res.status(503).json({
status: 'initializing',
message: 'Worker is still initializing, please retry',
});
}
});
// Version endpoint - returns the worker's built-in version
this.app.get('/api/version', (_req: Request, res: Response) => {
res.status(200).json({ version: BUILT_IN_VERSION });
});
// Instructions endpoint - loads SKILL.md sections on-demand
this.app.get('/api/instructions', async (req: Request, res: Response) => {
const topic = (req.query.topic as string) || 'all';
const operation = req.query.operation as string | undefined;
// Validate topic
if (topic && !ALLOWED_TOPICS.includes(topic)) {
return res.status(400).json({ error: 'Invalid topic' });
}
try {
let content: string;
if (operation) {
// Validate operation
if (!ALLOWED_OPERATIONS.includes(operation)) {
return res.status(400).json({ error: 'Invalid operation' });
}
// Path boundary check
const OPERATIONS_BASE_DIR = path.resolve(__dirname, '../skills/mem-search/operations');
const operationPath = path.resolve(OPERATIONS_BASE_DIR, `${operation}.md`);
if (!operationPath.startsWith(OPERATIONS_BASE_DIR + path.sep)) {
return res.status(400).json({ error: 'Invalid request' });
}
content = await fs.promises.readFile(operationPath, 'utf-8');
} else {
const skillPath = path.join(__dirname, '../skills/mem-search/SKILL.md');
const fullContent = await fs.promises.readFile(skillPath, 'utf-8');
content = this.extractInstructionSection(fullContent, topic);
}
res.json({
content: [{ type: 'text', text: content }]
});
} catch (error) {
res.status(404).json({ error: 'Instruction not found' });
}
});
// Admin endpoints for process management (localhost-only)
this.app.post('/api/admin/restart', requireLocalhost, async (_req: Request, res: Response) => {
res.json({ status: 'restarting' });
// Handle Windows managed mode via IPC
const isWindowsManaged = process.platform === 'win32' &&
process.env.CLAUDE_MEM_MANAGED === 'true' &&
process.send;
if (isWindowsManaged) {
logger.info('SYSTEM', 'Sending restart request to wrapper');
process.send!({ type: 'restart' });
} else {
// Unix or standalone Windows - handle restart ourselves
// The spawner (ensureWorkerStarted/restart command) handles spawning the new daemon.
// This process just needs to shut down and exit.
setTimeout(async () => {
try {
await this.options.onRestart();
} finally {
process.exit(0);
}
}, 100);
}
});
this.app.post('/api/admin/shutdown', requireLocalhost, async (_req: Request, res: Response) => {
res.json({ status: 'shutting_down' });
// Handle Windows managed mode via IPC
const isWindowsManaged = process.platform === 'win32' &&
process.env.CLAUDE_MEM_MANAGED === 'true' &&
process.send;
if (isWindowsManaged) {
logger.info('SYSTEM', 'Sending shutdown request to wrapper');
process.send!({ type: 'shutdown' });
} else {
// Unix or standalone Windows - handle shutdown ourselves
setTimeout(async () => {
try {
await this.options.onShutdown();
} finally {
// CRITICAL: Exit the process after shutdown completes (or fails).
// Without this, the daemon stays alive as a zombie — background tasks
// (backfill, reconnects) keep running and respawn chroma-mcp subprocesses.
process.exit(0);
}
}, 100);
}
});
}
/**
* Extract a specific section from instruction content
*/
private extractInstructionSection(content: string, topic: string): string {
const sections: Record<string, string> = {
'workflow': this.extractBetween(content, '## The Workflow', '## Search Parameters'),
'search_params': this.extractBetween(content, '## Search Parameters', '## Examples'),
'examples': this.extractBetween(content, '## Examples', '## Why This Workflow'),
'all': content
};
return sections[topic] || sections['all'];
}
/**
* Extract text between two markers
*/
private extractBetween(content: string, startMarker: string, endMarker: string): string {
const startIdx = content.indexOf(startMarker);
const endIdx = content.indexOf(endMarker);
if (startIdx === -1) return content;
if (endIdx === -1) return content.substring(startIdx);
return content.substring(startIdx, endIdx).trim();
}
}