Refactor hooks and worker service for improved error handling and initialization
- Removed try-catch blocks in new-hook, save-hook, and summary-hook for cleaner flow. - Enhanced error handling in save and summary hooks to throw errors instead of logging and returning. - Introduced ensureWorkerRunning utility to manage worker service lifecycle and health checks. - Replaced dynamic port allocation with a fixed port for the worker service. - Simplified path management and removed unused port allocator utility. - Added database schema initialization for fresh installations and improved migration handling.
This commit is contained in:
@@ -18,6 +18,9 @@ export class SessionStore {
|
||||
this.db.pragma('synchronous = NORMAL');
|
||||
this.db.pragma('foreign_keys = ON');
|
||||
|
||||
// Initialize schema if needed (fresh database)
|
||||
this.initializeSchema();
|
||||
|
||||
// Run migrations
|
||||
this.ensureWorkerPortColumn();
|
||||
this.ensurePromptTrackingColumns();
|
||||
@@ -27,10 +30,108 @@ export class SessionStore {
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure worker_port column exists (migration)
|
||||
* Initialize database schema using migrations (migration004)
|
||||
* This runs the core SDK tables migration if no tables exist
|
||||
*/
|
||||
private initializeSchema(): void {
|
||||
try {
|
||||
// Create schema_versions table if it doesn't exist
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS schema_versions (
|
||||
id INTEGER PRIMARY KEY,
|
||||
version INTEGER UNIQUE NOT NULL,
|
||||
applied_at TEXT NOT NULL
|
||||
)
|
||||
`);
|
||||
|
||||
// Get applied migrations
|
||||
const appliedVersions = this.db.prepare('SELECT version FROM schema_versions ORDER BY version').all() as Array<{version: number}>;
|
||||
const maxApplied = appliedVersions.length > 0 ? Math.max(...appliedVersions.map(v => v.version)) : 0;
|
||||
|
||||
// Only run migration004 if no migrations have been applied
|
||||
// This creates the sdk_sessions, observations, and session_summaries tables
|
||||
if (maxApplied === 0) {
|
||||
console.error('[SessionStore] Initializing fresh database with migration004...');
|
||||
|
||||
// Migration004: SDK agent architecture tables
|
||||
this.db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS sdk_sessions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
claude_session_id TEXT UNIQUE NOT NULL,
|
||||
sdk_session_id TEXT UNIQUE,
|
||||
project TEXT NOT NULL,
|
||||
user_prompt TEXT,
|
||||
started_at TEXT NOT NULL,
|
||||
started_at_epoch INTEGER NOT NULL,
|
||||
completed_at TEXT,
|
||||
completed_at_epoch INTEGER,
|
||||
status TEXT CHECK(status IN ('active', 'completed', 'failed')) NOT NULL DEFAULT 'active'
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_claude_id ON sdk_sessions(claude_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_sdk_id ON sdk_sessions(sdk_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_project ON sdk_sessions(project);
|
||||
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_status ON sdk_sessions(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_sdk_sessions_started ON sdk_sessions(started_at_epoch DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS observations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
sdk_session_id TEXT NOT NULL,
|
||||
project TEXT NOT NULL,
|
||||
text TEXT NOT NULL,
|
||||
type TEXT NOT NULL CHECK(type IN ('decision', 'bugfix', 'feature', 'refactor', 'discovery')),
|
||||
created_at TEXT NOT NULL,
|
||||
created_at_epoch INTEGER NOT NULL,
|
||||
FOREIGN KEY(sdk_session_id) REFERENCES sdk_sessions(sdk_session_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_sdk_session ON observations(sdk_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_project ON observations(project);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_type ON observations(type);
|
||||
CREATE INDEX IF NOT EXISTS idx_observations_created ON observations(created_at_epoch DESC);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS session_summaries (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
sdk_session_id TEXT UNIQUE NOT NULL,
|
||||
project TEXT NOT NULL,
|
||||
request TEXT,
|
||||
investigated TEXT,
|
||||
learned TEXT,
|
||||
completed TEXT,
|
||||
next_steps TEXT,
|
||||
files_read TEXT,
|
||||
files_edited TEXT,
|
||||
notes TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
created_at_epoch INTEGER NOT NULL,
|
||||
FOREIGN KEY(sdk_session_id) REFERENCES sdk_sessions(sdk_session_id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_sdk_session ON session_summaries(sdk_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_project ON session_summaries(project);
|
||||
CREATE INDEX IF NOT EXISTS idx_session_summaries_created ON session_summaries(created_at_epoch DESC);
|
||||
`);
|
||||
|
||||
// Record migration004 as applied
|
||||
this.db.prepare('INSERT INTO schema_versions (version, applied_at) VALUES (?, ?)').run(4, new Date().toISOString());
|
||||
|
||||
console.error('[SessionStore] Migration004 applied successfully');
|
||||
}
|
||||
} catch (error: any) {
|
||||
console.error('[SessionStore] Schema initialization error:', error.message);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure worker_port column exists (migration 5)
|
||||
*/
|
||||
private ensureWorkerPortColumn(): void {
|
||||
try {
|
||||
// Check if migration already applied
|
||||
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(5) as {version: number} | undefined;
|
||||
if (applied) return;
|
||||
|
||||
// Check if column exists
|
||||
const tableInfo = this.db.pragma('table_info(sdk_sessions)');
|
||||
const hasWorkerPort = (tableInfo as any[]).some((col: any) => col.name === 'worker_port');
|
||||
@@ -39,16 +140,23 @@ export class SessionStore {
|
||||
this.db.exec('ALTER TABLE sdk_sessions ADD COLUMN worker_port INTEGER');
|
||||
console.error('[SessionStore] Added worker_port column to sdk_sessions table');
|
||||
}
|
||||
|
||||
// Record migration
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(5, new Date().toISOString());
|
||||
} catch (error: any) {
|
||||
console.error('[SessionStore] Migration error:', error.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure prompt tracking columns exist (migration 006)
|
||||
* Ensure prompt tracking columns exist (migration 6)
|
||||
*/
|
||||
private ensurePromptTrackingColumns(): void {
|
||||
try {
|
||||
// Check if migration already applied
|
||||
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(6) as {version: number} | undefined;
|
||||
if (applied) return;
|
||||
|
||||
// Check sdk_sessions for prompt_counter
|
||||
const sessionsInfo = this.db.pragma('table_info(sdk_sessions)');
|
||||
const hasPromptCounter = (sessionsInfo as any[]).some((col: any) => col.name === 'prompt_counter');
|
||||
@@ -76,27 +184,29 @@ export class SessionStore {
|
||||
console.error('[SessionStore] Added prompt_number column to session_summaries table');
|
||||
}
|
||||
|
||||
// Remove UNIQUE constraint on session_summaries.sdk_session_id
|
||||
// SQLite doesn't support dropping constraints, so we need to check if it exists first
|
||||
const summariesIndexes = this.db.pragma('index_list(session_summaries)');
|
||||
const hasUniqueConstraint = (summariesIndexes as any[]).some((idx: any) => idx.unique === 1);
|
||||
|
||||
// Record migration
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(6, new Date().toISOString());
|
||||
} catch (error: any) {
|
||||
console.error('[SessionStore] Prompt tracking migration error:', error.message);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove UNIQUE constraint from session_summaries.sdk_session_id (migration 007)
|
||||
* Remove UNIQUE constraint from session_summaries.sdk_session_id (migration 7)
|
||||
*/
|
||||
private removeSessionSummariesUniqueConstraint(): void {
|
||||
try {
|
||||
// Check if migration already applied
|
||||
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(7) as {version: number} | undefined;
|
||||
if (applied) return;
|
||||
|
||||
// Check if UNIQUE constraint exists
|
||||
const summariesIndexes = this.db.pragma('index_list(session_summaries)');
|
||||
const hasUniqueConstraint = (summariesIndexes as any[]).some((idx: any) => idx.unique === 1);
|
||||
|
||||
if (!hasUniqueConstraint) {
|
||||
// Already migrated
|
||||
// Already migrated (no constraint exists)
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(7, new Date().toISOString());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -152,6 +262,9 @@ export class SessionStore {
|
||||
// Commit transaction
|
||||
this.db.exec('COMMIT');
|
||||
|
||||
// Record migration
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(7, new Date().toISOString());
|
||||
|
||||
console.error('[SessionStore] Successfully removed UNIQUE constraint from session_summaries.sdk_session_id');
|
||||
} catch (error: any) {
|
||||
// Rollback on error
|
||||
@@ -164,16 +277,21 @@ export class SessionStore {
|
||||
}
|
||||
|
||||
/**
|
||||
* Add hierarchical fields to observations table (migration 008)
|
||||
* Add hierarchical fields to observations table (migration 8)
|
||||
*/
|
||||
private addObservationHierarchicalFields(): void {
|
||||
try {
|
||||
// Check if migration already applied
|
||||
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(8) as {version: number} | undefined;
|
||||
if (applied) return;
|
||||
|
||||
// Check if new fields already exist
|
||||
const tableInfo = this.db.pragma('table_info(observations)');
|
||||
const hasTitle = (tableInfo as any[]).some((col: any) => col.name === 'title');
|
||||
|
||||
if (hasTitle) {
|
||||
// Already migrated
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(8, new Date().toISOString());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -190,6 +308,9 @@ export class SessionStore {
|
||||
ALTER TABLE observations ADD COLUMN files_modified TEXT;
|
||||
`);
|
||||
|
||||
// Record migration
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(8, new Date().toISOString());
|
||||
|
||||
console.error('[SessionStore] Successfully added hierarchical fields to observations table');
|
||||
} catch (error: any) {
|
||||
console.error('[SessionStore] Migration error (add hierarchical fields):', error.message);
|
||||
@@ -197,17 +318,22 @@ export class SessionStore {
|
||||
}
|
||||
|
||||
/**
|
||||
* Make observations.text nullable (migration 009)
|
||||
* Make observations.text nullable (migration 9)
|
||||
* The text field is deprecated in favor of structured fields (title, subtitle, narrative, etc.)
|
||||
*/
|
||||
private makeObservationsTextNullable(): void {
|
||||
try {
|
||||
// Check if migration already applied
|
||||
const applied = this.db.prepare('SELECT version FROM schema_versions WHERE version = ?').get(9) as {version: number} | undefined;
|
||||
if (applied) return;
|
||||
|
||||
// Check if text column is already nullable
|
||||
const tableInfo = this.db.pragma('table_info(observations)');
|
||||
const textColumn = (tableInfo as any[]).find((col: any) => col.name === 'text');
|
||||
|
||||
if (!textColumn || textColumn.notnull === 0) {
|
||||
// Already migrated or text column doesn't exist
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(9, new Date().toISOString());
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -265,6 +391,9 @@ export class SessionStore {
|
||||
// Commit transaction
|
||||
this.db.exec('COMMIT');
|
||||
|
||||
// Record migration
|
||||
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(9, new Date().toISOString());
|
||||
|
||||
console.error('[SessionStore] Successfully made observations.text nullable');
|
||||
} catch (error: any) {
|
||||
// Rollback on error
|
||||
|
||||
@@ -10,12 +10,12 @@ import { SessionStore } from './sqlite/SessionStore.js';
|
||||
import { buildInitPrompt, buildObservationPrompt, buildFinalizePrompt } from '../sdk/prompts.js';
|
||||
import { parseObservations, parseSummary } from '../sdk/parser.js';
|
||||
import type { SDKSession } from '../sdk/prompts.js';
|
||||
import { findAvailablePort } from '../utils/port-allocator.js';
|
||||
import { logger } from '../utils/logger.js';
|
||||
import { getWorkerPortFilePath, ensureAllDataDirs } from '../shared/paths.js';
|
||||
import { ensureAllDataDirs } from '../shared/paths.js';
|
||||
|
||||
const MODEL = 'claude-sonnet-4-5';
|
||||
const DISALLOWED_TOOLS = ['Glob', 'Grep', 'ListMcpResourcesTool', 'WebSearch'];
|
||||
const FIXED_PORT = parseInt(process.env.CLAUDE_MEM_WORKER_PORT || '37777', 10);
|
||||
|
||||
interface ObservationMessage {
|
||||
type: 'observation';
|
||||
@@ -69,13 +69,7 @@ class WorkerService {
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
// Find available port
|
||||
const port = await findAvailablePort();
|
||||
if (!port) {
|
||||
throw new Error('No available ports in range 37000-37999');
|
||||
}
|
||||
|
||||
this.port = port;
|
||||
this.port = FIXED_PORT;
|
||||
|
||||
// Clean up orphaned sessions from previous worker instances
|
||||
const db = new SessionStore();
|
||||
@@ -87,18 +81,15 @@ class WorkerService {
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
this.app.listen(port, '127.0.0.1', () => {
|
||||
logger.info('SYSTEM', `Worker started`, { port, pid: process.pid, activeSessions: this.sessions.size });
|
||||
|
||||
// Write port to file for hooks to discover
|
||||
const { writeFileSync } = require('fs');
|
||||
ensureAllDataDirs(); // Ensure data directory exists
|
||||
const portFile = getWorkerPortFilePath();
|
||||
writeFileSync(portFile, port.toString(), 'utf8');
|
||||
logger.info('SYSTEM', `Port file written to ${portFile}`);
|
||||
|
||||
this.app.listen(FIXED_PORT, '127.0.0.1', () => {
|
||||
logger.info('SYSTEM', `Worker started`, { port: FIXED_PORT, pid: process.pid, activeSessions: this.sessions.size });
|
||||
resolve();
|
||||
}).on('error', reject);
|
||||
}).on('error', (err: any) => {
|
||||
if (err.code === 'EADDRINUSE') {
|
||||
logger.error('SYSTEM', `Port ${FIXED_PORT} already in use - worker may already be running`);
|
||||
}
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user