Merge pull request #863 from thedotmack/claude/setup-ragtime-epstein-analysis-JApkL

feat: implement ragtime email investigation with self-iteration and cleanup
This commit is contained in:
Alex Newman
2026-02-06 05:41:54 -05:00
committed by GitHub
2 changed files with 299 additions and 57 deletions
+59 -12
View File
@@ -1,19 +1,66 @@
# Ragtime # Ragtime
> **Status**: Not yet implemented Email Investigation Batch Processor using Claude-mem's email-investigation mode.
Ragtime is a planned feature for claude-mem that will enable advanced timeline analysis and automated workflow orchestration. ## Overview
## Why It's Not Ready Yet Ragtime processes email corpus files through Claude, using the email-investigation mode for entity/relationship/timeline extraction. Each file gets a NEW session - context is managed by Claude-mem's context injection hook, not by conversation continuation.
Ragtime requires a fully functional **modes system** to work properly. The modes system (implemented in PR #412) provides: ## Features
- Mode inheritance and configuration loading - **Email-investigation mode** - Specialized observation types for entities, relationships, timeline events, anomalies
- Type-safe observation metadata - **Self-iterating loop** - Each file processed in a new session
- Dynamic prompt injection based on workflow context - **Transcript cleanup** - Automatic cleanup prevents buildup of old transcripts
- Language-specific behavior - **Configurable** - All paths and settings via environment variables
Now that the modes system is complete, Ragtime can be fully scripted out in a future release. ## Usage
```bash
# Basic usage (expects corpus in datasets/epstein-mode/)
bun ragtime/ragtime.ts
# With custom corpus path
RAGTIME_CORPUS_PATH=/path/to/emails bun ragtime/ragtime.ts
# Limit files for testing
RAGTIME_FILE_LIMIT=5 bun ragtime/ragtime.ts
```
## Configuration
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `RAGTIME_CORPUS_PATH` | `./datasets/epstein-mode` | Path to folder containing .md email files |
| `RAGTIME_PLUGIN_PATH` | `./plugin` | Path to claude-mem plugin |
| `CLAUDE_MEM_WORKER_PORT` | `37777` | Worker service port |
| `RAGTIME_TRANSCRIPT_MAX_AGE` | `24` | Max age of transcripts to keep (hours) |
| `RAGTIME_PROJECT_NAME` | `ragtime-investigation` | Project name for grouping |
| `RAGTIME_FILE_LIMIT` | `0` | Limit files to process (0 = all) |
| `RAGTIME_SESSION_DELAY` | `2000` | Delay between sessions (ms) |
## Corpus Format
The corpus directory should contain markdown files with email content. Files are processed in numeric order based on the first number in the filename:
```
datasets/epstein-mode/
0001.md
0002.md
0003.md
...
```
Each markdown file should contain a single email or document to analyze.
## How It Works
1. **Startup**: Sets `CLAUDE_MEM_MODE=email-investigation` and cleans up old transcripts
2. **Processing**: For each file:
- Starts a NEW Claude session (no continuation)
- Claude reads the file and analyzes entities, relationships, timeline events
- Claude-mem's context injection hook provides relevant past observations
- Worker processes and stores new observations
3. **Cleanup**: Periodic and final transcript cleanup prevents buildup
## License ## License
@@ -23,9 +70,9 @@ See [LICENSE](./LICENSE) for full terms.
### What this means: ### What this means:
- You can use ragtime for noncommercial purposes - You can use ragtime for noncommercial purposes
- You can modify and distribute it - You can modify and distribute it
- You cannot use it for commercial purposes without permission - You cannot use it for commercial purposes without permission
### Why a different license? ### Why a different license?
+232 -37
View File
@@ -1,14 +1,64 @@
#!/usr/bin/env bun
/**
* RAGTIME - Email Investigation Batch Processor
*
* Processes email corpus files through Claude using email-investigation mode.
* Each file gets a NEW session - context is managed by Claude-mem's context
* injection hook, not by conversation continuation.
*
* Features:
* - Email-investigation mode for entity/relationship/timeline extraction
* - Self-iterating loop (each file = new session)
* - Transcript cleanup to prevent buildup
* - Configurable paths via environment or defaults
*/
import { query } from "@anthropic-ai/claude-agent-sdk"; import { query } from "@anthropic-ai/claude-agent-sdk";
import * as fs from "fs"; import * as fs from "fs";
import * as path from "path"; import * as path from "path";
import { homedir } from "os";
const pathToFolder = "/Users/alexnewman/Scripts/claude-mem/datasets/epstein-mode/"; // Configuration - can be overridden via environment variables
const pathToPlugin = "/Users/alexnewman/Scripts/claude-mem/plugin/"; const CONFIG = {
const WORKER_PORT = 37777; // Path to corpus folder containing .md files
corpusPath: process.env.RAGTIME_CORPUS_PATH ||
path.join(process.cwd(), "datasets", "epstein-mode"),
// Or read from a directory // Path to claude-mem plugin
const filesToProcess = fs pluginPath: process.env.RAGTIME_PLUGIN_PATH ||
.readdirSync(pathToFolder) path.join(process.cwd(), "plugin"),
// Worker port
workerPort: parseInt(process.env.CLAUDE_MEM_WORKER_PORT || "37777", 10),
// Max age of transcripts to keep (in hours)
transcriptMaxAgeHours: parseInt(process.env.RAGTIME_TRANSCRIPT_MAX_AGE || "24", 10),
// Project name for grouping transcripts
projectName: process.env.RAGTIME_PROJECT_NAME || "ragtime-investigation",
// Limit files to process (0 = all)
fileLimit: parseInt(process.env.RAGTIME_FILE_LIMIT || "0", 10),
// Delay between sessions (ms) - gives worker time to process
sessionDelayMs: parseInt(process.env.RAGTIME_SESSION_DELAY || "2000", 10),
};
// Set email-investigation mode for Claude-mem
process.env.CLAUDE_MEM_MODE = "email-investigation";
/**
* Get list of markdown files to process, sorted numerically
*/
function getFilesToProcess(): string[] {
if (!fs.existsSync(CONFIG.corpusPath)) {
console.error(`Corpus path does not exist: ${CONFIG.corpusPath}`);
console.error("Set RAGTIME_CORPUS_PATH environment variable or create the directory");
process.exit(1);
}
const files = fs
.readdirSync(CONFIG.corpusPath)
.filter((f) => f.endsWith(".md")) .filter((f) => f.endsWith(".md"))
.sort((a, b) => { .sort((a, b) => {
// Extract numeric part from filename (e.g., "0001.md" -> 1) // Extract numeric part from filename (e.g., "0001.md" -> 1)
@@ -16,78 +66,223 @@ const filesToProcess = fs
const numB = parseInt(b.match(/\d+/)?.[0] || "0", 10); const numB = parseInt(b.match(/\d+/)?.[0] || "0", 10);
return numA - numB; return numA - numB;
}) })
.map((f) => path.join(pathToFolder, f)); .map((f) => path.join(CONFIG.corpusPath, f));
if (files.length === 0) {
console.error(`No .md files found in: ${CONFIG.corpusPath}`);
process.exit(1);
}
// Apply limit if set
if (CONFIG.fileLimit > 0) {
return files.slice(0, CONFIG.fileLimit);
}
return files;
}
/**
* Clean up old transcripts to prevent buildup
* Removes transcripts older than configured max age
*/
async function cleanupOldTranscripts(): Promise<void> {
const transcriptsBase = path.join(homedir(), ".claude", "projects");
if (!fs.existsSync(transcriptsBase)) {
console.log("No transcripts directory found, skipping cleanup");
return;
}
const maxAgeMs = CONFIG.transcriptMaxAgeHours * 60 * 60 * 1000;
const now = Date.now();
let cleaned = 0;
try {
// Walk through project directories
const projectDirs = fs.readdirSync(transcriptsBase);
for (const projectDir of projectDirs) {
const projectPath = path.join(transcriptsBase, projectDir);
const stat = fs.statSync(projectPath);
if (!stat.isDirectory()) continue;
// Check for .jsonl transcript files
const files = fs.readdirSync(projectPath);
for (const file of files) {
if (!file.endsWith(".jsonl")) continue;
const filePath = path.join(projectPath, file);
const fileStat = fs.statSync(filePath);
const fileAge = now - fileStat.mtimeMs;
if (fileAge > maxAgeMs) {
try {
fs.unlinkSync(filePath);
cleaned++;
} catch (err) {
console.warn(`Failed to delete old transcript: ${filePath}`);
}
}
}
// Remove empty project directories
const remaining = fs.readdirSync(projectPath);
if (remaining.length === 0) {
try {
fs.rmdirSync(projectPath);
} catch {
// Ignore - may have race condition
}
}
}
if (cleaned > 0) {
console.log(`Cleaned up ${cleaned} old transcript(s)`);
}
} catch (err) {
console.warn("Transcript cleanup error:", err);
}
}
/** /**
* Poll the worker's processing status endpoint until the queue is empty * Poll the worker's processing status endpoint until the queue is empty
*/ */
async function waitForQueueToEmpty(): Promise<void> { async function waitForQueueToEmpty(): Promise<void> {
const maxWaitTimeMs = 5 * 60 * 1000; // 5 minutes maximum const maxWaitTimeMs = 5 * 60 * 1000; // 5 minutes maximum
const pollIntervalMs = 500; // Poll every 500ms const pollIntervalMs = 500;
const startTime = Date.now(); const startTime = Date.now();
while (true) { while (true) {
try { try {
const response = await fetch(`http://localhost:${WORKER_PORT}/api/processing-status`); const response = await fetch(
`http://localhost:${CONFIG.workerPort}/api/processing-status`
);
if (!response.ok) { if (!response.ok) {
console.error(`Failed to get processing status: ${response.status}`); console.error(`Failed to get processing status: ${response.status}`);
break; break;
} }
const status = await response.json(); const status = await response.json();
console.log(`Queue status - Processing: ${status.isProcessing}, Queue depth: ${status.queueDepth}`);
// Exit when queue is empty // Exit when queue is empty
if (status.queueDepth === 0 && !status.isProcessing) { if (status.queueDepth === 0 && !status.isProcessing) {
console.log("Queue is empty, continuing to next prompt");
break; break;
} }
// Check timeout // Check timeout
if (Date.now() - startTime > maxWaitTimeMs) { if (Date.now() - startTime > maxWaitTimeMs) {
console.warn("Warning: Queue did not empty within timeout, continuing anyway"); console.warn("Queue did not empty within timeout, continuing anyway");
break; break;
} }
// Wait before polling again await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
} catch (error) { } catch (error) {
console.error("Error polling worker status:", error); console.error("Error polling worker status:", error);
// On error, wait a bit and continue to avoid infinite loop await new Promise((resolve) => setTimeout(resolve, 1000));
await new Promise(resolve => setTimeout(resolve, 1000));
break; break;
} }
} }
} }
// var i = 0; /**
* Process a single file in a NEW session
for (const file of filesToProcess) { * Context is injected by Claude-mem hooks, not conversation continuation
// i++; */
// Limit for testing async function processFile(file: string, index: number, total: number): Promise<void> {
// if (i > 3) break; const filename = path.basename(file);
console.log(`\n[${ index + 1}/${total}] Processing: ${filename}`);
console.log(`\n=== Processing ${file} ===\n`);
try {
for await (const message of query({ for await (const message of query({
prompt: `Read ${file} and think about how it relates to the injected context above (if any).`, prompt: `Read ${file} and analyze it in the context of the investigation. Look for entities, relationships, timeline events, and any anomalies. Cross-reference with what you know from the injected context above.`,
options: { options: {
cwd: pathToFolder, cwd: CONFIG.corpusPath,
plugins: [{ type: "local", path: pathToPlugin }], plugins: [{ type: "local", path: CONFIG.pluginPath }],
}, },
})) { })) {
if (message.type === "system" && message.subtype === "init") { // Log assistant responses
console.log("Plugins:", message.plugins);
console.log("Commands:", message.slash_commands);
}
if (message.type === "assistant") { if (message.type === "assistant") {
console.log("Assistant:", message.message.content); const content = message.message.content;
if (Array.isArray(content)) {
for (const block of content) {
if (block.type === "text" && block.text) {
// Truncate long responses for console
const text = block.text.length > 500
? block.text.substring(0, 500) + "..."
: block.text;
console.log("Assistant:", text);
}
}
} else if (typeof content === "string") {
console.log("Assistant:", content);
} }
console.log("Raw:", JSON.stringify(message, null, 2));
} }
// Wait for the worker queue to be empty before continuing to the next file // Log completion
console.log("\n=== Waiting for worker queue to empty ===\n"); if (message.type === "result" && message.subtype === "success") {
await waitForQueueToEmpty(); console.log(`Completed: ${filename}`);
} }
}
} catch (err) {
console.error(`Error processing ${filename}:`, err);
}
}
/**
* Main execution loop
*/
async function main(): Promise<void> {
console.log("=".repeat(60));
console.log("RAGTIME Email Investigation Processor");
console.log("=".repeat(60));
console.log(`Mode: email-investigation`);
console.log(`Corpus: ${CONFIG.corpusPath}`);
console.log(`Plugin: ${CONFIG.pluginPath}`);
console.log(`Worker: http://localhost:${CONFIG.workerPort}`);
console.log(`Transcript cleanup: ${CONFIG.transcriptMaxAgeHours}h`);
console.log("=".repeat(60));
// Initial cleanup
await cleanupOldTranscripts();
// Get files to process
const files = getFilesToProcess();
console.log(`\nFound ${files.length} file(s) to process\n`);
// Process each file in a NEW session
for (let i = 0; i < files.length; i++) {
const file = files[i];
await processFile(file, i, files.length);
// Wait for worker to finish processing observations
console.log("Waiting for worker queue...");
await waitForQueueToEmpty();
// Delay before next session
if (i < files.length - 1 && CONFIG.sessionDelayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, CONFIG.sessionDelayMs));
}
// Periodic transcript cleanup (every 10 files)
if ((i + 1) % 10 === 0) {
await cleanupOldTranscripts();
}
}
// Final cleanup
await cleanupOldTranscripts();
console.log("\n" + "=".repeat(60));
console.log("Investigation complete");
console.log("=".repeat(60));
}
// Run
main().catch((err) => {
console.error("Fatal error:", err);
process.exit(1);
});