Files
claude-mem/src/services/infrastructure/HealthMonitor.ts
T
Alessandro Costa 64cce2bf10 fix: resolve 3 upstream bugs (summarize, ChromaSync, HealthMonitor) (#1566)
* fix: resolve 3 upstream bugs in summarize, ChromaSync, and HealthMonitor

1. summarize.ts: Skip summary when transcript has no assistant message.
   Prevents error loop where empty transcripts cause repeated failed
   summarize attempts (~30 errors/day observed in production).

2. ChromaSync.ts: Fallback to chroma_update_documents when add fails
   with "IDs already exist". Handles partial writes after MCP timeout
   without waiting for next backfill cycle.

3. HealthMonitor.ts: Replace HTTP-based isPortInUse with atomic socket
   bind on Unix. Eliminates TOCTOU race when two sessions start
   simultaneously (HTTP check is non-atomic — both see "port free"
   before either completes listen()). Updated tests accordingly.

All three bugs are pre-existing in v10.5.5. Confirmed via log analysis
of 543K lines over 17 days of production usage across two servers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: add CONTRIB_NOTES.md to gitignore

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: address CodeRabbit review on PR #1566

- HealthMonitor: add APPROVED OVERRIDE annotation for Win32 HTTP fallback
- ChromaSync: replace chroma_update_documents with delete+add for proper
  upsert (update only modifies existing IDs, silently ignores missing ones)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Alessandro Costa <alessandro@claudio.dev>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 15:15:08 -07:00

217 lines
7.5 KiB
TypeScript

/**
* HealthMonitor - Port monitoring, health checks, and version checking
*
* Extracted from worker-service.ts monolith to provide centralized health monitoring.
* Handles:
* - Port availability checking
* - Worker health/readiness polling
* - Version mismatch detection (critical for plugin updates)
* - HTTP-based shutdown requests
*/
import path from 'path';
import net from 'net';
import { readFileSync } from 'fs';
import { logger } from '../../utils/logger.js';
import { MARKETPLACE_ROOT } from '../../shared/paths.js';
/**
* Make an HTTP request to the worker via TCP.
* Returns { ok, statusCode, body } or throws on transport error.
*/
async function httpRequestToWorker(
port: number,
endpointPath: string,
method: string = 'GET'
): Promise<{ ok: boolean; statusCode: number; body: string }> {
const response = await fetch(`http://127.0.0.1:${port}${endpointPath}`, { method });
// Gracefully handle cases where response body isn't available (e.g., test mocks)
let body = '';
try {
body = await response.text();
} catch {
// Body unavailable — health/readiness checks only need .ok
}
return { ok: response.ok, statusCode: response.status, body };
}
/**
* Check if a port is in use by attempting an atomic socket bind.
* More reliable than HTTP health check for daemon spawn guards —
* prevents TOCTOU race where two daemons both see "port free" via
* HTTP and then both try to listen() (upstream bug workaround).
*
* Falls back to HTTP health check on Windows where socket bind
* behavior differs.
*/
export async function isPortInUse(port: number): Promise<boolean> {
if (process.platform === 'win32') {
// APPROVED OVERRIDE: Windows keeps HTTP health check because socket bind
// semantics differ (SO_REUSEADDR defaults, firewall prompts). The TOCTOU
// race remains on Windows but is an accepted limitation — the atomic
// socket approach would cause false positives or UAC popups.
try {
const response = await fetch(`http://127.0.0.1:${port}/api/health`);
return response.ok;
} catch {
return false;
}
}
// Unix: atomic socket bind check — no TOCTOU race
return new Promise((resolve) => {
const server = net.createServer();
server.once('error', (err: NodeJS.ErrnoException) => {
if (err.code === 'EADDRINUSE') {
resolve(true);
} else {
resolve(false);
}
});
server.once('listening', () => {
server.close(() => resolve(false));
});
server.listen(port, '127.0.0.1');
});
}
/**
* Poll a worker endpoint until it returns 200 OK or timeout.
* Shared implementation for liveness and readiness checks.
*/
async function pollEndpointUntilOk(
port: number,
endpointPath: string,
timeoutMs: number,
retryLogMessage: string
): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
const result = await httpRequestToWorker(port, endpointPath);
if (result.ok) return true;
} catch (error) {
// [ANTI-PATTERN IGNORED]: Retry loop - expected failures during startup, will retry
logger.debug('SYSTEM', retryLogMessage, {}, error as Error);
}
await new Promise(r => setTimeout(r, 500));
}
return false;
}
/**
* Wait for the worker HTTP server to become responsive (liveness check).
* Uses /api/health which returns 200 as soon as the HTTP server is listening.
* For full initialization (DB + search), use waitForReadiness() instead.
*/
export function waitForHealth(port: number, timeoutMs: number = 30000): Promise<boolean> {
return pollEndpointUntilOk(port, '/api/health', timeoutMs, 'Service not ready yet, will retry');
}
/**
* Wait for the worker to be fully initialized (DB + search ready).
* Uses /api/readiness which returns 200 only after core initialization completes.
* Now that initializationCompleteFlag is set after DB/search init (not MCP),
* this typically completes in a few seconds.
*/
export function waitForReadiness(port: number, timeoutMs: number = 30000): Promise<boolean> {
return pollEndpointUntilOk(port, '/api/readiness', timeoutMs, 'Worker not ready yet, will retry');
}
/**
* Wait for a port to become free (no longer responding to health checks)
* Used after shutdown to confirm the port is available for restart
*/
export async function waitForPortFree(port: number, timeoutMs: number = 10000): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (!(await isPortInUse(port))) return true;
await new Promise(r => setTimeout(r, 500));
}
return false;
}
/**
* Send HTTP shutdown request to a running worker
* @returns true if shutdown request was acknowledged, false otherwise
*/
export async function httpShutdown(port: number): Promise<boolean> {
try {
const result = await httpRequestToWorker(port, '/api/admin/shutdown', 'POST');
if (!result.ok) {
logger.warn('SYSTEM', 'Shutdown request returned error', { status: result.statusCode });
return false;
}
return true;
} catch (error) {
// Connection refused is expected if worker already stopped
if (error instanceof Error && error.message?.includes('ECONNREFUSED')) {
logger.debug('SYSTEM', 'Worker already stopped', {}, error);
return false;
}
// Unexpected error - log full details
logger.error('SYSTEM', 'Shutdown request failed unexpectedly', {}, error as Error);
return false;
}
}
/**
* Get the plugin version from the installed marketplace package.json
* This is the "expected" version that should be running.
* Returns 'unknown' on ENOENT/EBUSY (shutdown race condition, fix #1042).
*/
export function getInstalledPluginVersion(): string {
try {
const packageJsonPath = path.join(MARKETPLACE_ROOT, 'package.json');
const packageJson = JSON.parse(readFileSync(packageJsonPath, 'utf-8'));
return packageJson.version;
} catch (error: unknown) {
const code = (error as NodeJS.ErrnoException).code;
if (code === 'ENOENT' || code === 'EBUSY') {
logger.debug('SYSTEM', 'Could not read plugin version (shutdown race)', { code });
return 'unknown';
}
throw error;
}
}
/**
* Get the running worker's version via API
* This is the "actual" version currently running.
*/
export async function getRunningWorkerVersion(port: number): Promise<string | null> {
try {
const result = await httpRequestToWorker(port, '/api/version');
if (!result.ok) return null;
const data = JSON.parse(result.body) as { version: string };
return data.version;
} catch {
// Expected: worker not running or version endpoint unavailable
logger.debug('SYSTEM', 'Could not fetch worker version', {});
return null;
}
}
export interface VersionCheckResult {
matches: boolean;
pluginVersion: string;
workerVersion: string | null;
}
/**
* Check if worker version matches plugin version
* Critical for detecting when plugin is updated but worker is still running old code
* Returns true if versions match or if we can't determine (assume match for graceful degradation)
*/
export async function checkVersionMatch(port: number): Promise<VersionCheckResult> {
const pluginVersion = getInstalledPluginVersion();
const workerVersion = await getRunningWorkerVersion(port);
// If either version is unknown/null, assume match (graceful degradation, fix #1042)
if (!workerVersion || pluginVersion === 'unknown') {
return { matches: true, pluginVersion, workerVersion };
}
return { matches: pluginVersion === workerVersion, pluginVersion, workerVersion };
}