fix: drain invalid observer responses

This commit is contained in:
Alex Newman
2026-05-05 13:00:42 -07:00
committed by GitHub
parent 9a2818fc2e
commit 92f800d49c
2 changed files with 31 additions and 25 deletions
@@ -35,14 +35,14 @@ export async function processAgentResponse(
const parsed = parseAgentXml(text, session.contentSessionId); const parsed = parseAgentXml(text, session.contentSessionId);
if (!parsed.valid) { if (!parsed.valid) {
logger.warn('PARSER', `${agentName} returned unparseable response — leaving queue intact`, { logger.warn('PARSER', `${agentName} returned non-XML/empty response — ignoring queued batch`, {
sessionId: session.sessionDbId, sessionId: session.sessionDbId,
}); });
// Reset claimed messages back to pending so they're re-claimed on the // Plain-text skip responses are intentionally ignored. Re-queueing them
// next pass instead of leaving them in `processing` (which counts toward // creates an observer loop where the same low-signal batch is retried
// pendingCount, which triggers a respawn loop, which trips the restart // until the restart guard fires or the provider quota is exhausted.
// guard, which deletes the message — silent data loss). sessionManager.clearPendingForSession(session.sessionDbId);
sessionManager.getPendingMessageStore().resetProcessingToPending(session.sessionDbId); session.earliestPendingTimestamp = null;
return; return;
} }
+25 -19
View File
@@ -84,6 +84,7 @@ describe('ResponseProcessor', () => {
cleanupProcessed: mock(() => 0), cleanupProcessed: mock(() => 0),
resetStuckMessages: mock(() => 0), resetStuckMessages: mock(() => 0),
}), }),
clearPendingForSession: mock(() => {}),
} as unknown as SessionManager; } as unknown as SessionManager;
mockBroadcast = mock(() => {}); mockBroadcast = mock(() => {});
@@ -204,15 +205,16 @@ describe('ResponseProcessor', () => {
}); });
}); });
describe('non-XML observer responses (fail-fast — plan 03 phase 2)', () => { describe('non-XML observer responses', () => {
it('warns and marks messages failed when the observer returns non-XML prose', async () => { it('warns and clears pending work when the observer returns non-XML prose', async () => {
const markFailed = mock(() => {}); const clearPendingForSession = mock(() => {});
mockSessionManager = { mockSessionManager = {
getMessageIterator: async function* () { yield* []; }, getMessageIterator: async function* () { yield* []; },
getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }),
clearPendingForSession,
} as unknown as SessionManager; } as unknown as SessionManager;
const session = createMockSession({ processingMessageIds: [7] }); const session = createMockSession();
const responseText = 'Skipping — repeated log scan with no new findings.'; const responseText = 'Skipping — repeated log scan with no new findings.';
await processAgentResponse( await processAgentResponse(
@@ -228,10 +230,11 @@ describe('ResponseProcessor', () => {
expect(logger.warn).toHaveBeenCalledWith( expect(logger.warn).toHaveBeenCalledWith(
'PARSER', 'PARSER',
expect.stringMatching(/^TestAgent returned unparseable response:/), expect.stringMatching(/^TestAgent returned non-XML\/empty response/),
expect.objectContaining({ sessionId: 1 }) expect.objectContaining({ sessionId: 1 })
); );
expect(markFailed).toHaveBeenCalledWith(7); expect(clearPendingForSession).toHaveBeenCalledWith(1);
expect(session.earliestPendingTimestamp).toBeNull();
expect(mockStoreObservations).not.toHaveBeenCalled(); expect(mockStoreObservations).not.toHaveBeenCalled();
}); });
}); });
@@ -453,15 +456,16 @@ describe('ResponseProcessor', () => {
}); });
}); });
describe('handling empty / non-XML response (fail-fast — plan 03 phase 2)', () => { describe('handling empty / non-XML response', () => {
it('marks in-flight messages failed and does NOT call storeObservations on empty response', async () => { it('clears pending work and does NOT call storeObservations on empty response', async () => {
const markFailed = mock(() => {}); const clearPendingForSession = mock(() => {});
mockSessionManager = { mockSessionManager = {
getMessageIterator: async function* () { yield* []; }, getMessageIterator: async function* () { yield* []; },
getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }),
clearPendingForSession,
} as unknown as SessionManager; } as unknown as SessionManager;
const session = createMockSession({ processingMessageIds: [1, 2, 3] }); const session = createMockSession();
const responseText = ''; const responseText = '';
await processAgentResponse( await processAgentResponse(
@@ -470,18 +474,19 @@ describe('ResponseProcessor', () => {
); );
expect(mockStoreObservations).not.toHaveBeenCalled(); expect(mockStoreObservations).not.toHaveBeenCalled();
expect(markFailed).toHaveBeenCalledTimes(3); expect(clearPendingForSession).toHaveBeenCalledWith(1);
expect(session.processingMessageIds).toEqual([]); expect(session.earliestPendingTimestamp).toBeNull();
}); });
it('marks in-flight messages failed and does NOT call storeObservations on plain-text response', async () => { it('clears pending work and does NOT call storeObservations on plain-text response', async () => {
const markFailed = mock(() => {}); const clearPendingForSession = mock(() => {});
mockSessionManager = { mockSessionManager = {
getMessageIterator: async function* () { yield* []; }, getMessageIterator: async function* () { yield* []; },
getPendingMessageStore: () => ({ markFailed, confirmProcessed: mock(() => {}) }), getPendingMessageStore: () => ({ confirmProcessed: mock(() => {}) }),
clearPendingForSession,
} as unknown as SessionManager; } as unknown as SessionManager;
const session = createMockSession({ processingMessageIds: [42] }); const session = createMockSession();
const responseText = 'This is just plain text without any XML tags.'; const responseText = 'This is just plain text without any XML tags.';
await processAgentResponse( await processAgentResponse(
@@ -490,7 +495,8 @@ describe('ResponseProcessor', () => {
); );
expect(mockStoreObservations).not.toHaveBeenCalled(); expect(mockStoreObservations).not.toHaveBeenCalled();
expect(markFailed).toHaveBeenCalledTimes(1); expect(clearPendingForSession).toHaveBeenCalledWith(1);
expect(session.earliestPendingTimestamp).toBeNull();
}); });
}); });