chore: bump version to 10.0.2
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,224 @@
|
||||
import { existsSync, statSync, watch as fsWatch, createReadStream } from 'fs';
|
||||
import { basename, join } from 'path';
|
||||
import { globSync } from 'glob';
|
||||
import { logger } from '../../utils/logger.js';
|
||||
import { expandHomePath } from './config.js';
|
||||
import { loadWatchState, saveWatchState, type TranscriptWatchState } from './state.js';
|
||||
import type { TranscriptWatchConfig, TranscriptSchema, WatchTarget } from './types.js';
|
||||
import { TranscriptEventProcessor } from './processor.js';
|
||||
|
||||
interface TailState {
|
||||
offset: number;
|
||||
partial: string;
|
||||
}
|
||||
|
||||
class FileTailer {
|
||||
private watcher: ReturnType<typeof fsWatch> | null = null;
|
||||
private tailState: TailState;
|
||||
|
||||
constructor(
|
||||
private filePath: string,
|
||||
initialOffset: number,
|
||||
private onLine: (line: string) => Promise<void>,
|
||||
private onOffset: (offset: number) => void
|
||||
) {
|
||||
this.tailState = { offset: initialOffset, partial: '' };
|
||||
}
|
||||
|
||||
start(): void {
|
||||
this.readNewData().catch(() => undefined);
|
||||
this.watcher = fsWatch(this.filePath, { persistent: true }, () => {
|
||||
this.readNewData().catch(() => undefined);
|
||||
});
|
||||
}
|
||||
|
||||
close(): void {
|
||||
this.watcher?.close();
|
||||
this.watcher = null;
|
||||
}
|
||||
|
||||
private async readNewData(): Promise<void> {
|
||||
if (!existsSync(this.filePath)) return;
|
||||
|
||||
let size = 0;
|
||||
try {
|
||||
size = statSync(this.filePath).size;
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
|
||||
if (size < this.tailState.offset) {
|
||||
this.tailState.offset = 0;
|
||||
}
|
||||
|
||||
if (size === this.tailState.offset) return;
|
||||
|
||||
const stream = createReadStream(this.filePath, {
|
||||
start: this.tailState.offset,
|
||||
end: size - 1,
|
||||
encoding: 'utf8'
|
||||
});
|
||||
|
||||
let data = '';
|
||||
for await (const chunk of stream) {
|
||||
data += chunk as string;
|
||||
}
|
||||
|
||||
this.tailState.offset = size;
|
||||
this.onOffset(this.tailState.offset);
|
||||
|
||||
const combined = this.tailState.partial + data;
|
||||
const lines = combined.split('\n');
|
||||
this.tailState.partial = lines.pop() ?? '';
|
||||
|
||||
for (const line of lines) {
|
||||
const trimmed = line.trim();
|
||||
if (!trimmed) continue;
|
||||
await this.onLine(trimmed);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class TranscriptWatcher {
|
||||
private processor = new TranscriptEventProcessor();
|
||||
private tailers = new Map<string, FileTailer>();
|
||||
private state: TranscriptWatchState;
|
||||
private rescanTimers: Array<NodeJS.Timeout> = [];
|
||||
|
||||
constructor(private config: TranscriptWatchConfig, private statePath: string) {
|
||||
this.state = loadWatchState(statePath);
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
for (const watch of this.config.watches) {
|
||||
await this.setupWatch(watch);
|
||||
}
|
||||
}
|
||||
|
||||
stop(): void {
|
||||
for (const tailer of this.tailers.values()) {
|
||||
tailer.close();
|
||||
}
|
||||
this.tailers.clear();
|
||||
for (const timer of this.rescanTimers) {
|
||||
clearInterval(timer);
|
||||
}
|
||||
this.rescanTimers = [];
|
||||
}
|
||||
|
||||
private async setupWatch(watch: WatchTarget): Promise<void> {
|
||||
const schema = this.resolveSchema(watch);
|
||||
if (!schema) {
|
||||
logger.warn('TRANSCRIPT', 'Missing schema for watch', { watch: watch.name });
|
||||
return;
|
||||
}
|
||||
|
||||
const resolvedPath = expandHomePath(watch.path);
|
||||
const files = this.resolveWatchFiles(resolvedPath);
|
||||
|
||||
for (const filePath of files) {
|
||||
await this.addTailer(filePath, watch, schema);
|
||||
}
|
||||
|
||||
const rescanIntervalMs = watch.rescanIntervalMs ?? 5000;
|
||||
const timer = setInterval(async () => {
|
||||
const newFiles = this.resolveWatchFiles(resolvedPath);
|
||||
for (const filePath of newFiles) {
|
||||
if (!this.tailers.has(filePath)) {
|
||||
await this.addTailer(filePath, watch, schema);
|
||||
}
|
||||
}
|
||||
}, rescanIntervalMs);
|
||||
this.rescanTimers.push(timer);
|
||||
}
|
||||
|
||||
private resolveSchema(watch: WatchTarget): TranscriptSchema | null {
|
||||
if (typeof watch.schema === 'string') {
|
||||
return this.config.schemas?.[watch.schema] ?? null;
|
||||
}
|
||||
return watch.schema;
|
||||
}
|
||||
|
||||
private resolveWatchFiles(inputPath: string): string[] {
|
||||
if (this.hasGlob(inputPath)) {
|
||||
return globSync(inputPath, { nodir: true, absolute: true });
|
||||
}
|
||||
|
||||
if (existsSync(inputPath)) {
|
||||
try {
|
||||
const stat = statSync(inputPath);
|
||||
if (stat.isDirectory()) {
|
||||
const pattern = join(inputPath, '**', '*.jsonl');
|
||||
return globSync(pattern, { nodir: true, absolute: true });
|
||||
}
|
||||
return [inputPath];
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
private hasGlob(inputPath: string): boolean {
|
||||
return /[*?[\]{}()]/.test(inputPath);
|
||||
}
|
||||
|
||||
private async addTailer(filePath: string, watch: WatchTarget, schema: TranscriptSchema): Promise<void> {
|
||||
if (this.tailers.has(filePath)) return;
|
||||
|
||||
const sessionIdOverride = this.extractSessionIdFromPath(filePath);
|
||||
|
||||
let offset = this.state.offsets[filePath] ?? 0;
|
||||
if (offset === 0 && watch.startAtEnd) {
|
||||
try {
|
||||
offset = statSync(filePath).size;
|
||||
} catch {
|
||||
offset = 0;
|
||||
}
|
||||
}
|
||||
|
||||
const tailer = new FileTailer(
|
||||
filePath,
|
||||
offset,
|
||||
async (line: string) => {
|
||||
await this.handleLine(line, watch, schema, filePath, sessionIdOverride);
|
||||
},
|
||||
(newOffset: number) => {
|
||||
this.state.offsets[filePath] = newOffset;
|
||||
saveWatchState(this.statePath, this.state);
|
||||
}
|
||||
);
|
||||
|
||||
tailer.start();
|
||||
this.tailers.set(filePath, tailer);
|
||||
logger.info('TRANSCRIPT', 'Watching transcript file', {
|
||||
file: filePath,
|
||||
watch: watch.name,
|
||||
schema: schema.name
|
||||
});
|
||||
}
|
||||
|
||||
private async handleLine(
|
||||
line: string,
|
||||
watch: WatchTarget,
|
||||
schema: TranscriptSchema,
|
||||
filePath: string,
|
||||
sessionIdOverride?: string | null
|
||||
): Promise<void> {
|
||||
try {
|
||||
const entry = JSON.parse(line);
|
||||
await this.processor.processEntry(entry, watch, schema, sessionIdOverride ?? undefined);
|
||||
} catch (error) {
|
||||
logger.debug('TRANSCRIPT', 'Failed to parse transcript line', {
|
||||
watch: watch.name,
|
||||
file: basename(filePath)
|
||||
}, error as Error);
|
||||
}
|
||||
}
|
||||
|
||||
private extractSessionIdFromPath(filePath: string): string | null {
|
||||
const match = filePath.match(/[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i);
|
||||
return match ? match[0] : null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user