Merge pull request #1122 from thedotmack/claude/friendly-pascal
fix: resolve orphaned subprocesses and Chroma HTTP regressions
This commit is contained in:
+1
-1
@@ -97,8 +97,8 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@anthropic-ai/claude-agent-sdk": "^0.1.76",
|
||||
"@chroma-core/default-embed": "^0.1.9",
|
||||
"@modelcontextprotocol/sdk": "^1.25.1",
|
||||
"@chroma-core/default-embed": "^0.1.9",
|
||||
"ansi-to-html": "^0.7.2",
|
||||
"chromadb": "^3.2.2",
|
||||
"dompurify": "^3.3.1",
|
||||
|
||||
File diff suppressed because one or more lines are too long
+111
-111
File diff suppressed because one or more lines are too long
@@ -11,7 +11,7 @@
|
||||
import { spawn, ChildProcess, execSync } from 'child_process';
|
||||
import path from 'path';
|
||||
import os from 'os';
|
||||
import fs from 'fs';
|
||||
import fs, { existsSync } from 'fs';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
|
||||
export interface ChromaServerConfig {
|
||||
@@ -108,14 +108,35 @@ export class ChromaServerManager {
|
||||
|
||||
// Cross-platform: use npx.cmd on Windows
|
||||
const isWindows = process.platform === 'win32';
|
||||
const command = isWindows ? 'npx.cmd' : 'npx';
|
||||
|
||||
const args = [
|
||||
'chroma', 'run',
|
||||
'--path', this.config.dataDir,
|
||||
'--host', this.config.host,
|
||||
'--port', String(this.config.port)
|
||||
];
|
||||
// Resolve chroma binary absolutely — npx fails when spawned from cache dirs (#1120)
|
||||
let command: string;
|
||||
let args: string[];
|
||||
try {
|
||||
// chromadb package installs a 'chroma' bin entry
|
||||
const chromaBinDir = path.dirname(require.resolve('chromadb/package.json'));
|
||||
// Check project-level .bin first (most common npm/bun installation layout)
|
||||
const projectBin = path.join(chromaBinDir, '..', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
|
||||
// Fallback: nested node_modules .bin (rare — pnpm or workspace hoisting)
|
||||
const nestedBin = path.join(chromaBinDir, 'node_modules', '.bin', isWindows ? 'chroma.cmd' : 'chroma');
|
||||
|
||||
if (existsSync(projectBin)) {
|
||||
command = projectBin;
|
||||
} else if (existsSync(nestedBin)) {
|
||||
command = nestedBin;
|
||||
} else {
|
||||
// Last resort: npx with explicit cwd
|
||||
command = isWindows ? 'npx.cmd' : 'npx';
|
||||
}
|
||||
} catch {
|
||||
command = isWindows ? 'npx.cmd' : 'npx';
|
||||
}
|
||||
|
||||
if (command.includes('npx')) {
|
||||
args = ['chroma', 'run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
|
||||
} else {
|
||||
args = ['run', '--path', this.config.dataDir, '--host', this.config.host, '--port', String(this.config.port)];
|
||||
}
|
||||
|
||||
logger.info('CHROMA_SERVER', 'Starting Chroma server', {
|
||||
command,
|
||||
@@ -125,11 +146,20 @@ export class ChromaServerManager {
|
||||
|
||||
const spawnEnv = this.getSpawnEnv();
|
||||
|
||||
// Resolve cwd for npx fallback — ensures node_modules is findable (#1120)
|
||||
let spawnCwd: string | undefined;
|
||||
try {
|
||||
spawnCwd = path.dirname(require.resolve('chromadb/package.json'));
|
||||
} catch {
|
||||
// If chromadb isn't resolvable, omit cwd and let npx handle it
|
||||
}
|
||||
|
||||
this.serverProcess = spawn(command, args, {
|
||||
stdio: ['ignore', 'pipe', 'pipe'],
|
||||
detached: !isWindows, // Don't detach on Windows (no process groups)
|
||||
windowsHide: true, // Hide console window on Windows
|
||||
env: spawnEnv
|
||||
env: spawnEnv,
|
||||
...(spawnCwd && { cwd: spawnCwd })
|
||||
});
|
||||
|
||||
// Log server output for debugging
|
||||
|
||||
@@ -189,17 +189,20 @@ export class ChromaSync {
|
||||
}
|
||||
|
||||
try {
|
||||
// getOrCreateCollection handles both cases
|
||||
// Lazy-load DefaultEmbeddingFunction to avoid eagerly pulling in
|
||||
// @huggingface/transformers → sharp native binaries at bundle startup
|
||||
// Use WASM backend to avoid native ONNX binary issues (#1104, #1105, #1110).
|
||||
// Same model (all-MiniLM-L6-v2), same embeddings, but runs in WASM —
|
||||
// no native binary loading, no segfaults, no ENOENT errors.
|
||||
const { DefaultEmbeddingFunction } = await import('@chroma-core/default-embed');
|
||||
const embeddingFunction = new DefaultEmbeddingFunction();
|
||||
const embeddingFunction = new DefaultEmbeddingFunction({ wasm: true });
|
||||
|
||||
this.collection = await this.chromaClient.getOrCreateCollection({
|
||||
name: this.collectionName,
|
||||
embeddingFunction
|
||||
});
|
||||
|
||||
logger.debug('CHROMA_SYNC', 'Collection ready', { collection: this.collectionName });
|
||||
logger.debug('CHROMA_SYNC', 'Collection ready', {
|
||||
collection: this.collectionName
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('CHROMA_SYNC', 'Failed to get/create collection', { collection: this.collectionName }, error as Error);
|
||||
throw new Error(`Collection setup failed: ${error instanceof Error ? error.message : String(error)}`);
|
||||
|
||||
+127
-118
@@ -141,134 +141,143 @@ export class SDKAgent {
|
||||
}
|
||||
});
|
||||
|
||||
// Process SDK messages
|
||||
for await (const message of queryResult) {
|
||||
// Capture or update memory session ID from SDK message
|
||||
// IMPORTANT: The SDK may return a DIFFERENT session_id on resume than what we sent!
|
||||
// We must always sync the DB to match what the SDK actually uses.
|
||||
//
|
||||
// MULTI-TERMINAL COLLISION FIX (FK constraint bug):
|
||||
// Use ensureMemorySessionIdRegistered() instead of updateMemorySessionId() because:
|
||||
// 1. It's idempotent - safe to call multiple times
|
||||
// 2. It verifies the update happened (SELECT before UPDATE)
|
||||
// 3. Consistent with ResponseProcessor's usage pattern
|
||||
// This ensures FK constraint compliance BEFORE any observations are stored.
|
||||
if (message.session_id && message.session_id !== session.memorySessionId) {
|
||||
const previousId = session.memorySessionId;
|
||||
session.memorySessionId = message.session_id;
|
||||
// Persist to database IMMEDIATELY for FK constraint compliance
|
||||
// This must happen BEFORE any observations referencing this ID are stored
|
||||
this.dbManager.getSessionStore().ensureMemorySessionIdRegistered(
|
||||
session.sessionDbId,
|
||||
message.session_id
|
||||
);
|
||||
// Verify the update by reading back from DB
|
||||
const verification = this.dbManager.getSessionStore().getSessionById(session.sessionDbId);
|
||||
const dbVerified = verification?.memory_session_id === message.session_id;
|
||||
const logMessage = previousId
|
||||
? `MEMORY_ID_CHANGED | sessionDbId=${session.sessionDbId} | from=${previousId} | to=${message.session_id} | dbVerified=${dbVerified}`
|
||||
: `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`;
|
||||
logger.info('SESSION', logMessage, {
|
||||
sessionId: session.sessionDbId,
|
||||
memorySessionId: message.session_id,
|
||||
previousId
|
||||
});
|
||||
if (!dbVerified) {
|
||||
logger.error('SESSION', `MEMORY_ID_MISMATCH | sessionDbId=${session.sessionDbId} | expected=${message.session_id} | got=${verification?.memory_session_id}`, {
|
||||
sessionId: session.sessionDbId
|
||||
// Process SDK messages — cleanup in finally ensures subprocess termination
|
||||
// even if the loop throws (e.g., context overflow, invalid API key)
|
||||
try {
|
||||
for await (const message of queryResult) {
|
||||
// Capture or update memory session ID from SDK message
|
||||
// IMPORTANT: The SDK may return a DIFFERENT session_id on resume than what we sent!
|
||||
// We must always sync the DB to match what the SDK actually uses.
|
||||
//
|
||||
// MULTI-TERMINAL COLLISION FIX (FK constraint bug):
|
||||
// Use ensureMemorySessionIdRegistered() instead of updateMemorySessionId() because:
|
||||
// 1. It's idempotent - safe to call multiple times
|
||||
// 2. It verifies the update happened (SELECT before UPDATE)
|
||||
// 3. Consistent with ResponseProcessor's usage pattern
|
||||
// This ensures FK constraint compliance BEFORE any observations are stored.
|
||||
if (message.session_id && message.session_id !== session.memorySessionId) {
|
||||
const previousId = session.memorySessionId;
|
||||
session.memorySessionId = message.session_id;
|
||||
// Persist to database IMMEDIATELY for FK constraint compliance
|
||||
// This must happen BEFORE any observations referencing this ID are stored
|
||||
this.dbManager.getSessionStore().ensureMemorySessionIdRegistered(
|
||||
session.sessionDbId,
|
||||
message.session_id
|
||||
);
|
||||
// Verify the update by reading back from DB
|
||||
const verification = this.dbManager.getSessionStore().getSessionById(session.sessionDbId);
|
||||
const dbVerified = verification?.memory_session_id === message.session_id;
|
||||
const logMessage = previousId
|
||||
? `MEMORY_ID_CHANGED | sessionDbId=${session.sessionDbId} | from=${previousId} | to=${message.session_id} | dbVerified=${dbVerified}`
|
||||
: `MEMORY_ID_CAPTURED | sessionDbId=${session.sessionDbId} | memorySessionId=${message.session_id} | dbVerified=${dbVerified}`;
|
||||
logger.info('SESSION', logMessage, {
|
||||
sessionId: session.sessionDbId,
|
||||
memorySessionId: message.session_id,
|
||||
previousId
|
||||
});
|
||||
}
|
||||
// Debug-level alignment log for detailed tracing
|
||||
logger.debug('SDK', `[ALIGNMENT] ${previousId ? 'Updated' : 'Captured'} | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
|
||||
}
|
||||
|
||||
// Handle assistant messages
|
||||
if (message.type === 'assistant') {
|
||||
const content = message.message.content;
|
||||
const textContent = Array.isArray(content)
|
||||
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
|
||||
: typeof content === 'string' ? content : '';
|
||||
|
||||
// Check for context overflow - prevents infinite retry loops
|
||||
if (textContent.includes('prompt is too long') ||
|
||||
textContent.includes('context window')) {
|
||||
logger.error('SDK', 'Context overflow detected - terminating session');
|
||||
session.abortController.abort();
|
||||
return;
|
||||
if (!dbVerified) {
|
||||
logger.error('SESSION', `MEMORY_ID_MISMATCH | sessionDbId=${session.sessionDbId} | expected=${message.session_id} | got=${verification?.memory_session_id}`, {
|
||||
sessionId: session.sessionDbId
|
||||
});
|
||||
}
|
||||
// Debug-level alignment log for detailed tracing
|
||||
logger.debug('SDK', `[ALIGNMENT] ${previousId ? 'Updated' : 'Captured'} | contentSessionId=${session.contentSessionId} → memorySessionId=${message.session_id} | Future prompts will resume with this ID`);
|
||||
}
|
||||
|
||||
const responseSize = textContent.length;
|
||||
// Handle assistant messages
|
||||
if (message.type === 'assistant') {
|
||||
const content = message.message.content;
|
||||
const textContent = Array.isArray(content)
|
||||
? content.filter((c: any) => c.type === 'text').map((c: any) => c.text).join('\n')
|
||||
: typeof content === 'string' ? content : '';
|
||||
|
||||
// Capture token state BEFORE updating (for delta calculation)
|
||||
const tokensBeforeResponse = session.cumulativeInputTokens + session.cumulativeOutputTokens;
|
||||
|
||||
// Extract and track token usage
|
||||
const usage = message.message.usage;
|
||||
if (usage) {
|
||||
session.cumulativeInputTokens += usage.input_tokens || 0;
|
||||
session.cumulativeOutputTokens += usage.output_tokens || 0;
|
||||
|
||||
// Cache creation counts as discovery, cache read doesn't
|
||||
if (usage.cache_creation_input_tokens) {
|
||||
session.cumulativeInputTokens += usage.cache_creation_input_tokens;
|
||||
// Check for context overflow - prevents infinite retry loops
|
||||
if (textContent.includes('prompt is too long') ||
|
||||
textContent.includes('context window')) {
|
||||
logger.error('SDK', 'Context overflow detected - terminating session');
|
||||
session.abortController.abort();
|
||||
return;
|
||||
}
|
||||
|
||||
logger.debug('SDK', 'Token usage captured', {
|
||||
sessionId: session.sessionDbId,
|
||||
inputTokens: usage.input_tokens,
|
||||
outputTokens: usage.output_tokens,
|
||||
cacheCreation: usage.cache_creation_input_tokens || 0,
|
||||
cacheRead: usage.cache_read_input_tokens || 0,
|
||||
cumulativeInput: session.cumulativeInputTokens,
|
||||
cumulativeOutput: session.cumulativeOutputTokens
|
||||
});
|
||||
const responseSize = textContent.length;
|
||||
|
||||
// Capture token state BEFORE updating (for delta calculation)
|
||||
const tokensBeforeResponse = session.cumulativeInputTokens + session.cumulativeOutputTokens;
|
||||
|
||||
// Extract and track token usage
|
||||
const usage = message.message.usage;
|
||||
if (usage) {
|
||||
session.cumulativeInputTokens += usage.input_tokens || 0;
|
||||
session.cumulativeOutputTokens += usage.output_tokens || 0;
|
||||
|
||||
// Cache creation counts as discovery, cache read doesn't
|
||||
if (usage.cache_creation_input_tokens) {
|
||||
session.cumulativeInputTokens += usage.cache_creation_input_tokens;
|
||||
}
|
||||
|
||||
logger.debug('SDK', 'Token usage captured', {
|
||||
sessionId: session.sessionDbId,
|
||||
inputTokens: usage.input_tokens,
|
||||
outputTokens: usage.output_tokens,
|
||||
cacheCreation: usage.cache_creation_input_tokens || 0,
|
||||
cacheRead: usage.cache_read_input_tokens || 0,
|
||||
cumulativeInput: session.cumulativeInputTokens,
|
||||
cumulativeOutput: session.cumulativeOutputTokens
|
||||
});
|
||||
}
|
||||
|
||||
// Calculate discovery tokens (delta for this response only)
|
||||
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
|
||||
|
||||
// Process response (empty or not) and mark messages as processed
|
||||
// Capture earliest timestamp BEFORE processing (will be cleared after)
|
||||
const originalTimestamp = session.earliestPendingTimestamp;
|
||||
|
||||
if (responseSize > 0) {
|
||||
const truncatedResponse = responseSize > 100
|
||||
? textContent.substring(0, 100) + '...'
|
||||
: textContent;
|
||||
logger.dataOut('SDK', `Response received (${responseSize} chars)`, {
|
||||
sessionId: session.sessionDbId,
|
||||
promptNumber: session.lastPromptNumber
|
||||
}, truncatedResponse);
|
||||
}
|
||||
|
||||
// Detect fatal context overflow and terminate gracefully (issue #870)
|
||||
if (typeof textContent === 'string' && textContent.includes('Prompt is too long')) {
|
||||
throw new Error('Claude session context overflow: prompt is too long');
|
||||
}
|
||||
|
||||
// Detect invalid API key — SDK returns this as response text, not an error.
|
||||
// Throw so it surfaces in health endpoint and prevents silent failures.
|
||||
if (typeof textContent === 'string' && textContent.includes('Invalid API key')) {
|
||||
throw new Error('Invalid API key: check your API key configuration in ~/.claude-mem/settings.json or ~/.claude-mem/.env');
|
||||
}
|
||||
|
||||
// Parse and process response using shared ResponseProcessor
|
||||
await processAgentResponse(
|
||||
textContent,
|
||||
session,
|
||||
this.dbManager,
|
||||
this.sessionManager,
|
||||
worker,
|
||||
discoveryTokens,
|
||||
originalTimestamp,
|
||||
'SDK',
|
||||
cwdTracker.lastCwd
|
||||
);
|
||||
}
|
||||
|
||||
// Calculate discovery tokens (delta for this response only)
|
||||
const discoveryTokens = (session.cumulativeInputTokens + session.cumulativeOutputTokens) - tokensBeforeResponse;
|
||||
|
||||
// Process response (empty or not) and mark messages as processed
|
||||
// Capture earliest timestamp BEFORE processing (will be cleared after)
|
||||
const originalTimestamp = session.earliestPendingTimestamp;
|
||||
|
||||
if (responseSize > 0) {
|
||||
const truncatedResponse = responseSize > 100
|
||||
? textContent.substring(0, 100) + '...'
|
||||
: textContent;
|
||||
logger.dataOut('SDK', `Response received (${responseSize} chars)`, {
|
||||
sessionId: session.sessionDbId,
|
||||
promptNumber: session.lastPromptNumber
|
||||
}, truncatedResponse);
|
||||
// Log result messages
|
||||
if (message.type === 'result' && message.subtype === 'success') {
|
||||
// Usage telemetry is captured at SDK level
|
||||
}
|
||||
|
||||
// Detect fatal context overflow and terminate gracefully (issue #870)
|
||||
if (typeof textContent === 'string' && textContent.includes('Prompt is too long')) {
|
||||
throw new Error('Claude session context overflow: prompt is too long');
|
||||
}
|
||||
|
||||
// Detect invalid API key — SDK returns this as response text, not an error.
|
||||
// Throw so it surfaces in health endpoint and prevents silent failures.
|
||||
if (typeof textContent === 'string' && textContent.includes('Invalid API key')) {
|
||||
throw new Error('Invalid API key: check your API key configuration in ~/.claude-mem/settings.json or ~/.claude-mem/.env');
|
||||
}
|
||||
|
||||
// Parse and process response using shared ResponseProcessor
|
||||
await processAgentResponse(
|
||||
textContent,
|
||||
session,
|
||||
this.dbManager,
|
||||
this.sessionManager,
|
||||
worker,
|
||||
discoveryTokens,
|
||||
originalTimestamp,
|
||||
'SDK',
|
||||
cwdTracker.lastCwd
|
||||
);
|
||||
}
|
||||
|
||||
// Log result messages
|
||||
if (message.type === 'result' && message.subtype === 'success') {
|
||||
// Usage telemetry is captured at SDK level
|
||||
} finally {
|
||||
// Ensure subprocess is terminated after query completes (or on error)
|
||||
const tracked = getProcessBySession(session.sessionDbId);
|
||||
if (tracked && !tracked.process.killed && tracked.process.exitCode === null) {
|
||||
await ensureProcessExit(tracked, 5000);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user