claude-mem/src/services/worker/agents/ResponseProcessor.ts
Alex Newman ba1ef6c42c fix: Issue Blowout 2026 — 25 bugs across worker, hooks, security, and search (#2080)
* fix: resolve search, database, and docker bugs (#1913, #1916, #1956, #1957, #2048)

- Fix concept/concepts param mismatch in SearchManager.normalizeParams (#1916)
- Add FTS5 keyword fallback when ChromaDB is unavailable (#1913, #2048)
- Add periodic WAL checkpoint and journal_size_limit to prevent unbounded WAL growth (#1956)
- Add periodic clearFailed() to purge stale pending_messages (#1957)
- Fix nounset-safe TTY_ARGS expansion in docker/claude-mem/run.sh
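  As a rough sketch of the fallback path (illustrative names — `vectorSearch` and `keywordSearch` stand in for the real ChromaDB and SQLite FTS5 query helpers, which have different signatures):

  ```typescript
  // Hypothetical shape of the Chroma → FTS5 keyword fallback (#1913, #2048).
  // The search functions are injected here; the real code talks to ChromaDB
  // and a SQLite FTS5 table directly.
  type SearchFn = (query: string) => Promise<string[]>;

  async function searchWithFallback(
    vectorSearch: SearchFn,
    keywordSearch: SearchFn,
    query: string
  ): Promise<{ results: string[]; source: 'chroma' | 'fts5' }> {
    try {
      // Prefer semantic search when the vector store is reachable
      return { results: await vectorSearch(query), source: 'chroma' };
    } catch {
      // ChromaDB unreachable — degrade to FTS5 keyword matching
      return { results: await keywordSearch(query), source: 'fts5' };
    }
  }
  ```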

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: prevent silent data loss on non-XML responses, add queue info to /health (#1867, #1874)

- ResponseProcessor: mark messages as failed (with retry) instead of confirming
  when the LLM returns non-XML garbage (auth errors, rate limits) (#1874)
- Health endpoint: include activeSessions count for queue liveness monitoring (#1867)

* fix: cache isFts5Available() at construction time

Addresses Greptile review: avoid DDL probe (CREATE + DROP) on every text
query. Result is now cached in _fts5Available at construction.
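  A minimal sketch of the caching pattern (the `probeFts5` callback stands in for the real CREATE/DROP virtual-table probe; the actual class probes SQLite directly):

  ```typescript
  // Run the expensive capability probe once, at construction, and serve the
  // cached flag on every subsequent text query.
  class TextSearch {
    private readonly _fts5Available: boolean;

    constructor(probeFts5: () => boolean) {
      // DDL probe executes exactly once per instance
      this._fts5Available = probeFts5();
    }

    isFts5Available(): boolean {
      // Query paths read the cached result instead of re-probing
      return this._fts5Available;
    }
  }
  ```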

* fix: resolve worker stability bugs — pool deadlock, MCP loopback, restart guard (#1868, #1876, #2053)

- Replace flat consecutiveRestarts counter with time-windowed RestartGuard:
  only counts restarts within 60s window (cap=10), decays after 5min of
  success. Prevents stranding pending messages on long-running sessions. (#2053)
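  The guard can be sketched like this (thresholds taken from the description above; the real RestartGuard class may be structured differently, and the clock is injected here only for testability):

  ```typescript
  // Time-windowed restart guard: count restarts inside a 60s window (cap 10),
  // and clear history after 5 minutes of sustained success (#2053).
  class RestartGuard {
    private restarts: number[] = [];           // timestamps of recent restarts
    private lastSuccess: number | null = null; // anchored to real successes

    constructor(
      private readonly now: () => number = Date.now,
      private readonly windowMs = 60_000,
      private readonly cap = 10,
      private readonly decayMs = 5 * 60_000
    ) {}

    recordSuccess(): void {
      this.lastSuccess = this.now();
    }

    // Returns true if another restart is allowed.
    recordRestart(): boolean {
      const t = this.now();
      // Decay: a long stretch of success wipes the restart history
      if (this.lastSuccess !== null && t - this.lastSuccess >= this.decayMs) {
        this.restarts = [];
      }
      // Only restarts inside the sliding window count toward the cap
      this.restarts = this.restarts.filter(ts => t - ts < this.windowMs);
      this.restarts.push(t);
      return this.restarts.length <= this.cap;
    }
  }
  ```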

- Add idle session eviction to pool slot allocation: when all slots are full,
  evict the idlest session (no pending work, oldest activity) to free a slot
  for new requests, preventing 60s timeout deadlock. (#1868)
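  A simplified sketch of the eviction policy (the `Slot` shape is illustrative, not the worker's actual session type):

  ```typescript
  // Pick the idlest evictable slot: no pending work, oldest activity (#1868).
  interface Slot {
    id: string;
    pendingCount: number; // queued messages still owed to this session
    lastActivity: number; // epoch ms of most recent work
  }

  // Returns null when every slot still has pending work — nothing is safe
  // to evict, so the caller must wait instead.
  function pickEvictable(slots: Slot[]): Slot | null {
    const idle = slots.filter(s => s.pendingCount === 0);
    if (idle.length === 0) return null;
    return idle.reduce((oldest, s) =>
      s.lastActivity < oldest.lastActivity ? s : oldest
    );
  }
  ```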

- Fix MCP loopback self-check: use process.execPath instead of bare 'node'
  which fails on non-interactive PATH. Fix crash misclassification by removing
  false "Generator exited unexpectedly" error log on normal completion. (#1876)

* fix: resolve hooks reliability bugs — summarize exit code, session-init health wait (#1896, #1901, #1903, #1907)

- Wrap summarize hook's workerHttpRequest in try/catch to prevent exit
  code 2 (blocking error) on network failures or malformed responses.
  Session exit no longer blocks on worker errors. (#1901)

- Add health-check wait loop to UserPromptSubmit session-init command in
  hooks.json. On Linux/WSL where hook ordering fires UserPromptSubmit
  before SessionStart, session-init now waits up to 10s for worker health
  before proceeding. Also wrap session-init HTTP call in try/catch. (#1907)
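  A hedged sketch of the bounded wait loop — the real session-init command probes the worker's /health endpoint over HTTP; the probe is injected here (`check`) so only the loop's shape is shown:

  ```typescript
  // Poll until healthy or until the deadline (~10s) passes (#1907).
  async function waitForHealthy(
    check: () => Promise<boolean>,
    timeoutMs = 10_000,
    intervalMs = 250
  ): Promise<boolean> {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      try {
        if (await check()) return true; // worker is up — proceed
      } catch {
        // Network errors count as "not healthy yet", never as fatal
      }
      await new Promise(resolve => setTimeout(resolve, intervalMs));
    }
    return false; // timed out — caller proceeds without blocking the session
  }
  ```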

- Close #1896 as already-fixed: the mtime comparison at file-context.ts:255-267
  bypasses truncation when the file is newer than the latest observation.

- Close #1903 as no-repro: hooks.json correctly declares all hook events.
  Issue was Claude Code 12.0.1/macOS platform event-dispatch bug.

* fix: security hardening — bearer auth, path validation, rate limits, per-user port (#1932, #1933, #1934, #1935, #1936)

- Add bearer token auth to all API endpoints: auto-generated 32-byte
  token stored at ~/.claude-mem/worker-auth-token (mode 0600). All hook,
  MCP, viewer, and OpenCode requests include Authorization header.
  Health/readiness endpoints exempt for polling. (#1932, #1933)
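  The token scheme sketched in Node's standard crypto API (storage of the token file and the exact header plumbing are omitted; function names are illustrative):

  ```typescript
  import { randomBytes, timingSafeEqual } from 'node:crypto';

  // 32 random bytes, hex-encoded — the shape described for #1932/#1933.
  function generateToken(): string {
    return randomBytes(32).toString('hex');
  }

  // Constant-time bearer check; timingSafeEqual throws on length mismatch,
  // so lengths are compared first.
  function isAuthorized(header: string | undefined, token: string): boolean {
    if (!header?.startsWith('Bearer ')) return false;
    const presented = Buffer.from(header.slice('Bearer '.length));
    const expected = Buffer.from(token);
    return presented.length === expected.length && timingSafeEqual(presented, expected);
  }
  ```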

- Add path traversal protection: watch.context.path validated against
  project root and ~/.claude-mem/ before write. Rejects ../../../etc
  style attacks. (#1934)
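  The validation can be sketched with `node:path` alone (allowed-root names are illustrative; the real check validates against the project root and ~/.claude-mem/):

  ```typescript
  import path from 'node:path';

  // A candidate path is allowed iff, after resolution, it stays inside one of
  // the permitted roots — i.e. the relative path neither escapes via '..'
  // nor jumps to an absolute location (#1934).
  function isPathAllowed(candidate: string, allowedRoots: string[]): boolean {
    const resolved = path.resolve(candidate);
    return allowedRoots.some(root => {
      const rel = path.relative(path.resolve(root), resolved);
      return rel === '' || (!rel.startsWith('..') && !path.isAbsolute(rel));
    });
  }
  ```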

- Reduce JSON body limit from 50MB to 5MB. Add in-memory rate limiter
  (300 req/min/IP) to prevent abuse. (#1935)
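  A sketch of a fixed-window in-memory limiter with lazy cleanup (parameter names are illustrative; the real limiter may amortize the cleanup pass rather than run it on every request):

  ```typescript
  // 300 requests per 60s window per IP; expired windows are swept lazily on
  // each call so the map cannot grow without bound (#1935).
  class RateLimiter {
    private windows = new Map<string, { start: number; count: number }>();

    constructor(
      private readonly limit = 300,
      private readonly windowMs = 60_000,
      private readonly now: () => number = Date.now
    ) {}

    allow(ip: string): boolean {
      const t = this.now();
      // Lazy cleanup: drop windows that have expired
      for (const [key, w] of this.windows) {
        if (t - w.start >= this.windowMs) this.windows.delete(key);
      }
      const w = this.windows.get(ip);
      if (!w) {
        this.windows.set(ip, { start: t, count: 1 });
        return true;
      }
      w.count += 1;
      return w.count <= this.limit;
    }
  }
  ```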

- Derive default worker port from UID (37700 + uid%100) to prevent
  cross-user data leakage on multi-user macOS. Windows falls back to
  37777. Shell hooks use same formula via id -u. (#1936)
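  The port formula, as a small pure function (`process.getuid()` exists on POSIX and is `undefined` on Windows, hence the fallback; the uid is passed in here for testability):

  ```typescript
  // Derive the default worker port from the UID: 37700 + uid % 100 (#1936).
  // Windows has no UID, so it falls back to the fixed port 37777.
  function defaultWorkerPort(uid: number | undefined): number {
    return uid === undefined ? 37777 : 37700 + (uid % 100);
  }
  ```

  Shell hooks compute the same value via `id -u`, e.g. `$((37700 + $(id -u) % 100))`.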

* fix: resolve search project filtering and import Chroma sync (#1911, #1912, #1914, #1918)

- Fix per-type search endpoints to pass project filter to Chroma queries
  and SQLite hydration. searchObservations/Sessions/UserPrompts now use
  $or clause matching project + merged_into_project. (#1912)
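  The filter shape, sketched as a builder (this mirrors Chroma's `$or`/`$eq` where-clause operators; field names follow the commit message, not a verified schema):

  ```typescript
  // Match rows belonging to the project directly, or merged into it (#1912).
  function projectWhere(project: string): Record<string, unknown> {
    return {
      $or: [
        { project: { $eq: project } },
        { merged_into_project: { $eq: project } }
      ]
    };
  }
  ```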

- Fix timeline/search methods to pass project to Chroma anchor queries.
  Prevents cross-project result leakage when project param omitted. (#1911)

- Sync imported observations to ChromaDB after FTS rebuild. Import
  endpoint now calls chromaSync.syncObservation() for each imported
  row, making them visible to MCP search(). (#1914)

- Fix session-init cwd fallback to match context.ts (process.cwd()).
  Prevents project key mismatch that caused "no previous sessions"
  on fresh sessions. (#1918)

- Fix sync-marketplace restart to include auth token and per-user port.

* fix: resolve all CodeRabbit and Greptile review comments on PR #2080

- Fix run.sh comment mismatch (no-op flag vs empty array)
- Gate session-init on health check success (prevent running when worker unreachable)
- Fix date_desc ordering ignored in FTS session search
- Age-scope failed message purge (1h retention) instead of clearing all
- Anchor RestartGuard decay to real successes (null init, not Date.now())
- Add recordSuccess() calls in ResponseProcessor and completion path
- Prevent caller headers from overriding bearer auth token
- Add lazy cleanup for rate limiter map to prevent unbounded growth
- Bound post-import Chroma sync with concurrency limit of 8
- Add doc_type:'observation' filter to Chroma queries feeding observation hydration
- Add FTS fallback to all specialized search handlers (observations, sessions, prompts, timeline)
- Add response.ok check and error handling in viewer saveSettings
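  The header-override fix above comes down to spread order — a sketch (names illustrative):

  ```typescript
  // Spread caller-supplied headers first so the bearer token always wins and
  // cannot be silently clobbered by a caller-provided Authorization header.
  function buildHeaders(
    callerHeaders: Record<string, string>,
    token: string
  ): Record<string, string> {
    return {
      ...callerHeaders,
      Authorization: `Bearer ${token}`
    };
  }
  ```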

* fix: resolve CodeRabbit round-2 review comments

- Use failure timestamp (COALESCE) instead of created_at_epoch for stale purge
- Downgrade _fts5Available flag when FTS table creation fails
- Escape FTS5 MATCH input by quoting user queries as literal phrases
- Escape LIKE metacharacters (%, _, \) in prompt text search
- Add response.ok check in initial settings load (matches save flow)
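  The two escaping fixes, as sketches: quote FTS5 input as a literal phrase (doubling embedded quotes) so MATCH syntax like AND/OR/NEAR cannot be injected, and backslash-escape LIKE metacharacters for use with `... LIKE ? ESCAPE '\'`:

  ```typescript
  // FTS5: wrap the whole query in double quotes; '"' inside a quoted phrase
  // is escaped by doubling it.
  function escapeFts5Query(input: string): string {
    return `"${input.replace(/"/g, '""')}"`;
  }

  // LIKE: escape backslash, percent, and underscore so user text matches
  // literally under an ESCAPE '\' clause.
  function escapeLikePattern(input: string): string {
    return input.replace(/[\\%_]/g, ch => `\\${ch}`);
  }
  ```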

* fix: resolve CodeRabbit round-3 review comments

- Include failed_at_epoch in COALESCE for age-scoped purge
- Re-throw FTS5 errors so callers can distinguish failure from no-results
- Wrap all FTS fallback calls in SearchManager with try/catch

2026-04-20 11:42:09 -07:00

/**
* ResponseProcessor: Shared response processing for all agent implementations
*
* Responsibility:
* - Parse observations and summaries from agent responses
* - Execute atomic database transactions
* - Orchestrate Chroma sync (fire-and-forget)
* - Broadcast to SSE clients
* - Clean up processed messages
*
* This module extracts 150+ lines of duplicate code from SDKAgent, GeminiAgent, and OpenRouterAgent.
*/
import { logger } from '../../../utils/logger.js';
import { parseObservations, parseSummary, type ParsedObservation, type ParsedSummary } from '../../../sdk/parser.js';
import { SUMMARY_MODE_MARKER, MAX_CONSECUTIVE_SUMMARY_FAILURES } from '../../../sdk/prompts.js';
import { updateCursorContextForProject } from '../../integrations/CursorHooksInstaller.js';
import { updateFolderClaudeMdFiles } from '../../../utils/claude-md-utils.js';
import { getWorkerPort } from '../../../shared/worker-utils.js';
import { SettingsDefaultsManager } from '../../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../../shared/paths.js';
import type { ActiveSession } from '../../worker-types.js';
import type { DatabaseManager } from '../DatabaseManager.js';
import type { SessionManager } from '../SessionManager.js';
import type { WorkerRef, StorageResult } from './types.js';
import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js';
import { cleanupProcessedMessages } from './SessionCleanupHelper.js';
/**
* Process agent response text (parse XML, save to database, sync to Chroma, broadcast SSE)
*
* This is the unified response processor that handles:
* 1. Adding response to conversation history (for provider interop)
* 2. Parsing observations and summaries from XML
* 3. Atomic database transaction to store observations + summary
* 4. Async Chroma sync (fire-and-forget, failures are non-critical)
* 5. SSE broadcast to web UI clients
* 6. Session cleanup
*
* @param text - Response text from the agent
* @param session - Active session being processed
* @param dbManager - Database manager for storage operations
* @param sessionManager - Session manager for message tracking
* @param worker - Worker reference for SSE broadcasting (optional)
* @param discoveryTokens - Token cost delta for this response
* @param originalTimestamp - Original epoch when message was queued (for accurate timestamps)
* @param agentName - Name of the agent for logging (e.g., 'SDK', 'Gemini', 'OpenRouter')
*/
export async function processAgentResponse(
text: string,
session: ActiveSession,
dbManager: DatabaseManager,
sessionManager: SessionManager,
worker: WorkerRef | undefined,
discoveryTokens: number,
originalTimestamp: number | null,
agentName: string,
projectRoot?: string,
modelId?: string
): Promise<void> {
// Track generator activity for stale detection (Issue #1099)
session.lastGeneratorActivity = Date.now();
// Add assistant response to shared conversation history for provider interop
if (text) {
session.conversationHistory.push({ role: 'assistant', content: text });
}
// Parse observations and summary
const observations = parseObservations(text, session.contentSessionId);
// Detect whether the most recent prompt was a summary request.
// If so, enable observation-to-summary coercion to prevent the infinite
// retry loop described in #1633.
const lastMessage = session.conversationHistory.at(-1);
const lastUserMessage = lastMessage?.role === 'user'
? lastMessage
: session.conversationHistory.findLast(m => m.role === 'user') ?? null;
const summaryExpected = lastUserMessage?.content?.includes(SUMMARY_MODE_MARKER) ?? false;
const summary = parseSummary(text, session.sessionDbId, summaryExpected);
// Detect non-XML responses (auth errors, rate limits, garbled output).
// When the response contains no parseable XML and produced no observations,
// mark the pending messages as failed instead of confirming them — this prevents
// silent data loss when the LLM returns garbage (#1874).
const isNonXmlResponse = (
text.trim().length > 0 &&
observations.length === 0 &&
!summary &&
!/<observation>|<summary>|<skip_summary\b/.test(text)
);
if (isNonXmlResponse) {
const preview = text.length > 200 ? `${text.slice(0, 200)}...` : text;
logger.warn('PARSER', `${agentName} returned non-XML response; marking messages as failed for retry (#1874)`, {
sessionId: session.sessionDbId,
preview
});
// Mark messages as failed (retry logic in PendingMessageStore handles retries)
const pendingStore = sessionManager.getPendingMessageStore();
for (const messageId of session.processingMessageIds) {
pendingStore.markFailed(messageId);
}
session.processingMessageIds = [];
return;
}
// Convert nullable fields to empty strings for storeSummary (if summary exists)
const summaryForStore = normalizeSummaryForStorage(summary);
// Get session store for atomic transaction
const sessionStore = dbManager.getSessionStore();
// CRITICAL: Must use memorySessionId (not contentSessionId) for FK constraint
if (!session.memorySessionId) {
throw new Error('Cannot store observations: memorySessionId not yet captured');
}
// SAFETY NET (Issue #846 / Multi-terminal FK fix):
// The PRIMARY fix is in SDKAgent.ts where ensureMemorySessionIdRegistered() is called
// immediately when the SDK returns a memory_session_id. This call is a defensive safety net
// in case the DB was somehow not updated (race condition, crash, etc.).
// In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL
// for each new generator, ensuring clean isolation.
sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId);
// Log pre-storage with session ID chain for verification
logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!summaryForStore}`, {
sessionId: session.sessionDbId,
memorySessionId: session.memorySessionId
});
// Label observations with the subagent identity captured from the claimed messages.
// Main-session messages leave these null, so main-session rows stay NULL in the DB.
const labeledObservations = observations.map(obs => ({
...obs,
agent_type: session.pendingAgentType ?? null,
agent_id: session.pendingAgentId ?? null
}));
// ATOMIC TRANSACTION: Store observations + summary ONCE
// Messages are already deleted from queue on claim, so no completion tracking needed.
// Wrap in try/finally so the subagent tracker clears even if storage throws —
// otherwise stale identity could leak into the next batch and mislabel rows.
// Expected invariant: all observations in a batch share the same agent context,
// because ResponseProcessor runs after a single agent-response cycle.
let result: ReturnType<typeof sessionStore.storeObservations>;
try {
result = sessionStore.storeObservations(
session.memorySessionId,
session.project,
labeledObservations,
summaryForStore,
session.lastPromptNumber,
discoveryTokens,
originalTimestamp ?? undefined,
modelId
);
} finally {
session.pendingAgentId = null;
session.pendingAgentType = null;
}
// Log storage result with IDs for end-to-end traceability
logger.info('DB', `STORED | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${result.observationIds.length} | obsIds=[${result.observationIds.join(',')}] | summaryId=${result.summaryId || 'none'}`, {
sessionId: session.sessionDbId,
memorySessionId: session.memorySessionId
});
// Track whether a summary record was stored so the status endpoint can expose this
// to the Stop hook for silent-summary-loss detection (#1633)
session.lastSummaryStored = result.summaryId !== null;
// Circuit breaker: track consecutive summary failures (#1633).
// Only evaluate when a summary was actually expected (summarize message was sent).
// Without this guard, the counter would increment on every normal observation
// response, tripping the breaker after 3 observations and permanently blocking
// summarization — reproducing the data-loss scenario this fix is meant to prevent.
if (summaryExpected) {
const skippedIntentionally = /<skip_summary\b/.test(text);
if (summaryForStore !== null) {
// Summary was present in the response — reset the failure counter
session.consecutiveSummaryFailures = 0;
} else if (skippedIntentionally) {
// Explicit <skip_summary/> is a valid protocol response — neither success
// nor failure. Leave the counter unchanged so we don't mask a bad run that
// happens to end on a skip, but also don't punish intentional skips.
} else {
// Summary was expected but none was stored — count as failure
session.consecutiveSummaryFailures += 1;
if (session.consecutiveSummaryFailures >= MAX_CONSECUTIVE_SUMMARY_FAILURES) {
logger.error('SESSION', `Circuit breaker: ${session.consecutiveSummaryFailures} consecutive summary failures — further summarize requests will be skipped (#1633)`, {
sessionId: session.sessionDbId,
contentSessionId: session.contentSessionId
});
}
}
}
// CLAIM-CONFIRM: Now that storage succeeded, confirm all processing messages (delete from queue)
// This is the critical step that prevents message loss on generator crash
const pendingStore = sessionManager.getPendingMessageStore();
for (const messageId of session.processingMessageIds) {
pendingStore.confirmProcessed(messageId);
}
if (session.processingMessageIds.length > 0) {
logger.debug('QUEUE', `CONFIRMED_BATCH | sessionDbId=${session.sessionDbId} | count=${session.processingMessageIds.length} | ids=[${session.processingMessageIds.join(',')}]`);
// Record successful processing so restart guard decay is anchored to real successes
session.restartGuard?.recordSuccess();
}
// Clear the tracking array after confirmation
session.processingMessageIds = [];
// AFTER transaction commits - async operations (can fail safely without data loss)
await syncAndBroadcastObservations(
observations,
result,
session,
dbManager,
worker,
discoveryTokens,
agentName,
projectRoot
);
// Sync and broadcast summary if present
await syncAndBroadcastSummary(
summary,
summaryForStore,
result,
session,
dbManager,
worker,
discoveryTokens,
agentName
);
// Clean up session state
cleanupProcessedMessages(session, worker);
}
/**
* Normalize summary for storage (convert null fields to empty strings)
*/
function normalizeSummaryForStorage(summary: ParsedSummary | null): {
request: string;
investigated: string;
learned: string;
completed: string;
next_steps: string;
notes: string | null;
} | null {
if (!summary) return null;
return {
request: summary.request || '',
investigated: summary.investigated || '',
learned: summary.learned || '',
completed: summary.completed || '',
next_steps: summary.next_steps || '',
notes: summary.notes
};
}
/**
* Sync observations to Chroma and broadcast to SSE clients
*/
async function syncAndBroadcastObservations(
observations: ParsedObservation[],
result: StorageResult,
session: ActiveSession,
dbManager: DatabaseManager,
worker: WorkerRef | undefined,
discoveryTokens: number,
agentName: string,
projectRoot?: string
): Promise<void> {
for (let i = 0; i < observations.length; i++) {
const obsId = result.observationIds[i];
const obs = observations[i];
const chromaStart = Date.now();
// Sync to Chroma (fire-and-forget, skipped if Chroma is disabled)
dbManager.getChromaSync()?.syncObservation(
obsId,
session.contentSessionId,
session.project,
obs,
session.lastPromptNumber,
result.createdAtEpoch,
discoveryTokens
).then(() => {
const chromaDuration = Date.now() - chromaStart;
logger.debug('CHROMA', 'Observation synced', {
obsId,
duration: `${chromaDuration}ms`,
type: obs.type,
title: obs.title || '(untitled)'
});
}).catch((error) => {
logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
obsId,
type: obs.type,
title: obs.title || '(untitled)'
}, error);
});
// Broadcast to SSE clients (for web UI)
// BUGFIX: Use obs.files_read and obs.files_modified (not obs.files)
broadcastObservation(worker, {
id: obsId,
memory_session_id: session.memorySessionId,
session_id: session.contentSessionId,
platform_source: session.platformSource,
type: obs.type,
title: obs.title,
subtitle: obs.subtitle,
text: null, // text field is not in ParsedObservation
narrative: obs.narrative || null,
facts: JSON.stringify(obs.facts || []),
concepts: JSON.stringify(obs.concepts || []),
files_read: JSON.stringify(obs.files_read || []),
files_modified: JSON.stringify(obs.files_modified || []),
project: session.project,
prompt_number: session.lastPromptNumber,
created_at_epoch: result.createdAtEpoch
});
}
// Update folder CLAUDE.md files for touched folders (fire-and-forget)
// This runs per-observation batch to ensure folders are updated as work happens
// Only runs if CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED is true (default: false)
const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
// Handle both string 'true' and boolean true from JSON settings
const settingValue = settings.CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED;
const folderClaudeMdEnabled = settingValue === 'true' || settingValue === true;
if (folderClaudeMdEnabled) {
const allFilePaths: string[] = [];
for (const obs of observations) {
allFilePaths.push(...(obs.files_modified || []));
allFilePaths.push(...(obs.files_read || []));
}
if (allFilePaths.length > 0) {
updateFolderClaudeMdFiles(
allFilePaths,
session.project,
getWorkerPort(),
projectRoot
).catch(error => {
logger.warn('FOLDER_INDEX', 'CLAUDE.md update failed (non-critical)', { project: session.project }, error as Error);
});
}
}
}
/**
* Sync summary to Chroma and broadcast to SSE clients
*/
async function syncAndBroadcastSummary(
summary: ParsedSummary | null,
summaryForStore: { request: string; investigated: string; learned: string; completed: string; next_steps: string; notes: string | null } | null,
result: StorageResult,
session: ActiveSession,
dbManager: DatabaseManager,
worker: WorkerRef | undefined,
discoveryTokens: number,
agentName: string
): Promise<void> {
if (!summaryForStore || !result.summaryId) {
return;
}
const chromaStart = Date.now();
// Sync to Chroma (fire-and-forget, skipped if Chroma is disabled)
dbManager.getChromaSync()?.syncSummary(
result.summaryId,
session.contentSessionId,
session.project,
summaryForStore,
session.lastPromptNumber,
result.createdAtEpoch,
discoveryTokens
).then(() => {
const chromaDuration = Date.now() - chromaStart;
logger.debug('CHROMA', 'Summary synced', {
summaryId: result.summaryId,
duration: `${chromaDuration}ms`,
request: summaryForStore.request || '(no request)'
});
}).catch((error) => {
logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
summaryId: result.summaryId,
request: summaryForStore.request || '(no request)'
}, error);
});
// Broadcast to SSE clients (for web UI)
broadcastSummary(worker, {
id: result.summaryId,
session_id: session.contentSessionId,
platform_source: session.platformSource,
request: summaryForStore.request,
investigated: summaryForStore.investigated,
learned: summaryForStore.learned,
completed: summaryForStore.completed,
next_steps: summaryForStore.next_steps,
notes: summaryForStore.notes,
project: session.project,
prompt_number: session.lastPromptNumber,
created_at_epoch: result.createdAtEpoch
});
// Update Cursor context file for registered projects (fire-and-forget)
updateCursorContextForProject(session.project, getWorkerPort()).catch(error => {
logger.warn('CURSOR', 'Context update failed (non-critical)', { project: session.project }, error as Error);
});
}