MAESTRO: Merge PR #937 - gracefully process orphaned pending messages after session termination

This commit is contained in:
Alex Newman
2026-02-06 01:47:55 -05:00
3 changed files with 125 additions and 3 deletions
@@ -204,6 +204,23 @@ export class PendingMessageStore {
return result.changes;
}
/**
* Mark all pending and processing messages for a session as failed (abandoned).
* Used when the SDK session is terminated and no fallback agent is available
* (or every attempted fallback failed): prevents the session from appearing in
* getSessionsWithPendingMessages forever.
* @returns Number of messages marked failed
*/
markAllSessionMessagesAbandoned(sessionDbId: number): number {
  // Every row updated by this call shares a single failure timestamp (epoch milliseconds).
  const failedAtEpoch = Date.now();

  // Only rows still 'pending' or 'processing' for this session are rewritten;
  // rows in any other status are left untouched by the WHERE clause.
  const { changes } = this.db
    .prepare(`
UPDATE pending_messages
SET status = 'failed', failed_at_epoch = ?
WHERE session_db_id = ? AND status IN ('pending', 'processing')
`)
    .run(failedAtEpoch, sessionDbId);

  // Count of rows the UPDATE modified, i.e. messages now marked failed.
  return changes;
}
/**
* Abort a specific message (delete from queue)
*/
+86 -3
View File
@@ -92,8 +92,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js';
import { SessionManager } from './worker/SessionManager.js';
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
import { SDKAgent } from './worker/SDKAgent.js';
import { GeminiAgent } from './worker/GeminiAgent.js';
import { OpenRouterAgent } from './worker/OpenRouterAgent.js';
import { GeminiAgent, isGeminiAvailable } from './worker/GeminiAgent.js';
import { OpenRouterAgent, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
import { PaginationHelper } from './worker/PaginationHelper.js';
import { SettingsManager } from './worker/SettingsManager.js';
import { SearchManager } from './worker/SearchManager.js';
@@ -394,6 +394,8 @@ export class WorkerService {
/**
* Start a session processor
* On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available,
* otherwise marks messages abandoned and removes session so queue does not grow unbounded.
*/
private startSessionProcessor(
session: ReturnType<typeof this.sessionManager.getSession>,
@@ -405,11 +407,20 @@ export class WorkerService {
logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid });
session.generatorPromise = this.sdkAgent.startSession(session, this)
.catch(error => {
.catch(async (error: unknown) => {
if (this.isSessionTerminatedError(error)) {
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
sessionId: session.sessionDbId,
project: session.project,
reason: error instanceof Error ? error.message : String(error)
});
return this.runFallbackForTerminatedSession(session, error);
}
logger.error('SDK', 'Session generator failed', {
sessionId: session.sessionDbId,
project: session.project
}, error as Error);
throw error;
})
.finally(() => {
session.generatorPromise = null;
@@ -417,6 +428,78 @@ export class WorkerService {
});
}
/**
* Match errors that indicate the Claude Code process/session is gone (resume impossible).
* Used to trigger graceful fallback instead of leaving pending messages stuck forever.
*/
private isSessionTerminatedError(error: unknown): boolean {
  // Lower-case substrings; any one of them appearing in the message counts as
  // a terminated-session failure.
  const terminationMarkers = [
    'process aborted by user',
    'processtransport',
    'not ready for writing',
    'session generator failed',
    'claude code process'
  ];

  // Non-Error throwables are stringified so they can be matched too.
  const rawMessage = error instanceof Error ? error.message : String(error);
  const haystack = rawMessage.toLowerCase();

  // Case-insensitive match: the message was lower-cased above.
  return terminationMarkers.some(marker => haystack.includes(marker));
}
/**
* When SDK resume fails due to a terminated session: try Gemini, then OpenRouter, to drain
* pending messages; if neither is available or both fail, mark messages abandoned and remove the session.
*/
private async runFallbackForTerminatedSession(
  session: ReturnType<typeof this.sessionManager.getSession>,
  _originalError: unknown
): Promise<void> {
  // Nothing to recover when the caller has no session object.
  if (!session) return;

  // Const alias keeps the non-undefined narrowing inside the closures below.
  const activeSession = session;
  const sessionDbId = activeSession.sessionDbId;

  // Fallback agents need memorySessionId for storeObservations, so mint a
  // synthetic one (in memory and in the session store) when none exists yet.
  if (!activeSession.memorySessionId) {
    const syntheticId = `fallback-${sessionDbId}-${Date.now()}`;
    activeSession.memorySessionId = syntheticId;
    this.dbManager.getSessionStore().updateMemorySessionId(sessionDbId, syntheticId);
  }

  // Ordered fallback chain: Gemini first, then OpenRouter. Availability is
  // checked lazily, right before each attempt, never up front.
  const fallbacks = [
    {
      isAvailable: () => isGeminiAvailable(),
      run: () => this.geminiAgent.startSession(activeSession, this),
      failureMessage: 'Fallback Gemini failed, trying OpenRouter'
    },
    {
      isAvailable: () => isOpenRouterAvailable(),
      run: () => this.openRouterAgent.startSession(activeSession, this),
      failureMessage: 'Fallback OpenRouter failed'
    }
  ];

  for (const fallback of fallbacks) {
    if (!fallback.isAvailable()) continue;
    try {
      await fallback.run();
      // First fallback that completes without throwing ends the recovery.
      return;
    } catch (failure) {
      // A failed fallback is logged and the next candidate (if any) is tried.
      logger.warn('SDK', fallback.failureMessage, {
        sessionId: sessionDbId,
        error: failure instanceof Error ? failure.message : String(failure)
      });
    }
  }

  // No fallback or both failed: mark messages abandoned and remove session so queue doesn't grow
  const abandoned = this.sessionManager
    .getPendingMessageStore()
    .markAllSessionMessagesAbandoned(sessionDbId);
  if (abandoned > 0) {
    logger.warn('SDK', 'No fallback available; marked pending messages abandoned', {
      sessionId: sessionDbId,
      abandoned
    });
  }

  // Drop the in-memory session, then broadcast its completion.
  this.sessionManager.removeSessionImmediate(sessionDbId);
  this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
}
/**
* Process pending session queues
*/
+22
View File
@@ -318,6 +318,28 @@ export class SessionManager {
}
}
/**
* Remove session from in-memory maps and notify without awaiting generator.
* Used when SDK resume fails and we give up (no fallback available, or every
* fallback failed): avoids deadlock from deleteSession() awaiting the same
* generator promise we're inside.
*/
removeSessionImmediate(sessionDbId: number): void {
  const orphan = this.sessions.get(sessionDbId);

  // Unknown session ids are a silent no-op: nothing is deleted, logged, or notified.
  if (orphan) {
    // Drop both in-memory entries keyed by this session id.
    this.sessions.delete(sessionDbId);
    this.sessionQueues.delete(sessionDbId);

    logger.info('SESSION', 'Session removed (orphaned after SDK termination)', {
      sessionId: sessionDbId,
      project: orphan.project
    });

    // Invoke the deletion callback only when one is set.
    if (this.onSessionDeletedCallback) {
      this.onSessionDeletedCallback();
    }
  }
}
/**
* Shutdown all active sessions
*/