Files
claude-mem/src/services/worker/SessionManager.ts
T
Alex Newman 54ef1496d2 fix: Refresh in-memory session project when updated in database
**Problem:**
- Summaries created with empty project names even after database fix
- 13 summaries had empty project, couldn't appear in context-hook
- In-memory sessions cached stale empty project value

**Root Cause:**
SessionManager caches ActiveSession objects in memory. When reusing
an existing session, it never refreshed the project field from the
database. Even though createSDKSession() now updates the DB with the
correct project, the in-memory session.project remained empty, causing
summaries to be stored with empty project.

**Flow:**
1. Session created with empty project (cached in memory)
2. new-hook UPDATE fixes project in database
3. SessionManager reuses cached session (stale project="")
4. Summary stored with session.project (empty)
5. Context-hook can't find summary (WHERE project = 'claude-mem')

**Solution:**
When SessionManager.initializeSession() reuses an existing session,
refresh the project field from the database. This ensures summaries
and observations always use the latest project value.

**Impact:**
- Future summaries will have correct project name
- Backfilled 13 historical summaries with project='claude-mem'
- Context injection will now show recent summaries
- Both observations AND summaries fixed

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-23 16:14:53 -05:00

352 lines
11 KiB
TypeScript

/**
* SessionManager: Event-driven session lifecycle
*
* Responsibility:
* - Manage active session lifecycle
* - Handle event-driven message queues
* - Coordinate between HTTP requests and SDK agent
* - Zero-latency event notification (no polling)
*/
import { EventEmitter } from 'events';
import { DatabaseManager } from './DatabaseManager.js';
import { logger } from '../../utils/logger.js';
import { silentDebug } from '../../utils/silent-debug.js';
import type { ActiveSession, PendingMessage, ObservationData } from '../worker-types.js';
export class SessionManager {
private dbManager: DatabaseManager;
private sessions: Map<number, ActiveSession> = new Map();
private sessionQueues: Map<number, EventEmitter> = new Map();
private onSessionDeletedCallback?: () => void;
constructor(dbManager: DatabaseManager) {
this.dbManager = dbManager;
}
/**
* Set callback to be called when a session is deleted (for broadcasting status)
*/
setOnSessionDeleted(callback: () => void): void {
this.onSessionDeletedCallback = callback;
}
/**
* Initialize a new session or return existing one
*/
initializeSession(sessionDbId: number, currentUserPrompt?: string, promptNumber?: number): ActiveSession {
// Check if already active
let session = this.sessions.get(sessionDbId);
if (session) {
// Refresh project from database in case it was updated by new-hook
// This fixes the bug where sessions created with empty project get updated
// in the database but the in-memory session still has the stale empty value
const dbSession = this.dbManager.getSessionById(sessionDbId);
if (dbSession.project && dbSession.project !== session.project) {
silentDebug('[SessionManager] Updating project from database', {
sessionDbId,
oldProject: session.project,
newProject: dbSession.project
});
session.project = dbSession.project;
}
// Update userPrompt for continuation prompts
if (currentUserPrompt) {
silentDebug('[SessionManager] Updating userPrompt for continuation', {
sessionDbId,
promptNumber,
oldPrompt: session.userPrompt.substring(0, 80),
newPrompt: currentUserPrompt.substring(0, 80)
});
session.userPrompt = currentUserPrompt;
session.lastPromptNumber = promptNumber || session.lastPromptNumber;
} else {
silentDebug('[SessionManager] No currentUserPrompt provided for existing session', {
sessionDbId,
promptNumber,
usingCachedPrompt: session.userPrompt.substring(0, 80)
});
}
return session;
}
// Fetch from database
const dbSession = this.dbManager.getSessionById(sessionDbId);
// Use currentUserPrompt if provided, otherwise fall back to database (first prompt)
const userPrompt = currentUserPrompt || dbSession.user_prompt;
if (!currentUserPrompt) {
silentDebug('[SessionManager] No currentUserPrompt provided for new session, using database', {
sessionDbId,
promptNumber,
dbPrompt: dbSession.user_prompt.substring(0, 80)
});
} else {
silentDebug('[SessionManager] Initializing session with fresh userPrompt', {
sessionDbId,
promptNumber,
userPrompt: currentUserPrompt.substring(0, 80)
});
}
// Create active session
session = {
sessionDbId,
claudeSessionId: dbSession.claude_session_id,
sdkSessionId: null,
project: dbSession.project,
userPrompt,
pendingMessages: [],
abortController: new AbortController(),
generatorPromise: null,
lastPromptNumber: promptNumber || this.dbManager.getSessionStore().getPromptCounter(sessionDbId),
startTime: Date.now(),
cumulativeInputTokens: 0,
cumulativeOutputTokens: 0
};
this.sessions.set(sessionDbId, session);
// Create event emitter for queue notifications
const emitter = new EventEmitter();
this.sessionQueues.set(sessionDbId, emitter);
logger.info('SESSION', 'Session initialized', {
sessionId: sessionDbId,
project: session.project,
claudeSessionId: session.claudeSessionId,
queueDepth: 0,
hasGenerator: false
});
return session;
}
/**
* Get active session by ID
*/
getSession(sessionDbId: number): ActiveSession | undefined {
return this.sessions.get(sessionDbId);
}
/**
* Queue an observation for processing (zero-latency notification)
* Auto-initializes session if not in memory but exists in database
*/
queueObservation(sessionDbId: number, data: ObservationData): void {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
const beforeDepth = session.pendingMessages.length;
session.pendingMessages.push({
type: 'observation',
tool_name: data.tool_name,
tool_input: data.tool_input,
tool_response: data.tool_response,
prompt_number: data.prompt_number,
cwd: data.cwd
});
const afterDepth = session.pendingMessages.length;
// Notify generator immediately (zero latency)
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
// Format tool name for logging
const toolSummary = logger.formatTool(data.tool_name, data.tool_input);
logger.info('SESSION', `Observation queued (${beforeDepth}${afterDepth})`, {
sessionId: sessionDbId,
tool: toolSummary,
hasGenerator: !!session.generatorPromise
});
}
/**
* Queue a summarize request (zero-latency notification)
* Auto-initializes session if not in memory but exists in database
*/
queueSummarize(sessionDbId: number, lastUserMessage: string, lastAssistantMessage?: string): void {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
const beforeDepth = session.pendingMessages.length;
session.pendingMessages.push({
type: 'summarize',
last_user_message: lastUserMessage,
last_assistant_message: lastAssistantMessage
});
const afterDepth = session.pendingMessages.length;
const emitter = this.sessionQueues.get(sessionDbId);
emitter?.emit('message');
logger.info('SESSION', `Summarize queued (${beforeDepth}${afterDepth})`, {
sessionId: sessionDbId,
hasGenerator: !!session.generatorPromise
});
}
/**
* Delete a session (abort SDK agent and cleanup)
*/
async deleteSession(sessionDbId: number): Promise<void> {
const session = this.sessions.get(sessionDbId);
if (!session) {
return; // Already deleted
}
const sessionDuration = Date.now() - session.startTime;
// Abort the SDK agent
session.abortController.abort();
// Wait for generator to finish
if (session.generatorPromise) {
await session.generatorPromise.catch(() => {});
}
// Cleanup
this.sessions.delete(sessionDbId);
this.sessionQueues.delete(sessionDbId);
logger.info('SESSION', 'Session deleted', {
sessionId: sessionDbId,
duration: `${(sessionDuration / 1000).toFixed(1)}s`,
project: session.project
});
// Trigger callback to broadcast status update (spinner may need to stop)
if (this.onSessionDeletedCallback) {
this.onSessionDeletedCallback();
}
}
/**
* Shutdown all active sessions
*/
async shutdownAll(): Promise<void> {
const sessionIds = Array.from(this.sessions.keys());
await Promise.all(sessionIds.map(id => this.deleteSession(id)));
}
/**
* Check if any session has pending messages (for spinner tracking)
*/
hasPendingMessages(): boolean {
return Array.from(this.sessions.values()).some(
session => session.pendingMessages.length > 0
);
}
/**
* Get number of active sessions (for stats)
*/
getActiveSessionCount(): number {
return this.sessions.size;
}
/**
* Get total queue depth across all sessions (for activity indicator)
*/
getTotalQueueDepth(): number {
let total = 0;
for (const session of this.sessions.values()) {
total += session.pendingMessages.length;
}
return total;
}
/**
* Get total active work (queued + currently processing)
* Counts both pending messages and items actively being processed by SDK agents
*/
getTotalActiveWork(): number {
let total = 0;
for (const session of this.sessions.values()) {
// Count queued messages
total += session.pendingMessages.length;
// Count currently processing item (1 per active generator)
if (session.generatorPromise !== null) {
total += 1;
}
}
return total;
}
/**
* Check if any session is actively processing (has pending messages OR active generator)
* Used for activity indicator to prevent spinner from stopping while SDK is processing
*/
isAnySessionProcessing(): boolean {
for (const session of this.sessions.values()) {
// Has queued messages waiting to be processed
if (session.pendingMessages.length > 0) {
return true;
}
// Has active SDK generator running (processing dequeued messages)
if (session.generatorPromise !== null) {
return true;
}
}
return false;
}
/**
* Get message iterator for SDKAgent to consume (event-driven, no polling)
* Auto-initializes session if not in memory but exists in database
*/
async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessage> {
// Auto-initialize from database if needed (handles worker restarts)
let session = this.sessions.get(sessionDbId);
if (!session) {
session = this.initializeSession(sessionDbId);
}
const emitter = this.sessionQueues.get(sessionDbId);
if (!emitter) {
throw new Error(`No emitter for session ${sessionDbId}`);
}
while (!session.abortController.signal.aborted) {
// Wait for messages if queue is empty
if (session.pendingMessages.length === 0) {
await new Promise<void>(resolve => {
const handler = () => resolve();
emitter.once('message', handler);
// Also listen for abort
session.abortController.signal.addEventListener('abort', () => {
emitter.off('message', handler);
resolve();
}, { once: true });
});
}
// Yield all pending messages
while (session.pendingMessages.length > 0) {
const message = session.pendingMessages.shift()!;
yield message;
// If we just yielded a summary, that's the end of this batch - stop the iterator
if (message.type === 'summarize') {
logger.info('SESSION', `Summary yielded - ending generator`, { sessionId: sessionDbId });
return;
}
}
}
}
}