diff --git a/src/services/worker/agents/ResponseProcessor.ts b/src/services/worker/agents/ResponseProcessor.ts index 1990351d..d725602f 100644 --- a/src/services/worker/agents/ResponseProcessor.ts +++ b/src/services/worker/agents/ResponseProcessor.ts @@ -35,14 +35,14 @@ export async function processAgentResponse( const parsed = parseAgentXml(text, session.contentSessionId); if (!parsed.valid) { - logger.warn('PARSER', `${agentName} returned unparseable response — leaving queue intact`, { + logger.warn('PARSER', `${agentName} returned non-XML/empty response — ignoring queued batch`, { sessionId: session.sessionDbId, }); - // Reset claimed messages back to pending so they're re-claimed on the - // next pass instead of leaving them in `processing` (which counts toward - // pendingCount, which triggers a respawn loop, which trips the restart - // guard, which deletes the message — silent data loss). - sessionManager.getPendingMessageStore().resetProcessingToPending(session.sessionDbId); + // Plain-text skip responses are intentionally ignored. Re-queueing them + // creates an observer loop where the same low-signal batch is retried + // until the restart guard fires or the provider quota is exhausted. + sessionManager.clearPendingForSession(session.sessionDbId); + session.earliestPendingTimestamp = null; return; } diff --git a/tests/worker/agents/response-processor.test.ts b/tests/worker/agents/response-processor.test.ts index 54596391..5e07b2bb 100644 --- a/tests/worker/agents/response-processor.test.ts +++ b/tests/worker/agents/response-processor.test.ts @@ -84,6 +84,7 @@ describe('ResponseProcessor', () => { cleanupProcessed: mock(() => 0), resetStuckMessages: mock(() => 0), }), + clearPendingForSession: mock(() => {}), } as unknown as SessionManager; mockBroadcast = mock(() => {}); @@ -204,15 +205,16 @@ describe('ResponseProcessor', () => { }); }); - describe('non-XML observer responses (fail-fast — plan 03 phase 2)', () => { - it('warns and marks messages failed when the observer returns non-XML prose', async () => { - const markFailed = mock(() => {}); + describe('non-XML observer responses', () => { + it('warns and clears pending work when the observer returns non-XML prose', async () => { + const clearPendingForSession = mock(() => {}); mockSessionManager = { getMessageIterator: async function* () { yield* []; }, - getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), + getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }), + clearPendingForSession, } as unknown as SessionManager; - const session = createMockSession({ processingMessageIds: [7] }); + const session = createMockSession(); const responseText = 'Skipping — repeated log scan with no new findings.'; await processAgentResponse( @@ -228,10 +230,11 @@ describe('ResponseProcessor', () => { expect(logger.warn).toHaveBeenCalledWith( 'PARSER', - expect.stringMatching(/^TestAgent returned unparseable response:/), + expect.stringMatching(/^TestAgent returned non-XML\/empty response/), expect.objectContaining({ sessionId: 1 }) ); - expect(markFailed).toHaveBeenCalledWith(7); + expect(clearPendingForSession).toHaveBeenCalledWith(1); + expect(session.earliestPendingTimestamp).toBeNull(); expect(mockStoreObservations).not.toHaveBeenCalled(); }); }); @@ -453,15 +456,16 @@ describe('ResponseProcessor', () => { }); }); - describe('handling empty / non-XML response (fail-fast — plan 03 phase 2)', () => { - it('marks in-flight messages failed and does NOT call storeObservations on empty response', async () => { - const markFailed = mock(() => {}); + describe('handling empty / non-XML response', () => { + it('clears pending work and does NOT call storeObservations on empty response', async () => { + const clearPendingForSession = mock(() => {}); mockSessionManager = { getMessageIterator: async function* () { yield* []; }, - getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), + getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }), + clearPendingForSession, } as unknown as SessionManager; - const session = createMockSession({ processingMessageIds: [1, 2, 3] }); + const session = createMockSession(); const responseText = ''; await processAgentResponse( @@ -470,18 +474,19 @@ describe('ResponseProcessor', () => { ); expect(mockStoreObservations).not.toHaveBeenCalled(); - expect(markFailed).toHaveBeenCalledTimes(3); - expect(session.processingMessageIds).toEqual([]); + expect(clearPendingForSession).toHaveBeenCalledWith(1); + expect(session.earliestPendingTimestamp).toBeNull(); }); - it('marks in-flight messages failed and does NOT call storeObservations on plain-text response', async () => { - const markFailed = mock(() => {}); + it('clears pending work and does NOT call storeObservations on plain-text response', async () => { + const clearPendingForSession = mock(() => {}); mockSessionManager = { getMessageIterator: async function* () { yield* []; }, - getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), + getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }), + clearPendingForSession, } as unknown as SessionManager; - const session = createMockSession({ processingMessageIds: [42] }); + const session = createMockSession(); const responseText = 'This is just plain text without any XML tags.'; await processAgentResponse( @@ -490,7 +495,8 @@ describe('ResponseProcessor', () => { ); expect(mockStoreObservations).not.toHaveBeenCalled(); - expect(markFailed).toHaveBeenCalledTimes(1); + expect(clearPendingForSession).toHaveBeenCalledWith(1); + expect(session.earliestPendingTimestamp).toBeNull(); }); });