Files
claude-mem/src/services/worker-service.ts
T
Alex Newman ca4f046777 feat: Add search skill with progressive disclosure and refactor existing skills
Enhancements:
- Added search skill with 10 HTTP API endpoints for memory queries
- Refactored version-bump and troubleshoot skills using progressive disclosure pattern
- Added operations/ subdirectories for detailed skill documentation
- Updated CLAUDE.md with skill-based search architecture
- Enhanced worker service with search API endpoints
- Updated CHANGELOG.md with v5.4.0 migration details

Technical changes:
- New plugin/skills/search/ directory with SKILL.md
- New .claude/skills/version-bump/operations/ (workflow.md, scenarios.md)
- New plugin/skills/troubleshoot/operations/ (common-issues.md, worker.md)
- Modified src/services/worker-service.ts (added search endpoints)
- Modified plugin/scripts/worker-service.cjs (rebuilt with search API)
- Reduced main skill files by 89% using progressive disclosure
- Token savings: ~2,250 tokens per session start

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-09 18:41:53 -05:00

1257 lines
42 KiB
TypeScript

/**
* Worker Service v2: Clean Object-Oriented Architecture
*
* This is a complete rewrite following the architecture document.
* Key improvements:
* - Single database connection (no open/close churn)
* - Event-driven queues (zero polling)
* - DRY utilities for pagination and settings
* - Clean separation of concerns
* - ~600-700 lines (down from 1173)
*/
import express, { Request, Response } from 'express';
import cors from 'cors';
import http from 'http';
import path from 'path';
import { readFileSync, writeFileSync, statSync, existsSync } from 'fs';
import { homedir } from 'os';
import { getPackageRoot } from '../shared/paths.js';
import { getWorkerPort } from '../shared/worker-utils.js';
import { logger } from '../utils/logger.js';
// Import composed services
import { DatabaseManager } from './worker/DatabaseManager.js';
import { SessionManager } from './worker/SessionManager.js';
import { SSEBroadcaster } from './worker/SSEBroadcaster.js';
import { SDKAgent } from './worker/SDKAgent.js';
import { PaginationHelper } from './worker/PaginationHelper.js';
import { SettingsManager } from './worker/SettingsManager.js';
export class WorkerService {
private app: express.Application;
private server: http.Server | null = null;
private startTime: number = Date.now();
// Composed services
private dbManager: DatabaseManager;
private sessionManager: SessionManager;
private sseBroadcaster: SSEBroadcaster;
private sdkAgent: SDKAgent;
private paginationHelper: PaginationHelper;
private settingsManager: SettingsManager;
// Processing status tracking for viewer UI spinner
private isProcessing: boolean = false;
constructor() {
this.app = express();
// Initialize services (dependency injection)
this.dbManager = new DatabaseManager();
this.sessionManager = new SessionManager(this.dbManager);
this.sseBroadcaster = new SSEBroadcaster();
this.sdkAgent = new SDKAgent(this.dbManager, this.sessionManager);
this.paginationHelper = new PaginationHelper(this.dbManager);
this.settingsManager = new SettingsManager(this.dbManager);
this.setupMiddleware();
this.setupRoutes();
}
/**
* Setup Express middleware
*/
private setupMiddleware(): void {
this.app.use(express.json({ limit: '50mb' }));
this.app.use(cors());
// Serve static files for web UI (viewer-bundle.js, logos, fonts, etc.)
const packageRoot = getPackageRoot();
const uiDir = path.join(packageRoot, 'plugin', 'ui');
this.app.use(express.static(uiDir));
}
/**
* Setup HTTP routes
*/
private setupRoutes(): void {
// Health & Viewer
this.app.get('/health', this.handleHealth.bind(this));
this.app.get('/', this.handleViewerUI.bind(this));
this.app.get('/stream', this.handleSSEStream.bind(this));
// Session endpoints
this.app.post('/sessions/:sessionDbId/init', this.handleSessionInit.bind(this));
this.app.post('/sessions/:sessionDbId/observations', this.handleObservations.bind(this));
this.app.post('/sessions/:sessionDbId/summarize', this.handleSummarize.bind(this));
this.app.get('/sessions/:sessionDbId/status', this.handleSessionStatus.bind(this));
this.app.delete('/sessions/:sessionDbId', this.handleSessionDelete.bind(this));
this.app.post('/sessions/:sessionDbId/complete', this.handleSessionComplete.bind(this));
// Data retrieval
this.app.get('/api/observations', this.handleGetObservations.bind(this));
this.app.get('/api/summaries', this.handleGetSummaries.bind(this));
this.app.get('/api/prompts', this.handleGetPrompts.bind(this));
this.app.get('/api/stats', this.handleGetStats.bind(this));
this.app.get('/api/processing-status', this.handleGetProcessingStatus.bind(this));
this.app.post('/api/processing', this.handleSetProcessing.bind(this));
// Settings
this.app.get('/api/settings', this.handleGetSettings.bind(this));
this.app.post('/api/settings', this.handleUpdateSettings.bind(this));
// Search API endpoints (for skill-based search)
this.app.get('/api/search/observations', this.handleSearchObservations.bind(this));
this.app.get('/api/search/sessions', this.handleSearchSessions.bind(this));
this.app.get('/api/search/prompts', this.handleSearchPrompts.bind(this));
this.app.get('/api/search/by-concept', this.handleSearchByConcept.bind(this));
this.app.get('/api/search/by-file', this.handleSearchByFile.bind(this));
this.app.get('/api/search/by-type', this.handleSearchByType.bind(this));
this.app.get('/api/context/recent', this.handleGetRecentContext.bind(this));
this.app.get('/api/context/timeline', this.handleGetContextTimeline.bind(this));
this.app.get('/api/timeline/by-query', this.handleGetTimelineByQuery.bind(this));
this.app.get('/api/search/help', this.handleSearchHelp.bind(this));
}
/**
* Start the worker service
*/
async start(): Promise<void> {
// Initialize database (once, stays open)
await this.dbManager.initialize();
// Start HTTP server
const port = getWorkerPort();
this.server = await new Promise<http.Server>((resolve, reject) => {
const srv = this.app.listen(port, () => resolve(srv));
srv.on('error', reject);
});
logger.info('SYSTEM', 'Worker started', { port, pid: process.pid });
}
/**
* Shutdown the worker service
*/
async shutdown(): Promise<void> {
// Shutdown all active sessions
await this.sessionManager.shutdownAll();
// Close HTTP server
if (this.server) {
await new Promise<void>((resolve, reject) => {
this.server!.close(err => err ? reject(err) : resolve());
});
}
// Close database connection
await this.dbManager.close();
logger.info('SYSTEM', 'Worker shutdown complete');
}
// ============================================================================
// Route Handlers
// ============================================================================
/**
* Health check endpoint
*/
private handleHealth(req: Request, res: Response): void {
res.json({ status: 'ok', timestamp: Date.now() });
}
/**
* Serve viewer UI
*/
private handleViewerUI(req: Request, res: Response): void {
try {
const packageRoot = getPackageRoot();
const viewerPath = path.join(packageRoot, 'plugin', 'ui', 'viewer.html');
const html = readFileSync(viewerPath, 'utf-8');
res.setHeader('Content-Type', 'text/html');
res.send(html);
} catch (error) {
logger.failure('WORKER', 'Viewer UI error', {}, error as Error);
res.status(500).json({ error: 'Failed to load viewer UI' });
}
}
/**
* SSE stream endpoint
*/
private handleSSEStream(req: Request, res: Response): void {
// Setup SSE headers
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
// Add client to broadcaster
this.sseBroadcaster.addClient(res);
// Send initial_load event with projects list
const allProjects = this.dbManager.getSessionStore().getAllProjects();
this.sseBroadcaster.broadcast({
type: 'initial_load',
projects: allProjects,
timestamp: Date.now()
});
// Send initial processing status
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing: this.isProcessing
});
}
/**
* Initialize a new session
*/
private handleSessionInit(req: Request, res: Response): void {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const session = this.sessionManager.initializeSession(sessionDbId);
// Get the latest user_prompt for this session to sync to Chroma
const db = this.dbManager.getSessionStore().db;
const latestPrompt = db.prepare(`
SELECT
up.*,
s.sdk_session_id,
s.project
FROM user_prompts up
JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id
WHERE up.claude_session_id = ?
ORDER BY up.created_at_epoch DESC
LIMIT 1
`).get(session.claudeSessionId) as any;
// Broadcast new prompt to SSE clients (for web UI)
if (latestPrompt) {
this.sseBroadcaster.broadcast({
type: 'new_prompt',
prompt: {
id: latestPrompt.id,
claude_session_id: latestPrompt.claude_session_id,
project: latestPrompt.project,
prompt_number: latestPrompt.prompt_number,
prompt_text: latestPrompt.prompt_text,
created_at_epoch: latestPrompt.created_at_epoch
}
});
// Sync user prompt to Chroma (fire-and-forget)
this.dbManager.getChromaSync().syncUserPrompt(
latestPrompt.id,
latestPrompt.sdk_session_id,
latestPrompt.project,
latestPrompt.prompt_text,
latestPrompt.prompt_number,
latestPrompt.created_at_epoch
).catch(err => {
logger.error('WORKER', 'Failed to sync user_prompt to Chroma', { promptId: latestPrompt.id }, err);
});
}
// Start processing indicator
this.broadcastProcessingStatus(true);
// Start SDK agent in background (pass worker ref for spinner control)
this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
});
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_started',
sessionDbId,
project: session.project
});
res.json({ status: 'initialized', sessionDbId, port: getWorkerPort() });
} catch (error) {
logger.failure('WORKER', 'Session init failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Queue observations for processing
* CRITICAL: Ensures SDK agent is running to process the queue (ALWAYS SAVE EVERYTHING)
*/
private handleObservations(req: Request, res: Response): void {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const { tool_name, tool_input, tool_response, prompt_number } = req.body;
this.sessionManager.queueObservation(sessionDbId, {
tool_name,
tool_input,
tool_response,
prompt_number
});
// CRITICAL: Ensure SDK agent is running to consume the queue
const session = this.sessionManager.getSession(sessionDbId);
if (session && !session.generatorPromise) {
session.generatorPromise = this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
});
}
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'observation_queued',
sessionDbId
});
res.json({ status: 'queued' });
} catch (error) {
logger.failure('WORKER', 'Observation queuing failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Queue summarize request
* CRITICAL: Ensures SDK agent is running to process the queue (ALWAYS SAVE EVERYTHING)
*/
private handleSummarize(req: Request, res: Response): void {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
this.sessionManager.queueSummarize(sessionDbId);
// CRITICAL: Ensure SDK agent is running to consume the queue
const session = this.sessionManager.getSession(sessionDbId);
if (session && !session.generatorPromise) {
session.generatorPromise = this.sdkAgent.startSession(session, this).catch(err => {
logger.failure('WORKER', 'SDK agent error', { sessionId: sessionDbId }, err);
});
}
res.json({ status: 'queued' });
} catch (error) {
logger.failure('WORKER', 'Summarize queuing failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get session status
*/
private handleSessionStatus(req: Request, res: Response): void {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
const session = this.sessionManager.getSession(sessionDbId);
if (!session) {
res.json({ status: 'not_found' });
return;
}
res.json({
status: 'active',
sessionDbId,
project: session.project,
queueLength: session.pendingMessages.length,
uptime: Date.now() - session.startTime
});
} catch (error) {
logger.failure('WORKER', 'Session status failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Delete a session
*/
private async handleSessionDelete(req: Request, res: Response): Promise<void> {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_completed',
sessionDbId
});
res.json({ status: 'deleted' });
} catch (error) {
logger.failure('WORKER', 'Session delete failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Complete a session (backward compatibility for cleanup-hook)
* cleanup-hook expects POST /sessions/:sessionDbId/complete instead of DELETE
*/
private async handleSessionComplete(req: Request, res: Response): Promise<void> {
try {
const sessionDbId = parseInt(req.params.sessionDbId, 10);
if (isNaN(sessionDbId)) {
res.status(400).json({ success: false, error: 'Invalid session ID' });
return;
}
await this.sessionManager.deleteSession(sessionDbId);
// Mark session complete in database
this.dbManager.markSessionComplete(sessionDbId);
// Stop processing indicator
this.broadcastProcessingStatus(false);
// Broadcast SSE event
this.sseBroadcaster.broadcast({
type: 'session_completed',
timestamp: Date.now(),
sessionDbId
});
res.json({ success: true });
} catch (error) {
logger.failure('WORKER', 'Session complete failed', {}, error as Error);
res.status(500).json({ success: false, error: String(error) });
}
}
/**
* Get paginated observations
*/
private handleGetObservations(req: Request, res: Response): void {
try {
const { offset, limit, project } = parsePaginationParams(req);
const result = this.paginationHelper.getObservations(offset, limit, project);
res.json(result);
} catch (error) {
logger.failure('WORKER', 'Get observations failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get paginated summaries
*/
private handleGetSummaries(req: Request, res: Response): void {
try {
const { offset, limit, project } = parsePaginationParams(req);
const result = this.paginationHelper.getSummaries(offset, limit, project);
res.json(result);
} catch (error) {
logger.failure('WORKER', 'Get summaries failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get paginated user prompts
*/
private handleGetPrompts(req: Request, res: Response): void {
try {
const { offset, limit, project } = parsePaginationParams(req);
const result = this.paginationHelper.getPrompts(offset, limit, project);
res.json(result);
} catch (error) {
logger.failure('WORKER', 'Get prompts failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get database statistics (with worker metadata)
*/
private handleGetStats(req: Request, res: Response): void {
try {
const db = this.dbManager.getSessionStore().db;
// Read version from package.json
const packageRoot = getPackageRoot();
const packageJsonPath = path.join(packageRoot, 'package.json');
const packageJson = JSON.parse(readFileSync(packageJsonPath, 'utf-8'));
const version = packageJson.version;
// Get database stats
const totalObservations = db.prepare('SELECT COUNT(*) as count FROM observations').get() as { count: number };
const totalSessions = db.prepare('SELECT COUNT(*) as count FROM sdk_sessions').get() as { count: number };
const totalSummaries = db.prepare('SELECT COUNT(*) as count FROM session_summaries').get() as { count: number };
// Get database file size and path
const dbPath = path.join(homedir(), '.claude-mem', 'claude-mem.db');
let dbSize = 0;
if (existsSync(dbPath)) {
dbSize = statSync(dbPath).size;
}
// Worker metadata
const uptime = Math.floor((Date.now() - this.startTime) / 1000);
const activeSessions = this.sessionManager.getActiveSessionCount();
const sseClients = this.sseBroadcaster.getClientCount();
res.json({
worker: {
version,
uptime,
activeSessions,
sseClients,
port: getWorkerPort()
},
database: {
path: dbPath,
size: dbSize,
observations: totalObservations.count,
sessions: totalSessions.count,
summaries: totalSummaries.count
}
});
} catch (error) {
logger.failure('WORKER', 'Get stats failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get environment settings (from ~/.claude/settings.json)
*/
private handleGetSettings(req: Request, res: Response): void {
try {
const settingsPath = path.join(homedir(), '.claude', 'settings.json');
if (!existsSync(settingsPath)) {
// Return defaults if file doesn't exist
res.json({
CLAUDE_MEM_MODEL: 'claude-haiku-4-5',
CLAUDE_MEM_CONTEXT_OBSERVATIONS: '50',
CLAUDE_MEM_WORKER_PORT: '37777'
});
return;
}
const settingsData = readFileSync(settingsPath, 'utf-8');
const settings = JSON.parse(settingsData);
const env = settings.env || {};
res.json({
CLAUDE_MEM_MODEL: env.CLAUDE_MEM_MODEL || 'claude-haiku-4-5',
CLAUDE_MEM_CONTEXT_OBSERVATIONS: env.CLAUDE_MEM_CONTEXT_OBSERVATIONS || '50',
CLAUDE_MEM_WORKER_PORT: env.CLAUDE_MEM_WORKER_PORT || '37777'
});
} catch (error) {
logger.failure('WORKER', 'Get settings failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Update environment settings (in ~/.claude/settings.json) with validation
*/
private handleUpdateSettings(req: Request, res: Response): void {
try {
const { CLAUDE_MEM_MODEL, CLAUDE_MEM_CONTEXT_OBSERVATIONS, CLAUDE_MEM_WORKER_PORT } = req.body;
// Validate inputs
if (CLAUDE_MEM_CONTEXT_OBSERVATIONS) {
const obsCount = parseInt(CLAUDE_MEM_CONTEXT_OBSERVATIONS, 10);
if (isNaN(obsCount) || obsCount < 1 || obsCount > 200) {
res.status(400).json({
success: false,
error: 'CLAUDE_MEM_CONTEXT_OBSERVATIONS must be between 1 and 200'
});
return;
}
}
if (CLAUDE_MEM_WORKER_PORT) {
const port = parseInt(CLAUDE_MEM_WORKER_PORT, 10);
if (isNaN(port) || port < 1024 || port > 65535) {
res.status(400).json({
success: false,
error: 'CLAUDE_MEM_WORKER_PORT must be between 1024 and 65535'
});
return;
}
}
// Read existing settings
const settingsPath = path.join(homedir(), '.claude', 'settings.json');
let settings: any = { env: {} };
if (existsSync(settingsPath)) {
const settingsData = readFileSync(settingsPath, 'utf-8');
settings = JSON.parse(settingsData);
if (!settings.env) {
settings.env = {};
}
}
// Update settings
if (CLAUDE_MEM_MODEL) {
settings.env.CLAUDE_MEM_MODEL = CLAUDE_MEM_MODEL;
}
if (CLAUDE_MEM_CONTEXT_OBSERVATIONS) {
settings.env.CLAUDE_MEM_CONTEXT_OBSERVATIONS = CLAUDE_MEM_CONTEXT_OBSERVATIONS;
}
if (CLAUDE_MEM_WORKER_PORT) {
settings.env.CLAUDE_MEM_WORKER_PORT = CLAUDE_MEM_WORKER_PORT;
}
// Write back
writeFileSync(settingsPath, JSON.stringify(settings, null, 2), 'utf-8');
logger.info('WORKER', 'Settings updated');
res.json({ success: true, message: 'Settings updated successfully' });
} catch (error) {
logger.failure('WORKER', 'Update settings failed', {}, error as Error);
res.status(500).json({ success: false, error: String(error) });
}
}
/**
* Get processing status (for viewer UI spinner)
*/
private handleGetProcessingStatus(req: Request, res: Response): void {
res.json({ isProcessing: this.isProcessing });
}
// ============================================================================
// Processing Status Helpers
// ============================================================================
/**
* Broadcast processing status change to SSE clients
*/
broadcastProcessingStatus(isProcessing: boolean): void {
this.isProcessing = isProcessing;
this.sseBroadcaster.broadcast({
type: 'processing_status',
isProcessing
});
}
/**
* Set processing status (called by hooks)
*/
private handleSetProcessing(req: Request, res: Response): void {
try {
const { isProcessing } = req.body;
if (typeof isProcessing !== 'boolean') {
res.status(400).json({ error: 'isProcessing must be a boolean' });
return;
}
this.broadcastProcessingStatus(isProcessing);
logger.debug('WORKER', 'Processing status updated', { isProcessing });
res.json({ status: 'ok', isProcessing });
} catch (error) {
logger.failure('WORKER', 'Failed to set processing status', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
// ============================================================================
// Search API Handlers (for skill-based search)
// ============================================================================
/**
* Search observations
* GET /api/search/observations?query=...&format=index&limit=20&project=...
*/
private handleSearchObservations(req: Request, res: Response): void {
try {
const query = req.query.query as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 20;
const project = req.query.project as string | undefined;
if (!query) {
res.status(400).json({ error: 'Missing required parameter: query' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.searchObservations(query, { limit, project });
res.json({
query,
count: results.length,
format,
results: format === 'index' ? results.map(r => ({
id: r.id,
type: r.type,
title: r.title,
subtitle: r.subtitle,
created_at_epoch: r.created_at_epoch,
project: r.project,
score: r.score
})) : results
});
} catch (error) {
logger.failure('WORKER', 'Search observations failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Search session summaries
* GET /api/search/sessions?query=...&format=index&limit=20
*/
private handleSearchSessions(req: Request, res: Response): void {
try {
const query = req.query.query as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 20;
if (!query) {
res.status(400).json({ error: 'Missing required parameter: query' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.searchSessions(query, { limit });
res.json({
query,
count: results.length,
format,
results: format === 'index' ? results.map(r => ({
id: r.id,
request: r.request,
completed: r.completed,
created_at_epoch: r.created_at_epoch,
project: r.project,
score: r.score
})) : results
});
} catch (error) {
logger.failure('WORKER', 'Search sessions failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Search user prompts
* GET /api/search/prompts?query=...&format=index&limit=20
*/
private handleSearchPrompts(req: Request, res: Response): void {
try {
const query = req.query.query as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 20;
const project = req.query.project as string | undefined;
if (!query) {
res.status(400).json({ error: 'Missing required parameter: query' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.searchUserPrompts(query, { limit, project });
res.json({
query,
count: results.length,
format,
results: format === 'index' ? results.map(r => ({
id: r.id,
claude_session_id: r.claude_session_id,
prompt_number: r.prompt_number,
prompt_text: r.prompt_text,
created_at_epoch: r.created_at_epoch,
score: r.score
})) : results
});
} catch (error) {
logger.failure('WORKER', 'Search prompts failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Search observations by concept
* GET /api/search/by-concept?concept=discovery&format=index&limit=5
*/
private handleSearchByConcept(req: Request, res: Response): void {
try {
const concept = req.query.concept as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 10;
const project = req.query.project as string | undefined;
if (!concept) {
res.status(400).json({ error: 'Missing required parameter: concept' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.findByConcept(concept, { limit, project });
res.json({
concept,
count: results.length,
format,
results: format === 'index' ? results.map(r => ({
id: r.id,
type: r.type,
title: r.title,
subtitle: r.subtitle,
created_at_epoch: r.created_at_epoch,
project: r.project,
concepts: r.concepts
})) : results
});
} catch (error) {
logger.failure('WORKER', 'Search by concept failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Search by file path
* GET /api/search/by-file?filePath=...&format=index&limit=10
*/
private handleSearchByFile(req: Request, res: Response): void {
try {
const filePath = req.query.filePath as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 10;
const project = req.query.project as string | undefined;
if (!filePath) {
res.status(400).json({ error: 'Missing required parameter: filePath' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.findByFile(filePath, { limit, project });
res.json({
filePath,
count: results.observations.length + results.sessions.length,
format,
results: {
observations: format === 'index' ? results.observations.map(r => ({
id: r.id,
type: r.type,
title: r.title,
subtitle: r.subtitle,
created_at_epoch: r.created_at_epoch,
project: r.project
})) : results.observations,
sessions: format === 'index' ? results.sessions.map(r => ({
id: r.id,
request: r.request,
completed: r.completed,
created_at_epoch: r.created_at_epoch,
project: r.project
})) : results.sessions
}
});
} catch (error) {
logger.failure('WORKER', 'Search by file failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Search observations by type
* GET /api/search/by-type?type=bugfix&format=index&limit=10
*/
private handleSearchByType(req: Request, res: Response): void {
try {
const type = req.query.type as string;
const format = (req.query.format as string) || 'full';
const limit = parseInt(req.query.limit as string, 10) || 10;
const project = req.query.project as string | undefined;
if (!type) {
res.status(400).json({ error: 'Missing required parameter: type' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const results = sessionSearch.findByType(type as any, { limit, project });
res.json({
type,
count: results.length,
format,
results: format === 'index' ? results.map(r => ({
id: r.id,
type: r.type,
title: r.title,
subtitle: r.subtitle,
created_at_epoch: r.created_at_epoch,
project: r.project
})) : results
});
} catch (error) {
logger.failure('WORKER', 'Search by type failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get recent context (summaries and observations for a project)
* GET /api/context/recent?project=...&limit=3
*/
private handleGetRecentContext(req: Request, res: Response): void {
try {
const project = (req.query.project as string) || path.basename(process.cwd());
const limit = parseInt(req.query.limit as string, 10) || 3;
const sessionStore = this.dbManager.getSessionStore();
const sessions = sessionStore.getRecentSessionsWithStatus(project, limit);
const contextData = sessions.map(session => {
const summary = session.has_summary && session.sdk_session_id
? sessionStore.getSummaryForSession(session.sdk_session_id)
: null;
const observations = session.sdk_session_id
? sessionStore.getObservationsForSession(session.sdk_session_id)
: [];
return {
session_id: session.id,
sdk_session_id: session.sdk_session_id,
project: session.project,
status: session.status,
has_summary: session.has_summary,
summary,
observations: observations.map(o => ({
id: o.id,
type: o.type,
title: o.title,
subtitle: o.subtitle,
created_at_epoch: o.created_at_epoch
})),
created_at_epoch: session.started_at_epoch
};
});
res.json({
project,
limit,
count: contextData.length,
sessions: contextData
});
} catch (error) {
logger.failure('WORKER', 'Get recent context failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get context timeline around an anchor point
* GET /api/context/timeline?anchor=123&depth_before=10&depth_after=10&project=...
*/
private handleGetContextTimeline(req: Request, res: Response): void {
try {
const anchor = req.query.anchor as string;
const depthBefore = parseInt(req.query.depth_before as string, 10) || 10;
const depthAfter = parseInt(req.query.depth_after as string, 10) || 10;
const project = req.query.project as string | undefined;
if (!anchor) {
res.status(400).json({ error: 'Missing required parameter: anchor' });
return;
}
const sessionStore = this.dbManager.getSessionStore();
let timeline;
// Check if anchor is a number (observation ID)
if (/^\d+$/.test(anchor)) {
const obsId = parseInt(anchor, 10);
const obs = sessionStore.getObservationById(obsId);
if (!obs) {
res.status(404).json({ error: `Observation #${obsId} not found` });
return;
}
timeline = sessionStore.getTimelineAroundObservation(obsId, obs.created_at_epoch, depthBefore, depthAfter, project);
} else if (anchor.startsWith('S') || anchor.startsWith('#S')) {
// Session ID
const sessionId = anchor.replace(/^#?S/, '');
const sessionNum = parseInt(sessionId, 10);
const sessions = sessionStore.getSessionSummariesByIds([sessionNum]);
if (sessions.length === 0) {
res.status(404).json({ error: `Session #${sessionNum} not found` });
return;
}
timeline = sessionStore.getTimelineAroundTimestamp(sessions[0].created_at_epoch, depthBefore, depthAfter, project);
} else {
// ISO timestamp
const date = new Date(anchor);
if (isNaN(date.getTime())) {
res.status(400).json({ error: `Invalid timestamp: ${anchor}` });
return;
}
timeline = sessionStore.getTimelineAroundTimestamp(date.getTime(), depthBefore, depthAfter, project);
}
res.json({
anchor,
depth_before: depthBefore,
depth_after: depthAfter,
project,
timeline
});
} catch (error) {
logger.failure('WORKER', 'Get context timeline failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get timeline by query (search first, then get timeline around best match)
* GET /api/timeline/by-query?query=...&mode=auto&depth_before=10&depth_after=10
*/
private handleGetTimelineByQuery(req: Request, res: Response): void {
try {
const query = req.query.query as string;
const mode = (req.query.mode as string) || 'auto';
const depthBefore = parseInt(req.query.depth_before as string, 10) || 10;
const depthAfter = parseInt(req.query.depth_after as string, 10) || 10;
const project = req.query.project as string | undefined;
if (!query) {
res.status(400).json({ error: 'Missing required parameter: query' });
return;
}
const sessionSearch = this.dbManager.getSessionSearch();
const sessionStore = this.dbManager.getSessionStore();
// Search based on mode
let bestMatch: any = null;
let searchResults: any = null;
if (mode === 'observations' || mode === 'auto') {
const obsResults = sessionSearch.searchObservations(query, { limit: 1, project });
if (obsResults.length > 0) {
bestMatch = obsResults[0];
searchResults = { type: 'observation', results: obsResults };
}
}
if (!bestMatch && (mode === 'sessions' || mode === 'auto')) {
const sessionResults = sessionSearch.searchSessions(query, { limit: 1 });
if (sessionResults.length > 0) {
bestMatch = sessionResults[0];
searchResults = { type: 'session', results: sessionResults };
}
}
if (!bestMatch) {
res.json({
query,
mode,
match: null,
timeline: null,
message: 'No matches found for query'
});
return;
}
// Get timeline around best match
const timeline = searchResults.type === 'observation'
? sessionStore.getTimelineAroundObservation(bestMatch.id, bestMatch.created_at_epoch, depthBefore, depthAfter, project)
: sessionStore.getTimelineAroundTimestamp(bestMatch.created_at_epoch, depthBefore, depthAfter, project);
res.json({
query,
mode,
match: {
type: searchResults.type,
id: bestMatch.id,
title: bestMatch.title || bestMatch.request,
score: bestMatch.score,
created_at_epoch: bestMatch.created_at_epoch
},
depth_before: depthBefore,
depth_after: depthAfter,
timeline
});
} catch (error) {
logger.failure('WORKER', 'Get timeline by query failed', {}, error as Error);
res.status(500).json({ error: (error as Error).message });
}
}
/**
* Get search help documentation
* GET /api/search/help
*/
private handleSearchHelp(req: Request, res: Response): void {
res.json({
title: 'Claude-Mem Search API',
description: 'HTTP API for searching persistent memory',
endpoints: [
{
path: '/api/search/observations',
method: 'GET',
description: 'Search observations using full-text search',
parameters: {
query: 'Search query (required)',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results (default: 20)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/search/sessions',
method: 'GET',
description: 'Search session summaries using full-text search',
parameters: {
query: 'Search query (required)',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results (default: 20)'
}
},
{
path: '/api/search/prompts',
method: 'GET',
description: 'Search user prompts using full-text search',
parameters: {
query: 'Search query (required)',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results (default: 20)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/search/by-concept',
method: 'GET',
description: 'Find observations by concept tag',
parameters: {
concept: 'Concept tag (required): discovery, decision, bugfix, feature, refactor',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results (default: 10)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/search/by-file',
method: 'GET',
description: 'Find observations and sessions by file path',
parameters: {
filePath: 'File path or partial path (required)',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results per type (default: 10)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/search/by-type',
method: 'GET',
description: 'Find observations by type',
parameters: {
type: 'Observation type (required): discovery, decision, bugfix, feature, refactor',
format: 'Response format: "index" or "full" (default: "full")',
limit: 'Number of results (default: 10)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/context/recent',
method: 'GET',
description: 'Get recent session context including summaries and observations',
parameters: {
project: 'Project name (default: current directory)',
limit: 'Number of recent sessions (default: 3)'
}
},
{
path: '/api/context/timeline',
method: 'GET',
description: 'Get unified timeline around a specific point in time',
parameters: {
anchor: 'Anchor point: observation ID, session ID (e.g., "S123"), or ISO timestamp (required)',
depth_before: 'Number of records before anchor (default: 10)',
depth_after: 'Number of records after anchor (default: 10)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/timeline/by-query',
method: 'GET',
description: 'Search for best match, then get timeline around it',
parameters: {
query: 'Search query (required)',
mode: 'Search mode: "auto", "observations", or "sessions" (default: "auto")',
depth_before: 'Number of records before match (default: 10)',
depth_after: 'Number of records after match (default: 10)',
project: 'Filter by project name (optional)'
}
},
{
path: '/api/search/help',
method: 'GET',
description: 'Get this help documentation'
}
],
examples: [
'curl "http://localhost:37777/api/search/observations?query=authentication&format=index&limit=5"',
'curl "http://localhost:37777/api/search/by-type?type=bugfix&limit=10"',
'curl "http://localhost:37777/api/context/recent?project=claude-mem&limit=3"',
'curl "http://localhost:37777/api/context/timeline?anchor=123&depth_before=5&depth_after=5"'
]
});
}
}
// ============================================================================
// Utilities
// ============================================================================
/**
* Parse pagination parameters from request
*/
function parsePaginationParams(req: Request): { offset: number; limit: number; project?: string } {
const offset = parseInt(req.query.offset as string, 10) || 0;
const limit = Math.min(parseInt(req.query.limit as string, 10) || 20, 100); // Max 100
const project = req.query.project as string | undefined;
return { offset, limit, project };
}
// ============================================================================
// Main Entry Point
// ============================================================================
/**
* Start the worker service (if running as main module)
* Note: Using require.main check for CJS compatibility (build outputs CJS)
*/
if (require.main === module || !module.parent) {
const worker = new WorkerService();
// Graceful shutdown
process.on('SIGTERM', async () => {
logger.info('SYSTEM', 'Received SIGTERM, shutting down gracefully');
await worker.shutdown();
process.exit(0);
});
process.on('SIGINT', async () => {
logger.info('SYSTEM', 'Received SIGINT, shutting down gracefully');
await worker.shutdown();
process.exit(0);
});
// Start the worker
worker.start().catch(error => {
logger.failure('SYSTEM', 'Worker startup failed', {}, error);
process.exit(1);
});
}
export default WorkerService;