Add comprehensive documentation for claude-mem codebase and create a test worker script

- Introduced CODEMAP.md detailing project overview, architecture, directory structure, core components, commands, hooks system, SDK, services, shared components, utilities, and key workflows.
- Added a test-worker.sh script to automate testing of the SDK worker, including session creation, worker initiation, socket communication, and cleanup after finalization.
This commit is contained in:
Alex Newman
2025-10-16 13:56:18 -04:00
parent 18aa4f2538
commit 6e9be84a01
10 changed files with 2074 additions and 228 deletions
+247 -88
View File
File diff suppressed because one or more lines are too long
+1524
View File
File diff suppressed because it is too large Load Diff
+16
View File
@@ -174,6 +174,22 @@ program
summaryHook(JSON.parse(input)); summaryHook(JSON.parse(input));
}); });
program
.command('worker <sessionId>')
.description('Run SDK worker process (internal use)')
.action(async (sessionId: string) => {
try {
// Import and run the worker main function
const { main } = await import('../sdk/worker.js');
// Set process.argv so worker can parse sessionId
process.argv[2] = sessionId;
await main();
} catch (error: any) {
console.error(`[SDK Worker] Fatal error: ${error.message}`);
process.exit(1);
}
});
// Helper function to read stdin // Helper function to read stdin
async function readStdin(): Promise<string> { async function readStdin(): Promise<string> {
return new Promise((resolve) => { return new Promise((resolve) => {
+2 -20
View File
@@ -1,7 +1,6 @@
import { HooksDatabase } from '../services/sqlite/HooksDatabase.js'; import { HooksDatabase } from '../services/sqlite/HooksDatabase.js';
import path from 'path'; import path from 'path';
import { spawn } from 'child_process'; import { spawn } from 'child_process';
import fs from 'fs';
export interface UserPromptSubmitInput { export interface UserPromptSubmitInput {
session_id: string; session_id: string;
@@ -37,25 +36,8 @@ export function newHook(input: UserPromptSubmitInput): void {
db.close(); db.close();
// Start SDK worker in background as detached process // Start SDK worker in background as detached process
// Try source first (development), then fall back to dist (production) // Use 'claude-mem worker' CLI command which is always available
const srcWorkerPath = path.join(__dirname, '..', 'sdk', 'worker.ts'); const child = spawn('claude-mem', ['worker', sessionId.toString()], {
const distWorkerPath = path.join(__dirname, '..', 'sdk', 'worker.js');
let workerPath: string;
if (fs.existsSync(srcWorkerPath)) {
workerPath = srcWorkerPath;
} else if (fs.existsSync(distWorkerPath)) {
workerPath = distWorkerPath;
} else {
// Fallback: assume we're in the bundled CLI
// In this case, we can't spawn the worker since it's bundled
// This is a limitation we'll need to address
console.error('[claude-mem] Worker not found, skipping background processing');
console.log('{"continue": true, "suppressOutput": true}');
process.exit(0);
}
const child = spawn('bun', [workerPath, sessionId.toString()], {
detached: true, detached: true,
stdio: 'ignore' stdio: 'ignore'
}); });
+29 -20
View File
@@ -1,5 +1,7 @@
import net from 'net';
import { join } from 'path';
import { HooksDatabase } from '../services/sqlite/HooksDatabase.js'; import { HooksDatabase } from '../services/sqlite/HooksDatabase.js';
import path from 'path'; import { PathDiscovery } from '../services/path-discovery.js';
export interface PostToolUseInput { export interface PostToolUseInput {
session_id: string; session_id: string;
@@ -18,11 +20,11 @@ const SKIP_TOOLS = new Set([
/** /**
* Save Hook - PostToolUse * Save Hook - PostToolUse
* Queues tool observations for SDK processing * Sends tool observations to worker via Unix socket
*/ */
export function saveHook(input: PostToolUseInput): void { export function saveHook(input: PostToolUseInput): void {
try { try {
const { session_id, cwd, tool_name, tool_input, tool_output } = input; const { session_id, tool_name, tool_input, tool_output } = input;
// Skip certain tools // Skip certain tools
if (SKIP_TOOLS.has(tool_name)) { if (SKIP_TOOLS.has(tool_name)) {
@@ -30,38 +32,45 @@ export function saveHook(input: PostToolUseInput): void {
process.exit(0); process.exit(0);
} }
// Extract project from cwd
const project = path.basename(cwd);
// Find active SDK session // Find active SDK session
const db = new HooksDatabase(); const db = new HooksDatabase();
const session = db.findActiveSDKSession(session_id); const session = db.findActiveSDKSession(session_id);
db.close();
if (!session) { if (!session) {
// No active session yet - this can happen if UserPromptSubmit hasn't run // No active session yet - this can happen if UserPromptSubmit hasn't run
// Just exit silently // Just exit silently
db.close();
console.log('{"continue": true, "suppressOutput": true}'); console.log('{"continue": true, "suppressOutput": true}');
process.exit(0); process.exit(0);
} }
// Queue the observation // Get socket path
// SDK session ID might be null if init message hasn't arrived yet const dataDir = PathDiscovery.getInstance().getDataDirectory();
// Use the internal ID as a fallback const socketPath = join(dataDir, `worker-${session.id}.sock`);
const sdkSessionId = session.sdk_session_id || `pending-${session.id}`;
db.queueObservation( // Send observation via Unix socket
sdkSessionId, const message = {
type: 'observation',
tool_name, tool_name,
JSON.stringify(tool_input), tool_input: JSON.stringify(tool_input),
JSON.stringify(tool_output) tool_output: JSON.stringify(tool_output)
); };
db.close(); const client = net.connect(socketPath, () => {
client.write(JSON.stringify(message) + '\n');
client.end();
});
// Output hook response client.on('error', (err) => {
console.log('{"continue": true, "suppressOutput": true}'); // Socket not available - worker may have crashed or not started
process.exit(0); console.error(`[claude-mem save] Socket error: ${err.message}`);
// Continue anyway, don't block Claude
});
client.on('close', () => {
console.log('{"continue": true, "suppressOutput": true}');
process.exit(0);
});
} catch (error: any) { } catch (error: any) {
// On error, don't block Claude Code // On error, don't block Claude Code
+26 -14
View File
@@ -1,4 +1,7 @@
import net from 'net';
import { join } from 'path';
import { HooksDatabase } from '../services/sqlite/HooksDatabase.js'; import { HooksDatabase } from '../services/sqlite/HooksDatabase.js';
import { PathDiscovery } from '../services/path-discovery.js';
export interface StopInput { export interface StopInput {
session_id: string; session_id: string;
@@ -8,7 +11,7 @@ export interface StopInput {
/** /**
* Summary Hook - Stop * Summary Hook - Stop
* Signals SDK to finalize and generate summary * Sends FINALIZE message to worker via Unix socket
*/ */
export function summaryHook(input: StopInput): void { export function summaryHook(input: StopInput): void {
try { try {
@@ -17,29 +20,38 @@ export function summaryHook(input: StopInput): void {
// Find active SDK session // Find active SDK session
const db = new HooksDatabase(); const db = new HooksDatabase();
const session = db.findActiveSDKSession(session_id); const session = db.findActiveSDKSession(session_id);
db.close();
if (!session) { if (!session) {
// No active session - nothing to finalize // No active session - nothing to finalize
db.close();
console.log('{"continue": true, "suppressOutput": true}'); console.log('{"continue": true, "suppressOutput": true}');
process.exit(0); process.exit(0);
} }
// Insert special FINALIZE message into observation queue // Get socket path
const sdkSessionId = session.sdk_session_id || `pending-${session.id}`; const dataDir = PathDiscovery.getInstance().getDataDirectory();
const socketPath = join(dataDir, `worker-${session.id}.sock`);
db.queueObservation( // Send FINALIZE message via Unix socket
sdkSessionId, const message = {
'FINALIZE', type: 'finalize'
'{}', };
'{}'
);
db.close(); const client = net.connect(socketPath, () => {
client.write(JSON.stringify(message) + '\n');
client.end();
});
// Output hook response client.on('error', (err) => {
console.log('{"continue": true, "suppressOutput": true}'); // Socket not available - worker may have already finished or crashed
process.exit(0); console.error(`[claude-mem summary] Socket error: ${err.message}`);
// Continue anyway, don't block Claude
});
client.on('close', () => {
console.log('{"continue": true, "suppressOutput": true}');
process.exit(0);
});
} catch (error: any) { } catch (error: any) {
// On error, don't block Claude Code // On error, don't block Claude Code
+123 -29
View File
@@ -1,23 +1,39 @@
#!/usr/bin/env bun #!/usr/bin/env bun
/** /**
* SDK Worker Process * SDK Worker Process
* Background agent that processes tool observations and generates session summaries * Background server that processes tool observations via Unix socket
*/ */
import net from 'net';
import { unlinkSync, existsSync } from 'fs';
import { join } from 'path';
import { query } from '@anthropic-ai/claude-agent-sdk'; import { query } from '@anthropic-ai/claude-agent-sdk';
import { HooksDatabase } from '../services/sqlite/HooksDatabase.js'; import { HooksDatabase } from '../services/sqlite/HooksDatabase.js';
import { PathDiscovery } from '../services/path-discovery.js';
import { buildInitPrompt, buildObservationPrompt, buildFinalizePrompt } from './prompts.js'; import { buildInitPrompt, buildObservationPrompt, buildFinalizePrompt } from './prompts.js';
import { parseObservations, parseSummary } from './parser.js'; import { parseObservations, parseSummary } from './parser.js';
import type { Observation, SDKSession } from './prompts.js'; import type { SDKSession } from './prompts.js';
const POLL_INTERVAL_MS = 1000; // 1 second
const MODEL = 'claude-sonnet-4-5'; const MODEL = 'claude-sonnet-4-5';
const DISALLOWED_TOOLS = ['Glob', 'Grep', 'ListMcpResourcesTool', 'WebSearch']; const DISALLOWED_TOOLS = ['Glob', 'Grep', 'ListMcpResourcesTool', 'WebSearch'];
interface ObservationMessage {
type: 'observation';
tool_name: string;
tool_input: string;
tool_output: string;
}
interface FinalizeMessage {
type: 'finalize';
}
type WorkerMessage = ObservationMessage | FinalizeMessage;
/** /**
* Main worker process entry point * Main worker process entry point
*/ */
async function main() { export async function main() {
const sessionDbId = parseInt(process.argv[2], 10); const sessionDbId = parseInt(process.argv[2], 10);
if (!sessionDbId) { if (!sessionDbId) {
@@ -30,21 +46,28 @@ async function main() {
} }
/** /**
* SDK Worker class - handles the full lifecycle of observation processing * SDK Worker - Unix socket server that processes observations
*/ */
class SDKWorker { class SDKWorker {
private sessionDbId: number; private sessionDbId: number;
private db: HooksDatabase; private db: HooksDatabase;
private socketPath: string;
private server: net.Server | null = null;
private sdkSessionId: string | null = null; private sdkSessionId: string | null = null;
private project: string = ''; private project: string = '';
private userPrompt: string = ''; private userPrompt: string = '';
private abortController: AbortController; private abortController: AbortController;
private isFinalized = false; private isFinalized = false;
private pendingMessages: WorkerMessage[] = [];
constructor(sessionDbId: number) { constructor(sessionDbId: number) {
this.sessionDbId = sessionDbId; this.sessionDbId = sessionDbId;
this.db = new HooksDatabase(); this.db = new HooksDatabase();
this.abortController = new AbortController(); this.abortController = new AbortController();
// Socket path: ~/.claude-mem/worker-{sessionId}.sock
const dataDir = PathDiscovery.getInstance().getDataDirectory();
this.socketPath = join(dataDir, `worker-${sessionDbId}.sock`);
} }
/** /**
@@ -62,26 +85,92 @@ class SDKWorker {
this.project = session.project; this.project = session.project;
this.userPrompt = session.user_prompt; this.userPrompt = session.user_prompt;
// Start Unix socket server
await this.startSocketServer();
console.error(`[SDK Worker] Socket server listening: ${this.socketPath}`);
// Run SDK agent with streaming input // Run SDK agent with streaming input
await this.runSDKAgent(); await this.runSDKAgent();
// Mark session as completed // Mark session as completed
this.db.markSessionCompleted(this.sessionDbId); this.db.markSessionCompleted(this.sessionDbId);
this.db.close(); this.db.close();
this.cleanup();
} catch (error: any) { } catch (error: any) {
console.error('[SDK Worker] Error:', error.message); console.error('[SDK Worker] Error:', error.message);
this.db.markSessionFailed(this.sessionDbId); this.db.markSessionFailed(this.sessionDbId);
this.db.close(); this.db.close();
this.cleanup();
process.exit(1); process.exit(1);
} }
} }
/**
* Start Unix socket server to receive messages from hooks
*/
private async startSocketServer(): Promise<void> {
// Clean up old socket if it exists
if (existsSync(this.socketPath)) {
unlinkSync(this.socketPath);
}
return new Promise((resolve, reject) => {
this.server = net.createServer((socket) => {
let buffer = '';
socket.on('data', (chunk) => {
buffer += chunk.toString();
// Try to parse complete JSON messages (separated by newlines)
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
try {
const message: WorkerMessage = JSON.parse(line);
this.handleMessage(message);
} catch (err) {
console.error('[SDK Worker] Invalid message:', line);
}
}
}
});
socket.on('error', (err) => {
console.error('[SDK Worker] Socket connection error:', err.message);
});
});
this.server.on('error', (err: any) => {
if (err.code === 'EADDRINUSE') {
console.error(`[SDK Worker] Socket already in use: ${this.socketPath}`);
}
reject(err);
});
this.server.listen(this.socketPath, () => {
resolve();
});
});
}
/**
* Handle incoming message from hook
*/
private handleMessage(message: WorkerMessage): void {
this.pendingMessages.push(message);
if (message.type === 'finalize') {
this.isFinalized = true;
}
}
/** /**
* Load session from database * Load session from database
*/ */
private async loadSession(): Promise<SDKSession | null> { private async loadSession(): Promise<SDKSession | null> {
// Query session by ID
const db = this.db as any; const db = this.db as any;
const query = db.db.query(` const query = db.db.query(`
SELECT id, sdk_session_id, project, user_prompt SELECT id, sdk_session_id, project, user_prompt
@@ -98,11 +187,9 @@ class SDKWorker {
* Run SDK agent with streaming input mode * Run SDK agent with streaming input mode
*/ */
private async runSDKAgent(): Promise<void> { private async runSDKAgent(): Promise<void> {
const messageGenerator = this.createMessageGenerator();
await query({ await query({
model: MODEL, model: MODEL,
messages: messageGenerator, messages: () => this.createMessageGenerator(),
disallowedTools: DISALLOWED_TOOLS, disallowedTools: DISALLOWED_TOOLS,
signal: this.abortController.signal, signal: this.abortController.signal,
onSystemInitMessage: (msg) => { onSystemInitMessage: (msg) => {
@@ -121,6 +208,7 @@ class SDKWorker {
/** /**
* Create async message generator for SDK streaming input * Create async message generator for SDK streaming input
* Now pulls from socket messages instead of polling database
*/ */
private async* createMessageGenerator(): AsyncIterable<{ role: 'user'; content: string }> { private async* createMessageGenerator(): AsyncIterable<{ role: 'user'; content: string }> {
// Yield initial prompt // Yield initial prompt
@@ -128,36 +216,37 @@ class SDKWorker {
const initPrompt = buildInitPrompt(this.project, claudeSessionId, this.userPrompt); const initPrompt = buildInitPrompt(this.project, claudeSessionId, this.userPrompt);
yield { role: 'user', content: initPrompt }; yield { role: 'user', content: initPrompt };
// Poll observation queue // Process messages as they arrive via socket
while (!this.isFinalized) { while (!this.isFinalized) {
await this.sleep(POLL_INTERVAL_MS); // Wait for messages to arrive
if (this.pendingMessages.length === 0) {
if (!this.sdkSessionId) { await this.sleep(100); // Short sleep, just to yield control
continue; // Wait for SDK session ID to be captured continue;
} }
// Get pending observations // Process all pending messages
const observations = this.db.getPendingObservations(this.sdkSessionId, 10); while (this.pendingMessages.length > 0) {
const message = this.pendingMessages.shift()!;
for (const obs of observations) { if (message.type === 'finalize') {
// Check for FINALIZE message
if (this.isFinalizationMessage(obs)) {
this.isFinalized = true; this.isFinalized = true;
const session = await this.loadSession(); const session = await this.loadSession();
if (session) { if (session) {
const finalizePrompt = buildFinalizePrompt(session); const finalizePrompt = buildFinalizePrompt(session);
yield { role: 'user', content: finalizePrompt }; yield { role: 'user', content: finalizePrompt };
} }
this.db.markObservationProcessed(obs.id);
break; break;
} }
// Send observation to SDK if (message.type === 'observation') {
const observationPrompt = buildObservationPrompt(obs); // Build observation prompt
yield { role: 'user', content: observationPrompt }; const observationPrompt = buildObservationPrompt({
tool_name: message.tool_name,
// Mark as processed tool_input: message.tool_input,
this.db.markObservationProcessed(obs.id); tool_output: message.tool_output
});
yield { role: 'user', content: observationPrompt };
}
} }
} }
} }
@@ -194,10 +283,15 @@ class SDKWorker {
} }
/** /**
* Check if observation is a FINALIZE message * Cleanup socket server and socket file
*/ */
private isFinalizationMessage(obs: Observation): boolean { private cleanup(): void {
return obs.tool_name === 'FINALIZE'; if (this.server) {
this.server.close();
}
if (existsSync(this.socketPath)) {
unlinkSync(this.socketPath);
}
} }
/** /**
-56
View File
@@ -103,62 +103,6 @@ export class HooksDatabase {
query.run(sdkSessionId, id); query.run(sdkSessionId, id);
} }
/**
* Queue an observation for SDK processing
*/
queueObservation(
sdkSessionId: string,
toolName: string,
toolInput: string,
toolOutput: string
): void {
const nowEpoch = Date.now();
const query = this.db.query(`
INSERT INTO observation_queue
(sdk_session_id, tool_name, tool_input, tool_output, created_at_epoch)
VALUES (?, ?, ?, ?, ?)
`);
query.run(sdkSessionId, toolName, toolInput, toolOutput, nowEpoch);
}
/**
* Get pending observations for SDK processing
*/
getPendingObservations(sdkSessionId: string, limit: number = 10): Array<{
id: number;
tool_name: string;
tool_input: string;
tool_output: string;
created_at_epoch: number;
}> {
const query = this.db.query(`
SELECT id, tool_name, tool_input, tool_output, created_at_epoch
FROM observation_queue
WHERE sdk_session_id = ? AND processed_at_epoch IS NULL
ORDER BY created_at_epoch ASC
LIMIT ?
`);
return query.all(sdkSessionId, limit) as any[];
}
/**
* Mark observation as processed
*/
markObservationProcessed(id: number): void {
const nowEpoch = Date.now();
const query = this.db.query(`
UPDATE observation_queue
SET processed_at_epoch = ?
WHERE id = ?
`);
query.run(nowEpoch, id);
}
/** /**
* Store an observation (from SDK parsing) * Store an observation (from SDK parsing)
*/ */
+59 -1
View File
@@ -305,6 +305,63 @@ export const migration004: Migration = {
} }
}; };
/**
* Migration 005 - Remove orphaned tables
* Drops streaming_sessions (superseded by sdk_sessions)
* Drops observation_queue (superseded by Unix socket communication)
*/
export const migration005: Migration = {
version: 5,
up: (db: Database) => {
// Drop streaming_sessions - superseded by sdk_sessions in migration004
// This table was from v2 architecture and is no longer used
db.run(`DROP TABLE IF EXISTS streaming_sessions`);
// Drop observation_queue - superseded by Unix socket communication
// Worker now uses sockets instead of database polling for observations
db.run(`DROP TABLE IF EXISTS observation_queue`);
console.log('✅ Dropped orphaned tables: streaming_sessions, observation_queue');
},
down: (db: Database) => {
// Recreate tables if needed (though they should never be used)
db.run(`
CREATE TABLE IF NOT EXISTS streaming_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
claude_session_id TEXT UNIQUE NOT NULL,
sdk_session_id TEXT,
project TEXT NOT NULL,
title TEXT,
subtitle TEXT,
user_prompt TEXT,
started_at TEXT NOT NULL,
started_at_epoch INTEGER NOT NULL,
updated_at TEXT,
updated_at_epoch INTEGER,
completed_at TEXT,
completed_at_epoch INTEGER,
status TEXT NOT NULL DEFAULT 'active'
)
`);
db.run(`
CREATE TABLE IF NOT EXISTS observation_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sdk_session_id TEXT NOT NULL,
tool_name TEXT NOT NULL,
tool_input TEXT NOT NULL,
tool_output TEXT NOT NULL,
created_at_epoch INTEGER NOT NULL,
processed_at_epoch INTEGER,
FOREIGN KEY(sdk_session_id) REFERENCES sdk_sessions(sdk_session_id) ON DELETE CASCADE
)
`);
console.log('⚠️ Recreated streaming_sessions and observation_queue (for rollback only)');
}
};
/** /**
* All migrations in order * All migrations in order
*/ */
@@ -312,5 +369,6 @@ export const migrations: Migration[] = [
migration001, migration001,
migration002, migration002,
migration003, migration003,
migration004 migration004,
migration005
]; ];
Executable
+48
View File
@@ -0,0 +1,48 @@
#!/bin/bash
set -e
echo "Creating session..."
echo '{"session_id":"test-socket-789","cwd":"/Users/alexnewman/Scripts/claude-mem","prompt":"testing"}' | bun src/bin/cli.ts new
sleep 1
SESSION_ID=$(sqlite3 ~/.claude-mem/claude-mem.db "SELECT id FROM sdk_sessions ORDER BY id DESC LIMIT 1;")
echo "Session ID: $SESSION_ID"
echo "Starting worker..."
bun src/sdk/worker.ts $SESSION_ID 2>&1 &
WORKER_PID=$!
echo "Worker PID: $WORKER_PID"
sleep 3
if ps -p $WORKER_PID > /dev/null 2>&1; then
echo "✅ Worker is RUNNING!"
if [ -e ~/.claude-mem/worker-$SESSION_ID.sock ]; then
echo "✅ Socket file exists!"
ls -la ~/.claude-mem/worker-$SESSION_ID.sock
else
echo "❌ Socket file NOT found"
fi
# Try to send a message
echo "Sending test observation..."
echo '{"type":"observation","tool_name":"TestTool","tool_input":"{}","tool_output":"{}"}' | nc -U ~/.claude-mem/worker-$SESSION_ID.sock
echo "Message sent!"
sleep 2
# Send finalize
echo "Sending finalize..."
echo '{"type":"finalize"}' | nc -U ~/.claude-mem/worker-$SESSION_ID.sock
sleep 2
if ps -p $WORKER_PID > /dev/null 2>&1; then
echo "⚠️ Worker still running after finalize"
kill $WORKER_PID
else
echo "✅ Worker exited cleanly after finalize"
fi
else
echo "❌ Worker exited prematurely"
fi