uploade
This commit is contained in:
@@ -101,6 +101,8 @@ class WorkerService {
|
||||
private sessions: Map<number, ActiveSession> = new Map();
|
||||
private chromaSync!: ChromaSync;
|
||||
private sseClients: Set<Response> = new Set();
|
||||
private isProcessing: boolean = false;
|
||||
private spinnerStopTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
constructor() {
|
||||
this.app = express();
|
||||
@@ -126,13 +128,14 @@ class WorkerService {
|
||||
this.app.get('/api/observations', this.handleGetObservations.bind(this));
|
||||
this.app.get('/api/summaries', this.handleGetSummaries.bind(this));
|
||||
this.app.get('/api/prompts', this.handleGetPrompts.bind(this));
|
||||
this.app.get('/api/processing-status', this.handleGetProcessingStatus.bind(this));
|
||||
|
||||
// Session endpoints
|
||||
this.app.post('/sessions/:sessionDbId/init', this.handleInit.bind(this));
|
||||
this.app.post('/sessions/:sessionDbId/observations', this.handleObservation.bind(this));
|
||||
this.app.post('/sessions/:sessionDbId/summarize', this.handleSummarize.bind(this));
|
||||
this.app.post('/sessions/:sessionDbId/complete', this.handleComplete.bind(this));
|
||||
this.app.get('/sessions/:sessionDbId/status', this.handleStatus.bind(this));
|
||||
this.app.delete('/sessions/:sessionDbId', this.handleDelete.bind(this));
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
@@ -274,16 +277,46 @@ class WorkerService {
|
||||
/**
|
||||
* Broadcast processing status to SSE clients
|
||||
*/
|
||||
private broadcastProcessingStatus(claudeSessionId: string, isProcessing: boolean): void {
|
||||
private broadcastProcessingStatus(isProcessing: boolean): void {
|
||||
this.isProcessing = isProcessing;
|
||||
this.broadcastSSE({
|
||||
type: 'processing_status',
|
||||
processing: {
|
||||
session_id: claudeSessionId,
|
||||
is_processing: isProcessing
|
||||
}
|
||||
isProcessing
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if all sessions have empty queues and stop spinner after debounce
|
||||
*/
|
||||
private checkAndStopSpinner(): void {
|
||||
// Clear any existing timer
|
||||
if (this.spinnerStopTimer) {
|
||||
clearTimeout(this.spinnerStopTimer);
|
||||
this.spinnerStopTimer = null;
|
||||
}
|
||||
|
||||
// Check if any session has pending messages
|
||||
const hasPendingMessages = Array.from(this.sessions.values()).some(
|
||||
session => session.pendingMessages.length > 0
|
||||
);
|
||||
|
||||
if (!hasPendingMessages) {
|
||||
// Debounce: wait 1.5s and check again
|
||||
this.spinnerStopTimer = setTimeout(() => {
|
||||
const stillEmpty = Array.from(this.sessions.values()).every(
|
||||
session => session.pendingMessages.length === 0
|
||||
);
|
||||
|
||||
if (stillEmpty) {
|
||||
logger.debug('WORKER', 'All queues empty - stopping spinner');
|
||||
this.broadcastProcessingStatus(false);
|
||||
}
|
||||
|
||||
this.spinnerStopTimer = null;
|
||||
}, 1500);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /api/stats - Return worker and database stats
|
||||
*/
|
||||
@@ -597,6 +630,14 @@ class WorkerService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /api/processing-status
|
||||
* Returns current processing status (boolean)
|
||||
*/
|
||||
private handleGetProcessingStatus(_req: Request, res: Response): void {
|
||||
res.json({ isProcessing: this.isProcessing });
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /sessions/:sessionDbId/init
|
||||
* Body: { project, userPrompt }
|
||||
@@ -691,6 +732,9 @@ class WorkerService {
|
||||
this.sessions.delete(sessionDbId);
|
||||
});
|
||||
|
||||
// Start processing indicator (user submitted prompt)
|
||||
this.broadcastProcessingStatus(true);
|
||||
|
||||
logger.success('WORKER', 'Session initialized', { sessionId: sessionDbId, port: this.port });
|
||||
res.json({
|
||||
status: 'initialized',
|
||||
@@ -755,9 +799,6 @@ class WorkerService {
|
||||
prompt_number
|
||||
});
|
||||
|
||||
// Don't broadcast processing status for observations - only for summaries
|
||||
// Observations are processed continuously, skeleton should only show during summary generation
|
||||
|
||||
res.json({ status: 'queued', queueLength: session.pendingMessages.length });
|
||||
}
|
||||
|
||||
@@ -813,12 +854,24 @@ class WorkerService {
|
||||
prompt_number
|
||||
});
|
||||
|
||||
// Notify UI that processing is active
|
||||
this.broadcastProcessingStatus(session.claudeSessionId, true);
|
||||
|
||||
res.json({ status: 'queued', queueLength: session.pendingMessages.length });
|
||||
}
|
||||
|
||||
/**
|
||||
* POST /sessions/:sessionDbId/complete
|
||||
* Called by cleanup hook to stop spinner when session ends
|
||||
*/
|
||||
private handleComplete(req: Request, res: Response): void {
|
||||
const sessionDbId = parseInt(req.params.sessionDbId, 10);
|
||||
|
||||
logger.info('WORKER', 'Session completed - stopping spinner', { sessionId: sessionDbId });
|
||||
|
||||
// Stop processing indicator
|
||||
this.broadcastProcessingStatus(false);
|
||||
|
||||
res.json({ status: 'ok' });
|
||||
}
|
||||
|
||||
/**
|
||||
* GET /sessions/:sessionDbId/status
|
||||
*/
|
||||
@@ -839,42 +892,6 @@ class WorkerService {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* DELETE /sessions/:sessionDbId
|
||||
*/
|
||||
private async handleDelete(req: Request, res: Response): Promise<void> {
|
||||
const sessionDbId = parseInt(req.params.sessionDbId, 10);
|
||||
|
||||
const session = this.sessions.get(sessionDbId);
|
||||
if (!session) {
|
||||
res.status(404).json({ error: 'Session not found' });
|
||||
return;
|
||||
}
|
||||
|
||||
logger.warn('WORKER', 'Session delete requested', { sessionId: sessionDbId });
|
||||
|
||||
// Abort SDK agent
|
||||
session.abortController.abort();
|
||||
|
||||
// Wait for generator to finish (with timeout)
|
||||
if (session.generatorPromise) {
|
||||
await Promise.race([
|
||||
session.generatorPromise,
|
||||
new Promise(resolve => setTimeout(resolve, 5000))
|
||||
]);
|
||||
}
|
||||
|
||||
// Mark as failed since we're aborting
|
||||
const db = new SessionStore();
|
||||
db.markSessionFailed(sessionDbId);
|
||||
db.close();
|
||||
|
||||
this.sessions.delete(sessionDbId);
|
||||
|
||||
logger.info('WORKER', 'Session deleted', { sessionId: sessionDbId });
|
||||
res.json({ status: 'deleted' });
|
||||
}
|
||||
|
||||
/**
|
||||
* Run SDK agent for a session
|
||||
*/
|
||||
@@ -1149,9 +1166,6 @@ class WorkerService {
|
||||
}
|
||||
});
|
||||
|
||||
// Notify UI that processing is complete (summary is the final step)
|
||||
this.broadcastProcessingStatus(session.claudeSessionId, false);
|
||||
|
||||
// Sync to Chroma (non-blocking fire-and-forget, but crash on failure)
|
||||
this.chromaSync.syncSummary(
|
||||
id,
|
||||
@@ -1178,12 +1192,12 @@ class WorkerService {
|
||||
promptNumber,
|
||||
contentSample: content.substring(0, 500)
|
||||
});
|
||||
|
||||
// Still mark processing as complete even if no summary was generated
|
||||
this.broadcastProcessingStatus(session.claudeSessionId, false);
|
||||
}
|
||||
|
||||
db.close();
|
||||
|
||||
// Check if queue is empty and stop spinner after debounce
|
||||
this.checkAndStopSpinner();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user