feat: Introduce SessionEventBroadcaster and SessionCompletionHandler for improved session management

- Added SessionEventBroadcaster to handle broadcasting of session lifecycle events, consolidating SSE broadcasting and processing status updates.
- Refactored SessionRoutes to utilize SessionEventBroadcaster for broadcasting events related to new prompts, session starts, and completions.
- Created SessionCompletionHandler to centralize session completion logic, reducing duplication across multiple endpoints.
- Updated WorkerService to initialize SessionEventBroadcaster and pass it to SessionRoutes.
This commit is contained in:
Alex Newman
2025-12-07 22:35:31 -05:00
parent f494d3b168
commit 54c53fda04
5 changed files with 239 additions and 140 deletions
@@ -0,0 +1,96 @@
/**
* Session Event Broadcaster
*
* Provides semantic broadcast methods for session lifecycle events.
* Consolidates SSE broadcasting and processing status updates.
*/
import { SSEBroadcaster } from '../SSEBroadcaster.js';
import type { WorkerService } from '../../worker-service.js';
export class SessionEventBroadcaster {
constructor(
private sseBroadcaster: SSEBroadcaster,
private workerService: WorkerService
) {}
/**
* Broadcast new user prompt arrival
* Starts activity indicator to show work is beginning
*/
broadcastNewPrompt(prompt: {
id: number;
claude_session_id: string;
project: string;
prompt_number: number;
prompt_text: string;
created_at_epoch: number;
}): void {
// Broadcast prompt details
this.sseBroadcaster.broadcast({
type: 'new_prompt',
prompt
});
// Start activity indicator (work is about to begin)
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: true
});
// Update processing status based on queue depth
this.workerService.broadcastProcessingStatus();
}
/**
* Broadcast session initialization
*/
broadcastSessionStarted(sessionDbId: number, project: string): void {
this.sseBroadcaster.broadcast({
type: 'session_started',
sessionDbId,
project
});
// Update processing status
this.workerService.broadcastProcessingStatus();
}
/**
* Broadcast observation queued
* Updates processing status to reflect new queue depth
*/
broadcastObservationQueued(sessionDbId: number): void {
this.sseBroadcaster.broadcast({
type: 'observation_queued',
sessionDbId
});
// Update processing status (queue depth changed)
this.workerService.broadcastProcessingStatus();
}
/**
* Broadcast session completion
* Updates processing status to reflect session removal
*/
broadcastSessionCompleted(sessionDbId: number): void {
this.sseBroadcaster.broadcast({
type: 'session_completed',
timestamp: Date.now(),
sessionDbId
});
// Update processing status (session removed from queue)
this.workerService.broadcastProcessingStatus();
}
/**
* Broadcast summarize request queued
* Updates processing status to reflect new queue depth
*/
broadcastSummarizeQueued(): void {
// Update processing status (queue depth changed)
this.workerService.broadcastProcessingStatus();
}
}
@@ -12,19 +12,27 @@ import { stripMemoryTagsFromJson } from '../../../../utils/tag-stripping.js';
import { SessionManager } from '../../SessionManager.js';
import { DatabaseManager } from '../../DatabaseManager.js';
import { SDKAgent } from '../../SDKAgent.js';
import { SSEBroadcaster } from '../../SSEBroadcaster.js';
import type { WorkerService } from '../../../worker-service.js';
import { BaseRouteHandler } from '../BaseRouteHandler.js';
import { SessionEventBroadcaster } from '../../events/SessionEventBroadcaster.js';
import { SessionCompletionHandler } from '../../session/SessionCompletionHandler.js';
export class SessionRoutes extends BaseRouteHandler {
private completionHandler: SessionCompletionHandler;
constructor(
private sessionManager: SessionManager,
private dbManager: DatabaseManager,
private sdkAgent: SDKAgent,
private sseBroadcaster: SSEBroadcaster,
private eventBroadcaster: SessionEventBroadcaster,
private workerService: WorkerService
) {
super();
this.completionHandler = new SessionCompletionHandler(
sessionManager,
dbManager,
eventBroadcaster
);
}
/**
@@ -81,22 +89,13 @@ export class SessionRoutes extends BaseRouteHandler {
// Broadcast new prompt to SSE clients (for web UI)
if (latestPrompt) {
this.sseBroadcaster.broadcast({
type: 'new_prompt',
prompt: {
id: latestPrompt.id,
claude_session_id: latestPrompt.claude_session_id,
project: latestPrompt.project,
prompt_number: latestPrompt.prompt_number,
prompt_text: latestPrompt.prompt_text,
created_at_epoch: latestPrompt.created_at_epoch
}
});
// Start activity indicator immediately when prompt arrives (work is about to begin)
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: true
this.eventBroadcaster.broadcastNewPrompt({
id: latestPrompt.id,
claude_session_id: latestPrompt.claude_session_id,
project: latestPrompt.project,
prompt_number: latestPrompt.prompt_number,
prompt_text: latestPrompt.prompt_text,
created_at_epoch: latestPrompt.created_at_epoch
});
// Sync user prompt to Chroma with error logging
@@ -127,9 +126,6 @@ export class SessionRoutes extends BaseRouteHandler {
});
}
// Broadcast processing status (based on queue depth)
this.workerService.broadcastProcessingStatus();
// Start SDK agent in background (pass worker ref for spinner control)
logger.info('SESSION', 'Generator starting', {
sessionId: sessionDbId,
@@ -149,12 +145,8 @@ export class SessionRoutes extends BaseRouteHandler {
this.workerService.broadcastProcessingStatus();
});
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_started',
sessionDbId,
project: session.project
});
// Broadcast session started event
this.eventBroadcaster.broadcastSessionStarted(sessionDbId, session.project);
res.json({ status: 'initialized', sessionDbId, port: getWorkerPort() });
});
@@ -180,14 +172,8 @@ export class SessionRoutes extends BaseRouteHandler {
// CRITICAL: Ensure SDK agent is running to consume the queue
this.ensureGeneratorRunning(sessionDbId, 'observation');
// Broadcast activity status (queue depth changed)
this.workerService.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'observation_queued',
sessionDbId
});
// Broadcast observation queued event
this.eventBroadcaster.broadcastObservationQueued(sessionDbId);
res.json({ status: 'queued' });
});
@@ -207,8 +193,8 @@ export class SessionRoutes extends BaseRouteHandler {
// CRITICAL: Ensure SDK agent is running to consume the queue
this.ensureGeneratorRunning(sessionDbId, 'summarize');
// Broadcast activity status (queue depth changed)
this.workerService.broadcastProcessingStatus();
// Broadcast summarize queued event
this.eventBroadcaster.broadcastSummarizeQueued();
res.json({ status: 'queued' });
});
@@ -243,16 +229,7 @@ export class SessionRoutes extends BaseRouteHandler {
const sessionDbId = this.parseIntParam(req, res, 'sessionDbId');
if (sessionDbId === null) return;
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_completed',
sessionDbId
});
await this.completionHandler.completeByDbId(sessionDbId);
res.json({ status: 'deleted' });
});
@@ -265,20 +242,7 @@ export class SessionRoutes extends BaseRouteHandler {
const sessionDbId = this.parseIntParam(req, res, 'sessionDbId');
if (sessionDbId === null) return;
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast processing status (based on queue depth)
this.workerService.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_completed',
timestamp: Date.now(),
sessionDbId
});
await this.completionHandler.completeByDbId(sessionDbId);
res.json({ success: true });
});
@@ -345,14 +309,8 @@ export class SessionRoutes extends BaseRouteHandler {
// Ensure SDK agent is running
this.ensureGeneratorRunning(sessionDbId, 'observation');
// Broadcast activity status
this.workerService.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'observation_queued',
sessionDbId
});
// Broadcast observation queued event
this.eventBroadcaster.broadcastObservationQueued(sessionDbId);
res.json({ status: 'queued' });
});
@@ -394,8 +352,8 @@ export class SessionRoutes extends BaseRouteHandler {
// Ensure SDK agent is running
this.ensureGeneratorRunning(sessionDbId, 'summarize');
// Broadcast activity status
this.workerService.broadcastProcessingStatus();
// Broadcast summarize queued event
this.eventBroadcaster.broadcastSummarizeQueued();
res.json({ status: 'queued' });
});
@@ -414,34 +372,14 @@ export class SessionRoutes extends BaseRouteHandler {
return this.badRequest(res, 'Missing claudeSessionId');
}
const store = this.dbManager.getSessionStore();
const found = await this.completionHandler.completeByClaudeId(claudeSessionId);
// Find session by claudeSessionId
const session = store.findActiveSDKSession(claudeSessionId);
if (!session) {
if (!found) {
// No active session - nothing to clean up (may have already been completed)
res.json({ success: true, message: 'No active session found' });
return;
}
const sessionDbId = session.id;
// Delete from session manager (aborts SDK agent)
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast processing status
this.workerService.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_completed',
timestamp: Date.now(),
sessionDbId
});
res.json({ success: true });
});
}
@@ -0,0 +1,62 @@
/**
* Session Completion Handler
*
* Consolidates session completion logic to eliminate duplication across
* three different completion endpoints (DELETE, POST by DB ID, POST by Claude ID).
*
* All completion flows follow the same pattern:
* 1. Delete session from SessionManager (aborts SDK agent)
* 2. Mark session complete in database
* 3. Broadcast session completed event
*/
import { SessionManager } from '../SessionManager.js';
import { DatabaseManager } from '../DatabaseManager.js';
import { SessionEventBroadcaster } from '../events/SessionEventBroadcaster.js';
export class SessionCompletionHandler {
constructor(
private sessionManager: SessionManager,
private dbManager: DatabaseManager,
private eventBroadcaster: SessionEventBroadcaster
) {}
/**
* Complete session by database ID
* Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete
*/
async completeByDbId(sessionDbId: number): Promise<void> {
// Delete from session manager (aborts SDK agent)
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast session completed event
this.eventBroadcaster.broadcastSessionCompleted(sessionDbId);
}
/**
* Complete session by Claude session ID
* Used by POST /api/sessions/complete (cleanup-hook endpoint)
*
* @returns true if session was found and completed, false if no active session found
*/
async completeByClaudeId(claudeSessionId: string): Promise<boolean> {
const store = this.dbManager.getSessionStore();
// Find session by claudeSessionId
const session = store.findActiveSDKSession(claudeSessionId);
if (!session) {
// No active session - nothing to clean up (may have already been completed)
return false;
}
const sessionDbId = session.id;
// Complete using standard flow
await this.completeByDbId(sessionDbId);
return true;
}
}