feat(queue): Simplify queue processing and enhance reliability

- Implemented atomic message claiming in PendingMessageStore with claimNextMessage.
- Removed obsolete peekPending method to streamline message retrieval.
- Introduced SessionQueueProcessor for robust async message iteration, replacing complex polling logic.
- Refactored SessionManager to eliminate in-memory queue state, relying on PendingMessageStore for message tracking.
- Cleaned up session handling logic, removing recursive restarts and session deletion on empty queues.
- Enhanced error handling and logging for generator failures and session processing.
- Updated SessionRoutes to handle crash recovery more effectively without deleting sessions.
This commit is contained in:
Alex Newman
2025-12-28 16:28:58 -05:00
parent 06739cfdfa
commit b8ce27bd31
7 changed files with 312 additions and 244 deletions
+17 -99
View File
@@ -13,6 +13,7 @@ import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import type { ActiveSession, PendingMessage, PendingMessageWithId, ObservationData } from '../worker-types.js';
import { PendingMessageStore } from '../sqlite/PendingMessageStore.js';
import { SessionQueueProcessor } from '../queue/SessionQueueProcessor.js';
export class SessionManager {
private dbManager: DatabaseManager;
@@ -185,8 +186,6 @@ export class SessionManager {
session = this.initializeSession(sessionDbId);
}
const beforeDepth = session.pendingMessages.length;
// CRITICAL: Persist to database FIRST
const message: PendingMessage = {
type: 'observation',
@@ -212,11 +211,6 @@ export class SessionManager {
throw error; // Don't continue if we can't persist
}
// Add to in-memory queue (for backward compatibility with existing iterator)
session.pendingMessages.push(message);
const afterDepth = session.pendingMessages.length;
// Notify generator immediately (zero latency)
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
@@ -224,7 +218,7 @@ export class SessionManager {
// Format tool name for logging
const toolSummary = logger.formatTool(data.tool_name, data.tool_input);
logger.info('SESSION', `Observation queued (${beforeDepth}${afterDepth})`, {
logger.info('SESSION', `Observation queued`, {
sessionId: sessionDbId,
tool: toolSummary,
hasGenerator: !!session.generatorPromise
@@ -245,8 +239,6 @@ export class SessionManager {
session = this.initializeSession(sessionDbId);
}
const beforeDepth = session.pendingMessages.length;
// CRITICAL: Persist to database FIRST
const message: PendingMessage = {
type: 'summarize',
@@ -267,15 +259,10 @@ export class SessionManager {
throw error; // Don't continue if we can't persist
}
// Add to in-memory queue (for backward compatibility with existing iterator)
session.pendingMessages.push(message);
const afterDepth = session.pendingMessages.length;
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
logger.info('SESSION', `Summarize queued (${beforeDepth}${afterDepth})`, {
logger.info('SESSION', `Summarize queued`, {
sessionId: sessionDbId,
hasGenerator: !!session.generatorPromise
});
@@ -328,9 +315,7 @@ export class SessionManager {
* Check if any session has pending messages (for spinner tracking)
*/
hasPendingMessages(): boolean {
return Array.from(this.sessions.values()).some(
session => session.pendingMessages.length > 0
);
return this.getPendingStore().hasAnyPendingWork();
}
/**
@@ -345,8 +330,9 @@ export class SessionManager {
*/
getTotalQueueDepth(): number {
let total = 0;
// We can iterate over active sessions to get their pending count
for (const session of this.sessions.values()) {
total += session.pendingMessages.length;
total += this.getPendingStore().getPendingCount(session.sessionDbId);
}
return total;
}
@@ -356,16 +342,8 @@ export class SessionManager {
* Counts both pending messages and items actively being processed by SDK agents
*/
getTotalActiveWork(): number {
let total = 0;
for (const session of this.sessions.values()) {
// Count queued messages
total += session.pendingMessages.length;
// Count currently processing item (1 per active generator)
if (session.generatorPromise !== null) {
total += 1;
}
}
return total;
// getPendingCount includes 'processing' status, so this IS the total active work
return this.getTotalQueueDepth();
}
/**
@@ -373,17 +351,8 @@ export class SessionManager {
* Used for activity indicator to prevent spinner from stopping while SDK is processing
*/
isAnySessionProcessing(): boolean {
for (const session of this.sessions.values()) {
// Has queued messages waiting to be processed
if (session.pendingMessages.length > 0) {
return true;
}
// Has active SDK generator running (processing dequeued messages)
if (session.generatorPromise !== null) {
return true;
}
}
return false;
// hasAnyPendingWork checks for 'pending' OR 'processing'
return this.getPendingStore().hasAnyPendingWork();
}
/**
@@ -406,73 +375,22 @@ export class SessionManager {
throw new Error(`No emitter for session ${sessionDbId}`);
}
while (!session.abortController.signal.aborted) {
// Check for pending messages in persistent store
const persistentMessage = this.getPendingStore().peekPending(sessionDbId);
if (!persistentMessage) {
// Wait for new message event
await new Promise<void>(resolve => {
const messageHandler = () => {
emitter.off('message', messageHandler);
resolve();
};
const abortHandler = () => {
emitter.off('message', messageHandler);
resolve();
};
emitter.once('message', messageHandler);
session.abortController.signal.addEventListener('abort', abortHandler, { once: true });
});
// Re-check for messages after waking up (handles race condition)
const recheckMessage = this.getPendingStore().peekPending(sessionDbId);
if (recheckMessage) {
continue; // Got a message, process it
}
// Woke up due to abort
if (session.abortController.signal.aborted) {
logger.info('SESSION', 'Generator exiting due to abort', { sessionId: sessionDbId });
return;
}
continue;
}
// Mark as processing BEFORE yielding (status: pending -> processing)
this.getPendingStore().markProcessing(persistentMessage.id);
const processor = new SessionQueueProcessor(this.getPendingStore(), emitter);
// Use the robust Pump iterator
for await (const message of processor.createIterator(sessionDbId, session.abortController.signal)) {
// Track this message ID for completion marking
session.pendingProcessingIds.add(persistentMessage.id);
session.pendingProcessingIds.add(message._persistentId);
// Track earliest timestamp for accurate observation timestamps
// This ensures backlog messages get their original timestamps, not current time
if (session.earliestPendingTimestamp === null) {
session.earliestPendingTimestamp = persistentMessage.created_at_epoch;
session.earliestPendingTimestamp = message._originalTimestamp;
} else {
session.earliestPendingTimestamp = Math.min(session.earliestPendingTimestamp, persistentMessage.created_at_epoch);
session.earliestPendingTimestamp = Math.min(session.earliestPendingTimestamp, message._originalTimestamp);
}
// Convert to PendingMessageWithId and yield
// Include original timestamp for accurate observation timestamps (survives stuck processing)
const message: PendingMessageWithId = {
_persistentId: persistentMessage.id,
_originalTimestamp: persistentMessage.created_at_epoch,
...this.getPendingStore().toPendingMessage(persistentMessage)
};
// Also add to in-memory queue for backward compatibility (status tracking)
session.pendingMessages.push(message);
yield message;
// Remove from in-memory queue after yielding
session.pendingMessages.shift();
// Continue processing - don't stop after summary, let the queue drain completely
}
}