claude-mem/src/services/worker-service.ts
Alex Newman d13662d5d8 Cynical deletion: close 27 issues by removing defenders + tolerators (#2141)
* fix: mirror migration 28 in SessionStore so pending_messages.tool_use_id and worker_pid columns are created (#2139)

SessionStore's inline migration list jumped from v27 to v29, skipping
rebuildPendingMessagesForSelfHealingClaim. The worker uses SessionStore
directly via worker/DatabaseManager.ts and bypasses the canonical
MigrationRunner, so fresh installs ended up at "max v29" with neither
column present — every queue claim and observation insert failed.

Adds addPendingMessagesToolUseIdAndWorkerPidColumns following the existing
mirror precedent (addObservationSubagentColumns / addObservationsUniqueContentHashIndex).
Uses ALTER TABLE + column-existence guards so already-broken DBs at v29
self-heal on next worker boot.
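The guard pattern described above can be sketched as follows (helper and interface names are assumed for illustration; the real migration lives in SessionStore and uses its own db handle):

```typescript
// Hedged sketch of a column-existence guard, not the actual SessionStore code.
// PRAGMA table_info drives the check so ALTER TABLE only runs when the
// column is genuinely missing — safe on fresh DBs and already-broken v29 DBs.
interface TableColumn { name: string }

interface MinimalDb {
  prepare(sql: string): { all(): TableColumn[] };
  exec(sql: string): void;
}

function hasColumn(columns: TableColumn[], name: string): boolean {
  return columns.some((c) => c.name === name);
}

function addColumnIfMissing(db: MinimalDb, table: string, column: string, ddl: string): void {
  const cols = db.prepare(`PRAGMA table_info(${table})`).all();
  if (!hasColumn(cols, column)) {
    db.exec(`ALTER TABLE ${table} ADD COLUMN ${column} ${ddl}`);
  }
}
```

Because the guard is idempotent, rerunning it on every worker boot is what lets broken databases self-heal.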

Verified on fresh DB and on a synthetic v29-without-v28 broken DB:
both columns and indexes (idx_pending_messages_worker_pid,
ux_pending_session_tool) appear after one boot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: wrap v28 mirror dedup+index creation in transaction

Addresses Greptile P2 review on PR #2140: matches the existing pattern in
addObservationsUniqueContentHashIndex (v29 mirror at SessionStore.ts:1127)
and runner.ts rebuildPendingMessagesForSelfHealingClaim. A crash between
the dedup DELETE and the schema_versions INSERT no longer leaves the DB
in a half-applied state.
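The transactional shape being matched can be sketched as a generic wrapper (illustrative only, under the assumption of a handle exposing `exec()`; the repo's stores have their own helper):

```typescript
// Hedged sketch: the dedup DELETE and the schema_versions INSERT either both
// land or neither does. A crash mid-way rolls back instead of leaving a
// half-applied DB.
function withTransaction(db: { exec(sql: string): void }, work: () => void): void {
  db.exec('BEGIN');
  try {
    work();
    db.exec('COMMIT');
  } catch (err) {
    db.exec('ROLLBACK');
    throw err;
  }
}
```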

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* docs(plan): cynical-deletion plan for 29 open issues

A 9-phase plan applying a delete-first lens to the triaged issue corpus.
Headlines: kill defenders (orphan cleanup, EncodedCommand spawn,
restart-port-steal) and tolerators (silent JSON drops, drifted SSE
filters). Each phase closes a named subset of issues.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: delete process-management theater (Phase 1: DEL-1 + DEL-2)

Delete aggressiveStartupCleanup, the PowerShell -EncodedCommand
spawn branch, and the restart-with-port-steal sequence. Replace
daemon spawning with a single uniform child_process.spawn path
using arg-array form, keeping setsid on Unix when available.

The defenders (orphan cleanup, duplicate-worker probes, port
stealing) bred more bugs than they fixed. PID file with start-time
token already provides correct OS-trust ownership; restart now
requests httpShutdown, waits 5s for the port to free, then exits 1
if it doesn't (leaving resolution to the user). Net -247 lines.
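The 5s wait described above can be sketched like this (helper name matches the HealthMonitor export, but the signature here is assumed; the probe is injected so the deadline logic stays testable without sockets):

```typescript
// Hedged sketch of a wait-for-port-free loop. `probe` returns true while the
// port is still bound; the caller exits 1 if this resolves false.
async function waitForPortFree(
  probe: () => boolean | Promise<boolean>,
  timeoutMs = 5000,
  intervalMs = 100,
): Promise<boolean> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (!(await probe())) return true; // port freed in time
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  return !(await probe()); // final check at the deadline
}
```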

Closes #2090, #2095 (already fixed at session-init.ts:78), #2107,
#2111, #2114, #2117, #2123, #2097, #2135.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: observer-sessions trust boundary via CLAUDE_MEM_INTERNAL env (Phase 2: DEL-9)

Replace the cwd === OBSERVER_SESSIONS_DIR discriminator (which every
consumer must repeat and inevitably drifts) with a single env-var
trust boundary set once at spawn time in buildIsolatedEnv.

- buildIsolatedEnv now sets CLAUDE_MEM_INTERNAL=1, covering all three
  spawn sites (SDKAgent, KnowledgeAgent.prime, KnowledgeAgent.executeQuery)
- shouldTrackProject checks the env var first (cwd check stays as
  belt-and-braces fallback)
- New shared shouldEmitProjectRow predicate — SSE broadcaster and
  pagination filter share the same predicate so they can never drift
  apart (#2118)
- ObservationBroadcaster filters observer rows from SSE stream
- PaginationHelper hardcoded 'observer-sessions' replaced with
  OBSERVER_SESSIONS_PROJECT const
- project-filter basename match pass — *observer-sessions* now matches
  basename, not just full path (globToRegex's [^/]* can't cross /)
  (#2126 item 1)
- New `claude-mem cleanup [--dry-run]` subcommand wires CleanupV12_4_3
  through to the worker for #2126 item 5

Closes #2118, #2126.
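The trust boundary and the shared predicate can be sketched as follows (identifier names are taken from the commit message, but the signatures are assumptions):

```typescript
// Hedged sketch. The env var is set once at spawn time in buildIsolatedEnv,
// so every consumer checks one place; the cwd comparison remains only as a
// belt-and-braces fallback.
const OBSERVER_SESSIONS_PROJECT = 'observer-sessions';

function isInternalObserver(
  env: Record<string, string | undefined>,
  cwd: string,
  observerSessionsDir: string,
): boolean {
  if (env['CLAUDE_MEM_INTERNAL'] === '1') return true;
  return cwd === observerSessionsDir;
}

// SSE broadcaster and pagination filter both call this one predicate,
// so the two filters cannot drift apart.
function shouldEmitProjectRow(project: string): boolean {
  return project !== OBSERVER_SESSIONS_PROJECT;
}
```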

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: strip proxy env vars before spawning worker (Phase 4: CON-1)

User's HTTP_PROXY/HTTPS_PROXY config was bleeding into internal AI
calls when claude-mem spawns the claude subprocess, causing
connection failures. Strip unconditionally — no passthrough knob,
which rejects #2099's whitelist proposal.
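The strip can be sketched as (the variable list is an assumption; the real sanitizer may cover more keys):

```typescript
// Hedged sketch of unconditional proxy stripping before spawning the claude
// subprocess. Returns a copy so the parent process env is untouched.
const PROXY_VARS = ['HTTP_PROXY', 'HTTPS_PROXY', 'http_proxy', 'https_proxy', 'NO_PROXY', 'no_proxy'];

function stripProxyEnv(env: Record<string, string | undefined>): Record<string, string | undefined> {
  const cleaned = { ...env };
  for (const key of PROXY_VARS) delete cleaned[key];
  return cleaned;
}
```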

Closes #2115, #2099.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: fail-fast on silent drops in stdin/file-context/memory-save (Phase 5: FF-1)

Three independent fail-fast fixes:

#2089 — stdin-reader silent drop. Non-empty stdin that fails JSON.parse
now rejects with a clear error instead of resolving undefined. Empty
stdin still resolves undefined.
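The fail-fast behavior can be sketched as a pure function (the real stdin-reader is async and stream-based; this captures only the parse decision):

```typescript
// Hedged sketch: empty stdin stays a no-op, but non-empty stdin that fails
// JSON.parse throws a clear error instead of silently resolving undefined.
function parseHookStdin(raw: string): unknown {
  const trimmed = raw.trim();
  if (trimmed === '') return undefined;
  try {
    return JSON.parse(trimmed);
  } catch {
    throw new Error(`stdin was non-empty but not valid JSON: ${trimmed.slice(0, 80)}`);
  }
}
```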

#2094 — PreToolUse:Read truncation Edit deadlock. file-context handler
no longer returns a fake truncated Read result via updatedInput.
Removes userOffset/userLimit/truncated machinery; injects the timeline
via additionalContext only and lets the real Read pass through. Read
state and Claude's expectation now stay consistent, eliminating the
infinite Edit retry loop.

#2116 — /api/memory/save metadata drop + project bug. Schema accepts
metadata as a documented JSON column (migration 30 adds observations.
metadata TEXT, mirrored in SessionStore). Schema also tightened to
.strict() so unknown top-level fields fail fast instead of being
silently dropped. Project resolution now consults metadata.project as
a fallback before defaultProject.
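The two behaviors can be sketched without the schema library (the real route uses a Zod `.strict()` schema; the field names here are assumptions):

```typescript
// Hedged sketch. Unknown top-level keys fail fast instead of being silently
// dropped, and metadata.project is consulted before defaultProject.
const KNOWN_KEYS = new Set(['text', 'project', 'metadata']); // assumed field list

function rejectUnknownFields(body: Record<string, unknown>): void {
  const unknown = Object.keys(body).filter((k) => !KNOWN_KEYS.has(k));
  if (unknown.length > 0) throw new Error(`Unrecognized keys: ${unknown.join(', ')}`);
}

function resolveProject(
  body: { project?: string; metadata?: { project?: string } },
  defaultProject: string,
): string {
  return body.project ?? body.metadata?.project ?? defaultProject;
}
```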

Closes #2089, #2094, #2116.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: small deletions — Zod externalize / Gemini fallback / session timeout / installCLI alias (Phase 6)

DEL-4 (#2113): Externalize zod from mcp-server.cjs and context-generator.cjs
hook bundles so OpenCode's runtime resolves a single Zod copy. Worker
keeps Zod bundled (it's a daemon subprocess, not in OpenCode's hook
bundle). Added zod to plugin/package.json so externalized requires
resolve at runtime.

DEL-5 (#2087): Delete the never-wired GeminiAgent → Claude fallback.
fallbackAgent was always null in production. On 429 the agent now
throws cleanly (message stays pending for retry). Removed
setFallbackAgent, FallbackAgent interface, and the 429 fallback
branch from both GeminiAgent and OpenRouterAgent. Updated docs
that claimed automatic Claude fallback.

DEL-6 (#2127, #2098): Raise MAX_SESSION_WALL_CLOCK_MS from 4h to
24h. The timeout is a real guard against runaway-cost loops (per
issue #1590), but 4h kills legitimate long Claude Code days. 24h
preserves the guard while never hitting in normal use. No knob —
a session approaching this age is a bug worth investigating, not
a value worth tuning.

DEL-8 (#2054): Delete installCLI() alias function. Saves 4 keystrokes
at the cost of cross-platform shell-config mutation surface — not
worth it. Canonical entry is npx claude-mem (and bunx). Uninstall
now strips legacy alias/function lines from ~/.bashrc, ~/.zshrc,
and the PowerShell profile.

Closes #2087, #2098, #2113, #2127, #2054.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: de-hardcode worker port + multi-account commit (Phase 3: CON-2 + DEL-7)

Replace hardcoded 37777 fallbacks with SettingsDefaultsManager.get(
'CLAUDE_MEM_WORKER_PORT') in npx-cli (runtime/install/uninstall),
opencode-plugin, OpenClaw installer, SearchRoutes example URLs.
Timeline-report SKILL.md now resolves WORKER_PORT from settings.json
at the top and uses ${WORKER_PORT} in all curl invocations.
Remaining 37777 literals are doc comments + viewer build-time form-
field placeholder (which is replaced by /api/settings on mount).

hooks.json: add cygpath POSIX→Windows path translation between _R
resolution and node invocation. No-op on macOS/Linux. Closes the
Windows + Git Bash MODULE_NOT_FOUND in #2109.

CLAUDE.md gains a Multi-account section documenting CLAUDE_MEM_DATA_DIR
+ optional CLAUDE_MEM_WORKER_PORT — every existing path/port code
path now honors them.

Closes #2103, #2109, #2101.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: install/uninstall improvements (Phase 7: #2106)

5 fixes for the install/uninstall flow:

Item 1 — multiselect default. install.ts no longer pre-selects every
detected IDE; user explicitly opts in.

Item 3 — shutdown-before-overwrite. New
src/services/install/shutdown-helper.ts shared by install and
uninstall: POSTs /api/admin/shutdown then polls /api/health until
the worker stops responding. install calls it before
copyPluginToMarketplace so reinstall over a running worker doesn't
conflict; uninstall calls it before deletion.

Item 4 — uninstall path coverage. Removes ~/.npm/_npx/*/node_modules/
claude-mem, ~/.cache/claude-cli-nodejs/*/mcp-logs-plugin-claude-mem-*,
~/.claude/plugins/data/claude-mem-thedotmack/. Best-effort: per-path
try/catch so a single permission failure doesn't abort uninstall.
chroma-mcp shutdown is implicit via the worker's GracefulShutdown
cascade in item 3's helper.

Item 5 — install summary documents "Close all Claude Code sessions
before uninstalling, or ~/.claude-mem will be recreated by active
hooks."

Item 6 — real-port query. After install, fetches /api/health on the
configured port with 3s timeout. Reports actually-bound port if the
response carries it; falls back to requested port. No retry loop.
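The fallback logic can be sketched as (the health payload shape is an assumption; the fetch itself would use something like `AbortSignal.timeout(3000)` and is omitted here):

```typescript
// Hedged sketch of the port report after install: prefer the port the worker
// says it actually bound, fall back to the requested one, no retry loop.
function resolveReportedPort(requestedPort: number, health: { port?: number } | null): number {
  return health?.port ?? requestedPort;
}
```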

Closes #2106 (items 1, 3, 4, 5, 6). Items 2, 7 closed separately
as already-fixed and insufficient-detail.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: pin chroma-mcp to 0.2.6 (Phase 8: DEL-3 lite)

Replace unpinned 'chroma-mcp' arg with chroma-mcp==0.2.6 in both
local and remote modes. Pinning makes installs deterministic across
machines and across time, eliminating the dependency-drift class
of bugs.

Verified 0.2.6 in a clean uv cache: starts cleanly, no httpcore/
httpx ImportError, no --with flags needed. The --with flags removed
in a0dd516c are not required at this pin (transitive deps resolve
correctly when the top-level version is fixed).

#2102's three protections (transport cleanup on failure, stale onclose
handler guard, 10s reconnect backoff) confirmed intact.

Closes #2046, #2085, #2102.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: update stale assertions for per-UID port + migration 30 (Phase 9)

SettingsDefaultsManager.CLAUDE_MEM_WORKER_PORT default is per-UID
(37700 + uid%100), not literal '37777'. Three assertions in
settings-defaults-manager.test.ts now compute the expected value
the same way the source does.
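The per-UID formula can be sketched as (the Windows fallback value is an assumption; the source's actual fallback may differ):

```typescript
// Hedged sketch of the per-UID default port: 37700 + uid % 100, with a
// fallback for platforms where process.getuid is unavailable (Windows).
function defaultWorkerPort(getuid: (() => number) | undefined): number {
  if (typeof getuid === 'function') {
    return 37700 + (getuid() % 100);
  }
  return 37777; // assumed fallback
}
```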

migration-runner.test.ts: drop expect(versions).toContain(19)
(version 19 was a noop never recorded — pre-existing bug at parent),
add expect(versions).toContain(30) for the new observations.metadata
column added in Phase 5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address Greptile P1/P2 review comments on PR #2141

P1: spawnDaemon return value was unchecked in worker-service.ts restart
case, so a failed spawn silently exited 0 with a misleading "Worker
restart spawned" log. Now error and exit 1 when restartPid is undefined.
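The P1 fix amounts to treating an undefined PID as a hard failure (sketch only; spawnDaemon's return type is assumed to be `number | undefined` from the description above):

```typescript
// Hedged sketch: an undefined spawn PID throws instead of logging success,
// so the restart path can exit 1 rather than silently exiting 0.
function assertRestartSpawned(restartPid: number | undefined): number {
  if (restartPid === undefined) {
    throw new Error('Worker restart spawn failed');
  }
  return restartPid;
}
```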

P2: shutdown-helper.ts health-poll catch treated AbortError (timeout)
the same as connection-refused, so a slow worker could be reported
confirmedStopped while still holding file locks. Now distinguish:
AbortError continues polling; other errors return confirmedStopped.
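The distinction can be sketched as a small classifier (helper name invented for illustration):

```typescript
// Hedged sketch of the P2 fix: an AbortError is our own fetch timeout, so the
// worker may still be up and we keep polling; any other error (e.g. a
// connection refusal) means nothing is listening anymore.
function classifyHealthPollError(err: unknown): 'keep-polling' | 'confirmed-stopped' {
  if (err instanceof Error && err.name === 'AbortError') return 'keep-polling';
  return 'confirmed-stopped';
}
```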

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* build: rebuild plugin artifacts after merging main

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: address CodeRabbit review comments on PR #2141

- hooks.json: quote $HOME in cache lookup so paths with spaces work
- timeline-report SKILL.md: fall back when process.getuid is unavailable (Windows)
- opencode-plugin: validate CLAUDE_MEM_WORKER_PORT before using
- uninstall.ts: only strip alias lines, not function declarations (multi-line bodies left intact)
- MemoryRoutes: trim whitespace-only project before precedence resolution
- SessionStore migration 21: preserve metadata column if observations already has it
- stdin-reader test: restore full property descriptor to avoid cross-test pollution

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-25 21:23:24 -07:00


/**
 * Worker Service - Slim Orchestrator
 *
 * Refactored from 2000-line monolith to ~300-line orchestrator.
 * Delegates to specialized modules:
 * - src/services/server/ - HTTP server, middleware, error handling
 * - src/services/infrastructure/ - Process management, health monitoring, shutdown
 * - src/services/integrations/ - IDE integrations (Cursor)
 * - src/services/worker/ - Business logic, routes, agents
 */
import path from 'path';
import { existsSync } from 'fs';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { getWorkerPort, getWorkerHost } from '../shared/worker-utils.js';
import { HOOK_TIMEOUTS } from '../shared/hook-constants.js';
import { SettingsDefaultsManager } from '../shared/SettingsDefaultsManager.js';
import { getAuthMethodDescription } from '../shared/EnvManager.js';
import { logger } from '../utils/logger.js';
import { ChromaMcpManager } from './sync/ChromaMcpManager.js';
import { ChromaSync } from './sync/ChromaSync.js';
import { configureSupervisorSignalHandlers, getSupervisor, startSupervisor } from '../supervisor/index.js';
import { sanitizeEnv } from '../supervisor/env-sanitizer.js';
// Worker spawn / Windows-cooldown helpers are defined in ./worker-spawner.ts
// so that lightweight consumers (e.g. the MCP server running under Node) can
// ensure the worker daemon is up without importing this entire module — which
// transitively pulls in the SQLite database layer via ChromaSync/DatabaseManager.
import { ensureWorkerStarted as ensureWorkerStartedShared } from './worker-spawner.js';
import { RestartGuard } from './worker/RestartGuard.js';
// Re-export for backward compatibility — canonical implementation in shared/plugin-state.ts
export { isPluginDisabledInClaudeSettings } from '../shared/plugin-state.js';
import { isPluginDisabledInClaudeSettings } from '../shared/plugin-state.js';
// Version injected at build time by esbuild define
declare const __DEFAULT_PACKAGE_VERSION__: string;
const packageVersion = typeof __DEFAULT_PACKAGE_VERSION__ !== 'undefined' ? __DEFAULT_PACKAGE_VERSION__ : '0.0.0-dev';
// Infrastructure imports
import {
  writePidFile,
  readPidFile,
  removePidFile,
  getPlatformTimeout,
  runOneTimeChromaMigration,
  runOneTimeCwdRemap,
  cleanStalePidFile,
  verifyPidFileOwnership,
  spawnDaemon,
  touchPidFile
} from './infrastructure/ProcessManager.js';
import { runOneTimeV12_4_3Cleanup } from './infrastructure/CleanupV12_4_3.js';
import {
  isPortInUse,
  waitForHealth,
  waitForReadiness,
  waitForPortFree,
  httpShutdown
} from './infrastructure/HealthMonitor.js';
import { performGracefulShutdown } from './infrastructure/GracefulShutdown.js';
import { adoptMergedWorktrees, adoptMergedWorktreesForAllKnownRepos } from './infrastructure/WorktreeAdoption.js';
// Server imports
import { Server } from './server/Server.js';
// Integration imports
import {
  updateCursorContextForProject,
  handleCursorCommand
} from './integrations/CursorHooksInstaller.js';
import {
  handleGeminiCliCommand
} from './integrations/GeminiCliHooksInstaller.js';
// Service layer imports
import { DatabaseManager } from './worker/DatabaseManager.js';
import { SessionManager } from './worker/SessionManager.js';
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
import { SDKAgent } from './worker/SDKAgent.js';
import type { WorkerRef } from './worker/agents/types.js';
import { GeminiAgent, isGeminiSelected, isGeminiAvailable } from './worker/GeminiAgent.js';
import { OpenRouterAgent, isOpenRouterSelected, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
import { PaginationHelper } from './worker/PaginationHelper.js';
import { SettingsManager } from './worker/SettingsManager.js';
import { SearchManager } from './worker/SearchManager.js';
import { FormattingService } from './worker/FormattingService.js';
import { TimelineService } from './worker/TimelineService.js';
import { SessionEventBroadcaster } from './worker/events/SessionEventBroadcaster.js';
import { SessionCompletionHandler } from './worker/session/SessionCompletionHandler.js';
import { setIngestContext, attachIngestGeneratorStarter } from './worker/http/shared.js';
import { DEFAULT_CONFIG_PATH, DEFAULT_STATE_PATH, expandHomePath, loadTranscriptWatchConfig, writeSampleConfig } from './transcripts/config.js';
import { TranscriptWatcher } from './transcripts/watcher.js';
// HTTP route handlers
import { ViewerRoutes } from './worker/http/routes/ViewerRoutes.js';
import { SessionRoutes } from './worker/http/routes/SessionRoutes.js';
import { DataRoutes } from './worker/http/routes/DataRoutes.js';
import { SearchRoutes } from './worker/http/routes/SearchRoutes.js';
import { SettingsRoutes } from './worker/http/routes/SettingsRoutes.js';
import { LogsRoutes } from './worker/http/routes/LogsRoutes.js';
import { MemoryRoutes } from './worker/http/routes/MemoryRoutes.js';
import { CorpusRoutes } from './worker/http/routes/CorpusRoutes.js';
import { ChromaRoutes } from './worker/http/routes/ChromaRoutes.js';
// Knowledge agent services
import { CorpusStore } from './worker/knowledge/CorpusStore.js';
import { CorpusBuilder } from './worker/knowledge/CorpusBuilder.js';
import { KnowledgeAgent } from './worker/knowledge/KnowledgeAgent.js';
// Primary-path session lifecycle helpers — no reapers, no orphan sweeps.
// The SDK subprocess is spawned in its own POSIX process group via
// createSdkSpawnFactory; teardown via ensureSdkProcessExit kills the whole
// group so no descendants leak (Principle 5).
import { getSdkProcessForSession, ensureSdkProcessExit } from '../supervisor/process-registry.js';
/**
 * Build JSON status output for hook framework communication.
 * This is a pure function extracted for testability.
 *
 * @param status - 'ready' for successful startup, 'error' for failures
 * @param message - Optional error message (only included when provided)
 * @returns JSON object with continue, suppressOutput, status, and optionally message
 */
export interface StatusOutput {
  continue: true;
  suppressOutput: true;
  status: 'ready' | 'error';
  message?: string;
}
export function buildStatusOutput(status: 'ready' | 'error', message?: string): StatusOutput {
  return {
    continue: true,
    suppressOutput: true,
    status,
    ...(message && { message })
  };
}
export class WorkerService implements WorkerRef {
  private server: Server;
  private startTime: number = Date.now();
  private mcpClient: Client;
  // Initialization flags
  private mcpReady: boolean = false;
  private initializationCompleteFlag: boolean = false;
  private isShuttingDown: boolean = false;
  // Service layer
  private dbManager: DatabaseManager;
  private sessionManager: SessionManager;
  public sseBroadcaster: SSEBroadcaster;
  private sdkAgent: SDKAgent;
  private geminiAgent: GeminiAgent;
  private openRouterAgent: OpenRouterAgent;
  private paginationHelper: PaginationHelper;
  private settingsManager: SettingsManager;
  private sessionEventBroadcaster: SessionEventBroadcaster;
  private completionHandler: SessionCompletionHandler;
  private corpusStore: CorpusStore;
  // Route handlers
  private searchRoutes: SearchRoutes | null = null;
  // Chroma MCP manager (lazy - connects on first use)
  private chromaMcpManager: ChromaMcpManager | null = null;
  // Transcript watcher for Codex and other transcript-based clients
  private transcriptWatcher: TranscriptWatcher | null = null;
  // Initialization tracking
  private initializationComplete: Promise<void>;
  private resolveInitialization!: () => void;
  // AI interaction tracking for health endpoint
  private lastAiInteraction: {
    timestamp: number;
    success: boolean;
    provider: string;
    error?: string;
  } | null = null;
  constructor() {
    // Initialize the promise that will resolve when background initialization completes
    this.initializationComplete = new Promise((resolve) => {
      this.resolveInitialization = resolve;
    });
    // Initialize service layer
    this.dbManager = new DatabaseManager();
    this.sessionManager = new SessionManager(this.dbManager);
    this.sseBroadcaster = new SSEBroadcaster();
    this.sdkAgent = new SDKAgent(this.dbManager, this.sessionManager);
    this.geminiAgent = new GeminiAgent(this.dbManager, this.sessionManager);
    this.openRouterAgent = new OpenRouterAgent(this.dbManager, this.sessionManager);
    this.paginationHelper = new PaginationHelper(this.dbManager);
    this.settingsManager = new SettingsManager(this.dbManager);
    this.sessionEventBroadcaster = new SessionEventBroadcaster(this.sseBroadcaster, this);
    this.completionHandler = new SessionCompletionHandler(
      this.sessionManager,
      this.sessionEventBroadcaster,
      this.dbManager,
    );
    this.corpusStore = new CorpusStore();
    // Wire ingest helpers (plan 03 phase 0). Worker-internal callers use these
    // directly instead of HTTP-loopback into our own routes.
    setIngestContext({
      sessionManager: this.sessionManager,
      dbManager: this.dbManager,
      eventBroadcaster: this.sessionEventBroadcaster,
    });
    // Set callback for when sessions are deleted
    this.sessionManager.setOnSessionDeleted(() => {
      this.broadcastProcessingStatus();
    });
    // Initialize MCP client
    // Empty capabilities object: this client only calls tools, doesn't expose any
    this.mcpClient = new Client({
      name: 'worker-search-proxy',
      version: packageVersion
    }, { capabilities: {} });
    // Initialize HTTP server with core routes
    this.server = new Server({
      getInitializationComplete: () => this.initializationCompleteFlag,
      getMcpReady: () => this.mcpReady,
      onShutdown: () => this.shutdown(),
      onRestart: () => this.shutdown(),
      workerPath: __filename,
      getAiStatus: () => {
        let provider = 'claude';
        if (isOpenRouterSelected() && isOpenRouterAvailable()) provider = 'openrouter';
        else if (isGeminiSelected() && isGeminiAvailable()) provider = 'gemini';
        return {
          provider,
          authMethod: getAuthMethodDescription(),
          lastInteraction: this.lastAiInteraction
            ? {
                timestamp: this.lastAiInteraction.timestamp,
                success: this.lastAiInteraction.success,
                ...(this.lastAiInteraction.error && { error: this.lastAiInteraction.error }),
              }
            : null,
        };
      },
    });
    // Register route handlers
    this.registerRoutes();
    // Register signal handlers early to ensure cleanup even if start() hasn't completed
    this.registerSignalHandlers();
  }
  /**
   * Register signal handlers for graceful shutdown
   */
  private registerSignalHandlers(): void {
    configureSupervisorSignalHandlers(async () => {
      this.isShuttingDown = true;
      await this.shutdown();
    });
  }
  /**
   * Register all route handlers with the server
   */
  private registerRoutes(): void {
    // IMPORTANT: Middleware must be registered BEFORE routes (Express processes in order)
    // Register Chroma routes immediately so they bypass the initialization guard
    this.server.registerRoutes(new ChromaRoutes());
    // Early handler for /api/context/inject — fail open if not yet initialized
    this.server.app.get('/api/context/inject', async (req, res, next) => {
      if (!this.initializationCompleteFlag || !this.searchRoutes) {
        logger.warn('SYSTEM', 'Context requested before initialization complete, returning empty');
        res.status(200).json({ content: [{ type: 'text', text: '' }] });
        return;
      }
      next(); // Delegate to SearchRoutes handler
    });
    // Guard ALL /api/* routes during initialization — wait for DB with timeout
    // Exceptions: /api/health, /api/readiness, /api/version (handled by Server.ts core routes)
    // and /api/chroma/status (diagnostic endpoint)
    this.server.app.use('/api', async (req, res, next) => {
      // Bypass guard for diagnostic endpoints
      if (req.path === '/chroma/status' || req.path === '/health' || req.path === '/readiness' || req.path === '/version') {
        next();
        return;
      }
      if (this.initializationCompleteFlag) {
        next();
        return;
      }
      const timeoutMs = 120000; // 2 minutes
      const timeoutPromise = new Promise<void>((_, reject) =>
        setTimeout(() => reject(new Error('Database initialization timeout')), timeoutMs)
      );
      try {
        await Promise.race([this.initializationComplete, timeoutPromise]);
        next();
      } catch (error) {
        if (error instanceof Error) {
          logger.error('WORKER', `Request to ${req.method} ${req.path} rejected — DB not initialized`, {}, error);
        } else {
          logger.error('WORKER', `Request to ${req.method} ${req.path} rejected — DB not initialized with non-Error`, {}, new Error(String(error)));
        }
        res.status(503).json({
          error: 'Service initializing',
          message: 'Database is still initializing, please retry'
        });
        return;
      }
    });
    // Standard routes (registered AFTER guard middleware)
    this.server.registerRoutes(new ViewerRoutes(this.sseBroadcaster, this.dbManager, this.sessionManager));
    const sessionRoutes = new SessionRoutes(this.sessionManager, this.dbManager, this.sdkAgent, this.geminiAgent, this.openRouterAgent, this.sessionEventBroadcaster, this, this.completionHandler);
    this.server.registerRoutes(sessionRoutes);
    // Wire the generator-starter callback now that SessionRoutes exists.
    // `setIngestContext` ran in the constructor before routes were
    // constructed; transcript-watcher observations depend on this side-effect
    // to auto-start the SDK generator after enqueue.
    attachIngestGeneratorStarter((sessionDbId, source) =>
      sessionRoutes.ensureGeneratorRunning(sessionDbId, source),
    );
    this.server.registerRoutes(new DataRoutes(this.paginationHelper, this.dbManager, this.sessionManager, this.sseBroadcaster, this, this.startTime));
    this.server.registerRoutes(new SettingsRoutes(this.settingsManager));
    this.server.registerRoutes(new LogsRoutes());
    this.server.registerRoutes(new MemoryRoutes(this.dbManager, 'claude-mem'));
  }
  /**
   * Start the worker service
   */
  async start(): Promise<void> {
    const port = getWorkerPort();
    const host = getWorkerHost();
    await startSupervisor();
    // Start HTTP server FIRST - make it available immediately
    await this.server.listen(port, host);
    // Worker writes its own PID - reliable on all platforms
    // This happens after listen() succeeds, ensuring the worker is actually ready
    // On Windows, the spawner's PID is cmd.exe (useless), so worker must write its own
    writePidFile({
      pid: process.pid,
      port,
      startedAt: new Date().toISOString()
    });
    getSupervisor().registerProcess('worker', {
      pid: process.pid,
      type: 'worker',
      startedAt: new Date().toISOString()
    });
    logger.info('SYSTEM', 'Worker started', { host, port, pid: process.pid });
    // Do slow initialization in background (non-blocking)
    this.initializeBackground().catch((error) => {
      logger.error('SYSTEM', 'Background initialization failed', {}, error as Error);
    });
  }
  /**
   * Background initialization - runs after HTTP server is listening
   */
  private async initializeBackground(): Promise<void> {
    try {
      logger.info('WORKER', 'Background initialization starting...');
      // Load mode configuration
      const { ModeManager } = await import('./domain/ModeManager.js');
      const { SettingsDefaultsManager } = await import('../shared/SettingsDefaultsManager.js');
      const { USER_SETTINGS_PATH } = await import('../shared/paths.js');
      const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
      const modeId = settings.CLAUDE_MEM_MODE;
      ModeManager.getInstance().loadMode(modeId);
      logger.info('SYSTEM', `Mode loaded: ${modeId}`);
      // One-time chroma wipe for users upgrading from versions with duplicate worker bugs.
      if (settings.CLAUDE_MEM_MODE === 'local' || !settings.CLAUDE_MEM_MODE) {
        logger.info('WORKER', 'Checking for one-time Chroma migration...');
        runOneTimeChromaMigration();
      }
      // One-time remap of pre-worktree project names using pending_messages.cwd.
      logger.info('WORKER', 'Checking for one-time CWD remap...');
      runOneTimeCwdRemap();
      // Stamp merged worktrees (Non-blocking, fire-and-forget)
      logger.info('WORKER', 'Adopting merged worktrees (background)...');
      adoptMergedWorktreesForAllKnownRepos({}).then(adoptions => {
        if (adoptions) {
          for (const adoption of adoptions) {
            if (adoption.adoptedObservations > 0 || adoption.adoptedSummaries > 0 || adoption.chromaUpdates > 0) {
              logger.info('SYSTEM', 'Merged worktrees adopted in background', adoption);
            }
            if (adoption.errors.length > 0) {
              logger.warn('SYSTEM', 'Worktree adoption had per-branch errors', {
                repoPath: adoption.repoPath,
                errors: adoption.errors
              });
            }
          }
        }
      }).catch(err => {
        logger.error('WORKER', 'Worktree adoption failed (background)', {}, err instanceof Error ? err : new Error(String(err)));
      });
      // Initialize ChromaMcpManager only if Chroma is enabled
      const chromaEnabled = settings.CLAUDE_MEM_CHROMA_ENABLED !== 'false';
      if (chromaEnabled) {
        this.chromaMcpManager = ChromaMcpManager.getInstance();
        logger.info('SYSTEM', 'ChromaMcpManager initialized (lazy - connects on first use)');
      } else {
        logger.info('SYSTEM', 'Chroma disabled via CLAUDE_MEM_CHROMA_ENABLED=false, skipping ChromaMcpManager');
      }
      logger.info('WORKER', 'Initializing database manager...');
      await this.dbManager.initialize();
      // One-shot GC for terminally-failed rows
      try {
        logger.info('WORKER', 'Running startup GC for pending messages...');
        const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
        const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
        const cleared = pendingStore.clearFailedOlderThan(7 * 24 * 60 * 60 * 1000);
        if (cleared > 0) {
          logger.info('QUEUE', 'Startup GC cleared old failed pending_messages rows', { cleared });
        }
      } catch (err) {
        logger.warn('QUEUE', 'Startup GC for failed pending_messages rows failed', {}, err instanceof Error ? err : undefined);
      }
      // One-time v12.4.3 pollution cleanup. Runs AFTER migrations have applied
      // and BEFORE backfillAllProjects so the rebuilt Chroma sees a clean SQLite.
      runOneTimeV12_4_3Cleanup();
      // Initialize search services
      logger.info('WORKER', 'Initializing search services...');
      const formattingService = new FormattingService();
      const timelineService = new TimelineService();
      const searchManager = new SearchManager(
        this.dbManager.getSessionSearch(),
        this.dbManager.getSessionStore(),
        this.dbManager.getChromaSync(),
        formattingService,
        timelineService
      );
      this.searchRoutes = new SearchRoutes(searchManager);
      this.server.registerRoutes(this.searchRoutes);
      logger.info('WORKER', 'SearchManager initialized and search routes registered');
      // Register corpus routes (knowledge agents) — needs SearchOrchestrator from search module
      const { SearchOrchestrator } = await import('./worker/search/SearchOrchestrator.js');
      const corpusSearchOrchestrator = new SearchOrchestrator(
        this.dbManager.getSessionSearch(),
        this.dbManager.getSessionStore(),
        this.dbManager.getChromaSync()
      );
      const corpusBuilder = new CorpusBuilder(
        this.dbManager.getSessionStore(),
        corpusSearchOrchestrator,
        this.corpusStore
      );
      const knowledgeAgent = new KnowledgeAgent(this.corpusStore);
      this.server.registerRoutes(new CorpusRoutes(this.corpusStore, corpusBuilder, knowledgeAgent));
      logger.info('WORKER', 'CorpusRoutes registered');
      // DB and search are ready — mark initialization complete so hooks can proceed.
      this.initializationCompleteFlag = true;
      this.resolveInitialization();
      logger.info('SYSTEM', 'Core initialization complete (DB + search ready)');
      await this.startTranscriptWatcher(settings);
      // Auto-backfill Chroma for all projects if out of sync with SQLite (fire-and-forget)
      if (this.chromaMcpManager) {
        ChromaSync.backfillAllProjects(this.dbManager.getSessionStore()).then(() => {
          logger.info('CHROMA_SYNC', 'Backfill check complete for all projects');
        }).catch(error => {
          logger.error('CHROMA_SYNC', 'Backfill failed (non-blocking)', {}, error as Error);
        });
      }
      // Mark MCP as externally ready once the bundled stdio server binary exists.
      const mcpServerPath = path.join(__dirname, 'mcp-server.cjs');
      this.mcpReady = existsSync(mcpServerPath);
      // Best-effort loopback MCP self-check (Non-blocking, F&F)
      this.runMcpSelfCheck(mcpServerPath).catch(err => {
        logger.debug('WORKER', 'MCP self-check failed (non-fatal)', { error: err.message });
      });
      return;
    } catch (error) {
      // Background initialization failed - log and let worker fail health checks
      logger.error('SYSTEM', 'Background initialization failed', {}, error instanceof Error ? error : undefined);
    }
  }
/**
* Run a best-effort loopback MCP self-check to verify the bundled server can start.
* This is entirely diagnostic and does not block worker availability.
*/
private async runMcpSelfCheck(mcpServerPath: string): Promise<void> {
try {
getSupervisor().assertCanSpawn('mcp server');
const transport = new StdioClientTransport({
command: process.execPath,
args: [mcpServerPath],
env: Object.fromEntries(
Object.entries(sanitizeEnv(process.env)).filter(([, value]) => value !== undefined)
) as Record<string, string>
});
const MCP_INIT_TIMEOUT_MS = 60000; // 1 minute is plenty for a local check
const mcpConnectionPromise = this.mcpClient.connect(transport);
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(
() => reject(new Error('MCP connection timeout')),
MCP_INIT_TIMEOUT_MS
);
});
await Promise.race([mcpConnectionPromise, timeoutPromise]);
logger.info('WORKER', 'MCP loopback self-check connected successfully');
// Cleanup
await transport.close();
} catch (error) {
logger.warn('WORKER', 'MCP loopback self-check failed', {
error: error instanceof Error ? error.message : String(error)
});
}
}
/**
* Start transcript watcher for Codex and other transcript-based clients.
* This is intentionally non-fatal so Claude hooks remain usable even if
* transcript ingestion is misconfigured.
*/
private async startTranscriptWatcher(settings: ReturnType<typeof SettingsDefaultsManager.loadFromFile>): Promise<void> {
const transcriptsEnabled = settings.CLAUDE_MEM_TRANSCRIPTS_ENABLED !== 'false';
if (!transcriptsEnabled) {
logger.info('TRANSCRIPT', 'Transcript watcher disabled via CLAUDE_MEM_TRANSCRIPTS_ENABLED=false');
return;
}
const configPath = settings.CLAUDE_MEM_TRANSCRIPTS_CONFIG_PATH || DEFAULT_CONFIG_PATH;
const resolvedConfigPath = expandHomePath(configPath);
// Ensure a sample config exists before loading (one-time setup, intentionally outside the try below)
if (!existsSync(resolvedConfigPath)) {
writeSampleConfig(configPath);
logger.info('TRANSCRIPT', 'Created default transcript watch config', {
configPath: resolvedConfigPath
});
}
const transcriptConfig = loadTranscriptWatchConfig(configPath);
const statePath = expandHomePath(transcriptConfig.stateFile ?? DEFAULT_STATE_PATH);
try {
this.transcriptWatcher = new TranscriptWatcher(transcriptConfig, statePath);
await this.transcriptWatcher.start();
} catch (error) {
this.transcriptWatcher?.stop();
this.transcriptWatcher = null;
if (error instanceof Error) {
logger.error('WORKER', 'Failed to start transcript watcher (continuing without Codex ingestion)', {
configPath: resolvedConfigPath
}, error);
} else {
logger.error('WORKER', 'Failed to start transcript watcher with non-Error (continuing without Codex ingestion)', {
configPath: resolvedConfigPath
}, new Error(String(error)));
}
// [ANTI-PATTERN IGNORED]: Transcript watcher is intentionally non-fatal so Claude hooks remain usable even if transcript ingestion is misconfigured
return;
}
logger.info('TRANSCRIPT', 'Transcript watcher started', {
configPath: resolvedConfigPath,
statePath,
watches: transcriptConfig.watches.length
});
}
/**
* Get the appropriate agent based on provider settings.
* Same logic as SessionRoutes.getActiveAgent() for consistency.
*/
private getActiveAgent(): SDKAgent | GeminiAgent | OpenRouterAgent {
if (isOpenRouterSelected() && isOpenRouterAvailable()) {
return this.openRouterAgent;
}
if (isGeminiSelected() && isGeminiAvailable()) {
return this.geminiAgent;
}
return this.sdkAgent;
}
/**
* Start a session processor
* On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available,
* otherwise marks messages abandoned and removes session so queue does not grow unbounded.
*/
private startSessionProcessor(
session: ReturnType<typeof this.sessionManager.getSession>,
source: string
): void {
if (!session) return;
const sid = session.sessionDbId;
const agent = this.getActiveAgent();
const providerName = agent.constructor.name;
// Before starting generator, check if AbortController is already aborted
// This can happen after a previous generator was aborted but the session still has pending work
if (session.abortController.signal.aborted) {
logger.debug('SYSTEM', 'Replacing aborted AbortController before starting generator', {
sessionId: session.sessionDbId
});
session.abortController = new AbortController();
}
// Track whether generator failed with an unrecoverable error to prevent infinite restart loops
let hadUnrecoverableError = false;
let sessionFailed = false;
logger.info('SYSTEM', `Starting generator (${source}) using ${providerName}`, { sessionId: sid });
// Track generator activity for stale detection (Issue #1099)
session.lastGeneratorActivity = Date.now();
session.generatorPromise = agent.startSession(session, this)
.catch(async (error: unknown) => {
const errorMessage = (error as Error)?.message || '';
// Detect unrecoverable errors that should NOT trigger restart
// These errors will fail immediately on retry, causing infinite loops
const unrecoverablePatterns = [
'Claude executable not found',
'CLAUDE_CODE_PATH',
'ENOENT',
'spawn',
'Invalid API key',
'API_KEY_INVALID',
'API key expired',
'API key not valid',
'PERMISSION_DENIED',
'Gemini API error: 400',
'Gemini API error: 401',
'Gemini API error: 403',
'FOREIGN KEY constraint failed',
];
if (unrecoverablePatterns.some(pattern => errorMessage.includes(pattern))) {
hadUnrecoverableError = true;
this.lastAiInteraction = {
timestamp: Date.now(),
success: false,
provider: providerName,
error: errorMessage,
};
logger.error('SDK', 'Unrecoverable generator error - will NOT restart', {
sessionId: session.sessionDbId,
project: session.project,
errorMessage
});
return;
}
// Fallback for terminated SDK sessions (provider abstraction)
if (this.isSessionTerminatedError(error)) {
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
sessionId: session.sessionDbId,
project: session.project,
reason: error instanceof Error ? error.message : String(error)
});
return this.runFallbackForTerminatedSession(session, error);
}
// Detect stale resume failures - SDK session context was lost
const staleResumePatterns = ['aborted by user', 'No conversation found'];
if (staleResumePatterns.some(p => errorMessage.includes(p))
&& session.memorySessionId) {
logger.warn('SDK', 'Detected stale resume failure, clearing memorySessionId for fresh start', {
sessionId: session.sessionDbId,
memorySessionId: session.memorySessionId,
errorMessage
});
// Clear stale memorySessionId and force fresh init on next attempt
this.dbManager.getSessionStore().updateMemorySessionId(session.sessionDbId, null);
session.memorySessionId = null;
session.forceInit = true;
}
logger.error('SDK', 'Session generator failed', {
sessionId: session.sessionDbId,
project: session.project,
provider: providerName
}, error as Error);
sessionFailed = true;
this.lastAiInteraction = {
timestamp: Date.now(),
success: false,
provider: providerName,
error: errorMessage,
};
throw error;
})
.finally(async () => {
// Primary-path subprocess teardown — process-group kill ensures any
// SDK descendants are reaped too (Principle 5).
const trackedProcess = getSdkProcessForSession(session.sessionDbId);
if (trackedProcess && trackedProcess.process.exitCode === null) {
await ensureSdkProcessExit(trackedProcess, 5000);
}
session.generatorPromise = null;
// Record successful AI interaction if no error occurred
if (!sessionFailed && !hadUnrecoverableError) {
this.lastAiInteraction = {
timestamp: Date.now(),
success: true,
provider: providerName,
};
}
// Do NOT restart after unrecoverable errors - prevents infinite loops
if (hadUnrecoverableError) {
this.terminateSession(session.sessionDbId, 'unrecoverable_error');
return;
}
const pendingStore = this.sessionManager.getPendingMessageStore();
// Check if there's pending work that needs processing with a fresh AbortController
const pendingCount = pendingStore.getPendingCount(session.sessionDbId);
// Idle timeout means no new work arrived for 3 minutes - don't restart
// But check pendingCount first: a message may have arrived between idle
// abort and .finally(), and we must not abandon it
if (session.idleTimedOut) {
session.idleTimedOut = false; // Reset flag
if (pendingCount === 0) {
this.terminateSession(session.sessionDbId, 'idle_timeout');
return;
}
// Fall through to pending-work restart below
}
if (pendingCount > 0) {
// Windowed restart guard: only blocks tight-loop restarts, not spread-out ones (#2053)
if (!session.restartGuard) session.restartGuard = new RestartGuard();
const restartAllowed = session.restartGuard.recordRestart();
session.consecutiveRestarts = (session.consecutiveRestarts || 0) + 1; // Keep for logging
if (!restartAllowed) {
logger.error('SYSTEM', 'Restart guard tripped: session is dead, terminating', {
sessionId: session.sessionDbId,
pendingCount,
restartsInWindow: session.restartGuard.restartsInWindow,
windowMs: session.restartGuard.windowMs,
maxRestarts: session.restartGuard.maxRestarts,
consecutiveFailures: session.restartGuard.consecutiveFailuresSinceSuccess,
maxConsecutiveFailures: session.restartGuard.maxConsecutiveFailures
});
session.consecutiveRestarts = 0;
this.terminateSession(session.sessionDbId, 'max_restarts_exceeded');
return;
}
logger.info('SYSTEM', 'Pending work remains after generator exit, restarting with fresh AbortController', {
sessionId: session.sessionDbId,
pendingCount,
attempt: session.consecutiveRestarts
});
// Reset AbortController for restart
session.abortController = new AbortController();
// Restart processor
this.startSessionProcessor(session, 'pending-work-restart');
this.broadcastProcessingStatus();
} else {
// Successful completion with no pending work — finalize then drop
// in-memory state. finalizeSession flips sdk_sessions.status to
// 'completed', drains orphaned pendings, broadcasts. This is the
// sole completion path now that the SessionEnd hook shim is gone.
session.restartGuard?.recordSuccess();
session.consecutiveRestarts = 0;
this.completionHandler.finalizeSession(session.sessionDbId);
this.sessionManager.removeSessionImmediate(session.sessionDbId);
}
});
}
/**
* Match errors that indicate the Claude Code process/session is gone (resume impossible).
* Used to trigger graceful fallback instead of leaving pending messages stuck forever.
*
* These patterns come from the Claude SDK's ProcessTransport and related internals.
* The SDK does not export typed error classes, so string matching on normalized
* messages is the only reliable detection method. Each pattern corresponds to a
* specific SDK failure mode:
* - 'process aborted by user': user cancelled the Claude Code session
* - 'processtransport': transport layer disconnected
* - 'not ready for writing': stdio pipe to Claude process is closed
* - 'session generator failed': wrapper error from our own agent layer
* - 'claude code process': process exited or was killed
*/
private static readonly SESSION_TERMINATED_PATTERNS = [
'process aborted by user',
'processtransport',
'not ready for writing',
'session generator failed',
'claude code process',
] as const;
private isSessionTerminatedError(error: unknown): boolean {
const msg = error instanceof Error ? error.message : String(error);
const normalized = msg.toLowerCase();
return WorkerService.SESSION_TERMINATED_PATTERNS.some(
pattern => normalized.includes(pattern)
);
}
/**
* When SDK resume fails due to terminated session: try Gemini then OpenRouter to drain
* pending messages; if no fallback available, mark messages abandoned and remove session.
*/
private async runFallbackForTerminatedSession(
session: ReturnType<typeof this.sessionManager.getSession>,
_originalError: unknown
): Promise<void> {
if (!session) return;
const sessionDbId = session.sessionDbId;
// Fallback agents need memorySessionId for storeObservations
if (!session.memorySessionId) {
const syntheticId = `fallback-${sessionDbId}-${Date.now()}`;
session.memorySessionId = syntheticId;
this.dbManager.getSessionStore().updateMemorySessionId(sessionDbId, syntheticId);
}
if (isGeminiAvailable()) {
try {
await this.geminiAgent.startSession(session, this);
return;
} catch (e) {
// [ANTI-PATTERN IGNORED]: Fallback chain by design — Gemini failure falls through to OpenRouter attempt
if (e instanceof Error) {
logger.warn('WORKER', 'Fallback Gemini failed, trying OpenRouter', {
sessionId: sessionDbId,
});
logger.error('WORKER', 'Gemini fallback error detail', { sessionId: sessionDbId }, e);
} else {
logger.error('WORKER', 'Gemini fallback failed with non-Error', { sessionId: sessionDbId }, new Error(String(e)));
}
}
}
if (isOpenRouterAvailable()) {
try {
await this.openRouterAgent.startSession(session, this);
return;
} catch (e) {
// [ANTI-PATTERN IGNORED]: Last fallback in chain — failure falls through to message abandonment, which is the designed terminal behavior
if (e instanceof Error) {
logger.error('WORKER', 'Fallback OpenRouter failed, will abandon messages', { sessionId: sessionDbId }, e);
} else {
logger.error('WORKER', 'Fallback OpenRouter failed with non-Error, will abandon messages', { sessionId: sessionDbId }, new Error(String(e)));
}
}
}
// No fallback or both failed: mark session completed in DB (drain pending
// + broadcast via finalizeSession, idempotent) then drop in-memory state.
// Without this, sdk_sessions.status stays 'active' forever — the deleted
// reapStaleSessions interval was the only prior backstop.
this.completionHandler.finalizeSession(sessionDbId);
this.sessionManager.removeSessionImmediate(sessionDbId);
}
/**
* Terminate a session that will not restart.
* Enforces the restart-or-terminate invariant: every generator exit
* must either call startSessionProcessor() or terminateSession().
* No zombie sessions allowed.
*
* GENERATOR EXIT INVARIANT:
* .finally() → restart? → startSessionProcessor()
* no? → terminateSession()
*/
private terminateSession(sessionDbId: number, reason: string): void {
logger.info('SYSTEM', 'Session terminated', { sessionId: sessionDbId, reason });
// finalizeSession marks sdk_sessions.status='completed', drains pending
// messages, and broadcasts. Idempotent. Without this, wall-clock-limited
// and unrecoverable-error paths leave DB rows as 'active' forever.
this.completionHandler.finalizeSession(sessionDbId);
// removeSessionImmediate fires onSessionDeletedCallback → broadcastProcessingStatus()
this.sessionManager.removeSessionImmediate(sessionDbId);
}
/**
* Process pending session queues
*/
async processPendingQueues(sessionLimit: number = 10): Promise<{
totalPendingSessions: number;
sessionsStarted: number;
sessionsSkipped: number;
startedSessionIds: number[];
}> {
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const sessionStore = this.dbManager.getSessionStore();
// Clean up stale 'active' sessions before processing
// Sessions older than 6 hours without activity are likely orphaned
const STALE_SESSION_THRESHOLD_MS = 6 * 60 * 60 * 1000;
const staleThreshold = Date.now() - STALE_SESSION_THRESHOLD_MS;
const staleSessionIds = sessionStore.db.prepare(`
SELECT id FROM sdk_sessions
WHERE status = 'active' AND started_at_epoch < ?
`).all(staleThreshold) as { id: number }[];
if (staleSessionIds.length > 0) {
const ids = staleSessionIds.map(r => r.id);
const placeholders = ids.map(() => '?').join(',');
const now = Date.now();
try {
sessionStore.db.prepare(`
UPDATE sdk_sessions
SET status = 'failed', completed_at_epoch = ?
WHERE id IN (${placeholders})
`).run(now, ...ids);
logger.info('SYSTEM', `Marked ${ids.length} stale sessions as failed`);
} catch (error) {
// [ANTI-PATTERN IGNORED]: Stale session cleanup is best-effort; pending queue processing below must still proceed
if (error instanceof Error) {
logger.error('WORKER', 'Failed to mark stale sessions as failed', { staleCount: ids.length }, error);
} else {
logger.error('WORKER', 'Failed to mark stale sessions as failed with non-Error', { staleCount: ids.length }, new Error(String(error)));
}
}
try {
const msgResult = sessionStore.db.prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?
WHERE status = 'pending'
AND session_db_id IN (${placeholders})
`).run(now, ...ids);
if (msgResult.changes > 0) {
logger.info('SYSTEM', `Marked ${msgResult.changes} pending messages from stale sessions as failed`);
}
} catch (error) {
// [ANTI-PATTERN IGNORED]: Pending message cleanup is best-effort; queue processing below must still proceed
if (error instanceof Error) {
logger.error('WORKER', 'Failed to clean up stale pending messages', { staleCount: ids.length }, error);
} else {
logger.error('WORKER', 'Failed to clean up stale pending messages with non-Error', { staleCount: ids.length }, new Error(String(error)));
}
}
}
const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();
const result = {
totalPendingSessions: orphanedSessionIds.length,
sessionsStarted: 0,
sessionsSkipped: 0,
startedSessionIds: [] as number[]
};
if (orphanedSessionIds.length === 0) return result;
logger.info('SYSTEM', `Processing up to ${sessionLimit} of ${orphanedSessionIds.length} pending session queues`);
for (const sessionDbId of orphanedSessionIds) {
if (result.sessionsStarted >= sessionLimit) break;
const existingSession = this.sessionManager.getSession(sessionDbId);
if (existingSession?.generatorPromise) {
result.sessionsSkipped++;
continue;
}
try {
const session = this.sessionManager.initializeSession(sessionDbId);
this.startSessionProcessor(session, 'startup-recovery');
result.sessionsStarted++;
result.startedSessionIds.push(sessionDbId);
} catch (error) {
if (error instanceof Error) {
logger.error('WORKER', `Failed to initialize/start session ${sessionDbId}`, { sessionDbId }, error);
} else {
logger.error('WORKER', `Failed to initialize/start session ${sessionDbId} with non-Error`, { sessionDbId }, new Error(String(error)));
}
result.sessionsSkipped++;
// [ANTI-PATTERN IGNORED]: Per-session failure must not abort the loop; other sessions may still be recoverable
continue;
}
logger.info('SYSTEM', `Starting processor for session ${sessionDbId}`, {
project: this.sessionManager.getSession(sessionDbId)?.project,
pendingCount: pendingStore.getPendingCount(sessionDbId)
});
// Briefly stagger consecutive session starts
await new Promise(resolve => setTimeout(resolve, 100));
}
return result;
}
/**
* Shutdown the worker service
*/
async shutdown(): Promise<void> {
if (this.transcriptWatcher) {
this.transcriptWatcher.stop();
this.transcriptWatcher = null;
logger.info('TRANSCRIPT', 'Transcript watcher stopped');
}
await performGracefulShutdown({
server: this.server.getHttpServer(),
sessionManager: this.sessionManager,
mcpClient: this.mcpClient,
dbManager: this.dbManager,
chromaMcpManager: this.chromaMcpManager || undefined
});
}
/**
* Broadcast processing status change to SSE clients
*/
broadcastProcessingStatus(): void {
const queueDepth = this.sessionManager.getTotalActiveWork();
const isProcessing = queueDepth > 0;
const activeSessions = this.sessionManager.getActiveSessionCount();
logger.info('WORKER', 'Broadcasting processing status', {
isProcessing,
queueDepth,
activeSessions
});
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing,
queueDepth
});
}
}
// ============================================================================
// Reusable Worker Startup Logic
// ============================================================================
/**
* Ensures the worker is started and healthy.
*
* Thin wrapper around the canonical implementation in ./worker-spawner.ts.
*
* `__filename` is forwarded as the worker script path because, in the CJS
* bundle that ships to users, `__filename` always resolves to the compiled
* `worker-service.cjs` itself — which is exactly the script the spawner
* needs to relaunch as a detached daemon. The MCP server (a separate Node
* bundle) cannot rely on its own `__filename` because that would point at
* `mcp-server.cjs`, so it computes the worker path explicitly via
* `dirname(__filename) + 'worker-service.cjs'` instead.
*
* @param port - The TCP port (used for port-in-use checks and daemon spawn)
* @returns true if worker is healthy (existing or newly started), false on failure
*/
export async function ensureWorkerStarted(port: number): Promise<boolean> {
return ensureWorkerStartedShared(port, __filename);
}
// ============================================================================
// CLI Entry Point
// ============================================================================
async function main() {
const command = process.argv[2];
// Early exit if plugin is disabled in Claude Code settings (#781).
// Only gate hook-initiated commands; CLI management (stop/status) still works.
const hookInitiatedCommands = ['start', 'hook', 'restart', '--daemon'];
if ((hookInitiatedCommands.includes(command) || command === undefined) && isPluginDisabledInClaudeSettings()) {
process.exit(0);
}
const port = getWorkerPort();
// Helper for JSON status output in 'start' command
// Exit code 0 ensures Windows Terminal doesn't keep tabs open
function exitWithStatus(status: 'ready' | 'error', message?: string): never {
const output = buildStatusOutput(status, message);
console.log(JSON.stringify(output));
process.exit(0);
}
switch (command) {
case 'start': {
const success = await ensureWorkerStarted(port);
if (success) {
exitWithStatus('ready');
} else {
exitWithStatus('error', 'Failed to start worker');
}
break;
}
case 'stop': {
await httpShutdown(port);
const freed = await waitForPortFree(port, getPlatformTimeout(15000));
if (!freed) {
logger.warn('SYSTEM', 'Port did not free up after shutdown', { port });
}
removePidFile();
logger.info('SYSTEM', 'Worker stopped successfully');
process.exit(0);
break;
}
case 'restart': {
logger.info('SYSTEM', 'Restarting worker');
await httpShutdown(port);
const restartFreed = await waitForPortFree(port, 5000);
if (!restartFreed) {
// Don't loop, don't force-kill, don't steal the port. The PID file
// owns the lock; if the previous worker won't release the port, the
// user must resolve it manually.
console.error('Port still bound after shutdown. Resolve manually.');
process.exit(1);
}
removePidFile();
const restartPid = spawnDaemon(__filename, port);
if (restartPid === undefined) {
console.error('Failed to spawn worker daemon during restart.');
process.exit(1);
}
logger.info('SYSTEM', 'Worker restart spawned', { pid: restartPid });
process.exit(0);
break;
}
case 'status': {
const portInUse = await isPortInUse(port);
const pidInfo = readPidFile();
if (portInUse && pidInfo) {
console.log('Worker is running');
console.log(` PID: ${pidInfo.pid}`);
console.log(` Port: ${pidInfo.port}`);
console.log(` Started: ${pidInfo.startedAt}`);
} else {
console.log('Worker is not running');
}
process.exit(0);
break;
}
case 'cursor': {
const subcommand = process.argv[3];
const cursorResult = await handleCursorCommand(subcommand, process.argv.slice(4));
process.exit(cursorResult);
break;
}
case 'gemini-cli': {
const geminiSubcommand = process.argv[3];
const geminiResult = await handleGeminiCliCommand(geminiSubcommand, process.argv.slice(4));
process.exit(geminiResult);
break;
}
case 'hook': {
// Validate CLI args first (before any I/O)
const platform = process.argv[3];
const event = process.argv[4];
if (!platform || !event) {
console.error('Usage: claude-mem hook <platform> <event>');
console.error('Platforms: claude-code, cursor, gemini-cli, raw');
console.error('Events: context, session-init, observation, summarize, user-message');
process.exit(1);
}
// Ensure worker is running as a detached daemon (#1249).
//
// IMPORTANT: The hook process MUST NOT become the worker. Starting the
// worker in-process makes it a grandchild of Claude Code, which the
// sandbox kills. Instead, ensureWorkerStarted() spawns a fully detached
// daemon (detached: true, stdio: 'ignore', child.unref()) that survives
// the hook process's exit and is invisible to Claude Code's sandbox.
const workerReady = await ensureWorkerStarted(port);
if (!workerReady) {
logger.warn('SYSTEM', 'Worker failed to start before hook, handler will proceed gracefully');
}
const { hookCommand } = await import('../cli/hook-command.js');
await hookCommand(platform, event);
break;
}
case 'generate': {
const dryRun = process.argv.includes('--dry-run');
const { generateClaudeMd } = await import('../cli/claude-md-commands.js');
const result = await generateClaudeMd(dryRun);
process.exit(result);
break;
}
case 'clean': {
const dryRun = process.argv.includes('--dry-run');
const { cleanClaudeMd } = await import('../cli/claude-md-commands.js');
const result = await cleanClaudeMd(dryRun);
process.exit(result);
break;
}
case 'adopt': {
const dryRun = process.argv.includes('--dry-run');
const branchIndex = process.argv.indexOf('--branch');
const branchValue = branchIndex !== -1 ? process.argv[branchIndex + 1] : undefined;
if (branchIndex !== -1 && (!branchValue || branchValue.startsWith('--'))) {
console.error('Usage: adopt [--dry-run] [--branch <branch>] [--cwd <path>]');
process.exit(1);
}
const onlyBranch = branchValue;
// Honor an explicit --cwd override so the NPX CLI can pass through the
// user's working directory (the spawn sets cwd to the marketplace dir).
const cwdIndex = process.argv.indexOf('--cwd');
const cwdValue = cwdIndex !== -1 ? process.argv[cwdIndex + 1] : undefined;
if (cwdIndex !== -1 && (!cwdValue || cwdValue.startsWith('--'))) {
console.error('Usage: adopt [--dry-run] [--branch <branch>] [--cwd <path>]');
process.exit(1);
}
const repoPath = cwdValue ?? process.cwd();
const result = await adoptMergedWorktrees({ repoPath, dryRun, onlyBranch });
const tag = result.dryRun ? '(dry-run)' : '(applied)';
console.log(`\nWorktree adoption ${tag}`);
console.log(` Parent project: ${result.parentProject || '(unknown)'}`);
console.log(` Repo: ${result.repoPath}`);
console.log(` Worktrees scanned: ${result.scannedWorktrees}`);
console.log(` Merged branches: ${result.mergedBranches.join(', ') || '(none)'}`);
console.log(` Observations adopted: ${result.adoptedObservations}`);
console.log(` Summaries adopted: ${result.adoptedSummaries}`);
console.log(` Chroma docs updated: ${result.chromaUpdates}`);
if (result.chromaFailed > 0) {
console.log(` Chroma sync failures: ${result.chromaFailed} (will retry on next run)`);
}
for (const err of result.errors) {
console.log(` ! ${err.worktree}: ${err.error}`);
}
process.exit(0);
}
case 'cleanup': {
// CLI surface for the v12.4.3 pollution cleanup. Shares its scan logic
// with the auto-run-on-startup path so --dry-run reports counts that
// exactly match what the next startup would delete. (#2126 item 5)
const dryRun = process.argv.includes('--dry-run');
const counts = runOneTimeV12_4_3Cleanup(undefined, { dryRun });
const tag = dryRun ? '(dry-run, no changes made)' : '(applied)';
console.log(`\nv12.4.3 cleanup ${tag}`);
if (counts) {
console.log(` Observer sessions: ${counts.observerSessions}`);
console.log(` Observer cascade rows: ${counts.observerCascadeRows}`);
console.log(` Stuck pending_messages: ${counts.stuckPendingMessages}`);
} else if (dryRun) {
console.log(' Scan failed — see worker log for details.');
} else {
console.log(' Already applied (marker present) or skipped.');
}
process.exit(0);
}
case '--daemon':
default: {
// GUARD 1: Refuse to start if another worker is already alive.
// Verifies PID *identity* (via start-time token) not just liveness, so a
// stale PID file pointing at a PID that's since been reused by an
// unrelated process (e.g. container restart reusing low PIDs) doesn't
// false-positive.
const existingPidInfo = readPidFile();
if (verifyPidFileOwnership(existingPidInfo)) {
logger.info('SYSTEM', 'Worker already running (PID alive), refusing to start duplicate', {
existingPid: existingPidInfo.pid,
existingPort: existingPidInfo.port,
startedAt: existingPidInfo.startedAt
});
process.exit(0);
}
// GUARD 2: Refuse to start if the port is already bound.
// Catches the race where two daemons start simultaneously before
// either writes a PID file. Must run BEFORE constructing WorkerService
// because the constructor registers signal handlers and timers that
// prevent the process from exiting even if listen() fails later.
if (await isPortInUse(port)) {
logger.info('SYSTEM', 'Port already in use, refusing to start duplicate', { port });
process.exit(0);
}
// Prevent daemon from dying silently on unhandled errors.
// The HTTP server can continue serving even if a background task throws.
process.on('unhandledRejection', (reason) => {
logger.error('SYSTEM', 'Unhandled rejection in daemon', {
reason: reason instanceof Error ? reason.message : String(reason)
});
});
process.on('uncaughtException', (error) => {
logger.error('SYSTEM', 'Uncaught exception in daemon', {}, error as Error);
// Don't exit — keep the HTTP server running
});
const worker = new WorkerService();
worker.start().catch(async (error) => {
// Port race: when the MCP server and SessionStart hook both spawn a daemon
// concurrently, one will lose the bind race with EADDRINUSE or Bun's equivalent
// "port in use" error. If the winner is already healthy, exit cleanly (#1447).
const isPortConflict = error instanceof Error && (
(error as NodeJS.ErrnoException).code === 'EADDRINUSE' ||
/port.*in use|address.*in use/i.test(error.message)
);
if (isPortConflict && await waitForHealth(port, 3000)) {
logger.info('SYSTEM', 'Duplicate daemon exiting — another worker already claimed port', { port });
process.exit(0);
}
logger.failure('SYSTEM', 'Worker failed to start', {}, error as Error);
removePidFile();
// Exit gracefully: Windows Terminal won't keep tab open on exit 0
// The wrapper/plugin will handle restart logic if needed
process.exit(0);
});
}
}
}
// Check if running as main module in both ESM and CommonJS
// The CLAUDE_MEM_MANAGED check handles Bun on Windows where require.main !== module
// in CJS mode despite being the entry point (see #1450)
const isMainModule = typeof require !== 'undefined' && typeof module !== 'undefined'
? require.main === module || !module.parent || process.env.CLAUDE_MEM_MANAGED === 'true'
: import.meta.url === `file://${process.argv[1]}`
|| process.argv[1]?.endsWith('worker-service')
|| process.argv[1]?.endsWith('worker-service.cjs')
|| process.argv[1]?.replaceAll('\\', '/') === __filename?.replaceAll('\\', '/');
if (isMainModule) {
main().catch((error) => {
logger.error('SYSTEM', 'Fatal error in main', {}, error instanceof Error ? error : undefined);
process.exit(0); // Exit 0: don't block Claude Code, don't leave Windows Terminal tabs open
});
}