fix: resolve 3 upstream bugs (summarize, ChromaSync, HealthMonitor) (#1566)
* fix: resolve 3 upstream bugs in summarize, ChromaSync, and HealthMonitor 1. summarize.ts: Skip summary when transcript has no assistant message. Prevents error loop where empty transcripts cause repeated failed summarize attempts (~30 errors/day observed in production). 2. ChromaSync.ts: Fallback to chroma_update_documents when add fails with "IDs already exist". Handles partial writes after MCP timeout without waiting for next backfill cycle. 3. HealthMonitor.ts: Replace HTTP-based isPortInUse with atomic socket bind on Unix. Eliminates TOCTOU race when two sessions start simultaneously (HTTP check is non-atomic — both see "port free" before either completes listen()). Updated tests accordingly. All three bugs are pre-existing in v10.5.5. Confirmed via log analysis of 543K lines over 17 days of production usage across two servers. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * chore: add CONTRIB_NOTES.md to gitignore Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address CodeRabbit review on PR #1566 - HealthMonitor: add APPROVED OVERRIDE annotation for Win32 HTTP fallback - ChromaSync: replace chroma_update_documents with delete+add for proper upsert (update only modifies existing IDs, silently ignores missing ones) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Alessandro Costa <alessandro@claudio.dev> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -35,3 +35,6 @@ src/ui/viewer.html
|
|||||||
.claude/session-intent.md
|
.claude/session-intent.md
|
||||||
.claude/session-plan.md
|
.claude/session-plan.md
|
||||||
.octo/
|
.octo/
|
||||||
|
|
||||||
|
# Local contribution analysis (not part of upstream)
|
||||||
|
CONTRIB_NOTES.md
|
||||||
@@ -52,6 +52,16 @@ export const summarizeHandler: EventHandler = {
|
|||||||
return { continue: true, suppressOutput: true, exitCode: HOOK_EXIT_CODES.SUCCESS };
|
return { continue: true, suppressOutput: true, exitCode: HOOK_EXIT_CODES.SUCCESS };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Skip summary if transcript has no assistant message (prevents repeated
|
||||||
|
// empty summarize requests that pollute logs — upstream bug)
|
||||||
|
if (!lastAssistantMessage || !lastAssistantMessage.trim()) {
|
||||||
|
logger.debug('HOOK', 'No assistant message in transcript - skipping summary', {
|
||||||
|
sessionId,
|
||||||
|
transcriptPath
|
||||||
|
});
|
||||||
|
return { continue: true, suppressOutput: true, exitCode: HOOK_EXIT_CODES.SUCCESS };
|
||||||
|
}
|
||||||
|
|
||||||
logger.dataIn('HOOK', 'Stop: Requesting summary', {
|
logger.dataIn('HOOK', 'Stop: Requesting summary', {
|
||||||
hasLastAssistantMessage: !!lastAssistantMessage
|
hasLastAssistantMessage: !!lastAssistantMessage
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -10,6 +10,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
import net from 'net';
|
||||||
import { readFileSync } from 'fs';
|
import { readFileSync } from 'fs';
|
||||||
import { logger } from '../../utils/logger.js';
|
import { logger } from '../../utils/logger.js';
|
||||||
import { MARKETPLACE_ROOT } from '../../shared/paths.js';
|
import { MARKETPLACE_ROOT } from '../../shared/paths.js';
|
||||||
@@ -35,17 +36,43 @@ async function httpRequestToWorker(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if a port is in use by querying the health endpoint
|
* Check if a port is in use by attempting an atomic socket bind.
|
||||||
|
* More reliable than HTTP health check for daemon spawn guards —
|
||||||
|
* prevents TOCTOU race where two daemons both see "port free" via
|
||||||
|
* HTTP and then both try to listen() (upstream bug workaround).
|
||||||
|
*
|
||||||
|
* Falls back to HTTP health check on Windows where socket bind
|
||||||
|
* behavior differs.
|
||||||
*/
|
*/
|
||||||
export async function isPortInUse(port: number): Promise<boolean> {
|
export async function isPortInUse(port: number): Promise<boolean> {
|
||||||
|
if (process.platform === 'win32') {
|
||||||
|
// APPROVED OVERRIDE: Windows keeps HTTP health check because socket bind
|
||||||
|
// semantics differ (SO_REUSEADDR defaults, firewall prompts). The TOCTOU
|
||||||
|
// race remains on Windows but is an accepted limitation — the atomic
|
||||||
|
// socket approach would cause false positives or UAC popups.
|
||||||
try {
|
try {
|
||||||
// Note: Removed AbortSignal.timeout to avoid Windows Bun cleanup issue (libuv assertion)
|
|
||||||
const response = await fetch(`http://127.0.0.1:${port}/api/health`);
|
const response = await fetch(`http://127.0.0.1:${port}/api/health`);
|
||||||
return response.ok;
|
return response.ok;
|
||||||
} catch (error) {
|
} catch {
|
||||||
// [ANTI-PATTERN IGNORED]: Health check polls every 500ms, logging would flood
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unix: atomic socket bind check — no TOCTOU race
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
const server = net.createServer();
|
||||||
|
server.once('error', (err: NodeJS.ErrnoException) => {
|
||||||
|
if (err.code === 'EADDRINUSE') {
|
||||||
|
resolve(true);
|
||||||
|
} else {
|
||||||
|
resolve(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
server.once('listening', () => {
|
||||||
|
server.close(() => resolve(false));
|
||||||
|
});
|
||||||
|
server.listen(port, '127.0.0.1');
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -283,6 +283,35 @@ export class ChromaSync {
|
|||||||
metadatas: cleanMetadatas
|
metadatas: cleanMetadatas
|
||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
const errMsg = error instanceof Error ? error.message : String(error);
|
||||||
|
// APPROVED OVERRIDE: Duplicate IDs from partial write before timeout/crash.
|
||||||
|
// chroma_update_documents only updates *existing* IDs — it silently ignores
|
||||||
|
// missing ones. So we delete-then-add to guarantee all IDs are written.
|
||||||
|
if (errMsg.includes('already exist')) {
|
||||||
|
try {
|
||||||
|
await chromaMcp.callTool('chroma_delete_documents', {
|
||||||
|
collection_name: this.collectionName,
|
||||||
|
ids: batch.map(d => d.id)
|
||||||
|
});
|
||||||
|
await chromaMcp.callTool('chroma_add_documents', {
|
||||||
|
collection_name: this.collectionName,
|
||||||
|
ids: batch.map(d => d.id),
|
||||||
|
documents: batch.map(d => d.document),
|
||||||
|
metadatas: cleanMetadatas
|
||||||
|
});
|
||||||
|
logger.info('CHROMA_SYNC', 'Batch reconciled via delete+add after duplicate conflict', {
|
||||||
|
collection: this.collectionName,
|
||||||
|
batchStart: i,
|
||||||
|
batchSize: batch.length
|
||||||
|
});
|
||||||
|
} catch (reconcileError) {
|
||||||
|
logger.error('CHROMA_SYNC', 'Batch reconcile (delete+add) failed', {
|
||||||
|
collection: this.collectionName,
|
||||||
|
batchStart: i,
|
||||||
|
batchSize: batch.length
|
||||||
|
}, reconcileError as Error);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
logger.error('CHROMA_SYNC', 'Batch add failed, continuing with remaining batches', {
|
logger.error('CHROMA_SYNC', 'Batch add failed, continuing with remaining batches', {
|
||||||
collection: this.collectionName,
|
collection: this.collectionName,
|
||||||
batchStart: i,
|
batchStart: i,
|
||||||
@@ -290,6 +319,7 @@ export class ChromaSync {
|
|||||||
}, error as Error);
|
}, error as Error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
logger.debug('CHROMA_SYNC', 'Documents added', {
|
logger.debug('CHROMA_SYNC', 'Documents added', {
|
||||||
collection: this.collectionName,
|
collection: this.collectionName,
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import { describe, it, expect, beforeEach, afterEach, mock } from 'bun:test';
|
import { describe, it, expect, beforeEach, afterEach, mock, spyOn } from 'bun:test';
|
||||||
|
import net from 'net';
|
||||||
import {
|
import {
|
||||||
isPortInUse,
|
isPortInUse,
|
||||||
waitForHealth,
|
waitForHealth,
|
||||||
@@ -15,45 +16,73 @@ describe('HealthMonitor', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe('isPortInUse', () => {
|
describe('isPortInUse', () => {
|
||||||
it('should return true for occupied port (health check succeeds)', async () => {
|
// Note: Since we are on Linux (as per session_context), isPortInUse uses 'net'
|
||||||
global.fetch = mock(() => Promise.resolve({ ok: true } as Response));
|
// instead of 'fetch'. We need to mock 'net.createServer().listen()'
|
||||||
|
|
||||||
|
it('should return true for occupied port (EADDRINUSE)', async () => {
|
||||||
|
// Create a specific mock for this test
|
||||||
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'error') {
|
||||||
|
// Trigger EADDRINUSE immediately
|
||||||
|
setTimeout(() => cb({ code: 'EADDRINUSE' }), 0);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
listen: mock(() => {})
|
||||||
|
}));
|
||||||
|
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
const result = await isPortInUse(37777);
|
const result = await isPortInUse(37777);
|
||||||
|
|
||||||
expect(result).toBe(true);
|
expect(result).toBe(true);
|
||||||
expect(global.fetch).toHaveBeenCalledWith('http://127.0.0.1:37777/api/health');
|
expect(net.createServer).toHaveBeenCalled();
|
||||||
|
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should return false for free port (connection refused)', async () => {
|
it('should return false for free port (listening succeeds)', async () => {
|
||||||
global.fetch = mock(() => Promise.reject(new Error('ECONNREFUSED')));
|
const closeMock = mock((cb: Function) => cb());
|
||||||
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'listening') {
|
||||||
|
// Trigger listening success
|
||||||
|
setTimeout(() => cb(), 0);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
listen: mock(() => {}),
|
||||||
|
close: closeMock
|
||||||
|
}));
|
||||||
|
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
const result = await isPortInUse(39999);
|
const result = await isPortInUse(39999);
|
||||||
|
|
||||||
expect(result).toBe(false);
|
expect(result).toBe(false);
|
||||||
|
expect(net.createServer).toHaveBeenCalled();
|
||||||
|
expect(closeMock).toHaveBeenCalled();
|
||||||
|
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should return false when health check returns non-ok', async () => {
|
it('should return false for other socket errors', async () => {
|
||||||
global.fetch = mock(() => Promise.resolve({ ok: false, status: 503 } as Response));
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'error') {
|
||||||
|
// Trigger other error (e.g., EACCES)
|
||||||
|
setTimeout(() => cb({ code: 'EACCES' }), 0);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
listen: mock(() => {})
|
||||||
|
}));
|
||||||
|
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
const result = await isPortInUse(37777);
|
const result = await isPortInUse(37777);
|
||||||
|
|
||||||
expect(result).toBe(false);
|
expect(result).toBe(false);
|
||||||
});
|
|
||||||
|
|
||||||
it('should return false on network timeout', async () => {
|
spy.mockRestore();
|
||||||
global.fetch = mock(() => Promise.reject(new Error('ETIMEDOUT')));
|
|
||||||
|
|
||||||
const result = await isPortInUse(37777);
|
|
||||||
|
|
||||||
expect(result).toBe(false);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should return false on fetch failed error', async () => {
|
|
||||||
global.fetch = mock(() => Promise.reject(new Error('fetch failed')));
|
|
||||||
|
|
||||||
const result = await isPortInUse(37777);
|
|
||||||
|
|
||||||
expect(result).toBe(false);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -203,54 +232,80 @@ describe('HealthMonitor', () => {
|
|||||||
|
|
||||||
describe('waitForPortFree', () => {
|
describe('waitForPortFree', () => {
|
||||||
it('should return true immediately when port is already free', async () => {
|
it('should return true immediately when port is already free', async () => {
|
||||||
global.fetch = mock(() => Promise.reject(new Error('ECONNREFUSED')));
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'listening') setTimeout(() => cb(), 0);
|
||||||
|
}),
|
||||||
|
listen: mock(() => {}),
|
||||||
|
close: mock((cb: Function) => cb())
|
||||||
|
}));
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
const start = Date.now();
|
const start = Date.now();
|
||||||
const result = await waitForPortFree(39999, 5000);
|
const result = await waitForPortFree(39999, 5000);
|
||||||
const elapsed = Date.now() - start;
|
const elapsed = Date.now() - start;
|
||||||
|
|
||||||
expect(result).toBe(true);
|
expect(result).toBe(true);
|
||||||
// Should return quickly
|
|
||||||
expect(elapsed).toBeLessThan(1000);
|
expect(elapsed).toBeLessThan(1000);
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should timeout when port remains occupied', async () => {
|
it('should timeout when port remains occupied', async () => {
|
||||||
global.fetch = mock(() => Promise.resolve({ ok: true } as Response));
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'error') setTimeout(() => cb({ code: 'EADDRINUSE' }), 0);
|
||||||
|
}),
|
||||||
|
listen: mock(() => {})
|
||||||
|
}));
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
const start = Date.now();
|
const start = Date.now();
|
||||||
const result = await waitForPortFree(37777, 1500);
|
const result = await waitForPortFree(37777, 1500);
|
||||||
const elapsed = Date.now() - start;
|
const elapsed = Date.now() - start;
|
||||||
|
|
||||||
expect(result).toBe(false);
|
expect(result).toBe(false);
|
||||||
// Should take close to timeout duration
|
|
||||||
expect(elapsed).toBeGreaterThanOrEqual(1400);
|
expect(elapsed).toBeGreaterThanOrEqual(1400);
|
||||||
expect(elapsed).toBeLessThan(2500);
|
expect(elapsed).toBeLessThan(2500);
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should succeed when port becomes free', async () => {
|
it('should succeed when port becomes free', async () => {
|
||||||
let callCount = 0;
|
let callCount = 0;
|
||||||
global.fetch = mock(() => {
|
const spy = spyOn(net, 'createServer').mockImplementation(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
callCount++;
|
callCount++;
|
||||||
// Port occupied for first 2 checks, then free
|
// Port occupied for first 2 checks, then free
|
||||||
if (callCount < 3) {
|
if (callCount < 3) {
|
||||||
return Promise.resolve({ ok: true } as Response);
|
if (event === 'error') setTimeout(() => cb({ code: 'EADDRINUSE' }), 0);
|
||||||
|
} else {
|
||||||
|
if (event === 'listening') setTimeout(() => cb(), 0);
|
||||||
}
|
}
|
||||||
return Promise.reject(new Error('ECONNREFUSED'));
|
}),
|
||||||
});
|
listen: mock(() => {}),
|
||||||
|
close: mock((cb: Function) => cb())
|
||||||
|
} as any));
|
||||||
|
|
||||||
const result = await waitForPortFree(37777, 5000);
|
const result = await waitForPortFree(37777, 5000);
|
||||||
|
|
||||||
expect(result).toBe(true);
|
expect(result).toBe(true);
|
||||||
expect(callCount).toBeGreaterThanOrEqual(3);
|
expect(callCount).toBeGreaterThanOrEqual(3);
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should use default timeout when not specified', async () => {
|
it('should use default timeout when not specified', async () => {
|
||||||
global.fetch = mock(() => Promise.reject(new Error('ECONNREFUSED')));
|
const createServerMock = mock(() => ({
|
||||||
|
once: mock((event: string, cb: Function) => {
|
||||||
|
if (event === 'listening') setTimeout(() => cb(), 0);
|
||||||
|
}),
|
||||||
|
listen: mock(() => {}),
|
||||||
|
close: mock((cb: Function) => cb())
|
||||||
|
}));
|
||||||
|
const spy = spyOn(net, 'createServer').mockImplementation(createServerMock as any);
|
||||||
|
|
||||||
// Just verify it doesn't throw and returns quickly
|
|
||||||
const result = await waitForPortFree(39999);
|
const result = await waitForPortFree(39999);
|
||||||
|
|
||||||
expect(result).toBe(true);
|
expect(result).toBe(true);
|
||||||
|
spy.mockRestore();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user