Files
claude-mem/src/services/worker/agents/ResponseProcessor.ts
T
Alex Newman 05323c9db5 Cleanup worker-service.ts: remove dead code and fallback concept (#706)
* refactor(worker): remove dead code from worker-service.ts

Remove ~216 lines of unreachable code:
- Delete `runInteractiveSetup` function (defined but never called)
- Remove unused imports: fs namespace, spawn, homedir, readline,
  existsSync/writeFileSync/readFileSync/mkdirSync
- Clean up CursorHooksInstaller imports (keep only used exports)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(worker): only enable SDK fallback when Claude is configured

Add isConfigured() method to SDKAgent that checks for ANTHROPIC_API_KEY
or claude CLI availability. Worker now only sets SDK agent as fallback
for third-party providers when credentials exist, preventing cascading
failures for users who intentionally use Gemini/OpenRouter without Claude.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(worker): remove misleading re-export indirection

Remove unnecessary re-export of updateCursorContextForProject from
worker-service.ts. ResponseProcessor now imports directly from
CursorHooksInstaller.ts where the function is defined. This eliminates
misleading indirection that suggested a circular dependency existed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(mcp): use build-time injected version instead of hardcoded strings

Replace hardcoded '1.0.0' version strings with __DEFAULT_PACKAGE_VERSION__
constant that esbuild replaces at build time. This ensures MCP server and
client versions stay synchronized with package.json.

- worker-service.ts: MCP client version now uses packageVersion
- ChromaSync.ts: MCP client version now uses packageVersion
- mcp-server.ts: MCP server version now uses packageVersion
- Added clarifying comments for empty MCP capabilities objects

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat: Implement cleanup and validation plans for worker-service.ts

- Added a comprehensive cleanup plan addressing 23 identified issues in worker-service.ts, focusing on safe deletions, low-risk simplifications, and medium-risk improvements.
- Created an execution plan for validating intentional patterns in worker-service.ts, detailing necessary actions and priorities.
- Generated a report on unjustified logic in worker-service.ts, categorizing issues by severity and providing recommendations for immediate and short-term actions.
- Introduced documentation for recent activity in the mem-search plugin, enhancing traceability and context for changes.

* fix(sdk): remove dangerous ANTHROPIC_API_KEY check from isConfigured

Claude Code uses CLI authentication, not direct API calls. Checking for
ANTHROPIC_API_KEY could accidentally use a user's API key (from other
projects) which costs 20x more than Claude Code's pricing.

Now only checks for claude CLI availability.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(worker): remove fallback agent concept entirely

Users who choose Gemini/OpenRouter want those providers, not secret
fallback behavior. Removed setFallbackAgent calls and the unused
isConfigured() method.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-13 23:30:13 -05:00

298 lines
10 KiB
TypeScript

/**
* ResponseProcessor: Shared response processing for all agent implementations
*
* Responsibility:
* - Parse observations and summaries from agent responses
* - Execute atomic database transactions
* - Orchestrate Chroma sync (fire-and-forget)
* - Broadcast to SSE clients
* - Clean up processed messages
*
* This module extracts 150+ lines of duplicate code from SDKAgent, GeminiAgent, and OpenRouterAgent.
*/
import { logger } from '../../../utils/logger.js';
import { parseObservations, parseSummary, type ParsedObservation, type ParsedSummary } from '../../../sdk/parser.js';
import { updateCursorContextForProject } from '../../integrations/CursorHooksInstaller.js';
import { updateFolderClaudeMdFiles } from '../../../utils/claude-md-utils.js';
import { getWorkerPort } from '../../../shared/worker-utils.js';
import type { ActiveSession } from '../../worker-types.js';
import type { DatabaseManager } from '../DatabaseManager.js';
import type { SessionManager } from '../SessionManager.js';
import type { WorkerRef, StorageResult } from './types.js';
import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js';
import { cleanupProcessedMessages } from './SessionCleanupHelper.js';
/**
 * Unified handler for a single agent response: parse it, persist it, fan it out.
 *
 * Steps, in order:
 * 1. Append non-empty response text to the session's conversation history
 * 2. Parse observations and an optional summary out of the response text
 * 3. Persist observations + summary through one storeObservations call
 * 4. Run the observation sync/broadcast step, then the summary sync/broadcast step
 * 5. Clean up processed messages for the session
 *
 * @param text - Response text from the agent
 * @param session - Active session being processed
 * @param dbManager - Database manager that provides the session store
 * @param sessionManager - Session manager (not referenced inside this function body)
 * @param worker - Worker reference handed to the broadcast and cleanup helpers (may be undefined)
 * @param discoveryTokens - Token cost delta for this response
 * @param originalTimestamp - Original epoch when the message was queued; null is passed to storage as undefined
 * @param agentName - Name of the agent (e.g., 'SDK', 'Gemini', 'OpenRouter'), forwarded to the sync helpers
 * @param projectRoot - Optional project root, forwarded to the observation sync step
 * @throws Error when session.memorySessionId has not been captured yet
 */
