diff --git a/plugin/scripts/worker-service.cjs b/plugin/scripts/worker-service.cjs index 9f6e420a..a2b448da 100755 --- a/plugin/scripts/worker-service.cjs +++ b/plugin/scripts/worker-service.cjs @@ -786,7 +786,7 @@ No previous sessions found for this project yet.`;let g=d.slice(0,r.sessionCount FROM user_prompts up JOIN sdk_sessions s ON up.claude_session_id = s.claude_session_id WHERE s.project = ? - `).get(this.project);O.info("CHROMA_SYNC","Backfilling user prompts",{project:this.project,missing:_.length,existing:e.prompts.size,total:p.count});let h=[];for(let y of _)h.push(this.formatUserPromptDoc(y));for(let y=0;y{O.error("DB","Chroma backfill failed (non-fatal)",{},e)}),O.info("DB","Database initialized")}async close(){this.chromaSync&&(await this.chromaSync.close(),this.chromaSync=null),this.sessionStore&&(this.sessionStore.close(),this.sessionStore=null),this.sessionSearch&&(this.sessionSearch.close(),this.sessionSearch=null),O.info("DB","Database closed")}getSessionStore(){if(!this.sessionStore)throw new Error("Database not initialized");return this.sessionStore}getSessionSearch(){if(!this.sessionSearch)throw new Error("Database not initialized");return this.sessionSearch}getChromaSync(){if(!this.chromaSync)throw new Error("ChromaSync not initialized");return this.chromaSync}getSessionById(e){let r=this.getSessionStore().getSessionById(e);if(!r)throw new Error(`Session ${e} not found`);return r}};var C1=require("events");ot();Po();var Qu=class{dbManager;sessions=new Map;sessionQueues=new Map;onSessionDeletedCallback;pendingStore=null;constructor(e){this.dbManager=e}getPendingStore(){if(!this.pendingStore){let e=this.dbManager.getSessionStore();this.pendingStore=new Ro(e.db,3)}return this.pendingStore}setOnSessionDeleted(e){this.onSessionDeletedCallback=e}initializeSession(e,r,n){O.info("SESSION","initializeSession called",{sessionDbId:e,promptNumber:n,has_currentUserPrompt:!!r});let a=this.sessions.get(e);if(a){O.info("SESSION","Returning cached session",{sessionDbId:e,claudeSessionId:a.claudeSessionId,lastPromptNumber:a.lastPromptNumber});let c=this.dbManager.getSessionById(e);return c.project&&c.project!==a.project&&(O.debug("SESSION","Updating project from database",{sessionDbId:e,oldProject:a.project,newProject:c.project}),a.project=c.project),r?(O.debug("SESSION","Updating userPrompt for continuation",{sessionDbId:e,promptNumber:n,oldPrompt:a.userPrompt.substring(0,80),newPrompt:r.substring(0,80)}),a.userPrompt=r,a.lastPromptNumber=n||a.lastPromptNumber):O.debug("SESSION","No currentUserPrompt provided for existing session",{sessionDbId:e,promptNumber:n,usingCachedPrompt:a.userPrompt.substring(0,80)}),a}let s=this.dbManager.getSessionById(e);O.info("SESSION","Fetched session from database",{sessionDbId:e,claude_session_id:s.claude_session_id,sdk_session_id:s.sdk_session_id});let i=r||s.user_prompt;r?O.debug("SESSION","Initializing session with fresh userPrompt",{sessionDbId:e,promptNumber:n,userPrompt:r.substring(0,80)}):O.debug("SESSION","No currentUserPrompt provided for new session, using database",{sessionDbId:e,promptNumber:n,dbPrompt:s.user_prompt.substring(0,80)}),a={sessionDbId:e,claudeSessionId:s.claude_session_id,sdkSessionId:null,project:s.project,userPrompt:i,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:n||this.dbManager.getSessionStore().getPromptNumberFromUserPrompts(s.claude_session_id),startTime:Date.now(),cumulativeInputTokens:0,cumulativeOutputTokens:0,pendingProcessingIds:new Set,earliestPendingTimestamp:null,conversationHistory:[],currentProvider:null},O.info("SESSION","Creating new session object",{sessionDbId:e,claudeSessionId:s.claude_session_id,lastPromptNumber:n||this.dbManager.getSessionStore().getPromptNumberFromUserPrompts(s.claude_session_id)}),this.sessions.set(e,a);let o=new C1.EventEmitter;return this.sessionQueues.set(e,o),O.info("SESSION","Session initialized",{sessionId:e,project:a.project,claudeSessionId:a.claudeSessionId,queueDepth:0,hasGenerator:!1}),a}getSession(e){return this.sessions.get(e)}queueObservation(e,r){let n=this.sessions.get(e);n||(n=this.initializeSession(e));let a=n.pendingMessages.length,s={type:"observation",tool_name:r.tool_name,tool_input:r.tool_input,tool_response:r.tool_response,prompt_number:r.prompt_number,cwd:r.cwd};try{let u=this.getPendingStore().enqueue(e,n.claudeSessionId,s);O.debug("SESSION","Observation persisted to DB",{sessionId:e,messageId:u,tool:r.tool_name})}catch(u){throw O.error("SESSION","Failed to persist observation to DB",{sessionId:e,tool:r.tool_name},u),u}n.pendingMessages.push(s);let i=n.pendingMessages.length;this.sessionQueues.get(e)?.emit("message");let c=O.formatTool(r.tool_name,r.tool_input);O.info("SESSION",`Observation queued (${a}\u2192${i})`,{sessionId:e,tool:c,hasGenerator:!!n.generatorPromise})}queueSummarize(e,r,n){let a=this.sessions.get(e);a||(a=this.initializeSession(e));let s=a.pendingMessages.length,i={type:"summarize",last_user_message:r,last_assistant_message:n};try{let u=this.getPendingStore().enqueue(e,a.claudeSessionId,i);O.debug("SESSION","Summarize persisted to DB",{sessionId:e,messageId:u})}catch(u){throw O.error("SESSION","Failed to persist summarize to DB",{sessionId:e},u),u}a.pendingMessages.push(i);let o=a.pendingMessages.length;this.sessionQueues.get(e)?.emit("message"),O.info("SESSION",`Summarize queued (${s}\u2192${o})`,{sessionId:e,hasGenerator:!!a.generatorPromise})}async deleteSession(e){let r=this.sessions.get(e);if(!r)return;let n=Date.now()-r.startTime;r.abortController.abort(),r.generatorPromise&&await r.generatorPromise.catch(()=>{}),this.sessions.delete(e),this.sessionQueues.delete(e),O.info("SESSION","Session deleted",{sessionId:e,duration:`${(n/1e3).toFixed(1)}s`,project:r.project}),this.onSessionDeletedCallback&&this.onSessionDeletedCallback()}async shutdownAll(){let e=Array.from(this.sessions.keys());await Promise.all(e.map(r=>this.deleteSession(r)))}hasPendingMessages(){return Array.from(this.sessions.values()).some(e=>e.pendingMessages.length>0)}getActiveSessionCount(){return this.sessions.size}getTotalQueueDepth(){let e=0;for(let r of this.sessions.values())e+=r.pendingMessages.length;return e}getTotalActiveWork(){let e=0;for(let r of this.sessions.values())e+=r.pendingMessages.length,r.generatorPromise!==null&&(e+=1);return e}isAnySessionProcessing(){for(let e of this.sessions.values())if(e.pendingMessages.length>0||e.generatorPromise!==null)return!0;return!1}async*getMessageIterator(e){let r=this.sessions.get(e);r||(r=this.initializeSession(e));let n=this.sessionQueues.get(e);if(!n)throw new Error(`No emitter for session ${e}`);for(;!r.abortController.signal.aborted;){let a=this.getPendingStore().peekPending(e);if(!a){if(await new Promise(o=>{let c=()=>{n.off("message",c),o()},u=()=>{n.off("message",c),o()};n.once("message",c),r.abortController.signal.addEventListener("abort",u,{once:!0})}),this.getPendingStore().peekPending(e))continue;if(r.abortController.signal.aborted){O.info("SESSION","Generator exiting due to abort",{sessionId:e});return}continue}this.getPendingStore().markProcessing(a.id),r.pendingProcessingIds.add(a.id),r.earliestPendingTimestamp===null?r.earliestPendingTimestamp=a.created_at_epoch:r.earliestPendingTimestamp=Math.min(r.earliestPendingTimestamp,a.created_at_epoch);let s={_persistentId:a.id,_originalTimestamp:a.created_at_epoch,...this.getPendingStore().toPendingMessage(a)};if(r.pendingMessages.push(s),yield s,r.pendingMessages.shift(),s.type==="summarize"){O.info("SESSION","Summary yielded - ending generator",{sessionId:e});return}}}getPendingMessageStore(){return this.getPendingStore()}};ot();var el=class{sseClients=new Set;addClient(e){this.sseClients.add(e),O.debug("WORKER","Client connected",{total:this.sseClients.size}),e.on("close",()=>{this.removeClient(e)}),this.sendToClient(e,{type:"connected",timestamp:Date.now()})}removeClient(e){this.sseClients.delete(e),O.debug("WORKER","Client disconnected",{total:this.sseClients.size})}broadcast(e){if(this.sseClients.size===0){O.debug("WORKER","SSE broadcast skipped (no clients)",{eventType:e.type});return}let r={...e,timestamp:Date.now()},n=`data: ${JSON.stringify(r)} + `).get(this.project);O.info("CHROMA_SYNC","Backfilling user prompts",{project:this.project,missing:_.length,existing:e.prompts.size,total:p.count});let h=[];for(let y of _)h.push(this.formatUserPromptDoc(y));for(let y=0;y{}),this.sessions.delete(e),this.sessionQueues.delete(e),O.info("SESSION","Session deleted",{sessionId:e,duration:`${(n/1e3).toFixed(1)}s`,project:r.project}),this.onSessionDeletedCallback&&this.onSessionDeletedCallback()}async shutdownAll(){let e=Array.from(this.sessions.keys());await Promise.all(e.map(r=>this.deleteSession(r)))}hasPendingMessages(){return Array.from(this.sessions.values()).some(e=>e.pendingMessages.length>0)}getActiveSessionCount(){return this.sessions.size}getTotalQueueDepth(){let e=0;for(let r of this.sessions.values())e+=r.pendingMessages.length;return e}getTotalActiveWork(){let e=0;for(let r of this.sessions.values())e+=r.pendingMessages.length,r.generatorPromise!==null&&(e+=1);return e}isAnySessionProcessing(){for(let e of this.sessions.values())if(e.pendingMessages.length>0||e.generatorPromise!==null)return!0;return!1}async*getMessageIterator(e){let r=this.sessions.get(e);r||(r=this.initializeSession(e));let n=this.sessionQueues.get(e);if(!n)throw new Error(`No emitter for session ${e}`);for(;!r.abortController.signal.aborted;){let a=this.getPendingStore().peekPending(e);if(!a){if(await new Promise(o=>{let c=()=>{n.off("message",c),o()},u=()=>{n.off("message",c),o()};n.once("message",c),r.abortController.signal.addEventListener("abort",u,{once:!0})}),this.getPendingStore().peekPending(e))continue;if(r.abortController.signal.aborted){O.info("SESSION","Generator exiting due to abort",{sessionId:e});return}continue}this.getPendingStore().markProcessing(a.id),r.pendingProcessingIds.add(a.id),r.earliestPendingTimestamp===null?r.earliestPendingTimestamp=a.created_at_epoch:r.earliestPendingTimestamp=Math.min(r.earliestPendingTimestamp,a.created_at_epoch);let s={_persistentId:a.id,_originalTimestamp:a.created_at_epoch,...this.getPendingStore().toPendingMessage(a)};if(r.pendingMessages.push(s),yield s,r.pendingMessages.shift(),s.type==="summarize"){O.info("SESSION","Summary yielded - ending generator",{sessionId:e});return}}}getPendingMessageStore(){return this.getPendingStore()}};ot();var el=class{sseClients=new Set;addClient(e){this.sseClients.add(e),O.debug("WORKER","Client connected",{total:this.sseClients.size}),e.on("close",()=>{this.removeClient(e)}),this.sendToClient(e,{type:"connected",timestamp:Date.now()})}removeClient(e){this.sseClients.delete(e),O.debug("WORKER","Client disconnected",{total:this.sseClients.size})}broadcast(e){if(this.sseClients.size===0){O.debug("WORKER","SSE broadcast skipped (no clients)",{eventType:e.type});return}let r={...e,timestamp:Date.now()},n=`data: ${JSON.stringify(r)} `;O.debug("WORKER","SSE broadcast sent",{eventType:e.type,clients:this.sseClients.size});for(let a of this.sseClients)a.write(n)}getClientCount(){return this.sseClients.size}sendToClient(e,r){let n=`data: ${JSON.stringify(r)} diff --git a/src/services/worker/DatabaseManager.ts b/src/services/worker/DatabaseManager.ts index 677ac3d8..353e8a1a 100644 --- a/src/services/worker/DatabaseManager.ts +++ b/src/services/worker/DatabaseManager.ts @@ -27,14 +27,9 @@ export class DatabaseManager { this.sessionStore = new SessionStore(); this.sessionSearch = new SessionSearch(); - // Initialize ChromaSync + // Initialize ChromaSync (lazy - connects on first search, not at startup) this.chromaSync = new ChromaSync('claude-mem'); - // Start background backfill (fire-and-forget) - this.chromaSync.ensureBackfilled().catch(error => { - logger.error('DB', 'Chroma backfill failed (non-fatal)', {}, error); - }); - logger.info('DB', 'Database initialized'); }