diff --git a/ragtime/README.md b/ragtime/README.md index 896eb6fc..ce8c386f 100644 --- a/ragtime/README.md +++ b/ragtime/README.md @@ -1,19 +1,66 @@ # Ragtime -> **Status**: Not yet implemented +Email Investigation Batch Processor using Claude-mem's email-investigation mode. -Ragtime is a planned feature for claude-mem that will enable advanced timeline analysis and automated workflow orchestration. +## Overview -## Why It's Not Ready Yet +Ragtime processes email corpus files through Claude, using the email-investigation mode for entity/relationship/timeline extraction. Each file gets a NEW session - context is managed by Claude-mem's context injection hook, not by conversation continuation. -Ragtime requires a fully functional **modes system** to work properly. The modes system (implemented in PR #412) provides: +## Features -- Mode inheritance and configuration loading -- Type-safe observation metadata -- Dynamic prompt injection based on workflow context -- Language-specific behavior +- **Email-investigation mode** - Specialized observation types for entities, relationships, timeline events, anomalies +- **Self-iterating loop** - Each file processed in a new session +- **Transcript cleanup** - Automatic cleanup prevents buildup of old transcripts +- **Configurable** - All paths and settings via environment variables -Now that the modes system is complete, Ragtime can be fully scripted out in a future release. 
+## Usage + +```bash +# Basic usage (expects corpus in datasets/epstein-mode/) +bun ragtime/ragtime.ts + +# With custom corpus path +RAGTIME_CORPUS_PATH=/path/to/emails bun ragtime/ragtime.ts + +# Limit files for testing +RAGTIME_FILE_LIMIT=5 bun ragtime/ragtime.ts +``` + +## Configuration + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `RAGTIME_CORPUS_PATH` | `./datasets/epstein-mode` | Path to folder containing .md email files | +| `RAGTIME_PLUGIN_PATH` | `./plugin` | Path to claude-mem plugin | +| `CLAUDE_MEM_WORKER_PORT` | `37777` | Worker service port | +| `RAGTIME_TRANSCRIPT_MAX_AGE` | `24` | Max age of transcripts to keep (hours) | +| `RAGTIME_PROJECT_NAME` | `ragtime-investigation` | Project name for grouping (defined in the config but not currently read by `ragtime.ts`) | +| `RAGTIME_FILE_LIMIT` | `0` | Limit files to process (0 = all) | +| `RAGTIME_SESSION_DELAY` | `2000` | Delay between sessions (ms) | + +## Corpus Format + +The corpus directory should contain markdown files with email content. Files are processed in numeric order based on the first number in the filename: + +``` +datasets/epstein-mode/ + 0001.md + 0002.md + 0003.md + ... +``` + +Each markdown file should contain a single email or document to analyze. + +## How It Works + +1. **Startup**: Sets `CLAUDE_MEM_MODE=email-investigation` and cleans up old transcripts +2. **Processing**: For each file: + - Starts a NEW Claude session (no continuation) + - Claude reads the file and analyzes entities, relationships, timeline events + - Claude-mem's context injection hook provides relevant past observations + - Worker processes and stores new observations +3. **Cleanup**: Transcript cleanup runs at startup, after every 10 files, and once at the end. It deletes `.jsonl` transcripts older than `RAGTIME_TRANSCRIPT_MAX_AGE` hours from every project directory under `~/.claude/projects` (not only Ragtime's) and removes project directories left empty ## License @@ -23,9 +70,9 @@ See [LICENSE](./LICENSE) for full terms. 
#!/usr/bin/env bun
/**
 * RAGTIME - Email Investigation Batch Processor
 *
 * Processes email corpus files through Claude using email-investigation mode.
 * Each file gets a NEW session - context is managed by Claude-mem's context
 * injection hook, not by conversation continuation.
 *
 * Features:
 * - Email-investigation mode for entity/relationship/timeline extraction
 * - Self-iterating loop (each file = new session)
 * - Transcript cleanup to prevent buildup
 * - Configurable paths via environment or defaults
 *
 * License (see ./LICENSE for full terms):
 * - You can use ragtime for noncommercial purposes
 * - You can modify and distribute it
 * - You cannot use it for commercial purposes without permission
 */

