Refactor worker management and cleanup hooks

- Removed ensureWorkerRunning calls from multiple hooks (cleanup, context, new, save, summary) to streamline code and avoid unnecessary checks.
- Introduced fixed port usage for worker communication across hooks.
- Enhanced error handling in newHook, saveHook, and summaryHook to provide clearer messages for worker connection issues.
- Updated worker service to start without health checks, relying on PM2 for management.
- Cached Claude executable path to optimize repeated calls.
- Improved logging for better traceability of worker actions and errors.
This commit is contained in:
Alex Newman
2025-11-04 14:21:19 -05:00
parent 37f836b719
commit a46a028ddb
25 changed files with 317 additions and 2914 deletions
-7
View File
@@ -5,7 +5,6 @@
import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
export interface SessionEndInput {
session_id: string;
@@ -45,12 +44,6 @@ async function cleanupHook(input?: SessionEndInput): Promise<void> {
const { session_id, reason } = input;
console.error('[claude-mem cleanup] Searching for active SDK session', { session_id, reason });
// Ensure worker is running first
const workerReady = await ensureWorkerRunning();
if (!workerReady) {
console.error('[claude-mem cleanup] Worker not available - skipping HTTP cleanup');
}
// Find active SDK session
const db = new SessionStore();
const session = db.findActiveSDKSession(session_id);
-2
View File
@@ -6,7 +6,6 @@
import path from 'path';
import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
// Configuration: Read from environment or use defaults
const DISPLAY_OBSERVATION_COUNT = parseInt(process.env.CLAUDE_MEM_CONTEXT_OBSERVATIONS || '50', 10);
@@ -127,7 +126,6 @@ function getObservations(db: SessionStore, sessionIds: string[]): Observation[]
* Context Hook Main Logic
*/
function contextHook(input?: SessionStartInput, useColors: boolean = false, useIndexView: boolean = false): string {
ensureWorkerRunning();
const cwd = input?.cwd ?? process.cwd();
const project = cwd ? path.basename(cwd) : 'unknown-project';
+21 -19
View File
@@ -7,7 +7,6 @@ import path from 'path';
import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { ensureWorkerRunning, getWorkerPort } from '../shared/worker-utils.js';
export interface UserPromptSubmitInput {
session_id: string;
@@ -27,12 +26,6 @@ async function newHook(input?: UserPromptSubmitInput): Promise<void> {
const { session_id, cwd, prompt } = input;
const project = path.basename(cwd);
// Ensure worker is running first
const workerReady = await ensureWorkerRunning();
if (!workerReady) {
throw new Error('Worker service failed to start or become healthy');
}
const db = new SessionStore();
// Save session_id for indexing
@@ -46,20 +39,29 @@ async function newHook(input?: UserPromptSubmitInput): Promise<void> {
db.close();
// Get fixed port
const port = getWorkerPort();
// Use fixed worker port
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
// Initialize session via HTTP
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/init`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ project, userPrompt: prompt }),
signal: AbortSignal.timeout(5000)
});
try {
// Initialize session via HTTP
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/init`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ project, userPrompt: prompt }),
signal: AbortSignal.timeout(5000)
});
if (!response.ok) {
const errorText = await response.text();
throw new Error(`Failed to initialize session: ${response.status} ${errorText}`);
if (!response.ok) {
const errorText = await response.text();
throw new Error(`Failed to initialize session: ${response.status} ${errorText}`);
}
} catch (error: any) {
// Only show restart message for connection errors, not HTTP errors
if (error.cause?.code === 'ECONNREFUSED' || error.name === 'TimeoutError' || error.message.includes('fetch failed')) {
throw new Error("There's a problem with the worker. If you just updated, type `pm2 restart claude-mem-worker` in your terminal to continue");
}
// Re-throw HTTP errors and other errors as-is
throw error;
}
console.log(createHookResponse('UserPromptSubmit', true));
+29 -26
View File
@@ -7,7 +7,6 @@ import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { logger } from '../utils/logger.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
export interface PostToolUseInput {
session_id: string;
@@ -38,12 +37,6 @@ async function saveHook(input?: PostToolUseInput): Promise<void> {
return;
}
// Ensure worker is running first
const workerReady = await ensureWorkerRunning();
if (!workerReady) {
throw new Error('Worker service failed to start or become healthy');
}
const db = new SessionStore();
// Get or create session
@@ -61,28 +54,38 @@ async function saveHook(input?: PostToolUseInput): Promise<void> {
workerPort: FIXED_PORT
});
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/observations`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool_name,
tool_input: tool_input !== undefined ? JSON.stringify(tool_input) : '{}',
tool_output: tool_output !== undefined ? JSON.stringify(tool_output) : '{}',
prompt_number: promptNumber
}),
signal: AbortSignal.timeout(2000)
});
try {
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/observations`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool_name,
tool_input: tool_input !== undefined ? JSON.stringify(tool_input) : '{}',
tool_output: tool_output !== undefined ? JSON.stringify(tool_output) : '{}',
prompt_number: promptNumber
}),
signal: AbortSignal.timeout(2000)
});
if (!response.ok) {
const errorText = await response.text();
logger.failure('HOOK', 'Failed to send observation', {
sessionId: sessionDbId,
status: response.status
}, errorText);
throw new Error(`Failed to send observation to worker: ${response.status} ${errorText}`);
if (!response.ok) {
const errorText = await response.text();
logger.failure('HOOK', 'Failed to send observation', {
sessionId: sessionDbId,
status: response.status
}, errorText);
throw new Error(`Failed to send observation to worker: ${response.status} ${errorText}`);
}
logger.debug('HOOK', 'Observation sent successfully', { sessionId: sessionDbId, toolName: tool_name });
} catch (error: any) {
// Only show restart message for connection errors, not HTTP errors
if (error.cause?.code === 'ECONNREFUSED' || error.name === 'TimeoutError' || error.message.includes('fetch failed')) {
throw new Error("There's a problem with the worker. If you just updated, type `pm2 restart claude-mem-worker` in your terminal to continue");
}
// Re-throw HTTP errors and other errors as-is
throw error;
}
logger.debug('HOOK', 'Observation sent successfully', { sessionId: sessionDbId, toolName: tool_name });
console.log(createHookResponse('PostToolUse', true));
}
+24 -21
View File
@@ -7,7 +7,6 @@ import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { logger } from '../utils/logger.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
export interface StopInput {
session_id: string;
@@ -25,12 +24,6 @@ async function summaryHook(input?: StopInput): Promise<void> {
const { session_id } = input;
// Ensure worker is running first
const workerReady = await ensureWorkerRunning();
if (!workerReady) {
throw new Error('Worker service failed to start or become healthy');
}
const db = new SessionStore();
// Get or create session
@@ -47,23 +40,33 @@ async function summaryHook(input?: StopInput): Promise<void> {
promptNumber
});
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/summarize`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt_number: promptNumber }),
signal: AbortSignal.timeout(2000)
});
try {
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/summarize`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt_number: promptNumber }),
signal: AbortSignal.timeout(2000)
});
if (!response.ok) {
const errorText = await response.text();
logger.failure('HOOK', 'Failed to generate summary', {
sessionId: sessionDbId,
status: response.status
}, errorText);
throw new Error(`Failed to request summary from worker: ${response.status} ${errorText}`);
if (!response.ok) {
const errorText = await response.text();
logger.failure('HOOK', 'Failed to generate summary', {
sessionId: sessionDbId,
status: response.status
}, errorText);
throw new Error(`Failed to request summary from worker: ${response.status} ${errorText}`);
}
logger.debug('HOOK', 'Summary request sent successfully', { sessionId: sessionDbId });
} catch (error: any) {
// Only show restart message for connection errors, not HTTP errors
if (error.cause?.code === 'ECONNREFUSED' || error.name === 'TimeoutError' || error.message.includes('fetch failed')) {
throw new Error("There's a problem with the worker. If you just updated, type `pm2 restart claude-mem-worker` in your terminal to continue");
}
// Re-throw HTTP errors and other errors as-is
throw error;
}
logger.debug('HOOK', 'Summary request sent successfully', { sessionId: sessionDbId });
console.log(createHookResponse('Stop', true));
}
+55 -66
View File
@@ -19,14 +19,25 @@ const MODEL = process.env.CLAUDE_MEM_MODEL || 'claude-sonnet-4-5';
const DISALLOWED_TOOLS = ['Glob', 'Grep', 'ListMcpResourcesTool', 'WebSearch'];
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
/**
* Cached Claude executable path
*/
let cachedClaudePath: string | null = null;
/**
* Find Claude Code executable path using which (Unix/Mac) or where (Windows)
* Cached after first call
*/
function findClaudePath(): string {
if (cachedClaudePath) {
return cachedClaudePath;
}
try {
// Try environment variable first
if (process.env.CLAUDE_CODE_PATH) {
return process.env.CLAUDE_CODE_PATH;
cachedClaudePath = process.env.CLAUDE_CODE_PATH;
return cachedClaudePath;
}
// Use which on Unix/Mac, where on Windows
@@ -41,7 +52,8 @@ function findClaudePath(): string {
}
logger.info('SYSTEM', `Found Claude executable: ${path}`);
return path;
cachedClaudePath = path;
return cachedClaudePath;
} catch (error: any) {
logger.failure('SYSTEM', 'Failed to find Claude executable', {}, error);
throw new Error('Claude Code executable not found. Please ensure claude is in your PATH or set CLAUDE_CODE_PATH environment variable.');
@@ -76,24 +88,19 @@ interface ActiveSession {
abortController: AbortController;
generatorPromise: Promise<void> | null;
lastPromptNumber: number; // Track which prompt_number we last sent to SDK
observationCounter: number; // Counter for correlation IDs
startTime: number; // Session start timestamp
}
class WorkerService {
private app: express.Application;
private port: number | null = null;
private port: number = FIXED_PORT;
private sessions: Map<number, ActiveSession> = new Map();
private chromaSync: ChromaSync;
private chromaSync!: ChromaSync;
constructor() {
this.app = express();
this.app.use(express.json({ limit: '50mb' }));
// Initialize ChromaSync (fail fast if Chroma unavailable)
this.chromaSync = new ChromaSync('claude-mem');
logger.info('SYSTEM', 'ChromaSync initialized');
// Health check
this.app.get('/health', this.handleHealth.bind(this));
@@ -106,7 +113,17 @@ class WorkerService {
}
async start(): Promise<void> {
this.port = FIXED_PORT;
// Start HTTP server FIRST - nothing else matters until we can respond
await new Promise<void>((resolve, reject) => {
this.app.listen(FIXED_PORT, () => resolve())
.on('error', reject);
});
logger.info('SYSTEM', 'Worker started', { port: FIXED_PORT, pid: process.pid });
// Initialize ChromaSync after HTTP is ready
this.chromaSync = new ChromaSync('claude-mem');
logger.info('SYSTEM', 'ChromaSync initialized');
// Clean up orphaned sessions from previous worker instances
const db = new SessionStore();
@@ -117,41 +134,23 @@ class WorkerService {
logger.info('SYSTEM', `Cleaned up ${cleanedCount} orphaned sessions`);
}
// Backfill Chroma with any missing observations/summaries (blocking)
logger.info('SYSTEM', 'Starting Chroma backfill...');
try {
await this.chromaSync.ensureBackfilled();
logger.info('SYSTEM', 'Chroma backfill complete');
} catch (error) {
logger.error('SYSTEM', 'Chroma backfill failed - worker cannot start', {}, error as Error);
throw error;
}
return new Promise((resolve, reject) => {
this.app.listen(FIXED_PORT, '127.0.0.1', () => {
logger.info('SYSTEM', `Worker started`, { port: FIXED_PORT, pid: process.pid, activeSessions: this.sessions.size });
resolve();
}).on('error', (err: any) => {
if (err.code === 'EADDRINUSE') {
logger.error('SYSTEM', `Port ${FIXED_PORT} already in use - worker may already be running`);
}
reject(err);
// Backfill Chroma in background (non-blocking, non-critical)
logger.info('SYSTEM', 'Starting Chroma backfill in background...');
this.chromaSync.ensureBackfilled()
.then(() => {
logger.info('SYSTEM', 'Chroma backfill complete');
})
.catch((error: Error) => {
logger.error('SYSTEM', 'Chroma backfill failed - continuing anyway', {}, error);
// Don't exit - allow worker to continue serving requests
});
});
}
/**
* GET /health
*/
private handleHealth(req: Request, res: Response): void {
res.json({
status: 'ok',
port: this.port,
pid: process.pid,
activeSessions: this.sessions.size,
uptime: process.uptime(),
memory: process.memoryUsage()
});
private handleHealth(_req: Request, res: Response): void {
res.json({ status: 'ok' });
}
/**
@@ -162,8 +161,7 @@ class WorkerService {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const { project, userPrompt } = req.body;
const correlationId = logger.sessionId(sessionDbId);
logger.info('WORKER', 'Session init', { correlationId, project });
logger.info('WORKER', 'Session init', { sessionDbId, project });
// Fetch real Claude Code session ID from database
const db = new SessionStore();
@@ -187,7 +185,6 @@ class WorkerService {
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
observationCounter: 0,
startTime: Date.now()
};
@@ -221,8 +218,8 @@ class WorkerService {
latestPrompt.prompt_number,
latestPrompt.created_at_epoch
).catch(err => {
logger.failure('WORKER', 'Failed to sync user_prompt to Chroma', { promptId: latestPrompt.id }, err);
process.exit(1); // Fail fast - Chroma sync is critical
logger.failure('WORKER', 'Failed to sync user_prompt to Chroma - continuing', { promptId: latestPrompt.id }, err);
// Don't crash - SQLite has the data
});
}
@@ -268,7 +265,6 @@ class WorkerService {
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
observationCounter: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
@@ -283,13 +279,10 @@ class WorkerService {
});
}
// Create correlation ID for tracking this observation
session.observationCounter++;
const correlationId = logger.correlationId(sessionDbId, session.observationCounter);
const toolStr = logger.formatTool(tool_name, tool_input);
logger.dataIn('WORKER', `Observation queued: ${toolStr}`, {
correlationId,
sessionId: sessionDbId,
queue: session.pendingMessages.length + 1
});
@@ -329,7 +322,6 @@ class WorkerService {
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
observationCounter: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
@@ -559,14 +551,13 @@ class WorkerService {
});
const toolStr = logger.formatTool(message.tool_name, message.tool_input);
const correlationId = logger.correlationId(session.sessionDbId, session.observationCounter);
logger.dataIn('SDK', `Observation prompt: ${toolStr}`, {
correlationId,
sessionId: session.sessionDbId,
promptNumber: message.prompt_number,
size: `${observationPrompt.length} chars`
});
logger.debug('SDK', 'Full observation prompt', { correlationId }, observationPrompt);
logger.debug('SDK', 'Full observation prompt', { sessionId: session.sessionDbId }, observationPrompt);
yield {
type: 'user',
@@ -587,8 +578,6 @@ class WorkerService {
* Gets prompt_number from the message that triggered this response
*/
private handleAgentMessage(session: ActiveSession, content: string, promptNumber: number): void {
const correlationId = logger.correlationId(session.sessionDbId, session.observationCounter);
// Always log what we received for debugging
logger.info('PARSER', `Processing response (${content.length} chars)`, {
sessionId: session.sessionDbId,
@@ -597,11 +586,11 @@ class WorkerService {
});
// Parse observations
const observations = parseObservations(content, correlationId);
const observations = parseObservations(content);
if (observations.length > 0) {
logger.info('PARSER', `Parsed ${observations.length} observation(s)`, {
correlationId,
sessionId: session.sessionDbId,
promptNumber,
types: observations.map(o => o.type).join(', ')
});
@@ -613,7 +602,7 @@ class WorkerService {
for (const obs of observations) {
const { id, createdAtEpoch } = db.storeObservation(session.claudeSessionId, session.project, obs, promptNumber);
logger.success('DB', 'Observation stored', {
correlationId,
sessionId: session.sessionDbId,
type: obs.type,
title: obs.title,
id
@@ -628,16 +617,16 @@ class WorkerService {
promptNumber,
createdAtEpoch
).then(() => {
logger.success('CHROMA', 'Observation synced', {
correlationId,
logger.success('WORKER', 'Observation synced to Chroma', {
sessionId: session.sessionDbId,
observationId: id
});
}).catch((error: Error) => {
logger.error('CHROMA', 'Observation sync failed - crashing worker', {
correlationId,
logger.error('WORKER', 'Observation sync failed - continuing', {
sessionId: session.sessionDbId,
observationId: id
}, error);
process.exit(1); // Fail fast - no fallbacks
// Don't crash - SQLite has the data
});
}
@@ -667,16 +656,16 @@ class WorkerService {
promptNumber,
createdAtEpoch
).then(() => {
logger.success('CHROMA', 'Summary synced', {
logger.success('WORKER', 'Summary synced to Chroma', {
sessionId: session.sessionDbId,
summaryId: id
});
}).catch((error: Error) => {
logger.error('CHROMA', 'Summary sync failed - crashing worker', {
logger.error('WORKER', 'Summary sync failed - continuing', {
sessionId: session.sessionDbId,
summaryId: id
}, error);
process.exit(1); // Fail fast - no fallbacks
// Don't crash - SQLite has the data
});
} else {
logger.warn('PARSER', 'NO SUMMARY TAGS FOUND in response', {
+11 -94
View File
@@ -1,106 +1,23 @@
import path from 'path';
import { existsSync } from 'fs';
import { spawn } from 'child_process';
import { getPackageRoot } from './paths.js';
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
const HEALTH_CHECK_URL = `http://127.0.0.1:${FIXED_PORT}/health`;
/**
* Check if worker is responding by hitting health endpoint
* Ensure worker service is running
* Just starts PM2 - no health checks, no retries, no delays
* PM2 handles the rest (already running = no-op, failures = exit code)
*/
async function checkWorkerHealth(): Promise<boolean> {
try {
const response = await fetch(HEALTH_CHECK_URL, {
signal: AbortSignal.timeout(500)
});
return response.ok;
} catch {
return false;
}
}
export function ensureWorkerRunning(): void {
const packageRoot = getPackageRoot();
const pm2Path = path.join(packageRoot, 'node_modules', '.bin', 'pm2');
const ecosystemPath = path.join(packageRoot, 'ecosystem.config.cjs');
/**
* Ensure worker service is running with retry logic
* Auto-starts worker if not running (v4.0.0 feature)
*
* @returns true if worker is responding, false if failed to start
*/
export async function ensureWorkerRunning(): Promise<boolean> {
try {
// Check if worker is already responding
if (await checkWorkerHealth()) {
return true;
}
console.error('[claude-mem] Worker not responding, starting...');
// Find worker service path
const packageRoot = getPackageRoot();
const workerPath = path.join(packageRoot, 'plugin', 'scripts', 'worker-service.cjs');
if (!existsSync(workerPath)) {
console.error(`[claude-mem] Worker service not found at ${workerPath}`);
return false;
}
// Start worker with PM2 (bundled dependency)
const ecosystemPath = path.join(packageRoot, 'ecosystem.config.cjs');
const pm2Path = path.join(packageRoot, 'node_modules', '.bin', 'pm2');
// Fail loudly if bundled pm2 is missing
if (!existsSync(pm2Path)) {
throw new Error(
`PM2 binary not found at ${pm2Path}. ` +
`This is a bundled dependency - try running: npm install`
);
}
if (!existsSync(ecosystemPath)) {
throw new Error(
`PM2 ecosystem config not found at ${ecosystemPath}. ` +
`Plugin installation may be corrupted.`
);
}
// Spawn worker with PM2
const proc = spawn(pm2Path, ['start', ecosystemPath], {
detached: true,
stdio: 'ignore',
cwd: packageRoot
});
// Fail loudly on spawn errors
proc.on('error', (err) => {
throw new Error(`Failed to spawn PM2: ${err.message}`);
});
proc.unref();
console.error('[claude-mem] Worker started with PM2');
// Wait for worker to become healthy (retry 3 times with 500ms delay)
for (let i = 0; i < 3; i++) {
await new Promise(resolve => setTimeout(resolve, 500));
if (await checkWorkerHealth()) {
console.error('[claude-mem] Worker is healthy');
return true;
}
}
console.error('[claude-mem] Worker failed to become healthy after startup');
return false;
} catch (error: any) {
console.error(`[claude-mem] Failed to start worker: ${error.message}`);
return false;
}
}
/**
* Check if worker is currently running
*/
export async function isWorkerRunning(): Promise<boolean> {
return checkWorkerHealth();
spawn(pm2Path, ['start', ecosystemPath], {
cwd: packageRoot,
stdio: 'inherit'
});
}
/**