Files
claude-mem/QUEUE-SYSTEM-ANALYSIS.md
T
Alex Newman 4ecdc4c9b3 Enhance queue processing and recovery mechanisms
- Implement auto-recovery of orphaned queues on startup in WorkerService.
- Introduce startSessionWithAutoRestart method for continuous processing of pending work.
- Modify SDKAgent to prevent session deletion during processing to avoid race conditions.
- Update SessionManager to allow continued processing after yielding summaries.
- Add logic in SessionRoutes to mark processing messages as failed upon generator errors.
- Create detailed documentation for the queue system logic, including recovery mechanisms and potential issues.
2025-12-28 15:44:54 -05:00

29 KiB

Queue System Logic Report

This document provides a line-by-line analysis of the queue system in claude-mem, explaining the reason behind each piece of logic and what it actually does.


Table of Contents

  1. High-Level Architecture
  2. Message Status State Machine
  3. PendingMessageStore (Database Layer)
  4. SessionManager (Queue Coordination)
  5. SDKAgent (Message Consumer)
  6. SessionRoutes (HTTP Entry Points)
  7. WorkerService (Orchestrator)
  8. Critical Flow: How a Message Gets Stuck in "Processing"
  9. Recovery Mechanisms

High-Level Architecture

Hook (post-tool-use/summary)
    │
    ▼
SessionRoutes.handleObservations/handleSummarize
    │
    ▼
SessionManager.queueObservation/queueSummarize
    │
    ├─► PendingMessageStore.enqueue() [DB: status='pending']
    │
    ├─► session.pendingMessages.push() [In-memory queue]
    │
    └─► emitter.emit('message') [Wake up generator]

    │
    ▼
SDKAgent.createMessageGenerator (async generator)
    │
    ├─► SessionManager.getMessageIterator()
    │       │
    │       ├─► PendingMessageStore.peekPending() [Find oldest pending]
    │       │
    │       ├─► PendingMessageStore.markProcessing() [DB: status='processing']
    │       │
    │       └─► yield message to SDK
    │
    ▼
SDK query() processes message and returns response
    │
    ▼
SDKAgent.processSDKResponse()
    │
    └─► SDKAgent.markMessagesProcessed()
            │
            └─► PendingMessageStore.markProcessed() [DB: status='processed']

Message Status State Machine

                   ┌─────────────┐
                   │   (new)     │
                   └──────┬──────┘
                          │ enqueue()
                          ▼
                   ┌─────────────┐
              ┌────│   pending   │◄───────────────┐
              │    └──────┬──────┘                │
              │           │ markProcessing()      │ markFailed() [retry_count < maxRetries]
              │           ▼                       │
              │    ┌─────────────┐                │
              │    │ processing  │────────────────┤
              │    └──────┬──────┘                │
              │           │                       │
              │           ├─► markProcessed()     │
              │           │         │             │
              │           │         ▼             │
              │           │  ┌─────────────┐      │
              │           │  │  processed  │      │
              │           │  └─────────────┘      │
              │           │                       │
              │           └─► markFailed() [retry_count >= maxRetries]
              │                     │
              │                     ▼
              │              ┌─────────────┐
              │              │   failed    │
              │              └─────────────┘
              │
              │
              │ resetStuckMessages() [thresholdMs timeout]
              └───────────────────────────────────┘

PendingMessageStore (Database Layer)

enqueue() (Lines 56-82)

