fix: implement pending_messages cleanup to prevent unbounded growth

Fixes #353 - Observations not being saved due to incomplete pending messages implementation

Changes:
- PendingMessageStore.markProcessed() now clears tool_input and tool_response
- PendingMessageStore.cleanupProcessed() changed from time-based to count-based retention
- Keeps most recent 100 processed messages for UI display
- SDKAgent.processSDKResponse() calls cleanup after marking messages processed

This prevents the database from growing unbounded with duplicate transcript data.
The pending_messages table now only stores full transcripts for pending/processing
messages, while processed messages keep metadata only.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2025-12-16 16:45:04 -05:00
parent f7c0840a35
commit d6cd9e6059
4 changed files with 71 additions and 39 deletions
+17 -7
View File
@@ -224,12 +224,17 @@ export class PendingMessageStore {
/**
* Mark message as successfully processed (status: processing -> processed)
* Clears tool_input and tool_response to save space (observations are already saved)
*/
markProcessed(messageId: number): void {
const now = Date.now();
const stmt = this.db.prepare(`
UPDATE pending_messages
SET status = 'processed', completed_at_epoch = ?
SET
status = 'processed',
completed_at_epoch = ?,
tool_input = NULL,
tool_response = NULL
WHERE id = ? AND status = 'processing'
`);
stmt.run(now, messageId);
@@ -334,18 +339,23 @@ export class PendingMessageStore {
/**
* Cleanup old processed messages (retention policy)
* @param retentionMs Delete processed messages older than this (0 = delete all processed)
* Keeps the most recent N processed messages, deletes the rest
* @param retentionCount Number of processed messages to keep (default: 100)
* @returns Number of messages deleted
*/
cleanupProcessed(retentionMs: number): number {
const cutoff = retentionMs === 0 ? Date.now() : Date.now() - retentionMs;
cleanupProcessed(retentionCount: number = 100): number {
const stmt = this.db.prepare(`
DELETE FROM pending_messages
WHERE status = 'processed' AND completed_at_epoch < ?
WHERE status = 'processed'
AND id NOT IN (
SELECT id FROM pending_messages
WHERE status = 'processed'
ORDER BY completed_at_epoch DESC
LIMIT ?
)
`);
const result = stmt.run(cutoff);
const result = stmt.run(retentionCount);
return result.changes;
}
+8
View File
@@ -409,6 +409,14 @@ export class SDKAgent {
count: session.pendingProcessingIds.size
});
session.pendingProcessingIds.clear();
// Clean up old processed messages (keep last 100 for UI display)
const deletedCount = pendingMessageStore.cleanupProcessed(100);
if (deletedCount > 0) {
logger.debug('SDK', 'Cleaned up old processed messages', {
deletedCount
});
}
}
// Broadcast activity status after processing (queue may have changed)