Alex Newman 36b0929fae Server-beta: Postgres storage + independent runtime + BullMQ queue (Phases 1–3) (#2351)
2026-05-08 01:20:07 -07:00


Observation Queue Engine Deep Dive: BullMQ vs Bee-Queue

Date: 2026-05-06

Executive decision

If claude-mem replaces its observation queue with one of the two Redis-backed libraries, choose BullMQ, not Bee-Queue.

That said, the current observation queue is not a generic background job queue. It is a durable, per-session input stream feeding long-lived provider generators. Replacing it with Redis should not be the default local install path unless claude-mem is willing to require, bundle, or supervise Redis. If Redis is not acceptable as a new operational dependency, the better path is to keep the SQLite queue and fix the contract/test drift.

Recommended path:

  1. Stabilize the current queue contract and tests.
  2. Add a queue-engine adapter boundary.
  3. Keep SQLite as the default backend.
  4. Add BullMQ as an optional backend for users who explicitly configure Redis.
  5. Do not adopt Bee-Queue.

Current claude-mem queue shape

The active queue path is:

  • src/services/worker/http/shared.ts and SessionRoutes.ts ingest observation and summarize requests.
  • SessionManager.queueObservation() and queueSummarize() persist rows through PendingMessageStore.enqueue().
  • SessionQueueProcessor.createIterator() claims one row at a time and wakes via a per-session EventEmitter.
  • Provider loops in ClaudeProvider, GeminiProvider, and OpenRouterProvider consume sessionManager.getMessageIterator(sessionDbId).
  • Parsed agent output is stored through processAgentResponse(), then SessionManager.clearPendingForSession() clears that session's pending rows.
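The in-memory sketch below illustrates the claim-one-row-and-wake loop in SessionQueueProcessor. It is not the real implementation. An array stands in for the pending_messages table, and `InMemorySessionQueue`, `claimNext`, and the `"message"` event name are hypothetical. The sketch shows the two properties a replacement has to keep. First, rows are claimed one at a time in insertion order. Second, an idle consumer sleeps on the per-session emitter instead of polling.

```typescript
import { EventEmitter, once } from "node:events";

// Hypothetical message shape; the real PendingMessage type has more fields.
interface QueuedMessage {
  id: number;
  payload: string;
}

class InMemorySessionQueue {
  private rows: QueuedMessage[] = [];
  private nextId = 1;
  // One emitter per session queue: enqueue() wakes any idle consumer.
  private readonly wake = new EventEmitter();

  enqueue(payload: string): number {
    const id = this.nextId++;
    this.rows.push({ id, payload });
    this.wake.emit("message");
    return id;
  }

  // Claim exactly one row, oldest first, or undefined when the queue is empty.
  private claimNext(): QueuedMessage | undefined {
    return this.rows.shift();
  }

  // Yields rows one at a time and waits on the emitter instead of polling.
  async *createIterator(signal: AbortSignal): AsyncGenerator<QueuedMessage> {
    while (!signal.aborted) {
      const row = this.claimNext();
      if (row) {
        yield row;
        continue;
      }
      try {
        // Resolves on the next enqueue; rejects with an AbortError if aborted.
        await once(this.wake, "message", { signal });
      } catch {
        return; // aborted while idle
      }
    }
  }
}
```

`claimNext()` and the `once()` subscription run in the same synchronous step, so an enqueue cannot land between "queue looked empty" and "start waiting". A backend with an asynchronous claim, such as a Redis round trip, has to close that window another way. One option is to re-check for work after subscribing.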

Key semantics that must survive any replacement:

  • Per-session FIFO ordering.
  • At-most-one active consumer per session.
  • Durable queue across worker restarts.
  • Startup recovery from processing back to pending.
  • Low-latency wakeup when new tool observations arrive.
  • Deduplication by content_session_id + tool_use_id.
  • Original observation timestamp preservation for storage/broadcast.
  • Queue depth for /api/processing-status and SSE.
  • Local-first behavior and simple install are product requirements, not just implementation details.
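The hypothetical in-memory model below (not claude-mem code) makes several of these semantics concrete:

- per-session FIFO;
- a single consumer per session;
- dedupe on content_session_id + tool_use_id;
- original-timestamp preservation;
- startup recovery;
- queue depth.

It assumes dedupe is checked only against rows still in the queue. The real store may apply a different window.

```typescript
// Hypothetical model of the queue semantics listed above; not claude-mem code.
type Status = "pending" | "processing";

interface Row {
  id: number;
  sessionDbId: number;
  contentSessionId: string;
  toolUseId: string | null; // summarize requests carry no tool_use_id
  status: Status;
  originalTimestamp: number; // kept so storage/broadcast use the observation's own time
}

class QueueModel {
  private rows: Row[] = [];
  private nextId = 1;
  private consumers = new Set<number>(); // sessions that currently have a consumer

  // Returns the new row id, or null if the same (contentSessionId, toolUseId) is already queued.
  enqueue(sessionDbId: number, contentSessionId: string, toolUseId: string | null, originalTimestamp: number): number | null {
    const duplicate =
      toolUseId !== null &&
      this.rows.some((r) => r.contentSessionId === contentSessionId && r.toolUseId === toolUseId);
    if (duplicate) return null;
    const id = this.nextId++;
    this.rows.push({ id, sessionDbId, contentSessionId, toolUseId, status: "pending", originalTimestamp });
    return id;
  }

  // At most one active consumer per session.
  acquireConsumer(sessionDbId: number): boolean {
    if (this.consumers.has(sessionDbId)) return false;
    this.consumers.add(sessionDbId);
    return true;
  }

  releaseConsumer(sessionDbId: number): void {
    this.consumers.delete(sessionDbId);
  }

  // Per-session FIFO: claim the oldest pending row of this session only.
  claim(sessionDbId: number): Row | undefined {
    const next = this.rows.find((r) => r.sessionDbId === sessionDbId && r.status === "pending");
    if (next) next.status = "processing";
    return next;
  }

  // After the provider response is parsed and stored, drop the session's rows.
  clearSession(sessionDbId: number): number {
    const before = this.rows.length;
    this.rows = this.rows.filter((r) => r.sessionDbId !== sessionDbId);
    return before - this.rows.length;
  }

  // Startup recovery: rows claimed before a crash go back to pending, keeping their position.
  resetProcessingToPending(): number {
    let n = 0;
    for (const r of this.rows) {
      if (r.status === "processing") {
        r.status = "pending";
        n++;
      }
    }
    this.consumers.clear();
    return n;
  }

  // Queue depth for one session, or for all sessions when no id is given.
  depth(sessionDbId?: number): number {
    return this.rows.filter((r) => sessionDbId === undefined || r.sessionDbId === sessionDbId).length;
  }
}
```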

Important mismatch found during the dive:

  • Current PendingMessageStore only models pending and processing.
  • Older migrations, tests, and scripts still reference processed, failed, retry_count, completed_at_epoch, failed_at_epoch, and worker_pid.
  • storeObservationsAndMarkComplete() still updates a row to processed, while the currently visible queue path clears all pending messages for the session after parsing.
  • src/services/sqlite/schema.sql still creates idx_pending_messages_worker_pid even though the visible table definition has no worker_pid.

Focused test run:

bun test tests/services/sqlite/PendingMessageStore.test.ts tests/services/queue/SessionQueueProcessor.test.ts

Result: 10 pass, 6 fail. The failures point to stale tests and contract drift:

  • PendingMessageStore.test.ts passes 3 as a constructor argument, but the constructor now expects onMutate?: () => void.
  • SessionQueueProcessor.test.ts expects retry-after-store-error behavior, but the current implementation logs the error and exits the iterator on claim failure.

This needs to be reconciled before swapping engines; otherwise the migration will encode inconsistent behavior.

BullMQ deep dive

Sources checked:

Current package/repo facts captured on 2026-05-06:

  • NPM latest: bullmq@5.76.5.
  • NPM modified: 2026-05-02.
  • GitHub pushed: 2026-05-05.
  • GitHub stars/forks/open issues at capture time: 8808 / 606 / 414.
  • License: MIT.
  • Unpacked size: about 2.5 MB.
  • Dependencies: ioredis, cron-parser, msgpackr, node-abort-controller, semver, tslib.
  • TypeScript types are bundled.
  • A Bun import smoke test succeeded for import { Queue } from 'bullmq'.

Strengths for claude-mem:

  • Actively maintained and widely used.
  • Built-in TypeScript API.
  • Redis-backed durability and distributed workers.
  • Built-in stalled-job recovery, retry attempts, fixed/exponential backoff, delays, priorities, FIFO/LIFO, auto-removal, QueueEvents, manual processing APIs, and job ID based dedupe.
  • The BullMQ docs explicitly cover manual job fetching with Worker#getNextJob(), Job#moveToCompleted(), Job#moveToFailed(), and lock extension. This matters because claude-mem's provider loop is closer to a stream consumer than to a normal job processor.

Costs and risks:

  • Redis becomes required for the queue backend. The BullMQ docs require a Redis connection to use queues and recommend Redis 6.2 or newer (or a compatible server).
  • Redis must be configured like durable infrastructure, not like a cache. maxmemory-policy=noeviction is required for correct queue behavior, and AOF persistence is recommended so queued jobs survive a Redis restart.
  • Connection count increases. BullMQ docs note each class consumes at least one Redis connection; Worker and QueueEvents need blocking/duplicated connections in some cases.
  • Jobs store data in Redis in clear text unless claude-mem encrypts or avoids sensitive payload fields. Tool input/output can be sensitive.
  • BullMQ job completion/failure semantics do not map directly to claude-mem's current "provider consumes many messages, parses one response, then clears the session" behavior.
  • Per-session FIFO with parallel sessions is not free in OSS BullMQ. A single global queue with worker concurrency > 1 can violate same-session ordering unless we add a scheduler. BullMQ Pro groups would address this, but claude-mem should not depend on Pro.
  • A custom jobId is useful for tool_use_id dedupe, but BullMQ custom job IDs must not contain ':'. Use a hash or a safe delimiter.
  • Manual processing requires lock management. BullMQ docs call out that manually fetched jobs do not get automatic lock renewal like standard processors; claude-mem would need extendLock() for long provider calls or a large lock duration.
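These costs imply a manual-processing pattern: fetch one job, keep its lock alive during a long provider call, then complete or fail it. The sketch below shows that pattern. To stay runnable without Redis, it codes against a hypothetical `ManualJob` interface rather than BullMQ's `Job` class. In BullMQ the corresponding calls would be along the lines of `worker.getNextJob(token)`, `job.extendLock(token, duration)`, `job.moveToCompleted(result, token, false)`, and `job.moveToFailed(err, token)`. Check the exact signatures against the installed version.

```typescript
// Hypothetical minimal surface of a manually fetched job (stands in for BullMQ's Job).
interface ManualJob<T> {
  data: T;
  extendLock(durationMs: number): Promise<void>;
  complete(result: unknown): Promise<void>;
  fail(err: Error): Promise<void>;
}

type Outcome<R> = { ok: true; value: R } | { ok: false; error: Error };

// Runs `work` on the job while renewing its lock every `renewEveryMs`.
// renewEveryMs should sit well below lockDurationMs so one slow renewal cannot let the lock lapse.
async function processWithLockRenewal<T, R>(
  job: ManualJob<T>,
  work: (data: T) => Promise<R>,
  lockDurationMs: number,
  renewEveryMs: number,
): Promise<R | undefined> {
  const timer = setInterval(() => {
    job.extendLock(lockDurationMs).catch((err) => {
      // A lost lock means the job may be treated as stalled and handed to another consumer.
      console.error("lock renewal failed", err);
    });
  }, renewEveryMs);

  let outcome: Outcome<R>;
  try {
    outcome = { ok: true, value: await work(job.data) };
  } catch (err) {
    outcome = { ok: false, error: err instanceof Error ? err : new Error(String(err)) };
  } finally {
    clearInterval(timer); // stop renewing before reporting the result
  }

  if (outcome.ok) {
    await job.complete(outcome.value);
    return outcome.value;
  }
  await job.fail(outcome.error);
  return undefined;
}
```

Renewing on a short timer keeps the lock alive without resorting to one very large lock duration. A very large lock duration would also delay stalled-job recovery after a real crash.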

Best BullMQ shape if adopted:

  • Prefer one queue per active session over one global queue initially:
    • Queue name: claude-mem:session:<safe-session-db-id> or a hashed content-session suffix.
    • Worker/manual consumer concurrency: 1.
    • Preserves per-session FIFO without BullMQ Pro groups.
    • Active session counts are naturally low for local claude-mem usage.
    • Cleanup queue keys when a session is deleted or after idle timeout.
  • Use jobId for observation dedupe:
    • obs_<sha256(contentSessionId + "\0" + toolUseId)>.
    • Summaries should use a distinct id scheme and usually should not dedupe unless the current summarize semantics require it.
  • Use removeOnComplete aggressively if SQLite remains the source of truth for stored observations.
  • Keep only bounded failed jobs for debugging.
  • Treat Redis as queue state only; SQLite remains the canonical observation/session store.
  • Add config:
    • CLAUDE_MEM_QUEUE_ENGINE=sqlite|bullmq
    • CLAUDE_MEM_REDIS_URL
    • CLAUDE_MEM_QUEUE_REDIS_PREFIX
    • CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS=true|false if sensitive fields are stored.
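Below is a sketch of the naming, dedupe, and configuration pieces above. `safeQueueName`, `observationJobId`, `observationJobOptions`, and `readQueueConfig` are hypothetical helpers. The options object uses BullMQ's `jobId`, `removeOnComplete`, and `removeOnFail` job options, but the retention count is a placeholder. The queue name is kept colon-free as a precaution, because BullMQ already uses ':' to join the prefix, queue name, and key suffix in Redis.

```typescript
import { createHash } from "node:crypto";

type QueueEngine = "sqlite" | "bullmq";

interface QueueConfig {
  engine: QueueEngine;
  redisUrl?: string;
  redisPrefix?: string;
  encryptPayloads: boolean;
}

function isQueueEngine(value: string): value is QueueEngine {
  return value === "sqlite" || value === "bullmq";
}

// Colon-free per-session queue name (a precaution: BullMQ joins key parts with ':').
function safeQueueName(sessionDbId: number): string {
  if (!Number.isSafeInteger(sessionDbId) || sessionDbId < 0) {
    throw new Error(`invalid sessionDbId: ${sessionDbId}`);
  }
  return `claude-mem-session-${sessionDbId}`;
}

// obs_<sha256(contentSessionId + "\0" + toolUseId)>: stable across retries and colon-free.
function observationJobId(contentSessionId: string, toolUseId: string): string {
  const digest = createHash("sha256").update(`${contentSessionId}\0${toolUseId}`).digest("hex");
  return `obs_${digest}`;
}

// Options for adding an observation job; the removeOnFail count is a placeholder.
function observationJobOptions(contentSessionId: string, toolUseId: string) {
  return {
    jobId: observationJobId(contentSessionId, toolUseId),
    removeOnComplete: true, // SQLite remains the source of truth for stored observations
    removeOnFail: 100, // keep only a bounded number of failed jobs for debugging
  };
}

// Reads the proposed CLAUDE_MEM_* settings; sqlite is the default and needs no Redis.
function readQueueConfig(env: Record<string, string | undefined>): QueueConfig {
  const engine = (env.CLAUDE_MEM_QUEUE_ENGINE ?? "sqlite").trim().toLowerCase();
  if (!isQueueEngine(engine)) {
    throw new Error(`CLAUDE_MEM_QUEUE_ENGINE must be sqlite or bullmq, got "${engine}"`);
  }
  const redisUrl = env.CLAUDE_MEM_REDIS_URL;
  if (engine === "bullmq" && !redisUrl) {
    // Fail loudly rather than silently falling back to sqlite.
    throw new Error("CLAUDE_MEM_QUEUE_ENGINE=bullmq requires CLAUDE_MEM_REDIS_URL");
  }
  return {
    engine,
    redisUrl,
    redisPrefix: env.CLAUDE_MEM_QUEUE_REDIS_PREFIX,
    encryptPayloads: env.CLAUDE_MEM_QUEUE_ENCRYPT_PAYLOADS === "true",
  };
}
```

The "\0" separator keeps ("ab", "c") and ("a", "bc") from hashing to the same ID.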

Bee-Queue deep dive

Sources checked:

Current package/repo facts captured on 2026-05-06:

  • NPM latest: bee-queue@2.0.0.
  • NPM modified: 2025-12-08.
  • GitHub pushed: 2026-04-10.
  • GitHub stars/forks/open issues at capture time: 4027 / 221 / 47.
  • License field from NPM: MIT. GitHub API license metadata returned NOASSERTION.
  • Unpacked size: about 107 KB.
  • Dependencies: redis@^3.1.2, p-finally, promise-callbacks.
  • NPM package exposes ./index.d.ts.
  • A Bun import smoke test succeeded for import BeeQueue from 'bee-queue'.

Strengths for claude-mem:

  • Very small and simple.
  • Designed for short, real-time jobs.
  • Redis-backed with Lua/pipelining and low overhead.
  • Supports concurrency, retries, retry strategies, timeouts, scheduled jobs, pub/sub events, results to producers, and stalled job retry.
  • Redis requirement is lighter in docs: Redis 2.8+, with Redis 3.2+ recommended for delayed jobs.

Costs and risks:

  • Narrower feature set by design. The README says priorities and repeatable jobs are not currently supported.
  • CommonJS-first API; workable, but less idiomatic for this ESM TypeScript codebase.
  • Uses the older redis v3 client line, not modern redis v4/v5 or ioredis.
  • Observability and operational tooling are thinner than BullMQ.
  • The same per-session ordering mismatch exists as with BullMQ, but with fewer escape hatches.
  • Delayed retry behavior requires activateDelayedJobs on at least one queue instance.
  • The package has recently been revived, but it is not as active or mature as BullMQ as a queue-engine foundation.

Conclusion: Bee-Queue is attractive if the only goal is "small Redis queue for short jobs." claude-mem needs a durable session stream with strict per-session semantics, good TypeScript ergonomics, explicit recovery behavior, and long-term maintenance. Bee-Queue is the wrong tradeoff.

Scorecard

| Area | Current SQLite | BullMQ | Bee-Queue |
|---|---|---|---|
| Local-first install | Strong | Weak unless Redis is bundled/optional | Weak unless Redis is bundled/optional |
| Per-session FIFO | Strong | Medium with per-session queues; weak with one global queue | Medium with per-session queues; weak with one global queue |
| Restart durability | Strong, SQLite-backed | Strong if Redis persistence configured | Strong if Redis persistence configured |
| Stalled recovery | Custom/simple | Strong built-in | Built-in |
| TypeScript fit | Strong | Strong | Medium |
| Maintenance/activity | Internal | Strong | Medium |
| Operational complexity | Low | High | Medium-high |
| Queue observability | Custom/basic | Strong | Medium |
| Dependency footprint | Low | Larger | Small |
| Privacy/data locality | SQLite local file | Redis clear-text unless handled | Redis clear-text unless handled |
| Best use in claude-mem | Default | Optional advanced backend | Do not use |

Migration plan

Phase 0: Fix the existing contract

  • Decide whether pending_messages.status is only pending|processing, or whether processed|failed is coming back.
  • Fix schema.sql and migrations so worker_pid indexes are not created after worker_pid is dropped.
  • Fix storeObservationsAndMarkComplete() or remove it if no longer used.
  • Update queue tests to match real behavior:
    • constructor signature;
    • claim error behavior;
    • reset-on-start behavior;
    • dedupe by tool_use_id;
    • clear-session behavior.
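Whichever status decision is made, it helps to encode it once so tests, migrations, and scripts all check the same contract. Below is a minimal sketch, assuming the decision is to keep only pending|processing. `QUEUE_STATUSES`, `LEGACY_COLUMNS`, and `findContractDrift` are hypothetical names. The legacy columns come from the mismatch list earlier in this plan.

```typescript
// Assumed decision: pending_messages.status is only pending|processing.
const QUEUE_STATUSES = ["pending", "processing"] as const;
type QueueStatus = (typeof QUEUE_STATUSES)[number];

// Columns that older migrations, tests, and scripts still reference.
const LEGACY_COLUMNS = ["retry_count", "completed_at_epoch", "failed_at_epoch", "worker_pid"] as const;

function isQueueStatus(value: string): value is QueueStatus {
  return (QUEUE_STATUSES as readonly string[]).includes(value);
}

interface DriftReport {
  unexpectedStatuses: string[];
  legacyColumns: string[];
}

// Compares observed status values and table columns against the chosen contract.
function findContractDrift(statuses: Iterable<string>, columns: Iterable<string>): DriftReport {
  const unexpected = new Set<string>();
  for (const s of statuses) {
    if (!isQueueStatus(s)) unexpected.add(s);
  }
  const cols = new Set(columns);
  return {
    unexpectedStatuses: [...unexpected].sort(),
    legacyColumns: LEGACY_COLUMNS.filter((c) => cols.has(c)),
  };
}
```

In a real check, the inputs could come from `SELECT DISTINCT status FROM pending_messages` and the `name` column of `PRAGMA table_info(pending_messages)`.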

Phase 1: Add an adapter boundary

Define a small interface around current behavior, not around BullMQ:

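// PendingMessage, PendingMessageWithId, and EnqueueResult come from the existing queue code or are defined alongside this interface.
interface ObservationQueueEngine {
  // Durably persist one message for a session.
  enqueue(sessionDbId: number, contentSessionId: string, message: PendingMessage): Promise<EnqueueResult>;
  // Yield claimed messages in per-session FIFO order until aborted or idle.
  createIterator(sessionDbId: number, signal: AbortSignal, onIdleTimeout?: () => void): AsyncIterableIterator<PendingMessageWithId>;
  // Remove a session's queued messages once its agent response is stored.
  clearPendingForSession(sessionDbId: number): Promise<number>;
  // Startup recovery: return claimed but unfinished messages to pending.
  resetProcessingToPending(sessionDbId: number): Promise<number>;
  // Depth for /api/processing-status and SSE.
  getPendingCount(sessionDbId: number): Promise<number>;
  getTotalQueueDepth(): Promise<number>;
  // Release connections, timers, and listeners.
  close(): Promise<void>;
}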

Make SqliteObservationQueueEngine the first implementation by moving the current PendingMessageStore + SessionQueueProcessor behavior behind this interface.

Phase 2: Add BullMQ backend behind feature flag

  • Add BullMqObservationQueueEngine.
  • Use per-session queues with a concurrency of 1 (or one manually fetched job at a time).
  • Use safe hashed jobId for observation dedupe.
  • Preserve _originalTimestamp in job data.
  • Keep provider loops unchanged by preserving the async iterator interface.
  • Implement lock extension if manual processing can exceed the configured lock duration.
  • Keep SQLite as the observation/session truth; Redis is transport.
  • Add Redis connectivity health to /api/health only when BullMQ backend is enabled.
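BullMQ stores job data as JSON, so a `Date` placed in job data comes back as an ISO string, not as a `Date`. That affects how _originalTimestamp is preserved. One option is to carry the timestamp as epoch milliseconds and rebuild the `Date` on the consumer side, failing loudly if it is missing. The sketch below shows this with hypothetical message shapes (`ObservationMessage`, `ObservationJobData`, and the `toolName` field are illustrative).

```typescript
// Hypothetical producer-side message; the real PendingMessage type differs.
interface ObservationMessage {
  contentSessionId: string;
  toolUseId: string;
  toolName: string;
  observedAt: Date;
}

// What would be stored as job data: JSON-safe, timestamp as epoch milliseconds.
interface ObservationJobData {
  contentSessionId: string;
  toolUseId: string;
  toolName: string;
  _originalTimestamp: number;
}

function toJobData(msg: ObservationMessage): ObservationJobData {
  return {
    contentSessionId: msg.contentSessionId,
    toolUseId: msg.toolUseId,
    toolName: msg.toolName,
    _originalTimestamp: msg.observedAt.getTime(),
  };
}

function fromJobData(data: ObservationJobData): ObservationMessage {
  // Refuse to fall back to "now": that would silently reorder stored observations.
  if (!Number.isFinite(data._originalTimestamp)) {
    throw new Error("job data is missing _originalTimestamp");
  }
  return {
    contentSessionId: data.contentSessionId,
    toolUseId: data.toolUseId,
    toolName: data.toolName,
    observedAt: new Date(data._originalTimestamp),
  };
}
```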

Phase 3: Migration and fallback

  • On startup with BullMQ enabled, migrate existing SQLite pending_messages rows into BullMQ once, then mark/delete migrated rows.
  • If Redis is unavailable at startup, fail loudly for CLAUDE_MEM_QUEUE_ENGINE=bullmq; do not silently drop observations.
  • For default sqlite, do not require Redis.
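The one-time migration can be made safe to re-run. Give each migrated row a deterministic jobId, and delete SQLite rows only after every add succeeds. If the process dies in between, the next startup simply re-adds the same IDs. The sketch uses hypothetical `PendingRowSource` and `JobSink` interfaces and a hypothetical `pending_<rowId>` ID scheme. It relies on the sink ignoring an add whose jobId is already present, which is how BullMQ treats a duplicate custom jobId while the earlier job still exists.

```typescript
// Hypothetical row shape read from SQLite pending_messages.
interface PendingRow {
  id: number;
  sessionDbId: number;
  payload: string;
  originalTimestamp: number;
}

// Hypothetical source over the SQLite table.
interface PendingRowSource {
  listPending(): Promise<PendingRow[]>; // assumed ordered by id, i.e. enqueue order
  deleteRows(ids: number[]): Promise<void>;
}

// Hypothetical sink over BullMQ; add() must ignore a jobId that is already queued.
interface JobSink {
  ping(): Promise<boolean>;
  add(queueName: string, jobId: string, data: PendingRow): Promise<void>;
}

// Deterministic per-row jobId so a re-run after a crash re-adds the same IDs.
function migrationJobId(row: PendingRow): string {
  return `pending_${row.id}`;
}

async function migratePendingRows(
  source: PendingRowSource,
  sink: JobSink,
  queueNameFor: (sessionDbId: number) => string,
): Promise<number> {
  // Fail loudly: never drop observations because Redis is down.
  if (!(await sink.ping())) {
    throw new Error("Redis unavailable at startup; refusing to run with CLAUDE_MEM_QUEUE_ENGINE=bullmq");
  }
  const rows = await source.listPending();
  for (const row of rows) {
    await sink.add(queueNameFor(row.sessionDbId), migrationJobId(row), row);
  }
  // Delete only after every row is in the sink; a crash before this line just repeats the adds.
  await source.deleteRows(rows.map((r) => r.id));
  return rows.length;
}
```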

Phase 4: Tests

  • Unit-test the adapter contract with a shared test suite.
  • Run the suite against SQLite always.
  • Run BullMQ tests only when Redis is available, or spin Redis in CI.
  • Add crash/restart tests:
    • enqueue, kill worker, restart, process;
    • claimed job stalls and returns;
    • duplicate tool_use_id is suppressed;
    • per-session FIFO across concurrent sessions;
    • idle timeout still aborts provider subprocesses.
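The shared suite can be written once against the engine contract and then run against each backend. The sketch below narrows the contract to three methods and uses plain assertions instead of a test framework, so it stays self-contained. `QueueContract`, `runQueueContract`, and the array-backed engine are hypothetical. Under bun:test, each block would typically become its own `test()` inside a per-backend `describe()`. The Redis-backed run would be skipped when no Redis URL is configured.

```typescript
// Hypothetical message and contract subset used by the shared suite.
interface QueuedObservation {
  sessionDbId: number;
  contentSessionId: string;
  toolUseId: string;
  seq: number;
}

interface QueueContract {
  enqueue(msg: QueuedObservation): Promise<boolean>; // false when deduplicated
  claim(sessionDbId: number): Promise<QueuedObservation | undefined>;
  pendingCount(sessionDbId: number): Promise<number>;
}

// Runs the same checks against any backend; throws on the first violation.
async function runQueueContract(name: string, makeEngine: () => Promise<QueueContract>): Promise<void> {
  const check = (cond: boolean, what: string): void => {
    if (!cond) throw new Error(`[${name}] ${what}`);
  };

  // Duplicate tool_use_id is suppressed.
  {
    const q = await makeEngine();
    const m = { sessionDbId: 1, contentSessionId: "cs", toolUseId: "t", seq: 0 };
    check(await q.enqueue(m), "first enqueue accepted");
    check(!(await q.enqueue({ ...m, seq: 1 })), "duplicate suppressed");
    check((await q.pendingCount(1)) === 1, "one pending after dedupe");
  }

  // Per-session FIFO holds when two sessions interleave.
  {
    const q = await makeEngine();
    for (let seq = 0; seq < 5; seq++) {
      for (const s of [1, 2]) {
        await q.enqueue({ sessionDbId: s, contentSessionId: `cs-${s}`, toolUseId: `t${seq}`, seq });
      }
    }
    for (const s of [2, 1]) {
      const order: number[] = [];
      for (let m = await q.claim(s); m; m = await q.claim(s)) order.push(m.seq);
      check(order.join(",") === "0,1,2,3,4", `FIFO for session ${s}`);
    }
  }
}

// Array-backed engine standing in for a real backend.
async function makeArrayEngine(): Promise<QueueContract> {
  const rows: QueuedObservation[] = [];
  return {
    async enqueue(msg) {
      if (rows.some((r) => r.contentSessionId === msg.contentSessionId && r.toolUseId === msg.toolUseId)) return false;
      rows.push(msg);
      return true;
    },
    async claim(sessionDbId) {
      const i = rows.findIndex((r) => r.sessionDbId === sessionDbId);
      return i === -1 ? undefined : rows.splice(i, 1)[0];
    },
    async pendingCount(sessionDbId) {
      return rows.filter((r) => r.sessionDbId === sessionDbId).length;
    },
  };
}
```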

Final recommendation

Do not do a direct swap from SQLite to either library.

If the product goal is to keep claude-mem easy to install and local-first, invest in the current SQLite queue: clean up the schema/status drift, restore tests, add explicit retries/failure rows if needed, and keep the in-process wakeup path.

If the product goal is to support distributed workers or stronger queue observability, add BullMQ as an optional backend through an adapter. It has the right maintenance profile, TypeScript support, recovery primitives, and docs. Bee-Queue is too narrow and too legacy-client-oriented for this role.