enqueue(sessionDbId: number, claudeSessionId: string, message: PendingMessage): number {
  const now = Date.now();
  const stmt = this.db.prepare(`
    INSERT INTO pending_messages (
      session_db_id, claude_session_id, message_type,
      tool_name, tool_input, tool_response, cwd,
      last_user_message, last_assistant_message,
      prompt_number, status, retry_count, created_at_epoch
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', 0, ?)
  `);
Line The Reason Behind This What It Actually Does
const now = Date.now() Messages need timestamps for ordering and stuck-detection Captures the moment the message was queued
status, retry_count ... 'pending', 0 New messages start in pending state with no retries Hard-codes initial state in SQL
created_at_epoch Need to track when message was originally queued for accurate observation timestamps Used later when processing backlog to assign correct timestamps to observations
JSON.stringify(message.tool_input) SQLite can't store objects natively Serializes complex tool data to string
Returns lastInsertRowid Caller needs the ID to track this specific message Returns the database-assigned auto-increment ID

peekPending() (Lines 88-96)

peekPending(sessionDbId: number): PersistentPendingMessage | null {
  const stmt = this.db.prepare(`
    SELECT * FROM pending_messages
    WHERE session_db_id = ? AND status = 'pending'
    ORDER BY id ASC
    LIMIT 1
  `);
  return stmt.get(sessionDbId) as PersistentPendingMessage | null;
}
Line The Reason Behind This What It Actually Does
status = 'pending' Only look at messages not yet being processed Filters out processing/processed/failed
ORDER BY id ASC Process messages in the order they arrived (FIFO) Uses auto-increment ID as natural ordering
LIMIT 1 Only need one message at a time for the iterator Returns single oldest pending message
Does NOT change status Peek is non-destructive; status change happens separately in markProcessing Allows checking without committing to process

markProcessing() (Lines 216-224)

markProcessing(messageId: number): void {
  const now = Date.now();
  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET status = 'processing', started_processing_at_epoch = ?
    WHERE id = ? AND status = 'pending'
  `);
  stmt.run(now, messageId);
}
Line The Reason Behind This What It Actually Does
status = 'processing' Mark this message as "in progress" so other consumers don't pick it up Prevents duplicate processing
started_processing_at_epoch = ? Track when processing started for stuck detection If processing takes >5min, considered stuck
WHERE ... AND status = 'pending' Only transition from pending->processing (idempotent safety) Prevents double-processing race conditions

markProcessed() (Lines 230-242)

markProcessed(messageId: number): void {
  const now = Date.now();
  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET
      status = 'processed',
      completed_at_epoch = ?,
      tool_input = NULL,
      tool_response = NULL
    WHERE id = ? AND status = 'processing'
  `);
  stmt.run(now, messageId);
}
Line The Reason Behind This What It Actually Does
status = 'processed' Message successfully handled, move to terminal state Marks completion
completed_at_epoch = ? Track when processing finished for metrics/display Records completion time
tool_input = NULL, tool_response = NULL Large payload data no longer needed after successful processing Frees space - observations are already saved elsewhere
WHERE ... AND status = 'processing' Only transition from processing->processed Ensures we only complete messages we actually processed

markFailed() (Lines 249-274)

markFailed(messageId: number): void {
  const msg = this.db.prepare('SELECT retry_count FROM pending_messages WHERE id = ?').get(messageId);

  if (msg.retry_count < this.maxRetries) {
    // Move back to pending for retry
    const stmt = this.db.prepare(`
      UPDATE pending_messages
      SET status = 'pending', retry_count = retry_count + 1, started_processing_at_epoch = NULL
      WHERE id = ?
    `);
  } else {
    // Max retries exceeded, mark as permanently failed
    const stmt = this.db.prepare(`
      UPDATE pending_messages
      SET status = 'failed', completed_at_epoch = ?
      WHERE id = ?
    `);
  }
}
Line The Reason Behind This What It Actually Does
Check retry_count < maxRetries Don't retry forever - eventually give up Implements bounded retry policy (default: 3)
status = 'pending' (retry path) Put message back in queue for another attempt Allows automatic recovery
retry_count + 1 Track how many times we've tried Increment toward failure threshold
started_processing_at_epoch = NULL Clear the processing timestamp for next attempt Prevents stuck detection from immediately triggering
status = 'failed' (terminal) Message is permanently broken, stop trying Prevents infinite retry loops

resetStuckMessages() (Lines 281-292)

