c6f932988a
* MAESTRO: fix ChromaDB core issues — Python pinning, Windows paths, disable toggle, metadata sanitization, transport errors

  - Add --python version pinning to uvx args in both local and remote mode (fixes #1196, #1206, #1208)
  - Convert backslash paths to forward slashes for --data-dir on Windows (fixes #1199)
  - Add CLAUDE_MEM_CHROMA_ENABLED setting for SQLite-only fallback mode (fixes #707)
  - Sanitize metadata in addDocuments() to filter null/undefined/empty values (fixes #1183, #1188)
  - Wrap callTool() in try/catch for transport errors with auto-reconnect (fixes #1162)

  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* MAESTRO: fix data integrity — content-hash deduplication, project name collision, empty project guard, stuck isProcessing

  - Add SHA-256 content-hash deduplication to observations INSERT (store.ts, transactions.ts, SessionStore.ts)
  - Add content_hash column via migration 22 with backfill and index
  - Fix project name collision: getCurrentProjectName() now returns parent/basename
  - Guard against empty project string with cwd-derived fallback
  - Fix stuck isProcessing: hasAnyPendingWork() resets processing messages older than 5 minutes
  - Add 12 new tests covering all four fixes

* MAESTRO: fix hook lifecycle — stderr suppression, output isolation, conversation pollution prevention

  - Suppress process.stderr.write in hookCommand() to prevent Claude Code showing diagnostic output as error UI (#1181). Restores stderr in finally block for the worker-continues case.
  - Convert console.error() to logger.warn()/error() in hook-command.ts and handlers/index.ts so all diagnostics route to the log file instead of stderr.
  - Verified all 7 handlers return suppressOutput: true (prevents conversation pollution #598, #784).
  - Verified session-complete is a recognized event type (fixes #984).
  - Verified unknown event types return a no-op handler with exit 0 (graceful degradation).
  - Added 10 new tests in tests/hook-lifecycle.test.ts covering event dispatch, adapter defaults, stderr suppression, and standard response constants.

* MAESTRO: fix worker lifecycle — restart loop coordination, stale transport retry, ENOENT shutdown race

  - Add PID file mtime guard to prevent concurrent restart storms (#1145): isPidFileRecent() + touchPidFile() coordinate across sessions
  - Add transparent retry in ChromaMcpManager.callTool() on transport error — reconnects and retries once instead of failing (#1131)
  - Wrap getInstalledPluginVersion() with ENOENT/EBUSY handling (#1042)
  - Verified ChromaMcpManager.stop() already called on all shutdown paths

* MAESTRO: fix Windows platform support — uvx.cmd spawn, PowerShell $_ elimination, windowsHide, FTS5 fallback

  - Route uvx spawn through cmd.exe /c on Windows since the MCP SDK lacks shell:true (#1190, #1192, #1199)
  - Replace all PowerShell Where-Object {$_} pipelines with WQL -Filter server-side filtering (#1024, #1062)
  - Add windowsHide: true to all exec/spawn calls missing it to prevent console popups (#1048)
  - Add FTS5 runtime probe with graceful fallback when unavailable on Windows (#791)
  - Guard FTS5 table creation in migrations, SessionSearch, and SessionStore with try/catch

* MAESTRO: fix skills/ distribution — build-time verification and regression tests (#1187)

  Add post-build verification in build-hooks.js that fails if critical distribution files (skills, hooks, plugin manifest) are missing. Add 10 regression tests covering skill file presence, YAML frontmatter, hooks.json integrity, and the package.json files field.

* MAESTRO: fix MigrationRunner schema initialization (#979) — version conflict between parallel migration systems

  Root cause: old DatabaseManager migrations 1-7 shared the schema_versions table with MigrationRunner's 4-22, causing version number collisions (5=drop tables vs add column, 6=FTS5 vs prompt tracking, 7=discovery_tokens vs remove UNIQUE). initializeSchema() was gated behind maxApplied===0, so core tables were never created when old versions were present.

  Fixes:
  - initializeSchema() always creates core tables via CREATE TABLE IF NOT EXISTS
  - Migrations 5-7 check actual DB state (columns/constraints), not just version tracking
  - Crash-safe temp table rebuilds (DROP IF EXISTS _new before CREATE)
  - Added missing migration 21 (ON UPDATE CASCADE) to MigrationRunner
  - Added ON UPDATE CASCADE to FK definitions in initializeSchema()
  - All changes applied to both runner.ts and SessionStore.ts

  Tests: 13 new tests in migration-runner.test.ts covering fresh DB, idempotency, version conflicts, crash recovery, FK constraints, and data integrity.

* MAESTRO: fix 21 test failures — stale mocks, outdated assertions, missing OpenClaw guards

  - Server tests (12): Added missing workerPath and getAiStatus to ServerOptions mocks after interface expansion.
  - ChromaSync tests (3): Updated to verify transport cleanup in ChromaMcpManager after architecture refactor.
  - OpenClaw (2): Added memory_ tool skipping and response truncation to prevent recursive loops and oversized payloads.
  - MarkdownFormatter (2): Updated assertions to match current output.
  - SettingsDefaultsManager (1): Used correct default key for getBool test.
  - Logger standards (1): Excluded CLI transcript command from background service check.

* MAESTRO: fix Codex CLI compatibility (#744) — session_id fallbacks, unknown platform tolerance, undefined guard

* MAESTRO: fix Cursor IDE integration (#838, #1049) — adapter field fallbacks, tolerant session-init validation

* MAESTRO: fix /api/logs OOM (#1203) — tail-read replaces full-file readFileSync

  Replace readFileSync (which loads the entire file into memory) with readLastLines(), which reads only from the end of the file in expanding chunks (64KB → 10MB cap). Prevents OOM on large log files while preserving the same API response shape.

* MAESTRO: fix Settings CORS error (#1029) — explicit methods and allowedHeaders in CORS config

* MAESTRO: add session custom_title for agent attribution (#1213) — migration 23, endpoint + store support

* MAESTRO: prevent CLAUDE.md/AGENTS.md writes inside .git/ directories (#1165)

  Add a .git path guard to all 4 write sites to prevent ref corruption when paths resolve inside .git internals.

* MAESTRO: fix plugin disabled state not respected (#781) — early exit check in all hook entry points

* MAESTRO: fix UserPromptSubmit context re-injection on every turn (#1079) — contextInjected session flag

* MAESTRO: fix stale AbortController queue stall (#1099) — lastGeneratorActivity tracking + 30s timeout

  Three-layer fix:
  1. Added lastGeneratorActivity timestamp to ActiveSession, updated by processAgentResponse (all agents), getMessageIterator (queue yields), and startGeneratorWithProvider (generator launch)
  2. Added stale generator detection in ensureGeneratorRunning — if no activity for >30s, aborts the stale controller, resets state, and restarts
  3. Added AbortSignal.timeout(30000) in deleteSession to prevent indefinite hang when awaiting a stuck generator promise

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
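The `/api/logs` tail-read fix above can be sketched roughly as follows. This is a hedged illustration, not the shipped implementation: `readLastLines` is named in the commit message, but its signature, internals, and the exact chunk-growth policy shown here are assumptions.

```typescript
// Sketch of the tail-read strategy: read only the end of a log file in
// expanding chunks instead of loading the whole file into memory.
// ASSUMPTIONS: the function name comes from the commit message; the
// doubling policy and return type are illustrative.
import { openSync, fstatSync, readSync, closeSync, writeFileSync } from 'node:fs';

function readLastLines(path: string, maxLines: number): string[] {
  const fd = openSync(path, 'r');
  try {
    const size = fstatSync(fd).size;
    let chunk = 64 * 1024;          // start with 64KB from the end
    const cap = 10 * 1024 * 1024;   // never read more than the 10MB cap
    while (true) {
      const readSize = Math.min(chunk, size, cap);
      const buf = Buffer.alloc(readSize);
      readSync(fd, buf, 0, readSize, size - readSize);
      const lines = buf.toString('utf8').split('\n').filter(l => l.length > 0);
      // Stop when we have more lines than requested (first line may be
      // partial), or we have read the whole file, or we hit the cap.
      if (lines.length > maxLines || readSize === size || readSize === cap) {
        return lines.slice(-maxLines);
      }
      chunk *= 2;                   // expand the window and retry
    }
  } finally {
    closeSync(fd);
  }
}
```

Because only the final window is ever materialized, memory use is bounded by the cap rather than by log-file size.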
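The SHA-256 content-hash deduplication above can be illustrated with a minimal sketch. The real fix guards the observations INSERT in store.ts/transactions.ts/SessionStore.ts against an indexed SQLite `content_hash` column; the in-memory `Set` and both helper names here are illustrative assumptions.

```typescript
// Sketch of content-hash deduplication: hash the observation content and
// skip the insert when the same hash has been seen before.
// ASSUMPTIONS: contentHash/insertIfNew are hypothetical names; the shipped
// code checks a content_hash column in SQLite, not an in-memory Set.
import { createHash } from 'node:crypto';

function contentHash(content: string): string {
  return createHash('sha256').update(content, 'utf8').digest('hex');
}

function insertIfNew(seen: Set<string>, content: string): boolean {
  const hash = contentHash(content);
  if (seen.has(hash)) return false; // duplicate observation, skip the INSERT
  seen.add(hash);                   // record hash so later duplicates are caught
  return true;
}
```

The same idea maps to SQL as a unique index on `content_hash` plus `INSERT OR IGNORE`, which is what a migration with backfill makes possible.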
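The stale-generator detection above can be sketched as a small reaper check. `lastGeneratorActivity` and the 30s threshold come from the commit message; the session shape and the `reapIfStale` helper are assumptions for illustration.

```typescript
// Sketch of stale-generator detection: if a session's generator has shown no
// activity for more than 30s, treat it as stuck, abort its controller, and
// reset state so a fresh generator can start.
// ASSUMPTIONS: GeneratorSession and reapIfStale are hypothetical; only the
// lastGeneratorActivity field and 30s window come from the commit message.
const STALE_GENERATOR_MS = 30_000;

interface GeneratorSession {
  lastGeneratorActivity: number;     // epoch ms, bumped on every yield/response
  abortController: AbortController;
  generatorRunning: boolean;
}

function reapIfStale(session: GeneratorSession, now: number = Date.now()): boolean {
  if (!session.generatorRunning) return false;
  if (now - session.lastGeneratorActivity <= STALE_GENERATOR_MS) return false;
  session.abortController.abort();   // cancel the stuck generator
  session.generatorRunning = false;  // allow ensureGeneratorRunning to restart
  return true;
}
```

A check like this only works if every activity path refreshes the timestamp, which is why the fix touches all three producers (agent responses, queue yields, generator launch).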
331 lines
12 KiB
TypeScript
/**
 * ResponseProcessor: Shared response processing for all agent implementations
 *
 * Responsibility:
 * - Parse observations and summaries from agent responses
 * - Execute atomic database transactions
 * - Orchestrate Chroma sync (fire-and-forget)
 * - Broadcast to SSE clients
 * - Clean up processed messages
 *
 * This module extracts 150+ lines of duplicate code from SDKAgent, GeminiAgent, and OpenRouterAgent.
 */

import { logger } from '../../../utils/logger.js';
import { parseObservations, parseSummary, type ParsedObservation, type ParsedSummary } from '../../../sdk/parser.js';
import { updateCursorContextForProject } from '../../integrations/CursorHooksInstaller.js';
import { updateFolderClaudeMdFiles } from '../../../utils/claude-md-utils.js';
import { getWorkerPort } from '../../../shared/worker-utils.js';
import { SettingsDefaultsManager } from '../../../shared/SettingsDefaultsManager.js';
import { USER_SETTINGS_PATH } from '../../../shared/paths.js';
import type { ActiveSession } from '../../worker-types.js';
import type { DatabaseManager } from '../DatabaseManager.js';
import type { SessionManager } from '../SessionManager.js';
import type { WorkerRef, StorageResult } from './types.js';
import { broadcastObservation, broadcastSummary } from './ObservationBroadcaster.js';
import { cleanupProcessedMessages } from './SessionCleanupHelper.js';

/**
 * Process agent response text (parse XML, save to database, sync to Chroma, broadcast SSE)
 *
 * This is the unified response processor that handles:
 * 1. Adding response to conversation history (for provider interop)
 * 2. Parsing observations and summaries from XML
 * 3. Atomic database transaction to store observations + summary
 * 4. Async Chroma sync (fire-and-forget, failures are non-critical)
 * 5. SSE broadcast to web UI clients
 * 6. Session cleanup
 *
 * @param text - Response text from the agent
 * @param session - Active session being processed
 * @param dbManager - Database manager for storage operations
 * @param sessionManager - Session manager for message tracking
 * @param worker - Worker reference for SSE broadcasting (optional)
 * @param discoveryTokens - Token cost delta for this response
 * @param originalTimestamp - Original epoch when message was queued (for accurate timestamps)
 * @param agentName - Name of the agent for logging (e.g., 'SDK', 'Gemini', 'OpenRouter')
 * @param projectRoot - Optional project root passed through to folder CLAUDE.md updates
 */
export async function processAgentResponse(
  text: string,
  session: ActiveSession,
  dbManager: DatabaseManager,
  sessionManager: SessionManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  originalTimestamp: number | null,
  agentName: string,
  projectRoot?: string
): Promise<void> {
  // Track generator activity for stale detection (Issue #1099)
  session.lastGeneratorActivity = Date.now();

  // Add assistant response to shared conversation history for provider interop
  if (text) {
    session.conversationHistory.push({ role: 'assistant', content: text });
  }

  // Parse observations and summary
  const observations = parseObservations(text, session.contentSessionId);
  const summary = parseSummary(text, session.sessionDbId);

  // Convert nullable fields to empty strings for storeSummary (if summary exists)
  const summaryForStore = normalizeSummaryForStorage(summary);

  // Get session store for atomic transaction
  const sessionStore = dbManager.getSessionStore();

  // CRITICAL: Must use memorySessionId (not contentSessionId) for FK constraint
  if (!session.memorySessionId) {
    throw new Error('Cannot store observations: memorySessionId not yet captured');
  }

  // SAFETY NET (Issue #846 / Multi-terminal FK fix):
  // The PRIMARY fix is in SDKAgent.ts where ensureMemorySessionIdRegistered() is called
  // immediately when the SDK returns a memory_session_id. This call is a defensive safety net
  // in case the DB was somehow not updated (race condition, crash, etc.).
  // In multi-terminal scenarios, createSDKSession() now resets memory_session_id to NULL
  // for each new generator, ensuring clean isolation.
  sessionStore.ensureMemorySessionIdRegistered(session.sessionDbId, session.memorySessionId);

  // Log pre-storage with session ID chain for verification
  logger.info('DB', `STORING | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${observations.length} | hasSummary=${!!summaryForStore}`, {
    sessionId: session.sessionDbId,
    memorySessionId: session.memorySessionId
  });

  // ATOMIC TRANSACTION: Store observations + summary ONCE
  // Messages are already deleted from queue on claim, so no completion tracking needed
  const result = sessionStore.storeObservations(
    session.memorySessionId,
    session.project,
    observations,
    summaryForStore,
    session.lastPromptNumber,
    discoveryTokens,
    originalTimestamp ?? undefined
  );

  // Log storage result with IDs for end-to-end traceability
  logger.info('DB', `STORED | sessionDbId=${session.sessionDbId} | memorySessionId=${session.memorySessionId} | obsCount=${result.observationIds.length} | obsIds=[${result.observationIds.join(',')}] | summaryId=${result.summaryId || 'none'}`, {
    sessionId: session.sessionDbId,
    memorySessionId: session.memorySessionId
  });

  // CLAIM-CONFIRM: Now that storage succeeded, confirm all processing messages (delete from queue)
  // This is the critical step that prevents message loss on generator crash
  const pendingStore = sessionManager.getPendingMessageStore();
  for (const messageId of session.processingMessageIds) {
    pendingStore.confirmProcessed(messageId);
  }
  if (session.processingMessageIds.length > 0) {
    logger.debug('QUEUE', `CONFIRMED_BATCH | sessionDbId=${session.sessionDbId} | count=${session.processingMessageIds.length} | ids=[${session.processingMessageIds.join(',')}]`);
  }
  // Clear the tracking array after confirmation
  session.processingMessageIds = [];

  // AFTER transaction commits - async operations (can fail safely without data loss)
  await syncAndBroadcastObservations(
    observations,
    result,
    session,
    dbManager,
    worker,
    discoveryTokens,
    agentName,
    projectRoot
  );

  // Sync and broadcast summary if present
  await syncAndBroadcastSummary(
    summary,
    summaryForStore,
    result,
    session,
    dbManager,
    worker,
    discoveryTokens,
    agentName
  );

  // Clean up session state
  cleanupProcessedMessages(session, worker);
}

/**
 * Normalize summary for storage (convert null fields to empty strings)
 */
function normalizeSummaryForStorage(summary: ParsedSummary | null): {
  request: string;
  investigated: string;
  learned: string;
  completed: string;
  next_steps: string;
  notes: string | null;
} | null {
  if (!summary) return null;

  return {
    request: summary.request || '',
    investigated: summary.investigated || '',
    learned: summary.learned || '',
    completed: summary.completed || '',
    next_steps: summary.next_steps || '',
    notes: summary.notes
  };
}

/**
 * Sync observations to Chroma and broadcast to SSE clients
 */
async function syncAndBroadcastObservations(
  observations: ParsedObservation[],
  result: StorageResult,
  session: ActiveSession,
  dbManager: DatabaseManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  agentName: string,
  projectRoot?: string
): Promise<void> {
  for (let i = 0; i < observations.length; i++) {
    const obsId = result.observationIds[i];
    const obs = observations[i];
    const chromaStart = Date.now();

    // Sync to Chroma (fire-and-forget, skipped if Chroma is disabled)
    dbManager.getChromaSync()?.syncObservation(
      obsId,
      session.contentSessionId,
      session.project,
      obs,
      session.lastPromptNumber,
      result.createdAtEpoch,
      discoveryTokens
    ).then(() => {
      const chromaDuration = Date.now() - chromaStart;
      logger.debug('CHROMA', 'Observation synced', {
        obsId,
        duration: `${chromaDuration}ms`,
        type: obs.type,
        title: obs.title || '(untitled)'
      });
    }).catch((error) => {
      logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
        obsId,
        type: obs.type,
        title: obs.title || '(untitled)'
      }, error);
    });

    // Broadcast to SSE clients (for web UI)
    // BUGFIX: Use obs.files_read and obs.files_modified (not obs.files)
    broadcastObservation(worker, {
      id: obsId,
      memory_session_id: session.memorySessionId,
      session_id: session.contentSessionId,
      type: obs.type,
      title: obs.title,
      subtitle: obs.subtitle,
      text: null, // text field is not in ParsedObservation
      narrative: obs.narrative || null,
      facts: JSON.stringify(obs.facts || []),
      concepts: JSON.stringify(obs.concepts || []),
      files_read: JSON.stringify(obs.files_read || []),
      files_modified: JSON.stringify(obs.files_modified || []),
      project: session.project,
      prompt_number: session.lastPromptNumber,
      created_at_epoch: result.createdAtEpoch
    });
  }

  // Update folder CLAUDE.md files for touched folders (fire-and-forget)
  // This runs per-observation batch to ensure folders are updated as work happens
  // Only runs if CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED is true (default: false)
  const settings = SettingsDefaultsManager.loadFromFile(USER_SETTINGS_PATH);
  // Handle both string 'true' and boolean true from JSON settings
  const settingValue = settings.CLAUDE_MEM_FOLDER_CLAUDEMD_ENABLED;
  const folderClaudeMdEnabled = settingValue === 'true' || settingValue === true;

  if (folderClaudeMdEnabled) {
    const allFilePaths: string[] = [];
    for (const obs of observations) {
      allFilePaths.push(...(obs.files_modified || []));
      allFilePaths.push(...(obs.files_read || []));
    }

    if (allFilePaths.length > 0) {
      updateFolderClaudeMdFiles(
        allFilePaths,
        session.project,
        getWorkerPort(),
        projectRoot
      ).catch(error => {
        logger.warn('FOLDER_INDEX', 'CLAUDE.md update failed (non-critical)', { project: session.project }, error as Error);
      });
    }
  }
}

/**
 * Sync summary to Chroma and broadcast to SSE clients
 */
async function syncAndBroadcastSummary(
  summary: ParsedSummary | null,
  summaryForStore: { request: string; investigated: string; learned: string; completed: string; next_steps: string; notes: string | null } | null,
  result: StorageResult,
  session: ActiveSession,
  dbManager: DatabaseManager,
  worker: WorkerRef | undefined,
  discoveryTokens: number,
  agentName: string
): Promise<void> {
  if (!summaryForStore || !result.summaryId) {
    return;
  }

  const chromaStart = Date.now();

  // Sync to Chroma (fire-and-forget, skipped if Chroma is disabled)
  dbManager.getChromaSync()?.syncSummary(
    result.summaryId,
    session.contentSessionId,
    session.project,
    summaryForStore,
    session.lastPromptNumber,
    result.createdAtEpoch,
    discoveryTokens
  ).then(() => {
    const chromaDuration = Date.now() - chromaStart;
    logger.debug('CHROMA', 'Summary synced', {
      summaryId: result.summaryId,
      duration: `${chromaDuration}ms`,
      request: summaryForStore.request || '(no request)'
    });
  }).catch((error) => {
    logger.error('CHROMA', `${agentName} chroma sync failed, continuing without vector search`, {
      summaryId: result.summaryId,
      request: summaryForStore.request || '(no request)'
    }, error);
  });

  // Broadcast to SSE clients (for web UI)
  broadcastSummary(worker, {
    id: result.summaryId,
    session_id: session.contentSessionId,
    request: summary!.request,
    investigated: summary!.investigated,
    learned: summary!.learned,
    completed: summary!.completed,
    next_steps: summary!.next_steps,
    notes: summary!.notes,
    project: session.project,
    prompt_number: session.lastPromptNumber,
    created_at_epoch: result.createdAtEpoch
  });

  // Update Cursor context file for registered projects (fire-and-forget)
  updateCursorContextForProject(session.project, getWorkerPort()).catch(error => {
    logger.warn('CURSOR', 'Context update failed (non-critical)', { project: session.project }, error as Error);
  });
}