import { query } from "@anthropic-ai/claude-agent-sdk";
import * as fs from "fs";
import * as path from "path";
import { homedir } from "os";

/**
 * Read a non-negative integer setting from the environment.
 *
 * An unset or empty variable yields `fallback`. A value that does not parse
 * to a non-negative integer also yields `fallback` (with a warning) instead
 * of leaking `NaN` or a negative number into the configuration, where it
 * would silently disable comparisons such as `fileAge > maxAgeMs`.
 *
 * @param name - Environment variable name.
 * @param fallback - Value used when the variable is unset or invalid.
 * @returns The parsed integer, or `fallback`.
 */
function readIntEnv(name: string, fallback: number): number {
  const raw = process.env[name];
  if (raw === undefined || raw.trim() === "") {
    return fallback;
  }

  const parsed = Number.parseInt(raw, 10);
  if (!Number.isFinite(parsed) || parsed < 0) {
    console.warn(`Ignoring invalid ${name}="${raw}", using default ${fallback}`);
    return fallback;
  }

  return parsed;
}

// Configuration - can be overridden via environment variables
const CONFIG = {
  // Path to corpus folder containing .md files
  corpusPath: process.env.RAGTIME_CORPUS_PATH ||
    path.join(process.cwd(), "datasets", "epstein-mode"),

  // Path to claude-mem plugin
  pluginPath: process.env.RAGTIME_PLUGIN_PATH ||
    path.join(process.cwd(), "plugin"),

  // Worker port
  workerPort: readIntEnv("CLAUDE_MEM_WORKER_PORT", 37777),

  // Max age of transcripts to keep (in hours)
  transcriptMaxAgeHours: readIntEnv("RAGTIME_TRANSCRIPT_MAX_AGE", 24),

  // Project name for grouping transcripts
  // NOTE(review): nothing in this script reads this value yet.
  projectName: process.env.RAGTIME_PROJECT_NAME || "ragtime-investigation",

  // Limit files to process (0 = all)
  fileLimit: readIntEnv("RAGTIME_FILE_LIMIT", 0),

  // Delay between sessions (ms) - gives worker time to process
  sessionDelayMs: readIntEnv("RAGTIME_SESSION_DELAY", 2000),
};

// Set email-investigation mode for Claude-mem
process.env.CLAUDE_MEM_MODE = "email-investigation";

/**
 * Get the list of markdown files to process.
 *
 * Files are ordered by the first run of digits in their name
 * (e.g. "0001.md" -> 1; names without digits count as 0). Ties are broken by
 * name so the order does not depend on directory listing order.
 * Exits the process with status 1 when the corpus directory is missing or
 * contains no `.md` files.
 *
 * @returns Absolute-or-relative paths (joined onto `CONFIG.corpusPath`),
 *          truncated to `CONFIG.fileLimit` entries when the limit is > 0.
 */
function getFilesToProcess(): string[] {
  if (!fs.existsSync(CONFIG.corpusPath)) {
    console.error(`Corpus path does not exist: ${CONFIG.corpusPath}`);
    console.error("Set RAGTIME_CORPUS_PATH environment variable or create the directory");
    process.exit(1);
  }

  // Extract numeric part from filename (e.g., "0001.md" -> 1)
  const leadingNumber = (name: string): number =>
    parseInt(name.match(/\d+/)?.[0] || "0", 10);

  const files = fs
    .readdirSync(CONFIG.corpusPath)
    .filter((f) => f.endsWith(".md"))
    .sort((a, b) => leadingNumber(a) - leadingNumber(b) || a.localeCompare(b))
    .map((f) => path.join(CONFIG.corpusPath, f));

  if (files.length === 0) {
    console.error(`No .md files found in: ${CONFIG.corpusPath}`);
    process.exit(1);
  }

  // Apply limit if set
  if (CONFIG.fileLimit > 0) {
    return files.slice(0, CONFIG.fileLimit);
  }

  return files;
}

