claude-mem/src/services/worker/http/middleware.ts
Alex Newman 80a8c90a1a feat: add embedded Process Supervisor for unified process lifecycle (#1370)
* feat: add embedded Process Supervisor for unified process lifecycle management

Consolidates scattered process management (ProcessManager, GracefulShutdown,
HealthMonitor, ProcessRegistry) into a unified src/supervisor/ module.

New: ProcessRegistry with JSON persistence, env sanitizer (strips CLAUDECODE_*
vars), graceful shutdown cascade (SIGTERM → 5s wait → SIGKILL with tree-kill
on Windows), PID file liveness validation, and singleton Supervisor API.
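
A minimal sketch of the env-sanitizing idea described above, assuming the prefix list contains only `CLAUDECODE_`. The names `sanitizeEnv`, `STRIP_PREFIXES`, and `childEnv` are hypothetical and are not taken from the supervisor module, whose actual prefix and exact-match lists are not shown here.

```typescript
// Hypothetical prefix list; the module's real values are not shown in this commit.
const STRIP_PREFIXES: readonly string[] = ['CLAUDECODE_'];

type Env = Record<string, string | undefined>;

// Returns a copy of `env` without any key that starts with a stripped prefix.
// The input object is left untouched.
function sanitizeEnv(env: Env, prefixes: readonly string[] = STRIP_PREFIXES): Env {
  const clean: Env = {};
  for (const key of Object.keys(env)) {
    if (prefixes.some(prefix => key.startsWith(prefix))) continue;
    clean[key] = env[key];
  }
  return clean;
}

// Example: the environment a spawned worker would receive.
const childEnv = sanitizeEnv({ PATH: '/usr/bin', CLAUDECODE_SESSION: 'abc' });
```

Returning a copy rather than mutating the input keeps the parent's own environment intact, so only the spawned child sees the reduced set.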

Fixes #1352 (worker inherits CLAUDECODE env causing nested sessions)
Fixes #1356 (zombie TCP socket after Windows reboot)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add session-scoped process reaping to supervisor

Adds reapSession(sessionId) to ProcessRegistry for killing session-tagged
processes on session end. SessionManager.deleteSession() now triggers reaping.
Tightens orphan reaper interval from 60s to 30s.

Fixes #1351 (MCP server processes leak on session end)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add Unix domain socket support for worker communication

Introduces socket-manager.ts for UDS-based worker communication, eliminating
port 37777 collisions between concurrent sessions. Worker listens on
~/.claude-mem/sockets/worker.sock by default with TCP fallback.

All hook handlers, MCP server, health checks, and admin commands updated to
use socket-aware workerHttpRequest(). Backwards compatible — settings can
force TCP mode via CLAUDE_MEM_WORKER_TRANSPORT=tcp.

Fixes #1346 (port 37777 collision across concurrent sessions)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: remove in-process worker fallback from hook command

Removes the fallback path where hook scripts started WorkerService in-process,
making the worker a grandchild of Claude Code (killed by sandbox). Hooks now
always delegate to ensureWorkerStarted() which spawns a fully detached daemon.

Fixes #1249 (grandchild process killed by sandbox)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add health checker and /api/admin/doctor endpoint

Adds 30-second periodic health sweep that prunes dead processes from the
supervisor registry and cleans stale socket files. Adds /api/admin/doctor
endpoint exposing supervisor state, process liveness, and environment health.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: add comprehensive supervisor test suite

64 tests covering all supervisor modules: process registry (18 tests),
env sanitizer (8), shutdown cascade (10), socket manager (15), health
checker (5), and supervisor API (6). Includes persistence, isolation,
edge cases, and cross-module integration scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: revert Unix domain socket transport, restore TCP on port 37777

The socket-manager introduced UDS as default transport, but this broke
the HTTP server's TCP accessibility (viewer UI, curl, external monitoring).
Since there's only ever one worker process handling all sessions, the
port collision rationale for UDS doesn't apply. Reverts to TCP-only,
removing ~900 lines of unnecessary complexity.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: remove dead code found in pre-landing review

Remove unused `acceptingSpawns` field from Supervisor class (written but
never read — assertCanSpawn uses stopPromise instead) and unused
`buildWorkerUrl` import from context handler.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* updated gitignore

* fix: address PR review feedback - downgrade HTTP logging, clean up gitignore, harden supervisor

- Downgrade request/response HTTP logging from info to debug to reduce noise
- Remove unused getWorkerPort imports, use buildWorkerUrl helper
- Export ENV_PREFIXES/ENV_EXACT_MATCHES from env-sanitizer, reuse in Server.ts
- Fix isPidAlive(0) returning true (should be false)
- Add shutdownInitiated flag to prevent signal handler race condition
- Make validateWorkerPidFile testable with pidFilePath option
- Remove unused dataDir from ShutdownCascadeOptions
- Upgrade reapSession log from debug to warn
- Rename zombiePidFiles to deadProcessPids (returns actual PIDs)
- Clean up gitignore: remove duplicate datasets/, stale ~*/ and http*/ patterns
- Fix tests to use temp directories instead of relying on real PID file

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 14:49:23 -07:00


/**
 * HTTP Middleware for Worker Service
 *
 * Extracted from WorkerService.ts for better organization.
 * Handles request/response logging, CORS, JSON parsing, and static file serving.
 */
import express, { Request, Response, NextFunction, RequestHandler } from 'express';
import cors from 'cors';
import path from 'path';
import { getPackageRoot } from '../../../shared/paths.js';
import { logger } from '../../../utils/logger.js';
/**
 * Create all middleware for the worker service
 * @param summarizeRequestBody - Function to summarize request bodies for logging
 * @returns Array of middleware functions
 */
export function createMiddleware(
  summarizeRequestBody: (method: string, path: string, body: any) => string
): RequestHandler[] {
  const middlewares: RequestHandler[] = [];

  // JSON parsing with 50mb limit
  middlewares.push(express.json({ limit: '50mb' }));

  // CORS - restrict to localhost origins only
  middlewares.push(cors({
    origin: (origin, callback) => {
      // Allow: requests without Origin header (hooks, curl, CLI tools)
      // Allow: localhost and 127.0.0.1 origins
      if (!origin ||
          origin.startsWith('http://localhost:') ||
          origin.startsWith('http://127.0.0.1:')) {
        callback(null, true);
      } else {
        callback(new Error('CORS not allowed'));
      }
    },
    methods: ['GET', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE'],
    allowedHeaders: ['Content-Type', 'Authorization', 'X-Requested-With'],
    credentials: false
  }));

  // HTTP request/response logging
  middlewares.push((req: Request, res: Response, next: NextFunction) => {
    // Skip logging for static assets, health checks, and polling endpoints
    const staticExtensions = ['.html', '.js', '.css', '.svg', '.png', '.jpg', '.jpeg', '.webp', '.woff', '.woff2', '.ttf', '.eot'];
    const isStaticAsset = staticExtensions.some(ext => req.path.endsWith(ext));
    const isPollingEndpoint = req.path === '/api/logs'; // Skip logs endpoint to avoid noise from auto-refresh
    if (req.path.startsWith('/health') || req.path === '/' || isStaticAsset || isPollingEndpoint) {
      return next();
    }

    const start = Date.now();
    const requestId = `${req.method}-${Date.now()}`;

    // Log incoming request with body summary
    const bodySummary = summarizeRequestBody(req.method, req.path, req.body);
    logger.debug('HTTP', `${req.method} ${req.path}`, { requestId }, bodySummary);

    // Capture response
    const originalSend = res.send.bind(res);
    res.send = function(body: any) {
      const duration = Date.now() - start;
      logger.debug('HTTP', `${res.statusCode} ${req.path}`, { requestId, duration: `${duration}ms` });
      return originalSend(body);
    };

    next();
  });

  // Serve static files for web UI (viewer-bundle.js, logos, fonts, etc.)
  const packageRoot = getPackageRoot();
  const uiDir = path.join(packageRoot, 'plugin', 'ui');
  middlewares.push(express.static(uiDir));

  return middlewares;
}
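
The two inline decisions in `createMiddleware` (which origins pass CORS, and which paths skip logging) could be pulled out as pure functions so they can be unit-tested without an Express app. This is only a sketch: `isAllowedOrigin`, `shouldSkipLogging`, and `STATIC_EXTENSIONS` are hypothetical names that do not exist in this file, and the logic simply mirrors the checks above.

```typescript
// Hypothetical constant; same extension list as the logging middleware uses.
const STATIC_EXTENSIONS: readonly string[] = [
  '.html', '.js', '.css', '.svg', '.png', '.jpg', '.jpeg',
  '.webp', '.woff', '.woff2', '.ttf', '.eot'
];

// True when there is no Origin header, or the origin is a localhost/127.0.0.1
// URL with an explicit port.
function isAllowedOrigin(requestOrigin: string | undefined): boolean {
  return !requestOrigin ||
    requestOrigin.startsWith('http://localhost:') ||
    requestOrigin.startsWith('http://127.0.0.1:');
}

// True for paths the logging middleware passes through without logging:
// health checks, the root page, static assets, and the polled /api/logs endpoint.
function shouldSkipLogging(requestPath: string): boolean {
  const isStaticAsset = STATIC_EXTENSIONS.some(ext => requestPath.endsWith(ext));
  const isPollingEndpoint = requestPath === '/api/logs';
  return requestPath.startsWith('/health') ||
    requestPath === '/' ||
    isStaticAsset ||
    isPollingEndpoint;
}
```

One consequence of the prefix check worth knowing: an origin with no explicit port, such as `http://localhost`, is rejected because the match requires the trailing colon.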
/**
 * Middleware to require localhost-only access
 * Used for admin endpoints that should not be exposed when binding to 0.0.0.0
 */
export function requireLocalhost(req: Request, res: Response, next: NextFunction): void {
  // req.socket replaces the deprecated req.connection alias
  const clientIp = req.ip || req.socket.remoteAddress || '';
  const isLocalhost =
    clientIp === '127.0.0.1' ||
    clientIp === '::1' ||
    clientIp === '::ffff:127.0.0.1' ||
    clientIp === 'localhost';

  if (!isLocalhost) {
    logger.warn('SECURITY', 'Admin endpoint access denied - not localhost', {
      endpoint: req.path,
      clientIp,
      method: req.method
    });
    res.status(403).json({
      error: 'Forbidden',
      message: 'Admin endpoints are only accessible from localhost'
    });
    return;
  }

  next();
}
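
The address comparison in `requireLocalhost` can be expressed as a pure function, which makes the accepted set explicit and easy to test. A sketch only: `isLoopbackAddress` and `LOOPBACK_ADDRESSES` are hypothetical names, and the list is copied from the checks above rather than covering the whole 127.0.0.0/8 range.

```typescript
// Hypothetical constant; the same four literals requireLocalhost compares against.
const LOOPBACK_ADDRESSES: readonly string[] = [
  '127.0.0.1',
  '::1',
  '::ffff:127.0.0.1', // IPv4-mapped IPv6 form of 127.0.0.1
  'localhost'
];

// True only for an exact match against the list; undefined and '' are rejected.
function isLoopbackAddress(clientIp: string | undefined): boolean {
  return clientIp !== undefined && LOOPBACK_ADDRESSES.indexOf(clientIp) !== -1;
}
```

If the Express app ever enables `trust proxy`, `req.ip` is derived from the `X-Forwarded-For` header instead of the socket address, so a check like this would then depend on a client-supplied value.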
/**
 * Summarize request body for logging
 * Used to avoid logging sensitive data or large payloads
 */
export function summarizeRequestBody(method: string, path: string, body: any): string {
  if (!body || Object.keys(body).length === 0) return '';

  // Session init
  if (path.includes('/init')) {
    return '';
  }

  // Observations
  if (path.includes('/observations')) {
    const toolName = body.tool_name || '?';
    const toolInput = body.tool_input;
    const toolSummary = logger.formatTool(toolName, toolInput);
    return `tool=${toolSummary}`;
  }

  // Summarize request
  if (path.includes('/summarize')) {
    return 'requesting summary';
  }

  return '';
}
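
To illustrate the branching in `summarizeRequestBody`, here is a self-contained sketch with the tool formatter injected. `summarizeBody` and `formatToolStub` are hypothetical: the stub stands in for `logger.formatTool`, whose real output format is not shown in this file, and the request paths in the usage lines are made-up examples.

```typescript
// Stand-in for logger.formatTool; the real formatter's output is not shown here.
function formatToolStub(toolName: string, toolInput: unknown): string {
  return toolInput === undefined ? toolName : `${toolName}(...)`;
}

// Same branch order as summarizeRequestBody: empty body, /init, /observations,
// /summarize, then a silent default.
function summarizeBody(
  requestPath: string,
  body: Record<string, any> | undefined,
  formatTool: (name: string, input: unknown) => string = formatToolStub
): string {
  if (!body || Object.keys(body).length === 0) return '';
  if (requestPath.includes('/init')) return '';
  if (requestPath.includes('/observations')) {
    const toolName = body['tool_name'] || '?';
    return `tool=${formatTool(toolName, body['tool_input'])}`;
  }
  if (requestPath.includes('/summarize')) return 'requesting summary';
  return '';
}

// Hypothetical paths, used only to exercise each branch.
const observationSummary = summarizeBody('/api/sessions/observations', {
  tool_name: 'Read',
  tool_input: { file: 'a.txt' }
});
const summarizeSummary = summarizeBody('/api/sessions/summarize', { last: 'message' });
```

Because the checks use `includes`, any path containing one of the three substrings takes that branch, and earlier branches win when more than one substring is present.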