/** * ResponseProcessor: Shared response processing for all agent implementations * * Responsibility: * - Parse observations and summaries from agent responses * - Execute atomic database transactions * - Orchestrate Chroma sync (fire-and-forget) * - Broadcast to SSE clients * - Clean up processed messages * * This module extracts 150+ lines of duplicate code from SDKAgent, GeminiAgent, and OpenRouterAgent. */ import { logger } from '../../../utils/logger.js'; import { parseObservations, parseSummary, type ParsedObservation, type ParsedSummary } from '../../../sdk/parser.js'; import { updateCursorContextForProject } from '../../integrations/CursorHooksInstaller.js'; import { updateFolderClaudeMdFiles } from '../../../utils/claude-md-utils.js'; import { getWorkerPort } from '../../../shared/worker-utils.js'; import { SettingsDefaultsManager } from '../../../shared/SettingsDefaultsManager.js'; import { USER_SETTINGS_PATH } from '../../../shared/paths.js'; import type { ActiveSession } from '../../worker-types.js'; import type { DatabaseManager } from '../DatabaseManager.js'; import type { SessionManager } from '../SessionManager.js'; import type { WorkerRef, StorageResult } from './types.js'; import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js'; import { cleanupProcessedMessages } from './SessionCleanupHelper.js'; /** * Process agent response text (parse XML, save to database, sync to Chroma, broadcast SSE) * * This is the unified response processor that handles: * 1. Adding response to conversation history (for provider interop) * 2. Parsing observations and summaries from XML * 3. Atomic database transaction to store observations + summary * 4. Async Chroma sync (fire-and-forget, failures are non-critical) * 5. SSE broadcast to web UI clients * 6. Session cleanup * * @param text - Response text from the agent * @param session - Active session being processed * @param dbManager - Database manager for storage operations * @param sessionManager - Session manager for message tracking * @param worker - Worker reference for SSE broadcasting (optional) * @param discoveryTokens - Token cost delta for this response * @param originalTimestamp - Original epoch when message was queued (for accurate timestamps) * @param agentName - Name of the agent for logging (e.g., 'SDK', 'Gemini', 'OpenRouter') */ export async function processAgentResponse( text: string, session: ActiveSession, dbManager: DatabaseManager, sessionManager: SessionManager, worker: WorkerRef | undefined, discoveryTokens: number, originalTimestamp: number | null, agentName: string, projectRoot?: string, modelId?: string ): Promise { // Track generator activity for stale detection (Issue #1099) session.lastGeneratorActivity = Date.now(); // Add assistant response to shared conversation history for provider interop if (text) { session.conversationHistory.push({ role: 'assistant', content: text }); } // Parse observations and summary const observations = parseObservations(text, session.contentSessionId); const summary = parseSummary(text, session.sessionDbId); if ( text.trim() && observations.length === 0 && !summary && !/|| 200 ? `${text.slice(0, 200)}...` : text; logger.warn('PARSER', `${agentName} returned non-XML response; observation content was discarded`, { sessionId: session.sessionDbId, preview }); } // Convert nullable fields to empty strings for storeSummary (if summary exists) const summaryForStore = normalizeSummaryForStorage(summary); // Fallback: When summary parse fails but observations exist, salvage a synthetic summary. // Fixes Issue #1312: AI sometimes returns instead of despite clear instructions. // Observations are stored normally; this only affects the session summary. let finalSummaryForStore = summaryForStore; if (!summaryForStore && observations.length > 0) { const primary = observations[0]; finalSummaryForStore = { request: primary.title || `Session observations (${observations.length} items)`, investigated: primary.narrative || primary.facts?.join('; ') || '', learned: primary.facts?.join('; ') || '', completed: primary.type === 'feature' || primary.type === 'bugfix' ? (primary.title || '') : '', next_steps: '', notes: `[Salvaged from ${observations.length} observation(s)] AI returned instead of ` }; logger.warn('PARSER', `SALVAGED summary from ${observations.length} observation(s) — AI did not output tags`, { sessionId: session.sessionDbId, agentName, observationIds: observations.map(o => o.title).filter(Boolean).slice(0, 3) }); } // Get session store for atomic transaction const sessionStore = dbManager.getSessionStore(); // CRITICAL: Must use memorySessionId (not contentSessionId) for FK constraint if (!session.memorySessionId) { throw new Error('Cannot store observations: memorySessionId not yet captured'); } // SAFETY NET (Issue #846 / Multi-terminal FK fix): // The PRIMARY fix is in SDKAgent.ts where ensureMemorySessionIdRegistered() is called // immediately when the SDK returns a memory_session_id. This call is a defensive safety net // in case the DB was somehow not updated (race condition, crash, etc.). // In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL // for each new generator, ensuring clean isolation. sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId); // Log pre-storage with session ID chain for verification logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!finalSummaryForStore}`, { sessionId: session.sessionDbId, memorySessionId: session.memorySessionId }); // ATOMIC TRANSACTION: Store observations + summary ONCE // Messages are already deleted from queue on claim, so no completion tracking needed const result = sessionStore.storeObservations( session.memorySessionId, session.project, observations, finalSummaryForStore, session.lastPromptNumber, discoveryTokens, originalTimestamp ?? undefined, modelId ); // Log storage result with IDs for end-to-end traceability logger.info('DB', `STORED | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${result.observationIds.length} | obsIds=[${result.observationIds.join(',')}] | summaryId=${result.summaryId || 'none'}`, { sessionId: session.sessionDbId, memorySessionId: session.memorySessionId }); // CLAIM-CONFIRM: Now that storage succeeded, confirm all processing messages (delete from queue) // This is the critical step that prevents message loss on generator crash const pendingStore = sessionManager.getPendingMessageStore(); for (const messageId of session.processingMessageIds) { pendingStore.confirmProcessed(messageId); } if (session.processingMessageIds.length > 0) { logger.debug('QUEUE', `CONFIRMED_BATCH | sessionDbId=${session.sessionDbId} | count=${session.processingMessageIds.length} | ids=[${session.processingMessageIds.join(',')}]`); } // Clear the tracking array after confirmation session.processingMessageIds = []; // AFTER transaction commits - async operations (can fail safely without data loss) await syncAndBroadcastObservations( observations, result, session, dbManager, worker, discoveryTokens, agentName, projectRoot ); // Sync and broadcast summary if present await syncAndBroadcastSummary( summary, finalSummaryForStore, result, session, dbManager, worker, discoveryTokens, agentName ); // Clean up session state cleanupProcessedMessages(session, worker); } /** * Normalize summary for storage (convert null fields to empty strings) */ function normalizeSummaryForStorage(summary: ParsedSummary | null): { request: string; investigated: string; learned: string; completed: string; next_steps: string; notes: string | null; } | null { if (!summary) return null; return { request: summary.request || '', investigated: summary.investigated || '', learned: summary.learned || '', completed: summary.completed || '', next_steps: summary.next_steps || '', notes: summary.notes }; } /** * Sync observations to Chroma and broadcast to SSE clients */ async function syncAndBroadcastObservations( observations: ParsedObservation[], result: StorageResult, session: ActiveSession, dbManager: DatabaseManager, worker: WorkerRef | undefined, discoveryTokens: number, agentName: string, projectRoot?: string ): Promise { for (let i = 0; i < observations.length; i++) { const obsId = result.observationIds[i]; const obs = observations[i]; const chromaStart = Date.now(); // Sync to Chroma (fire-and-forget, skipped if Chroma is disabled) dbManager.getChromaSync()?.syncObservation( obsId, session.contentSessionId, session.project, obs, session.lastPromptNumber, result.createdAtEpoch, discoveryTokens ).then(() => { const chromaDuration = Date.now() - chromaStart; logger.debug('CHROMA', 'Observation synced', { obsId, duration: `${chromaDuration}ms`, type: obs.type, title: obs.title || '(untitled)' }); }).catch((error) => { logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, { obsId, type: obs.type, title: obs.title || '(untitled)' }, error); }); // Broadcast to SSE clients (for web UI) // BUGFIX: Use obs.files_read and obs.files_modified (not obs.files) broadcastObservation(worker, { id: obsId, memory_session_id: session.memorySessionId, session_id: session.contentSessionId, platform_source: session.platformSource, type: obs.type, title: obs.title, subtitle: obs.subtitle, text: null, // text field is not in ParsedObservation narrative: obs.narrative || null, facts: JSON.stringify(obs.facts || []), concepts: JSON.stringify(obs.concepts || []), files_read: JSON.stringify(obs.files_read || []), files_modified: JSON.stringify(obs.files_modified || []), project: session.project, prompt_number: session.lastPromptNumber, created_at_epoch: result.createdAtEpoch }); } // Update folder CLAUDE.md files for touched folders (fire-and-forget) // This runs per-observation batch to ensure folders are updated as work happens // Only runs if CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED is true (default: false) const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH); // Handle both string 'true' and boolean true from JSON settings const settingValue = settings.CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED; const folderClaudeMdEnabled = settingValue === 'true' || settingValue === true; if (folderClaudeMdEnabled) { const allFilePaths: string[] = []; for (const obs of observations) { allFilePaths.push(...(obs.files_modified || [])); allFilePaths.push(...(obs.files_read || [])); } if (allFilePaths.length > 0) { updateFolderClaudeMdFiles( allFilePaths, session.project, getWorkerPort(), projectRoot ).catch(error => { logger.warn('FOLDER_INDEX', 'CLAUDE.md update failed (non-critical)', { project: session.project }, error as Error); }); } } } /** * Sync summary to Chroma and broadcast to SSE clients */ async function syncAndBroadcastSummary( summary: ParsedSummary | null, summaryForStore: { request: string; investigated: string; learned: string; completed: string; next_steps: string; notes: string | null } | null, result: StorageResult, session: ActiveSession, dbManager: DatabaseManager, worker: WorkerRef | undefined, discoveryTokens: number, agentName: string ): Promise { if (!summaryForStore || !result.summaryId) { return; } const chromaStart = Date.now(); // Sync to Chroma (fire-and-forget, skipped if Chroma is disabled) dbManager.getChromaSync()?.syncSummary( result.summaryId, session.contentSessionId, session.project, summaryForStore, session.lastPromptNumber, result.createdAtEpoch, discoveryTokens ).then(() => { const chromaDuration = Date.now() - chromaStart; logger.debug('CHROMA', 'Summary synced', { summaryId: result.summaryId, duration: `${chromaDuration}ms`, request: summaryForStore.request || '(no request)' }); }).catch((error) => { logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, { summaryId: result.summaryId, request: summaryForStore.request || '(no request)' }, error); }); // Broadcast to SSE clients (for web UI) broadcastSummary(worker, { id: result.summaryId, session_id: session.contentSessionId, platform_source: session.platformSource, request: summary!.request, investigated: summary!.investigated, learned: summary!.learned, completed: summary!.completed, next_steps: summary!.next_steps, notes: summary!.notes, project: session.project, prompt_number: session.lastPromptNumber, created_at_epoch: result.createdAtEpoch }); // Update Cursor context file for registered projects (fire-and-forget) updateCursorContextForProject(session.project, getWorkerPort()).catch(error => { logger.warn('CURSOR', 'Context update failed (non-critical)', { project: session.project }, error as Error); }); }