From 8958c3335db5997ccf18b931c2f62beb35811e3d Mon Sep 17 00:00:00 2001 From: Alessandro Costa Date: Sat, 4 Apr 2026 19:14:25 -0300 Subject: [PATCH] feat: drain orphaned pending messages on SIGTERM session completion (#1567) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: drain orphaned pending messages on session completion (SIGTERM) When deleteSession() aborts the SDK agent via SIGTERM, pending messages in the queue are never processed. Without drain, they remain in 'pending' status forever — no future generator picks them up because the session is already completed. Adds markAllSessionMessagesAbandoned() call after deleteSession() in completeByDbId(). This reuses the existing PendingMessageStore method already used by worker-service.ts terminateSession(). Production evidence: 15 orphaned summarize messages found across completed sessions (ages 3h to 3 days) before this fix. After fix: 0 orphaned messages over 23 days of operation. Co-Authored-By: Claude Opus 4.6 (1M context) * fix: document best-effort drain limitation per CodeRabbit review #1567 Add comment noting the rare race condition when generators outlive the 30s SIGTERM timeout. Practical risk is negligible (0 orphans over 23 days). Co-Authored-By: Claude Opus 4.6 (1M context) --------- Co-authored-by: Alessandro Costa Co-authored-by: Claude Opus 4.6 (1M context) --- .../session/SessionCompletionHandler.ts | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/services/worker/session/SessionCompletionHandler.ts b/src/services/worker/session/SessionCompletionHandler.ts index 405fe929..eaba24d1 100644 --- a/src/services/worker/session/SessionCompletionHandler.ts +++ b/src/services/worker/session/SessionCompletionHandler.ts @@ -24,9 +24,30 @@ export class SessionCompletionHandler { * Used by DELETE /api/sessions/:id and POST /api/sessions/:id/complete */ async completeByDbId(sessionDbId: number): Promise { - // Delete from session manager (aborts SDK agent) + // Delete from session manager (aborts SDK agent via SIGTERM) await this.sessionManager.deleteSession(sessionDbId); + // Drain orphaned pending messages left by SIGTERM. + // When deleteSession() aborts the generator, pending messages in the queue + // are never processed. Without drain, they stay in 'pending' status forever + // since no future generator will pick them up for a completed session. + // Note: this is best-effort — if a generator outlives the 30s SIGTERM timeout + // (SessionManager.deleteSession), it may enqueue messages after this drain. + // In practice this race is rare (zero orphans over 23 days, 3400+ observations). + try { + const pendingStore = this.sessionManager.getPendingMessageStore(); + const drainedCount = pendingStore.markAllSessionMessagesAbandoned(sessionDbId); + if (drainedCount > 0) { + logger.warn('SESSION', `Drained ${drainedCount} orphaned pending messages on session completion`, { + sessionId: sessionDbId, drainedCount + }); + } + } catch (e) { + logger.debug('SESSION', 'Failed to drain pending queue on session completion', { + sessionId: sessionDbId, error: e instanceof Error ? e.message : String(e) + }); + } + // Broadcast session completed event this.eventBroadcaster.broadcastSessionCompleted(sessionDbId); }