refactor: Complete rewrite of worker-utils.ts and cleanup of worker-service.ts

- Removed fragile PM2 string parsing and replaced with direct PM2 restart logic.
- Eliminated silent error handling in worker-utils.ts for better error visibility.
- Extracted duplicated session auto-creation logic into a new helper method getOrCreateSession() in worker-service.ts.
- Centralized configuration values and replaced magic numbers with named constants.
- Updated health check logic to ensure worker is restarted if unhealthy.
- Removed unnecessary getWorkerPort() wrapper function.
- Improved overall code quality and maintainability by applying DRY and YAGNI principles.
This commit is contained in:
Alex Newman
2025-11-06 22:00:07 -05:00
parent f8dc7f940f
commit 3030f518b5
18 changed files with 2454 additions and 346 deletions
+2 -1
View File
@@ -5,6 +5,7 @@
import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { getWorkerPort } from '../shared/worker-utils.js';
export interface SessionEndInput {
session_id: string;
@@ -71,7 +72,7 @@ async function cleanupHook(input?: SessionEndInput): Promise<void> {
// Tell worker to stop spinner
try {
const workerPort = session.worker_port || 37777;
const workerPort = session.worker_port || getWorkerPort();
await fetch(`http://127.0.0.1:${workerPort}/sessions/${session.id}/complete`, {
method: 'POST',
signal: AbortSignal.timeout(1000)
+3 -4
View File
@@ -7,7 +7,7 @@ import path from 'path';
import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
import { ensureWorkerRunning, getWorkerPort } from '../shared/worker-utils.js';
export interface UserPromptSubmitInput {
session_id: string;
@@ -43,12 +43,11 @@ async function newHook(input?: UserPromptSubmitInput): Promise<void> {
db.close();
// Use fixed worker port
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
const port = getWorkerPort();
try {
// Initialize session via HTTP
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/init`, {
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/init`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ project, userPrompt: prompt }),
+4 -5
View File
@@ -7,7 +7,7 @@ import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { logger } from '../utils/logger.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
import { ensureWorkerRunning, getWorkerPort } from '../shared/worker-utils.js';
export interface PostToolUseInput {
session_id: string;
@@ -50,16 +50,15 @@ async function saveHook(input?: PostToolUseInput): Promise<void> {
const toolStr = logger.formatTool(tool_name, tool_input);
// Use fixed worker port
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
const port = getWorkerPort();
logger.dataIn('HOOK', `PostToolUse: ${toolStr}`, {
sessionId: sessionDbId,
workerPort: FIXED_PORT
workerPort: port
});
try {
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/observations`, {
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/observations`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
+4 -5
View File
@@ -7,7 +7,7 @@ import { stdin } from 'process';
import { SessionStore } from '../services/sqlite/SessionStore.js';
import { createHookResponse } from './hook-response.js';
import { logger } from '../utils/logger.js';
import { ensureWorkerRunning } from '../shared/worker-utils.js';
import { ensureWorkerRunning, getWorkerPort } from '../shared/worker-utils.js';
export interface StopInput {
session_id: string;
@@ -35,17 +35,16 @@ async function summaryHook(input?: StopInput): Promise<void> {
const promptNumber = db.getPromptCounter(sessionDbId);
db.close();
// Use fixed worker port
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
const port = getWorkerPort();
logger.dataIn('HOOK', 'Stop: Requesting summary', {
sessionId: sessionDbId,
workerPort: FIXED_PORT,
workerPort: port,
promptNumber
});
try {
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/sessions/${sessionDbId}/summarize`, {
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/summarize`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt_number: promptNumber }),
+3 -1
View File
@@ -10,6 +10,7 @@ import { execSync } from "child_process";
import { join } from "path";
import { homedir } from "os";
import { existsSync } from "fs";
import { getWorkerPort } from "../shared/worker-utils.js";
// Check if node_modules exists - if not, this is first run
const pluginDir = join(homedir(), '.claude', 'plugins', 'marketplaces', 'thedotmack');
@@ -46,11 +47,12 @@ try {
encoding: 'utf8'
});
const port = getWorkerPort();
console.error(
"\n\n📝 Claude-Mem Context Loaded\n" +
" ️ Note: This appears as stderr but is informational only\n\n" +
output +
"\n\n📺 Watch live in browser http://localhost:37777/ (New! v5.1)\n"
`\n\n📺 Watch live in browser http://localhost:${port}/ (New! v5.1)\n`
);
} catch (error) {
+66 -120
View File
@@ -16,12 +16,18 @@ import { ensureAllDataDirs } from '../shared/paths.js';
import { execSync } from 'child_process';
import { readFileSync, writeFileSync, existsSync, statSync } from 'fs';
import { join, dirname } from 'path';
import { fileURLToPath } from 'url';
import { homedir } from 'os';
import { fileURLToPath } from 'url';
import { getWorkerPort } from '../shared/worker-utils.js';
// Read version from package.json (works in both ESM and CJS after bundling)
const packageJson = JSON.parse(readFileSync(join(__dirname, '..', '..', 'package.json'), 'utf-8'));
const VERSION = packageJson.version;
const MODEL = process.env.CLAUDE_MEM_MODEL || 'claude-sonnet-4-5';
const DISALLOWED_TOOLS = ['Glob', 'Grep', 'ListMcpResourcesTool', 'WebSearch'];
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
const MESSAGE_POLL_INTERVAL_MS = 100;
const MAX_REQUEST_SIZE = '50mb';
/**
* Cached Claude executable path
@@ -97,7 +103,6 @@ interface ActiveSession {
class WorkerService {
private app: express.Application;
private port: number = FIXED_PORT;
private sessions: Map<number, ActiveSession> = new Map();
private chromaSync!: ChromaSync;
private sseClients: Set<Response> = new Set();
@@ -106,7 +111,7 @@ class WorkerService {
constructor() {
this.app = express();
this.app.use(express.json({ limit: '50mb' }));
this.app.use(express.json({ limit: MAX_REQUEST_SIZE }));
// Serve static files for web UI (viewer-bundle.js, logos, etc.)
const uiDir = this.getUIDirectory();
@@ -140,12 +145,13 @@ class WorkerService {
async start(): Promise<void> {
// Start HTTP server FIRST - nothing else matters until we can respond
const port = getWorkerPort();
await new Promise<void>((resolve, reject) => {
this.app.listen(FIXED_PORT, () => resolve())
this.app.listen(port, () => resolve())
.on('error', reject);
});
logger.info('SYSTEM', 'Worker started', { port: FIXED_PORT, pid: process.pid });
logger.info('SYSTEM', 'Worker started', { port, pid: process.pid });
// Initialize ChromaSync after HTTP is ready
this.chromaSync = new ChromaSync('claude-mem');
@@ -188,6 +194,48 @@ class WorkerService {
return join(scriptDir, '..', 'ui');
}
/**
* Get or create session state
* Consolidates session lookup/creation logic used by init, observation, and summarize handlers
*/
private getOrCreateSession(sessionDbId: number): ActiveSession {
let session = this.sessions.get(sessionDbId);
if (session) return session;
const db = new SessionStore();
const dbSession = db.getSessionById(sessionDbId);
if (!dbSession) {
db.close();
throw new Error(`Session ${sessionDbId} not found in database`);
}
session = {
sessionDbId,
claudeSessionId: dbSession.claude_session_id,
sdkSessionId: null,
project: dbSession.project,
userPrompt: dbSession.user_prompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
session.generatorPromise = this.runSDKAgent(session).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
const db = new SessionStore();
db.markSessionFailed(sessionDbId);
db.close();
this.sessions.delete(sessionDbId);
});
db.close();
return session;
}
/**
* GET /health
*/
@@ -340,15 +388,14 @@ class WorkerService {
// Get worker stats
const uptime = process.uptime();
const version = process.env.npm_package_version || '5.0.3'; // fallback to current version
res.json({
worker: {
version,
version: VERSION,
uptime: Math.floor(uptime),
activeSessions: this.sessions.size,
sseClients: this.sseClients.size,
port: this.port
port: getWorkerPort()
},
database: {
path: dbPath,
@@ -403,13 +450,7 @@ class WorkerService {
try {
const { CLAUDE_MEM_MODEL, CLAUDE_MEM_CONTEXT_OBSERVATIONS, CLAUDE_MEM_WORKER_PORT } = req.body;
// Validate inputs
const validModels = ['claude-haiku-4-5', 'claude-sonnet-4-5', 'claude-opus-4'];
if (CLAUDE_MEM_MODEL && !validModels.includes(CLAUDE_MEM_MODEL)) {
res.status(400).json({ success: false, error: `Invalid model name: ${CLAUDE_MEM_MODEL}` });
return;
}
// Validate inputs (SDK will handle model validation)
if (CLAUDE_MEM_CONTEXT_OBSERVATIONS) {
const obsCount = parseInt(CLAUDE_MEM_CONTEXT_OBSERVATIONS, 10);
if (isNaN(obsCount) || obsCount < 1 || obsCount > 200) {
@@ -648,35 +689,12 @@ class WorkerService {
logger.info('WORKER', 'Session init', { sessionDbId, project });
// Fetch real Claude Code session ID from database
const db = new SessionStore();
const dbSession = db.getSessionById(sessionDbId);
if (!dbSession) {
db.close();
res.status(404).json({ error: 'Session not found in database' });
return;
}
const claudeSessionId = dbSession.claude_session_id;
// Create session state
const session: ActiveSession = {
sessionDbId,
claudeSessionId,
sdkSessionId: null,
project,
userPrompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
const session = this.getOrCreateSession(sessionDbId);
const claudeSessionId = session.claudeSessionId;
// Update port in database
db.setWorkerPort(sessionDbId, this.port!);
const db = new SessionStore();
db.setWorkerPort(sessionDbId, getWorkerPort());
// Get the latest user_prompt for this session to sync to Chroma
const latestPrompt = db.db.prepare(`
@@ -723,23 +741,14 @@ class WorkerService {
});
}
// Start SDK agent in background
session.generatorPromise = this.runSDKAgent(session).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
const db = new SessionStore();
db.markSessionFailed(sessionDbId);
db.close();
this.sessions.delete(sessionDbId);
});
// Start processing indicator (user submitted prompt)
this.broadcastProcessingStatus(true);
logger.success('WORKER', 'Session initialized', { sessionId: sessionDbId, port: this.port });
logger.success('WORKER', 'Session initialized', { sessionId: sessionDbId, port: getWorkerPort() });
res.json({
status: 'initialized',
sessionDbId,
port: this.port
port: getWorkerPort()
});
}
@@ -751,39 +760,7 @@ class WorkerService {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const { tool_name, tool_input, tool_output, prompt_number } = req.body;
let session = this.sessions.get(sessionDbId);
if (!session) {
// Auto-create session if not in memory (worker restart, etc.)
// Sessions are organizational metadata - observations are first-class data in vector store
// Session ID comes from Claude Code hooks (guaranteed valid)
const db = new SessionStore();
const dbSession = db.getSessionById(sessionDbId);
db.close();
session = {
sessionDbId,
claudeSessionId: dbSession!.claude_session_id,
sdkSessionId: null,
project: dbSession!.project,
userPrompt: dbSession!.user_prompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
// Start SDK agent in background
session.generatorPromise = this.runSDKAgent(session).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
const db = new SessionStore();
db.markSessionFailed(sessionDbId);
db.close();
this.sessions.delete(sessionDbId);
});
}
const session = this.getOrCreateSession(sessionDbId);
const toolStr = logger.formatTool(tool_name, tool_input);
logger.dataIn('WORKER', `Observation queued: ${toolStr}`, {
@@ -810,38 +787,7 @@ class WorkerService {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const { prompt_number } = req.body;
let session = this.sessions.get(sessionDbId);
if (!session) {
// Auto-create session if not in memory (worker restart, etc.)
// Sessions are organizational metadata - observations are first-class data in vector store
// Session ID comes from Claude Code hooks (guaranteed valid)
const db = new SessionStore();
const dbSession = db.getSessionById(sessionDbId);
db.close();
session = {
sessionDbId,
claudeSessionId: dbSession!.claude_session_id,
sdkSessionId: null,
project: dbSession!.project,
userPrompt: dbSession!.user_prompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
// Start SDK agent in background
session.generatorPromise = this.runSDKAgent(session).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
const db = new SessionStore();
db.markSessionFailed(sessionDbId);
db.close();
this.sessions.delete(sessionDbId);
});
}
const session = this.getOrCreateSession(sessionDbId);
logger.dataIn('WORKER', 'Summary requested', {
sessionId: sessionDbId,
@@ -994,7 +940,7 @@ class WorkerService {
}
if (session.pendingMessages.length === 0) {
await new Promise(resolve => setTimeout(resolve, 100));
await new Promise(resolve => setTimeout(resolve, MESSAGE_POLL_INTERVAL_MS));
continue;
}
+40 -68
View File
@@ -1,16 +1,40 @@
import path from "path";
import { spawn } from "child_process";
import { homedir } from "os";
import { existsSync, readFileSync } from "fs";
import { execSync } from "child_process";
import { getPackageRoot } from "./paths.js";
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || "37777", 10);
// Named constants for health checks
const HEALTH_CHECK_TIMEOUT_MS = 100;
const HEALTH_CHECK_POLL_INTERVAL_MS = 100;
const HEALTH_CHECK_MAX_WAIT_MS = 10000;
/**
* Get the worker port number
* Priority: ~/.claude-mem/settings.json > env var > default
*/
export function getWorkerPort(): number {
try {
const settingsPath = path.join(homedir(), '.claude-mem', 'settings.json');
if (existsSync(settingsPath)) {
const settings = JSON.parse(readFileSync(settingsPath, 'utf-8'));
const port = parseInt(settings.env?.CLAUDE_MEM_WORKER_PORT, 10);
if (!isNaN(port)) return port;
}
} catch {
// Fall through to env var or default
}
return parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
}
/**
* Check if worker is responsive by trying the health endpoint
*/
async function isWorkerHealthy(timeoutMs: number = 100): Promise<boolean> {
async function isWorkerHealthy(): Promise<boolean> {
try {
const response = await fetch(`http://127.0.0.1:${FIXED_PORT}/health`, {
signal: AbortSignal.timeout(timeoutMs)
const port = getWorkerPort();
const response = await fetch(`http://127.0.0.1:${port}/health`, {
signal: AbortSignal.timeout(HEALTH_CHECK_TIMEOUT_MS)
});
return response.ok;
} catch {
@@ -21,89 +45,37 @@ async function isWorkerHealthy(timeoutMs: number = 100): Promise<boolean> {
/**
* Wait for worker to become healthy
*/
async function waitForWorkerHealth(maxWaitMs: number = 10000): Promise<boolean> {
async function waitForWorkerHealth(): Promise<boolean> {
const start = Date.now();
const checkInterval = 100; // Check every 100ms
while (Date.now() - start < maxWaitMs) {
if (await isWorkerHealthy(1000)) {
while (Date.now() - start < HEALTH_CHECK_MAX_WAIT_MS) {
if (await isWorkerHealthy()) {
return true;
}
// Wait before next check
await new Promise(resolve => setTimeout(resolve, checkInterval));
await new Promise(resolve => setTimeout(resolve, HEALTH_CHECK_POLL_INTERVAL_MS));
}
return false;
}
/**
* Ensure worker service is running
* Checks if worker is already running before attempting to start
* This prevents unnecessary restarts that could interrupt mid-action processing
* If unhealthy, restarts PM2 and waits for health
*/
export async function ensureWorkerRunning(): Promise<void> {
// First, check if worker is already healthy
if (await isWorkerHealthy()) {
return; // Worker is already running and responsive
return;
}
const packageRoot = getPackageRoot();
const pm2Path = path.join(packageRoot, "node_modules", ".bin", "pm2");
const ecosystemPath = path.join(packageRoot, "ecosystem.config.cjs");
// Check PM2 status to see if worker process exists
const checkProcess = spawn(pm2Path, ["list", "--no-color"], {
execSync(`"${pm2Path}" restart "${ecosystemPath}"`, {
cwd: packageRoot,
stdio: ["ignore", "pipe", "ignore"],
stdio: 'pipe'
});
let output = "";
checkProcess.stdout?.on("data", (data) => {
output += data.toString();
});
// Wait for PM2 list to complete
await new Promise<void>((resolve, reject) => {
checkProcess.on("error", (error) => reject(error));
checkProcess.on("close", (code) => {
// PM2 list can fail, but we should still continue - just assume worker isn't running
// This handles cases where PM2 isn't installed yet
resolve();
});
});
// Check if 'claude-mem-worker' is in the PM2 list output and is 'online'
const isRunning = output.includes("claude-mem-worker") && output.includes("online");
if (!isRunning) {
// Start the worker
const startProcess = spawn(pm2Path, ["start", ecosystemPath], {
cwd: packageRoot,
stdio: "ignore",
});
// Wait for PM2 start command to complete
await new Promise<void>((resolve, reject) => {
startProcess.on("error", (error) => reject(error));
startProcess.on("close", (code) => {
if (code !== 0 && code !== null) {
reject(new Error(`PM2 start command failed with exit code ${code}`));
} else {
resolve();
}
});
});
}
// Wait for worker to become healthy (either just started or was starting)
const healthy = await waitForWorkerHealth(10000);
if (!healthy) {
throw new Error("Worker failed to become healthy after starting");
if (!await waitForWorkerHealth()) {
throw new Error("Worker failed to become healthy after restart");
}
}
/**
* Get the worker port number (fixed port)
*/
export function getWorkerPort(): number {
return FIXED_PORT;
}