claude-mem/tests/services/queue/bullmq-observation-queue-engine.test.ts
Alex Newman 36b0929fae Server-beta: Postgres storage + independent runtime + BullMQ queue (Phases 1–3) (#2351)
* Add server beta runtime foundation

* Address server beta review findings

* Resolve server beta review comments

* Tighten server beta review follow-ups

* Harden server beta auth and search

* Avoid unnecessary FTS rebuilds

* Block scoped keys from creating projects

* Release BullMQ claims best effort on close

* Address server beta review blockers

* Reset BullMQ claims best effort

* Add Postgres observation storage foundation

* feat(server-beta): add independent runtime service

Introduce src/server/runtime/ as a self-contained server-beta runtime
that owns its lifecycle, Postgres bootstrap, and HTTP boundary without
depending on WorkerService.

ServerBetaService wraps the existing Server class, exposes
/healthz and /v1/info with runtime="server-beta", and persists state
to dedicated paths (.server-beta.pid|.port|.runtime.json). The four
boundary managers (queue, generation worker, provider registry, event
broadcaster) are intentionally disabled in this phase and report their
status through /v1/info; later phases activate them.

Adds plans/2026-05-07-finish-bullmq-branch-ship-plan.md to track the
remaining work for this branch.

Phase 2 of plans/2026-05-07-server-beta-independent-bullmq-observation-runtime.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(server-beta): route CLI lifecycle and bundle separate runtime

scripts/build-hooks.js now produces plugin/scripts/server-beta-service.cjs
as a separate Node CJS bundle, alongside the existing worker-service
bundle. The server-beta runtime is now installable independently.

src/npx-cli/commands/server.ts routes start|stop|restart|status to the
server-beta lifecycle instead of the legacy worker. The worker keeps its
own start|stop|restart|status under the worker namespace; the two
runtimes can be operated independently.

src/services/worker-service.ts adds a server-* command parser branch
that delegates to the sibling server-beta-service.cjs bundle so
direct worker-service invocations still route to the right runtime.
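Here is a minimal sketch of the server-* delegation idea. It assumes commands are spelled like `server-start` and that the sibling bundle sits in the same directory as the running worker-service bundle. The function name `resolveServerDelegation` and the exact command spelling are hypothetical, because the real parser branch in src/services/worker-service.ts is not shown here.

```typescript
import { dirname, join } from 'node:path';

// Hypothetical: lifecycle verbs mirror the start|stop|restart|status set named above.
const SERVER_ACTIONS = new Set(['start', 'stop', 'restart', 'status']);

interface ServerDelegation {
  bundlePath: string; // sibling server-beta bundle to launch
  args: string[]; // arguments forwarded to that bundle
}

// Returns null for non-server commands so the worker keeps handling them itself.
function resolveServerDelegation(argv: string[], currentBundlePath: string): ServerDelegation | null {
  const [command, ...rest] = argv;
  if (command === undefined || !command.startsWith('server-')) {
    return null;
  }
  const action = command.slice('server-'.length);
  if (!SERVER_ACTIONS.has(action)) {
    throw new Error(`unknown server command: ${command}`);
  }
  return {
    // Resolve the sibling bundle next to the currently running one.
    bundlePath: join(dirname(currentBundlePath), 'server-beta-service.cjs'),
    args: [action, ...rest],
  };
}
```

On POSIX, `resolveServerDelegation(['server-status'], '/opt/plugin/scripts/worker-service.cjs')` points at `/opt/plugin/scripts/server-beta-service.cjs`. Actually launching the child process, for example with `child_process.spawn(process.execPath, [bundlePath, ...args])`, is left out of the sketch.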

tests/npx-cli-server-namespace.test.ts updated to expect server-beta
lifecycle routing.

Includes rebuilt plugin/scripts/*.cjs bundles produced by
build-and-sync.

Phase 2 of plans/2026-05-07-server-beta-independent-bullmq-observation-runtime.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(server-beta): add BullMQ job queue primitives

Introduce src/server/jobs/ as the queue-side primitives that Phase 3 of
the server-beta runtime needs to operate.

types.ts defines a discriminated union over the four job kinds (event,
event-batch, summary, reindex) and maps each to a per-kind BullMQ queue
name and deterministic-ID prefix.
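One way to express that mapping in TypeScript is sketched below. Because types.ts itself is not shown, the payload fields, queue names, and prefixes are hypothetical placeholders. Keying a `Record` by the union's `kind` makes the compiler reject a table that is missing a kind.

```typescript
// Hypothetical payloads: one variant per job kind, discriminated by `kind`.
type GenerationJob =
  | { kind: 'event'; eventId: string }
  | { kind: 'event-batch'; eventIds: string[] }
  | { kind: 'summary'; sessionId: string }
  | { kind: 'reindex'; projectId: string };

type JobKind = GenerationJob['kind'];

interface KindMetadata {
  queueName: string; // per-kind BullMQ queue (no ':' characters)
  idPrefix: string; // prefix for deterministic job ids
}

// Record<JobKind, ...> fails to compile if a kind is added without an entry here.
const KIND_METADATA: Record<JobKind, KindMetadata> = {
  event: { queueName: 'server_beta_event', idPrefix: 'evt' },
  'event-batch': { queueName: 'server_beta_event_batch', idPrefix: 'evb' },
  summary: { queueName: 'server_beta_summary', idPrefix: 'sum' },
  reindex: { queueName: 'server_beta_reindex', idPrefix: 'rix' },
};

function queueNameFor(job: GenerationJob): string {
  return KIND_METADATA[job.kind].queueName;
}

// The switch narrows each branch to its variant, so payload fields are type-checked.
function describeJob(job: GenerationJob): string {
  switch (job.kind) {
    case 'event':
      return `event ${job.eventId}`;
    case 'event-batch':
      return `batch of ${job.eventIds.length}`;
    case 'summary':
      return `summary for ${job.sessionId}`;
    case 'reindex':
      return `reindex ${job.projectId}`;
  }
}
```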

job-id.ts builds deterministic, colon-free BullMQ jobIds from
(kind, team, project, source). The colon ban exists because BullMQ uses
':' as a Redis key separator internally; embedding ':' in jobIds
breaks scan and state lookups.
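Below is a minimal sketch of a deterministic, colon-free jobId builder. It assumes a SHA-256 digest over the four inputs, and the function name `buildJobId` and the prefixes are hypothetical. Hashing removes any ':' that the team, project, or source strings may contain. The test file below checks the same property for `getSafeJobId`, which produces `obs_` and `sum_` prefixes with no ':'.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical kind names and prefixes; the real table lives in types.ts.
type JobKind = 'event' | 'event-batch' | 'summary' | 'reindex';

const ID_PREFIX: Record<JobKind, string> = {
  event: 'evt',
  'event-batch': 'evb',
  summary: 'sum',
  reindex: 'rix',
};

// Deterministic: the same (kind, team, project, source) always yields the same id.
// Colon-free: the output is only a prefix, '_', and lowercase hex.
function buildJobId(kind: JobKind, team: string, project: string, source: string): string {
  // Length-prefix each field so a '|' inside a field cannot make two tuples serialize identically.
  const material = [kind, team, project, source].map(part => `${part.length}#${part}`).join('|');
  const digest = createHash('sha256').update(material).digest('hex').slice(0, 32);
  return `${ID_PREFIX[kind]}_${digest}`;
}
```

Because the id is stable, enqueueing the same logical job twice targets the same BullMQ slot. That slot is why a terminal job holding the id has to be replaced before the id can be reused.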

ServerJobQueue.ts is a thin wrapper over BullMQ Queue + Worker that
enforces autorun:false, a default concurrency of 1, and an attached error
listener, all as the BullMQ docs require. Test seams accept queue and
worker factories so unit tests do not need Redis.
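The sketch below puts those guarantees behind a factory seam. `WorkerLike`, `SketchJobQueue`, and `FakeWorker` are hypothetical minimal shapes, not BullMQ's typings. In the real wrapper the factory would construct a BullMQ `Worker`, whose options do include `autorun` and `concurrency`, which exposes `run()`, and which emits an `error` event.

```typescript
// Hypothetical minimal shape standing in for BullMQ's Worker.
interface WorkerLike {
  on(event: 'error', listener: (err: Error) => void): unknown;
  run(): Promise<void>;
  close(): Promise<void>;
}

interface WorkerOptionsLike {
  autorun: boolean;
  concurrency: number;
}

type WorkerFactory = (queueName: string, opts: WorkerOptionsLike) => WorkerLike;

class SketchJobQueue {
  private worker: WorkerLike | null = null;
  private running: Promise<void> | null = null;
  readonly errors: Error[] = [];

  constructor(
    private readonly queueName: string,
    private readonly workerFactory: WorkerFactory,
    private readonly concurrency = 1,
  ) {}

  start(): void {
    // Refuse a second start instead of creating a duplicate worker.
    if (this.worker) {
      throw new Error(`queue ${this.queueName} is already started`);
    }
    // autorun: false means nothing is consumed until run() is called explicitly below.
    const worker = this.workerFactory(this.queueName, { autorun: false, concurrency: this.concurrency });
    // Attach the error listener before running so worker errors are never unhandled.
    worker.on('error', err => {
      this.errors.push(err);
    });
    this.worker = worker;
    this.running = worker.run().catch(err => {
      this.errors.push(err instanceof Error ? err : new Error(String(err)));
    });
  }

  async close(): Promise<void> {
    if (!this.worker) return;
    await this.worker.close();
    await this.running;
    this.worker = null;
    this.running = null;
  }
}

// A fake injected through the factory seam, so no Redis is needed.
class FakeWorker implements WorkerLike {
  readonly listeners: Array<(err: Error) => void> = [];
  ran = false;
  closed = false;

  on(_event: 'error', listener: (err: Error) => void): this {
    this.listeners.push(listener);
    return this;
  }

  async run(): Promise<void> {
    this.ran = true;
  }

  async close(): Promise<void> {
    this.closed = true;
  }
}
```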

outbox.ts publishes through the Postgres ObservationGenerationJob
repository as canonical history. enqueueOutbox writes the row first,
then publishes to BullMQ; if BullMQ throws, the row is transitioned to
failed and a failed event is appended. reconcileOnStartup re-enqueues
queued + processing rows after a restart, replacing terminal BullMQ
jobs that may still be holding the deterministic ID slot. markCompleted
and markFailed wrap transitionStatus and append the matching event row.
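The write-then-publish ordering can be sketched with in-memory stand-ins for the Postgres repository and BullMQ. Everything in this sketch is hypothetical: the class and function names, the event shape, and the choice to rethrow after recording the failure. It only illustrates the ordering described above.

```typescript
type JobStatus = 'queued' | 'processing' | 'completed' | 'failed';

interface JobRow {
  id: string;
  status: JobStatus;
}

interface JobEvent {
  jobId: string;
  type: JobStatus;
  detail?: string;
}

// In-memory stand-in for the Postgres ObservationGenerationJob repository.
class InMemoryJobRepository {
  readonly rows = new Map<string, JobRow>();
  readonly events: JobEvent[] = [];

  insertQueued(id: string): JobRow {
    const row: JobRow = { id, status: 'queued' };
    this.rows.set(id, row);
    return row;
  }

  transitionStatus(id: string, status: JobStatus): void {
    const row = this.rows.get(id);
    if (!row) throw new Error(`unknown job ${id}`);
    row.status = status;
  }

  appendEvent(event: JobEvent): void {
    this.events.push(event);
  }
}

// Publishes one jobId to the queue (BullMQ in the real code).
type Publish = (jobId: string) => Promise<void>;

async function enqueueOutboxSketch(repo: InMemoryJobRepository, publish: Publish, jobId: string): Promise<JobRow> {
  // 1. Write the canonical row first, so history exists even if publishing fails.
  const row = repo.insertQueued(jobId);
  try {
    // 2. Only then hand the job to the queue.
    await publish(jobId);
  } catch (err) {
    // 3. On publish failure, mark the row failed, record why, then surface the error.
    repo.transitionStatus(jobId, 'failed');
    repo.appendEvent({ jobId, type: 'failed', detail: err instanceof Error ? err.message : String(err) });
    throw err;
  }
  return row;
}

// Mirrors the markCompleted idea: a status transition plus a matching event row.
function markCompletedSketch(repo: InMemoryJobRepository, jobId: string): void {
  repo.transitionStatus(jobId, 'completed');
  repo.appendEvent({ jobId, type: 'completed' });
}
```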

Includes 20 unit tests covering deterministic ID stability, colon-free
output, queue lifecycle, error-listener attachment, double-start
refusal, idempotent enqueue, BullMQ failure rollback, startup
reconciliation, max-attempts skipping, and completion / failure /
retry transitions.

Phase 3 commit 1 of plans/2026-05-07-server-beta-independent-bullmq-observation-runtime.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* feat(server-beta): activate queue boundary in runtime service

Wire ActiveServerBetaQueueManager into the server-beta runtime graph.
The active manager owns one ServerJobQueue per generation kind (event,
event-batch, summary, reindex) and surfaces lane metadata through
boundary health.

Selection is opt-in and fail-fast: if CLAUDE_MEM_QUEUE_ENGINE is set to
bullmq, the active manager is constructed, and any Redis/config error
throws; there is no silent fallback to SQLite, per the Phase 3
anti-pattern guard. For any other engine the disabled boundary remains,
so worker-era and test setups stay compatible.
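The following sketches the opt-in, fail-fast selection rule. `selectQueueEngine` and `QueueSelection` are hypothetical names. Reading CLAUDE_MEM_REDIS_URL with a `redis://127.0.0.1:6379` default follows the integration test in this file, but the runtime's actual config loading is not shown here.

```typescript
interface RedisTarget {
  host: string;
  port: number;
}

type QueueSelection = { mode: 'active'; redis: RedisTarget } | { mode: 'disabled' };

function parseRedisUrl(raw: string): RedisTarget {
  const url = new URL(raw); // throws a TypeError on malformed input
  if (url.protocol !== 'redis:' && url.protocol !== 'rediss:') {
    throw new Error(`unsupported Redis URL protocol: ${url.protocol}`);
  }
  return {
    host: url.hostname || '127.0.0.1',
    port: url.port ? Number.parseInt(url.port, 10) : 6379,
  };
}

function selectQueueEngine(env: Record<string, string | undefined>): QueueSelection {
  if (env.CLAUDE_MEM_QUEUE_ENGINE !== 'bullmq') {
    // Any other engine keeps the disabled boundary.
    return { mode: 'disabled' };
  }
  // Opt-in path: config errors propagate to the caller. There is no catch and no SQLite fallback.
  const redis = parseRedisUrl(env.CLAUDE_MEM_REDIS_URL ?? 'redis://127.0.0.1:6379');
  return { mode: 'active', redis };
}
```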

Widens ServerBetaBoundaryHealth.status to a discriminated union
('disabled' | 'active' | 'errored') with optional details. The disabled
adapter still emits status='disabled', which keeps the existing
server-beta-service test green.
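This sketch shows the widened status type and a health report that flips to 'errored' after close, which is the post-close behavior the new active-queue-manager tests are described as covering. The `details` fields, the lane list, and `SketchActiveQueueBoundary` are hypothetical.

```typescript
type BoundaryStatus = 'disabled' | 'active' | 'errored';

interface BoundaryHealth {
  status: BoundaryStatus;
  details?: Record<string, unknown>;
}

// Hypothetical lane list: one per generation kind named in this commit.
const QUEUE_LANES = ['event', 'event-batch', 'summary', 'reindex'] as const;

// The disabled adapter keeps reporting the original status value.
const disabledQueueBoundary = {
  health: (): BoundaryHealth => ({ status: 'disabled' }),
};

class SketchActiveQueueBoundary {
  private closed = false;

  health(): BoundaryHealth {
    if (this.closed) {
      // After close, report errored rather than claiming to be active.
      return { status: 'errored', details: { reason: 'queue manager closed' } };
    }
    return { status: 'active', details: { lanes: [...QUEUE_LANES] } };
  }

  close(): void {
    this.closed = true;
  }
}
```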

ServerBetaService receives the manager through a new optional
queueManager field on CreateServerBetaServiceOptions so test graphs
and Phase 4 wiring can inject custom managers.

Adds tests/server/runtime/active-queue-manager.test.ts covering bullmq
guard, active health shape, per-kind queue access, close behavior, and
post-close errored health.

Phase 3 commit 2 of plans/2026-05-07-server-beta-independent-bullmq-observation-runtime.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(server-beta): cap /v1/events/batch at 500 events

Closes the unbounded-array DoS surface flagged in PR review.
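Here is a minimal sketch of such a cap as request-body validation. The `{ events: [...] }` body shape, the `validateEventBatch` name, and the choice of 413 for oversized batches are assumptions. The commit only states the 500-event limit.

```typescript
const MAX_BATCH_EVENTS = 500;

type BatchValidation =
  | { ok: true; events: unknown[] }
  | { ok: false; status: 400 | 413; error: string };

function validateEventBatch(body: unknown): BatchValidation {
  // Reject anything that is not an object carrying an `events` array.
  if (typeof body !== 'object' || body === null || !Array.isArray((body as { events?: unknown }).events)) {
    return { ok: false, status: 400, error: 'events must be an array' };
  }
  const events = (body as { events: unknown[] }).events;
  // Check the length before any per-event work, so oversized payloads cost O(1) to reject.
  if (events.length > MAX_BATCH_EVENTS) {
    return { ok: false, status: 413, error: `batch exceeds ${MAX_BATCH_EVENTS} events` };
  }
  return { ok: true, events };
}
```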

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-08 01:20:07 -07:00

613 lines
18 KiB
TypeScript

import { afterEach, describe, expect, test } from 'bun:test';
import { Redis } from 'ioredis';
import {
  BullMqObservationQueueEngine,
  getSafeJobId,
  type BullMqObservationQueueEngineOptions,
} from '../../../src/server/queue/BullMqObservationQueueEngine.js';
import type { PendingMessage } from '../../../src/services/worker-types.js';

// In-memory stand-in for a BullMQ Job: tracks its state and can be told to fail moveToWait.
class FakeJob {
  state: string = 'waiting';
  failMoveToWait = false;

  constructor(
    readonly id: string,
    readonly name: string,
    readonly data: any,
  ) {}

  async getState(): Promise<string> {
    return this.state;
  }

  async moveToCompleted(): Promise<void> {
    this.state = 'completed';
  }

  async remove(): Promise<void> {
    this.state = 'removed';
  }

  async moveToWait(): Promise<number> {
    if (this.failMoveToWait) {
      throw new Error('moveToWait failed');
    }
    this.state = 'waiting';
    return 0;
  }

  async extendLock(): Promise<number> {
    return 1;
  }
}

// In-memory stand-in for a BullMQ Queue. add() returns the existing job if a non-removed job
// already holds the requested jobId, so re-adding a deterministic id is a no-op, as in BullMQ.
// claimNext() moves the first waiting job to active; createEngine wires it in as getNextJob().
class FakeQueue {
  readonly jobs: FakeJob[] = [];
  failObliterate = false;
  closed = false;

  async add(name: string, data: any, opts: { jobId?: string } = {}): Promise<FakeJob> {
    const id = opts.jobId ?? String(this.jobs.length + 1);
    const existing = this.jobs.find(job => job.id === id && job.state !== 'removed');
    if (existing) {
      return existing;
    }
    const job = new FakeJob(id, name, data);
    this.jobs.push(job);
    return job;
  }

  async getJob(jobId: string): Promise<FakeJob | undefined> {
    return this.jobs.find(job => job.id === jobId && job.state !== 'removed');
  }

  async getJobCounts(...types: string[]): Promise<Record<string, number>> {
    return Object.fromEntries(types.map(type => [type, this.jobs.filter(job => job.state === type).length]));
  }

  async getJobs(types: string[]): Promise<FakeJob[]> {
    return this.jobs.filter(job => types.includes(job.state));
  }

  async obliterate(): Promise<void> {
    if (this.failObliterate) {
      throw new Error('obliterate failed');
    }
    this.jobs.length = 0;
  }

  async close(): Promise<void> {
    this.closed = true;
  }

  async claimNext(): Promise<FakeJob | undefined> {
    const job = this.jobs.find(item => item.state === 'waiting');
    if (job) {
      job.state = 'active';
    }
    return job;
  }
}

// Minimal ioredis stand-in: connection lifecycle plus the set commands backing the session
// registry. Setting failSets makes sadd/srem/smembers throw to simulate Redis errors.
class FakeRedis {
  status: string = 'wait';
  readonly sets = new Map<string, Set<string>>();
  failSets = false;

  async connect(): Promise<void> {
    this.status = 'ready';
  }

  async ping(): Promise<string> {
    return 'PONG';
  }

  async sadd(key: string, ...members: string[]): Promise<number> {
    if (this.failSets) {
      throw new Error('sadd failed');
    }
    let set = this.sets.get(key);
    if (!set) {
      set = new Set<string>();
      this.sets.set(key, set);
    }
    const before = set.size;
    members.forEach(member => set.add(member));
    return set.size - before;
  }

  async srem(key: string, ...members: string[]): Promise<number> {
    if (this.failSets) {
      throw new Error('srem failed');
    }
    const set = this.sets.get(key);
    if (!set) return 0;
    let removed = 0;
    for (const member of members) {
      if (set.delete(member)) removed++;
    }
    return removed;
  }

  async smembers(key: string): Promise<string[]> {
    if (this.failSets) {
      throw new Error('smembers failed');
    }
    return Array.from(this.sets.get(key) ?? []);
  }

  async quit(): Promise<void> {
    this.status = 'end';
  }

  disconnect(): void {
    this.status = 'end';
  }
}

// Builds an engine wired to the fakes above. Passing the same queues map and redis instance
// to two calls simulates a process restart against shared Redis state.
function createEngine(options: Partial<BullMqObservationQueueEngineOptions> & {
  queues?: Map<string, FakeQueue>;
  redis?: FakeRedis;
} = {}) {
  const queues = options.queues ?? new Map<string, FakeQueue>();
  const redis = options.redis ?? new FakeRedis();
  const { queues: _queues, redis: _redis, ...engineOptions } = options;
  const getQueue = (name: string) => {
    let queue = queues.get(name);
    if (!queue) {
      queue = new FakeQueue();
      queues.set(name, queue);
    }
    return queue;
  };
  const engine = new BullMqObservationQueueEngine({
    config: {
      engine: 'bullmq',
      mode: 'external',
      url: null,
      host: '127.0.0.1',
      port: 6379,
      prefix: 'test_prefix',
      connection: {
        host: '127.0.0.1',
        port: 6379,
        lazyConnect: true,
        maxRetriesPerRequest: null,
      },
    },
    lockDurationMs: 60_000,
    pollIntervalMs: 5,
    queueFactory: name => getQueue(name) as any,
    workerFactory: name => ({
      getNextJob: () => getQueue(name).claimNext(),
      close: async () => {},
    }) as any,
    redisFactory: () => redis as any,
    ...engineOptions,
  });
  return { engine, queues, redis };
}

describe('BullMqObservationQueueEngine', () => {
  let engine: BullMqObservationQueueEngine | null = null;

  afterEach(async () => {
    await engine?.close();
    engine = null;
  });

  test('uses safe hashed job ids without colon', () => {
    const observation: PendingMessage = {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool:with:colon',
    };
    const summarize: PendingMessage = {
      type: 'summarize',
      last_assistant_message: 'done',
    };
    const obsId = getSafeJobId('session:1', observation, 123);
    const sumId = getSafeJobId('session:1', summarize, 123);
    const fallbackA = getSafeJobId('session:1', { type: 'observation', tool_name: 'Read' }, 123);
    const fallbackB = getSafeJobId('session:1', { type: 'observation', tool_name: 'Read' }, 124);
    expect(obsId).toStartWith('obs_');
    expect(sumId).toStartWith('sum_');
    expect(obsId).not.toContain(':');
    expect(sumId).not.toContain(':');
    expect(fallbackA).not.toBe(fallbackB);
  });

  test('deduplicates active observation jobs by content session and tool use id', async () => {
    ({ engine } = createEngine());
    const first = await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-1',
    });
    const duplicate = await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-1',
    });
    expect(first).toBeGreaterThan(0);
    expect(duplicate).toBe(0);
    expect(await engine.getPendingCount(1)).toBe(1);
  });

  test('replaces terminal jobs before reusing a deterministic BullMQ job id', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-1',
    });
    const queue = result.queues.get('claude_mem_session_1')!;
    queue.jobs[0].state = 'failed';
    const replacement = await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-1',
    });
    expect(replacement).toBeGreaterThan(0);
    expect(queue.jobs.map(job => job.state)).toEqual(['removed', 'waiting']);
    expect(await engine.getPendingCount(1)).toBe(1);
  });

  test('yields per-session FIFO messages and confirms exact claimed jobs', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'First',
      toolUseId: 'tool-a',
    });
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Second',
      toolUseId: 'tool-b',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    const first = await iterator.next();
    const second = await iterator.next();
    expect(first.value).toMatchObject({ type: 'observation', tool_name: 'First' });
    expect(second.value).toMatchObject({ type: 'observation', tool_name: 'Second' });
    expect(first.value._persistentId).not.toBe(second.value._persistentId);
    expect(await engine.confirmProcessed(first.value._persistentId)).toBe(1);
    expect(await engine.getPendingCount(1)).toBe(1);
    expect(await engine.confirmProcessed(second.value._persistentId)).toBe(1);
    expect(await engine.getPendingCount(1)).toBe(0);
    expect(await result.redis.smembers('test_prefix:queue_registry:sessions')).toEqual([]);
    controller.abort();
    await iterator.return?.();
  });

  test('resetProcessingToPending returns claimed jobs to the session queue', async () => {
    ({ engine } = createEngine());
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    const first = await iterator.next();
    expect(first.value.tool_name).toBe('Read');
    expect(await engine.resetProcessingToPending(1)).toBe(1);
    const second = await iterator.next();
    expect(second.value.tool_name).toBe('Read');
    controller.abort();
    await iterator.return?.();
  });

  test('resetProcessingToPending attempts every active claim before throwing', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Write',
      toolUseId: 'tool-b',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    await iterator.next();
    await iterator.next();
    const queue = result.queues.get('claude_mem_session_1')!;
    const failedJob = queue.jobs[0];
    const releasedJob = queue.jobs[1];
    failedJob.failMoveToWait = true;
    await expect(engine.resetProcessingToPending(1)).rejects.toThrow('moveToWait failed');
    expect(failedJob.state).toBe('active');
    expect(releasedJob.state).toBe('waiting');
    failedJob.failMoveToWait = false;
    expect(await engine.resetProcessingToPending(1)).toBe(1);
    controller.abort();
    await iterator.return?.();
  });

  test('close moves local active claims back to wait before dropping state', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    const first = await iterator.next();
    expect(first.value.tool_name).toBe('Read');
    expect(result.queues.get('claude_mem_session_1')!.jobs[0].state).toBe('active');
    await engine.close();
    engine = null;
    expect(result.queues.get('claude_mem_session_1')!.jobs[0].state).toBe('waiting');
    controller.abort();
    await iterator.return?.();
  });

  test('close releases local resources when moving a job back to wait fails', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Write',
      toolUseId: 'tool-b',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    await iterator.next();
    await iterator.next();
    const queue = result.queues.get('claude_mem_session_1')!;
    const failedJob = queue.jobs[0];
    const releasedJob = queue.jobs[1];
    failedJob.failMoveToWait = true;
    await expect(engine.close()).rejects.toThrow('moveToWait failed');
    engine = null;
    expect(failedJob.state).toBe('active');
    expect(releasedJob.state).toBe('waiting');
    expect(queue.closed).toBe(true);
    expect(result.redis.status).toBe('end');
    controller.abort();
    await iterator.return?.();
  });

  test('clearPendingForSession preserves active claims when Redis deletion fails', async () => {
    const result = createEngine();
    engine = result.engine;
    await engine.enqueue(1, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 1,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    await iterator.next();
    const queue = result.queues.get('claude_mem_session_1')!;
    queue.failObliterate = true;
    await expect(engine.clearPendingForSession(1)).rejects.toThrow('obliterate failed');
    queue.failObliterate = false;
    expect(await engine.resetProcessingToPending(1)).toBe(1);
    expect(queue.jobs[0].state).toBe('waiting');
    controller.abort();
    await iterator.return?.();
  });

  test('discovers queue depth from Redis registry after process restart', async () => {
    const queues = new Map<string, FakeQueue>();
    const redis = new FakeRedis();
    const firstProcess = createEngine({ queues, redis });
    engine = firstProcess.engine;
    await engine.enqueue(7, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    expect(await redis.smembers('test_prefix:queue_registry:sessions')).toEqual(['7']);
    await engine.close();
    const secondProcess = createEngine({ queues, redis });
    engine = secondProcess.engine;
    expect(await engine.getTotalQueueDepth()).toBe(1);
    expect(secondProcess.queues.get('claude_mem_session_7')).toBeDefined();
  });

  test('clearPendingForSession prunes empty sessions from the Redis registry', async () => {
    const queues = new Map<string, FakeQueue>();
    const redis = new FakeRedis();
    const firstProcess = createEngine({ queues, redis });
    engine = firstProcess.engine;
    await engine.enqueue(7, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    expect(await redis.smembers('test_prefix:queue_registry:sessions')).toEqual(['7']);
    expect(await engine.clearPendingForSession(7)).toBe(1);
    expect(await redis.smembers('test_prefix:queue_registry:sessions')).toEqual([]);
  });

  test('reports Redis health without creating sqlite fallback', async () => {
    ({ engine } = createEngine());
    const health = await engine.getHealth();
    expect(health.engine).toBe('bullmq');
    expect(health.redis.status).toBe('ok');
    expect(health.redis.prefix).toBe('test_prefix');
  });

  test('assertHealthy fails instead of falling back when Redis is unavailable', async () => {
    ({ engine } = createEngine({
      redisFactory: () => ({
        status: 'wait',
        connect: async () => {},
        ping: async () => {
          throw new Error('connection refused');
        },
        sadd: async () => 0,
        srem: async () => 0,
        smembers: async () => [],
        quit: async () => {},
        disconnect: () => {},
      }),
    }));
    await expect(engine.assertHealthy()).rejects.toThrow('CLAUDE_MEM_QUEUE_ENGINE=bullmq requires Redis/Valkey');
  });
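
  // Opt-in integration test against a real Redis/Valkey server: set
  // CLAUDE_MEM_RUN_REDIS_QUEUE_TESTS=true (and optionally CLAUDE_MEM_REDIS_URL) to run it.
  const redisIntegrationTest = process.env.CLAUDE_MEM_RUN_REDIS_QUEUE_TESTS === 'true'
    ? test
    : test.skip;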

  redisIntegrationTest('releases active jobs and discovers registry with real Redis', async () => {
    const redisUrl = process.env.CLAUDE_MEM_REDIS_URL ?? 'redis://127.0.0.1:6379';
    const prefix = `cm_test_${Date.now()}_${Math.random().toString(36).slice(2)}`;
    const parsedRedisUrl = new URL(redisUrl);
    const redisConnection = {
      host: parsedRedisUrl.hostname || '127.0.0.1',
      port: parsedRedisUrl.port ? Number.parseInt(parsedRedisUrl.port, 10) : 6379,
      username: parsedRedisUrl.username ? decodeURIComponent(parsedRedisUrl.username) : undefined,
      password: parsedRedisUrl.password ? decodeURIComponent(parsedRedisUrl.password) : undefined,
      db: parsedRedisUrl.pathname.length > 1 ? Number.parseInt(parsedRedisUrl.pathname.slice(1), 10) : undefined,
      tls: parsedRedisUrl.protocol === 'rediss:' ? {} : undefined,
      lazyConnect: true,
      maxRetriesPerRequest: null,
    };
    // Probe the server first so a missing Redis fails quickly with a connection error.
    const client = new Redis(redisUrl, {
      lazyConnect: true,
      maxRetriesPerRequest: null,
      connectTimeout: 1000,
    });
    await client.connect();
    await client.ping();
    await client.quit();
    const config = {
      engine: 'bullmq' as const,
      mode: 'external' as const,
      url: redisUrl,
      host: redisConnection.host,
      port: redisConnection.port,
      prefix,
      connection: redisConnection,
    };
    engine = new BullMqObservationQueueEngine({
      config,
      lockDurationMs: 60_000,
      pollIntervalMs: 5,
    });
    await engine.enqueue(99, 'content-session', {
      type: 'observation',
      tool_name: 'Read',
      toolUseId: 'tool-a',
    });
    const controller = new AbortController();
    const iterator = engine.createIterator({
      sessionDbId: 99,
      signal: controller.signal,
      idleTimeoutMs: 100,
    });
    const first = await iterator.next();
    expect(first.value.tool_name).toBe('Read');
    await engine.close();
    engine = null;
    // Simulate a restart: a fresh engine with the same config must rediscover the released job.
    const restarted = new BullMqObservationQueueEngine({
      config,
      lockDurationMs: 60_000,
      pollIntervalMs: 5,
    });
    engine = restarted;
    expect(await restarted.getTotalQueueDepth()).toBe(1);
    expect(await restarted.clearPendingForSession(99)).toBe(1);
    controller.abort();
    await iterator.return?.();
  });
});