export async function processAgentResponse(
  text: string,
  session: ActiveSession,
  dbManager: DatabaseManager,
  sessionManager: SessionManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  originalTimestamp: number | null,
  agentName: string,
  projectRoot?: string
): Promise<void> {
  // Record the assistant turn in the shared history (skipped for empty text)
  if (text) {
    session.conversationHistory.push({ role: 'assistant', content: text });
  }

  // Extract structured data from the raw response
  const parsedObservations = parseObservations(text, session.contentSessionId);
  const parsedSummary = parseSummary(text, session.sessionDbId);

  // Storage wants empty strings instead of nulls (null stays null when there is no summary)
  const storableSummary = normalizeSummaryForStorage(parsedSummary);

  const store = dbManager.getSessionStore();

  // CRITICAL: storage is keyed by memorySessionId (not contentSessionId) for the FK constraint
  const memorySessionId = session.memorySessionId;
  if (!memorySessionId) {
    throw new Error('Cannot store observations: memorySessionId not yet captured');
  }

  // Both log lines share the same ID chain and context shape
  const idChain = `sessionDbId=${session.sessionDbId} | memorySessionId=${memorySessionId}`;
  const buildLogContext = () => ({
    sessionId: session.sessionDbId,
    memorySessionId
  });

  logger.info(
    'DB',
    `STORING | ${idChain} | obsCount=${parsedObservations.length} | hasSummary=${Boolean(storableSummary)}`,
    buildLogContext()
  );

  // Single store call for observations + summary.
  // Messages are already deleted from the queue on claim, so no completion tracking is needed.
  const stored = store.storeObservations(
    memorySessionId,
    session.project,
    parsedObservations,
    storableSummary,
    session.lastPromptNumber,
    discoveryTokens,
    originalTimestamp === null ? undefined : originalTimestamp
  );

  const storedIds = stored.observationIds;
  logger.info(
    'DB',
    `STORED | ${idChain} | obsCount=${storedIds.length} | obsIds=[${storedIds.join(',')}] | summaryId=${stored.summaryId || 'none'}`,
    buildLogContext()
  );

  // Everything below runs after the data has been stored
  await syncAndBroadcastObservations(
    parsedObservations,
    stored,
    session,
    dbManager,
    worker,
    discoveryTokens,
    agentName,
    projectRoot
  );

  await syncAndBroadcastSummary(
    parsedSummary,
    storableSummary,
    stored,
    session,
    dbManager,
    worker,
    discoveryTokens,
    agentName
  );

  // Finally, clean up session state
  cleanupProcessedMessages(session, worker);
}
/**
 * Prepare a parsed summary for storage.
 *
 * Returns null when there is no summary. Otherwise each text field except
 * `notes` is replaced by '' when it is falsy; `notes` is passed through untouched.
 */
function normalizeSummaryForStorage(summary: ParsedSummary | null): {
  request: string;
  investigated: string;
  learned: string;
  completed: string;
  next_steps: string;
  notes: string | null;
} | null {
  if (summary) {
    const { request, investigated, learned, completed, next_steps, notes } = summary;
    // Falsy values (null, undefined, '') all collapse to the empty string
    const orEmpty = (value: string | null | undefined): string => (value ? value : '');
    return {
      request: orEmpty(request),
      investigated: orEmpty(investigated),
      learned: orEmpty(learned),
      completed: orEmpty(completed),
      next_steps: orEmpty(next_steps),
      notes
    };
  }
  return null;
}
/**
 * For every parsed observation: start a Chroma sync (not awaited) and broadcast
 * the observation to SSE clients. Afterwards, if any observation lists files,
 * start a folder CLAUDE.md update (also not awaited).
 *
 * Observation IDs are taken from result.observationIds at the same index as
 * the parsed observation.
 *
 * @param observations - Parsed observations from the agent response
 * @param result - Storage result supplying observation IDs and createdAtEpoch
 * @param session - Active session the observations belong to
 * @param dbManager - Database manager providing the Chroma sync service
 * @param worker - Worker reference handed to the broadcaster (may be undefined)
 * @param discoveryTokens - Token cost delta forwarded to the Chroma sync
 * @param agentName - Agent name used in the Chroma failure log message
 * @param projectRoot - Optional project root forwarded to the CLAUDE.md update
 */
