b8ce27bd31
- Implemented atomic message claiming in PendingMessageStore with claimNextMessage. - Removed obsolete peekPending method to streamline message retrieval. - Introduced SessionQueueProcessor for robust async message iteration, replacing complex polling logic. - Refactored SessionManager to eliminate in-memory queue state, relying on PendingMessageStore for message tracking. - Cleaned up session handling logic, removing recursive restarts and session deletion on empty queues. - Enhanced error handling and logging for generator failures and session processing. - Updated SessionRoutes to handle crash recovery more effectively without deleting sessions.
47 lines
2.3 KiB
Markdown
47 lines
2.3 KiB
Markdown
# Queue System Simplification Implementation
|
|
|
|
I have successfully implemented the queue system simplification plan.
|
|
|
|
## Changes Implemented
|
|
|
|
### 1. Database Layer Hardening
|
|
- **Added `claimNextMessage(sessionDbId)` to `PendingMessageStore`:**
|
|
- Implements an atomic transaction (SELECT oldest pending + UPDATE to processing).
|
|
- Ensures a message can only be claimed by one worker at a time.
|
|
- Eliminates race conditions between "peeking" and "marking".
|
|
- **Removed `peekPending()`:**
|
|
- No longer needed as `claimNextMessage` handles retrieval and locking in one step.
|
|
|
|
### 2. Unified "Pump" Architecture
|
|
- **Created `src/services/queue/SessionQueueProcessor.ts`:**
|
|
- Implements a robust `AsyncIterableIterator` that yields messages.
|
|
- Encapsulates the "Claim -> Yield -> Wait" loop.
|
|
- Replaces fragile polling/recursive logic with event-driven `waitForMessage`.
|
|
- Handles empty queues gracefully by waiting for signals.
|
|
|
|
### 3. SessionManager Refactoring
|
|
- **Updated `getMessageIterator`:**
|
|
- Now delegates to `SessionQueueProcessor`.
|
|
- Removes complex manual synchronization logic.
|
|
- **Removed In-Memory Queue State:**
|
|
- `queueObservation` and `queueSummarize` now only write to DB and emit events.
|
|
- `pendingMessages` array is no longer used for logic (kept deprecated for type compatibility).
|
|
- `getTotalActiveWork`, `hasPendingMessages`, etc., now query `PendingMessageStore` directly (counting both 'pending' and 'processing' states).
|
|
|
|
### 4. Logic Cleanup
|
|
- **Removed Recursive Restarts:**
|
|
- Refactored `startGeneratorWithProvider` in `SessionRoutes.ts` and `startSessionProcessor` in `WorkerService.ts`.
|
|
- Removed logic that deleted sessions when queue emptied (sessions now wait for new work).
|
|
- Removed "auto-restart" logic for normal completion (only kept for crash recovery).
|
|
|
|
## Benefits
|
|
- **Reliability:** Atomic DB operations prevent stuck or duplicate messages.
|
|
- **Simplicity:** Removed complex "peek-then-mark" and recursive restart chains.
|
|
- **Performance:** Zero-latency event notification with efficient DB queries.
|
|
- **Maintainability:** Clear separation of concerns (Store vs Processor vs Manager).
|
|
|
|
## Verification
|
|
- Ran static analysis (`tsc`) to verify type safety of new components.
|
|
- Verified removal of dead code (`peekPending`).
|
|
- Confirmed integration points in `SessionManager` and `SessionRoutes`.
|