feat: Implement Worker Service v2 with improved architecture

- Complete rewrite of the Worker Service following object-oriented principles.
- Introduced a single long-lived database connection to reduce overhead.
- Implemented event-driven queues to eliminate polling.
- Added DRY utilities for pagination and settings management.
- Reduced code size significantly from 1173 lines to approximately 600-700 lines.
- Created various composed services including DatabaseManager, SessionManager, and SDKAgent.
- Enhanced SSE broadcasting capabilities for real-time client updates.
- Established a structured approach for session lifecycle management and event handling.
- Introduced type safety with shared TypeScript interfaces for better maintainability.
This commit is contained in:
Alex Newman
2025-11-06 23:56:25 -05:00
parent 9eddc51979
commit 980151a50e
12 changed files with 3721 additions and 2 deletions
+115
View File
@@ -0,0 +1,115 @@
/**
* DatabaseManager: Single long-lived database connection
*
* Responsibility:
* - Manage single database connection for worker lifetime
* - Provide centralized access to SessionStore and SessionSearch
* - High-level database operations
* - ChromaSync integration
*/
import { SessionStore } from '../sqlite/SessionStore.js';
import { SessionSearch } from '../sqlite/SessionSearch.js';
import { ChromaSync } from '../sync/ChromaSync.js';
import { logger } from '../../utils/logger.js';
import type { DBSession } from '../worker-types.js';
export class DatabaseManager {
private sessionStore: SessionStore | null = null;
private sessionSearch: SessionSearch | null = null;
private chromaSync: ChromaSync | null = null;
/**
* Initialize database connection (once, stays open)
*/
async initialize(): Promise<void> {
// Open database connection (ONCE)
this.sessionStore = new SessionStore();
this.sessionSearch = new SessionSearch();
// Initialize ChromaSync
this.chromaSync = new ChromaSync('claude-mem');
// Start background backfill (fire-and-forget)
this.chromaSync.ensureBackfilled().catch(() => {});
logger.info('DB', 'Database initialized');
}
/**
* Close database connection
*/
async close(): Promise<void> {
if (this.sessionStore) {
this.sessionStore.close();
this.sessionStore = null;
}
if (this.sessionSearch) {
this.sessionSearch.close();
this.sessionSearch = null;
}
logger.info('DB', 'Database closed');
}
/**
* Get SessionStore instance (throws if not initialized)
*/
getSessionStore(): SessionStore {
if (!this.sessionStore) {
throw new Error('Database not initialized');
}
return this.sessionStore;
}
/**
* Get SessionSearch instance (throws if not initialized)
*/
getSessionSearch(): SessionSearch {
if (!this.sessionSearch) {
throw new Error('Database not initialized');
}
return this.sessionSearch;
}
/**
* Get ChromaSync instance (throws if not initialized)
*/
getChromaSync(): ChromaSync {
if (!this.chromaSync) {
throw new Error('ChromaSync not initialized');
}
return this.chromaSync;
}
/**
* Cleanup orphaned sessions from previous runs
* @returns Number of sessions cleaned
*/
cleanupOrphanedSessions(): number {
return this.getSessionStore().cleanupOrphanedSessions();
}
/**
* Get session by ID (throws if not found)
*/
getSessionById(sessionDbId: number): {
id: number;
claude_session_id: string;
sdk_session_id: string | null;
project: string;
user_prompt: string;
} {
const session = this.getSessionStore().getSessionById(sessionDbId);
if (!session) {
throw new Error(`Session ${sessionDbId} not found`);
}
return session;
}
/**
* Mark session as completed
*/
markSessionComplete(sessionDbId: number): void {
this.getSessionStore().markSessionCompleted(sessionDbId);
}
}
+92
View File
@@ -0,0 +1,92 @@
/**
* PaginationHelper: DRY pagination utility
*
* Responsibility:
* - DRY helper for paginated queries
* - Eliminates copy-paste across observations/summaries/prompts endpoints
* - Efficient LIMIT+1 trick to avoid COUNT(*) query
*/
import { DatabaseManager } from './DatabaseManager.js';
import type { PaginatedResult, Observation, Summary, UserPrompt } from '../worker-types.js';
export class PaginationHelper {
private dbManager: DatabaseManager;
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Get paginated observations
*/
getObservations(offset: number, limit: number, project?: string): PaginatedResult<Observation> {
return this.paginate<Observation>(
'observations',
'id, session_db_id, claude_session_id, project, type, title, subtitle, text, concepts, files, prompt_number, created_at, created_at_epoch',
offset,
limit,
project
);
}
/**
* Get paginated summaries
*/
getSummaries(offset: number, limit: number, project?: string): PaginatedResult<Summary> {
return this.paginate<Summary>(
'summaries',
'id, session_db_id, claude_session_id, project, request, completion, summary, learnings, notes, created_at, created_at_epoch',
offset,
limit,
project
);
}
/**
* Get paginated user prompts
*/
getPrompts(offset: number, limit: number, project?: string): PaginatedResult<UserPrompt> {
return this.paginate<UserPrompt>(
'user_prompts',
'id, session_db_id, claude_session_id, project, prompt, created_at, created_at_epoch',
offset,
limit,
project
);
}
/**
* Generic pagination implementation (DRY)
*/
private paginate<T>(
table: string,
columns: string,
offset: number,
limit: number,
project?: string
): PaginatedResult<T> {
const db = this.dbManager.getSessionStore().db;
let query = `SELECT ${columns} FROM ${table}`;
const params: any[] = [];
if (project) {
query += ' WHERE project = ?';
params.push(project);
}
query += ' ORDER BY created_at_epoch DESC LIMIT ? OFFSET ?';
params.push(limit + 1, offset); // Fetch one extra to check hasMore
const stmt = db.prepare(query);
const results = stmt.all(...params) as T[];
return {
items: results.slice(0, limit),
hasMore: results.length > limit,
offset,
limit
};
}
}
+259
View File
@@ -0,0 +1,259 @@
/**
* SDKAgent: SDK query loop handler
*
* Responsibility:
* - Spawn Claude subprocess via Agent SDK
* - Run event-driven query loop (no polling)
* - Process SDK responses (observations, summaries)
* - Sync to database and Chroma
*/
import { execSync } from 'child_process';
import { homedir } from 'os';
import path from 'path';
import { existsSync, readFileSync } from 'fs';
import { DatabaseManager } from './DatabaseManager.js';
import { SessionManager } from './SessionManager.js';
import { logger } from '../../utils/logger.js';
import { parseObservations, parseSummary } from '../../sdk/parser.js';
import { buildInitPrompt, buildObservationPrompt, buildSummaryPrompt } from '../../sdk/prompts.js';
import type { ActiveSession, SDKUserMessage, PendingMessage } from '../worker-types.js';
// Import Agent SDK (assumes it's installed)
// @ts-ignore - Agent SDK types may not be available
import { query } from '@anthropic-ai/claude-agent-sdk';
export class SDKAgent {
private dbManager: DatabaseManager;
private sessionManager: SessionManager;
constructor(dbManager: DatabaseManager, sessionManager: SessionManager) {
this.dbManager = dbManager;
this.sessionManager = sessionManager;
}
/**
* Start SDK agent for a session (event-driven, no polling)
*/
async startSession(session: ActiveSession): Promise<void> {
try {
// Find Claude executable
const claudePath = this.findClaudeExecutable();
// Get model ID and disallowed tools
const modelId = this.getModelId();
const disallowedTools = ['Bash']; // Prevent infinite loops
// Create message generator (event-driven)
const messageGenerator = this.createMessageGenerator(session);
// Run Agent SDK query loop
const queryResult = query({
prompt: messageGenerator,
options: {
model: modelId,
disallowedTools,
abortController: session.abortController,
pathToClaudeCodeExecutable: claudePath
}
});
// Process SDK messages
for await (const message of queryResult) {
// Handle assistant messages
if (message.type === 'assistant') {
const content = message.message.content;
const textContent = Array.isArray(content)
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
: typeof content === 'string' ? content : '';
const responseSize = textContent.length;
logger.dataOut('SDK', `Response received (${responseSize} chars)`, {
sessionId: session.sessionDbId,
promptNumber: session.lastPromptNumber
});
// Parse and process response
await this.processSDKResponse(session, textContent);
}
// Log result messages
if (message.type === 'result' && message.subtype === 'success') {
// Usage telemetry is captured at SDK level
}
}
// Mark session complete
const sessionDuration = Date.now() - session.startTime;
logger.success('SDK', 'Agent completed', {
sessionId: session.sessionDbId,
duration: `${(sessionDuration / 1000).toFixed(1)}s`
});
this.dbManager.getSessionStore().markSessionCompleted(session.sessionDbId);
} catch (error: any) {
if (error.name === 'AbortError') {
logger.warn('SDK', 'Agent aborted', { sessionId: session.sessionDbId });
} else {
logger.failure('SDK', 'Agent error', { sessionDbId: session.sessionDbId }, error);
}
throw error;
} finally {
// Cleanup
this.sessionManager.deleteSession(session.sessionDbId).catch(() => {});
}
}
/**
* Create event-driven message generator (yields messages from SessionManager)
*/
private async *createMessageGenerator(session: ActiveSession): AsyncIterableIterator<SDKUserMessage> {
// Yield initial user prompt with context
yield {
type: 'user',
message: {
role: 'user',
content: buildInitPrompt(session.project, session.claudeSessionId, session.userPrompt)
},
session_id: session.claudeSessionId,
parent_tool_use_id: null,
isSynthetic: true
};
// Consume pending messages from SessionManager (event-driven, no polling)
for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
if (message.type === 'observation') {
// Update last prompt number
if (message.prompt_number !== undefined) {
session.lastPromptNumber = message.prompt_number;
}
yield {
type: 'user',
message: {
role: 'user',
content: buildObservationPrompt({
id: 0, // Not used in prompt
tool_name: message.tool_name!,
tool_input: JSON.stringify(message.tool_input),
tool_output: JSON.stringify(message.tool_output),
created_at_epoch: Date.now()
})
},
session_id: session.claudeSessionId,
parent_tool_use_id: null,
isSynthetic: true
};
} else if (message.type === 'summarize') {
yield {
type: 'user',
message: {
role: 'user',
content: buildSummaryPrompt({
id: session.sessionDbId,
sdk_session_id: session.sdkSessionId,
project: session.project,
user_prompt: session.userPrompt
})
},
session_id: session.claudeSessionId,
parent_tool_use_id: null,
isSynthetic: true
};
}
}
}
/**
* Process SDK response text (parse XML, save to database, sync to Chroma)
*/
private async processSDKResponse(session: ActiveSession, text: string): Promise<void> {
// Parse observations
const observations = parseObservations(text, session.claudeSessionId);
// Store observations
for (const obs of observations) {
const { id: obsId, createdAtEpoch } = this.dbManager.getSessionStore().storeObservation(
session.claudeSessionId,
session.project,
obs,
session.lastPromptNumber
);
// Sync to Chroma (fire-and-forget)
this.dbManager.getChromaSync().syncObservation(
obsId,
session.claudeSessionId,
session.project,
obs,
session.lastPromptNumber,
createdAtEpoch
).catch(() => {});
logger.info('SDK', 'Observation saved', { obsId, type: obs.type });
}
// Parse summary
const summary = parseSummary(text, session.sessionDbId);
// Store summary
if (summary) {
const { id: summaryId, createdAtEpoch } = this.dbManager.getSessionStore().storeSummary(
session.claudeSessionId,
session.project,
summary,
session.lastPromptNumber
);
// Sync to Chroma (fire-and-forget)
this.dbManager.getChromaSync().syncSummary(
summaryId,
session.claudeSessionId,
session.project,
summary,
session.lastPromptNumber,
createdAtEpoch
).catch(() => {});
logger.info('SDK', 'Summary saved', { summaryId });
}
}
// ============================================================================
// Configuration Helpers
// ============================================================================
/**
* Find Claude executable (inline, called once per session)
*/
private findClaudeExecutable(): string {
const claudePath = process.env.CLAUDE_CODE_PATH ||
execSync(process.platform === 'win32' ? 'where claude' : 'which claude', { encoding: 'utf8' })
.trim().split('\n')[0].trim();
if (!claudePath) {
throw new Error('Claude executable not found in PATH');
}
return claudePath;
}
/**
* Get model ID from settings or environment
*/
private getModelId(): string {
try {
const settingsPath = path.join(homedir(), '.claude-mem', 'settings.json');
if (existsSync(settingsPath)) {
const settings = JSON.parse(readFileSync(settingsPath, 'utf-8'));
const modelId = settings.env?.CLAUDE_MEM_MODEL;
if (modelId) return modelId;
}
} catch {
// Fall through to env var or default
}
return process.env.CLAUDE_MEM_MODEL || 'claude-haiku-4-5';
}
}
+83
View File
@@ -0,0 +1,83 @@
/**
* SSEBroadcaster: SSE client management
*
* Responsibility:
* - Manage SSE client connections
* - Broadcast events to all connected clients
* - Handle disconnections gracefully
* - Single-pass broadcast (no two-step cleanup)
*/
import type { Response } from 'express';
import { logger } from '../../utils/logger.js';
import type { SSEEvent, SSEClient } from '../worker-types.js';
export class SSEBroadcaster {
private sseClients: Set<SSEClient> = new Set();
/**
* Add a new SSE client connection
*/
addClient(res: Response): void {
this.sseClients.add(res);
logger.debug('WORKER', 'Client connected', { total: this.sseClients.size });
// Setup cleanup on disconnect
res.on('close', () => {
this.removeClient(res);
});
// Send initial event
this.sendToClient(res, { type: 'connected', timestamp: Date.now() });
}
/**
* Remove a client connection
*/
removeClient(res: Response): void {
this.sseClients.delete(res);
logger.debug('WORKER', 'Client disconnected', { total: this.sseClients.size });
}
/**
* Broadcast an event to all connected clients (single-pass)
*/
broadcast(event: SSEEvent): void {
if (this.sseClients.size === 0) {
return; // Short-circuit if no clients
}
const eventWithTimestamp = { ...event, timestamp: Date.now() };
const data = `data: ${JSON.stringify(eventWithTimestamp)}\n\n`;
// Single-pass write + cleanup
for (const client of this.sseClients) {
try {
client.write(data);
} catch (err) {
// Remove failed client immediately
this.sseClients.delete(client);
logger.debug('WORKER', 'Client removed due to write error');
}
}
}
/**
* Get number of connected clients
*/
getClientCount(): number {
return this.sseClients.size;
}
/**
* Send event to a specific client
*/
private sendToClient(res: Response, event: SSEEvent): void {
const data = `data: ${JSON.stringify(event)}\n\n`;
try {
res.write(data);
} catch (err) {
this.sseClients.delete(res);
}
}
}
+182
View File
@@ -0,0 +1,182 @@
/**
* SessionManager: Event-driven session lifecycle
*
* Responsibility:
* - Manage active session lifecycle
* - Handle event-driven message queues
* - Coordinate between HTTP requests and SDK agent
* - Zero-latency event notification (no polling)
*/
import { EventEmitter } from 'events';
import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import type { ActiveSession, PendingMessage, ObservationData } from '../worker-types.js';
export class SessionManager {
private dbManager: DatabaseManager;
private sessions: Map<number, ActiveSession> = new Map();
private sessionQueues: Map<number, EventEmitter> = new Map();
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Initialize a new session or return existing one
*/
initializeSession(sessionDbId: number): ActiveSession {
// Check if already active
let session = this.sessions.get(sessionDbId);
if (session) {
return session;
}
// Fetch from database
const dbSession = this.dbManager.getSessionById(sessionDbId);
// Create active session
session = {
sessionDbId,
claudeSessionId: dbSession.claude_session_id,
sdkSessionId: null,
project: dbSession.project,
userPrompt: dbSession.user_prompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: 0,
startTime: Date.now()
};
this.sessions.set(sessionDbId, session);
// Create event emitter for queue notifications
const emitter = new EventEmitter();
this.sessionQueues.set(sessionDbId, emitter);
logger.info('WORKER', 'Session initialized', { sessionDbId, project: session.project });
return session;
}
/**
* Get active session by ID
*/
getSession(sessionDbId: number): ActiveSession | undefined {
return this.sessions.get(sessionDbId);
}
/**
* Queue an observation for processing (zero-latency notification)
*/
queueObservation(sessionDbId: number, data: ObservationData): void {
const session = this.sessions.get(sessionDbId);
if (!session) {
throw new Error(`Session ${sessionDbId} not active`);
}
session.pendingMessages.push({
type: 'observation',
tool_name: data.tool_name,
tool_input: data.tool_input,
tool_output: data.tool_output,
prompt_number: data.prompt_number
});
// Notify generator immediately (zero latency)
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
logger.debug('WORKER', 'Observation queued', {
sessionDbId,
queueLength: session.pendingMessages.length
});
}
/**
* Queue a summarize request (zero-latency notification)
*/
queueSummarize(sessionDbId: number): void {
const session = this.sessions.get(sessionDbId);
if (!session) {
throw new Error(`Session ${sessionDbId} not active`);
}
session.pendingMessages.push({ type: 'summarize' });
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
logger.debug('WORKER', 'Summarize queued', { sessionDbId });
}
/**
* Delete a session (abort SDK agent and cleanup)
*/
async deleteSession(sessionDbId: number): Promise<void> {
const session = this.sessions.get(sessionDbId);
if (!session) {
return; // Already deleted
}
// Abort the SDK agent
session.abortController.abort();
// Wait for generator to finish
if (session.generatorPromise) {
await session.generatorPromise.catch(() => {});
}
// Cleanup
this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId);
logger.info('WORKER', 'Session deleted', { sessionDbId });
}
/**
* Shutdown all active sessions
*/
async shutdownAll(): Promise<void> {
const sessionIds = Array.from(this.sessions.keys());
await Promise.all(sessionIds.map(id => this.deleteSession(id)));
}
/**
* Get message iterator for SDKAgent to consume (event-driven, no polling)
*/
async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessage> {
const session = this.sessions.get(sessionDbId);
if (!session) {
throw new Error(`Session ${sessionDbId} not active`);
}
const emitter = this.sessionQueues.get(sessionDbId);
if (!emitter) {
throw new Error(`No emitter for session ${sessionDbId}`);
}
while (!session.abortController.signal.aborted) {
// Wait for messages if queue is empty
if (session.pendingMessages.length === 0) {
await new Promise<void>(resolve => {
const handler = () => resolve();
emitter.once('message', handler);
// Also listen for abort
session.abortController.signal.addEventListener('abort', () => {
emitter.off('message', handler);
resolve();
}, { once: true });
});
}
// Yield all pending messages
while (session.pendingMessages.length > 0) {
const message = session.pendingMessages.shift()!;
yield message;
}
}
}
}
+68
View File
@@ -0,0 +1,68 @@
/**
* SettingsManager: DRY settings CRUD utility
*
* Responsibility:
* - DRY helper for viewer settings CRUD
* - Eliminates duplication in settings read/write logic
* - Type-safe settings management
*/
import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import type { ViewerSettings } from '../worker-types.js';
export class SettingsManager {
private dbManager: DatabaseManager;
private readonly defaultSettings: ViewerSettings = {
sidebarOpen: true,
selectedProject: null,
theme: 'system'
};
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Get current viewer settings (with defaults)
*/
getSettings(): ViewerSettings {
const db = this.dbManager.getSessionStore().db;
try {
const stmt = db.prepare('SELECT key, value FROM viewer_settings');
const rows = stmt.all() as Array<{ key: string; value: string }>;
const settings: ViewerSettings = { ...this.defaultSettings };
for (const row of rows) {
const key = row.key as keyof ViewerSettings;
if (key in settings) {
(settings as any)[key] = JSON.parse(row.value);
}
}
return settings;
} catch (error) {
logger.debug('WORKER', 'Failed to load settings, using defaults', {}, error as Error);
return { ...this.defaultSettings };
}
}
/**
* Update viewer settings (partial update)
*/
updateSettings(updates: Partial<ViewerSettings>): ViewerSettings {
const db = this.dbManager.getSessionStore().db;
const stmt = db.prepare(`
INSERT OR REPLACE INTO viewer_settings (key, value)
VALUES (?, ?)
`);
for (const [key, value] of Object.entries(updates)) {
stmt.run(key, JSON.stringify(value));
}
return this.getSettings();
}
}