feat: implement ragtime email investigation with self-iteration and cleanup

- Add email-investigation mode via CLAUDE_MEM_MODE environment variable
- Each file processed in a new session (context managed by Claude-mem hooks)
- Add configurable transcript cleanup to prevent buildup (default 24h)
- Support environment variable configuration for all paths and settings
- Update README with usage documentation and configuration options

https://claude.ai/code/session_01N2wNRpUrUs2z9JKb7y29mH
This commit is contained in:
Claude
2026-01-30 23:18:33 +00:00
parent 1341e93fca
commit 2eaef1f586
2 changed files with 299 additions and 57 deletions
+59 -12
View File
@@ -1,19 +1,66 @@
# Ragtime
> **Status**: Not yet implemented
Email Investigation Batch Processor using Claude-mem's email-investigation mode.
Ragtime is a planned feature for claude-mem that will enable advanced timeline analysis and automated workflow orchestration.
## Overview
## Why It's Not Ready Yet
Ragtime processes email corpus files through Claude, using the email-investigation mode for entity/relationship/timeline extraction. Each file gets a NEW session - context is managed by Claude-mem's context injection hook, not by conversation continuation.
Ragtime requires a fully functional **modes system** to work properly. The modes system (implemented in PR #412) provides:
## Features
- Mode inheritance and configuration loading
- Type-safe observation metadata
- Dynamic prompt injection based on workflow context
- Language-specific behavior
- **Email-investigation mode** - Specialized observation types for entities, relationships, timeline events, anomalies
- **Self-iterating loop** - Each file processed in a new session
- **Transcript cleanup** - Automatic cleanup prevents buildup of old transcripts
- **Configurable** - All paths and settings via environment variables
Now that the modes system is complete, Ragtime can be fully scripted out in a future release.
## Usage
```bash
# Basic usage (expects corpus in datasets/epstein-mode/)
bun ragtime/ragtime.ts
# With custom corpus path
RAGTIME_CORPUS_PATH=/path/to/emails bun ragtime/ragtime.ts
# Limit files for testing
RAGTIME_FILE_LIMIT=5 bun ragtime/ragtime.ts
```
## Configuration
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `RAGTIME_CORPUS_PATH` | `./datasets/epstein-mode` | Path to folder containing .md email files |
| `RAGTIME_PLUGIN_PATH` | `./plugin` | Path to claude-mem plugin |
| `CLAUDE_MEM_WORKER_PORT` | `37777` | Worker service port |
| `RAGTIME_TRANSCRIPT_MAX_AGE` | `24` | Max age of transcripts to keep (hours) |
| `RAGTIME_PROJECT_NAME` | `ragtime-investigation` | Project name for grouping |
| `RAGTIME_FILE_LIMIT` | `0` | Limit files to process (0 = all) |
| `RAGTIME_SESSION_DELAY` | `2000` | Delay between sessions (ms) |
## Corpus Format
The corpus directory should contain markdown files with email content. Files are processed in numeric order based on the first number in the filename:
```
datasets/epstein-mode/
0001.md
0002.md
0003.md
...
```
Each markdown file should contain a single email or document to analyze.
## How It Works
1. **Startup**: Sets `CLAUDE_MEM_MODE=email-investigation` and cleans up old transcripts
2. **Processing**: For each file:
- Starts a NEW Claude session (no continuation)
- Claude reads the file and analyzes entities, relationships, timeline events
- Claude-mem's context injection hook provides relevant past observations
- Worker processes and stores new observations
3. **Cleanup**: Periodic and final transcript cleanup prevents buildup
## License
@@ -23,9 +70,9 @@ See [LICENSE](./LICENSE) for full terms.
### What this means:
- You can use ragtime for noncommercial purposes
- You can modify and distribute it
- You cannot use it for commercial purposes without permission
- You can use ragtime for noncommercial purposes
- You can modify and distribute it
- You cannot use it for commercial purposes without permission
### Why a different license?
+240 -45
View File
@@ -1,93 +1,288 @@
#!/usr/bin/env bun
/**
* RAGTIME - Email Investigation Batch Processor
*
* Processes email corpus files through Claude using email-investigation mode.
* Each file gets a NEW session - context is managed by Claude-mem's context
* injection hook, not by conversation continuation.
*
* Features:
* - Email-investigation mode for entity/relationship/timeline extraction
* - Self-iterating loop (each file = new session)
* - Transcript cleanup to prevent buildup
* - Configurable paths via environment or defaults
*/
import { query } from "@anthropic-ai/claude-agent-sdk";
import * as fs from "fs";
import * as path from "path";
import { homedir } from "os";
const pathToFolder = "/Users/alexnewman/Scripts/claude-mem/datasets/epstein-mode/";
const pathToPlugin = "/Users/alexnewman/Scripts/claude-mem/plugin/";
const WORKER_PORT = 37777;
// Configuration - can be overridden via environment variables
const CONFIG = {
// Path to corpus folder containing .md files
corpusPath: process.env.RAGTIME_CORPUS_PATH ||
path.join(process.cwd(), "datasets", "epstein-mode"),
// Or read from a directory
const filesToProcess = fs
.readdirSync(pathToFolder)
.filter((f) => f.endsWith(".md"))
.sort((a, b) => {
// Extract numeric part from filename (e.g., "0001.md" -> 1)
const numA = parseInt(a.match(/\d+/)?.[0] || "0", 10);
const numB = parseInt(b.match(/\d+/)?.[0] || "0", 10);
return numA - numB;
})
.map((f) => path.join(pathToFolder, f));
// Path to claude-mem plugin
pluginPath: process.env.RAGTIME_PLUGIN_PATH ||
path.join(process.cwd(), "plugin"),
// Worker port
workerPort: parseInt(process.env.CLAUDE_MEM_WORKER_PORT || "37777", 10),
// Max age of transcripts to keep (in hours)
transcriptMaxAgeHours: parseInt(process.env.RAGTIME_TRANSCRIPT_MAX_AGE || "24", 10),
// Project name for grouping transcripts
projectName: process.env.RAGTIME_PROJECT_NAME || "ragtime-investigation",
// Limit files to process (0 = all)
fileLimit: parseInt(process.env.RAGTIME_FILE_LIMIT || "0", 10),
// Delay between sessions (ms) - gives worker time to process
sessionDelayMs: parseInt(process.env.RAGTIME_SESSION_DELAY || "2000", 10),
};
// Set email-investigation mode for Claude-mem
process.env.CLAUDE_MEM_MODE = "email-investigation";
/**
* Get list of markdown files to process, sorted numerically
*/
function getFilesToProcess(): string[] {
if (!fs.existsSync(CONFIG.corpusPath)) {
console.error(`Corpus path does not exist: ${CONFIG.corpusPath}`);
console.error("Set RAGTIME_CORPUS_PATH environment variable or create the directory");
process.exit(1);
}
const files = fs
.readdirSync(CONFIG.corpusPath)
.filter((f) => f.endsWith(".md"))
.sort((a, b) => {
// Extract numeric part from filename (e.g., "0001.md" -> 1)
const numA = parseInt(a.match(/\d+/)?.[0] || "0", 10);
const numB = parseInt(b.match(/\d+/)?.[0] || "0", 10);
return numA - numB;
})
.map((f) => path.join(CONFIG.corpusPath, f));
if (files.length === 0) {
console.error(`No .md files found in: ${CONFIG.corpusPath}`);
process.exit(1);
}
// Apply limit if set
if (CONFIG.fileLimit > 0) {
return files.slice(0, CONFIG.fileLimit);
}
return files;
}
/**
* Clean up old transcripts to prevent buildup
* Removes transcripts older than configured max age
*/
async function cleanupOldTranscripts(): Promise<void> {
const transcriptsBase = path.join(homedir(), ".claude", "projects");
if (!fs.existsSync(transcriptsBase)) {
console.log("No transcripts directory found, skipping cleanup");
return;
}
const maxAgeMs = CONFIG.transcriptMaxAgeHours * 60 * 60 * 1000;
const now = Date.now();
let cleaned = 0;
try {
// Walk through project directories
const projectDirs = fs.readdirSync(transcriptsBase);
for (const projectDir of projectDirs) {
const projectPath = path.join(transcriptsBase, projectDir);
const stat = fs.statSync(projectPath);
if (!stat.isDirectory()) continue;
// Check for .jsonl transcript files
const files = fs.readdirSync(projectPath);
for (const file of files) {
if (!file.endsWith(".jsonl")) continue;
const filePath = path.join(projectPath, file);
const fileStat = fs.statSync(filePath);
const fileAge = now - fileStat.mtimeMs;
if (fileAge > maxAgeMs) {
try {
fs.unlinkSync(filePath);
cleaned++;
} catch (err) {
console.warn(`Failed to delete old transcript: ${filePath}`);
}
}
}
// Remove empty project directories
const remaining = fs.readdirSync(projectPath);
if (remaining.length === 0) {
try {
fs.rmdirSync(projectPath);
} catch {
// Ignore - may have race condition
}
}
}
if (cleaned > 0) {
console.log(`Cleaned up ${cleaned} old transcript(s)`);
}
} catch (err) {
console.warn("Transcript cleanup error:", err);
}
}
/**
* Poll the worker's processing status endpoint until the queue is empty
*/
async function waitForQueueToEmpty(): Promise<void> {
const maxWaitTimeMs = 5 * 60 * 1000; // 5 minutes maximum
const pollIntervalMs = 500; // Poll every 500ms
const pollIntervalMs = 500;
const startTime = Date.now();
while (true) {
try {
const response = await fetch(`http://localhost:${WORKER_PORT}/api/processing-status`);
const response = await fetch(
`http://localhost:${CONFIG.workerPort}/api/processing-status`
);
if (!response.ok) {
console.error(`Failed to get processing status: ${response.status}`);
break;
}
const status = await response.json();
console.log(`Queue status - Processing: ${status.isProcessing}, Queue depth: ${status.queueDepth}`);
// Exit when queue is empty
if (status.queueDepth === 0 && !status.isProcessing) {
console.log("Queue is empty, continuing to next prompt");
break;
}
// Check timeout
if (Date.now() - startTime > maxWaitTimeMs) {
console.warn("Warning: Queue did not empty within timeout, continuing anyway");
console.warn("Queue did not empty within timeout, continuing anyway");
break;
}
// Wait before polling again
await new Promise(resolve => setTimeout(resolve, pollIntervalMs));
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
} catch (error) {
console.error("Error polling worker status:", error);
// On error, wait a bit and continue to avoid infinite loop
await new Promise(resolve => setTimeout(resolve, 1000));
await new Promise((resolve) => setTimeout(resolve, 1000));
break;
}
}
}
// var i = 0;
/**
* Process a single file in a NEW session
* Context is injected by Claude-mem hooks, not conversation continuation
*/
async function processFile(file: string, index: number, total: number): Promise<void> {
const filename = path.basename(file);
console.log(`\n[${ index + 1}/${total}] Processing: ${filename}`);
for (const file of filesToProcess) {
// i++;
// Limit for testing
// if (i > 3) break;
try {
for await (const message of query({
prompt: `Read ${file} and analyze it in the context of the investigation. Look for entities, relationships, timeline events, and any anomalies. Cross-reference with what you know from the injected context above.`,
options: {
cwd: CONFIG.corpusPath,
plugins: [{ type: "local", path: CONFIG.pluginPath }],
},
})) {
// Log assistant responses
if (message.type === "assistant") {
const content = message.message.content;
if (Array.isArray(content)) {
for (const block of content) {
if (block.type === "text" && block.text) {
// Truncate long responses for console
const text = block.text.length > 500
? block.text.substring(0, 500) + "..."
: block.text;
console.log("Assistant:", text);
}
}
} else if (typeof content === "string") {
console.log("Assistant:", content);
}
}
console.log(`\n=== Processing ${file} ===\n`);
// Log completion
if (message.type === "result" && message.subtype === "success") {
console.log(`Completed: ${filename}`);
}
}
} catch (err) {
console.error(`Error processing ${filename}:`, err);
}
}
for await (const message of query({
prompt: `Read ${file} and think about how it relates to the injected context above (if any).`,
options: {
cwd: pathToFolder,
plugins: [{ type: "local", path: pathToPlugin }],
},
})) {
if (message.type === "system" && message.subtype === "init") {
console.log("Plugins:", message.plugins);
console.log("Commands:", message.slash_commands);
/**
* Main execution loop
*/
async function main(): Promise<void> {
console.log("=".repeat(60));
console.log("RAGTIME Email Investigation Processor");
console.log("=".repeat(60));
console.log(`Mode: email-investigation`);
console.log(`Corpus: ${CONFIG.corpusPath}`);
console.log(`Plugin: ${CONFIG.pluginPath}`);
console.log(`Worker: http://localhost:${CONFIG.workerPort}`);
console.log(`Transcript cleanup: ${CONFIG.transcriptMaxAgeHours}h`);
console.log("=".repeat(60));
// Initial cleanup
await cleanupOldTranscripts();
// Get files to process
const files = getFilesToProcess();
console.log(`\nFound ${files.length} file(s) to process\n`);
// Process each file in a NEW session
for (let i = 0; i < files.length; i++) {
const file = files[i];
await processFile(file, i, files.length);
// Wait for worker to finish processing observations
console.log("Waiting for worker queue...");
await waitForQueueToEmpty();
// Delay before next session
if (i < files.length - 1 && CONFIG.sessionDelayMs > 0) {
await new Promise((resolve) => setTimeout(resolve, CONFIG.sessionDelayMs));
}
if (message.type === "assistant") {
console.log("Assistant:", message.message.content);
// Periodic transcript cleanup (every 10 files)
if ((i + 1) % 10 === 0) {
await cleanupOldTranscripts();
}
console.log("Raw:", JSON.stringify(message, null, 2));
}
// Wait for the worker queue to be empty before continuing to the next file
console.log("\n=== Waiting for worker queue to empty ===\n");
await waitForQueueToEmpty();
// Final cleanup
await cleanupOldTranscripts();
console.log("\n" + "=".repeat(60));
console.log("Investigation complete");
console.log("=".repeat(60));
}
// Run
main().catch((err) => {
console.error("Fatal error:", err);
process.exit(1);
});