b8ce27bd31
- Implemented atomic message claiming in PendingMessageStore with claimNextMessage. - Removed obsolete peekPending method to streamline message retrieval. - Introduced SessionQueueProcessor for robust async message iteration, replacing complex polling logic. - Refactored SessionManager to eliminate in-memory queue state, relying on PendingMessageStore for message tracking. - Cleaned up session handling logic, removing recursive restarts and session deletion on empty queues. - Enhanced error handling and logging for generator failures and session processing. - Updated SessionRoutes to handle crash recovery more effectively without deleting sessions.
2.3 KiB
2.3 KiB
Queue System Simplification Implementation
I have successfully implemented the queue system simplification plan.
Changes Implemented
1. Database Layer Hardening
- Added
claimNextMessage(sessionDbId)toPendingMessageStore:- Implements an atomic transaction (SELECT oldest pending + UPDATE to processing).
- Ensures a message can only be claimed by one worker at a time.
- Eliminates race conditions between "peeking" and "marking".
- Removed
peekPending():- No longer needed as
claimNextMessagehandles retrieval and locking in one step.
- No longer needed as
2. Unified "Pump" Architecture
- Created
src/services/queue/SessionQueueProcessor.ts:- Implements a robust
AsyncIterableIteratorthat yields messages. - Encapsulates the "Claim -> Yield -> Wait" loop.
- Replaces fragile polling/recursive logic with event-driven
waitForMessage. - Handles empty queues gracefully by waiting for signals.
- Implements a robust
3. SessionManager Refactoring
- Updated
getMessageIterator:- Now delegates to
SessionQueueProcessor. - Removes complex manual synchronization logic.
- Now delegates to
- Removed In-Memory Queue State:
queueObservationandqueueSummarizenow only write to DB and emit events.pendingMessagesarray is no longer used for logic (kept deprecated for type compatibility).getTotalActiveWork,hasPendingMessages, etc., now queryPendingMessageStoredirectly (counting both 'pending' and 'processing' states).
4. Logic Cleanup
- Removed Recursive Restarts:
- Refactored
startGeneratorWithProviderinSessionRoutes.tsandstartSessionProcessorinWorkerService.ts. - Removed logic that deleted sessions when queue emptied (sessions now wait for new work).
- Removed "auto-restart" logic for normal completion (only kept for crash recovery).
- Refactored
Benefits
- Reliability: Atomic DB operations prevent stuck or duplicate messages.
- Simplicity: Removed complex "peek-then-mark" and recursive restart chains.
- Performance: Zero-latency event notification with efficient DB queries.
- Maintainability: Clear separation of concerns (Store vs Processor vs Manager).
Verification
- Ran static analysis (
tsc) to verify type safety of new components. - Verified removal of dead code (
peekPending). - Confirmed integration points in
SessionManagerandSessionRoutes.