/** * Data Routes * * Handles data retrieval operations: observations, summaries, prompts, stats, processing status. * All endpoints use direct database access via service layer. */ import express, { Request, Response } from 'express'; import path from 'path'; import { readFileSync, statSync, existsSync } from 'fs'; import { logger } from '../../../../utils/logger.js'; import { homedir } from 'os'; import { getPackageRoot } from '../../../../shared/paths.js'; import { getWorkerPort } from '../../../../shared/worker-utils.js'; import { PaginationHelper } from '../../PaginationHelper.js'; import { DatabaseManager } from '../../DatabaseManager.js'; import { SessionManager } from '../../SessionManager.js'; import { SSEBroadcaster } from '../../SSEBroadcaster.js'; import type { WorkerService } from '../../../worker-service.js'; import { BaseRouteHandler } from '../BaseRouteHandler.js'; import { normalizePlatformSource } from '../../../../shared/platform-source.js'; import { getObservationsByFilePath } from '../../../sqlite/observations/get.js'; export class DataRoutes extends BaseRouteHandler { constructor( private paginationHelper: PaginationHelper, private dbManager: DatabaseManager, private sessionManager: SessionManager, private sseBroadcaster: SSEBroadcaster, private workerService: WorkerService, private startTime: number ) { super(); } setupRoutes(app: express.Application): void { // Pagination endpoints app.get('/api/observations', this.handleGetObservations.bind(this)); app.get('/api/summaries', this.handleGetSummaries.bind(this)); app.get('/api/prompts', this.handleGetPrompts.bind(this)); // Fetch by ID endpoints app.get('/api/observation/:id', this.handleGetObservationById.bind(this)); app.get('/api/observations/by-file', this.handleGetObservationsByFile.bind(this)); app.post('/api/observations/batch', this.handleGetObservationsByIds.bind(this)); app.get('/api/session/:id', this.handleGetSessionById.bind(this)); app.post('/api/sdk-sessions/batch', this.handleGetSdkSessionsByIds.bind(this)); app.get('/api/prompt/:id', this.handleGetPromptById.bind(this)); // Metadata endpoints app.get('/api/stats', this.handleGetStats.bind(this)); app.get('/api/projects', this.handleGetProjects.bind(this)); // Processing status endpoints app.get('/api/processing-status', this.handleGetProcessingStatus.bind(this)); app.post('/api/processing', this.handleSetProcessing.bind(this)); // Pending queue management endpoints app.get('/api/pending-queue', this.handleGetPendingQueue.bind(this)); app.post('/api/pending-queue/process', this.handleProcessPendingQueue.bind(this)); app.delete('/api/pending-queue/failed', this.handleClearFailedQueue.bind(this)); app.delete('/api/pending-queue/all', this.handleClearAllQueue.bind(this)); // Import endpoint app.post('/api/import', this.handleImport.bind(this)); } /** * Get paginated observations */ private handleGetObservations = this.wrapHandler((req: Request, res: Response): void => { const { offset, limit, project, platformSource } = this.parsePaginationParams(req); const result = this.paginationHelper.getObservations(offset, limit, project, platformSource); res.json(result); }); /** * Get paginated summaries */ private handleGetSummaries = this.wrapHandler((req: Request, res: Response): void => { const { offset, limit, project, platformSource } = this.parsePaginationParams(req); const result = this.paginationHelper.getSummaries(offset, limit, project, platformSource); res.json(result); }); /** * Get paginated user prompts */ private handleGetPrompts = this.wrapHandler((req: Request, res: Response): void => { const { offset, limit, project, platformSource } = this.parsePaginationParams(req); const result = this.paginationHelper.getPrompts(offset, limit, project, platformSource); res.json(result); }); /** * Get observation by ID * GET /api/observation/:id */ private handleGetObservationById = this.wrapHandler((req: Request, res: Response): void => { const id = this.parseIntParam(req, res, 'id'); if (id === null) return; const store = this.dbManager.getSessionStore(); const observation = store.getObservationById(id); if (!observation) { this.notFound(res, `Observation #${id} not found`); return; } res.json(observation); }); /** * Get observations associated with a file path, scoped to projects * GET /api/observations/by-file?path=&projects=&limit=15 */ private handleGetObservationsByFile = this.wrapHandler((req: Request, res: Response): void => { const filePath = req.query.path as string | undefined; if (!filePath) { this.badRequest(res, 'path query parameter is required'); return; } const projectsParam = req.query.projects as string | undefined; const projects = projectsParam ? projectsParam.split(',').filter(Boolean) : undefined; const parsedLimit = req.query.limit ? parseInt(req.query.limit as string, 10) : undefined; const limit = Number.isFinite(parsedLimit) && parsedLimit! > 0 ? parsedLimit : undefined; const db = this.dbManager.getSessionStore().db; const observations = getObservationsByFilePath(db, filePath, { projects, limit }); res.json({ observations, count: observations.length }); }); /** * Get observations by array of IDs * POST /api/observations/batch * Body: { ids: number[], orderBy?: 'date_desc' | 'date_asc', limit?: number, project?: string } */ private handleGetObservationsByIds = this.wrapHandler((req: Request, res: Response): void => { let { ids, orderBy, limit, project } = req.body; // Coerce string-encoded arrays from MCP clients (e.g. "[1,2,3]" or "1,2,3") if (typeof ids === 'string') { try { ids = JSON.parse(ids); } catch { ids = ids.split(',').map(Number); } } if (!ids || !Array.isArray(ids)) { this.badRequest(res, 'ids must be an array of numbers'); return; } if (ids.length === 0) { res.json([]); return; } // Validate all IDs are numbers if (!ids.every(id => typeof id === 'number' && Number.isInteger(id))) { this.badRequest(res, 'All ids must be integers'); return; } const store = this.dbManager.getSessionStore(); const observations = store.getObservationsByIds(ids, { orderBy, limit, project }); res.json(observations); }); /** * Get session by ID * GET /api/session/:id */ private handleGetSessionById = this.wrapHandler((req: Request, res: Response): void => { const id = this.parseIntParam(req, res, 'id'); if (id === null) return; const store = this.dbManager.getSessionStore(); const sessions = store.getSessionSummariesByIds([id]); if (sessions.length === 0) { this.notFound(res, `Session #${id} not found`); return; } res.json(sessions[0]); }); /** * Get SDK sessions by SDK session IDs * POST /api/sdk-sessions/batch * Body: { memorySessionIds: string[] } */ private handleGetSdkSessionsByIds = this.wrapHandler((req: Request, res: Response): void => { let { memorySessionIds } = req.body; // Coerce string-encoded arrays from MCP clients (e.g. '["a","b"]' or "a,b") if (typeof memorySessionIds === 'string') { try { memorySessionIds = JSON.parse(memorySessionIds); } catch { memorySessionIds = memorySessionIds.split(',').map((s: string) => s.trim()); } } if (!Array.isArray(memorySessionIds)) { this.badRequest(res, 'memorySessionIds must be an array'); return; } const store = this.dbManager.getSessionStore(); const sessions = store.getSdkSessionsBySessionIds(memorySessionIds); res.json(sessions); }); /** * Get user prompt by ID * GET /api/prompt/:id */ private handleGetPromptById = this.wrapHandler((req: Request, res: Response): void => { const id = this.parseIntParam(req, res, 'id'); if (id === null) return; const store = this.dbManager.getSessionStore(); const prompts = store.getUserPromptsByIds([id]); if (prompts.length === 0) { this.notFound(res, `Prompt #${id} not found`); return; } res.json(prompts[0]); }); /** * Get database statistics (with worker metadata) */ private handleGetStats = this.wrapHandler((req: Request, res: Response): void => { const db = this.dbManager.getSessionStore().db; // Read version from package.json const packageRoot = getPackageRoot(); const packageJsonPath = path.join(packageRoot, 'package.json'); const packageJson = JSON.parse(readFileSync(packageJsonPath, 'utf-8')); const version = packageJson.version; // Get database stats const totalObservations = db.prepare('SELECT COUNT(*) as count FROM observations').get() as { count: number }; const totalSessions = db.prepare('SELECT COUNT(*) as count FROM sdk_sessions').get() as { count: number }; const totalSummaries = db.prepare('SELECT COUNT(*) as count FROM session_summaries').get() as { count: number }; // Get database file size and path const dbPath = path.join(homedir(), '.claude-mem', 'claude-mem.db'); let dbSize = 0; if (existsSync(dbPath)) { dbSize = statSync(dbPath).size; } // Worker metadata const uptime = Math.floor((Date.now() - this.startTime) / 1000); const activeSessions = this.sessionManager.getActiveSessionCount(); const sseClients = this.sseBroadcaster.getClientCount(); res.json({ worker: { version, uptime, activeSessions, sseClients, port: getWorkerPort() }, database: { path: dbPath, size: dbSize, observations: totalObservations.count, sessions: totalSessions.count, summaries: totalSummaries.count } }); }); /** * Get list of distinct projects from observations * GET /api/projects */ private handleGetProjects = this.wrapHandler((req: Request, res: Response): void => { const store = this.dbManager.getSessionStore(); const rawPlatformSource = req.query.platformSource as string | undefined; const platformSource = rawPlatformSource ? normalizePlatformSource(rawPlatformSource) : undefined; if (platformSource) { const projects = store.getAllProjects(platformSource); res.json({ projects, sources: [platformSource], projectsBySource: { [platformSource]: projects } }); return; } res.json(store.getProjectCatalog()); }); /** * Get current processing status * GET /api/processing-status */ private handleGetProcessingStatus = this.wrapHandler((req: Request, res: Response): void => { const isProcessing = this.sessionManager.isAnySessionProcessing(); const queueDepth = this.sessionManager.getTotalActiveWork(); // Includes queued + actively processing res.json({ isProcessing, queueDepth }); }); /** * Set processing status (called by hooks) * NOTE: This now broadcasts computed status based on active processing (ignores input) */ private handleSetProcessing = this.wrapHandler((req: Request, res: Response): void => { // Broadcast current computed status (ignores manual input) this.workerService.broadcastProcessingStatus(); const isProcessing = this.sessionManager.isAnySessionProcessing(); const queueDepth = this.sessionManager.getTotalQueueDepth(); const activeSessions = this.sessionManager.getActiveSessionCount(); res.json({ status: 'ok', isProcessing, queueDepth, activeSessions }); }); /** * Parse pagination parameters from request query */ private parsePaginationParams(req: Request): { offset: number; limit: number; project?: string; platformSource?: string } { const offset = parseInt(req.query.offset as string, 10) || 0; const limit = Math.min(parseInt(req.query.limit as string, 10) || 20, 100); // Max 100 const project = req.query.project as string | undefined; const rawPlatformSource = req.query.platformSource as string | undefined; const platformSource = rawPlatformSource ? normalizePlatformSource(rawPlatformSource) : undefined; return { offset, limit, project, platformSource }; } /** * Import memories from export file * POST /api/import * Body: { sessions: [], summaries: [], observations: [], prompts: [] } */ private handleImport = this.wrapHandler((req: Request, res: Response): void => { const { sessions, summaries, observations, prompts } = req.body; const stats = { sessionsImported: 0, sessionsSkipped: 0, summariesImported: 0, summariesSkipped: 0, observationsImported: 0, observationsSkipped: 0, promptsImported: 0, promptsSkipped: 0 }; const store = this.dbManager.getSessionStore(); // Import sessions first (dependency for everything else) if (Array.isArray(sessions)) { for (const session of sessions) { const result = store.importSdkSession(session); if (result.imported) { stats.sessionsImported++; } else { stats.sessionsSkipped++; } } } // Import summaries (depends on sessions) if (Array.isArray(summaries)) { for (const summary of summaries) { const result = store.importSessionSummary(summary); if (result.imported) { stats.summariesImported++; } else { stats.summariesSkipped++; } } } // Import observations (depends on sessions) const importedObservations: Array<{ id: number; obs: typeof observations[0] }> = []; if (Array.isArray(observations)) { for (const obs of observations) { const result = store.importObservation(obs); if (result.imported) { stats.observationsImported++; importedObservations.push({ id: result.id, obs }); } else { stats.observationsSkipped++; } } // Rebuild FTS index so imported observations are immediately searchable. // The FTS5 content table relies on triggers for incremental updates, but // those triggers may not have fired correctly for all import paths. if (stats.observationsImported > 0) { store.rebuildObservationsFTSIndex(); } // Sync imported observations to ChromaDB for vector search. // Fire-and-forget: Chroma sync failure should not block the import response. // Bounded concurrency to prevent overwhelming Chroma on large imports. const chromaSync = this.dbManager.getChromaSync(); if (chromaSync && importedObservations.length > 0) { const CHROMA_SYNC_CONCURRENCY = 8; const safeParseJson = (val: string | null): string[] => { if (!val) return []; try { return JSON.parse(val); } catch { return []; } }; const syncOne = async ({ id, obs }: { id: number; obs: any }) => { const parsedObs = { type: obs.type || 'discovery', title: obs.title || null, subtitle: obs.subtitle || null, facts: safeParseJson(obs.facts), narrative: obs.narrative || null, concepts: safeParseJson(obs.concepts), files_read: safeParseJson(obs.files_read), files_modified: safeParseJson(obs.files_modified), }; await chromaSync.syncObservation( id, obs.memory_session_id, obs.project, parsedObs, obs.prompt_number || 0, obs.created_at_epoch, obs.discovery_tokens || 0 ).catch(err => { logger.error('CHROMA', 'Import ChromaDB sync failed', { id }, err as Error); }); }; // Fire-and-forget: process in batches but don't block the response (async () => { for (let i = 0; i < importedObservations.length; i += CHROMA_SYNC_CONCURRENCY) { const batch = importedObservations.slice(i, i + CHROMA_SYNC_CONCURRENCY); await Promise.all(batch.map(syncOne)); } })().catch(err => { logger.error('CHROMA', 'Import ChromaDB batch sync failed', {}, err as Error); }); } } // Import prompts (depends on sessions) if (Array.isArray(prompts)) { for (const prompt of prompts) { const result = store.importUserPrompt(prompt); if (result.imported) { stats.promptsImported++; } else { stats.promptsSkipped++; } } } res.json({ success: true, stats }); }); /** * Get pending queue contents * GET /api/pending-queue * Returns all pending, processing, and failed messages with optional recently processed */ private handleGetPendingQueue = this.wrapHandler((req: Request, res: Response): void => { const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js'); const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3); // Get queue contents (pending, processing, failed) const queueMessages = pendingStore.getQueueMessages(); // Get recently processed (last 30 min, up to 20) const recentlyProcessed = pendingStore.getRecentlyProcessed(20, 30); // Get stuck message count (processing > 5 min) const stuckCount = pendingStore.getStuckCount(5 * 60 * 1000); // Get sessions with pending work const sessionsWithPending = pendingStore.getSessionsWithPendingMessages(); res.json({ queue: { messages: queueMessages, totalPending: queueMessages.filter((m: { status: string }) => m.status === 'pending').length, totalProcessing: queueMessages.filter((m: { status: string }) => m.status === 'processing').length, totalFailed: queueMessages.filter((m: { status: string }) => m.status === 'failed').length, stuckCount }, recentlyProcessed, sessionsWithPendingWork: sessionsWithPending }); }); /** * Process pending queue * POST /api/pending-queue/process * Body: { sessionLimit?: number } - defaults to 10 * Starts SDK agents for sessions with pending messages */ private handleProcessPendingQueue = this.wrapHandler(async (req: Request, res: Response): Promise => { const sessionLimit = Math.min( Math.max(parseInt(req.body.sessionLimit, 10) || 10, 1), 100 // Max 100 sessions at once ); const result = await this.workerService.processPendingQueues(sessionLimit); res.json({ success: true, ...result }); }); /** * Clear all failed messages from the queue * DELETE /api/pending-queue/failed * Returns the number of messages cleared */ private handleClearFailedQueue = this.wrapHandler((req: Request, res: Response): void => { const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js'); const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3); const clearedCount = pendingStore.clearFailed(); logger.info('QUEUE', 'Cleared failed queue messages', { clearedCount }); res.json({ success: true, clearedCount }); }); /** * Clear all messages from the queue (pending, processing, and failed) * DELETE /api/pending-queue/all * Returns the number of messages cleared */ private handleClearAllQueue = this.wrapHandler((req: Request, res: Response): void => { const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js'); const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3); const clearedCount = pendingStore.clearAll(); logger.warn('QUEUE', 'Cleared ALL queue messages (pending, processing, failed)', { clearedCount }); res.json({ success: true, clearedCount }); }); }