Merge main into feat/chroma-http-server
Resolve conflicts between Chroma HTTP server PR and main branch changes (folder CLAUDE.md, exclusion settings, Zscaler SSL, transport cleanup). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
+519
-81
@@ -10,12 +10,54 @@
|
||||
*/
|
||||
|
||||
import path from 'path';
|
||||
import { existsSync, writeFileSync, unlinkSync, statSync } from 'fs';
|
||||
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
|
||||
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
|
||||
import { getWorkerPort, getWorkerHost } from '../shared/worker-utils.js';
|
||||
import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
|
||||
import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
|
||||
import { getAuthMethodDescription } from '../shared/EnvManager.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import { ChromaServerManager } from './sync/ChromaServerManager.js';
|
||||
|
||||
// Windows: avoid repeated spawn popups when startup fails (issue #921)
|
||||
const WINDOWS_SPAWN_COOLDOWN_MS = 2 * 60 * 1000;
|
||||
|
||||
function getWorkerSpawnLockPath(): string {
|
||||
return path.join(SettingsDefaultsManager.get('CLAUDE_MEM_DATA_DIR'), '.worker-start-attempted');
|
||||
}
|
||||
|
||||
function shouldSkipSpawnOnWindows(): boolean {
|
||||
if (process.platform !== 'win32') return false;
|
||||
const lockPath = getWorkerSpawnLockPath();
|
||||
if (!existsSync(lockPath)) return false;
|
||||
try {
|
||||
const modifiedTimeMs = statSync(lockPath).mtimeMs;
|
||||
return Date.now() - modifiedTimeMs < WINDOWS_SPAWN_COOLDOWN_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function markWorkerSpawnAttempted(): void {
|
||||
if (process.platform !== 'win32') return;
|
||||
try {
|
||||
writeFileSync(getWorkerSpawnLockPath(), '', 'utf-8');
|
||||
} catch {
|
||||
// Best-effort lock file — failure to write shouldn't block startup
|
||||
}
|
||||
}
|
||||
|
||||
function clearWorkerSpawnAttempted(): void {
|
||||
if (process.platform !== 'win32') return;
|
||||
try {
|
||||
const lockPath = getWorkerSpawnLockPath();
|
||||
if (existsSync(lockPath)) unlinkSync(lockPath);
|
||||
} catch {
|
||||
// Best-effort cleanup
|
||||
}
|
||||
}
|
||||
|
||||
// Version injected at build time by esbuild define
|
||||
declare const __DEFAULT_PACKAGE_VERSION__: string;
|
||||
const packageVersion = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined' ? __DEFAULT_PACKAGE_VERSION__ : '0.0.0-dev';
|
||||
@@ -27,6 +69,7 @@ import {
|
||||
removePidFile,
|
||||
getPlatformTimeout,
|
||||
cleanupOrphanedProcesses,
|
||||
cleanStalePidFile,
|
||||
spawnDaemon,
|
||||
createSignalHandler
|
||||
} from './infrastructure/ProcessManager.js';
|
||||
@@ -53,8 +96,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js';
|
||||
import { SessionManager } from './worker/SessionManager.js';
|
||||
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
|
||||
import { SDKAgent } from './worker/SDKAgent.js';
|
||||
import { GeminiAgent } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent } from './worker/OpenRouterAgent.js';
|
||||
import { GeminiAgent, isGeminiSelected, isGeminiAvailable } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent, isOpenRouterSelected, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
|
||||
import { PaginationHelper } from './worker/PaginationHelper.js';
|
||||
import { SettingsManager } from './worker/SettingsManager.js';
|
||||
import { SearchManager } from './worker/SearchManager.js';
|
||||
@@ -69,6 +112,7 @@ import { DataRoutes } from './worker/http/routes/DataRoutes.js';
|
||||
import { SearchRoutes } from './worker/http/routes/SearchRoutes.js';
|
||||
import { SettingsRoutes } from './worker/http/routes/SettingsRoutes.js';
|
||||
import { LogsRoutes } from './worker/http/routes/LogsRoutes.js';
|
||||
import { MemoryRoutes } from './worker/http/routes/MemoryRoutes.js';
|
||||
|
||||
// Process management for zombie cleanup (Issue #737)
|
||||
import { startOrphanReaper, reapOrphanedProcesses } from './worker/ProcessRegistry.js';
|
||||
@@ -131,6 +175,14 @@ export class WorkerService {
|
||||
// Orphan reaper cleanup function (Issue #737)
|
||||
private stopOrphanReaper: (() => void) | null = null;
|
||||
|
||||
// AI interaction tracking for health endpoint
|
||||
private lastAiInteraction: {
|
||||
timestamp: number;
|
||||
success: boolean;
|
||||
provider: string;
|
||||
error?: string;
|
||||
} | null = null;
|
||||
|
||||
constructor() {
|
||||
// Initialize the promise that will resolve when background initialization completes
|
||||
this.initializationComplete = new Promise((resolve) => {
|
||||
@@ -154,6 +206,7 @@ export class WorkerService {
|
||||
this.broadcastProcessingStatus();
|
||||
});
|
||||
|
||||
|
||||
// Initialize MCP client
|
||||
// Empty capabilities object: this client only calls tools, doesn't expose any
|
||||
this.mcpClient = new Client({
|
||||
@@ -166,7 +219,24 @@ export class WorkerService {
|
||||
getInitializationComplete: () => this.initializationCompleteFlag,
|
||||
getMcpReady: () => this.mcpReady,
|
||||
onShutdown: () => this.shutdown(),
|
||||
onRestart: () => this.shutdown()
|
||||
onRestart: () => this.shutdown(),
|
||||
workerPath: __filename,
|
||||
getAiStatus: () => {
|
||||
let provider = 'claude';
|
||||
if (isOpenRouterSelected() && isOpenRouterAvailable()) provider = 'openrouter';
|
||||
else if (isGeminiSelected() && isGeminiAvailable()) provider = 'gemini';
|
||||
return {
|
||||
provider,
|
||||
authMethod: getAuthMethodDescription(),
|
||||
lastInteraction: this.lastAiInteraction
|
||||
? {
|
||||
timestamp: this.lastAiInteraction.timestamp,
|
||||
success: this.lastAiInteraction.success,
|
||||
...(this.lastAiInteraction.error && { error: this.lastAiInteraction.error }),
|
||||
}
|
||||
: null,
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
// Register route handlers
|
||||
@@ -191,35 +261,74 @@ export class WorkerService {
|
||||
this.isShuttingDown = shutdownRef.value;
|
||||
handler('SIGINT');
|
||||
});
|
||||
|
||||
// SIGHUP: sent by kernel when controlling terminal closes.
|
||||
// Daemon mode: ignore it (survive parent shell exit).
|
||||
// Interactive mode: treat like SIGTERM (graceful shutdown).
|
||||
if (process.platform !== 'win32') {
|
||||
if (process.argv.includes('--daemon')) {
|
||||
process.on('SIGHUP', () => {
|
||||
logger.debug('SYSTEM', 'Ignoring SIGHUP in daemon mode');
|
||||
});
|
||||
} else {
|
||||
process.on('SIGHUP', () => {
|
||||
this.isShuttingDown = shutdownRef.value;
|
||||
handler('SIGHUP');
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register all route handlers with the server
|
||||
*/
|
||||
private registerRoutes(): void {
|
||||
// Standard routes
|
||||
this.server.registerRoutes(new ViewerRoutes(this.sseBroadcaster, this.dbManager, this.sessionManager));
|
||||
this.server.registerRoutes(new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this));
|
||||
this.server.registerRoutes(new DataRoutes(this.paginationHelper, this.dbManager, this.sessionManager, this.sseBroadcaster, this, this.startTime));
|
||||
this.server.registerRoutes(new SettingsRoutes(this.settingsManager));
|
||||
this.server.registerRoutes(new LogsRoutes());
|
||||
// IMPORTANT: Middleware must be registered BEFORE routes (Express processes in order)
|
||||
|
||||
// Early handler for /api/context/inject to avoid 404 during startup
|
||||
// Early handler for /api/context/inject — fail open if not yet initialized
|
||||
this.server.app.get('/api/context/inject', async (req, res, next) => {
|
||||
const timeoutMs = 300000; // 5 minute timeout for slow systems
|
||||
const timeoutPromise = new Promise((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Initialization timeout')), timeoutMs)
|
||||
);
|
||||
|
||||
await Promise.race([this.initializationComplete, timeoutPromise]);
|
||||
|
||||
if (!this.searchRoutes) {
|
||||
res.status(503).json({ error: 'Search routes not initialized' });
|
||||
if (!this.initializationCompleteFlag || !this.searchRoutes) {
|
||||
logger.warn('SYSTEM', 'Context requested before initialization complete, returning empty');
|
||||
res.status(200).json({ content: [{ type: 'text', text: '' }] });
|
||||
return;
|
||||
}
|
||||
|
||||
next(); // Delegate to SearchRoutes handler
|
||||
});
|
||||
|
||||
// Guard ALL /api/* routes during initialization — wait for DB with timeout
|
||||
// Exceptions: /api/health, /api/readiness, /api/version (handled by Server.ts core routes)
|
||||
// and /api/context/inject (handled above with fail-open)
|
||||
this.server.app.use('/api', async (req, res, next) => {
|
||||
if (this.initializationCompleteFlag) {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
|
||||
const timeoutMs = 30000;
|
||||
const timeoutPromise = new Promise<void>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Database initialization timeout')), timeoutMs)
|
||||
);
|
||||
|
||||
try {
|
||||
await Promise.race([this.initializationComplete, timeoutPromise]);
|
||||
next();
|
||||
} catch (error) {
|
||||
logger.error('HTTP', `Request to ${req.method} ${req.path} rejected — DB not initialized`, {}, error as Error);
|
||||
res.status(503).json({
|
||||
error: 'Service initializing',
|
||||
message: 'Database is still initializing, please retry'
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Standard routes (registered AFTER guard middleware)
|
||||
this.server.registerRoutes(new ViewerRoutes(this.sseBroadcaster, this.dbManager, this.sessionManager));
|
||||
this.server.registerRoutes(new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this));
|
||||
this.server.registerRoutes(new DataRoutes(this.paginationHelper, this.dbManager, this.sessionManager, this.sseBroadcaster, this, this.startTime));
|
||||
this.server.registerRoutes(new SettingsRoutes(this.settingsManager));
|
||||
this.server.registerRoutes(new LogsRoutes());
|
||||
this.server.registerRoutes(new MemoryRoutes(this.dbManager, 'claude-mem'));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -293,13 +402,12 @@ export class WorkerService {
|
||||
|
||||
await this.dbManager.initialize();
|
||||
|
||||
// Recover stuck messages from previous crashes
|
||||
// Reset any messages that were processing when worker died
|
||||
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
|
||||
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
|
||||
const STUCK_THRESHOLD_MS = 5 * 60 * 1000;
|
||||
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
|
||||
const resetCount = pendingStore.resetStaleProcessingMessages(0); // 0 = reset ALL processing
|
||||
if (resetCount > 0) {
|
||||
logger.info('SYSTEM', `Recovered ${resetCount} stuck messages from previous session`, { thresholdMinutes: 5 });
|
||||
logger.info('SYSTEM', `Reset ${resetCount} stale processing messages to pending`);
|
||||
}
|
||||
|
||||
// Initialize search services
|
||||
@@ -366,8 +474,24 @@ export class WorkerService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the appropriate agent based on provider settings.
|
||||
* Same logic as SessionRoutes.getActiveAgent() for consistency.
|
||||
*/
|
||||
private getActiveAgent(): SDKAgent | GeminiAgent | OpenRouterAgent {
|
||||
if (isOpenRouterSelected() && isOpenRouterAvailable()) {
|
||||
return this.openRouterAgent;
|
||||
}
|
||||
if (isGeminiSelected() && isGeminiAvailable()) {
|
||||
return this.geminiAgent;
|
||||
}
|
||||
return this.sdkAgent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a session processor
|
||||
* On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available,
|
||||
* otherwise marks messages abandoned and removes session so queue does not grow unbounded.
|
||||
*/
|
||||
private startSessionProcessor(
|
||||
session: ReturnType<typeof this.sessionManager.getSession>,
|
||||
@@ -376,21 +500,203 @@ export class WorkerService {
|
||||
if (!session) return;
|
||||
|
||||
const sid = session.sessionDbId;
|
||||
logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid });
|
||||
const agent = this.getActiveAgent();
|
||||
const providerName = agent.constructor.name;
|
||||
|
||||
session.generatorPromise = this.sdkAgent.startSession(session, this)
|
||||
.catch(error => {
|
||||
// Before starting generator, check if AbortController is already aborted
|
||||
// This can happen after a previous generator was aborted but the session still has pending work
|
||||
if (session.abortController.signal.aborted) {
|
||||
logger.debug('SYSTEM', 'Replacing aborted AbortController before starting generator', {
|
||||
sessionId: session.sessionDbId
|
||||
});
|
||||
session.abortController = new AbortController();
|
||||
}
|
||||
|
||||
// Track whether generator failed with an unrecoverable error to prevent infinite restart loops
|
||||
let hadUnrecoverableError = false;
|
||||
let sessionFailed = false;
|
||||
|
||||
logger.info('SYSTEM', `Starting generator (${source}) using ${providerName}`, { sessionId: sid });
|
||||
|
||||
session.generatorPromise = agent.startSession(session, this)
|
||||
.catch(async (error: unknown) => {
|
||||
const errorMessage = (error as Error)?.message || '';
|
||||
|
||||
// Detect unrecoverable errors that should NOT trigger restart
|
||||
// These errors will fail immediately on retry, causing infinite loops
|
||||
const unrecoverablePatterns = [
|
||||
'Claude executable not found',
|
||||
'CLAUDE_CODE_PATH',
|
||||
'ENOENT',
|
||||
'spawn',
|
||||
'Invalid API key',
|
||||
];
|
||||
if (unrecoverablePatterns.some(pattern => errorMessage.includes(pattern))) {
|
||||
hadUnrecoverableError = true;
|
||||
this.lastAiInteraction = {
|
||||
timestamp: Date.now(),
|
||||
success: false,
|
||||
provider: providerName,
|
||||
error: errorMessage,
|
||||
};
|
||||
logger.error('SDK', 'Unrecoverable generator error - will NOT restart', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project,
|
||||
errorMessage
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback for terminated SDK sessions (provider abstraction)
|
||||
if (this.isSessionTerminatedError(error)) {
|
||||
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project,
|
||||
reason: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
return this.runFallbackForTerminatedSession(session, error);
|
||||
}
|
||||
|
||||
// Detect stale resume failures - SDK session context was lost
|
||||
if ((errorMessage.includes('aborted by user') || errorMessage.includes('No conversation found'))
|
||||
&& session.memorySessionId) {
|
||||
logger.warn('SDK', 'Detected stale resume failure, clearing memorySessionId for fresh start', {
|
||||
sessionId: session.sessionDbId,
|
||||
memorySessionId: session.memorySessionId,
|
||||
errorMessage
|
||||
});
|
||||
// Clear stale memorySessionId and force fresh init on next attempt
|
||||
this.dbManager.getSessionStore().updateMemorySessionId(session.sessionDbId, null);
|
||||
session.memorySessionId = null;
|
||||
session.forceInit = true;
|
||||
}
|
||||
logger.error('SDK', 'Session generator failed', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project
|
||||
project: session.project,
|
||||
provider: providerName
|
||||
}, error as Error);
|
||||
sessionFailed = true;
|
||||
this.lastAiInteraction = {
|
||||
timestamp: Date.now(),
|
||||
success: false,
|
||||
provider: providerName,
|
||||
error: errorMessage,
|
||||
};
|
||||
throw error;
|
||||
})
|
||||
.finally(() => {
|
||||
session.generatorPromise = null;
|
||||
|
||||
// Record successful AI interaction if no error occurred
|
||||
if (!sessionFailed && !hadUnrecoverableError) {
|
||||
this.lastAiInteraction = {
|
||||
timestamp: Date.now(),
|
||||
success: true,
|
||||
provider: providerName,
|
||||
};
|
||||
}
|
||||
|
||||
// Do NOT restart after unrecoverable errors - prevents infinite loops
|
||||
if (hadUnrecoverableError) {
|
||||
logger.warn('SYSTEM', 'Skipping restart due to unrecoverable error', {
|
||||
sessionId: session.sessionDbId
|
||||
});
|
||||
this.broadcastProcessingStatus();
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if there's pending work that needs processing with a fresh AbortController
|
||||
const { PendingMessageStore } = require('./sqlite/PendingMessageStore.js');
|
||||
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
|
||||
const pendingCount = pendingStore.getPendingCount(session.sessionDbId);
|
||||
|
||||
if (pendingCount > 0) {
|
||||
logger.info('SYSTEM', 'Pending work remains after generator exit, restarting with fresh AbortController', {
|
||||
sessionId: session.sessionDbId,
|
||||
pendingCount
|
||||
});
|
||||
// Reset AbortController for restart
|
||||
session.abortController = new AbortController();
|
||||
// Restart processor
|
||||
this.startSessionProcessor(session, 'pending-work-restart');
|
||||
}
|
||||
|
||||
this.broadcastProcessingStatus();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Match errors that indicate the Claude Code process/session is gone (resume impossible).
|
||||
* Used to trigger graceful fallback instead of leaving pending messages stuck forever.
|
||||
*/
|
||||
private isSessionTerminatedError(error: unknown): boolean {
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
const normalized = msg.toLowerCase();
|
||||
return (
|
||||
normalized.includes('process aborted by user') ||
|
||||
normalized.includes('processtransport') ||
|
||||
normalized.includes('not ready for writing') ||
|
||||
normalized.includes('session generator failed') ||
|
||||
normalized.includes('claude code process')
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* When SDK resume fails due to terminated session: try Gemini then OpenRouter to drain
|
||||
* pending messages; if no fallback available, mark messages abandoned and remove session.
|
||||
*/
|
||||
private async runFallbackForTerminatedSession(
|
||||
session: ReturnType<typeof this.sessionManager.getSession>,
|
||||
_originalError: unknown
|
||||
): Promise<void> {
|
||||
if (!session) return;
|
||||
|
||||
const sessionDbId = session.sessionDbId;
|
||||
|
||||
// Fallback agents need memorySessionId for storeObservations
|
||||
if (!session.memorySessionId) {
|
||||
const syntheticId = `fallback-${sessionDbId}-${Date.now()}`;
|
||||
session.memorySessionId = syntheticId;
|
||||
this.dbManager.getSessionStore().updateMemorySessionId(sessionDbId, syntheticId);
|
||||
}
|
||||
|
||||
if (isGeminiAvailable()) {
|
||||
try {
|
||||
await this.geminiAgent.startSession(session, this);
|
||||
return;
|
||||
} catch (e) {
|
||||
logger.warn('SDK', 'Fallback Gemini failed, trying OpenRouter', {
|
||||
sessionId: sessionDbId,
|
||||
error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (isOpenRouterAvailable()) {
|
||||
try {
|
||||
await this.openRouterAgent.startSession(session, this);
|
||||
return;
|
||||
} catch (e) {
|
||||
logger.warn('SDK', 'Fallback OpenRouter failed', {
|
||||
sessionId: sessionDbId,
|
||||
error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// No fallback or both failed: mark messages abandoned and remove session so queue doesn't grow
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
|
||||
if (abandoned > 0) {
|
||||
logger.warn('SDK', 'No fallback available; marked pending messages abandoned', {
|
||||
sessionId: sessionDbId,
|
||||
abandoned
|
||||
});
|
||||
}
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process pending session queues
|
||||
*/
|
||||
@@ -402,6 +708,46 @@ export class WorkerService {
|
||||
}> {
|
||||
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
|
||||
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
|
||||
const sessionStore = this.dbManager.getSessionStore();
|
||||
|
||||
// Clean up stale 'active' sessions before processing
|
||||
// Sessions older than 6 hours without activity are likely orphaned
|
||||
const STALE_SESSION_THRESHOLD_MS = 6 * 60 * 60 * 1000;
|
||||
const staleThreshold = Date.now() - STALE_SESSION_THRESHOLD_MS;
|
||||
|
||||
try {
|
||||
const staleSessionIds = sessionStore.db.prepare(`
|
||||
SELECT id FROM sdk_sessions
|
||||
WHERE status = 'active' AND started_at_epoch < ?
|
||||
`).all(staleThreshold) as { id: number }[];
|
||||
|
||||
if (staleSessionIds.length > 0) {
|
||||
const ids = staleSessionIds.map(r => r.id);
|
||||
const placeholders = ids.map(() => '?').join(',');
|
||||
|
||||
sessionStore.db.prepare(`
|
||||
UPDATE sdk_sessions
|
||||
SET status = 'failed', completed_at_epoch = ?
|
||||
WHERE id IN (${placeholders})
|
||||
`).run(Date.now(), ...ids);
|
||||
|
||||
logger.info('SYSTEM', `Marked ${ids.length} stale sessions as failed`);
|
||||
|
||||
const msgResult = sessionStore.db.prepare(`
|
||||
UPDATE pending_messages
|
||||
SET status = 'failed', failed_at_epoch = ?
|
||||
WHERE status = 'pending'
|
||||
AND session_db_id IN (${placeholders})
|
||||
`).run(Date.now(), ...ids);
|
||||
|
||||
if (msgResult.changes > 0) {
|
||||
logger.info('SYSTEM', `Marked ${msgResult.changes} pending messages from stale sessions as failed`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('SYSTEM', 'Failed to clean up stale sessions', {}, error as Error);
|
||||
}
|
||||
|
||||
const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();
|
||||
|
||||
const result = {
|
||||
@@ -486,6 +832,86 @@ export class WorkerService {
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Reusable Worker Startup Logic
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Ensures the worker is started and healthy.
|
||||
* This function can be called by both 'start' and 'hook' commands.
|
||||
*
|
||||
* @param port - The port the worker should run on
|
||||
* @returns true if worker is healthy (existing or newly started), false on failure
|
||||
*/
|
||||
async function ensureWorkerStarted(port: number): Promise<boolean> {
|
||||
// Clean stale PID file first (cheap: 1 fs read + 1 signal-0 check)
|
||||
cleanStalePidFile();
|
||||
|
||||
// Check if worker is already running and healthy
|
||||
if (await waitForHealth(port, 1000)) {
|
||||
const versionCheck = await checkVersionMatch(port);
|
||||
if (!versionCheck.matches) {
|
||||
logger.info('SYSTEM', 'Worker version mismatch detected - auto-restarting', {
|
||||
pluginVersion: versionCheck.pluginVersion,
|
||||
workerVersion: versionCheck.workerVersion
|
||||
});
|
||||
|
||||
await httpShutdown(port);
|
||||
const freed = await waitForPortFree(port, getPlatformTimeout(HOOK_TIMEOUTS.PORT_IN_USE_WAIT));
|
||||
if (!freed) {
|
||||
logger.error('SYSTEM', 'Port did not free up after shutdown for version mismatch restart', { port });
|
||||
return false;
|
||||
}
|
||||
removePidFile();
|
||||
} else {
|
||||
logger.info('SYSTEM', 'Worker already running and healthy');
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if port is in use by something else
|
||||
const portInUse = await isPortInUse(port);
|
||||
if (portInUse) {
|
||||
logger.info('SYSTEM', 'Port in use, waiting for worker to become healthy');
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(HOOK_TIMEOUTS.PORT_IN_USE_WAIT));
|
||||
if (healthy) {
|
||||
logger.info('SYSTEM', 'Worker is now healthy');
|
||||
return true;
|
||||
}
|
||||
logger.error('SYSTEM', 'Port in use but worker not responding to health checks');
|
||||
return false;
|
||||
}
|
||||
|
||||
// Windows: skip spawn if a recent attempt already failed (prevents repeated bun.exe popups, issue #921)
|
||||
if (shouldSkipSpawnOnWindows()) {
|
||||
logger.warn('SYSTEM', 'Worker unavailable on Windows — skipping spawn (recent attempt failed within cooldown)');
|
||||
return false;
|
||||
}
|
||||
|
||||
// Spawn new worker daemon
|
||||
logger.info('SYSTEM', 'Starting worker daemon');
|
||||
markWorkerSpawnAttempted();
|
||||
const pid = spawnDaemon(__filename, port);
|
||||
if (pid === undefined) {
|
||||
logger.error('SYSTEM', 'Failed to spawn worker daemon');
|
||||
return false;
|
||||
}
|
||||
|
||||
// PID file is written by the worker itself after listen() succeeds
|
||||
// This is race-free and works correctly on Windows where cmd.exe PID is useless
|
||||
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(HOOK_TIMEOUTS.POST_SPAWN_WAIT));
|
||||
if (!healthy) {
|
||||
removePidFile();
|
||||
logger.error('SYSTEM', 'Worker failed to start (health check timeout)');
|
||||
return false;
|
||||
}
|
||||
|
||||
clearWorkerSpawnAttempted();
|
||||
logger.info('SYSTEM', 'Worker started successfully');
|
||||
return true;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// CLI Entry Point
|
||||
// ============================================================================
|
||||
@@ -504,58 +930,12 @@ async function main() {
|
||||
|
||||
switch (command) {
|
||||
case 'start': {
|
||||
if (await waitForHealth(port, 1000)) {
|
||||
const versionCheck = await checkVersionMatch(port);
|
||||
if (!versionCheck.matches) {
|
||||
logger.info('SYSTEM', 'Worker version mismatch detected - auto-restarting', {
|
||||
pluginVersion: versionCheck.pluginVersion,
|
||||
workerVersion: versionCheck.workerVersion
|
||||
});
|
||||
|
||||
await httpShutdown(port);
|
||||
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
|
||||
if (!freed) {
|
||||
logger.error('SYSTEM', 'Port did not free up after shutdown for version mismatch restart', { port });
|
||||
exitWithStatus('error', 'Port did not free after version mismatch restart');
|
||||
}
|
||||
removePidFile();
|
||||
} else {
|
||||
logger.info('SYSTEM', 'Worker already running and healthy');
|
||||
exitWithStatus('ready');
|
||||
}
|
||||
const success = await ensureWorkerStarted(port);
|
||||
if (success) {
|
||||
exitWithStatus('ready');
|
||||
} else {
|
||||
exitWithStatus('error', 'Failed to start worker');
|
||||
}
|
||||
|
||||
const portInUse = await isPortInUse(port);
|
||||
if (portInUse) {
|
||||
logger.info('SYSTEM', 'Port in use, waiting for worker to become healthy');
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(15000));
|
||||
if (healthy) {
|
||||
logger.info('SYSTEM', 'Worker is now healthy');
|
||||
exitWithStatus('ready');
|
||||
}
|
||||
logger.error('SYSTEM', 'Port in use but worker not responding to health checks');
|
||||
exitWithStatus('error', 'Port in use but worker not responding');
|
||||
}
|
||||
|
||||
logger.info('SYSTEM', 'Starting worker daemon');
|
||||
const pid = spawnDaemon(__filename, port);
|
||||
if (pid === undefined) {
|
||||
logger.error('SYSTEM', 'Failed to spawn worker daemon');
|
||||
exitWithStatus('error', 'Failed to spawn worker daemon');
|
||||
}
|
||||
|
||||
// PID file is written by the worker itself after listen() succeeds
|
||||
// This is race-free and works correctly on Windows where cmd.exe PID is useless
|
||||
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
|
||||
if (!healthy) {
|
||||
removePidFile();
|
||||
logger.error('SYSTEM', 'Worker failed to start (health check timeout)');
|
||||
exitWithStatus('error', 'Worker failed to start (health check timeout)');
|
||||
}
|
||||
|
||||
logger.info('SYSTEM', 'Worker started successfully');
|
||||
exitWithStatus('ready');
|
||||
}
|
||||
|
||||
case 'stop': {
|
||||
@@ -592,7 +972,7 @@ async function main() {
|
||||
// PID file is written by the worker itself after listen() succeeds
|
||||
// This is race-free and works correctly on Windows where cmd.exe PID is useless
|
||||
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
|
||||
const healthy = await waitForHealth(port, getPlatformTimeout(HOOK_TIMEOUTS.POST_SPAWN_WAIT));
|
||||
if (!healthy) {
|
||||
removePidFile();
|
||||
logger.error('SYSTEM', 'Worker failed to restart');
|
||||
@@ -626,21 +1006,79 @@ async function main() {
|
||||
}
|
||||
|
||||
case 'hook': {
|
||||
// Auto-start worker if not running
|
||||
const workerReady = await ensureWorkerStarted(port);
|
||||
if (!workerReady) {
|
||||
logger.warn('SYSTEM', 'Worker failed to start before hook, handler will retry');
|
||||
}
|
||||
|
||||
// Existing logic unchanged
|
||||
const platform = process.argv[3];
|
||||
const event = process.argv[4];
|
||||
if (!platform || !event) {
|
||||
console.error('Usage: claude-mem hook <platform> <event>');
|
||||
console.error('Platforms: claude-code, cursor, raw');
|
||||
console.error('Events: context, session-init, observation, summarize, user-message');
|
||||
console.error('Events: context, session-init, observation, summarize, session-complete');
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
// Check if worker is already running on port
|
||||
const portInUse = await isPortInUse(port);
|
||||
let startedWorkerInProcess = false;
|
||||
|
||||
if (!portInUse) {
|
||||
// Port free - start worker IN THIS PROCESS (no spawn!)
|
||||
// This process becomes the worker and stays alive
|
||||
try {
|
||||
logger.info('SYSTEM', 'Starting worker in-process for hook', { event });
|
||||
const worker = new WorkerService();
|
||||
await worker.start();
|
||||
startedWorkerInProcess = true;
|
||||
// Worker is now running in this process on the port
|
||||
} catch (error) {
|
||||
logger.failure('SYSTEM', 'Worker failed to start in hook', {}, error as Error);
|
||||
removePidFile();
|
||||
process.exit(0);
|
||||
}
|
||||
}
|
||||
// If port in use, we'll use HTTP to the existing worker
|
||||
|
||||
const { hookCommand } = await import('../cli/hook-command.js');
|
||||
await hookCommand(platform, event);
|
||||
// If we started the worker in this process, skip process.exit() so we stay alive as the worker
|
||||
await hookCommand(platform, event, { skipExit: startedWorkerInProcess });
|
||||
// Note: if we started worker in-process, this process stays alive as the worker
|
||||
// The break allows the event loop to continue serving requests
|
||||
break;
|
||||
}
|
||||
|
||||
case 'generate': {
|
||||
const dryRun = process.argv.includes('--dry-run');
|
||||
const { generateClaudeMd } = await import('../cli/claude-md-commands.js');
|
||||
const result = await generateClaudeMd(dryRun);
|
||||
process.exit(result);
|
||||
}
|
||||
|
||||
case 'clean': {
|
||||
const dryRun = process.argv.includes('--dry-run');
|
||||
const { cleanClaudeMd } = await import('../cli/claude-md-commands.js');
|
||||
const result = await cleanClaudeMd(dryRun);
|
||||
process.exit(result);
|
||||
}
|
||||
|
||||
case '--daemon':
|
||||
default: {
|
||||
// Prevent daemon from dying silently on unhandled errors.
|
||||
// The HTTP server can continue serving even if a background task throws.
|
||||
process.on('unhandledRejection', (reason) => {
|
||||
logger.error('SYSTEM', 'Unhandled rejection in daemon', {
|
||||
reason: reason instanceof Error ? reason.message : String(reason)
|
||||
});
|
||||
});
|
||||
process.on('uncaughtException', (error) => {
|
||||
logger.error('SYSTEM', 'Uncaught exception in daemon', {}, error as Error);
|
||||
// Don't exit — keep the HTTP server running
|
||||
});
|
||||
|
||||
const worker = new WorkerService();
|
||||
worker.start().catch((error) => {
|
||||
logger.failure('SYSTEM', 'Worker failed to start', {}, error as Error);
|
||||
|
||||
Reference in New Issue
Block a user