/**
 * Clean up old transcripts to prevent buildup.
 *
 * Deletes `.jsonl` files under `~/.claude/projects/<project>/` whose mtime is
 * older than `CONFIG.transcriptMaxAgeHours`, then removes project directories
 * that are empty afterwards. Best-effort: failures are logged and skipped so
 * one unreadable entry does not stop cleanup of the remaining projects.
 *
 * NOTE(review): this walks EVERY project directory, so transcripts from
 * sessions unrelated to Ragtime are deleted as well - confirm this is
 * intended before running on a machine with other Claude history.
 */
async function cleanupOldTranscripts(): Promise<void> {
  const transcriptsBase = path.join(homedir(), ".claude", "projects");

  if (!fs.existsSync(transcriptsBase)) {
    console.log("No transcripts directory found, skipping cleanup");
    return;
  }

  const maxAgeMs = CONFIG.transcriptMaxAgeHours * 60 * 60 * 1000;
  const now = Date.now();
  let cleaned = 0;

  let projectDirs: string[];
  try {
    projectDirs = fs.readdirSync(transcriptsBase);
  } catch (err) {
    console.warn("Transcript cleanup error:", err);
    return;
  }

  for (const projectDir of projectDirs) {
    const projectPath = path.join(transcriptsBase, projectDir);

    try {
      if (!fs.statSync(projectPath).isDirectory()) continue;

      // Check for .jsonl transcript files
      for (const file of fs.readdirSync(projectPath)) {
        if (!file.endsWith(".jsonl")) continue;

        const filePath = path.join(projectPath, file);

        let fileAge: number;
        try {
          fileAge = now - fs.statSync(filePath).mtimeMs;
        } catch {
          // File vanished or is unreadable (e.g. removed by another process)
          console.warn(`Failed to inspect transcript: ${filePath}`);
          continue;
        }

        if (fileAge > maxAgeMs) {
          try {
            fs.unlinkSync(filePath);
            cleaned++;
          } catch {
            console.warn(`Failed to delete old transcript: ${filePath}`);
          }
        }
      }

      // Remove empty project directories
      if (fs.readdirSync(projectPath).length === 0) {
        try {
          fs.rmdirSync(projectPath);
        } catch {
          // Ignore - another process may have written to or removed it
        }
      }
    } catch (err) {
      // Keep going: a problem in one project must not abort the whole sweep
      console.warn(`Transcript cleanup error in ${projectPath}:`, err);
    }
  }

  if (cleaned > 0) {
    console.log(`Cleaned up ${cleaned} old transcript(s)`);
  }
}

/**
 * Poll the worker's processing status endpoint until the queue is empty.
 *
 * Returns when the worker reports `queueDepth === 0` and not processing,
 * when the 5 minute budget is exhausted, or as soon as the status request
 * fails (non-2xx response or network error) - in every case the caller
 * simply continues with the next file.
 */
async function waitForQueueToEmpty(): Promise<void> {
  const maxWaitTimeMs = 5 * 60 * 1000; // 5 minutes maximum
  const pollIntervalMs = 500;
  const startTime = Date.now();
  const statusUrl = `http://localhost:${CONFIG.workerPort}/api/processing-status`;

  while (true) {
    try {
      // Bound each request by the remaining budget so a hung worker cannot
      // stall the batch beyond maxWaitTimeMs.
      const remainingMs = Math.max(1, maxWaitTimeMs - (Date.now() - startTime));
      const response = await fetch(statusUrl, {
        signal: AbortSignal.timeout(remainingMs),
      });

      if (!response.ok) {
        console.error(`Failed to get processing status: ${response.status}`);
        break;
      }

      const status = (await response.json()) as {
        isProcessing?: boolean;
        queueDepth?: number;
      };

      // Exit when queue is empty
      if (status.queueDepth === 0 && !status.isProcessing) {
        break;
      }

      // Check timeout
      if (Date.now() - startTime > maxWaitTimeMs) {
        console.warn("Queue did not empty within timeout, continuing anyway");
        break;
      }

      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
    } catch (error) {
      console.error("Error polling worker status:", error);
      // Give the worker a moment, then give up rather than loop on errors
      await new Promise((resolve) => setTimeout(resolve, 1000));
      break;
    }
  }
}

