/** * SessionManager: Event-driven session lifecycle * * Responsibility: * - Manage active session lifecycle * - Handle event-driven message queues * - Coordinate between HTTP requests and SDK agent * - Zero-latency event notification (no polling) */ import { EventEmitter } from 'events'; import { DatabaseManager } from './DatabaseManager.js'; import { logger } from '../../utils/logger.js'; import type { ActiveSession, PendingMessage, ObservationData } from '../worker-types.js'; export class SessionManager { private dbManager: DatabaseManager; private sessions: Map = new Map(); private sessionQueues: Map = new Map(); private onSessionDeletedCallback?: () => void; constructor(dbManager: DatabaseManager) { this.dbManager = dbManager; } /** * Set callback to be called when a session is deleted (for broadcasting status) */ setOnSessionDeleted(callback: () => void): void { this.onSessionDeletedCallback = callback; } /** * Initialize a new session or return existing one */ initializeSession(sessionDbId: number, currentUserPrompt?: string, promptNumber?: number): ActiveSession { // Check if already active let session = this.sessions.get(sessionDbId); if (session) { // Refresh project from database in case it was updated by new-hook // This fixes the bug where sessions created with empty project get updated // in the database but the in-memory session still has the stale empty value const dbSession = this.dbManager.getSessionById(sessionDbId); if (dbSession.project && dbSession.project !== session.project) { logger.debug('SESSION', 'Updating project from database', { sessionDbId, oldProject: session.project, newProject: dbSession.project }); session.project = dbSession.project; } // Update userPrompt for continuation prompts if (currentUserPrompt) { logger.debug('SESSION', 'Updating userPrompt for continuation', { sessionDbId, promptNumber, oldPrompt: session.userPrompt.substring(0, 80), newPrompt: currentUserPrompt.substring(0, 80) }); session.userPrompt = currentUserPrompt; session.lastPromptNumber = promptNumber || session.lastPromptNumber; } else { logger.debug('SESSION', 'No currentUserPrompt provided for existing session', { sessionDbId, promptNumber, usingCachedPrompt: session.userPrompt.substring(0, 80) }); } return session; } // Fetch from database const dbSession = this.dbManager.getSessionById(sessionDbId); // Use currentUserPrompt if provided, otherwise fall back to database (first prompt) const userPrompt = currentUserPrompt || dbSession.user_prompt; if (!currentUserPrompt) { logger.debug('SESSION', 'No currentUserPrompt provided for new session, using database', { sessionDbId, promptNumber, dbPrompt: dbSession.user_prompt.substring(0, 80) }); } else { logger.debug('SESSION', 'Initializing session with fresh userPrompt', { sessionDbId, promptNumber, userPrompt: currentUserPrompt.substring(0, 80) }); } // Create active session session = { sessionDbId, claudeSessionId: dbSession.claude_session_id, sdkSessionId: null, project: dbSession.project, userPrompt, pendingMessages: [], abortController: new AbortController(), generatorPromise: null, lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptCounter(sessionDbId), startTime: Date.now(), cumulativeInputTokens: 0, cumulativeOutputTokens: 0 }; this.sessions.set(sessionDbId, session); // Create event emitter for queue notifications const emitter = new EventEmitter(); this.sessionQueues.set(sessionDbId, emitter); logger.info('SESSION', 'Session initialized', { sessionId: sessionDbId, project: session.project, claudeSessionId: session.claudeSessionId, queueDepth: 0, hasGenerator: false }); return session; } /** * Get active session by ID */ getSession(sessionDbId: number): ActiveSession | undefined { return this.sessions.get(sessionDbId); } /** * Queue an observation for processing (zero-latency notification) * Auto-initializes session if not in memory but exists in database */ queueObservation(sessionDbId: number, data: ObservationData): void { // Auto-initialize from database if needed (handles worker restarts) let session = this.sessions.get(sessionDbId); if (!session) { session = this.initializeSession(sessionDbId); } const beforeDepth = session.pendingMessages.length; session.pendingMessages.push({ type: 'observation', tool_name: data.tool_name, tool_input: data.tool_input, tool_response: data.tool_response, prompt_number: data.prompt_number, cwd: data.cwd }); const afterDepth = session.pendingMessages.length; // Notify generator immediately (zero latency) const emitter = this.sessionQueues.get(sessionDbId); emitter?.emit('message'); // Format tool name for logging const toolSummary = logger.formatTool(data.tool_name, data.tool_input); logger.info('SESSION', `Observation queued (${beforeDepth}→${afterDepth})`, { sessionId: sessionDbId, tool: toolSummary, hasGenerator: !!session.generatorPromise }); } /** * Queue a summarize request (zero-latency notification) * Auto-initializes session if not in memory but exists in database */ queueSummarize(sessionDbId: number, lastUserMessage: string, lastAssistantMessage?: string): void { // Auto-initialize from database if needed (handles worker restarts) let session = this.sessions.get(sessionDbId); if (!session) { session = this.initializeSession(sessionDbId); } const beforeDepth = session.pendingMessages.length; session.pendingMessages.push({ type: 'summarize', last_user_message: lastUserMessage, last_assistant_message: lastAssistantMessage }); const afterDepth = session.pendingMessages.length; const emitter = this.sessionQueues.get(sessionDbId); emitter?.emit('message'); logger.info('SESSION', `Summarize queued (${beforeDepth}→${afterDepth})`, { sessionId: sessionDbId, hasGenerator: !!session.generatorPromise }); } /** * Delete a session (abort SDK agent and cleanup) */ async deleteSession(sessionDbId: number): Promise { const session = this.sessions.get(sessionDbId); if (!session) { return; // Already deleted } const sessionDuration = Date.now() - session.startTime; // Abort the SDK agent session.abortController.abort(); // Wait for generator to finish if (session.generatorPromise) { await session.generatorPromise.catch(() => {}); } // Cleanup this.sessions.delete(sessionDbId); this.sessionQueues.delete(sessionDbId); logger.info('SESSION', 'Session deleted', { sessionId: sessionDbId, duration: `${(sessionDuration / 1000).toFixed(1)}s`, project: session.project }); // Trigger callback to broadcast status update (spinner may need to stop) if (this.onSessionDeletedCallback) { this.onSessionDeletedCallback(); } } /** * Shutdown all active sessions */ async shutdownAll(): Promise { const sessionIds = Array.from(this.sessions.keys()); await Promise.all(sessionIds.map(id => this.deleteSession(id))); } /** * Check if any session has pending messages (for spinner tracking) */ hasPendingMessages(): boolean { return Array.from(this.sessions.values()).some( session => session.pendingMessages.length > 0 ); } /** * Get number of active sessions (for stats) */ getActiveSessionCount(): number { return this.sessions.size; } /** * Get total queue depth across all sessions (for activity indicator) */ getTotalQueueDepth(): number { let total = 0; for (const session of this.sessions.values()) { total += session.pendingMessages.length; } return total; } /** * Get total active work (queued + currently processing) * Counts both pending messages and items actively being processed by SDK agents */ getTotalActiveWork(): number { let total = 0; for (const session of this.sessions.values()) { // Count queued messages total += session.pendingMessages.length; // Count currently processing item (1 per active generator) if (session.generatorPromise !== null) { total += 1; } } return total; } /** * Check if any session is actively processing (has pending messages OR active generator) * Used for activity indicator to prevent spinner from stopping while SDK is processing */ isAnySessionProcessing(): boolean { for (const session of this.sessions.values()) { // Has queued messages waiting to be processed if (session.pendingMessages.length > 0) { return true; } // Has active SDK generator running (processing dequeued messages) if (session.generatorPromise !== null) { return true; } } return false; } /** * Get message iterator for SDKAgent to consume (event-driven, no polling) * Auto-initializes session if not in memory but exists in database */ async *getMessageIterator(sessionDbId: number): AsyncIterableIterator { // Auto-initialize from database if needed (handles worker restarts) let session = this.sessions.get(sessionDbId); if (!session) { session = this.initializeSession(sessionDbId); } const emitter = this.sessionQueues.get(sessionDbId); if (!emitter) { throw new Error(`No emitter for session ${sessionDbId}`); } while (!session.abortController.signal.aborted) { // Wait for messages if queue is empty if (session.pendingMessages.length === 0) { await new Promise(resolve => { const handler = () => resolve(); emitter.once('message', handler); // Also listen for abort session.abortController.signal.addEventListener('abort', () => { emitter.off('message', handler); resolve(); }, { once: true }); }); } // Yield all pending messages while (session.pendingMessages.length > 0) { const message = session.pendingMessages.shift()!; yield message; // If we just yielded a summary, that's the end of this batch - stop the iterator if (message.type === 'summarize') { logger.info('SESSION', `Summary yielded - ending generator`, { sessionId: sessionDbId }); return; } } } } }