diff --git a/src/services/sqlite/PendingMessageStore.ts b/src/services/sqlite/PendingMessageStore.ts
index 583b0847..155ae5da 100644
--- a/src/services/sqlite/PendingMessageStore.ts
+++ b/src/services/sqlite/PendingMessageStore.ts
@@ -204,6 +204,23 @@ export class PendingMessageStore {
     return result.changes;
   }
 
+  /**
+   * Mark all 'pending' and 'processing' messages for a session as 'failed' (abandoned),
+   * stamping failed_at_epoch with the current time. Used when the SDK session is terminated and
+   * no fallback agent is available: prevents the session from appearing in getSessionsWithPendingMessages forever.
+   * @returns Number of messages marked failed (0 when the session had none pending/processing)
+   */
+  markAllSessionMessagesAbandoned(sessionDbId: number): number {
+    const now = Date.now();
+    const stmt = this.db.prepare(`
+      UPDATE pending_messages
+      SET status = 'failed', failed_at_epoch = ?
+      WHERE session_db_id = ? AND status IN ('pending', 'processing')
+    `);
+    const result = stmt.run(now, sessionDbId);
+    return result.changes;
+  }
+
   /**
    * Abort a specific message (delete from queue)
    */
diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts
index 065fb8ee..9bcbd11d 100644
--- a/src/services/worker-service.ts
+++ b/src/services/worker-service.ts
@@ -92,8 +92,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js';
 import { SessionManager } from './worker/SessionManager.js';
 import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
 import { SDKAgent } from './worker/SDKAgent.js';
-import { GeminiAgent } from './worker/GeminiAgent.js';
-import { OpenRouterAgent } from './worker/OpenRouterAgent.js';
+import { GeminiAgent, isGeminiAvailable } from './worker/GeminiAgent.js';
+import { OpenRouterAgent, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
 import { PaginationHelper } from './worker/PaginationHelper.js';
 import { SettingsManager } from './worker/SettingsManager.js';
 import { SearchManager } from './worker/SearchManager.js';
@@ -394,6 +394,8 @@ export class WorkerService {
 
   /**
    * Start a session processor
+   * On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available;
+   * otherwise marks messages abandoned and removes the session so the queue does not grow unbounded.
    */
   private startSessionProcessor(
     session: ReturnType<typeof this.sessionManager.getSession>,
@@ -405,11 +407,20 @@ export class WorkerService {
     logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid });
 
     session.generatorPromise = this.sdkAgent.startSession(session, this)
-      .catch(error => {
+      .catch(async (error: unknown) => {
+        if (this.isSessionTerminatedError(error)) {
+          logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
+            sessionId: session.sessionDbId,
+            project: session.project,
+            reason: error instanceof Error ? error.message : String(error)
+          });
+          return this.runFallbackForTerminatedSession(session, error);
+        }
         logger.error('SDK', 'Session generator failed', {
           sessionId: session.sessionDbId,
           project: session.project
         }, error as Error);
+        throw error;
       })
       .finally(() => {
         session.generatorPromise = null;
@@ -417,6 +428,78 @@ export class WorkerService {
       });
   }
 
+  /**
+   * True when the error text (case-insensitive substring match) indicates the Claude Code
+   * process/session is gone, so callers can fall back instead of leaving pending messages stuck forever.
+   */
+  private isSessionTerminatedError(error: unknown): boolean {
+    const msg = error instanceof Error ? error.message : String(error);
+    const normalized = msg.toLowerCase();
+    return (
+      normalized.includes('process aborted by user') ||
+      normalized.includes('processtransport') ||
+      normalized.includes('not ready for writing') ||
+      normalized.includes('session generator failed') ||
+      normalized.includes('claude code process')
+    );
+  }
+
+  /**
+   * When SDK resume fails due to a terminated session: try Gemini, then OpenRouter, to drain pending
+   * messages; if neither is available or both fail, mark messages abandoned and remove the session.
+   */
+  private async runFallbackForTerminatedSession(
+    session: ReturnType<typeof this.sessionManager.getSession>,
+    _originalError: unknown
+  ): Promise<void> {
+    if (!session) return;
+
+    const sessionDbId = session.sessionDbId;
+
+    // Fallback agents need memorySessionId for storeObservations
+    if (!session.memorySessionId) {
+      const syntheticId = `fallback-${sessionDbId}-${Date.now()}`;
+      session.memorySessionId = syntheticId;
+      this.dbManager.getSessionStore().updateMemorySessionId(sessionDbId, syntheticId);
+    }
+
+    if (isGeminiAvailable()) {
+      try {
+        await this.geminiAgent.startSession(session, this);
+        return;
+      } catch (e) {
+        logger.warn('SDK', 'Fallback Gemini failed, trying OpenRouter', {
+          sessionId: sessionDbId,
+          error: e instanceof Error ? e.message : String(e)
+        });
+      }
+    }
+
+    if (isOpenRouterAvailable()) {
+      try {
+        await this.openRouterAgent.startSession(session, this);
+        return;
+      } catch (e) {
+        logger.warn('SDK', 'Fallback OpenRouter failed', {
+          sessionId: sessionDbId,
+          error: e instanceof Error ? e.message : String(e)
+        });
+      }
+    }
+
+    // No fallback or both failed: mark messages abandoned and remove session so queue doesn't grow
+    const pendingStore = this.sessionManager.getPendingMessageStore();
+    const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
+    if (abandoned > 0) {
+      logger.warn('SDK', 'No fallback available; marked pending messages abandoned', {
+        sessionId: sessionDbId,
+        abandoned
+      });
+    }
+    this.sessionManager.removeSessionImmediate(sessionDbId);
+    this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
+  }
+
   /**
    * Process pending session queues
    */
diff --git a/src/services/worker/SessionManager.ts b/src/services/worker/SessionManager.ts
index 4ebe3920..639add09 100644
--- a/src/services/worker/SessionManager.ts
+++ b/src/services/worker/SessionManager.ts
@@ -318,6 +318,28 @@ export class SessionManager {
     }
   }
 
+  /**
+   * Remove session from in-memory maps and fire the session-deleted callback without awaiting the
+   * generator (no-op for an unknown id). Used when SDK resume fails and we give up (no fallback):
+   * avoids deadlock from deleteSession() awaiting the same generator promise we're inside.
+   */
+  removeSessionImmediate(sessionDbId: number): void {
+    const session = this.sessions.get(sessionDbId);
+    if (!session) return;
+
+    this.sessions.delete(sessionDbId);
+    this.sessionQueues.delete(sessionDbId);
+
+    logger.info('SESSION', 'Session removed (orphaned after SDK termination)', {
+      sessionId: sessionDbId,
+      project: session.project
+    });
+
+    if (this.onSessionDeletedCallback) {
+      this.onSessionDeletedCallback();
+    }
+  }
+
   /**
    * Shutdown all active sessions
    */