claude-mem/src/bin/import-xml-observations.ts
Alex Newman 266c746d50 feat: Fix observation timestamps, refactor session management, and enhance worker reliability (#437)
* Refactor worker version checks and increase timeout settings

- Updated the default hook timeout from 5000ms to 120000ms for improved stability.
- Modified the worker version check to log a warning instead of restarting the worker on version mismatch.
- Removed legacy PM2 cleanup and worker start logic, simplifying the ensureWorkerRunning function.
- Enhanced polling mechanism for worker readiness with increased retries and reduced interval.
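
A readiness poll of the kind described in the last bullet can be sketched as follows. This is a minimal illustration, not the code from this commit: `waitUntilReady`, its parameters, and the injected `check` callback are hypothetical names, and the actual retry count and interval used by the hooks are not stated here.

```typescript
// Hypothetical helper: poll a readiness check until it passes or retries run out.
// `check` stands in for whatever health probe the caller uses (an assumption).
async function waitUntilReady(
  check: () => Promise<boolean>,
  maxRetries: number,
  intervalMs: number
): Promise<boolean> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      if (await check()) {
        return true;
      }
    } catch {
      // Treat a failed probe (for example a refused connection) as "not ready yet".
    }
    if (attempt < maxRetries) {
      // Wait before the next attempt; no wait is needed after the last one.
      await new Promise<void>(resolve => setTimeout(resolve, intervalMs));
    }
  }
  return false;
}
```

More retries with a shorter interval keeps the total wait budget similar while noticing a ready worker sooner.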

* feat: implement worker queue polling to ensure processing completion before proceeding

* refactor: change worker command from start to restart in hooks configuration

* refactor: remove session management complexity

- Simplify createSDKSession to pure INSERT OR IGNORE
- Remove auto-create logic from storeObservation/storeSummary
- Delete 11 unused session management methods
- Derive prompt_number from user_prompts count
- Keep sdk_sessions table schema unchanged for compatibility
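
The two ideas in this list, idempotent session creation and a prompt number derived from stored prompts, can be illustrated with an in-memory stand-in. This is only a sketch under stated assumptions: `InMemoryStore` and its method names are hypothetical, a `Map` replaces the real SQLite table, and the real store's SQL is not reproduced here.

```typescript
interface SessionRow {
  claudeSessionId: string;
  project: string;
}

interface UserPromptRow {
  claudeSessionId: string;
  text: string;
}

// Hypothetical in-memory model; the real implementation talks to SQLite.
class InMemoryStore {
  private sessions = new Map<string, SessionRow>();
  private userPrompts: UserPromptRow[] = [];

  // Mirrors INSERT OR IGNORE semantics: an existing row for the key is left untouched.
  // Returns true only when a new row was created.
  createSession(claudeSessionId: string, project: string): boolean {
    if (this.sessions.has(claudeSessionId)) {
      return false;
    }
    this.sessions.set(claudeSessionId, { claudeSessionId, project });
    return true;
  }

  addUserPrompt(claudeSessionId: string, text: string): void {
    this.userPrompts.push({ claudeSessionId, text });
  }

  // Derived value: no counter column to keep in sync, just a count of stored prompts.
  getPromptNumber(claudeSessionId: string): number {
    return this.userPrompts.filter(p => p.claudeSessionId === claudeSessionId).length;
  }
}
```

Deriving the number from the prompt rows means there is no separate counter that can drift from the data it describes.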

* refactor: simplify session management by removing unused methods and auto-creation logic

* Refactor session prompt number retrieval in SessionRoutes

- Updated the method of obtaining the prompt number from the session.
- Replaced `store.getPromptCounter(sessionDbId)` with `store.getPromptNumberFromUserPrompts(claudeSessionId)` for better clarity and accuracy.
- Adjusted the logic for incrementing the prompt number to derive it from the user prompts count instead of directly incrementing a counter.

* refactor: replace getPromptCounter with getPromptNumberFromUserPrompts in SessionManager

Phase 7 of session management simplification. Updates SessionManager to derive
prompt numbers from user_prompts table count instead of using the deprecated
prompt_counter column.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

* refactor: simplify SessionCompletionHandler to use direct SQL query

Phase 8: Remove call to findActiveSDKSession() and replace with direct
database query in SessionCompletionHandler.completeByClaudeId().

This removes dependency on the deleted findActiveSDKSession() method
and simplifies the code by using a straightforward SELECT query.

* refactor: remove markSessionCompleted call from SDKAgent

- Delete call to markSessionCompleted() in SDKAgent.ts
- Session status is no longer tracked or updated
- Part of phase 9: simplifying session management

* refactor: remove markSessionComplete method (Phase 10)

- Deleted markSessionComplete() method from DatabaseManager
- Removed markSessionComplete call from SessionCompletionHandler
- Session completion status no longer tracked in database
- Part of session management simplification effort

* refactor: replace deleted updateSDKSessionId calls in import script (Phase 11)

- Replace updateSDKSessionId() calls with direct SQL UPDATE statements
- Method was deleted in Phase 3 as part of session management simplification
- Import script now uses direct database access consistently

* test: add validation for SQL updates in sdk_sessions table

* refactor: enhance worker-cli to support manual and automated runs

* Remove cleanup hook and associated session completion logic

- Deleted the cleanup-hook implementation from the hooks directory.
- Removed the session completion endpoint that was used by the cleanup hook.
- Updated the SessionCompletionHandler to eliminate the completeByClaudeId method and its dependencies.
- Adjusted the SessionRoutes to reflect the removal of the session completion route.

* fix: update worker-cli command to use bun for consistency

* feat: Implement timestamp fix for observations and enhance processing logic

- Added `earliestPendingTimestamp` to `ActiveSession` to track the original timestamp of the earliest pending message.
- Updated `SDKAgent` to capture and utilize the earliest pending timestamp during response processing.
- Modified `SessionManager` to track the earliest timestamp when yielding messages.
- Created scripts for fixing corrupted timestamps, validating fixes, and investigating timestamp issues.
- Verified that all corrupted observations have been repaired and logic for future processing is sound.
- Ensured orphan processing can be safely re-enabled after validation.
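
One way to picture the timestamp fix: while queued messages are handed to the agent, remember the oldest original timestamp, then stamp the stored result with that value instead of the processing time. The sketch below is an assumption about the mechanics, not the code from this commit; `PendingBatch`, `noteYielded`, `takeTimestamp`, and `createdAtEpoch` are hypothetical names, and only `earliestPendingTimestamp` comes from the description above.

```typescript
interface PendingMessage {
  payload: string;
  createdAtEpoch: number; // when the message was originally queued (hypothetical field)
}

// Hypothetical tracker for the earliest timestamp among messages being processed.
class PendingBatch {
  private earliestPendingTimestamp: number | null = null;

  // Call as each queued message is yielded for processing.
  noteYielded(message: PendingMessage): void {
    if (
      this.earliestPendingTimestamp === null ||
      message.createdAtEpoch < this.earliestPendingTimestamp
    ) {
      this.earliestPendingTimestamp = message.createdAtEpoch;
    }
  }

  // Call when storing the response: returns the timestamp to record, then resets.
  // Falls back to `now` when nothing was pending.
  takeTimestamp(now: number): number {
    const timestamp = this.earliestPendingTimestamp ?? now;
    this.earliestPendingTimestamp = null;
    return timestamp;
  }
}
```

With this shape, a message queued hours before it is processed still yields an observation dated at the original event.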

* feat: Enhance SessionStore to support custom database paths and add timestamp fields for observations and summaries

* Refactor pending queue processing and add management endpoints

- Disabled automatic recovery of orphaned queues on startup; users must now use the new /api/pending-queue/process endpoint.
- Updated processOrphanedQueues method to processPendingQueues with improved session handling and return detailed results.
- Added new API endpoints for managing pending queues: GET /api/pending-queue and POST /api/pending-queue/process.
- Introduced a new script (check-pending-queue.ts) for checking and processing pending observation queues interactively or automatically.
- Enhanced logging and error handling for better monitoring of session processing.
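
A script could call the two endpoints roughly as sketched below. The paths and HTTP methods come from the list above; everything else is an assumption: `getPendingQueue`, `processPendingQueue`, and the `FetchLike` type are hypothetical, the worker's base URL is supplied by the caller, and the response body is returned as raw text because its shape is not described here.

```typescript
// Minimal structural type so a real `fetch` or a test double can be passed in.
type FetchLike = (
  url: string,
  init?: { method?: string }
) => Promise<{ ok: boolean; status: number; text(): Promise<string> }>;

// GET /api/pending-queue: inspect what is waiting to be processed.
async function getPendingQueue(baseUrl: string, fetchFn: FetchLike): Promise<string> {
  const res = await fetchFn(`${baseUrl}/api/pending-queue`, { method: 'GET' });
  if (!res.ok) {
    throw new Error(`GET /api/pending-queue failed with status ${res.status}`);
  }
  return res.text();
}

// POST /api/pending-queue/process: explicitly trigger processing.
async function processPendingQueue(baseUrl: string, fetchFn: FetchLike): Promise<string> {
  const res = await fetchFn(`${baseUrl}/api/pending-queue/process`, { method: 'POST' });
  if (!res.ok) {
    throw new Error(`POST /api/pending-queue/process failed with status ${res.status}`);
  }
  return res.text();
}
```

Because recovery is no longer automatic on startup, inspecting first and processing second keeps the operation explicit.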

* updated agent sdk

* feat: Add manual recovery guide and queue management endpoints to documentation

---------

Co-authored-by: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-25 15:36:46 -05:00

384 lines
11 KiB
JavaScript

#!/usr/bin/env node
/**
 * Import XML observations back into the database
 * Parses actual_xml_only_with_timestamps.xml and inserts observations via SessionStore
 */
import { readFileSync, readdirSync } from 'fs';
import { join } from 'path';
import { homedir } from 'os';
import { SessionStore } from '../services/sqlite/SessionStore.js';

interface ObservationData {
  type: string;
  title: string;
  subtitle: string;
  facts: string[];
  narrative: string;
  concepts: string[];
  files_read: string[];
  files_modified: string[];
}

interface SummaryData {
  request: string;
  investigated: string;
  learned: string;
  completed: string;
  next_steps: string;
  notes: string | null;
}

interface SessionMetadata {
  sessionId: string;
  project: string;
}

interface TimestampMapping {
  [timestamp: string]: SessionMetadata;
}

/**
 * Build a map of timestamp (rounded to second) -> session metadata by reading all transcript files
 * Since XML timestamps are rounded to seconds, we map by second
 */
function buildTimestampMap(): TimestampMapping {
  const transcriptDir = join(homedir(), '.claude', 'projects', '-Users-alexnewman-Scripts-claude-mem');
  const map: TimestampMapping = {};

  console.log(`Reading transcript files from ${transcriptDir}...`);
  const files = readdirSync(transcriptDir).filter(f => f.endsWith('.jsonl'));
  console.log(`Found ${files.length} transcript files`);

  for (const filename of files) {
    const filepath = join(transcriptDir, filename);
    const content = readFileSync(filepath, 'utf-8');
    const lines = content.split('\n').filter(l => l.trim());

    for (const line of lines) {
      try {
        const data = JSON.parse(line);
        const timestamp = data.timestamp;
        const sessionId = data.sessionId;
        const project = data.cwd;

        if (timestamp && sessionId) {
          // Round timestamp to second for matching with XML timestamps
          const roundedTimestamp = new Date(timestamp);
          roundedTimestamp.setMilliseconds(0);
          const key = roundedTimestamp.toISOString();

          // Only store first occurrence for each second (they're all the same session anyway)
          if (!map[key]) {
            map[key] = { sessionId, project };
          }
        }
      } catch (e) {
        // Skip invalid JSON lines
      }
    }
  }

  console.log(`Built timestamp map with ${Object.keys(map).length} unique seconds`);
  return map;
}

/**
 * Parse XML text content and extract tag value
 */
function extractTag(xml: string, tagName: string): string {
  const regex = new RegExp(`<${tagName}>([\\s\\S]*?)</${tagName}>`, 'i');
  const match = xml.match(regex);
  return match ? match[1].trim() : '';
}

/**
 * Parse XML array tags (facts, concepts, files, etc.)
 */
function extractArrayTags(xml: string, containerTag: string, itemTag: string): string[] {
  const containerRegex = new RegExp(`<${containerTag}>([\\s\\S]*?)</${containerTag}>`, 'i');
  const containerMatch = xml.match(containerRegex);
  if (!containerMatch) {
    return [];
  }

  const containerContent = containerMatch[1];
  const itemRegex = new RegExp(`<${itemTag}>([\\s\\S]*?)</${itemTag}>`, 'gi');
  const items: string[] = [];
  let match;
  while ((match = itemRegex.exec(containerContent)) !== null) {
    items.push(match[1].trim());
  }
  return items;
}

/**
 * Parse an observation block from XML
 */
function parseObservation(xml: string): ObservationData | null {
  // Must be a complete observation block
  if (!xml.includes('<observation>') || !xml.includes('</observation>')) {
    return null;
  }

  try {
    const observation: ObservationData = {
      type: extractTag(xml, 'type'),
      title: extractTag(xml, 'title'),
      subtitle: extractTag(xml, 'subtitle'),
      facts: extractArrayTags(xml, 'facts', 'fact'),
      narrative: extractTag(xml, 'narrative'),
      concepts: extractArrayTags(xml, 'concepts', 'concept'),
      files_read: extractArrayTags(xml, 'files_read', 'file'),
      files_modified: extractArrayTags(xml, 'files_modified', 'file'),
    };

    // Validate required fields
    if (!observation.type || !observation.title) {
      return null;
    }
    return observation;
  } catch (e) {
    console.error('Error parsing observation:', e);
    return null;
  }
}

/**
 * Parse a summary block from XML
 */
function parseSummary(xml: string): SummaryData | null {
  // Must be a complete summary block
  if (!xml.includes('<summary>') || !xml.includes('</summary>')) {
    return null;
  }

  try {
    const summary: SummaryData = {
      request: extractTag(xml, 'request'),
      investigated: extractTag(xml, 'investigated'),
      learned: extractTag(xml, 'learned'),
      completed: extractTag(xml, 'completed'),
      next_steps: extractTag(xml, 'next_steps'),
      notes: extractTag(xml, 'notes') || null,
    };

    // Validate required fields
    if (!summary.request) {
      return null;
    }
    return summary;
  } catch (e) {
    console.error('Error parsing summary:', e);
    return null;
  }
}
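
/**
 * Extract timestamp from XML comment
 * Format: <!-- Block N | 2025-10-19 03:03:23 UTC -->
 * Returns null when the comment is missing or the timestamp cannot be parsed
 */
function extractTimestamp(commentLine: string): string | null {
  const match = commentLine.match(/<!-- Block \d+ \| (.+?) -->/);
  if (!match) {
    return null;
  }

  // Convert "2025-10-19 03:03:23 UTC" to ISO format
  const dateStr = match[1].replace(' UTC', '').replace(' ', 'T') + 'Z';
  const parsed = new Date(dateStr);

  // toISOString() throws a RangeError on an invalid date, so treat that case as "no timestamp"
  return Number.isNaN(parsed.getTime()) ? null : parsed.toISOString();
}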

/**
 * Main import function
 */
function main() {
  console.log('Starting XML observation import...\n');

  // Build timestamp map
  const timestampMap = buildTimestampMap();

  // Open database connection
  const db = new SessionStore();

  // Create SDK sessions for all unique Claude Code sessions
  console.log('\nCreating SDK sessions for imported data...');
  const claudeSessionToSdkSession = new Map<string, string>();

  for (const sessionMeta of Object.values(timestampMap)) {
    if (!claudeSessionToSdkSession.has(sessionMeta.sessionId)) {
      const syntheticSdkSessionId = `imported-${sessionMeta.sessionId}`;

      // Try to find existing session first
      const existingQuery = db['db'].prepare(`
        SELECT sdk_session_id
        FROM sdk_sessions
        WHERE claude_session_id = ?
      `);
      const existing = existingQuery.get(sessionMeta.sessionId) as { sdk_session_id: string | null } | undefined;

      if (existing && existing.sdk_session_id) {
        // Use existing SDK session ID
        claudeSessionToSdkSession.set(sessionMeta.sessionId, existing.sdk_session_id);
      } else if (existing && !existing.sdk_session_id) {
        // Session exists but sdk_session_id is NULL, update it
        db['db'].prepare('UPDATE sdk_sessions SET sdk_session_id = ? WHERE claude_session_id = ?')
          .run(syntheticSdkSessionId, sessionMeta.sessionId);
        claudeSessionToSdkSession.set(sessionMeta.sessionId, syntheticSdkSessionId);
      } else {
        // Create new SDK session
        db.createSDKSession(
          sessionMeta.sessionId,
          sessionMeta.project,
          'Imported from transcript XML'
        );
        // Update with synthetic SDK session ID
        db['db'].prepare('UPDATE sdk_sessions SET sdk_session_id = ? WHERE claude_session_id = ?')
          .run(syntheticSdkSessionId, sessionMeta.sessionId);
        claudeSessionToSdkSession.set(sessionMeta.sessionId, syntheticSdkSessionId);
      }
    }
  }

  console.log(`Prepared ${claudeSessionToSdkSession.size} SDK sessions\n`);

  // Read XML file
  const xmlPath = join(process.cwd(), 'actual_xml_only_with_timestamps.xml');
  console.log(`Reading XML file: ${xmlPath}`);
  const xmlContent = readFileSync(xmlPath, 'utf-8');

  // Split into blocks by comment markers
  const blocks = xmlContent.split(/(?=<!-- Block \d+)/);
  console.log(`Found ${blocks.length} blocks in XML file\n`);

  let importedObs = 0;
  let importedSum = 0;
  let skipped = 0;
  let duplicateObs = 0;
  let duplicateSum = 0;
  let noSession = 0;

  for (const block of blocks) {
    if (!block.trim() || block.startsWith('<?xml') || block.startsWith('<transcript_extracts')) {
      continue;
    }

    // Extract timestamp from comment
    const timestampIso = extractTimestamp(block);
    if (!timestampIso) {
      skipped++;
      continue;
    }

    // Look up session metadata
    const sessionMeta = timestampMap[timestampIso];
    if (!sessionMeta) {
      noSession++;
      if (noSession <= 5) {
        console.log(`⚠️ No session found for timestamp: ${timestampIso}`);
      }
      skipped++;
      continue;
    }

    // Get SDK session ID
    const sdkSessionId = claudeSessionToSdkSession.get(sessionMeta.sessionId);
    if (!sdkSessionId) {
      skipped++;
      continue;
    }

    // Try parsing as observation first
    const observation = parseObservation(block);
    if (observation) {
      // Check for duplicate
      const existingObs = db['db'].prepare(`
        SELECT id FROM observations
        WHERE sdk_session_id = ? AND title = ? AND subtitle = ? AND type = ?
      `).get(sdkSessionId, observation.title, observation.subtitle, observation.type);

      if (existingObs) {
        duplicateObs++;
        continue;
      }

      try {
        db.storeObservation(
          sdkSessionId,
          sessionMeta.project,
          observation
        );
        importedObs++;
        if (importedObs % 50 === 0) {
          console.log(`Imported ${importedObs} observations...`);
        }
      } catch (e) {
        console.error(`Error storing observation:`, e);
        skipped++;
      }
      continue;
    }

    // Try parsing as summary
    const summary = parseSummary(block);
    if (summary) {
      // Check for duplicate
      const existingSum = db['db'].prepare(`
        SELECT id FROM session_summaries
        WHERE sdk_session_id = ? AND request = ? AND completed = ? AND learned = ?
      `).get(sdkSessionId, summary.request, summary.completed, summary.learned);

      if (existingSum) {
        duplicateSum++;
        continue;
      }

      try {
        db.storeSummary(
          sdkSessionId,
          sessionMeta.project,
          summary
        );
        importedSum++;
        if (importedSum % 10 === 0) {
          console.log(`Imported ${importedSum} summaries...`);
        }
      } catch (e) {
        console.error(`Error storing summary:`, e);
        skipped++;
      }
      continue;
    }

    // Neither observation nor summary - skip
    skipped++;
  }

  db.close();

  console.log('\n' + '='.repeat(60));
  console.log('Import Complete!');
  console.log('='.repeat(60));
  console.log(`✓ Imported: ${importedObs} observations`);
  console.log(`✓ Imported: ${importedSum} summaries`);
  console.log(`✓ Total: ${importedObs + importedSum} items`);
  // `skipped` also counts blocks with no timestamp, no matching session, or a store error
  console.log(`⊘ Skipped: ${skipped} blocks (no timestamp, no matching session, unparseable, or store error)`);
  console.log(`⊘ Duplicates skipped: ${duplicateObs} observations, ${duplicateSum} summaries`);
  console.log(`⚠️ No session: ${noSession} blocks (timestamp not in transcripts)`);
  console.log('='.repeat(60));
}

// Run if executed directly
if (import.meta.url === `file://${process.argv[1]}`) {
  main();
}