fix: provider-aware recovery and stale session cleanup (PR #741)
Applies PR #741 by @licutis onto main, resolving conflicts with recently merged PRs #693, #937, and #627. Adds getActiveAgent() to WorkerService so startup-recovery uses the correct provider instead of hardcoding SDKAgent. Also cleans up sessions stuck 'active' for 6+ hours and their pending messages before processing orphaned queues. Co-Authored-By: licutis <43884712+licutis@users.noreply.github.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -92,8 +92,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js';
|
||||
import { SessionManager } from './worker/SessionManager.js';
|
||||
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
|
||||
import { SDKAgent } from './worker/SDKAgent.js';
|
||||
import { GeminiAgent, isGeminiAvailable } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
|
||||
import { GeminiAgent, isGeminiSelected, isGeminiAvailable } from './worker/GeminiAgent.js';
|
||||
import { OpenRouterAgent, isOpenRouterSelected, isOpenRouterAvailable } from './worker/OpenRouterAgent.js';
|
||||
import { PaginationHelper } from './worker/PaginationHelper.js';
|
||||
import { SettingsManager } from './worker/SettingsManager.js';
|
||||
import { SearchManager } from './worker/SearchManager.js';
|
||||
@@ -392,6 +392,20 @@ export class WorkerService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the appropriate agent based on provider settings.
|
||||
* Same logic as SessionRoutes.getActiveAgent() for consistency.
|
||||
*/
|
||||
private getActiveAgent(): SDKAgent | GeminiAgent | OpenRouterAgent {
|
||||
if (isOpenRouterSelected() && isOpenRouterAvailable()) {
|
||||
return this.openRouterAgent;
|
||||
}
|
||||
if (isGeminiSelected() && isGeminiAvailable()) {
|
||||
return this.geminiAgent;
|
||||
}
|
||||
return this.sdkAgent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a session processor
|
||||
* On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available,
|
||||
@@ -404,9 +418,12 @@ export class WorkerService {
|
||||
if (!session) return;
|
||||
|
||||
const sid = session.sessionDbId;
|
||||
logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid });
|
||||
const agent = this.getActiveAgent();
|
||||
const providerName = agent.constructor.name;
|
||||
|
||||
session.generatorPromise = this.sdkAgent.startSession(session, this)
|
||||
logger.info('SYSTEM', `Starting generator (${source}) using ${providerName}`, { sessionId: sid });
|
||||
|
||||
session.generatorPromise = agent.startSession(session, this)
|
||||
.catch(async (error: unknown) => {
|
||||
if (this.isSessionTerminatedError(error)) {
|
||||
logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', {
|
||||
@@ -418,7 +435,8 @@ export class WorkerService {
|
||||
}
|
||||
logger.error('SDK', 'Session generator failed', {
|
||||
sessionId: session.sessionDbId,
|
||||
project: session.project
|
||||
project: session.project,
|
||||
provider: providerName
|
||||
}, error as Error);
|
||||
throw error;
|
||||
})
|
||||
@@ -511,6 +529,46 @@ export class WorkerService {
|
||||
}> {
|
||||
const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js');
|
||||
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
|
||||
const sessionStore = this.dbManager.getSessionStore();
|
||||
|
||||
// Clean up stale 'active' sessions before processing
|
||||
// Sessions older than 6 hours without activity are likely orphaned
|
||||
const STALE_SESSION_THRESHOLD_MS = 6 * 60 * 60 * 1000;
|
||||
const staleThreshold = Date.now() - STALE_SESSION_THRESHOLD_MS;
|
||||
|
||||
try {
|
||||
const staleSessionIds = sessionStore.db.prepare(`
|
||||
SELECT id FROM sdk_sessions
|
||||
WHERE status = 'active' AND started_at_epoch < ?
|
||||
`).all(staleThreshold) as { id: number }[];
|
||||
|
||||
if (staleSessionIds.length > 0) {
|
||||
const ids = staleSessionIds.map(r => r.id);
|
||||
const placeholders = ids.map(() => '?').join(',');
|
||||
|
||||
sessionStore.db.prepare(`
|
||||
UPDATE sdk_sessions
|
||||
SET status = 'failed', completed_at_epoch = ?
|
||||
WHERE id IN (${placeholders})
|
||||
`).run(Date.now(), ...ids);
|
||||
|
||||
logger.info('SYSTEM', `Marked ${ids.length} stale sessions as failed`);
|
||||
|
||||
const msgResult = sessionStore.db.prepare(`
|
||||
UPDATE pending_messages
|
||||
SET status = 'failed', failed_at_epoch = ?
|
||||
WHERE status = 'pending'
|
||||
AND session_db_id IN (${placeholders})
|
||||
`).run(Date.now(), ...ids);
|
||||
|
||||
if (msgResult.changes > 0) {
|
||||
logger.info('SYSTEM', `Marked ${msgResult.changes} pending messages from stale sessions as failed`);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('SYSTEM', 'Failed to clean up stale sessions', {}, error as Error);
|
||||
}
|
||||
|
||||
const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();
|
||||
|
||||
const result = {
|
||||
|
||||
Reference in New Issue
Block a user