Refactor SessionManager to simplify message handling and remove linger timeout

- Removed the linger timeout mechanism to streamline the waiting process for new messages.
- Updated the message handling logic to use a single event listener for new messages.
- Improved abort handling by ensuring the session exits cleanly when aborted.
This commit is contained in:
Alex Newman
2025-12-27 21:40:44 -05:00
parent 661eac2b1c
commit 64328d4120
3 changed files with 83 additions and 107 deletions
File diff suppressed because one or more lines are too long
Binary file not shown.
+12 -36
View File
@@ -406,60 +406,36 @@ export class SessionManager {
throw new Error(`No emitter for session ${sessionDbId}`); throw new Error(`No emitter for session ${sessionDbId}`);
} }
// Linger timeout: how long to wait for new messages before exiting
// This keeps the agent alive between messages, reducing "No active agent" windows
const LINGER_TIMEOUT_MS = 5000; // 5 seconds
while (!session.abortController.signal.aborted) { while (!session.abortController.signal.aborted) {
// Check for pending messages in persistent store // Check for pending messages in persistent store
const persistentMessage = this.getPendingStore().peekPending(sessionDbId); const persistentMessage = this.getPendingStore().peekPending(sessionDbId);
if (!persistentMessage) { if (!persistentMessage) {
// Wait for new messages with timeout // Wait for new message event
const gotMessage = await new Promise<boolean>(resolve => { await new Promise<void>(resolve => {
let resolved = false;
const messageHandler = () => { const messageHandler = () => {
if (!resolved) {
resolved = true;
clearTimeout(timeoutId);
resolve(true);
}
};
const timeoutHandler = () => {
if (!resolved) {
resolved = true;
emitter.off('message', messageHandler); emitter.off('message', messageHandler);
resolve(false); resolve();
}
}; };
const timeoutId = setTimeout(timeoutHandler, LINGER_TIMEOUT_MS); const abortHandler = () => {
emitter.off('message', messageHandler);
resolve();
};
emitter.once('message', messageHandler); emitter.once('message', messageHandler);
session.abortController.signal.addEventListener('abort', abortHandler, { once: true });
// Also listen for abort
session.abortController.signal.addEventListener('abort', () => {
if (!resolved) {
resolved = true;
clearTimeout(timeoutId);
emitter.off('message', messageHandler);
resolve(false);
}
}, { once: true });
}); });
// Re-check for messages after waking up (handles race condition) // Re-check for messages after waking up (handles race condition)
const recheckMessage = this.getPendingStore().peekPending(sessionDbId); const recheckMessage = this.getPendingStore().peekPending(sessionDbId);
if (recheckMessage) { if (recheckMessage) {
// Got a message, continue processing continue; // Got a message, process it
continue;
} }
if (!gotMessage) { // Woke up due to abort
// Timeout or abort - exit the loop if (session.abortController.signal.aborted) {
logger.info('SESSION', `Generator exiting after linger timeout`, { sessionId: sessionDbId }); logger.info('SESSION', 'Generator exiting due to abort', { sessionId: sessionDbId });
return; return;
} }