/**
 * Process a single file in a NEW session.
 * Context is injected by Claude-mem hooks, not conversation continuation.
 *
 * Errors are logged, never thrown, so one bad file does not stop the batch.
 *
 * @param file - Path of the markdown file to analyze.
 * @param index - Zero-based position of the file in the batch (for logging).
 * @param total - Number of files in the batch (for logging).
 * @returns `true` unless the session threw or ended with a non-success result.
 */
async function processFile(file: string, index: number, total: number): Promise<boolean> {
  const filename = path.basename(file);
  console.log(`\n[${index + 1}/${total}] Processing: ${filename}`);

  let ok = true;

  try {
    for await (const message of query({
      prompt: `Read ${file} and analyze it in the context of the investigation. Look for entities, relationships, timeline events, and any anomalies. Cross-reference with what you know from the injected context above.`,
      options: {
        cwd: CONFIG.corpusPath,
        plugins: [{ type: "local", path: CONFIG.pluginPath }],
      },
    })) {
      // Log assistant responses
      if (message.type === "assistant") {
        const content = message.message.content;
        if (Array.isArray(content)) {
          for (const block of content) {
            if (block.type === "text" && block.text) {
              // Truncate long responses for console
              const text = block.text.length > 500
                ? block.text.substring(0, 500) + "..."
                : block.text;
              console.log("Assistant:", text);
            }
          }
        } else if (typeof content === "string") {
          console.log("Assistant:", content);
        }
      }

      // Log completion (or a session that ended without success)
      if (message.type === "result") {
        if (message.subtype === "success") {
          console.log(`Completed: ${filename}`);
        } else {
          ok = false;
          console.warn(`Session for ${filename} ended with result: ${message.subtype}`);
        }
      }
    }
  } catch (err) {
    ok = false;
    console.error(`Error processing ${filename}:`, err);
  }

  return ok;
}

/**
 * Main execution loop: print the configuration banner, clean up old
 * transcripts, then process every corpus file in its own session, waiting
 * for the worker queue to drain between files.
 */
async function main(): Promise<void> {
  console.log("=".repeat(60));
  console.log("RAGTIME Email Investigation Processor");
  console.log("=".repeat(60));
  console.log(`Mode: email-investigation`);
  console.log(`Corpus: ${CONFIG.corpusPath}`);
  console.log(`Plugin: ${CONFIG.pluginPath}`);
  console.log(`Worker: http://localhost:${CONFIG.workerPort}`);
  console.log(`Transcript cleanup: ${CONFIG.transcriptMaxAgeHours}h`);
  console.log("=".repeat(60));

  // Initial cleanup
  await cleanupOldTranscripts();

  // Get files to process
  const files = getFilesToProcess();
  console.log(`\nFound ${files.length} file(s) to process\n`);

  let failures = 0;

  // Process each file in a NEW session
  for (let i = 0; i < files.length; i++) {
    const file = files[i];

    if (!(await processFile(file, i, files.length))) {
      failures++;
    }

    // Wait for worker to finish processing observations
    console.log("Waiting for worker queue...");
    await waitForQueueToEmpty();

    // Delay before next session
    if (i < files.length - 1 && CONFIG.sessionDelayMs > 0) {
      await new Promise((resolve) => setTimeout(resolve, CONFIG.sessionDelayMs));
    }

    // Periodic transcript cleanup (every 10 files)
    if ((i + 1) % 10 === 0) {
      await cleanupOldTranscripts();
    }
  }

  // Final cleanup
  await cleanupOldTranscripts();

  console.log("\n" + "=".repeat(60));
  console.log("Investigation complete");
  if (failures > 0) {
    console.log(`${failures} of ${files.length} file(s) did not complete successfully`);
  }
  console.log("=".repeat(60));
}

// Run
main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});