4ecdc4c9b3
- Implement auto-recovery of orphaned queues on startup in WorkerService. - Introduce startSessionWithAutoRestart method for continuous processing of pending work. - Modify SDKAgent to prevent session deletion during processing to avoid race conditions. - Update SessionManager to allow continued processing after yielding summaries. - Add logic in SessionRoutes to mark processing messages as failed upon generator errors. - Create detailed documentation for the queue system logic, including recovery mechanisms and potential issues.
743 lines
29 KiB
Markdown
# Queue System Logic Report

This document provides a line-by-line analysis of the queue system in claude-mem, explaining **the reason behind each piece of logic** and **what it actually does**.

---

## Table of Contents

1. [High-Level Architecture](#high-level-architecture)
2. [Message Status State Machine](#message-status-state-machine)
3. [PendingMessageStore (Database Layer)](#pendingmessagestore-database-layer)
4. [SessionManager (Queue Coordination)](#sessionmanager-queue-coordination)
5. [SDKAgent (Message Consumer)](#sdkagent-message-consumer)
6. [SessionRoutes (HTTP Entry Points)](#sessionroutes-http-entry-points)
7. [WorkerService (Orchestrator)](#workerservice-orchestrator)
8. [Critical Flow: How a Message Gets Stuck in "Processing"](#critical-flow-how-a-message-gets-stuck-in-processing)
9. [Recovery Mechanisms](#recovery-mechanisms)
10. [Summary of Potential Issues](#summary-of-potential-issues)
11. [Diagnostic Queries](#diagnostic-queries)


---

## High-Level Architecture

```
Hook (post-tool-use/summary)
    │
    ▼
SessionRoutes.handleObservations/handleSummarize
    │
    ▼
SessionManager.queueObservation/queueSummarize
    │
    ├─► PendingMessageStore.enqueue()     [DB: status='pending']
    │
    ├─► session.pendingMessages.push()    [In-memory queue]
    │
    └─► emitter.emit('message')           [Wake up generator]

    │
    ▼
SDKAgent.createMessageGenerator (async generator)
    │
    ├─► SessionManager.getMessageIterator()
    │       │
    │       ├─► PendingMessageStore.peekPending()      [Find oldest pending]
    │       │
    │       ├─► PendingMessageStore.markProcessing()   [DB: status='processing']
    │       │
    │       └─► yield message to SDK
    │
    ▼
SDK query() processes message and returns response
    │
    ▼
SDKAgent.processSDKResponse()
    │
    └─► SDKAgent.markMessagesProcessed()
            │
            └─► PendingMessageStore.markProcessed()    [DB: status='processed']
```


---

## Message Status State Machine

```
     ┌─────────────┐
     │   (new)     │
     └──────┬──────┘
            │ enqueue()
            ▼
     ┌─────────────┐
┌────│   pending   │◄───────────────┐
│    └──────┬──────┘                │
│           │ markProcessing()      │ markFailed() [retry_count < maxRetries]
│           ▼                       │
│    ┌─────────────┐                │
│    │ processing  │────────────────┤
│    └──────┬──────┘                │
│           │                       │
│           ├─► markProcessed()     │
│           │          │            │
│           │          ▼            │
│           │   ┌─────────────┐     │
│           │   │  processed  │     │
│           │   └─────────────┘     │
│           │                       │
│           └─► markFailed() [retry_count >= maxRetries]
│                      │
│                      ▼
│               ┌─────────────┐
│               │   failed    │
│               └─────────────┘
│
│
│    resetStuckMessages() [thresholdMs timeout]
└───────────────────────────────────┘
```

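
The state machine above can be read as a small transition function. The following is a minimal sketch, not claude-mem code: `MessageState`, `QueueEvent`, and `transition` are hypothetical names, and the default of 3 retries is the default this report quotes for `maxRetries`. It mirrors the SQL guards described in this report: `markProcessing` and `markProcessed` only act on the expected source state, while `markFailed` decides between retry and terminal failure from the retry count alone.

```typescript
// Hypothetical names for illustration only; none of these exist in claude-mem.
type Status = 'pending' | 'processing' | 'processed' | 'failed';
type QueueEvent = 'markProcessing' | 'markProcessed' | 'markFailed' | 'resetStuck';

interface MessageState {
  status: Status;
  retryCount: number;
}

function transition(state: MessageState, event: QueueEvent, maxRetries = 3): MessageState {
  switch (event) {
    case 'markProcessing':
      // Mirrors `WHERE status = 'pending'`: any other state is left untouched.
      return state.status === 'pending' ? { ...state, status: 'processing' } : state;
    case 'markProcessed':
      // Mirrors `WHERE status = 'processing'`.
      return state.status === 'processing' ? { ...state, status: 'processed' } : state;
    case 'markFailed':
      // Retry while retryCount < maxRetries, otherwise fail permanently.
      return state.retryCount < maxRetries
        ? { status: 'pending', retryCount: state.retryCount + 1 }
        : { ...state, status: 'failed' };
    case 'resetStuck':
      // Stuck recovery returns the message to the queue without touching retryCount.
      return state.status === 'processing' ? { ...state, status: 'pending' } : state;
    default:
      return state;
  }
}
```

Under these assumptions a message is attempted up to `maxRetries + 1` times in total, because the comparison happens before the counter is incremented.
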

---

## PendingMessageStore (Database Layer)

### `enqueue()` (Lines 56-82)

```typescript
enqueue(sessionDbId: number, claudeSessionId: string, message: PendingMessage): number {
  const now = Date.now();
  const stmt = this.db.prepare(`
    INSERT INTO pending_messages (
      session_db_id, claude_session_id, message_type,
      tool_name, tool_input, tool_response, cwd,
      last_user_message, last_assistant_message,
      prompt_number, status, retry_count, created_at_epoch
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?)
  `);
  // ... (excerpt ends here: the statement is then run with the serialized
  //      message fields and the new row's lastInsertRowid is returned)
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `const now = Date.now()` | Messages need timestamps for ordering and stuck-detection | Captures the moment the message was queued |
| `status, retry_count ... 'pending', 0` | New messages start in pending state with no retries | Hard-codes initial state in SQL |
| `created_at_epoch` | Need to track when message was originally queued for accurate observation timestamps | Used later when processing backlog to assign correct timestamps to observations |
| `JSON.stringify(message.tool_input)` | SQLite can't store objects natively | Serializes complex tool data to string |
| Returns `lastInsertRowid` | Caller needs the ID to track this specific message | Returns the database-assigned auto-increment ID |

### `peekPending()` (Lines 88-96)

```typescript
peekPending(sessionDbId: number): PersistentPendingMessage | null {
  const stmt = this.db.prepare(`
    SELECT * FROM pending_messages
    WHERE session_db_id = ? AND status = 'pending'
    ORDER BY id ASC
    LIMIT 1
  `);
  return stmt.get(sessionDbId) as PersistentPendingMessage | null;
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `status = 'pending'` | Only look at messages not yet being processed | Filters out processing/processed/failed |
| `ORDER BY id ASC` | Process messages in the order they arrived (FIFO) | Uses auto-increment ID as natural ordering |
| `LIMIT 1` | Only need one message at a time for the iterator | Returns single oldest pending message |
| Does NOT change status | Peek is non-destructive; status change happens separately in markProcessing | Allows checking without committing to process |

### `markProcessing()` (Lines 216-224)

```typescript
markProcessing(messageId: number): void {
  const now = Date.now();
  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET status = 'processing', started_processing_at_epoch = ?
    WHERE id = ? AND status = 'pending'
  `);
  stmt.run(now, messageId);
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `status = 'processing'` | Mark this message as "in progress" so other consumers don't pick it up | Prevents duplicate processing |
| `started_processing_at_epoch = ?` | Track when processing started for stuck detection | If processing takes >5min, considered stuck |
| `WHERE ... AND status = 'pending'` | Only transition from pending->processing (idempotent safety) | Prevents double-processing race conditions |

### `markProcessed()` (Lines 230-242)

```typescript
markProcessed(messageId: number): void {
  const now = Date.now();
  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET
      status = 'processed',
      completed_at_epoch = ?,
      tool_input = NULL,
      tool_response = NULL
    WHERE id = ? AND status = 'processing'
  `);
  stmt.run(now, messageId);
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `status = 'processed'` | Message successfully handled, move to terminal state | Marks completion |
| `completed_at_epoch = ?` | Track when processing finished for metrics/display | Records completion time |
| `tool_input = NULL, tool_response = NULL` | Large payload data no longer needed after successful processing | Frees space - observations are already saved elsewhere |
| `WHERE ... AND status = 'processing'` | Only transition from processing->processed | Ensures we only complete messages we actually processed |

### `markFailed()` (Lines 249-274)

```typescript
markFailed(messageId: number): void {
  const msg = this.db.prepare('SELECT retry_count FROM pending_messages WHERE id = ?').get(messageId);

  if (msg.retry_count < this.maxRetries) {
    // Move back to pending for retry
    const stmt = this.db.prepare(`
      UPDATE pending_messages
      SET status = 'pending', retry_count = retry_count + 1, started_processing_at_epoch = NULL
      WHERE id = ?
    `);
    stmt.run(messageId);
  } else {
    // Max retries exceeded, mark as permanently failed
    const stmt = this.db.prepare(`
      UPDATE pending_messages
      SET status = 'failed', completed_at_epoch = ?
      WHERE id = ?
    `);
    stmt.run(Date.now(), messageId);
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Check `retry_count < maxRetries` | Don't retry forever - eventually give up | Implements bounded retry policy (default: 3) |
| `status = 'pending'` (retry path) | Put message back in queue for another attempt | Allows automatic recovery |
| `retry_count + 1` | Track how many times we've tried | Increment toward failure threshold |
| `started_processing_at_epoch = NULL` | Clear the processing timestamp for next attempt | Prevents stuck detection from immediately triggering |
| `status = 'failed'` (terminal) | Message is permanently broken, stop trying | Prevents infinite retry loops |

### `resetStuckMessages()` (Lines 281-292)

```typescript
resetStuckMessages(thresholdMs: number): number {
  const cutoff = thresholdMs === 0 ? Date.now() : Date.now() - thresholdMs;

  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET status = 'pending', started_processing_at_epoch = NULL
    WHERE status = 'processing' AND started_processing_at_epoch < ?
  `);

  const result = stmt.run(cutoff);
  return result.changes;
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `thresholdMs === 0 ? Date.now()` | Special case: threshold=0 means "reset all processing messages" | Allows forced recovery of all stuck messages |
| `Date.now() - thresholdMs` | Calculate cutoff time (e.g., 5 minutes ago) | Messages processing longer than this are stuck |
| `status = 'processing'` condition | Only reset messages actively being processed | Don't touch pending or completed messages |
| `started_processing_at_epoch < ?` | Processing started before cutoff = stuck | Time-based stuck detection |
| `SET status = 'pending'` | Move back to queue for retry | Enables automatic recovery |
| Returns `result.changes` | Caller needs to know how many were recovered | For logging/metrics |

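
To make the cutoff arithmetic concrete, here is a minimal in-memory sketch of the same stuck-detection rule. It is an illustration under stated assumptions, not the store's implementation: `Row` and `resetStuck` are hypothetical names, and the clock is passed in as `now` so the result is deterministic. Note that `now - 0` equals `now`, so a threshold of 0 resets every message whose processing started before the current instant.

```typescript
// Hypothetical in-memory stand-in for rows of pending_messages.
interface Row {
  id: number;
  status: 'pending' | 'processing' | 'processed' | 'failed';
  startedProcessingAtEpoch: number | null;
}

// Resets rows that have been 'processing' since before the cutoff.
// Returns the number of rows changed, like `result.changes` in the excerpt.
function resetStuck(rows: Row[], thresholdMs: number, now: number): number {
  const cutoff = now - thresholdMs;
  let changes = 0;
  for (const row of rows) {
    const startedAt = row.startedProcessingAtEpoch;
    if (row.status === 'processing' && startedAt !== null && startedAt < cutoff) {
      row.status = 'pending';
      row.startedProcessingAtEpoch = null;
      changes++;
    }
  }
  return changes;
}
```

With a 5-minute threshold (`300000` ms), a row that started processing 6 minutes before `now` is reset, while one that started 1 minute before `now` is left alone.
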
### `getPendingCount()` (Lines 297-304)

```typescript
getPendingCount(sessionDbId: number): number {
  const stmt = this.db.prepare(`
    SELECT COUNT(*) as count FROM pending_messages
    WHERE session_db_id = ? AND status IN ('pending', 'processing')
  `);
  // ... (excerpt ends here: the query is run for sessionDbId and its count is returned)
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `status IN ('pending', 'processing')` | **CRITICAL**: Counts BOTH pending AND processing | Used to decide if generator should keep running |
| Why include processing? | A message in processing state is still "work to be done" | Prevents generator from stopping while SDK is mid-response |


---

## SessionManager (Queue Coordination)

### `queueObservation()` (Lines 181-232)

```typescript
queueObservation(sessionDbId: number, data: ObservationData): void {
  // Auto-initialize from database if needed
  let session = this.sessions.get(sessionDbId);
  if (!session) {
    session = this.initializeSession(sessionDbId);
  }

  // CRITICAL: Persist to database FIRST
  const message: PendingMessage = { type: 'observation', ... };
  const messageId = this.getPendingStore().enqueue(sessionDbId, session.claudeSessionId, message);

  // Add to in-memory queue
  session.pendingMessages.push(message);

  // Notify generator immediately
  const emitter = this.sessionQueues.get(sessionDbId);
  emitter?.emit('message');
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Auto-initialize session | Worker may have restarted, need to rebuild in-memory state | Lazy initialization from database |
| `enqueue()` BEFORE in-memory push | **CRITICAL**: Database is source of truth, survives crashes | Persist-first ensures no data loss |
| `session.pendingMessages.push()` | In-memory queue for backward compatibility and fast status checks | Mirrors database state in RAM |
| `emitter?.emit('message')` | Wake up the generator immediately (zero-latency) | Event-driven, no polling needed |

### `getMessageIterator()` (Lines 397-477)

```typescript
async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessageWithId> {
  while (!session.abortController.signal.aborted) {
    // Check for pending messages in persistent store
    const persistentMessage = this.getPendingStore().peekPending(sessionDbId);

    if (!persistentMessage) {
      // Wait for new message event
      await new Promise<void>(resolve => {
        emitter.once('message', messageHandler);
        session.abortController.signal.addEventListener('abort', abortHandler, { once: true });
      });
      continue;
    }

    // Mark as processing BEFORE yielding
    this.getPendingStore().markProcessing(persistentMessage.id);

    // Track this message ID for completion marking
    session.pendingProcessingIds.add(persistentMessage.id);

    // Convert and yield
    const message: PendingMessageWithId = {
      _persistentId: persistentMessage.id,
      _originalTimestamp: persistentMessage.created_at_epoch,
      ...this.getPendingStore().toPendingMessage(persistentMessage)
    };

    yield message;

    // Remove from in-memory queue after yielding
    session.pendingMessages.shift();
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `while (!aborted)` | Keep processing until session ends | Continuous processing loop |
| `peekPending()` | Check database for work | Non-destructively looks for pending messages |
| `await new Promise` with event | Block until message arrives (no polling) | Event-driven wake-up saves CPU |
| `markProcessing()` BEFORE yield | **CRITICAL**: Claim the message before giving to SDK | Prevents race conditions |
| `pendingProcessingIds.add()` | Track which messages are being processed | So we know what to mark as completed |
| `_persistentId` field | Attach database ID to in-flight message | Needed for markProcessed() later |
| `_originalTimestamp` | Preserve original queue time | For accurate observation timestamps when processing backlog |
| `pendingMessages.shift()` after yield | Keep in-memory queue in sync with database | Mirrors the database state change |

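
The wait-then-wake loop in `getMessageIterator()` can be illustrated in isolation. The sketch below is a simplified stand-in, not the real SessionManager: `MiniQueue` is a hypothetical class, an in-memory array replaces the database, a stored resolver replaces the `EventEmitter`, and `close()` replaces the abort signal. It shows why no wake-up is lost in single-threaded JavaScript: the emptiness check and the registration of the waiter happen in the same synchronous step.

```typescript
// Hypothetical stand-in for the persistent queue plus its wake-up event.
class MiniQueue<T> {
  private items: T[] = [];
  private wake: (() => void) | null = null;
  private closed = false;

  // Store the item first, then wake a waiting consumer (if any).
  push(item: T): void {
    this.items.push(item);
    this.notify();
  }

  // Plays the role of the abort signal: ends the consumer loop.
  close(): void {
    this.closed = true;
    this.notify();
  }

  private notify(): void {
    const waiter = this.wake;
    this.wake = null;
    if (waiter) waiter();
  }

  async *iterate(): AsyncGenerator<T> {
    while (!this.closed) {
      if (this.items.length === 0) {
        // Nothing to do: sleep until push() or close() resolves this promise.
        await new Promise<void>(resolve => {
          this.wake = resolve;
        });
        continue; // Re-check the queue (and the closed flag) after waking.
      }
      yield this.items.shift() as T;
    }
  }
}
```

A consumer written as `for await (const item of queue.iterate()) { ... }` then idles without polling and resumes as soon as `push()` is called, which is the behavior the table above attributes to the event-driven wait.
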

---

## SDKAgent (Message Consumer)

### `startSession()` Main Loop (Lines 75-150)

```typescript
const queryResult = query({
  prompt: messageGenerator,
  options: {
    model: modelId,
    resume: session.claudeSessionId,  // <-- Session continuity
    disallowedTools,
    abortController: session.abortController,
    pathToClaudeCodeExecutable: claudePath
  }
});

for await (const message of queryResult) {
  if (message.type === 'assistant') {
    // Process response
    await this.processSDKResponse(session, textContent, worker, discoveryTokens, originalTimestamp);
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `resume: session.claudeSessionId` | **CRITICAL**: Connect to existing Claude session | Enables session continuity - same transcript across prompts |
| `for await` loop | Process SDK responses as they arrive | Streaming response handling |
| `processSDKResponse()` called per response | Parse and save observations/summaries | Database + Chroma sync |

### `createMessageGenerator()` (Lines 202-291)

```typescript
private async *createMessageGenerator(session: ActiveSession): AsyncIterableIterator<SDKUserMessage> {
  // Build initial or continuation prompt
  const initPrompt = isInitPrompt
    ? buildInitPrompt(...)
    : buildContinuationPrompt(...);

  // Yield initial prompt
  yield { type: 'user', message: { role: 'user', content: initPrompt }, session_id: session.claudeSessionId };

  // Consume pending messages
  for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
    if (message.type === 'observation') {
      const obsPrompt = buildObservationPrompt({ ... });
      yield { type: 'user', message: { role: 'user', content: obsPrompt } };
    } else if (message.type === 'summarize') {
      const summaryPrompt = buildSummaryPrompt({ ... });
      yield { type: 'user', message: { role: 'user', content: summaryPrompt } };
    }
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `isInitPrompt` check | First prompt needs full context, subsequent prompts need continuation | Different prompt templates |
| `yield` initial prompt | Start the SDK conversation | Sends initialization to Claude |
| `for await ... getMessageIterator` | Pull messages as they become available | Event-driven message consumption |
| `yield` for each message | Feed observations/summaries to SDK one at a time | SDK processes each and responds |

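
The shape of this generator (one initial prompt, then one user turn per queued message) can be reproduced with plain async generators. The following is a self-contained sketch under assumptions: `QueuedMessage`, `UserTurn`, `promptStream`, `demoMessages`, and the prompt strings are all hypothetical and stand in for the real message types and the `build*Prompt` helpers, whose signatures are not shown in this report.

```typescript
// Hypothetical message and turn shapes; the real types live in claude-mem and the SDK.
type QueuedMessage =
  | { type: 'observation'; toolName: string }
  | { type: 'summarize' };

interface UserTurn {
  type: 'user';
  message: { role: 'user'; content: string };
}

const userTurn = (content: string): UserTurn => ({
  type: 'user',
  message: { role: 'user', content },
});

// Yields the initial prompt first, then maps each queued message to a user turn.
async function* promptStream(
  initPrompt: string,
  messages: AsyncIterable<QueuedMessage>
): AsyncGenerator<UserTurn> {
  yield userTurn(initPrompt);
  for await (const message of messages) {
    if (message.type === 'observation') {
      yield userTurn(`Observed tool call: ${message.toolName}`);
    } else {
      yield userTurn('Summarize the session so far.');
    }
  }
}

// Stand-in for getMessageIterator(): a finite stream of two messages.
async function* demoMessages(): AsyncGenerator<QueuedMessage> {
  yield { type: 'observation', toolName: 'Read' };
  yield { type: 'summarize' };
}

// Drains the stream and returns the prompt texts in order.
async function collectPrompts(): Promise<string[]> {
  const contents: string[] = [];
  for await (const turn of promptStream('init', demoMessages())) {
    contents.push(turn.message.content);
  }
  return contents;
}
```

Because the outer generator only advances when its consumer asks for the next value, messages are pulled from the queue one at a time, at the pace the consumer sets.
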
### `markMessagesProcessed()` (Lines 462-491)

```typescript
private async markMessagesProcessed(session: ActiveSession, worker: any): Promise<void> {
  const pendingMessageStore = this.sessionManager.getPendingMessageStore();

  if (session.pendingProcessingIds.size > 0) {
    for (const messageId of session.pendingProcessingIds) {
      pendingMessageStore.markProcessed(messageId);
    }
    session.pendingProcessingIds.clear();
    session.earliestPendingTimestamp = null;

    // Cleanup old processed messages
    const deletedCount = pendingMessageStore.cleanupProcessed(100);
  }

  // Broadcast status update
  if (worker && typeof worker.broadcastProcessingStatus === 'function') {
    worker.broadcastProcessingStatus();
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Loop over `pendingProcessingIds` | Mark ALL messages that were yielded to SDK | Batch completion |
| `markProcessed()` for each | Transition processing->processed in database | Completes the message lifecycle |
| `.clear()` | Reset tracking set for next batch | Prepare for next iteration |
| `earliestPendingTimestamp = null` | Reset timestamp tracking | Next batch gets fresh timestamps |
| `cleanupProcessed(100)` | Don't keep infinite processed messages | Retention policy |
| `broadcastProcessingStatus()` | Update UI with new state | SSE broadcast |


---

## SessionRoutes (HTTP Entry Points)

### `startGeneratorWithProvider()` (Lines 118-189)

```typescript
private startGeneratorWithProvider(session, provider, source): void {
  session.currentProvider = provider;

  session.generatorPromise = agent.startSession(session, this.workerService)
    .catch(error => {
      // Mark all processing messages as failed
      const processingMessages = stmt.all(session.sessionDbId);
      for (const msg of processingMessages) {
        pendingStore.markFailed(msg.id);
      }
    })
    .finally(() => {
      session.generatorPromise = null;
      session.currentProvider = null;
      this.workerService.broadcastProcessingStatus();

      // Check if there's more work pending
      const pendingCount = pendingStore.getPendingCount(sessionDbId);
      if (pendingCount > 0) {
        // Auto-restart
        setTimeout(() => {
          if (stillExists && !stillExists.generatorPromise) {
            this.startGeneratorWithProvider(stillExists, this.getSelectedProvider(), 'auto-restart');
          }
        }, 0);
      } else {
        // Cleanup
        this.sessionManager.deleteSession(sessionDbId);
      }
    });
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| `session.generatorPromise =` | Track that generator is running | Prevents multiple generators per session |
| `.catch()` with markFailed | If generator crashes, don't lose messages | Marks for retry or permanent failure |
| `.finally()` | Always cleanup regardless of success/failure | Guaranteed cleanup |
| `generatorPromise = null` | Allow new generator to start | Clears the "running" flag |
| `getPendingCount() > 0` | **CRITICAL**: Check if more work arrived while processing | Handles messages queued during SDK call |
| `setTimeout(..., 0)` | Don't restart synchronously (could cause stack issues) | Deferred restart |
| `deleteSession()` when no work | Clean up resources | Memory management |

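
The restart-in-`finally` pattern is easier to follow with the SDK and database stripped away. The sketch below is a toy model under assumptions, not the route handler itself: `ToySession`, `runGenerator`, and `startWithAutoRestart` are hypothetical names, the `queue` array stands in for `getPendingCount()`, and each pretend generator run handles exactly one item before exiting so that restarts are forced.

```typescript
// Hypothetical session shape for illustration only.
interface ToySession {
  queue: string[];                          // outstanding work (pending + processing)
  generatorPromise: Promise<void> | null;   // non-null while a generator is running
  runs: number;                             // how many generator runs were started
}

// Pretend generator: handles one item and then exits, leaving the rest queued.
async function runGenerator(session: ToySession): Promise<void> {
  session.runs++;
  session.queue.shift();
}

function startWithAutoRestart(session: ToySession, onIdle: () => void): void {
  session.generatorPromise = runGenerator(session)
    .catch(() => {
      // A real handler would mark in-flight messages as failed here.
    })
    .finally(() => {
      // Clear the "running" flag so a new generator is allowed to start.
      session.generatorPromise = null;
      if (session.queue.length > 0) {
        // Work remains: restart on a later tick rather than synchronously.
        setTimeout(() => {
          if (!session.generatorPromise) startWithAutoRestart(session, onIdle);
        }, 0);
      } else {
        // Queue drained: this is where the session would be cleaned up.
        onIdle();
      }
    });
}
```

Starting this with three queued items leads to three generator runs before `onIdle` fires. The `!session.generatorPromise` check inside the timer plays the same role as the `stillExists.generatorPromise` check in the excerpt: it avoids starting a second generator if something else already restarted one.
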
### `ensureGeneratorRunning()` (Lines 90-113)

```typescript
private ensureGeneratorRunning(sessionDbId: number, source: string): void {
  const session = this.sessionManager.getSession(sessionDbId);
  if (!session) return;

  const selectedProvider = this.getSelectedProvider();

  // Start generator if not running
  if (!session.generatorPromise) {
    this.startGeneratorWithProvider(session, selectedProvider, source);
    return;
  }

  // Generator is running - check if provider changed
  if (session.currentProvider && session.currentProvider !== selectedProvider) {
    // Let current generator finish, next one will use new provider
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Check `!generatorPromise` | Only start if not already running | Prevents duplicate generators |
| Start generator if not running | Ensure messages get processed | Lazy generator startup |
| Provider change detection | Allow switching providers mid-session | Graceful provider transition |


---

## WorkerService (Orchestrator)

### `initializeBackground()` Stuck Message Recovery (Lines 627-633)

```typescript
// Recover stuck messages from previous crashes
const STUCK_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
if (resetCount > 0) {
  logger.info('SYSTEM', `Recovered ${resetCount} stuck messages from previous session`);
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Called at startup | Worker may have crashed while messages were processing | Recovery mechanism |
| 5 minute threshold | If processing >5min, something went wrong | Reasonable timeout for SDK calls |
| Reset to pending | Give stuck messages another chance | Automatic retry |

### `processPendingQueues()` (Lines 747-811)

```typescript
async processPendingQueues(sessionLimit: number = 10): Promise<Result> {
  const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();

  for (const sessionDbId of orphanedSessionIds) {
    // Skip if session already has active generator
    const existingSession = this.sessionManager.getSession(sessionDbId);
    if (existingSession?.generatorPromise) {
      result.sessionsSkipped++;
      continue;
    }

    // Initialize session and start SDK agent
    const session = this.sessionManager.initializeSession(sessionDbId);
    this.startSessionWithAutoRestart(session, getPendingCount, 'startup-recovery');
  }
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Called at startup | Resume work interrupted by crash/restart | Auto-recovery |
| `getSessionsWithPendingMessages()` | Find sessions that have orphaned work | Database query |
| Skip if generator running | Don't start duplicate processors | Race condition prevention |
| `startSessionWithAutoRestart()` | Start processing with auto-restart logic | Shares code with SessionRoutes |

### `startSessionWithAutoRestart()` (Lines 696-739)

```typescript
private startSessionWithAutoRestart(session, getPendingCount, source): void {
  session.generatorPromise = this.sdkAgent.startSession(session, this)
    .catch(error => { ... })
    .finally(() => {
      session.generatorPromise = null;
      this.broadcastProcessingStatus();

      const stillPending = getPendingCount(sid);
      if (stillPending > 0) {
        // Recursive restart
        setTimeout(() => {
          const stillExists = this.sessionManager.getSession(sid);
          if (stillExists && !stillExists.generatorPromise) {
            this.startSessionWithAutoRestart(stillExists, getPendingCount, 'auto-restart');
          }
        }, 0);
      } else {
        // Cleanup
        this.sessionManager.deleteSession(sid);
      }
    });
}
```

| Line | The Reason Behind This | What It Actually Does |
|------|------------------------|----------------------|
| Same pattern as SessionRoutes | **DRY**: Shared auto-restart logic | Prevents code duplication |
| Recursive restart | Keep processing until queue is empty | Ensures all messages processed |
| Check `stillExists` before restart | Session might have been deleted | Safety check |


---

## Critical Flow: How a Message Gets Stuck in "Processing"

### The Problem

Messages can get stuck in `status = 'processing'` if:

1. **SDK call hangs indefinitely** - The Agent SDK query never returns
2. **Worker crashes mid-processing** - Process dies before markProcessed()
3. **Exception in processSDKResponse()** - Error prevents markProcessed() from running

### The Flow

```
1. queueObservation() called
   └─► enqueue() → status = 'pending'

2. getMessageIterator() picks up message
   └─► markProcessing() → status = 'processing' ✓
   └─► pendingProcessingIds.add(id)
   └─► yield message to SDK

3. SDK processes and returns response
   └─► processSDKResponse() called
       └─► Parse observations/summaries
       └─► Store to database
       └─► markMessagesProcessed()
           └─► markProcessed() → status = 'processed' ✓

IF STEP 3 FAILS OR HANGS:
   └─► Message stays in 'processing' until something resets it
   └─► Recovery: resetStuckMessages() at worker startup (5-minute threshold)
```

### Why Processing Messages Can Get "Lost"

**Race Condition in getMessageIterator():**

```typescript
// Lines 445-446 in SessionManager
this.getPendingStore().markProcessing(persistentMessage.id);
session.pendingProcessingIds.add(persistentMessage.id);
```

The message is marked as `processing` BEFORE it is yielded. If the SDK hangs or crashes AFTER these lines run but BEFORE `processSDKResponse()` completes, the message is stuck.

**Protection Mechanisms:**

1. `pendingProcessingIds` tracks what's in-flight
2. `markFailed()` in catch handler marks for retry
3. `resetStuckMessages()` at startup cleans up old stuck messages


---

## Recovery Mechanisms

### 1. Startup Recovery (Worker crashes)

```typescript
// In initializeBackground()
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
```

- Runs when worker starts
- Finds messages stuck in `processing` for >5 minutes
- Resets them to `pending` for retry

### 2. Generator Error Recovery

```typescript
// In startGeneratorWithProvider() catch handler
for (const msg of processingMessages) {
  pendingStore.markFailed(msg.id);
}
```

- Runs when SDK call throws
- Marks processing messages as failed (which may reset to pending if retries remain)

### 3. Auto-Restart Recovery

```typescript
// In startGeneratorWithProvider() finally handler
if (pendingCount > 0) {
  setTimeout(() => startGeneratorWithProvider(...), 0);
}
```

- Runs after every generator completes
- Checks for pending work
- Starts new generator if work remains

### 4. Manual Recovery (UI)

```typescript
// PendingMessageStore methods
retryMessage(messageId)       // Reset specific message to pending
retryAllStuck(thresholdMs)    // Reset all stuck messages
abortMessage(messageId)       // Delete message from queue
```


---

## Summary of Potential Issues

| Issue | Cause | Mitigation |
|-------|-------|------------|
| Message stuck in processing | SDK hang, crash during processing | `resetStuckMessages()` at startup |
| Duplicate processing | Race condition on message claim | `markProcessing()` with `WHERE status = 'pending'` |
| Lost messages | Crash before enqueue | DB persist BEFORE in-memory push |
| Generator never starts | No call to `ensureGeneratorRunning()` | Called by every HTTP handler |
| Generator exits early | Empty queue check race | `finally` handler checks and restarts |
| Infinite retry | Repeated failures | `maxRetries` limit (default: 3) |


---

## Diagnostic Queries

Check for stuck messages:

```sql
SELECT * FROM pending_messages
WHERE status = 'processing'
  AND started_processing_at_epoch < (strftime('%s', 'now') * 1000 - 300000);
```

Check queue depth by session:

```sql
SELECT session_db_id, status, COUNT(*)
FROM pending_messages
GROUP BY session_db_id, status;
```

Check retry counts:

```sql
SELECT id, message_type, retry_count, status
FROM pending_messages
WHERE retry_count > 0;
```