Observation Queue Engine Deep Dive: BullMQ vs Bee-Queue
Date: 2026-05-06
Executive decision
If claude-mem replaces its observation queue with one of the two Redis-backed libraries, choose BullMQ, not Bee-Queue.
That said, the current observation queue is not a generic background job queue. It is a durable, per-session input stream feeding long-lived provider generators. Replacing it with Redis should not be the default local install path unless claude-mem is willing to require, bundle, or supervise Redis. If Redis is not acceptable as a new operational dependency, the better path is to keep the SQLite queue and fix the contract/test drift.
Recommended path:
- Stabilize the current queue contract and tests.
- Add a queue-engine adapter boundary.
- Keep SQLite as the default backend.
- Add BullMQ as an optional backend for users who explicitly configure Redis.
- Do not adopt Bee-Queue.
Current claude-mem queue shape
The active queue path is:
- `src/services/worker/http/shared.ts` and `SessionRoutes.ts` ingest observation and summarize requests.
- `SessionManager.queueObservation()` and `queueSummarize()` persist rows through `PendingMessageStore.enqueue()`.
- `SessionQueueProcessor.createIterator()` claims one row at a time and wakes via a per-session `EventEmitter`.
- Provider loops in `ClaudeProvider`, `GeminiProvider`, and `OpenRouterProvider` consume `sessionManager.getMessageIterator(sessionDbId)`.
- Parsed agent output is stored through `processAgentResponse()`, then `SessionManager.clearPendingForSession()` clears that session's pending rows.
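The claim-and-wake loop above can be modeled in a few lines of TypeScript. This is a minimal in-memory sketch, not the current `SessionQueueProcessor`: `InMemorySessionQueue`, `push`, and `iterate` are hypothetical names, and an array per session stands in for `pending_messages` rows. It shows the shape the provider loops depend on: an async iterator that yields one session's messages in FIFO order and parks on a per-session `EventEmitter` until a new message arrives or the `AbortSignal` fires.

```typescript
import { EventEmitter, once } from "node:events";

// Hypothetical in-memory stand-in for the SQLite-backed pending_messages table.
class InMemorySessionQueue<T> {
  private readonly rows = new Map<number, T[]>();
  private readonly wakeups = new Map<number, EventEmitter>();

  private emitterFor(sessionDbId: number): EventEmitter {
    let emitter = this.wakeups.get(sessionDbId);
    if (!emitter) {
      emitter = new EventEmitter();
      this.wakeups.set(sessionDbId, emitter);
    }
    return emitter;
  }

  // Persist a message, then wake any iterator waiting on this session.
  push(sessionDbId: number, message: T): void {
    const list = this.rows.get(sessionDbId) ?? [];
    list.push(message);
    this.rows.set(sessionDbId, list);
    this.emitterFor(sessionDbId).emit("message");
  }

  // Claim one message at a time in FIFO order; wait for a wakeup when empty.
  async *iterate(sessionDbId: number, signal: AbortSignal): AsyncGenerator<T> {
    const emitter = this.emitterFor(sessionDbId);
    while (!signal.aborted) {
      const next = this.rows.get(sessionDbId)?.shift();
      if (next !== undefined) {
        yield next;
        continue;
      }
      try {
        // once() rejects with an AbortError if the signal fires while waiting.
        await once(emitter, "message", { signal });
      } catch {
        return; // aborted: end the iteration cleanly
      }
    }
  }
}
```

There is no `await` between the empty `shift()` and the listener registration inside `once()`, so a message pushed in that gap cannot be missed; a message pushed while the consumer is suspended at `yield` simply waits in the array for the next loop.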
Key semantics that must survive any replacement:
- Per-session FIFO ordering.
- At-most-one active consumer per session.
- Durable queue across worker restarts.
- Startup recovery from `processing` back to `pending`.
- Low-latency wakeup when new tool observations arrive.
- Deduplication by `content_session_id + tool_use_id`.
- Original observation timestamp preservation for storage/broadcast.
- Queue depth for `/api/processing-status` and SSE.
- Local-first behavior and simple install are product requirements, not just implementation details.
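The ordering requirements above (per-session FIFO, at most one active consumer per session, different sessions free to run in parallel) can be stated as a keyed promise chain. `KeyedSerialRunner` is a hypothetical helper, not part of claude-mem; it is a sketch of the invariant any replacement engine must preserve, not of how the SQLite queue enforces it today.

```typescript
// Runs tasks one at a time per key (session) while different keys run concurrently.
class KeyedSerialRunner {
  private readonly tails = new Map<string, Promise<void>>();

  run<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    // Start only after the previous task for this key has settled.
    const result = previous.then(task);
    // The tail always resolves, so one failed task does not block later ones.
    const tail = result.then(
      () => undefined,
      () => undefined,
    );
    this.tails.set(key, tail);
    // Forget the key once this is still the last task queued for it.
    void tail.then(() => {
      if (this.tails.get(key) === tail) this.tails.delete(key);
    });
    return result;
  }

  activeKeys(): number {
    return this.tails.size;
  }
}
```

This is the property a single global queue with worker concurrency above 1 would lose.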
Important mismatch found during the dive:
- Current `PendingMessageStore` only models `pending` and `processing`.
- Older migrations, tests, and scripts still reference `processed`, `failed`, `retry_count`, `completed_at_epoch`, `failed_at_epoch`, and `worker_pid`.
- `storeObservationsAndMarkComplete()` still updates a row to `processed`, while the currently visible queue path clears all pending messages for the session after parsing.
- `src/services/sqlite/schema.sql` still creates `idx_pending_messages_worker_pid` even though the visible table definition has no `worker_pid`.
Focused test run:
```sh
bun test tests/services/sqlite/PendingMessageStore.test.ts tests/services/queue/SessionQueueProcessor.test.ts
```
Result: 10 pass, 6 fail. Failures show stale tests/contract drift:
- `PendingMessageStore.test.ts` passes `3` as a constructor argument, but the constructor now expects `onMutate?: () => void`.
- `SessionQueueProcessor.test.ts` expects retry-after-store-error behavior, but the current implementation logs and exits the iterator on claim failure.
This needs to be reconciled before swapping engines; otherwise the migration will encode inconsistent behavior.
BullMQ deep dive
Sources checked:
- GitHub: https://github.com/taskforcesh/bullmq
- NPM: https://www.npmjs.com/package/bullmq
- Docs: https://docs.bullmq.io/
- Queues: https://docs.bullmq.io/guide/queues
- Connections/Redis constraints: https://docs.bullmq.io/guide/connections
- Production notes: https://docs.bullmq.io/guide/going-to-production
- Manual processing: https://docs.bullmq.io/patterns/manually-fetching-jobs
- Job IDs/dedupe: https://docs.bullmq.io/guide/jobs/job-ids
- Stalled jobs: https://docs.bullmq.io/guide/workers/stalled-jobs
Current package/repo facts captured on 2026-05-06:
- NPM latest: `bullmq@5.76.5`.
- NPM modified: 2026-05-02.
- GitHub pushed: 2026-05-05.
- GitHub stars/forks/open issues at capture time: 8808 / 606 / 414.
- License: MIT.
- Unpacked size: about 2.5 MB.
- Dependencies: `ioredis`, `cron-parser`, `msgpackr`, `node-abort-controller`, `semver`, `tslib`.
- TypeScript types are bundled.
- A Bun import smoke test succeeded for `import { Queue } from 'bullmq'`.
Strengths for claude-mem:
- Actively maintained and widely used.
- Built-in TypeScript API.
- Redis-backed durability and distributed workers.
- Built-in stalled-job recovery, retry attempts, fixed/exponential backoff, delays, priorities, FIFO/LIFO, auto-removal, QueueEvents, manual processing APIs, and job ID based dedupe.
- BullMQ docs explicitly support manual job fetching with `Worker#getNextJob()`, `moveToCompleted()`, `moveToFailed()`, and lock extension. This matters because claude-mem's provider loop is closer to a stream consumer than a normal job processor.
Costs and risks:
- Redis becomes required for the queue backend. BullMQ docs require a Redis connection to use queues and recommend Redis compatibility 6.2+.
- Redis must be configured like durable infrastructure, not a cache: AOF persistence is recommended and `maxmemory-policy=noeviction` is required for correctness.
- Connection count increases. BullMQ docs note each class consumes at least one Redis connection; `Worker` and `QueueEvents` need blocking/duplicated connections in some cases.
- Jobs store data in Redis in clear text unless claude-mem encrypts or avoids sensitive payload fields. Tool input/output can be sensitive.
- BullMQ job completion/failure semantics do not map directly to claude-mem's current "provider consumes many messages, parses one response, then clears the session" behavior.
- Per-session FIFO with parallel sessions is not free in OSS BullMQ. A single global queue with worker concurrency > 1 can violate same-session ordering unless we add a scheduler. BullMQ Pro groups would address this, but claude-mem should not depend on Pro.
- Custom `jobId` is useful for `tool_use_id` dedupe, but BullMQ custom job IDs must not contain `:`. Use a hash or safe delimiter.
- Manual processing requires lock management. BullMQ docs call out that manually fetched jobs do not get automatic lock renewal like standard processors; claude-mem would need `extendLock()` for long provider calls or a large lock duration.
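The lock-management point can be made concrete with a heartbeat that renews the lock while a long provider call runs. A minimal sketch, assuming a caller-supplied `extendLock` callback; in a BullMQ engine it might wrap `job.extendLock(token, duration)`, but the helper name `withLockHeartbeat` and the half-duration renewal interval are choices of this sketch, not BullMQ defaults.

```typescript
// Renews a lock on an interval while `work` runs, and always stops renewing afterwards.
// `extendLock` is an assumed callback; a BullMQ engine might wrap job.extendLock(token, ms).
async function withLockHeartbeat<T>(
  work: () => Promise<T>,
  extendLock: (durationMs: number) => Promise<unknown>,
  lockDurationMs: number,
): Promise<T> {
  let renewalError: unknown;
  // Renew at half the lock duration so one slow renewal does not let the lock lapse.
  const timer = setInterval(() => {
    extendLock(lockDurationMs).catch((err) => {
      renewalError ??= err;
    });
  }, Math.max(1, Math.floor(lockDurationMs / 2)));
  try {
    const result = await work();
    // Surface a failed renewal: another consumer may have taken over the job.
    if (renewalError !== undefined) throw renewalError;
    return result;
  } finally {
    clearInterval(timer);
  }
}
```

A failed renewal is surfaced after the work finishes rather than interrupting it; a stricter engine could cancel the provider call through an `AbortSignal` instead.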
Best BullMQ shape if adopted:
- Prefer one queue per active session over one global queue initially:
  - Queue name: `claude-mem:session:<safe-session-db-id>` or a hashed content-session suffix.
  - Worker/manual consumer concurrency: `1`.
  - Preserves per-session FIFO without BullMQ Pro groups.
  - Active session counts are naturally low for local claude-mem usage.
  - Clean up queue keys when a session is deleted or after an idle timeout.
- Use `jobId` for observation dedupe: `obs_<sha256(contentSessionId + "\0" + toolUseId)>`.
  - Summaries should use a distinct ID scheme and usually should not dedupe unless the current summarize semantics require it.
- Use `removeOnComplete` aggressively if SQLite remains the source of truth for stored observations.
- Keep only bounded failed jobs for debugging.
- Treat Redis as queue state only; SQLite remains the canonical observation/session store.
- Add config:
  - `CLAUDE_MEM_QUEUE_ENGINE=sqlite|bullmq`
  - `CLAUDE_MEM_REDIS_URL`
  - `CLAUDE_MEM_QUEUE_REDIS_PREFIX`
  - `CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS=true|false` if sensitive fields are stored.
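The observation `jobId` format above can be derived with Node's built-in `crypto` module. A sketch, assuming string inputs; `observationJobId` is a hypothetical helper name. A hex SHA-256 digest never contains `:`, so the result also satisfies BullMQ's custom job ID rule.

```typescript
import { createHash } from "node:crypto";

// Deterministic jobId for an observation: the same (session, tool use) pair always
// maps to the same id, so a re-sent observation collides instead of queuing twice.
function observationJobId(contentSessionId: string, toolUseId: string): string {
  // "\0" separates the fields so ("ab", "c") and ("a", "bc") hash differently.
  const digest = createHash("sha256")
    .update(`${contentSessionId}\0${toolUseId}`)
    .digest("hex");
  return `obs_${digest}`; // hex digest: no ':' characters
}
```

BullMQ only ignores a duplicate `jobId` while the earlier job still exists in Redis, so aggressive `removeOnComplete` also shortens the dedupe window; SQLite-side dedupe may still be needed for late retransmits.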
Bee-Queue deep dive
Sources checked:
- GitHub: https://github.com/bee-queue/bee-queue
- NPM: https://www.npmjs.com/package/bee-queue
- README/API docs in repository.
Current package/repo facts captured on 2026-05-06:
- NPM latest: `bee-queue@2.0.0`.
- NPM modified: 2025-12-08.
- GitHub pushed: 2026-04-10.
- GitHub stars/forks/open issues at capture time: 4027 / 221 / 47.
- License field from NPM: MIT. GitHub API license metadata returned `NOASSERTION`.
- Unpacked size: about 107 KB.
- Dependencies: `redis@^3.1.2`, `p-finally`, `promise-callbacks`.
- NPM package exposes `./index.d.ts`.
- A Bun import smoke test succeeded for `import BeeQueue from 'bee-queue'`.
Strengths for claude-mem:
- Very small and simple.
- Designed for short, real-time jobs.
- Redis-backed with Lua/pipelining and low overhead.
- Supports concurrency, retries, retry strategies, timeouts, scheduled jobs, pub/sub events, results to producers, and stalled job retry.
- Redis requirement is lighter in docs: Redis 2.8+, with Redis 3.2+ recommended for delayed jobs.
Costs and risks:
- Narrower feature set by design. The README says priorities and repeatable jobs are not currently supported.
- CommonJS-first API; workable, but less idiomatic for this ESM TypeScript codebase.
- Uses the older `redis` v3 client line, not the modern `redis` v4/v5 or `ioredis`.
- Observability and operational tooling are thinner than BullMQ's.
- The same per-session ordering mismatch exists as with BullMQ, but with fewer escape hatches.
- Delayed retry behavior requires `activateDelayedJobs` on at least one queue instance.
- The package is recently revived, but not as active or mature as BullMQ for a queue-engine foundation.
Conclusion: Bee-Queue is attractive if the only goal is "small Redis queue for short jobs." claude-mem needs a durable session stream with strict per-session semantics, good TypeScript ergonomics, explicit recovery behavior, and long-term maintenance. Bee-Queue is the wrong tradeoff.
Scorecard
| Area | Current SQLite | BullMQ | Bee-Queue |
|---|---|---|---|
| Local-first install | Strong | Weak unless Redis is bundled/optional | Weak unless Redis is bundled/optional |
| Per-session FIFO | Strong | Medium with per-session queues; weak with one global queue | Medium with per-session queues; weak with one global queue |
| Restart durability | Strong, SQLite-backed | Strong if Redis persistence configured | Strong if Redis persistence configured |
| Stalled recovery | Custom/simple | Strong built-in | Built-in |
| TypeScript fit | Strong | Strong | Medium |
| Maintenance/activity | Internal | Strong | Medium |
| Operational complexity | Low | High | Medium-high |
| Queue observability | Custom/basic | Strong | Medium |
| Dependency footprint | Low | Larger | Small |
| Privacy/data locality | SQLite local file | Redis clear-text unless handled | Redis clear-text unless handled |
| Best use in claude-mem | Default | Optional advanced backend | Do not use |
Migration plan
Phase 0: Fix the existing contract
- Decide whether `pending_messages.status` is only `pending|processing`, or whether `processed|failed` is coming back.
- Fix `schema.sql` and migrations so `worker_pid` indexes are not created after `worker_pid` is dropped.
- Fix `storeObservationsAndMarkComplete()` or remove it if no longer used.
- Update queue tests to match real behavior:
  - constructor signature;
  - claim error behavior;
  - reset-on-start behavior;
  - dedupe by `tool_use_id`;
  - clear-session behavior.
Phase 1: Add an adapter boundary
Define a small interface around current behavior, not around BullMQ:
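```typescript
interface ObservationQueueEngine {
  // Persist one observation or summarize message for a session.
  enqueue(sessionDbId: number, contentSessionId: string, message: PendingMessage): Promise<EnqueueResult>;
  // Per-session FIFO stream consumed by the provider loops; ends when `signal` aborts.
  createIterator(sessionDbId: number, signal: AbortSignal, onIdleTimeout?: () => void): AsyncIterableIterator<PendingMessageWithId>;
  // Drop a session's pending rows after the agent response has been stored.
  clearPendingForSession(sessionDbId: number): Promise<number>;
  // Startup recovery: move claimed-but-unfinished rows back to pending.
  resetProcessingToPending(sessionDbId: number): Promise<number>;
  // Queue depth for /api/processing-status and SSE.
  getPendingCount(sessionDbId: number): Promise<number>;
  getTotalQueueDepth(): Promise<number>;
  close(): Promise<void>;
}
```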
Keep `SqliteObservationQueueEngine` as the first implementation by moving the current `PendingMessageStore` + `SessionQueueProcessor` behavior behind this interface.
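An in-memory engine covering most of this contract is a useful test double and makes the dedupe and recovery semantics explicit. A sketch under stated assumptions: `PendingMessage`, `StoredRow`, and `EnqueueResult` below are hypothetical shapes rather than the real types, `claimNext` stands in for the claim step inside `createIterator`, and counting both pending and processing rows as queue depth is an assumption.

```typescript
// Hypothetical message shape for this sketch; the real PendingMessage type may differ.
interface PendingMessage {
  kind: "observation" | "summarize";
  toolUseId?: string;
  payload: unknown;
  originalTimestamp: number; // preserved for storage/broadcast
}

interface StoredRow extends PendingMessage {
  id: number;
  sessionDbId: number;
  contentSessionId: string;
  status: "pending" | "processing";
}

type EnqueueResult = { id: number; duplicate: boolean };

// In-memory engine for the non-iterator half of the contract, e.g. as a test double.
class InMemoryObservationQueueEngine {
  private nextId = 1;
  private rows: StoredRow[] = [];

  async enqueue(sessionDbId: number, contentSessionId: string, message: PendingMessage): Promise<EnqueueResult> {
    // Dedupe observations by content_session_id + tool_use_id.
    if (message.toolUseId !== undefined) {
      const existing = this.rows.find(
        (r) => r.contentSessionId === contentSessionId && r.toolUseId === message.toolUseId,
      );
      if (existing) return { id: existing.id, duplicate: true };
    }
    const row: StoredRow = { ...message, id: this.nextId++, sessionDbId, contentSessionId, status: "pending" };
    this.rows.push(row);
    return { id: row.id, duplicate: false };
  }

  // Claim the oldest pending row for a session (FIFO) and mark it processing.
  async claimNext(sessionDbId: number): Promise<StoredRow | undefined> {
    const row = this.rows.find((r) => r.sessionDbId === sessionDbId && r.status === "pending");
    if (row) row.status = "processing";
    return row;
  }

  // Startup recovery: rows left in processing go back to pending, in place.
  async resetProcessingToPending(sessionDbId: number): Promise<number> {
    let count = 0;
    for (const r of this.rows) {
      if (r.sessionDbId === sessionDbId && r.status === "processing") {
        r.status = "pending";
        count++;
      }
    }
    return count;
  }

  // Removes every row for the session, pending or processing (an assumption).
  async clearPendingForSession(sessionDbId: number): Promise<number> {
    const before = this.rows.length;
    this.rows = this.rows.filter((r) => r.sessionDbId !== sessionDbId);
    return before - this.rows.length;
  }

  async getPendingCount(sessionDbId: number): Promise<number> {
    return this.rows.filter((r) => r.sessionDbId === sessionDbId).length;
  }

  async getTotalQueueDepth(): Promise<number> {
    return this.rows.length;
  }
}
```

Resetting a row to `pending` leaves it at its original position, so recovery does not reorder the session.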
Phase 2: Add BullMQ backend behind feature flag
- Add `BullMqObservationQueueEngine`.
- Use per-session queues with concurrency/manual fetch of 1.
- Use a safe hashed `jobId` for observation dedupe.
- Preserve `_originalTimestamp` in job data.
- Keep provider loops unchanged by preserving the async iterator interface.
- Implement lock extension if manual processing can exceed the configured lock duration.
- Keep SQLite as the observation/session source of truth; Redis is transport.
- Add Redis connectivity health to `/api/health` only when the BullMQ backend is enabled.
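Keeping the provider loops unchanged means a BullMQ engine has to present the same async-iterator shape even though manual processing is pull-based. A hedged sketch of that adapter: `pollingIterator` is a hypothetical helper, `fetchNext` is an assumed callback (in a BullMQ engine it might wrap `Worker#getNextJob()`), and the fixed idle delay is a simplification.

```typescript
// Turns a pull-based "fetch one message or nothing" call into the async iterator
// shape the provider loops already consume. Stops when the signal aborts.
async function* pollingIterator<T>(
  fetchNext: () => Promise<T | undefined>,
  signal: AbortSignal,
  idleDelayMs = 50,
): AsyncGenerator<T> {
  while (!signal.aborted) {
    const item = await fetchNext();
    if (item !== undefined) {
      yield item;
      continue;
    }
    if (signal.aborted) return;
    // Nothing available: sleep, but wake early if the signal aborts.
    await new Promise<void>((resolve) => {
      const onAbort = () => {
        clearTimeout(timer);
        resolve();
      };
      const timer = setTimeout(() => {
        signal.removeEventListener("abort", onAbort);
        resolve();
      }, idleDelayMs);
      signal.addEventListener("abort", onAbort, { once: true });
    });
  }
}
```

A fixed sleep adds up to `idleDelayMs` of latency per wakeup; to meet the low-latency wakeup requirement, a real engine would likely prefer a blocking fetch over polling.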
Phase 3: Migration and fallback
- On startup with BullMQ enabled, migrate existing SQLite `pending_messages` rows into BullMQ once, then mark/delete migrated rows.
- If Redis is unavailable at startup, fail loudly for `CLAUDE_MEM_QUEUE_ENGINE=bullmq`; do not silently drop observations.
- For the default `sqlite` engine, do not require Redis.
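The startup rules above amount to strict parsing of the engine setting. A minimal sketch, assuming the `CLAUDE_MEM_QUEUE_ENGINE`, `CLAUDE_MEM_REDIS_URL`, and `CLAUDE_MEM_QUEUE_REDIS_PREFIX` names proposed earlier; `resolveQueueEngine` and the `redis://`/`rediss://` scheme check are illustrative, and the actual Redis connectivity probe, which must also fail loudly, is omitted.

```typescript
type QueueEngineConfig =
  | { engine: "sqlite" }
  | { engine: "bullmq"; redisUrl: string; prefix?: string };

// Resolve the queue engine from settings. Unknown values and a missing Redis URL
// throw instead of silently falling back, so observations are never dropped quietly.
function resolveQueueEngine(env: Record<string, string | undefined>): QueueEngineConfig {
  const raw = (env.CLAUDE_MEM_QUEUE_ENGINE ?? "sqlite").trim().toLowerCase();
  if (raw === "sqlite") return { engine: "sqlite" }; // default: Redis not required
  if (raw !== "bullmq") {
    throw new Error(`CLAUDE_MEM_QUEUE_ENGINE must be "sqlite" or "bullmq", got "${raw}"`);
  }
  const redisUrl = env.CLAUDE_MEM_REDIS_URL;
  if (!redisUrl) {
    throw new Error("CLAUDE_MEM_QUEUE_ENGINE=bullmq requires CLAUDE_MEM_REDIS_URL");
  }
  // new URL() throws on malformed input; also reject non-Redis schemes early.
  const protocol = new URL(redisUrl).protocol;
  if (protocol !== "redis:" && protocol !== "rediss:") {
    throw new Error(`CLAUDE_MEM_REDIS_URL must use redis:// or rediss://, got ${protocol}`);
  }
  return { engine: "bullmq", redisUrl, prefix: env.CLAUDE_MEM_QUEUE_REDIS_PREFIX };
}
```

Calling it once at startup with `process.env`, before any queue is constructed, keeps the fail-loud decision in one place.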
Phase 4: Tests
- Unit-test the adapter contract with a shared test suite.
- Run the suite against SQLite always.
- Run BullMQ tests only when Redis is available, or spin Redis in CI.
- Add crash/restart tests:
  - enqueue, kill worker, restart, process;
  - claimed job stalls and returns;
  - duplicate `tool_use_id` is suppressed;
  - per-session FIFO across concurrent sessions;
  - idle timeout still aborts provider subprocesses.
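A shared contract suite can be written once against an engine factory and run per backend, with the BullMQ run gated on Redis availability. A minimal sketch with no test framework: the `MinimalQueueEngine` surface, `runQueueContract`, and the array-backed engine used to exercise it are hypothetical, and only the duplicate-suppression and per-session FIFO cases are shown.

```typescript
// Hypothetical minimal surface the contract exercises; real engines expose more.
interface MinimalQueueEngine {
  enqueue(sessionDbId: number, toolUseId: string): Promise<boolean>; // false if duplicate
  drain(sessionDbId: number): Promise<string[]>; // tool_use_ids in processing order
  close(): Promise<void>;
}

function check(condition: boolean, message: string): void {
  if (!condition) throw new Error(message);
}

// One suite, many backends: call once per engine factory.
async function runQueueContract(
  name: string,
  makeEngine: () => Promise<MinimalQueueEngine>,
): Promise<void> {
  const engine = await makeEngine();
  try {
    // Duplicate tool_use_id is suppressed.
    check(await engine.enqueue(1, "t1"), `${name}: first enqueue rejected`);
    check(!(await engine.enqueue(1, "t1")), `${name}: duplicate accepted`);
    // Per-session FIFO holds while another session is interleaved.
    await engine.enqueue(2, "u1");
    await engine.enqueue(1, "t2");
    await engine.enqueue(2, "u2");
    check((await engine.drain(1)).join() === "t1,t2", `${name}: session 1 out of order`);
    check((await engine.drain(2)).join() === "u1,u2", `${name}: session 2 out of order`);
  } finally {
    await engine.close();
  }
}

// Array-backed engine standing in for a real backend; sessionDbId stands in for
// content_session_id in the dedupe key.
async function makeArrayEngine(): Promise<MinimalQueueEngine> {
  const queues = new Map<number, string[]>();
  const seen = new Set<string>();
  const engine: MinimalQueueEngine = {
    async enqueue(sessionDbId, toolUseId) {
      const key = `${sessionDbId}\0${toolUseId}`;
      if (seen.has(key)) return false;
      seen.add(key);
      queues.set(sessionDbId, [...(queues.get(sessionDbId) ?? []), toolUseId]);
      return true;
    },
    async drain(sessionDbId) {
      const items = queues.get(sessionDbId) ?? [];
      queues.delete(sessionDbId);
      return items;
    },
    async close() {},
  };
  return engine;
}
```

A BullMQ run would call `runQueueContract("bullmq", ...)` only after a Redis availability check passes, so the SQLite run stays mandatory and the Redis run stays optional.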
Final recommendation
Do not do a direct swap from SQLite to either library.
If the product goal is to keep claude-mem easy to install and local-first, invest in the current SQLite queue: clean up the schema/status drift, restore tests, add explicit retries/failure rows if needed, and keep the in-process wakeup path.
If the product goal is to support distributed workers or stronger queue observability, add BullMQ as an optional backend through an adapter. It has the right maintenance profile, TypeScript support, recovery primitives, and docs. Bee-Queue is too narrow and too legacy-client-oriented for this role.