diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 9bcbd11d..af044731 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -92,8 +92,8 @@ import { DatabaseManager } from './worker/DatabaseManager.js'; import { SessionManager } from './worker/SessionManager.js'; import { SSEBroadcaster } from './worker/SSEBroadcaster.js'; import { SDKAgent } from './worker/SDKAgent.js'; -import { GeminiAgent, isGeminiAvailable } from './worker/GeminiAgent.js'; -import { OpenRouterAgent, isOpenRouterAvailable } from './worker/OpenRouterAgent.js'; +import { GeminiAgent, isGeminiSelected, isGeminiAvailable } from './worker/GeminiAgent.js'; +import { OpenRouterAgent, isOpenRouterSelected, isOpenRouterAvailable } from './worker/OpenRouterAgent.js'; import { PaginationHelper } from './worker/PaginationHelper.js'; import { SettingsManager } from './worker/SettingsManager.js'; import { SearchManager } from './worker/SearchManager.js'; @@ -392,6 +392,20 @@ export class WorkerService { } } + /** + * Get the appropriate agent based on provider settings. + * Same logic as SessionRoutes.getActiveAgent() for consistency. + */ + private getActiveAgent(): SDKAgent | GeminiAgent | OpenRouterAgent { + if (isOpenRouterSelected() && isOpenRouterAvailable()) { + return this.openRouterAgent; + } + if (isGeminiSelected() && isGeminiAvailable()) { + return this.geminiAgent; + } + return this.sdkAgent; + } + /** * Start a session processor * On SDK resume failure (terminated session), falls back to Gemini/OpenRouter if available, @@ -404,9 +418,12 @@ export class WorkerService { if (!session) return; const sid = session.sessionDbId; - logger.info('SYSTEM', `Starting generator (${source})`, { sessionId: sid }); + const agent = this.getActiveAgent(); + const providerName = agent.constructor.name; - session.generatorPromise = this.sdkAgent.startSession(session, this) + logger.info('SYSTEM', `Starting generator (${source}) using ${providerName}`, { sessionId: sid }); + + session.generatorPromise = agent.startSession(session, this) .catch(async (error: unknown) => { if (this.isSessionTerminatedError(error)) { logger.warn('SDK', 'SDK resume failed, falling back to standalone processing', { @@ -418,7 +435,8 @@ export class WorkerService { } logger.error('SDK', 'Session generator failed', { sessionId: session.sessionDbId, - project: session.project + project: session.project, + provider: providerName }, error as Error); throw error; }) @@ -511,6 +529,46 @@ export class WorkerService { }> { const { PendingMessageStore } = await import('./sqlite/PendingMessageStore.js'); const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3); + const sessionStore = this.dbManager.getSessionStore(); + + // Clean up stale 'active' sessions before processing + // Sessions older than 6 hours without activity are likely orphaned + const STALE_SESSION_THRESHOLD_MS = 6 * 60 * 60 * 1000; + const staleThreshold = Date.now() - STALE_SESSION_THRESHOLD_MS; + + try { + const staleSessionIds = sessionStore.db.prepare(` + SELECT id FROM sdk_sessions + WHERE status = 'active' AND started_at_epoch < ? + `).all(staleThreshold) as { id: number }[]; + + if (staleSessionIds.length > 0) { + const ids = staleSessionIds.map(r => r.id); + const placeholders = ids.map(() => '?').join(','); + + sessionStore.db.prepare(` + UPDATE sdk_sessions + SET status = 'failed', completed_at_epoch = ? + WHERE id IN (${placeholders}) + `).run(Date.now(), ...ids); + + logger.info('SYSTEM', `Marked ${ids.length} stale sessions as failed`); + + const msgResult = sessionStore.db.prepare(` + UPDATE pending_messages + SET status = 'failed', failed_at_epoch = ? + WHERE status = 'pending' + AND session_db_id IN (${placeholders}) + `).run(Date.now(), ...ids); + + if (msgResult.changes > 0) { + logger.info('SYSTEM', `Marked ${msgResult.changes} pending messages from stale sessions as failed`); + } + } + } catch (error) { + logger.error('SYSTEM', 'Failed to clean up stale sessions', {}, error as Error); + } + const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages(); const result = {