feat: Enhance summary hook to include last user message from transcript (#95)

* feat: Enhance summary hook to include last user message from transcript

- Added functionality to extract the last user message from a JSONL transcript file in the summary hook.
- Updated the summary hook to send the last user message along with the summary request.
- Modified the SDKSession interface to include an optional last_user_message field.
- Updated the summary prompt to incorporate the last user message in the output format.
- Refactored worker service to handle the last user message in the summarize queue.
- Enhanced session manager to track and broadcast processing status based on active sessions and queue depth.
- Improved error handling and logging for better traceability during transcript reading and processing.

* feat(worker): enhance processing status broadcasting and session management

- Added immediate broadcasting of processing status when a prompt is received.
- Implemented logging for generator completion in multiple locations.
- Updated `broadcastProcessingStatus` to include queue depth and active session count in logs.
- Modified session iterator to stop yielding messages after a summary is yielded, with appropriate logging.
This commit is contained in:
Alex Newman
2025-11-11 17:38:22 -05:00
committed by GitHub
parent ecb8b39f6d
commit 39fedfc5fc
8 changed files with 266 additions and 99 deletions
+82 -31
View File
@@ -44,9 +44,6 @@ export class WorkerService {
private paginationHelper: PaginationHelper;
private settingsManager: SettingsManager;
// Processing status tracking for viewer UI spinner
private isProcessing: boolean = false;
constructor() {
this.app = express();
@@ -58,6 +55,11 @@ export class WorkerService {
this.paginationHelper = new PaginationHelper(this.dbManager);
this.settingsManager = new SettingsManager(this.dbManager);
// Set callback for when sessions are deleted (to update activity indicator)
this.sessionManager.setOnSessionDeleted(() => {
this.broadcastProcessingStatus();
});
this.mcpClient = new Client({
name: 'worker-search-proxy',
version: '1.0.0'
@@ -273,10 +275,11 @@ export class WorkerService {
timestamp: Date.now()
});
// Send initial processing status
// Send initial processing status (based on queue depth + active generators)
const isProcessing = this.sessionManager.isAnySessionProcessing();
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: this.isProcessing
isProcessing
});
}
@@ -316,6 +319,12 @@ export class WorkerService {
}
});
// Start activity indicator immediately when prompt arrives (work is about to begin)
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: true
});
// Sync user prompt to Chroma with error logging
const chromaStart = Date.now();
const promptText = latestPrompt.prompt_text;
@@ -344,8 +353,8 @@ export class WorkerService {
});
}
// Start processing indicator
this.broadcastProcessingStatus(true);
// Broadcast processing status (based on queue depth)
this.broadcastProcessingStatus();
// Start SDK agent in background (pass worker ref for spinner control)
logger.info('SESSION', 'Generator starting', {
@@ -354,9 +363,17 @@ export class WorkerService {
promptNum: session.lastPromptNumber
});
session.generatorPromise = this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
});
session.generatorPromise = this.sdkAgent.startSession(session, this)
.catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
})
.finally(() => {
// Clear generator reference when completed
logger.info('SESSION', `Generator finished`, { sessionId: sessionDbId });
session.generatorPromise = null;
// Broadcast status change (generator finished, may stop spinner)
this.broadcastProcessingStatus();
});
// Broadcast SSE event
this.sseBroadcaster.broadcast({
@@ -397,11 +414,22 @@ export class WorkerService {
queueDepth: session.pendingMessages.length
});
session.generatorPromise = this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
});
session.generatorPromise = this.sdkAgent.startSession(session, this)
.catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
})
.finally(() => {
// Clear generator reference when completed
logger.info('SESSION', `Generator finished`, { sessionId: sessionDbId });
session.generatorPromise = null;
// Broadcast status change (generator finished, may stop spinner)
this.broadcastProcessingStatus();
});
}
// Broadcast activity status (queue depth changed)
this.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'observation_queued',
@@ -422,7 +450,9 @@ export class WorkerService {
private handleSummarize(req: Request, res: Response): void {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
this.sessionManager.queueSummarize(sessionDbId);
const { last_user_message } = req.body;
this.sessionManager.queueSummarize(sessionDbId, last_user_message);
// CRITICAL: Ensure SDK agent is running to consume the queue
const session = this.sessionManager.getSession(sessionDbId);
@@ -432,11 +462,22 @@ export class WorkerService {
queueDepth: session.pendingMessages.length
});
session.generatorPromise = this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
});
session.generatorPromise = this.sdkAgent.startSession(session, this)
.catch(err => {
logger.failure('SDK', 'SDK agent error', { sessionId: sessionDbId }, err);
})
.finally(() => {
// Clear generator reference when completed
logger.info('SESSION', `Generator finished`, { sessionId: sessionDbId });
session.generatorPromise = null;
// Broadcast status change (generator finished, may stop spinner)
this.broadcastProcessingStatus();
});
}
// Broadcast activity status (queue depth changed)
this.broadcastProcessingStatus();
res.json({ status: 'queued' });
} catch (error) {
logger.failure('WORKER', 'Summarize queuing failed', {}, error as Error);
@@ -511,8 +552,8 @@ export class WorkerService {
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Stop processing indicator
this.broadcastProcessingStatus(false);
// Broadcast processing status (based on queue depth)
this.broadcastProcessingStatus();
// Broadcast SSE event
this.sseBroadcaster.broadcast({
@@ -722,7 +763,8 @@ export class WorkerService {
* Get processing status (for viewer UI spinner)
*/
private handleGetProcessingStatus(req: Request, res: Response): void {
res.json({ isProcessing: this.isProcessing });
const isProcessing = this.sessionManager.isAnySessionProcessing();
res.json({ isProcessing });
}
// ============================================================================
@@ -731,9 +773,19 @@ export class WorkerService {
/**
* Broadcast processing status change to SSE clients
* Checks both queue depth and active generators to prevent premature spinner stop
*/
broadcastProcessingStatus(isProcessing: boolean): void {
this.isProcessing = isProcessing;
broadcastProcessingStatus(): void {
const isProcessing = this.sessionManager.isAnySessionProcessing();
const queueDepth = this.sessionManager.getTotalQueueDepth();
const activeSessions = this.sessionManager.getActiveSessionCount();
logger.info('WORKER', 'Broadcasting processing status', {
isProcessing,
queueDepth,
activeSessions
});
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing
@@ -742,22 +794,21 @@ export class WorkerService {
/**
* Set processing status (called by hooks)
* NOTE: This now broadcasts computed status based on active processing (ignores input)
*/
private handleSetProcessing(req: Request, res: Response): void {
try {
const { isProcessing } = req.body;
// Broadcast current computed status (ignores manual input)
this.broadcastProcessingStatus();
if (typeof isProcessing !== 'boolean') {
res.status(400).json({ error: 'isProcessing must be a boolean' });
return;
}
this.broadcastProcessingStatus(isProcessing);
logger.debug('WORKER', 'Processing status updated', { isProcessing });
const isProcessing = this.sessionManager.isAnySessionProcessing();
const queueDepth = this.sessionManager.getTotalQueueDepth();
const activeSessions = this.sessionManager.getActiveSessionCount();
logger.debug('WORKER', 'Processing status broadcast', { isProcessing, queueDepth, activeSessions });
res.json({ status: 'ok', isProcessing });
} catch (error) {
logger.failure('WORKER', 'Failed to set processing status', {}, error as Error);
logger.failure('WORKER', 'Failed to broadcast processing status', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
+1
View File
@@ -28,6 +28,7 @@ export interface PendingMessage {
tool_response?: any;
prompt_number?: number;
cwd?: string;
last_user_message?: string;
}
export interface ObservationData {
+5 -4
View File
@@ -188,7 +188,8 @@ export class SDKAgent {
id: session.sessionDbId,
sdk_session_id: session.sdkSessionId,
project: session.project,
user_prompt: session.userPrompt
user_prompt: session.userPrompt,
last_user_message: message.last_user_message || ''
})
},
session_id: session.claudeSessionId,
@@ -351,9 +352,9 @@ export class SDKAgent {
}
}
// Check and stop spinner after processing (debounced)
if (worker && typeof worker.checkAndStopSpinner === 'function') {
worker.checkAndStopSpinner();
// Broadcast activity status after processing (queue may have changed)
if (worker && typeof worker.broadcastProcessingStatus === 'function') {
worker.broadcastProcessingStatus();
}
}
+53 -2
View File
@@ -17,11 +17,19 @@ export class SessionManager {
private dbManager: DatabaseManager;
private sessions: Map<number, ActiveSession> = new Map();
private sessionQueues: Map<number, EventEmitter> = new Map();
private onSessionDeletedCallback?: () => void;
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Set callback to be called when a session is deleted (for broadcasting status)
*/
setOnSessionDeleted(callback: () => void): void {
this.onSessionDeletedCallback = callback;
}
/**
* Initialize a new session or return existing one
*/
@@ -115,7 +123,7 @@ export class SessionManager {
* Queue a summarize request (zero-latency notification)
* Auto-initializes session if not in memory but exists in database
*/
queueSummarize(sessionDbId: number): void {
queueSummarize(sessionDbId: number, lastUserMessage: string): void {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
@@ -124,7 +132,10 @@ export class SessionManager {
const beforeDepth = session.pendingMessages.length;
session.pendingMessages.push({ type: 'summarize' });
session.pendingMessages.push({
type: 'summarize',
last_user_message: lastUserMessage
});
const afterDepth = session.pendingMessages.length;
@@ -165,6 +176,11 @@ export class SessionManager {
duration: `${(sessionDuration / 1000).toFixed(1)}s`,
project: session.project
});
// Trigger callback to broadcast status update (spinner may need to stop)
if (this.onSessionDeletedCallback) {
this.onSessionDeletedCallback();
}
}
/**
@@ -191,6 +207,35 @@ export class SessionManager {
return this.sessions.size;
}
/**
* Get total queue depth across all sessions (for activity indicator)
*/
getTotalQueueDepth(): number {
let total = 0;
for (const session of this.sessions.values()) {
total += session.pendingMessages.length;
}
return total;
}
/**
* Check if any session is actively processing (has pending messages OR active generator)
* Used for activity indicator to prevent spinner from stopping while SDK is processing
*/
isAnySessionProcessing(): boolean {
for (const session of this.sessions.values()) {
// Has queued messages waiting to be processed
if (session.pendingMessages.length > 0) {
return true;
}
// Has active SDK generator running (processing dequeued messages)
if (session.generatorPromise !== null) {
return true;
}
}
return false;
}
/**
* Get message iterator for SDKAgent to consume (event-driven, no polling)
* Auto-initializes session if not in memory but exists in database
@@ -226,6 +271,12 @@ export class SessionManager {
while (session.pendingMessages.length > 0) {
const message = session.pendingMessages.shift()!;
yield message;
// If we just yielded a summary, that's the end of this batch - stop the iterator
if (message.type === 'summarize') {
logger.info('SESSION', `Summary yielded - ending generator`, { sessionId: sessionDbId });
return;
}
}
}
}