fix: self-healing claimNextMessage prevents stuck processing messages (#1159)

* fix: self-healing claimNextMessage prevents stuck processing messages

Renames claimAndDelete to claimNextMessage and makes it atomically
self-healing: stale processing messages (>60s) are reset back to pending
before the next message is claimed. This eliminates messages left stuck by
generator crashes without requiring external timers. Also removes the
redundant idle-timeout reset in worker-service.ts and adds QUEUE to the
logger Component type.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: update stale comments in SessionQueueProcessor to reflect claim-confirm pattern

Comments still referenced the old claim-and-delete pattern after the
claimNextMessage rename. Updated to accurately describe the current
lifecycle where messages are marked as processing and stay in DB until
confirmProcessed() is called.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: move Date.now() inside transaction and extract stale threshold constant

- Move Date.now() inside the claimNextMessage transaction closure so the
  timestamp is fresh if WAL contention causes a retry
- Extract STALE_PROCESSING_THRESHOLD_MS to module-level constant
- Add comment clarifying strict < boundary semantics

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-02-17 23:15:46 -05:00
committed by GitHub
parent b2e3a7e668
commit b88251bc8b
9 changed files with 298 additions and 93 deletions
@@ -5,11 +5,11 @@ import type { PendingMessageStore, PersistentPendingMessage } from '../../../src
/**
* Mock PendingMessageStore that returns null (empty queue) by default.
* Individual tests can override claimAndDelete behavior.
* Individual tests can override claimNextMessage behavior.
*/
function createMockStore(): PendingMessageStore {
return {
claimAndDelete: mock(() => null),
claimNextMessage: mock(() => null),
toPendingMessage: mock((msg: PersistentPendingMessage) => ({
type: msg.message_type,
tool_name: msg.tool_name || undefined,
@@ -140,7 +140,7 @@ describe('SessionQueueProcessor', () => {
let callCount = 0;
// Return a message on first call, then null
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
return createMockMessage({ id: 1 });
@@ -170,7 +170,7 @@ describe('SessionQueueProcessor', () => {
expect(results).toHaveLength(1);
expect(results[0]._persistentId).toBe(1);
// Store's claimAndDelete should have been called at least twice
// Store's claimNextMessage should have been called at least once
// (the first call returns the message; a later call returning null is expected but not asserted)
expect(callCount).toBeGreaterThanOrEqual(1);
});
@@ -206,7 +206,7 @@ describe('SessionQueueProcessor', () => {
const onIdleTimeout = mock(() => {});
// Return null to trigger wait
(store.claimAndDelete as any) = mock(() => null);
(store.claimNextMessage as any) = mock(() => null);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -242,7 +242,7 @@ describe('SessionQueueProcessor', () => {
// First call: return null (queue empty)
// After message event: return message
// Then return null again
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
// First check - queue empty, will wait
@@ -312,7 +312,7 @@ describe('SessionQueueProcessor', () => {
it('should clean up event listeners when message received', async () => {
// Return a message immediately
(store.claimAndDelete as any) = mock(() => createMockMessage({ id: 1 }));
(store.claimNextMessage as any) = mock(() => createMockMessage({ id: 1 }));
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -344,7 +344,7 @@ describe('SessionQueueProcessor', () => {
it('should continue after store error with backoff', async () => {
let callCount = 0;
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
callCount++;
if (callCount === 1) {
throw new Error('Database error');
@@ -377,7 +377,7 @@ describe('SessionQueueProcessor', () => {
});
it('should exit cleanly if aborted during error backoff', async () => {
(store.claimAndDelete as any) = mock(() => {
(store.claimNextMessage as any) = mock(() => {
throw new Error('Database error');
});
@@ -413,7 +413,7 @@ describe('SessionQueueProcessor', () => {
created_at_epoch: 1704067200000
});
(store.claimAndDelete as any) = mock(() => mockPersistentMessage);
(store.claimNextMessage as any) = mock(() => mockPersistentMessage);
const options: CreateIteratorOptions = {
sessionDbId: 123,
@@ -0,0 +1,146 @@
import { describe, test, expect, beforeEach, afterEach } from 'bun:test';
import { ClaudeMemDatabase } from '../../../src/services/sqlite/Database.js';
import { PendingMessageStore } from '../../../src/services/sqlite/PendingMessageStore.js';
import { createSDKSession } from '../../../src/services/sqlite/Sessions.js';
import type { PendingMessage } from '../../../src/services/worker-types.js';
import type { Database } from 'bun:sqlite';
/**
 * Suite covering the self-healing path of PendingMessageStore.claimNextMessage.
 *
 * Each test runs against a fresh in-memory database with a single SDK session.
 * Rows are forced into the 'processing' state by writing to pending_messages
 * directly, so that claimNextMessage can be observed recovering (or ignoring) them.
 */
describe('PendingMessageStore - Self-Healing claimNextMessage', () => {
  const CONTENT_SESSION_ID = 'test-self-heal';

  let db: Database;
  let store: PendingMessageStore;
  let sessionDbId: number;

  // Fresh database, store and session for every test.
  beforeEach(() => {
    db = new ClaudeMemDatabase(':memory:').db;
    store = new PendingMessageStore(db, 3);
    sessionDbId = createSDKSession(db, CONTENT_SESSION_ID, 'test-project', 'Test prompt');
  });

  afterEach(() => {
    db.close();
  });

  /**
   * Enqueues an observation message for the default test session.
   * Fields supplied in `overrides` win over the defaults.
   * Returns the id handed back by store.enqueue.
   */
  function enqueueMessage(overrides: Partial<PendingMessage> = {}): number {
    const defaults: PendingMessage = {
      type: 'observation',
      tool_name: 'TestTool',
      tool_input: { test: 'input' },
      tool_response: { test: 'response' },
      prompt_number: 1,
    };
    return store.enqueue(sessionDbId, CONTENT_SESSION_ID, { ...defaults, ...overrides });
  }

  /**
   * Simulates a stuck message: writes status 'processing' straight into the DB
   * with started_processing_at_epoch two minutes in the past, which is well
   * beyond the 60s staleness threshold.
   */
  function makeMessageStaleProcessing(messageId: number): void {
    const twoMinutesAgo = Date.now() - 120_000;
    db.run(
      `UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
      [twoMinutesAgo, messageId]
    );
  }

  /** Reads the current status column of a pending_messages row. */
  function readStatus(messageId: number): string {
    const row = db.query('SELECT status FROM pending_messages WHERE id = ?').get(messageId) as { status: string };
    return row.status;
  }

  test('stuck processing messages are recovered on next claim', () => {
    // Arrange: one message, forced into a stale 'processing' state.
    const stuckId = enqueueMessage();
    makeMessageStaleProcessing(stuckId);
    expect(readStatus(stuckId)).toBe('processing');

    // Act: the claim should reset the stale row and then hand it out.
    const claimedMessage = store.claimNextMessage(sessionDbId);

    // Assert: same row is returned and it is marked 'processing' again.
    expect(claimedMessage).not.toBeNull();
    expect(claimedMessage?.id).toBe(stuckId);
    expect(readStatus(stuckId)).toBe('processing');
  });

  test('actively processing messages are NOT recovered', () => {
    const inFlightId = enqueueMessage();
    const waitingId = enqueueMessage();

    // Mark the first message as processing only 5 seconds ago (inside the 60s window).
    const fiveSecondsAgo = Date.now() - 5_000;
    db.run(
      `UPDATE pending_messages SET status = 'processing', started_processing_at_epoch = ? WHERE id = ?`,
      [fiveSecondsAgo, inFlightId]
    );

    // The in-flight row must be left alone; the waiting row is the one claimed.
    const claimedMessage = store.claimNextMessage(sessionDbId);
    expect(claimedMessage).not.toBeNull();
    expect(claimedMessage?.id).toBe(waitingId);

    expect(readStatus(inFlightId)).toBe('processing');
  });

  test('recovery and claim is atomic within single call', () => {
    const stuckId = enqueueMessage();
    const untouchedIds = [enqueueMessage(), enqueueMessage()];
    makeMessageStaleProcessing(stuckId);

    // One call both resets the stale row and claims the oldest pending row,
    // which is that same freshly reset row.
    const claimedMessage = store.claimNextMessage(sessionDbId);
    expect(claimedMessage).not.toBeNull();
    expect(claimedMessage?.id).toBe(stuckId);

    // The two later messages were never claimed.
    for (const id of untouchedIds) {
      expect(readStatus(id)).toBe('pending');
    }
  });

  test('no messages returns null without error', () => {
    expect(store.claimNextMessage(sessionDbId)).toBeNull();
  });

  test('self-healing only affects the specified session', () => {
    const otherSessionDbId = createSDKSession(db, 'other-session', 'test-project', 'Test');

    // A stale message in the default session...
    const staleInDefaultSession = enqueueMessage();
    makeMessageStaleProcessing(staleInDefaultSession);

    // ...and another stale message in the second session.
    const otherSessionPayload: PendingMessage = {
      type: 'observation',
      tool_name: 'TestTool',
      tool_input: { test: 'input' },
      tool_response: { test: 'response' },
      prompt_number: 1,
    };
    const staleInOtherSession = store.enqueue(otherSessionDbId, 'other-session', otherSessionPayload);
    makeMessageStaleProcessing(staleInOtherSession);

    // Claiming for the second session recovers only that session's message.
    const claimedMessage = store.claimNextMessage(otherSessionDbId);
    expect(claimedMessage).not.toBeNull();
    expect(claimedMessage?.id).toBe(staleInOtherSession);

    // The default session's message stays stuck; it was not healed by the other session's claim.
    expect(readStatus(staleInDefaultSession)).toBe('processing');
  });
});
+34 -4
View File
@@ -192,8 +192,8 @@ describe('Zombie Agent Prevention', () => {
// hasAnyPendingWork should return true
expect(pendingStore.hasAnyPendingWork()).toBe(true);
// CLAIM-CONFIRM pattern: claimAndDelete marks as 'processing' (not deleted)
const claimed = pendingStore.claimAndDelete(sessionId);
// CLAIM-CONFIRM pattern: claimNextMessage marks as 'processing' (not deleted)
const claimed = pendingStore.claimNextMessage(sessionId);
expect(claimed).not.toBeNull();
expect(claimed?.id).toBe(msgId1);
@@ -206,11 +206,11 @@ describe('Zombie Agent Prevention', () => {
expect(pendingStore.getPendingCount(sessionId)).toBe(2);
// Claim and confirm remaining messages
const msg2 = pendingStore.claimAndDelete(sessionId);
const msg2 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg2!.id);
expect(pendingStore.getPendingCount(sessionId)).toBe(1);
const msg3 = pendingStore.claimAndDelete(sessionId);
const msg3 = pendingStore.claimNextMessage(sessionId);
pendingStore.confirmProcessed(msg3!.id);
// Should be empty now
@@ -266,6 +266,36 @@ describe('Zombie Agent Prevention', () => {
expect(session.abortController.signal.aborted).toBe(false);
});
// Test: a message left in 'processing' with a stale timestamp is picked up
// again by the next claimNextMessage call (self-healing).
test('should recover stuck processing messages via claimNextMessage self-healing', async () => {
  const contentSessionId = 'content-stuck-recovery';
  const sessionId = createDbSession(contentSessionId);

  // First claim moves the freshly enqueued message into 'processing'.
  const msgId = enqueueTestMessage(sessionId, contentSessionId);
  const firstClaim = pendingStore.claimNextMessage(sessionId);
  expect(firstClaim).not.toBeNull();
  expect(firstClaim!.id).toBe(msgId);

  // Simulate a crash by back-dating the processing start by two minutes.
  const twoMinutesAgo = Date.now() - 120_000;
  db.run(
    `UPDATE pending_messages SET started_processing_at_epoch = ? WHERE id = ?`,
    [twoMinutesAgo, msgId]
  );

  // The stuck message is still reported as pending work (processing counts toward the total).
  expect(pendingStore.getPendingCount(sessionId)).toBe(1);

  // Second claim should reset the stale row and hand the same message out again.
  const secondClaim = pendingStore.claimNextMessage(sessionId);
  expect(secondClaim).not.toBeNull();
  expect(secondClaim!.id).toBe(msgId);

  // Confirming the recovered message leaves no pending work behind.
  pendingStore.confirmProcessed(msgId);
  expect(pendingStore.getPendingCount(sessionId)).toBe(0);
});
// Test: Generator cleanup on session delete
test('should properly cleanup generator promise on session delete', async () => {
const session = createMockSession(1);