fix(worker): gracefully process orphaned pending messages after session termination
This commit is contained in:
@@ -204,6 +204,23 @@ export class PendingMessageStore {
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark all pending and processing messages for a session as failed (abandoned).
|
||||
* Used when SDK session is terminated and no fallback agent is available:
|
||||
* prevents the session from appearing in getSessionsWithPendingMessages forever.
|
||||
* @returns Number of messages marked failed
|
||||
*/
|
||||
markAllSessionMessagesAbandoned(sessionDbId: number): number {
|
||||
const now = Date.now();
|
||||
const stmt = this.db.prepare(`
|
||||
UPDATE pending_messages
|
||||
SET status = 'failed', failed_at_epoch = ?
|
||||
WHERE session_db_id = ? AND status IN ('pending', 'processing')
|
||||
`);
|
||||
const result = stmt.run(now, sessionDbId);
|
||||
return result.changes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Abort a specific message (delete from queue)
|
||||
*/
|
||||
|
||||
@@ -92,8 +92,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js';
|
||||
import { SessionManager } from './worker/SessionManager.js';
|
||||
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
|
||||
import { SDKAgent } from './worker/SDKAgent.js';
|
||||
import { GeminiAgent } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent } from './worker/OpenRouterAgent.js';
|
||||
import { GeminiAgent, isGeminiAvailable } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
|
||||
import { PaginationHelper } from './worker/PaginationHelper.js';
|
||||
import { SettingsManager } from './worker/SettingsManager.js';
|
||||
import { SearchManager } from './worker/SearchManager.js';
|
||||
@@ -394,6 +394,8 @@ export class WorkerService {
|
||||
|
||||
/**
|
||||
* Start a session processor
|
||||
* On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available,
|
||||
* otherwise marks messages abandoned and removes session so queue does not grow unbounded.
|
||||
*/
|
||||
private startSessionProcessor(
|
||||
session: ReturnType<typeof this.sessionManager.getSession>,
|
||||
@@ -405,11 +407,20 @@ export class WorkerService {
|
||||
logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid });
|
||||
|
||||
session.generatorPromise = this.sdkAgent.startSession(session, this)
|
||||
.catch(error => {
|
||||
.catch(async (error: unknown) => {
|
||||
if (this.isSessionTerminatedError(error)) {
|
||||
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project,
|
||||
reason: error instanceof Error ? error.message : String(error)
|
||||
});
|
||||
return this.runFallbackForTerminatedSession(session, error);
|
||||
}
|
||||
logger.error('SDK', 'Session generator failed', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project
|
||||
}, error as Error);
|
||||
throw error;
|
||||
})
|
||||
.finally(() => {
|
||||
session.generatorPromise = null;
|
||||
@@ -417,6 +428,78 @@ export class WorkerService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Match errors that indicate the Claude Code process/session is gone (resume impossible).
|
||||
* Used to trigger graceful fallback instead of leaving pending messages stuck forever.
|
||||
*/
|
||||
private isSessionTerminatedError(error: unknown): boolean {
|
||||
const msg = error instanceof Error ? error.message : String(error);
|
||||
const normalized = msg.toLowerCase();
|
||||
return (
|
||||
normalized.includes('process aborted by user') ||
|
||||
normalized.includes('processtransport') ||
|
||||
normalized.includes('not ready for writing') ||
|
||||
normalized.includes('session generator failed') ||
|
||||
normalized.includes('claude code process')
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* When SDK resume fails due to terminated session: try Gemini then OpenRouter to drain
|
||||
* pending messages; if no fallback available, mark messages abandoned and remove session.
|
||||
*/
|
||||
private async runFallbackForTerminatedSession(
|
||||
session: ReturnType<typeof this.sessionManager.getSession>,
|
||||
_originalError: unknown
|
||||
): Promise<void> {
|
||||
if (!session) return;
|
||||
|
||||
const sessionDbId = session.sessionDbId;
|
||||
|
||||
// Fallback agents need memorySessionId for storeObservations
|
||||
if (!session.memorySessionId) {
|
||||
const syntheticId = `fallback-${sessionDbId}-${Date.now()}`;
|
||||
session.memorySessionId = syntheticId;
|
||||
this.dbManager.getSessionStore().updateMemorySessionId(sessionDbId, syntheticId);
|
||||
}
|
||||
|
||||
if (isGeminiAvailable()) {
|
||||
try {
|
||||
await this.geminiAgent.startSession(session, this);
|
||||
return;
|
||||
} catch (e) {
|
||||
logger.warn('SDK', 'Fallback Gemini failed, trying OpenRouter', {
|
||||
sessionId: sessionDbId,
|
||||
error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (isOpenRouterAvailable()) {
|
||||
try {
|
||||
await this.openRouterAgent.startSession(session, this);
|
||||
return;
|
||||
} catch (e) {
|
||||
logger.warn('SDK', 'Fallback OpenRouter failed', {
|
||||
sessionId: sessionDbId,
|
||||
error: e instanceof Error ? e.message : String(e)
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// No fallback or both failed: mark messages abandoned and remove session so queue doesn't grow
|
||||
const pendingStore = this.sessionManager.getPendingMessageStore();
|
||||
const abandoned = pendingStore.markAllSessionMessagesAbandoned(sessionDbId);
|
||||
if (abandoned > 0) {
|
||||
logger.warn('SDK', 'No fallback available; marked pending messages abandoned', {
|
||||
sessionId: sessionDbId,
|
||||
abandoned
|
||||
});
|
||||
}
|
||||
this.sessionManager.removeSessionImmediate(sessionDbId);
|
||||
this.sessionEventBroadcaster.broadcastSessionCompleted(sessionDbId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process pending session queues
|
||||
*/
|
||||
|
||||
@@ -318,6 +318,28 @@ export class SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove session from in-memory maps and notify without awaiting generator.
|
||||
* Used when SDK resume fails and we give up (no fallback): avoids deadlock
|
||||
* from deleteSession() awaiting the same generator promise we're inside.
|
||||
*/
|
||||
removeSessionImmediate(sessionDbId: number): void {
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
if (!session) return;
|
||||
|
||||
this.sessions.delete(sessionDbId);
|
||||
this.sessionQueues.delete(sessionDbId);
|
||||
|
||||
logger.info('SESSION', 'Session removed (orphaned after SDK termination)', {
|
||||
sessionId: sessionDbId,
|
||||
project: session.project
|
||||
});
|
||||
|
||||
if (this.onSessionDeletedCallback) {
|
||||
this.onSessionDeletedCallback();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown all active sessions
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user