feat: implement graceful shutdown handlers and session locking in hooks
This commit is contained in:
@@ -13,7 +13,7 @@ import { fileURLToPath } from 'url';
|
||||
import { query } from '@anthropic-ai/claude-agent-sdk';
|
||||
import { renderToolMessage, HOOK_CONFIG } from './shared/hook-prompt-renderer.js';
|
||||
import { getProjectName } from './shared/path-resolver.js';
|
||||
import { initializeDatabase, getActiveStreamingSessionsForProject } from './shared/hook-helpers.js';
|
||||
import { initializeDatabase, getActiveStreamingSessionsForProject, acquireSessionLock, releaseSessionLock, cleanupStaleLocks } from './shared/hook-helpers.js';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
@@ -40,6 +40,44 @@ function debugLog(message, data = {}) {
|
||||
// MAIN
|
||||
// =============================================================================
|
||||
|
||||
// =============================================================================
|
||||
// GRACEFUL SHUTDOWN HANDLERS
|
||||
// =============================================================================
|
||||
|
||||
let db;
|
||||
let lockAcquired = false;
|
||||
let sdkSessionId = null;
|
||||
|
||||
function cleanup() {
|
||||
if (lockAcquired && sdkSessionId && db) {
|
||||
try {
|
||||
releaseSessionLock(db, sdkSessionId);
|
||||
debugLog('PostToolUse: Released session lock on shutdown', { sdkSessionId });
|
||||
} catch (err) {
|
||||
// Silent fail on cleanup
|
||||
}
|
||||
}
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
// Silent fail on cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
process.on('SIGTERM', () => {
|
||||
debugLog('PostToolUse: Received SIGTERM, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
debugLog('PostToolUse: Received SIGINT, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
let input = '';
|
||||
process.stdin.setEncoding('utf8');
|
||||
process.stdin.on('data', (chunk) => { input += chunk; });
|
||||
@@ -62,7 +100,10 @@ process.stdin.on('end', async () => {
|
||||
|
||||
try {
|
||||
// Load SDK session info from database
|
||||
const db = initializeDatabase();
|
||||
db = initializeDatabase();
|
||||
|
||||
// Clean up any stale locks first
|
||||
cleanupStaleLocks(db);
|
||||
|
||||
const sessions = getActiveStreamingSessionsForProject(db, project);
|
||||
if (!sessions || sessions.length === 0) {
|
||||
@@ -72,7 +113,22 @@ process.stdin.on('end', async () => {
|
||||
}
|
||||
|
||||
const sessionData = sessions[0];
|
||||
const sdkSessionId = sessionData.sdk_session_id;
|
||||
sdkSessionId = sessionData.sdk_session_id;
|
||||
|
||||
// Validate SDK session ID exists
|
||||
if (!sdkSessionId) {
|
||||
debugLog('PostToolUse: SDK session ID not yet available', { project });
|
||||
db.close();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
// Try to acquire lock - if another hook has it, skip this tool
|
||||
lockAcquired = acquireSessionLock(db, sdkSessionId, 'PostToolUse');
|
||||
if (!lockAcquired) {
|
||||
debugLog('PostToolUse: Session locked by another hook, skipping', { sdkSessionId });
|
||||
db.close();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
// Convert tool response to string
|
||||
const toolResponseStr = typeof tool_response === 'string'
|
||||
@@ -139,10 +195,26 @@ process.stdin.on('end', async () => {
|
||||
|
||||
debugLog('PostToolUse: SDK finished processing', { tool_name, sdkSessionId });
|
||||
|
||||
// Close database connection
|
||||
db.close();
|
||||
} catch (error) {
|
||||
debugLog('PostToolUse: Error sending to SDK', { error: error.message });
|
||||
debugLog('PostToolUse: Error sending to SDK', { error: error.message, stack: error.stack });
|
||||
} finally {
|
||||
// Always release lock and close database
|
||||
if (lockAcquired && sdkSessionId && db) {
|
||||
try {
|
||||
releaseSessionLock(db, sdkSessionId);
|
||||
debugLog('PostToolUse: Released session lock', { sdkSessionId });
|
||||
} catch (err) {
|
||||
debugLog('PostToolUse: Error releasing lock', { error: err.message });
|
||||
}
|
||||
}
|
||||
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
debugLog('PostToolUse: Error closing database', { error: err.message });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Exit cleanly after async processing completes
|
||||
|
||||
@@ -426,5 +426,78 @@ export function markStreamingSessionCompleted(db, id) {
|
||||
export function initializeDatabase() {
|
||||
const db = getDatabase();
|
||||
ensureStreamingSessionsTable(db);
|
||||
ensureSessionLocksTable(db);
|
||||
return db;
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// SESSION LOCKING (prevents concurrent SDK resume)
|
||||
// =============================================================================
|
||||
|
||||
/**
|
||||
* Ensure the session_locks table exists
|
||||
*/
|
||||
function ensureSessionLocksTable(db) {
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS session_locks (
|
||||
sdk_session_id TEXT PRIMARY KEY,
|
||||
locked_by TEXT NOT NULL,
|
||||
locked_at TEXT NOT NULL,
|
||||
locked_at_epoch INTEGER NOT NULL
|
||||
)
|
||||
`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to acquire a lock on an SDK session
|
||||
* @returns {boolean} true if lock acquired, false if already locked
|
||||
*/
|
||||
export function acquireSessionLock(db, sdkSessionId, lockOwner) {
|
||||
ensureSessionLocksTable(db);
|
||||
|
||||
try {
|
||||
const timestamp = new Date().toISOString();
|
||||
const epoch = Date.now();
|
||||
|
||||
const stmt = db.prepare(`
|
||||
INSERT INTO session_locks (sdk_session_id, locked_by, locked_at, locked_at_epoch)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
stmt.run(sdkSessionId, lockOwner, timestamp, epoch);
|
||||
return true;
|
||||
} catch (error) {
|
||||
// UNIQUE constraint violation = already locked
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a lock on an SDK session
|
||||
*/
|
||||
export function releaseSessionLock(db, sdkSessionId) {
|
||||
ensureSessionLocksTable(db);
|
||||
|
||||
const stmt = db.prepare(`
|
||||
DELETE FROM session_locks
|
||||
WHERE sdk_session_id = ?
|
||||
`);
|
||||
|
||||
stmt.run(sdkSessionId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean up stale locks (older than 5 minutes)
|
||||
*/
|
||||
export function cleanupStaleLocks(db) {
|
||||
ensureSessionLocksTable(db);
|
||||
|
||||
const fiveMinutesAgo = Date.now() - (5 * 60 * 1000);
|
||||
|
||||
const stmt = db.prepare(`
|
||||
DELETE FROM session_locks
|
||||
WHERE locked_at_epoch < ?
|
||||
`);
|
||||
|
||||
stmt.run(fiveMinutesAgo);
|
||||
}
|
||||
|
||||
+101
-17
@@ -12,7 +12,7 @@ import fs from 'fs';
|
||||
import { query } from '@anthropic-ai/claude-agent-sdk';
|
||||
import { renderEndMessage, HOOK_CONFIG } from './shared/hook-prompt-renderer.js';
|
||||
import { getProjectName } from './shared/path-resolver.js';
|
||||
import { initializeDatabase, getActiveStreamingSessionsForProject, markStreamingSessionCompleted } from './shared/hook-helpers.js';
|
||||
import { initializeDatabase, getActiveStreamingSessionsForProject, markStreamingSessionCompleted, acquireSessionLock, releaseSessionLock, cleanupStaleLocks } from './shared/hook-helpers.js';
|
||||
|
||||
const HOOKS_LOG = path.join(process.env.HOME || '', '.claude-mem', 'logs', 'hooks.log');
|
||||
|
||||
@@ -29,6 +29,45 @@ function debugLog(message, data = {}) {
|
||||
}
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
// GRACEFUL SHUTDOWN HANDLERS
|
||||
// =============================================================================
|
||||
|
||||
let db;
|
||||
let lockAcquired = false;
|
||||
let sdkSessionId = null;
|
||||
let sessionData = null;
|
||||
|
||||
function cleanup() {
|
||||
if (lockAcquired && sdkSessionId && db) {
|
||||
try {
|
||||
releaseSessionLock(db, sdkSessionId);
|
||||
debugLog('Stop: Released session lock on shutdown', { sdkSessionId });
|
||||
} catch (err) {
|
||||
// Silent fail on cleanup
|
||||
}
|
||||
}
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
// Silent fail on cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
process.on('SIGTERM', () => {
|
||||
debugLog('Stop: Received SIGTERM, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
debugLog('Stop: Received SIGINT, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
// =============================================================================
|
||||
// MAIN
|
||||
// =============================================================================
|
||||
@@ -50,20 +89,23 @@ process.stdin.on('end', async () => {
|
||||
const { cwd } = payload;
|
||||
const project = cwd ? getProjectName(cwd) : 'unknown';
|
||||
|
||||
// Immediately clear activity flag for UI indicator
|
||||
const activityFlagPath = path.join(process.env.HOME || '', '.claude-mem', 'activity.flag');
|
||||
try {
|
||||
fs.writeFileSync(activityFlagPath, JSON.stringify({ active: false, timestamp: Date.now() }));
|
||||
} catch (error) {
|
||||
// Silent fail - non-critical
|
||||
}
|
||||
|
||||
// Return immediately with async mode
|
||||
console.log(JSON.stringify({ async: true, asyncTimeout: 180000 }));
|
||||
|
||||
try {
|
||||
// Clear activity flag FIRST - even if hook fails, UI should update
|
||||
const activityFlagPath = path.join(process.env.HOME || '', '.claude-mem', 'activity.flag');
|
||||
try {
|
||||
fs.writeFileSync(activityFlagPath, JSON.stringify({ active: false, timestamp: Date.now() }));
|
||||
} catch (error) {
|
||||
debugLog('Stop: Error clearing activity flag', { error: error.message });
|
||||
}
|
||||
|
||||
// Load SDK session info from database
|
||||
const db = initializeDatabase();
|
||||
db = initializeDatabase();
|
||||
|
||||
// Clean up any stale locks first
|
||||
cleanupStaleLocks(db);
|
||||
|
||||
const sessions = getActiveStreamingSessionsForProject(db, project);
|
||||
if (!sessions || sessions.length === 0) {
|
||||
@@ -72,10 +114,34 @@ process.stdin.on('end', async () => {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
const sessionData = sessions[0];
|
||||
const sdkSessionId = sessionData.sdk_session_id;
|
||||
sessionData = sessions[0];
|
||||
sdkSessionId = sessionData.sdk_session_id;
|
||||
const claudeSessionId = sessionData.claude_session_id;
|
||||
|
||||
// Validate SDK session ID exists
|
||||
if (!sdkSessionId) {
|
||||
debugLog('Stop: SDK session ID not yet available', { project });
|
||||
db.close();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
// Try to acquire lock - wait up to 10 seconds for PostToolUse to finish
|
||||
let attempts = 0;
|
||||
while (attempts < 20) {
|
||||
lockAcquired = acquireSessionLock(db, sdkSessionId, 'Stop');
|
||||
if (lockAcquired) break;
|
||||
|
||||
debugLog('Stop: Waiting for session lock', { attempt: attempts + 1, sdkSessionId });
|
||||
await new Promise(resolve => setTimeout(resolve, 500));
|
||||
attempts++;
|
||||
}
|
||||
|
||||
if (!lockAcquired) {
|
||||
debugLog('Stop: Could not acquire session lock after 10 seconds', { sdkSessionId });
|
||||
db.close();
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
debugLog('Stop: Ending SDK session', { sdkSessionId, claudeSessionId });
|
||||
|
||||
// Build end message - SDK will call `claude-mem store-overview` and `chroma_add_documents`
|
||||
@@ -120,13 +186,31 @@ process.stdin.on('end', async () => {
|
||||
}
|
||||
|
||||
// Mark session as completed in database
|
||||
markStreamingSessionCompleted(db, sessionData.id);
|
||||
debugLog('Stop: Session ended and marked complete', { project, sessionId: sessionData.id });
|
||||
if (sessionData) {
|
||||
markStreamingSessionCompleted(db, sessionData.id);
|
||||
debugLog('Stop: Session ended and marked complete', { project, sessionId: sessionData.id });
|
||||
}
|
||||
|
||||
// Close database connection
|
||||
db.close();
|
||||
} catch (error) {
|
||||
debugLog('Stop: Error ending session', { error: error.message });
|
||||
debugLog('Stop: Error ending session', { error: error.message, stack: error.stack });
|
||||
} finally {
|
||||
// Always release lock and close database
|
||||
if (lockAcquired && sdkSessionId && db) {
|
||||
try {
|
||||
releaseSessionLock(db, sdkSessionId);
|
||||
debugLog('Stop: Released session lock', { sdkSessionId });
|
||||
} catch (err) {
|
||||
debugLog('Stop: Error releasing lock', { error: err.message });
|
||||
}
|
||||
}
|
||||
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
debugLog('Stop: Error closing database', { error: err.message });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Exit cleanly after async processing completes
|
||||
|
||||
@@ -36,6 +36,34 @@ function debugLog(message, data = {}) {
|
||||
// Removed: buildStreamingSystemPrompt function
|
||||
// Now using centralized config from hook-prompt-renderer.js
|
||||
|
||||
// =============================================================================
|
||||
// GRACEFUL SHUTDOWN HANDLERS
|
||||
// =============================================================================
|
||||
|
||||
let db;
|
||||
|
||||
function cleanup() {
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
// Silent fail on cleanup
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
process.on('SIGTERM', () => {
|
||||
debugLog('UserPromptSubmit: Received SIGTERM, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
process.on('SIGINT', () => {
|
||||
debugLog('UserPromptSubmit: Received SIGINT, cleaning up');
|
||||
cleanup();
|
||||
process.exit(0);
|
||||
});
|
||||
|
||||
// =============================================================================
|
||||
// MAIN
|
||||
// =============================================================================
|
||||
@@ -89,7 +117,7 @@ process.stdin.on('end', async () => {
|
||||
|
||||
try {
|
||||
// Initialize database and create session record FIRST
|
||||
const db = initializeDatabase();
|
||||
db = initializeDatabase();
|
||||
|
||||
// Create session record immediately - this gives us a tracking ID
|
||||
const sessionRecord = createStreamingSession(db, {
|
||||
@@ -145,10 +173,17 @@ process.stdin.on('end', async () => {
|
||||
});
|
||||
}
|
||||
|
||||
// Close database connection
|
||||
db.close();
|
||||
} catch (error) {
|
||||
debugLog('UserPromptSubmit: Error starting SDK session', { error: error.message });
|
||||
debugLog('UserPromptSubmit: Error starting SDK session', { error: error.message, stack: error.stack });
|
||||
} finally {
|
||||
// Always close database connection
|
||||
if (db) {
|
||||
try {
|
||||
db.close();
|
||||
} catch (err) {
|
||||
debugLog('UserPromptSubmit: Error closing database', { error: err.message });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Return success to Claude Code
|
||||
|
||||
Reference in New Issue
Block a user