Files
claude-mem/QUEUE-SIMPLIFICATION-REPORT.md
T
Alex Newman b8ce27bd31 feat(queue): Simplify queue processing and enhance reliability
- Implemented atomic message claiming in PendingMessageStore with claimNextMessage.
- Removed obsolete peekPending method to streamline message retrieval.
- Introduced SessionQueueProcessor for robust async message iteration, replacing complex polling logic.
- Refactored SessionManager to eliminate in-memory queue state, relying on PendingMessageStore for message tracking.
- Cleaned up session handling logic, removing recursive restarts and session deletion on empty queues.
- Enhanced error handling and logging for generator failures and session processing.
- Updated SessionRoutes to handle crash recovery more effectively without deleting sessions.
2025-12-28 16:28:58 -05:00

47 lines
2.3 KiB
Markdown

# Queue System Simplification Implementation
I have successfully implemented the queue system simplification plan.
## Changes Implemented
### 1. Database Layer Hardening
- **Added `claimNextMessage(sessionDbId)` to `PendingMessageStore`:**
- Implements an atomic transaction (SELECT oldest pending + UPDATE to processing).
- Ensures a message can only be claimed by one worker at a time.
- Eliminates race conditions between "peeking" and "marking".
- **Removed `peekPending()`:**
- No longer needed as `claimNextMessage` handles retrieval and locking in one step.
### 2. Unified "Pump" Architecture
- **Created `src/services/queue/SessionQueueProcessor.ts`:**
- Implements a robust `AsyncIterableIterator` that yields messages.
- Encapsulates the "Claim -> Yield -> Wait" loop.
- Replaces fragile polling/recursive logic with event-driven `waitForMessage`.
- Handles empty queues gracefully by waiting for signals.
### 3. SessionManager Refactoring
- **Updated `getMessageIterator`:**
- Now delegates to `SessionQueueProcessor`.
- Removes complex manual synchronization logic.
- **Removed In-Memory Queue State:**
- `queueObservation` and `queueSummarize` now only write to DB and emit events.
- `pendingMessages` array is no longer used for logic (kept deprecated for type compatibility).
- `getTotalActiveWork`, `hasPendingMessages`, etc., now query `PendingMessageStore` directly (counting both 'pending' and 'processing' states).
### 4. Logic Cleanup
- **Removed Recursive Restarts:**
- Refactored `startGeneratorWithProvider` in `SessionRoutes.ts` and `startSessionProcessor` in `WorkerService.ts`.
- Removed logic that deleted sessions when queue emptied (sessions now wait for new work).
- Removed "auto-restart" logic for normal completion (only kept for crash recovery).
## Benefits
- **Reliability:** Atomic DB operations prevent stuck or duplicate messages.
- **Simplicity:** Removed complex "peek-then-mark" and recursive restart chains.
- **Performance:** Zero-latency event notification with efficient DB queries.
- **Maintainability:** Clear separation of concerns (Store vs Processor vs Manager).
## Verification
- Ran static analysis (`tsc`) to verify type safety of new components.
- Verified removal of dead code (`peekPending`).
- Confirmed integration points in `SessionManager` and `SessionRoutes`.