Enhance queue processing and recovery mechanisms

- Implement auto-recovery of orphaned queues on startup in WorkerService.
- Introduce startSessionWithAutoRestart method for continuous processing of pending work.
- Modify SDKAgent to prevent session deletion during processing to avoid race conditions.
- Update SessionManager to allow continued processing after yielding summaries.
- Add logic in SessionRoutes to mark processing messages as failed upon generator errors.
- Create detailed documentation for the queue system logic, including recovery mechanisms and potential issues.
This commit is contained in:
Alex Newman
2025-12-28 15:44:54 -05:00
parent 2d92e8a63f
commit 4ecdc4c9b3
6 changed files with 927 additions and 88 deletions
+67 -7
View File
@@ -670,7 +670,18 @@ export class WorkerService {
this.resolveInitialization();
logger.info('SYSTEM', 'Background initialization complete');
// Note: Auto-recovery of orphaned queues disabled - use /api/pending-queue/process endpoint instead
// Auto-recover orphaned queues on startup (process pending work from previous sessions)
this.processPendingQueues(50).then(result => {
if (result.sessionsStarted > 0) {
logger.info('SYSTEM', `Auto-recovered ${result.sessionsStarted} sessions with pending work`, {
totalPending: result.totalPendingSessions,
started: result.sessionsStarted,
sessionIds: result.startedSessionIds
});
}
}).catch(error => {
logger.warn('SYSTEM', 'Auto-recovery of pending queues failed', {}, error as Error);
});
} catch (error) {
logger.error('SYSTEM', 'Background initialization failed', {}, error as Error);
// Don't resolve - let the promise remain pending so readiness check continues to fail
@@ -678,6 +689,55 @@ export class WorkerService {
}
}
/**
* Start a session with auto-restart and cleanup logic
* Recursively restarts the generator if there's pending work
*/
private startSessionWithAutoRestart(
session: ReturnType<typeof this.sessionManager.getSession>,
getPendingCount: (sid: number) => number,
source: string
): void {
if (!session) return;
const sid = session.sessionDbId;
logger.info('SYSTEM', `Starting generator (${source})`, {
sessionId: sid,
pendingCount: getPendingCount(sid)
});
session.generatorPromise = this.sdkAgent.startSession(session, this)
.catch(error => {
logger.error('SYSTEM', `Generator failed (${source})`, {
sessionId: sid,
error: error.message
}, error);
})
.finally(() => {
session.generatorPromise = null;
this.broadcastProcessingStatus();
// Check if there's more work pending
const stillPending = getPendingCount(sid);
if (stillPending > 0) {
logger.info('SYSTEM', `Auto-restarting generator for pending work`, {
sessionId: sid,
pendingCount: stillPending
});
setTimeout(() => {
const stillExists = this.sessionManager.getSession(sid);
if (stillExists && !stillExists.generatorPromise) {
// Recursive call for continuous processing
this.startSessionWithAutoRestart(stillExists, getPendingCount, 'auto-restart');
}
}, 0);
} else {
// No more work - clean up session
this.sessionManager.deleteSession(sid).catch(() => {});
}
});
}
/**
* Process pending session queues
* Starts SDK agents for sessions that have pending messages but no active processor
@@ -729,12 +789,12 @@ export class WorkerService {
pendingCount: pendingStore.getPendingCount(sessionDbId)
});
// Start SDK agent (non-blocking)
session.generatorPromise = this.sdkAgent.startSession(session, this)
.finally(() => {
session.generatorPromise = null;
this.broadcastProcessingStatus();
});
// Start SDK agent (non-blocking) using shared helper
this.startSessionWithAutoRestart(
session,
(sid) => pendingStore.getPendingCount(sid),
'startup-recovery'
);
result.sessionsStarted++;
result.startedSessionIds.push(sessionDbId);
+2 -2
View File
@@ -164,8 +164,8 @@ export class SDKAgent {
}
throw error;
} finally {
// Cleanup
this.sessionManager.deleteSession(session.sessionDbId).catch(() => {});
// NOTE: Do NOT delete session here - SessionRoutes.finally() handles cleanup
// and auto-restart logic. Deleting here races with pending work checks.
}
}
+1 -5
View File
@@ -472,11 +472,7 @@ export class SessionManager {
// Remove from in-memory queue after yielding
session.pendingMessages.shift();
// If we just yielded a summary, that's the end of this batch - stop the iterator
if (message.type === 'summarize') {
logger.info('SESSION', `Summary yielded - ending generator`, { sessionId: sessionDbId });
return;
}
// Continue processing - don't stop after summary, let the queue drain completely
}
}
@@ -141,12 +141,50 @@ export class SessionRoutes extends BaseRouteHandler {
provider: provider,
error: error.message
}, error);
// Mark all processing messages as failed so they can be retried or abandoned
const pendingStore = this.sessionManager.getPendingMessageStore();
const db = this.dbManager.getDatabase();
const stmt = db.prepare(`
SELECT id FROM pending_messages
WHERE session_db_id = ? AND status = 'processing'
`);
const processingMessages = stmt.all(session.sessionDbId) as { id: number }[];
for (const msg of processingMessages) {
pendingStore.markFailed(msg.id);
logger.warn('SESSION', `Marked message as failed after generator error`, {
sessionId: session.sessionDbId,
messageId: msg.id
});
}
})
.finally(() => {
logger.info('SESSION', `Generator finished`, { sessionId: session.sessionDbId });
const sessionDbId = session.sessionDbId;
logger.info('SESSION', `Generator finished`, { sessionId: sessionDbId });
session.generatorPromise = null;
session.currentProvider = null;
this.workerService.broadcastProcessingStatus();
// Check if there's more work pending
const pendingStore = this.sessionManager.getPendingMessageStore();
const pendingCount = pendingStore.getPendingCount(sessionDbId);
if (pendingCount > 0) {
// Auto-restart for pending work
logger.info('SESSION', `Auto-restarting generator for pending work`, {
sessionId: sessionDbId,
pendingCount
});
setTimeout(() => {
const stillExists = this.sessionManager.getSession(sessionDbId);
if (stillExists && !stillExists.generatorPromise) {
this.startGeneratorWithProvider(stillExists, this.getSelectedProvider(), 'auto-restart');
}
}, 0);
} else {
// No more work - clean up session
this.sessionManager.deleteSession(sessionDbId).catch(() => {});
}
});
}