import { existsSync, statSync, watch as fsWatch, createReadStream } from 'fs'; import { basename, join } from 'path'; import { globSync } from 'glob'; import { logger } from '../../utils/logger.js'; import { expandHomePath } from './config.js'; import { loadWatchState, saveWatchState, type TranscriptWatchState } from './state.js'; import type { TranscriptWatchConfig, TranscriptSchema, WatchTarget } from './types.js'; import { TranscriptEventProcessor } from './processor.js'; interface TailState { offset: number; partial: string; } class FileTailer { private watcher: ReturnType | null = null; private tailState: TailState; constructor( private filePath: string, initialOffset: number, private onLine: (line: string) => Promise, private onOffset: (offset: number) => void ) { this.tailState = { offset: initialOffset, partial: '' }; } start(): void { this.readNewData().catch(() => undefined); this.watcher = fsWatch(this.filePath, { persistent: true }, () => { this.readNewData().catch(() => undefined); }); } close(): void { this.watcher?.close(); this.watcher = null; } private async readNewData(): Promise { if (!existsSync(this.filePath)) return; let size = 0; try { size = statSync(this.filePath).size; } catch { return; } if (size < this.tailState.offset) { this.tailState.offset = 0; } if (size === this.tailState.offset) return; const stream = createReadStream(this.filePath, { start: this.tailState.offset, end: size - 1, encoding: 'utf8' }); let data = ''; for await (const chunk of stream) { data += chunk as string; } this.tailState.offset = size; this.onOffset(this.tailState.offset); const combined = this.tailState.partial + data; const lines = combined.split('\n'); this.tailState.partial = lines.pop() ?? ''; for (const line of lines) { const trimmed = line.trim(); if (!trimmed) continue; await this.onLine(trimmed); } } } export class TranscriptWatcher { private processor = new TranscriptEventProcessor(); private tailers = new Map(); private state: TranscriptWatchState; private rescanTimers: Array = []; constructor(private config: TranscriptWatchConfig, private statePath: string) { this.state = loadWatchState(statePath); } async start(): Promise { for (const watch of this.config.watches) { await this.setupWatch(watch); } } stop(): void { for (const tailer of this.tailers.values()) { tailer.close(); } this.tailers.clear(); for (const timer of this.rescanTimers) { clearInterval(timer); } this.rescanTimers = []; } private async setupWatch(watch: WatchTarget): Promise { const schema = this.resolveSchema(watch); if (!schema) { logger.warn('TRANSCRIPT', 'Missing schema for watch', { watch: watch.name }); return; } const resolvedPath = expandHomePath(watch.path); const files = this.resolveWatchFiles(resolvedPath); for (const filePath of files) { await this.addTailer(filePath, watch, schema); } const rescanIntervalMs = watch.rescanIntervalMs ?? 5000; const timer = setInterval(async () => { const newFiles = this.resolveWatchFiles(resolvedPath); for (const filePath of newFiles) { if (!this.tailers.has(filePath)) { await this.addTailer(filePath, watch, schema); } } }, rescanIntervalMs); this.rescanTimers.push(timer); } private resolveSchema(watch: WatchTarget): TranscriptSchema | null { if (typeof watch.schema === 'string') { return this.config.schemas?.[watch.schema] ?? null; } return watch.schema; } private resolveWatchFiles(inputPath: string): string[] { if (this.hasGlob(inputPath)) { return globSync(inputPath, { nodir: true, absolute: true }); } if (existsSync(inputPath)) { try { const stat = statSync(inputPath); if (stat.isDirectory()) { const pattern = join(inputPath, '**', '*.jsonl'); return globSync(pattern, { nodir: true, absolute: true }); } return [inputPath]; } catch { return []; } } return []; } private hasGlob(inputPath: string): boolean { return /[*?[\]{}()]/.test(inputPath); } private async addTailer(filePath: string, watch: WatchTarget, schema: TranscriptSchema): Promise { if (this.tailers.has(filePath)) return; const sessionIdOverride = this.extractSessionIdFromPath(filePath); let offset = this.state.offsets[filePath] ?? 0; if (offset === 0 && watch.startAtEnd) { try { offset = statSync(filePath).size; } catch { offset = 0; } } const tailer = new FileTailer( filePath, offset, async (line: string) => { await this.handleLine(line, watch, schema, filePath, sessionIdOverride); }, (newOffset: number) => { this.state.offsets[filePath] = newOffset; saveWatchState(this.statePath, this.state); } ); tailer.start(); this.tailers.set(filePath, tailer); logger.info('TRANSCRIPT', 'Watching transcript file', { file: filePath, watch: watch.name, schema: schema.name }); } private async handleLine( line: string, watch: WatchTarget, schema: TranscriptSchema, filePath: string, sessionIdOverride?: string | null ): Promise { try { const entry = JSON.parse(line); await this.processor.processEntry(entry, watch, schema, sessionIdOverride ?? undefined); } catch (error) { logger.debug('TRANSCRIPT', 'Failed to parse transcript line', { watch: watch.name, file: basename(filePath) }, error as Error); } } private extractSessionIdFromPath(filePath: string): string | null { const match = filePath.match(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i); return match ? match[0] : null; } }