Files
claude-mem/src/services/worker/http/routes/DataRoutes.ts
T
Octopus f7fd2221c8 fix: rebuild FTS5 index after bulk observation import (fixes #1631) (#1632)
Imported observations were invisible to the MCP search tool because the
FTS5 content table was not reliably updated during bulk import. The import
handler now calls rebuildObservationsFTSIndex() after inserting new
observations, ensuring the full-text search index is consistent.

A new SessionStore.rebuildObservationsFTSIndex() method encapsulates the
FTS5 rebuild command and is a no-op when the observations_fts table does
not exist (e.g. FTS5 unavailable on Windows).
2026-04-08 16:16:55 -07:00

514 lines
18 KiB
TypeScript

/**
* Data Routes
*
* Handles data retrieval operations: observations, summaries, prompts, stats, processing status.
* All endpoints use direct database access via service layer.
*/
import express, { Request, Response } from 'express';
import path from 'path';
import { readFileSync, statSync, existsSync } from 'fs';
import { logger } from '../../../../utils/logger.js';
import { homedir } from 'os';
import { getPackageRoot } from '../../../../shared/paths.js';
import { getWorkerPort } from '../../../../shared/worker-utils.js';
import { PaginationHelper } from '../../PaginationHelper.js';
import { DatabaseManager } from '../../DatabaseManager.js';
import { SessionManager } from '../../SessionManager.js';
import { SSEBroadcaster } from '../../SSEBroadcaster.js';
import type { WorkerService } from '../../../worker-service.js';
import { BaseRouteHandler } from '../BaseRouteHandler.js';
import { normalizePlatformSource } from '../../../../shared/platform-source.js';
import { getObservationsByFilePath } from '../../../sqlite/observations/get.js';
export class DataRoutes extends BaseRouteHandler {
constructor(
private paginationHelper: PaginationHelper,
private dbManager: DatabaseManager,
private sessionManager: SessionManager,
private sseBroadcaster: SSEBroadcaster,
private workerService: WorkerService,
private startTime: number
) {
super();
}
setupRoutes(app: express.Application): void {
// Pagination endpoints
app.get('/api/observations', this.handleGetObservations.bind(this));
app.get('/api/summaries', this.handleGetSummaries.bind(this));
app.get('/api/prompts', this.handleGetPrompts.bind(this));
// Fetch by ID endpoints
app.get('/api/observation/:id', this.handleGetObservationById.bind(this));
app.get('/api/observations/by-file', this.handleGetObservationsByFile.bind(this));
app.post('/api/observations/batch', this.handleGetObservationsByIds.bind(this));
app.get('/api/session/:id', this.handleGetSessionById.bind(this));
app.post('/api/sdk-sessions/batch', this.handleGetSdkSessionsByIds.bind(this));
app.get('/api/prompt/:id', this.handleGetPromptById.bind(this));
// Metadata endpoints
app.get('/api/stats', this.handleGetStats.bind(this));
app.get('/api/projects', this.handleGetProjects.bind(this));
// Processing status endpoints
app.get('/api/processing-status', this.handleGetProcessingStatus.bind(this));
app.post('/api/processing', this.handleSetProcessing.bind(this));
// Pending queue management endpoints
app.get('/api/pending-queue', this.handleGetPendingQueue.bind(this));
app.post('/api/pending-queue/process', this.handleProcessPendingQueue.bind(this));
app.delete('/api/pending-queue/failed', this.handleClearFailedQueue.bind(this));
app.delete('/api/pending-queue/all', this.handleClearAllQueue.bind(this));
// Import endpoint
app.post('/api/import', this.handleImport.bind(this));
}
/**
* Get paginated observations
*/
private handleGetObservations = this.wrapHandler((req: Request, res: Response): void => {
const { offset, limit, project, platformSource } = this.parsePaginationParams(req);
const result = this.paginationHelper.getObservations(offset, limit, project, platformSource);
res.json(result);
});
/**
* Get paginated summaries
*/
private handleGetSummaries = this.wrapHandler((req: Request, res: Response): void => {
const { offset, limit, project, platformSource } = this.parsePaginationParams(req);
const result = this.paginationHelper.getSummaries(offset, limit, project, platformSource);
res.json(result);
});
/**
* Get paginated user prompts
*/
private handleGetPrompts = this.wrapHandler((req: Request, res: Response): void => {
const { offset, limit, project, platformSource } = this.parsePaginationParams(req);
const result = this.paginationHelper.getPrompts(offset, limit, project, platformSource);
res.json(result);
});
/**
* Get observation by ID
* GET /api/observation/:id
*/
private handleGetObservationById = this.wrapHandler((req: Request, res: Response): void => {
const id = this.parseIntParam(req, res, 'id');
if (id === null) return;
const store = this.dbManager.getSessionStore();
const observation = store.getObservationById(id);
if (!observation) {
this.notFound(res, `Observation #${id} not found`);
return;
}
res.json(observation);
});
/**
* Get observations associated with a file path, scoped to projects
* GET /api/observations/by-file?path=<file_path>&projects=<comma,separated>&limit=15
*/
private handleGetObservationsByFile = this.wrapHandler((req: Request, res: Response): void => {
const filePath = req.query.path as string | undefined;
if (!filePath) {
this.badRequest(res, 'path query parameter is required');
return;
}
const projectsParam = req.query.projects as string | undefined;
const projects = projectsParam ? projectsParam.split(',').filter(Boolean) : undefined;
const parsedLimit = req.query.limit ? parseInt(req.query.limit as string, 10) : undefined;
const limit = Number.isFinite(parsedLimit) && parsedLimit! > 0 ? parsedLimit : undefined;
const db = this.dbManager.getSessionStore().db;
const observations = getObservationsByFilePath(db, filePath, { projects, limit });
res.json({ observations, count: observations.length });
});
/**
* Get observations by array of IDs
* POST /api/observations/batch
* Body: { ids: number[], orderBy?: 'date_desc' | 'date_asc', limit?: number, project?: string }
*/
private handleGetObservationsByIds = this.wrapHandler((req: Request, res: Response): void => {
let { ids, orderBy, limit, project } = req.body;
// Coerce string-encoded arrays from MCP clients (e.g. "[1,2,3]" or "1,2,3")
if (typeof ids === 'string') {
try { ids = JSON.parse(ids); } catch { ids = ids.split(',').map(Number); }
}
if (!ids || !Array.isArray(ids)) {
this.badRequest(res, 'ids must be an array of numbers');
return;
}
if (ids.length === 0) {
res.json([]);
return;
}
// Validate all IDs are numbers
if (!ids.every(id => typeof id === 'number' && Number.isInteger(id))) {
this.badRequest(res, 'All ids must be integers');
return;
}
const store = this.dbManager.getSessionStore();
const observations = store.getObservationsByIds(ids, { orderBy, limit, project });
res.json(observations);
});
/**
* Get session by ID
* GET /api/session/:id
*/
private handleGetSessionById = this.wrapHandler((req: Request, res: Response): void => {
const id = this.parseIntParam(req, res, 'id');
if (id === null) return;
const store = this.dbManager.getSessionStore();
const sessions = store.getSessionSummariesByIds([id]);
if (sessions.length === 0) {
this.notFound(res, `Session #${id} not found`);
return;
}
res.json(sessions[0]);
});
/**
* Get SDK sessions by SDK session IDs
* POST /api/sdk-sessions/batch
* Body: { memorySessionIds: string[] }
*/
private handleGetSdkSessionsByIds = this.wrapHandler((req: Request, res: Response): void => {
let { memorySessionIds } = req.body;
// Coerce string-encoded arrays from MCP clients (e.g. '["a","b"]' or "a,b")
if (typeof memorySessionIds === 'string') {
try { memorySessionIds = JSON.parse(memorySessionIds); } catch { memorySessionIds = memorySessionIds.split(',').map((s: string) => s.trim()); }
}
if (!Array.isArray(memorySessionIds)) {
this.badRequest(res, 'memorySessionIds must be an array');
return;
}
const store = this.dbManager.getSessionStore();
const sessions = store.getSdkSessionsBySessionIds(memorySessionIds);
res.json(sessions);
});
/**
* Get user prompt by ID
* GET /api/prompt/:id
*/
private handleGetPromptById = this.wrapHandler((req: Request, res: Response): void => {
const id = this.parseIntParam(req, res, 'id');
if (id === null) return;
const store = this.dbManager.getSessionStore();
const prompts = store.getUserPromptsByIds([id]);
if (prompts.length === 0) {
this.notFound(res, `Prompt #${id} not found`);
return;
}
res.json(prompts[0]);
});
/**
* Get database statistics (with worker metadata)
*/
private handleGetStats = this.wrapHandler((req: Request, res: Response): void => {
const db = this.dbManager.getSessionStore().db;
// Read version from package.json
const packageRoot = getPackageRoot();
const packageJsonPath = path.join(packageRoot, 'package.json');
const packageJson = JSON.parse(readFileSync(packageJsonPath, 'utf-8'));
const version = packageJson.version;
// Get database stats
const totalObservations = db.prepare('SELECT COUNT(*) as count FROM observations').get() as { count: number };
const totalSessions = db.prepare('SELECT COUNT(*) as count FROM sdk_sessions').get() as { count: number };
const totalSummaries = db.prepare('SELECT COUNT(*) as count FROM session_summaries').get() as { count: number };
// Get database file size and path
const dbPath = path.join(homedir(), '.claude-mem', 'claude-mem.db');
let dbSize = 0;
if (existsSync(dbPath)) {
dbSize = statSync(dbPath).size;
}
// Worker metadata
const uptime = Math.floor((Date.now() - this.startTime) / 1000);
const activeSessions = this.sessionManager.getActiveSessionCount();
const sseClients = this.sseBroadcaster.getClientCount();
res.json({
worker: {
version,
uptime,
activeSessions,
sseClients,
port: getWorkerPort()
},
database: {
path: dbPath,
size: dbSize,
observations: totalObservations.count,
sessions: totalSessions.count,
summaries: totalSummaries.count
}
});
});
/**
* Get list of distinct projects from observations
* GET /api/projects
*/
private handleGetProjects = this.wrapHandler((req: Request, res: Response): void => {
const store = this.dbManager.getSessionStore();
const rawPlatformSource = req.query.platformSource as string | undefined;
const platformSource = rawPlatformSource ? normalizePlatformSource(rawPlatformSource) : undefined;
if (platformSource) {
const projects = store.getAllProjects(platformSource);
res.json({
projects,
sources: [platformSource],
projectsBySource: { [platformSource]: projects }
});
return;
}
res.json(store.getProjectCatalog());
});
/**
* Get current processing status
* GET /api/processing-status
*/
private handleGetProcessingStatus = this.wrapHandler((req: Request, res: Response): void => {
const isProcessing = this.sessionManager.isAnySessionProcessing();
const queueDepth = this.sessionManager.getTotalActiveWork(); // Includes queued + actively processing
res.json({ isProcessing, queueDepth });
});
/**
* Set processing status (called by hooks)
* NOTE: This now broadcasts computed status based on active processing (ignores input)
*/
private handleSetProcessing = this.wrapHandler((req: Request, res: Response): void => {
// Broadcast current computed status (ignores manual input)
this.workerService.broadcastProcessingStatus();
const isProcessing = this.sessionManager.isAnySessionProcessing();
const queueDepth = this.sessionManager.getTotalQueueDepth();
const activeSessions = this.sessionManager.getActiveSessionCount();
res.json({ status: 'ok', isProcessing, queueDepth, activeSessions });
});
/**
* Parse pagination parameters from request query
*/
private parsePaginationParams(req: Request): { offset: number; limit: number; project?: string; platformSource?: string } {
const offset = parseInt(req.query.offset as string, 10) || 0;
const limit = Math.min(parseInt(req.query.limit as string, 10) || 20, 100); // Max 100
const project = req.query.project as string | undefined;
const rawPlatformSource = req.query.platformSource as string | undefined;
const platformSource = rawPlatformSource ? normalizePlatformSource(rawPlatformSource) : undefined;
return { offset, limit, project, platformSource };
}
/**
* Import memories from export file
* POST /api/import
* Body: { sessions: [], summaries: [], observations: [], prompts: [] }
*/
private handleImport = this.wrapHandler((req: Request, res: Response): void => {
const { sessions, summaries, observations, prompts } = req.body;
const stats = {
sessionsImported: 0,
sessionsSkipped: 0,
summariesImported: 0,
summariesSkipped: 0,
observationsImported: 0,
observationsSkipped: 0,
promptsImported: 0,
promptsSkipped: 0
};
const store = this.dbManager.getSessionStore();
// Import sessions first (dependency for everything else)
if (Array.isArray(sessions)) {
for (const session of sessions) {
const result = store.importSdkSession(session);
if (result.imported) {
stats.sessionsImported++;
} else {
stats.sessionsSkipped++;
}
}
}
// Import summaries (depends on sessions)
if (Array.isArray(summaries)) {
for (const summary of summaries) {
const result = store.importSessionSummary(summary);
if (result.imported) {
stats.summariesImported++;
} else {
stats.summariesSkipped++;
}
}
}
// Import observations (depends on sessions)
if (Array.isArray(observations)) {
for (const obs of observations) {
const result = store.importObservation(obs);
if (result.imported) {
stats.observationsImported++;
} else {
stats.observationsSkipped++;
}
}
// Rebuild FTS index so imported observations are immediately searchable.
// The FTS5 content table relies on triggers for incremental updates, but
// those triggers may not have fired correctly for all import paths.
if (stats.observationsImported > 0) {
store.rebuildObservationsFTSIndex();
}
}
// Import prompts (depends on sessions)
if (Array.isArray(prompts)) {
for (const prompt of prompts) {
const result = store.importUserPrompt(prompt);
if (result.imported) {
stats.promptsImported++;
} else {
stats.promptsSkipped++;
}
}
}
res.json({
success: true,
stats
});
});
/**
* Get pending queue contents
* GET /api/pending-queue
* Returns all pending, processing, and failed messages with optional recently processed
*/
private handleGetPendingQueue = this.wrapHandler((req: Request, res: Response): void => {
const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
// Get queue contents (pending, processing, failed)
const queueMessages = pendingStore.getQueueMessages();
// Get recently processed (last 30 min, up to 20)
const recentlyProcessed = pendingStore.getRecentlyProcessed(20, 30);
// Get stuck message count (processing > 5 min)
const stuckCount = pendingStore.getStuckCount(5 * 60 * 1000);
// Get sessions with pending work
const sessionsWithPending = pendingStore.getSessionsWithPendingMessages();
res.json({
queue: {
messages: queueMessages,
totalPending: queueMessages.filter((m: { status: string }) => m.status === 'pending').length,
totalProcessing: queueMessages.filter((m: { status: string }) => m.status === 'processing').length,
totalFailed: queueMessages.filter((m: { status: string }) => m.status === 'failed').length,
stuckCount
},
recentlyProcessed,
sessionsWithPendingWork: sessionsWithPending
});
});
/**
* Process pending queue
* POST /api/pending-queue/process
* Body: { sessionLimit?: number } - defaults to 10
* Starts SDK agents for sessions with pending messages
*/
private handleProcessPendingQueue = this.wrapHandler(async (req: Request, res: Response): Promise<void> => {
const sessionLimit = Math.min(
Math.max(parseInt(req.body.sessionLimit, 10) || 10, 1),
100 // Max 100 sessions at once
);
const result = await this.workerService.processPendingQueues(sessionLimit);
res.json({
success: true,
...result
});
});
/**
* Clear all failed messages from the queue
* DELETE /api/pending-queue/failed
* Returns the number of messages cleared
*/
private handleClearFailedQueue = this.wrapHandler((req: Request, res: Response): void => {
const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const clearedCount = pendingStore.clearFailed();
logger.info('QUEUE', 'Cleared failed queue messages', { clearedCount });
res.json({
success: true,
clearedCount
});
});
/**
* Clear all messages from the queue (pending, processing, and failed)
* DELETE /api/pending-queue/all
* Returns the number of messages cleared
*/
private handleClearAllQueue = this.wrapHandler((req: Request, res: Response): void => {
const { PendingMessageStore } = require('../../../sqlite/PendingMessageStore.js');
const pendingStore = new PendingMessageStore(this.dbManager.getSessionStore().db, 3);
const clearedCount = pendingStore.clearAll();
logger.warn('QUEUE', 'Cleared ALL queue messages (pending, processing, failed)', { clearedCount });
res.json({
success: true,
clearedCount
});
});
}