refactor(worker): Remove file-based locking and improve Windows stability

This commit simplifies worker startup coordination and addresses Windows-specific issues:

**Lock Removal**:
- Removed entire file-based locking system (~100 lines)
- Replaced with health-check-first approach
- Port binding provides natural mutual exclusion - multiple spawns fail cleanly

**Windows Stability**:
- Removed all AbortSignal.timeout() calls to reduce Bun libuv assertion errors
- Added 500ms shutdown delays on Windows to prevent zombie ports
- Worker service has its own timeouts, so client-side timeouts are redundant

**Package.json Updates**:
- Updated worker scripts to use worker-service.cjs directly
- Removed references to deleted worker-cli.js and worker-wrapper.cjs

**Key Changes**:
- src/services/worker-service.ts: Lock removal, shutdown delays, simplified start logic
- src/hooks/*.ts: Removed AbortSignal.timeout from all HTTP requests
- src/shared/worker-utils.ts: Removed AbortSignal.timeout from health checks
- package.json: Updated worker:* scripts

Resolves startup hangs, reduces assertion errors, and prevents zombie port issues.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Erik M Jacobs
2025-12-29 12:11:04 -05:00
parent 0330b4d37e
commit 3ea180c1ef
16 changed files with 1041 additions and 554 deletions
+3 -1
View File
@@ -29,7 +29,9 @@ async function contextHook(input?: SessionStartInput): Promise<string> {
const url = `http://127.0.0.1:${port}/api/context/inject?project=${encodeURIComponent(project)}`;
const response = await fetch(url, { signal: AbortSignal.timeout(HOOK_TIMEOUTS.DEFAULT) });
// Note: Removed AbortSignal.timeout due to Windows Bun cleanup issue (libuv assertion)
// Worker service has its own timeouts, so client-side timeout is redundant
const response = await fetch(url);
if (!response.ok) {
throw new Error(`Context generation failed: ${response.status}`);
+4 -4
View File
@@ -39,8 +39,8 @@ async function newHook(input?: UserPromptSubmitInput): Promise<void> {
contentSessionId: session_id,
project,
prompt
}),
signal: AbortSignal.timeout(5000)
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!initResponse.ok) {
@@ -72,8 +72,8 @@ async function newHook(input?: UserPromptSubmitInput): Promise<void> {
const response = await fetch(`http://127.0.0.1:${port}/sessions/${sessionDbId}/init`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ userPrompt: cleanedPrompt, promptNumber }),
signal: AbortSignal.timeout(5000)
body: JSON.stringify({ userPrompt: cleanedPrompt, promptNumber })
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {
+2 -2
View File
@@ -56,8 +56,8 @@ async function saveHook(input?: PostToolUseInput): Promise<void> {
tool_input,
tool_response,
cwd
}),
signal: AbortSignal.timeout(HOOK_TIMEOUTS.DEFAULT)
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {
+2 -2
View File
@@ -60,8 +60,8 @@ async function summaryHook(input?: StopInput): Promise<void> {
contentSessionId: session_id,
last_user_message: lastUserMessage,
last_assistant_message: lastAssistantMessage
}),
signal: AbortSignal.timeout(HOOK_TIMEOUTS.DEFAULT)
})
// Note: Removed signal to avoid Windows Bun cleanup issue (libuv assertion)
});
if (!response.ok) {
+2 -1
View File
@@ -18,9 +18,10 @@ const port = getWorkerPort();
const project = basename(process.cwd());
// Fetch formatted context directly from worker API
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(
`http://127.0.0.1:${port}/api/context/inject?project=${encodeURIComponent(project)}&colors=true`,
{ method: 'GET', signal: AbortSignal.timeout(5000) }
{ method: 'GET' }
);
if (!response.ok) {
+122 -241
View File
@@ -56,105 +56,7 @@ function removePidFile(): void {
}
}
// Lockfile for CLI command mutual exclusion (prevents race conditions on Windows)
const LOCK_FILE = path.join(DATA_DIR, 'worker.lock');
const LOCK_STALE_MS = 120000; // Lock considered stale after 2 minutes
interface LockInfo {
pid: number;
command: string;
startedAt: string;
}
/**
* Clean up stale lock from crashed processes
*/
function cleanupStaleLock(): void {
try {
if (!existsSync(LOCK_FILE)) return;
const lockData = readFileSync(LOCK_FILE, 'utf-8');
const lockInfo: LockInfo = JSON.parse(lockData);
const lockAge = Date.now() - new Date(lockInfo.startedAt).getTime();
if (lockAge > LOCK_STALE_MS) {
logger.warn('SYSTEM', 'Removing stale lock', {
lockAge: Math.round(lockAge / 1000) + 's',
originalPid: lockInfo.pid,
originalCommand: lockInfo.command
});
unlinkSync(LOCK_FILE);
}
} catch {
// If we can't read the lock, it's likely corrupted - remove it
try { unlinkSync(LOCK_FILE); } catch {}
}
}
/**
* Acquire exclusive lock for worker operations
* Uses atomic file creation (O_EXCL) for cross-process safety
*/
function acquireLock(command: string): boolean {
mkdirSync(DATA_DIR, { recursive: true });
cleanupStaleLock();
const lockInfo: LockInfo = {
pid: process.pid,
command,
startedAt: new Date().toISOString()
};
let retries = 3;
while (retries > 0) {
try {
// O_EXCL ensures atomic creation - fails if file exists
const fd = fs.openSync(LOCK_FILE, fs.constants.O_CREAT | fs.constants.O_EXCL | fs.constants.O_WRONLY);
fs.writeSync(fd, JSON.stringify(lockInfo, null, 2));
fs.closeSync(fd);
return true;
} catch (error: unknown) {
if ((error as NodeJS.ErrnoException).code === 'EEXIST') {
return false;
}
// Retry on ENOENT (can happen on Windows if file/dir state is in flux)
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
retries--;
if (retries === 0) {
logger.warn('SYSTEM', 'Lock acquisition error (ENOENT)', { error: (error as Error).message });
return false;
}
// Ensure directory exists and try again
try { mkdirSync(DATA_DIR, { recursive: true }); } catch {}
continue;
}
logger.warn('SYSTEM', 'Lock acquisition error', { error: (error as Error).message });
return false;
}
}
return false;
}
/**
* Release lock file
*/
function releaseLock(): void {
try {
if (existsSync(LOCK_FILE)) unlinkSync(LOCK_FILE);
} catch (error) {
logger.warn('SYSTEM', 'Lock release error', { error: (error as Error).message });
}
}
/**
* Wait for lock with timeout
*/
async function waitForLock(command: string, timeoutMs: number): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
if (acquireLock(command)) return true;
await new Promise(r => setTimeout(r, 200));
}
return false;
}
// No lock file needed - health checks and port binding provide coordination
/**
* Get platform-adjusted timeout (Windows socket cleanup is slower)
@@ -166,9 +68,8 @@ function getPlatformTimeout(baseMs: number): number {
async function isPortInUse(port: number): Promise<boolean> {
try {
const response = await fetch(`http://127.0.0.1:${port}/api/health`, {
signal: AbortSignal.timeout(2000)
});
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/health`);
return response.ok;
} catch { return false; }
}
@@ -177,9 +78,8 @@ async function waitForHealth(port: number, timeoutMs: number = 30000): Promise<b
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
const response = await fetch(`http://127.0.0.1:${port}/api/readiness`, {
signal: AbortSignal.timeout(2000)
});
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/readiness`);
if (response.ok) return true;
} catch {
// Not ready yet
@@ -191,9 +91,9 @@ async function waitForHealth(port: number, timeoutMs: number = 30000): Promise<b
async function httpShutdown(port: number): Promise<boolean> {
try {
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/admin/shutdown`, {
method: 'POST',
signal: AbortSignal.timeout(5000)
method: 'POST'
});
if (!response.ok) {
logger.warn('SYSTEM', 'Shutdown request returned error', { port, status: response.status });
@@ -876,11 +776,23 @@ export class WorkerService {
// STEP 2: Close HTTP server first
if (this.server) {
this.server.closeAllConnections();
// Give Windows time to close connections before closing server (prevents zombie ports)
if (process.platform === 'win32') {
await new Promise(r => setTimeout(r, 500));
}
await new Promise<void>((resolve, reject) => {
this.server!.close(err => err ? reject(err) : resolve());
});
this.server = null;
logger.info('SYSTEM', 'HTTP server closed');
// Extra delay on Windows to ensure port is fully released
if (process.platform === 'win32') {
await new Promise(r => setTimeout(r, 500));
logger.info('SYSTEM', 'Waited for Windows port cleanup');
}
}
// STEP 3: Shutdown active sessions
@@ -1031,159 +943,128 @@ async function main() {
switch (command) {
case 'start': {
// Acquire lock BEFORE checking port to prevent race condition
// If we can't get lock, another session is spawning - wait for health instead
if (!acquireLock('start')) {
logger.info('SYSTEM', 'Another session is spawning worker, waiting for health');
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
if (healthy) {
logger.info('SYSTEM', 'Worker healthy, returning success');
process.exit(0);
}
// Still not healthy after wait - try to acquire lock and spawn
const gotLock = await waitForLock('start', 5000);
if (!gotLock) {
logger.error('SYSTEM', 'Failed to acquire lock after timeout');
process.exit(1);
}
}
try {
// Re-check port AFTER acquiring lock
if (await isPortInUse(port)) {
releaseLock();
logger.info('SYSTEM', 'Port already in use, worker already running');
process.exit(0);
}
// Spawn self as daemon
const child = spawn(process.execPath, [__filename, '--daemon'], {
detached: true,
stdio: 'ignore',
windowsHide: true,
env: { ...process.env, CLAUDE_MEM_WORKER_PORT: String(port) }
});
if (child.pid === undefined) {
releaseLock();
logger.error('SYSTEM', 'Failed to spawn worker daemon');
process.exit(1);
}
child.unref();
// Write PID file
writePidFile({ pid: child.pid, port, startedAt: new Date().toISOString() });
// Wait for health with platform-adjusted timeout
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
releaseLock();
if (!healthy) {
removePidFile();
logger.error('SYSTEM', 'Worker failed to start');
process.exit(1);
}
logger.info('SYSTEM', 'Worker started successfully');
// Health-check-first approach: simple, fast, reliable
// Check if worker is already healthy
if (await waitForHealth(port, 1000)) {
logger.info('SYSTEM', 'Worker already running and healthy');
process.exit(0);
} catch (error) {
releaseLock();
throw error;
}
}
case 'stop': {
// Acquire lock for stop operation
if (!acquireLock('stop')) {
// Wait briefly for concurrent operation to complete
await new Promise(r => setTimeout(r, 2000));
}
try {
await httpShutdown(port);
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!freed) {
logger.warn('SYSTEM', 'Port did not free up after shutdown', { port });
// Could force kill here if we knew the PID, but for now just warn
}
removePidFile();
releaseLock();
logger.info('SYSTEM', 'Worker stopped successfully');
process.exit(0);
} catch (error) {
releaseLock();
throw error;
}
}
// Worker not healthy - check if port is in use
const portInUse = await isPortInUse(port);
case 'restart': {
// Acquire lock for restart operation
if (!acquireLock('restart')) {
// Another session is already restarting - wait for health
logger.info('SYSTEM', 'Another session is restarting worker, waiting');
const healthy = await waitForHealth(port, getPlatformTimeout(45000));
if (portInUse) {
// Port in use but not healthy - wait a bit longer in case it's starting up
logger.info('SYSTEM', 'Port in use, waiting for worker to become healthy');
const healthy = await waitForHealth(port, getPlatformTimeout(15000));
if (healthy) {
logger.info('SYSTEM', 'Worker healthy after restart');
logger.info('SYSTEM', 'Worker is now healthy');
process.exit(0);
}
logger.error('SYSTEM', 'Worker failed to restart (concurrent operation)');
logger.error('SYSTEM', 'Port in use but worker not responding to health checks');
process.exit(1);
}
try {
await httpShutdown(port);
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!freed) {
releaseLock();
logger.error('SYSTEM', 'Port did not free up after shutdown, aborting restart', { port });
process.exit(1);
}
removePidFile();
// Port not in use - spawn daemon
logger.info('SYSTEM', 'Starting worker daemon');
const child = spawn(process.execPath, [__filename, '--daemon'], {
detached: true,
stdio: 'ignore',
windowsHide: true,
env: { ...process.env, CLAUDE_MEM_WORKER_PORT: String(port) }
});
const child = spawn(process.execPath, [__filename, '--daemon'], {
detached: true,
stdio: 'ignore',
windowsHide: true,
env: { ...process.env, CLAUDE_MEM_WORKER_PORT: String(port) }
});
if (child.pid === undefined) {
releaseLock();
logger.error('SYSTEM', 'Failed to spawn worker daemon during restart');
process.exit(1);
}
child.unref();
writePidFile({ pid: child.pid, port, startedAt: new Date().toISOString() });
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
releaseLock();
if (!healthy) {
removePidFile();
logger.error('SYSTEM', 'Worker failed to restart');
process.exit(1);
}
logger.info('SYSTEM', 'Worker restarted successfully');
process.exit(0);
} catch (error) {
releaseLock();
throw error;
if (child.pid === undefined) {
logger.error('SYSTEM', 'Failed to spawn worker daemon');
process.exit(1);
}
child.unref();
// Write PID file
writePidFile({ pid: child.pid, port, startedAt: new Date().toISOString() });
// Wait for health with platform-adjusted timeout
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
if (!healthy) {
removePidFile();
logger.error('SYSTEM', 'Worker failed to start (health check timeout)');
process.exit(1);
}
logger.info('SYSTEM', 'Worker started successfully');
process.exit(0);
}
case 'stop': {
// Simple stop: send shutdown request, wait for port to free
await httpShutdown(port);
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!freed) {
logger.warn('SYSTEM', 'Port did not free up after shutdown', { port });
// Could force kill here if we knew the PID, but for now just warn
}
removePidFile();
logger.info('SYSTEM', 'Worker stopped successfully');
process.exit(0);
}
case 'restart': {
// Simple restart: stop, then start
logger.info('SYSTEM', 'Restarting worker');
await httpShutdown(port);
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!freed) {
logger.error('SYSTEM', 'Port did not free up after shutdown, aborting restart', { port });
process.exit(1);
}
removePidFile();
// Spawn new daemon
const child = spawn(process.execPath, [__filename, '--daemon'], {
detached: true,
stdio: 'ignore',
windowsHide: true,
env: { ...process.env, CLAUDE_MEM_WORKER_PORT: String(port) }
});
if (child.pid === undefined) {
logger.error('SYSTEM', 'Failed to spawn worker daemon during restart');
process.exit(1);
}
child.unref();
writePidFile({ pid: child.pid, port, startedAt: new Date().toISOString() });
// Wait for health
const healthy = await waitForHealth(port, getPlatformTimeout(30000));
if (!healthy) {
removePidFile();
logger.error('SYSTEM', 'Worker failed to restart');
process.exit(1);
}
logger.info('SYSTEM', 'Worker restarted successfully');
process.exit(0);
}
case 'status': {
const running = await isPortInUse(port);
const pidInfo = readPidFile();
if (running && pidInfo) {
logger.info('SYSTEM', `Worker running (PID: ${pidInfo.pid}, Port: ${pidInfo.port})`);
console.log('Worker is running');
console.log(` PID: ${pidInfo.pid}`);
console.log(` Port: ${pidInfo.port}`);
console.log(` Started: ${pidInfo.startedAt}`);
} else {
logger.info('SYSTEM', 'Worker not running');
console.log('Worker is not running');
}
process.exit(0);
}
+4 -6
View File
@@ -62,9 +62,8 @@ export function clearPortCache(): void {
*/
async function isWorkerHealthy(): Promise<boolean> {
const port = getWorkerPort();
const response = await fetch(`http://127.0.0.1:${port}/api/readiness`, {
signal: AbortSignal.timeout(HEALTH_CHECK_TIMEOUT_MS)
});
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/readiness`);
return response.ok;
}
@@ -82,9 +81,8 @@ function getPluginVersion(): string {
*/
async function getWorkerVersion(): Promise<string> {
const port = getWorkerPort();
const response = await fetch(`http://127.0.0.1:${port}/api/version`, {
signal: AbortSignal.timeout(HEALTH_CHECK_TIMEOUT_MS)
});
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
const response = await fetch(`http://127.0.0.1:${port}/api/version`);
if (!response.ok) {
throw new Error(`Failed to get worker version: ${response.status}`);
}