fix: address PR review feedback for subprocess cleanup and binary resolution

Wrap SDK query loop in try/finally so subprocess cleanup runs on error paths.
Swap Chroma binary check order to try project-level .bin first (common case).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-02-15 23:24:00 -05:00
parent 67ba17cc8a
commit 055888e181
3 changed files with 183 additions and 179 deletions
+127 -124
View File
@@ -141,143 +141,146 @@ export class SDKAgent {
}
});
// Process SDK messages
for await (const message of queryResult) {
// Capture or update memory session ID from SDK message
// IMPORTANT: The SDK may return a DIFFERENT session_id on resume than what we sent!
// We must always sync the DB to match what the SDK actually uses.
//
// MULTI-TERMINAL COLLISION FIX (FK constraint bug):
// Use ensureMemorySessionIdRegistered() instead of updateMemorySessionId() because:
// 1. It's idempotent - safe to call multiple times
// 2. It verifies the update happened (SELECT before UPDATE)
// 3. Consistent with ResponseProcessor's usage pattern
// This ensures FK constraint compliance BEFORE any observations are stored.
if (message.session_id && message.session_id !== session.memorySessionId) {
const previousId = session.memorySessionId;
session.memorySessionId = message.session_id;
// Persist to database IMMEDIATELY for FK constraint compliance
// This must happen BEFORE any observations referencing this ID are stored
this.dbManager.getSessionStore().ensureMemorySessionIdRegistered(
session.sessionDbId,
message.session_id
);
// Verify the update by reading back from DB
const verification = this.dbManager.getSessionStore().getSessionById(session.sessionDbId);
const dbVerified = verification?.memory_session_id === message.session_id;
const logMessage = previousId
? `MEMORY_ID_CHANGED | sessionDbId=${session.sessionDbId} | from=${previousId} | to=${message.session_id} | dbVerified=${dbVerified}`
: `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`;
logger.info('SESSION', logMessage, {
sessionId: session.sessionDbId,
memorySessionId: message.session_id,
previousId
});
if (!dbVerified) {
logger.error('SESSION', `MEMORY_ID_MISMATCH | sessionDbId=${session.sessionDbId} | expected=${message.session_id} | got=${verification?.memory_session_id}`, {
sessionId: session.sessionDbId
// Process SDK messages — cleanup in finally ensures subprocess termination
// even if the loop throws (e.g., context overflow, invalid API key)
try {
for await (const message of queryResult) {
// Capture or update memory session ID from SDK message
// IMPORTANT: The SDK may return a DIFFERENT session_id on resume than what we sent!
// We must always sync the DB to match what the SDK actually uses.
//
// MULTI-TERMINAL COLLISION FIX (FK constraint bug):
// Use ensureMemorySessionIdRegistered() instead of updateMemorySessionId() because:
// 1. It's idempotent - safe to call multiple times
// 2. It verifies the update happened (SELECT before UPDATE)
// 3. Consistent with ResponseProcessor's usage pattern
// This ensures FK constraint compliance BEFORE any observations are stored.
if (message.session_id && message.session_id !== session.memorySessionId) {
const previousId = session.memorySessionId;
session.memorySessionId = message.session_id;
// Persist to database IMMEDIATELY for FK constraint compliance
// This must happen BEFORE any observations referencing this ID are stored
this.dbManager.getSessionStore().ensureMemorySessionIdRegistered(
session.sessionDbId,
message.session_id
);
// Verify the update by reading back from DB
const verification = this.dbManager.getSessionStore().getSessionById(session.sessionDbId);
const dbVerified = verification?.memory_session_id === message.session_id;
const logMessage = previousId
? `MEMORY_ID_CHANGED | sessionDbId=${session.sessionDbId} | from=${previousId} | to=${message.session_id} | dbVerified=${dbVerified}`
: `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`;
logger.info('SESSION', logMessage, {
sessionId: session.sessionDbId,
memorySessionId: message.session_id,
previousId
});
}
// Debug-level alignment log for detailed tracing
logger.debug('SDK', `[ALIGNMENT] ${previousId ? 'Updated' : 'Captured'} | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
}
// Handle assistant messages
if (message.type === 'assistant') {
const content = message.message.content;
const textContent = Array.isArray(content)
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
: typeof content === 'string' ? content : '';
// Check for context overflow - prevents infinite retry loops
if (textContent.includes('prompt is too long') ||
textContent.includes('context window')) {
logger.error('SDK', 'Context overflow detected - terminating session');
session.abortController.abort();
return;
if (!dbVerified) {
logger.error('SESSION', `MEMORY_ID_MISMATCH | sessionDbId=${session.sessionDbId} | expected=${message.session_id} | got=${verification?.memory_session_id}`, {
sessionId: session.sessionDbId
});
}
// Debug-level alignment log for detailed tracing
logger.debug('SDK', `[ALIGNMENT] ${previousId ? 'Updated' : 'Captured'} | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
}
const responseSize = textContent.length;
// Handle assistant messages
if (message.type === 'assistant') {
const content = message.message.content;
const textContent = Array.isArray(content)
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
: typeof content === 'string' ? content : '';
// Capture token state BEFORE updating (for delta calculation)
const tokensBeforeResponse = session.cumulativeInputTokens + session.cumulativeOutputTokens;
// Extract and track token usage
const usage = message.message.usage;
if (usage) {
session.cumulativeInputTokens += usage.input_tokens || 0;
session.cumulativeOutputTokens += usage.output_tokens || 0;
// Cache creation counts as discovery, cache read doesn't
if (usage.cache_creation_input_tokens) {
session.cumulativeInputTokens += usage.cache_creation_input_tokens;
// Check for context overflow - prevents infinite retry loops
if (textContent.includes('prompt is too long') ||
textContent.includes('context window')) {
logger.error('SDK', 'Context overflow detected - terminating session');
session.abortController.abort();
return;
}
logger.debug('SDK', 'Token usage captured', {
sessionId: session.sessionDbId,
inputTokens: usage.input_tokens,
outputTokens: usage.output_tokens,
cacheCreation: usage.cache_creation_input_tokens || 0,
cacheRead: usage.cache_read_input_tokens || 0,
cumulativeInput: session.cumulativeInputTokens,
cumulativeOutput: session.cumulativeOutputTokens
});
const responseSize = textContent.length;
// Capture token state BEFORE updating (for delta calculation)
const tokensBeforeResponse = session.cumulativeInputTokens + session.cumulativeOutputTokens;
// Extract and track token usage
const usage = message.message.usage;
if (usage) {
session.cumulativeInputTokens += usage.input_tokens || 0;
session.cumulativeOutputTokens += usage.output_tokens || 0;
// Cache creation counts as discovery, cache read doesn't
if (usage.cache_creation_input_tokens) {
session.cumulativeInputTokens += usage.cache_creation_input_tokens;
}
logger.debug('SDK', 'Token usage captured', {
sessionId: session.sessionDbId,
inputTokens: usage.input_tokens,
outputTokens: usage.output_tokens,
cacheCreation: usage.cache_creation_input_tokens || 0,
cacheRead: usage.cache_read_input_tokens || 0,
cumulativeInput: session.cumulativeInputTokens,
cumulativeOutput: session.cumulativeOutputTokens
});
}
// Calculate discovery tokens (delta for this response only)
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
// Process response (empty or not) and mark messages as processed
// Capture earliest timestamp BEFORE processing (will be cleared after)
const originalTimestamp = session.earliestPendingTimestamp;
if (responseSize > 0) {
const truncatedResponse = responseSize > 100
? textContent.substring(0, 100) + '...'
: textContent;
logger.dataOut('SDK', `Response received (${responseSize} chars)`, {
sessionId: session.sessionDbId,
promptNumber: session.lastPromptNumber
}, truncatedResponse);
}
// Detect fatal context overflow and terminate gracefully (issue #870)
if (typeof textContent === 'string' && textContent.includes('Prompt is too long')) {
throw new Error('Claude session context overflow: prompt is too long');
}
// Detect invalid API key — SDK returns this as response text, not an error.
// Throw so it surfaces in health endpoint and prevents silent failures.
if (typeof textContent === 'string' && textContent.includes('Invalid API key')) {
throw new Error('Invalid API key: check your API key configuration in ~/.claude-mem/settings.json or ~/.claude-mem/.env');
}
// Parse and process response using shared ResponseProcessor
await processAgentResponse(
textContent,
session,
this.dbManager,
this.sessionManager,
worker,
discoveryTokens,
originalTimestamp,
'SDK',
cwdTracker.lastCwd
);
}
// Calculate discovery tokens (delta for this response only)
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
// Process response (empty or not) and mark messages as processed
// Capture earliest timestamp BEFORE processing (will be cleared after)
const originalTimestamp = session.earliestPendingTimestamp;
if (responseSize > 0) {
const truncatedResponse = responseSize > 100
? textContent.substring(0, 100) + '...'
: textContent;
logger.dataOut('SDK', `Response received (${responseSize} chars)`, {
sessionId: session.sessionDbId,
promptNumber: session.lastPromptNumber
}, truncatedResponse);
// Log result messages
if (message.type === 'result' && message.subtype === 'success') {
// Usage telemetry is captured at SDK level
}
// Detect fatal context overflow and terminate gracefully (issue #870)
if (typeof textContent === 'string' && textContent.includes('Prompt is too long')) {
throw new Error('Claude session context overflow: prompt is too long');
}
// Detect invalid API key — SDK returns this as response text, not an error.
// Throw so it surfaces in health endpoint and prevents silent failures.
if (typeof textContent === 'string' && textContent.includes('Invalid API key')) {
throw new Error('Invalid API key: check your API key configuration in ~/.claude-mem/settings.json or ~/.claude-mem/.env');
}
// Parse and process response using shared ResponseProcessor
await processAgentResponse(
textContent,
session,
this.dbManager,
this.sessionManager,
worker,
discoveryTokens,
originalTimestamp,
'SDK',
cwdTracker.lastCwd
);
}
// Log result messages
if (message.type === 'result' && message.subtype === 'success') {
// Usage telemetry is captured at SDK level
} finally {
// Ensure subprocess is terminated after query completes (or on error)
const tracked = getProcessBySession(session.sessionDbId);
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
await ensureProcessExit(tracked, 5000);
}
}
// Ensure subprocess is terminated after query completes
const tracked = getProcessBySession(session.sessionDbId);
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
await ensureProcessExit(tracked, 5000);
}
// Mark session complete
const sessionDuration = Date.now() - session.startTime;
logger.success('SDK', 'Agent completed', {