diff --git a/src/services/worker/session/SessionCompletionHandler.ts b/src/services/worker/session/SessionCompletionHandler.ts index 405fe929..eaba24d1 100644 --- a/src/services/worker/session/SessionCompletionHandler.ts +++ b/src/services/worker/session/SessionCompletionHandler.ts @@ -24,9 +24,30 @@ export class SessionCompletionHandler { * Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete */ async completeByDbId(sessionDbId: number): Promise { - // Delete from session manager (aborts SDK agent) + // Delete from session manager (aborts SDK agent via SIGTERM) await this.sessionManager.deleteSession(sessionDbId); + // Drain orphaned pending messages left by SIGTERM. + // When deleteSession() aborts the generator, pending messages in the queue + // are never processed. Without drain, they stay in 'pending' status forever + // since no future generator will pick them up for a completed session. + // Note: this is best-effort — if a generator outlives the 30s SIGTERM timeout + // (SessionManager.deleteSession), it may enqueue messages after this drain. + // In practice this race is rare (zero orphans over 23 days, 3400+ observations). + try { + const pendingStore = this.sessionManager.getPendingMessageStore(); + const drainedCount = pendingStore.markAllSessionMessagesAbandoned(sessionDbId); + if (drainedCount > 0) { + logger.warn('SESSION', `Drained ${drainedCount} orphaned pending messages on session completion`, { + sessionId: sessionDbId, drainedCount + }); + } + } catch (e) { + logger.debug('SESSION', 'Failed to drain pending queue on session completion', { + sessionId: sessionDbId, error: e instanceof Error ? e.message : String(e) + }); + } + // Broadcast session completed event this.eventBroadcaster.broadcastSessionCompleted(sessionDbId); }