feat: Implement Worker Service for long-running HTTP service with PM2 management
- Introduced WorkerService class to handle HTTP requests and manage sessions. - Added endpoints for health check, session management, and data retrieval. - Integrated ChromaSync for background data synchronization. - Implemented SSE for real-time updates to connected clients. - Added error handling and logging throughout the service. - Cached Claude executable path for improved performance. - Included settings management for user configuration. - Established database interactions for session and observation management.
This commit is contained in:
@@ -81,13 +81,9 @@ export class DatabaseManager {
|
||||
return this.chromaSync;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup orphaned sessions from previous runs
|
||||
* @returns Number of sessions cleaned
|
||||
*/
|
||||
cleanupOrphanedSessions(): number {
|
||||
return this.getSessionStore().cleanupOrphanedSessions();
|
||||
}
|
||||
// REMOVED: cleanupOrphanedSessions - violates "EVERYTHING SHOULD SAVE ALWAYS"
|
||||
// Worker restarts don't make sessions orphaned. Sessions are managed by hooks
|
||||
// and exist independently of worker state.
|
||||
|
||||
/**
|
||||
* Get session by ID (throws if not found)
|
||||
|
||||
@@ -23,7 +23,7 @@ export class PaginationHelper {
|
||||
getObservations(offset: number, limit: number, project?: string): PaginatedResult<Observation> {
|
||||
return this.paginate<Observation>(
|
||||
'observations',
|
||||
'id, session_db_id, claude_session_id, project, type, title, subtitle, text, concepts, files, prompt_number, created_at, created_at_epoch',
|
||||
'id, sdk_session_id, project, type, title, subtitle, narrative, text, facts, concepts, files_read, files_modified, prompt_number, created_at, created_at_epoch',
|
||||
offset,
|
||||
limit,
|
||||
project
|
||||
@@ -34,26 +34,73 @@ export class PaginationHelper {
|
||||
* Get paginated summaries
|
||||
*/
|
||||
getSummaries(offset: number, limit: number, project?: string): PaginatedResult<Summary> {
|
||||
return this.paginate<Summary>(
|
||||
'summaries',
|
||||
'id, session_db_id, claude_session_id, project, request, completion, summary, learnings, notes, created_at, created_at_epoch',
|
||||
const db = this.dbManager.getSessionStore().db;
|
||||
|
||||
let query = `
|
||||
SELECT
|
||||
ss.id,
|
||||
s.claude_session_id as session_id,
|
||||
ss.request,
|
||||
ss.learned,
|
||||
ss.completed,
|
||||
ss.next_steps,
|
||||
ss.project,
|
||||
ss.created_at,
|
||||
ss.created_at_epoch
|
||||
FROM session_summaries ss
|
||||
JOIN sdk_sessions s ON ss.sdk_session_id = s.sdk_session_id
|
||||
`;
|
||||
const params: any[] = [];
|
||||
|
||||
if (project) {
|
||||
query += ' WHERE ss.project = ?';
|
||||
params.push(project);
|
||||
}
|
||||
|
||||
query += ' ORDER BY ss.created_at_epoch DESC LIMIT ? OFFSET ?';
|
||||
params.push(limit + 1, offset);
|
||||
|
||||
const stmt = db.prepare(query);
|
||||
const results = stmt.all(...params) as Summary[];
|
||||
|
||||
return {
|
||||
items: results.slice(0, limit),
|
||||
hasMore: results.length > limit,
|
||||
offset,
|
||||
limit,
|
||||
project
|
||||
);
|
||||
limit
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Get paginated user prompts
|
||||
*/
|
||||
getPrompts(offset: number, limit: number, project?: string): PaginatedResult<UserPrompt> {
|
||||
return this.paginate<UserPrompt>(
|
||||
'user_prompts',
|
||||
'id, session_db_id, claude_session_id, project, prompt, created_at, created_at_epoch',
|
||||
const db = this.dbManager.getSessionStore().db;
|
||||
|
||||
let query = `
|
||||
SELECT up.id, up.claude_session_id, s.project, up.prompt_number, up.prompt_text, up.created_at, up.created_at_epoch
|
||||
FROM user_prompts up
|
||||
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
|
||||
`;
|
||||
const params: any[] = [];
|
||||
|
||||
if (project) {
|
||||
query += ' WHERE s.project = ?';
|
||||
params.push(project);
|
||||
}
|
||||
|
||||
query += ' ORDER BY up.created_at_epoch DESC LIMIT ? OFFSET ?';
|
||||
params.push(limit + 1, offset);
|
||||
|
||||
const stmt = db.prepare(query);
|
||||
const results = stmt.all(...params) as UserPrompt[];
|
||||
|
||||
return {
|
||||
items: results.slice(0, limit),
|
||||
hasMore: results.length > limit,
|
||||
offset,
|
||||
limit,
|
||||
project
|
||||
);
|
||||
limit
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -34,8 +34,9 @@ export class SDKAgent {
|
||||
|
||||
/**
|
||||
* Start SDK agent for a session (event-driven, no polling)
|
||||
* @param worker WorkerService reference for spinner control (optional)
|
||||
*/
|
||||
async startSession(session: ActiveSession): Promise<void> {
|
||||
async startSession(session: ActiveSession, worker?: any): Promise<void> {
|
||||
try {
|
||||
// Find Claude executable
|
||||
const claudePath = this.findClaudeExecutable();
|
||||
@@ -74,7 +75,7 @@ export class SDKAgent {
|
||||
});
|
||||
|
||||
// Parse and process response
|
||||
await this.processSDKResponse(session, textContent);
|
||||
await this.processSDKResponse(session, textContent, worker);
|
||||
}
|
||||
|
||||
// Log result messages
|
||||
@@ -168,7 +169,7 @@ export class SDKAgent {
|
||||
/**
|
||||
* Process SDK response text (parse XML, save to database, sync to Chroma)
|
||||
*/
|
||||
private async processSDKResponse(session: ActiveSession, text: string): Promise<void> {
|
||||
private async processSDKResponse(session: ActiveSession, text: string, worker?: any): Promise<void> {
|
||||
// Parse observations
|
||||
const observations = parseObservations(text, session.claudeSessionId);
|
||||
|
||||
@@ -191,6 +192,23 @@ export class SDKAgent {
|
||||
createdAtEpoch
|
||||
).catch(() => {});
|
||||
|
||||
// Broadcast to SSE clients (for web UI)
|
||||
if (worker && worker.sseBroadcaster) {
|
||||
worker.sseBroadcaster.broadcast({
|
||||
type: 'new_observation',
|
||||
observation: {
|
||||
id: obsId,
|
||||
session_id: session.claudeSessionId,
|
||||
type: obs.type,
|
||||
title: obs.title,
|
||||
subtitle: obs.subtitle,
|
||||
project: session.project,
|
||||
prompt_number: session.lastPromptNumber,
|
||||
created_at_epoch: createdAtEpoch
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
logger.info('SDK', 'Observation saved', { obsId, type: obs.type });
|
||||
}
|
||||
|
||||
@@ -216,8 +234,33 @@ export class SDKAgent {
|
||||
createdAtEpoch
|
||||
).catch(() => {});
|
||||
|
||||
// Broadcast to SSE clients (for web UI)
|
||||
if (worker && worker.sseBroadcaster) {
|
||||
worker.sseBroadcaster.broadcast({
|
||||
type: 'new_summary',
|
||||
summary: {
|
||||
id: summaryId,
|
||||
session_id: session.claudeSessionId,
|
||||
request: summary.request,
|
||||
investigated: summary.investigated,
|
||||
learned: summary.learned,
|
||||
completed: summary.completed,
|
||||
next_steps: summary.next_steps,
|
||||
notes: summary.notes,
|
||||
project: session.project,
|
||||
prompt_number: session.lastPromptNumber,
|
||||
created_at_epoch: createdAtEpoch
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
logger.info('SDK', 'Summary saved', { summaryId });
|
||||
}
|
||||
|
||||
// Check and stop spinner after processing (debounced)
|
||||
if (worker && typeof worker.checkAndStopSpinner === 'function') {
|
||||
worker.checkAndStopSpinner();
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
@@ -44,12 +44,15 @@ export class SSEBroadcaster {
|
||||
*/
|
||||
broadcast(event: SSEEvent): void {
|
||||
if (this.sseClients.size === 0) {
|
||||
logger.debug('WORKER', 'SSE broadcast skipped (no clients)', { eventType: event.type });
|
||||
return; // Short-circuit if no clients
|
||||
}
|
||||
|
||||
const eventWithTimestamp = { ...event, timestamp: Date.now() };
|
||||
const data = `data: ${JSON.stringify(eventWithTimestamp)}\n\n`;
|
||||
|
||||
logger.debug('WORKER', 'SSE broadcast sent', { eventType: event.type, clients: this.sseClients.size });
|
||||
|
||||
// Single-pass write + cleanup
|
||||
for (const client of this.sseClients) {
|
||||
try {
|
||||
|
||||
@@ -69,11 +69,13 @@ export class SessionManager {
|
||||
|
||||
/**
|
||||
* Queue an observation for processing (zero-latency notification)
|
||||
* Auto-initializes session if not in memory but exists in database
|
||||
*/
|
||||
queueObservation(sessionDbId: number, data: ObservationData): void {
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
// Auto-initialize from database if needed (handles worker restarts)
|
||||
let session = this.sessions.get(sessionDbId);
|
||||
if (!session) {
|
||||
throw new Error(`Session ${sessionDbId} not active`);
|
||||
session = this.initializeSession(sessionDbId);
|
||||
}
|
||||
|
||||
session.pendingMessages.push({
|
||||
@@ -96,11 +98,13 @@ export class SessionManager {
|
||||
|
||||
/**
|
||||
* Queue a summarize request (zero-latency notification)
|
||||
* Auto-initializes session if not in memory but exists in database
|
||||
*/
|
||||
queueSummarize(sessionDbId: number): void {
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
// Auto-initialize from database if needed (handles worker restarts)
|
||||
let session = this.sessions.get(sessionDbId);
|
||||
if (!session) {
|
||||
throw new Error(`Session ${sessionDbId} not active`);
|
||||
session = this.initializeSession(sessionDbId);
|
||||
}
|
||||
|
||||
session.pendingMessages.push({ type: 'summarize' });
|
||||
@@ -143,13 +147,31 @@ export class SessionManager {
|
||||
await Promise.all(sessionIds.map(id => this.deleteSession(id)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if any session has pending messages (for spinner tracking)
|
||||
*/
|
||||
hasPendingMessages(): boolean {
|
||||
return Array.from(this.sessions.values()).some(
|
||||
session => session.pendingMessages.length > 0
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get number of active sessions (for stats)
|
||||
*/
|
||||
getActiveSessionCount(): number {
|
||||
return this.sessions.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get message iterator for SDKAgent to consume (event-driven, no polling)
|
||||
* Auto-initializes session if not in memory but exists in database
|
||||
*/
|
||||
async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessage> {
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
// Auto-initialize from database if needed (handles worker restarts)
|
||||
let session = this.sessions.get(sessionDbId);
|
||||
if (!session) {
|
||||
throw new Error(`Session ${sessionDbId} not active`);
|
||||
session = this.initializeSession(sessionDbId);
|
||||
}
|
||||
|
||||
const emitter = this.sessionQueues.get(sessionDbId);
|
||||
|
||||
Reference in New Issue
Block a user