resetStuckMessages(thresholdMs: number): number {
  const cutoff = thresholdMs === 0 ? Date.now() : Date.now() - thresholdMs;

  const stmt = this.db.prepare(`
    UPDATE pending_messages
    SET status = 'pending', started_processing_at_epoch = NULL
    WHERE status = 'processing' AND started_processing_at_epoch < ?
  `);

  return result.changes;
}
Line The Reason Behind This What It Actually Does
thresholdMs === 0 ? Date.now() Special case: threshold=0 means "reset all processing messages" Allows forced recovery of all stuck messages
Date.now() - thresholdMs Calculate cutoff time (e.g., 5 minutes ago) Messages processing longer than this are stuck
status = 'processing' condition Only reset messages actively being processed Don't touch pending or completed messages
started_processing_at_epoch < ? Processing started before cutoff = stuck Time-based stuck detection
SET status = 'pending' Move back to queue for retry Enables automatic recovery
Returns result.changes Caller needs to know how many were recovered For logging/metrics

getPendingCount() (Lines 297-304)

getPendingCount(sessionDbId: number): number {
  const stmt = this.db.prepare(`
    SELECT COUNT(*) as count FROM pending_messages
    WHERE session_db_id = ? AND status IN ('pending', 'processing')
  `);
Line The Reason Behind This What It Actually Does
status IN ('pending', 'processing') CRITICAL: Counts BOTH pending AND processing Used to decide if generator should keep running
Why include processing? A message in processing state is still "work to be done" Prevents generator from stopping while SDK is mid-response

SessionManager (Queue Coordination)

queueObservation() (Lines 181-232)

queueObservation(sessionDbId: number, data: ObservationData): void {
  // Auto-initialize from database if needed
  let session = this.sessions.get(sessionDbId);
  if (!session) {
    session = this.initializeSession(sessionDbId);
  }

  // CRITICAL: Persist to database FIRST
  const message: PendingMessage = { type: 'observation', ... };
  const messageId = this.getPendingStore().enqueue(sessionDbId, session.claudeSessionId, message);

  // Add to in-memory queue
  session.pendingMessages.push(message);

  // Notify generator immediately
  const emitter = this.sessionQueues.get(sessionDbId);
  emitter?.emit('message');
}
Line The Reason Behind This What It Actually Does
Auto-initialize session Worker may have restarted, need to rebuild in-memory state Lazy initialization from database
enqueue() BEFORE in-memory push CRITICAL: Database is source of truth, survives crashes Persist-first ensures no data loss
session.pendingMessages.push() In-memory queue for backward compatibility and fast status checks Mirrors database state in RAM
emitter?.emit('message') Wake up the generator immediately (zero-latency) Event-driven, no polling needed

getMessageIterator() (Lines 397-477)

async *getMessageIterator(sessionDbId: number): AsyncIterableIterator<PendingMessageWithId> {
  while (!session.abortController.signal.aborted) {
    // Check for pending messages in persistent store
    const persistentMessage = this.getPendingStore().peekPending(sessionDbId);

    if (!persistentMessage) {
      // Wait for new message event
      await new Promise<void>(resolve => {
        emitter.once('message', messageHandler);
        session.abortController.signal.addEventListener('abort', abortHandler, { once: true });
      });
      continue;
    }

    // Mark as processing BEFORE yielding
    this.getPendingStore().markProcessing(persistentMessage.id);

    // Track this message ID for completion marking
    session.pendingProcessingIds.add(persistentMessage.id);

    // Convert and yield
    const message: PendingMessageWithId = {
      _persistentId: persistentMessage.id,
      _originalTimestamp: persistentMessage.created_at_epoch,
      ...this.getPendingStore().toPendingMessage(persistentMessage)
    };

    yield message;

    // Remove from in-memory queue after yielding
    session.pendingMessages.shift();
  }
}
Line The Reason Behind This What It Actually Does
while (!aborted) Keep processing until session ends Continuous processing loop
peekPending() Check database for work Non-destructively looks for pending messages
await new Promise with event Block until message arrives (no polling) Event-driven wake-up saves CPU
markProcessing() BEFORE yield CRITICAL: Claim the message before giving to SDK Prevents race conditions
pendingProcessingIds.add() Track which messages are being processed So we know what to mark as completed
_persistentId field Attach database ID to in-flight message Needed for markProcessed() later
_originalTimestamp Preserve original queue time For accurate observation timestamps when processing backlog
pendingMessages.shift() after yield Keep in-memory queue in sync with database Mirrors the database state change

SDKAgent (Message Consumer)

startSession() Main Loop (Lines 75-150)

const queryResult = query({
  prompt: messageGenerator,
  options: {
    model: modelId,
    resume: session.claudeSessionId,  // <-- Session continuity
    disallowedTools,
    abortController: session.abortController,
    pathToClaudeCodeExecutable: claudePath
  }
});

for await (const message of queryResult) {
  if (message.type === 'assistant') {
    // Process response
    await this.processSDKResponse(session, textContent, worker, discoveryTokens, originalTimestamp);
  }
}
Line The Reason Behind This What It Actually Does
resume: session.claudeSessionId CRITICAL: Connect to existing Claude session Enables session continuity - same transcript across prompts
for await loop Process SDK responses as they arrive Streaming response handling
processSDKResponse() called per response Parse and save observations/summaries Database + Chroma sync

createMessageGenerator() (Lines 202-291)

private async *createMessageGenerator(session: ActiveSession): AsyncIterableIterator<SDKUserMessage> {
  // Build initial or continuation prompt
  const initPrompt = isInitPrompt
    ? buildInitPrompt(...)
    : buildContinuationPrompt(...);

  // Yield initial prompt
  yield { type: 'user', message: { role: 'user', content: initPrompt }, session_id: session.claudeSessionId };

  // Consume pending messages
  for await (const message of this.sessionManager.getMessageIterator(session.sessionDbId)) {
    if (message.type === 'observation') {
      const obsPrompt = buildObservationPrompt({ ... });
      yield { type: 'user', message: { role: 'user', content: obsPrompt } };
    } else if (message.type === 'summarize') {
      const summaryPrompt = buildSummaryPrompt({ ... });
      yield { type: 'user', message: { role: 'user', content: summaryPrompt } };
    }
  }
}
Line The Reason Behind This What It Actually Does
isInitPrompt check First prompt needs full context, subsequent prompts need continuation Different prompt templates
yield initial prompt Start the SDK conversation Sends initialization to Claude
for await ... getMessageIterator Pull messages as they become available Event-driven message consumption
yield for each message Feed observations/summaries to SDK one at a time SDK processes each and responds

markMessagesProcessed() (Lines 462-491)

private async markMessagesProcessed(session: ActiveSession, worker: any): Promise<void> {
  const pendingMessageStore = this.sessionManager.getPendingMessageStore();

  if (session.pendingProcessingIds.size > 0) {
    for (const messageId of session.pendingProcessingIds) {
      pendingMessageStore.markProcessed(messageId);
    }
    session.pendingProcessingIds.clear();
    session.earliestPendingTimestamp = null;

    // Cleanup old processed messages
    const deletedCount = pendingMessageStore.cleanupProcessed(100);
  }

  // Broadcast status update
  if (worker && typeof worker.broadcastProcessingStatus === 'function') {
    worker.broadcastProcessingStatus();
  }
}
Line The Reason Behind This What It Actually Does
Loop over pendingProcessingIds Mark ALL messages that were yielded to SDK Batch completion
markProcessed() for each Transition processing->processed in database Completes the message lifecycle
.clear() Reset tracking set for next batch Prepare for next iteration
earliestPendingTimestamp = null Reset timestamp tracking Next batch gets fresh timestamps
cleanupProcessed(100) Don't keep infinite processed messages Retention policy
broadcastProcessingStatus() Update UI with new state SSE broadcast

SessionRoutes (HTTP Entry Points)

startGeneratorWithProvider() (Lines 118-189)

private startGeneratorWithProvider(session, provider, source): void {
  session.currentProvider = provider;

  session.generatorPromise = agent.startSession(session, this.workerService)
    .catch(error => {
      // Mark all processing messages as failed
      const processingMessages = stmt.all(session.sessionDbId);
      for (const msg of processingMessages) {
        pendingStore.markFailed(msg.id);
      }
    })
    .finally(() => {
      session.generatorPromise = null;
      session.currentProvider = null;
      this.workerService.broadcastProcessingStatus();

      // Check if there's more work pending
      const pendingCount = pendingStore.getPendingCount(sessionDbId);
      if (pendingCount > 0) {
        // Auto-restart
        setTimeout(() => {
          if (stillExists && !stillExists.generatorPromise) {
            this.startGeneratorWithProvider(stillExists, this.getSelectedProvider(), 'auto-restart');
          }
        }, 0);
      } else {
        // Cleanup
        this.sessionManager.deleteSession(sessionDbId);
      }
    });
}
Line The Reason Behind This What It Actually Does
session.generatorPromise = Track that generator is running Prevents multiple generators per session
.catch() with markFailed If generator crashes, don't lose messages Marks for retry or permanent failure
.finally() Always cleanup regardless of success/failure Guaranteed cleanup
generatorPromise = null Allow new generator to start Clears the "running" flag
getPendingCount() > 0 CRITICAL: Check if more work arrived while processing Handles messages queued during SDK call
setTimeout(..., 0) Don't restart synchronously (could cause stack issues) Deferred restart
deleteSession() when no work Clean up resources Memory management

ensureGeneratorRunning() (Lines 90-113)

private ensureGeneratorRunning(sessionDbId: number, source: string): void {
  const session = this.sessionManager.getSession(sessionDbId);
  if (!session) return;

  const selectedProvider = this.getSelectedProvider();

  // Start generator if not running
  if (!session.generatorPromise) {
    this.startGeneratorWithProvider(session, selectedProvider, source);
    return;
  }

  // Generator is running - check if provider changed
  if (session.currentProvider && session.currentProvider !== selectedProvider) {
    // Let current generator finish, next one will use new provider
  }
}
Line The Reason Behind This What It Actually Does
Check !generatorPromise Only start if not already running Prevents duplicate generators
Start generator if not running Ensure messages get processed Lazy generator startup
Provider change detection Allow switching providers mid-session Graceful provider transition

WorkerService (Orchestrator)

initializeBackground() Stuck Message Recovery (Lines 627-633)

// Recover stuck messages from previous crashes
const STUCK_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
if (resetCount > 0) {
  logger.info('SYSTEM', `Recovered ${resetCount} stuck messages from previous session`);
}
Line The Reason Behind This What It Actually Does
Called at startup Worker may have crashed while messages were processing Recovery mechanism
5 minute threshold If processing >5min, something went wrong Reasonable timeout for SDK calls
Reset to pending Give stuck messages another chance Automatic retry

processPendingQueues() (Lines 747-811)

async processPendingQueues(sessionLimit: number = 10): Promise<Result> {
  const orphanedSessionIds = pendingStore.getSessionsWithPendingMessages();

  for (const sessionDbId of orphanedSessionIds) {
    // Skip if session already has active generator
    const existingSession = this.sessionManager.getSession(sessionDbId);
    if (existingSession?.generatorPromise) {
      result.sessionsSkipped++;
      continue;
    }

    // Initialize session and start SDK agent
    const session = this.sessionManager.initializeSession(sessionDbId);
    this.startSessionWithAutoRestart(session, getPendingCount, 'startup-recovery');
  }
}
Line The Reason Behind This What It Actually Does
Called at startup Resume work interrupted by crash/restart Auto-recovery
getSessionsWithPendingMessages() Find sessions that have orphaned work Database query
Skip if generator running Don't start duplicate processors Race condition prevention
startSessionWithAutoRestart() Start processing with auto-restart logic Shares code with SessionRoutes

startSessionWithAutoRestart() (Lines 696-739)

private startSessionWithAutoRestart(session, getPendingCount, source): void {
  session.generatorPromise = this.sdkAgent.startSession(session, this)
    .catch(error => { ... })
    .finally(() => {
      session.generatorPromise = null;
      this.broadcastProcessingStatus();

      const stillPending = getPendingCount(sid);
      if (stillPending > 0) {
        // Recursive restart
        setTimeout(() => {
          const stillExists = this.sessionManager.getSession(sid);
          if (stillExists && !stillExists.generatorPromise) {
            this.startSessionWithAutoRestart(stillExists, getPendingCount, 'auto-restart');
          }
        }, 0);
      } else {
        // Cleanup
        this.sessionManager.deleteSession(sid);
      }
    });
}
Line The Reason Behind This What It Actually Does
Same pattern as SessionRoutes DRY: Shared auto-restart logic Prevents code duplication
Recursive restart Keep processing until queue is empty Ensures all messages processed
Check stillExists before restart Session might have been deleted Safety check

Critical Flow: How a Message Gets Stuck in "Processing"

The Problem

Messages can get stuck in status = 'processing' if:

  1. SDK call hangs indefinitely - The Agent SDK query never returns
  2. Worker crashes mid-processing - Process dies before markProcessed()
  3. Exception in processSDKResponse() - Error prevents markProcessed() from running

The Flow

1. queueObservation() called
   └─► enqueue() → status = 'pending'

2. getMessageIterator() picks up message
   └─► markProcessing() → status = 'processing' ✓
   └─► pendingProcessingIds.add(id)
   └─► yield message to SDK

3. SDK processes and returns response
   └─► processSDKResponse() called
       └─► Parse observations/summaries
       └─► Store to database
       └─► markMessagesProcessed()
           └─► markProcessed() → status = 'processed' ✓

IF STEP 3 FAILS OR HANGS:
   └─► Message stays in 'processing' forever
   └─► Recovery: resetStuckMessages() after 5 minutes

Why Processing Messages Can Get "Lost"

Race Condition in getMessageIterator():

// Lines 445-446 in SessionManager
this.getPendingStore().markProcessing(persistentMessage.id);
session.pendingProcessingIds.add(persistentMessage.id);

The message is marked as processing BEFORE being yielded. If the SDK hangs or crashes AFTER this line but BEFORE processSDKResponse completes, the message is stuck.

Protection Mechanisms:

  1. pendingProcessingIds tracks what's in-flight
  2. markFailed() in catch handler marks for retry
  3. resetStuckMessages() at startup cleans up old stuck messages

Recovery Mechanisms

1. Startup Recovery (Worker crashes)

// In initializeBackground()
const resetCount = pendingStore.resetStuckMessages(STUCK_THRESHOLD_MS);
  • Runs when worker starts
  • Finds messages stuck in processing for >5 minutes
  • Resets them to pending for retry

2. Generator Error Recovery

// In startGeneratorWithProvider() catch handler
for (const msg of processingMessages) {
  pendingStore.markFailed(msg.id);
}
  • Runs when SDK call throws
  • Marks processing messages as failed (which may reset to pending if retries remain)

3. Auto-Restart Recovery

// In startGeneratorWithProvider() finally handler
if (pendingCount > 0) {
  setTimeout(() => startGeneratorWithProvider(...), 0);
}
  • Runs after every generator completes
  • Checks for pending work
  • Starts new generator if work remains

4. Manual Recovery (UI)

// PendingMessageStore methods
retryMessage(messageId)      // Reset specific message to pending
retryAllStuck(thresholdMs)   // Reset all stuck messages
abortMessage(messageId)      // Delete message from queue

Summary of Potential Issues

Issue Cause Mitigation
Message stuck in processing SDK hang, crash during processing resetStuckMessages() at startup
Duplicate processing Race condition on message claim markProcessing() with WHERE status = 'pending'
Lost messages Crash before enqueue DB persist BEFORE in-memory push
Generator never starts No call to ensureGeneratorRunning() Called by every HTTP handler
Generator exits early Empty queue check race finally handler checks and restarts
Infinite retry Repeated failures maxRetries limit (default: 3)

Diagnostic Queries

Check for stuck messages:

SELECT * FROM pending_messages
WHERE status = 'processing'
AND started_processing_at_epoch < (strftime('%s', 'now') * 1000 - 300000);

Check queue depth by session:

SELECT session_db_id, status, COUNT(*)
FROM pending_messages
GROUP BY session_db_id, status;

Check retry counts:

SELECT id, message_type, retry_count, status
FROM pending_messages
WHERE retry_count > 0;