async function syncAndBroadcastObservations(
  observations: ParsedObservation[],
  result: StorageResult,
  session: ActiveSession,
  dbManager: DatabaseManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  agentName: string,
  projectRoot?: string
): Promise<void> {
  for (const [index, observation] of observations.entries()) {
    const observationId = result.observationIds[index];
    const syncStartedAt = Date.now();

    // Chroma sync is fire-and-forget: success and failure are only logged
    dbManager
      .getChromaSync()
      .syncObservation(
        observationId,
        session.contentSessionId,
        session.project,
        observation,
        session.lastPromptNumber,
        result.createdAtEpoch,
        discoveryTokens
      )
      .then(() => {
        const elapsedMs = Date.now() - syncStartedAt;
        logger.debug('CHROMA', 'Observation synced', {
          obsId: observationId,
          duration: `${elapsedMs}ms`,
          type: observation.type,
          title: observation.title || '(untitled)'
        });
      })
      .catch((reason) => {
        logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
          obsId: observationId,
          type: observation.type,
          title: observation.title || '(untitled)'
        }, reason);
      });

    // Push the observation to SSE clients (web UI).
    // File lists come from files_read / files_modified (there is no obs.files).
    broadcastObservation(worker, {
      id: observationId,
      memory_session_id: session.memorySessionId,
      session_id: session.contentSessionId,
      type: observation.type,
      title: observation.title,
      subtitle: observation.subtitle,
      text: null, // ParsedObservation carries no text field
      narrative: observation.narrative || null,
      facts: JSON.stringify(observation.facts || []),
      concepts: JSON.stringify(observation.concepts || []),
      files_read: JSON.stringify(observation.files_read || []),
      files_modified: JSON.stringify(observation.files_modified || []),
      project: session.project,
      prompt_number: session.lastPromptNumber,
      created_at_epoch: result.createdAtEpoch
    });
  }

  // Gather every touched path: per observation, modified files first, then read files
  const touchedPaths: string[] = [];
  observations.forEach((observation) => {
    touchedPaths.push(
      ...(observation.files_modified || []),
      ...(observation.files_read || [])
    );
  });

  if (touchedPaths.length === 0) {
    return;
  }

  // Folder CLAUDE.md refresh is fire-and-forget; a failure is logged as a warning
  updateFolderClaudeMdFiles(
    touchedPaths,
    session.project,
    getWorkerPort(),
    projectRoot
  ).catch((reason) => {
    logger.warn('FOLDER_INDEX', 'CLAUDE.md update failed (non-critical)', { project: session.project }, reason as Error);
  });
}
/**
 * Sync a stored summary to Chroma, broadcast it to SSE clients, and start a
 * Cursor context update. The Chroma sync and the Cursor update are not awaited;
 * their outcomes are only logged.
 *
 * Does nothing when no summary was stored (summaryForStore is null or the
 * storage result carries no summaryId). If a summary was stored but the parsed
 * summary is missing, a warning is logged and the call returns without side
 * effects instead of failing while building the broadcast payload.
 *
 * @param summary - Parsed summary; its fields (nulls included) form the broadcast payload
 * @param summaryForStore - Normalized summary handed to the Chroma sync
 * @param result - Storage result supplying summaryId and createdAtEpoch
 * @param session - Active session the summary belongs to
 * @param dbManager - Database manager providing the Chroma sync service
 * @param worker - Worker reference handed to the broadcaster (may be undefined)
 * @param discoveryTokens - Token cost delta forwarded to the Chroma sync
 * @param agentName - Agent name used in the Chroma failure log message
 */
async function syncAndBroadcastSummary(
  summary: ParsedSummary | null,
  summaryForStore: { request: string; investigated: string; learned: string; completed: string; next_steps: string; notes: string | null } | null,
  result: StorageResult,
  session: ActiveSession,
  dbManager: DatabaseManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  agentName: string
): Promise<void> {
  // Capture once so the narrowed value is what the callbacks below see
  const summaryId = result.summaryId;
  if (!summaryForStore || !summaryId) {
    return;
  }

  // Narrow `summary` explicitly rather than asserting non-null on every field.
  // The only caller derives summaryForStore from summary, so this is defensive.
  if (!summary) {
    logger.warn('DB', 'Stored summary has no parsed counterpart, skipping sync and broadcast', {
      summaryId,
      project: session.project
    });
    return;
  }

  const chromaStart = Date.now();

  // Sync to Chroma (fire-and-forget)
  dbManager.getChromaSync().syncSummary(
    summaryId,
    session.contentSessionId,
    session.project,
    summaryForStore,
    session.lastPromptNumber,
    result.createdAtEpoch,
    discoveryTokens
  ).then(() => {
    const chromaDuration = Date.now() - chromaStart;
    logger.debug('CHROMA', 'Summary synced', {
      summaryId,
      duration: `${chromaDuration}ms`,
      request: summaryForStore.request || '(no request)'
    });
  }).catch((error) => {
    logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
      summaryId,
      request: summaryForStore.request || '(no request)'
    }, error);
  });

  // Broadcast to SSE clients (for web UI) using the parsed, non-normalized fields
  broadcastSummary(worker, {
    id: summaryId,
    session_id: session.contentSessionId,
    request: summary.request,
    investigated: summary.investigated,
    learned: summary.learned,
    completed: summary.completed,
    next_steps: summary.next_steps,
    notes: summary.notes,
    project: session.project,
    prompt_number: session.lastPromptNumber,
    created_at_epoch: result.createdAtEpoch
  });

  // Update Cursor context file for registered projects (fire-and-forget)
  updateCursorContextForProject(session.project, getWorkerPort()).catch(error => {
    logger.warn('CURSOR', 'Context update failed (non-critical)', { project: session.project }, error as Error);
  });
}