From d8d4d2464fe118ccd8e8e780318df0e97a5fbf91 Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Sat, 25 Oct 2025 14:38:06 -0400 Subject: [PATCH] worker --- plugin/scripts/worker-service.cjs | 2 +- src/services/worker-service.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/scripts/worker-service.cjs b/plugin/scripts/worker-service.cjs index fd535008..42e75641 100755 --- a/plugin/scripts/worker-service.cjs +++ b/plugin/scripts/worker-service.cjs @@ -487,7 +487,7 @@ Respond in this XML format: [Additional insights] -IMPORTANT: This is not the end of the session. You will receive more requests to process, and more tool usages to observe and record. The summary helps keep track of progress. Always write at least a minimal summary explaining where we are at currently, even if you didn't learn anything new or complete any work.`}function xg(a,e){let r=[],t=/([\s\S]*?)<\/observation>/g,i;for(;(i=t.exec(a))!==null;){let s=i[1],n=Vt(s,"type"),o=Vt(s,"title"),p=Vt(s,"subtitle"),c=Vt(s,"narrative"),l=ps(s,"facts","fact"),u=ps(s,"concepts","concept"),d=ps(s,"files_read","file"),m=ps(s,"files_modified","file"),v="change";n?["bugfix","feature","refactor","change","discovery","decision"].includes(n.trim())?v=n.trim():ne.warn("PARSER",`Invalid observation type: ${n}, using "change"`,{correlationId:e}):ne.warn("PARSER",'Observation missing type field, using "change"',{correlationId:e});let f=u.filter(g=>g!==v);f.length!==u.length&&ne.warn("PARSER","Removed observation type from concepts array",{correlationId:e,type:v,originalConcepts:u,cleanedConcepts:f}),r.push({type:v,title:o,subtitle:p,facts:l,narrative:c,concepts:f,files_read:d,files_modified:m})}return r}function yg(a,e){let t=//.exec(a);if(t)return ne.info("PARSER","Summary skipped",{sessionId:e,reason:t[1]}),null;let s=/([\s\S]*?)<\/summary>/.exec(a);if(!s)return null;let n=s[1],o=Vt(n,"request"),p=Vt(n,"investigated"),c=Vt(n,"learned"),l=Vt(n,"completed"),u=Vt(n,"next_steps"),d=Vt(n,"notes");return{request:o,investigated:p,learned:c,completed:l,next_steps:u,notes:d}}function Vt(a,e){let t=new RegExp(`<${e}>([^<]*)`).exec(a);if(!t)return null;let i=t[1].trim();return i===""?null:i}function ps(a,e,r){let t=[],s=new RegExp(`<${e}>(.*?)`,"s").exec(a);if(!s)return t;let n=s[1],o=new RegExp(`<${r}>([^<]+)`,"g"),p;for(;(p=o.exec(n))!==null;)t.push(p[1].trim());return t}var K2=process.env.CLAUDE_MEM_MODEL||"claude-sonnet-4-5",Q2=["Glob","Grep","ListMcpResourcesTool","WebSearch"],ls=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),us=class{app;port=null;sessions=new Map;constructor(){this.app=(0,Vc.default)(),this.app.use(Vc.default.json({limit:"50mb"})),this.app.get("/health",this.handleHealth.bind(this)),this.app.post("/sessions/:sessionDbId/init",this.handleInit.bind(this)),this.app.post("/sessions/:sessionDbId/observations",this.handleObservation.bind(this)),this.app.post("/sessions/:sessionDbId/summarize",this.handleSummarize.bind(this)),this.app.get("/sessions/:sessionDbId/status",this.handleStatus.bind(this)),this.app.delete("/sessions/:sessionDbId",this.handleDelete.bind(this))}async start(){this.port=ls;let e=new lt,r=e.cleanupOrphanedSessions();return e.close(),r>0&&ne.info("SYSTEM",`Cleaned up ${r} orphaned sessions`),new Promise((t,i)=>{this.app.listen(ls,"127.0.0.1",()=>{ne.info("SYSTEM","Worker started",{port:ls,pid:process.pid,activeSessions:this.sessions.size}),t()}).on("error",s=>{s.code==="EADDRINUSE"&&ne.error("SYSTEM",`Port ${ls} already in use - worker may already be running`),i(s)})})}handleHealth(e,r){r.json({status:"ok",port:this.port,pid:process.pid,activeSessions:this.sessions.size,uptime:process.uptime(),memory:process.memoryUsage()})}async handleInit(e,r){let t=parseInt(e.params.sessionDbId,10),{project:i,userPrompt:s}=e.body,n=ne.sessionId(t);ne.info("WORKER","Session init",{correlationId:n,project:i});let o=new lt,p=o.getSessionById(t);if(!p){o.close(),r.status(404).json({error:"Session not found in database"});return}let c=p.claude_session_id,l={sessionDbId:t,claudeSessionId:c,sdkSessionId:null,project:i,userPrompt:s,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()};this.sessions.set(t,l),o.setWorkerPort(t,this.port),o.close(),l.generatorPromise=this.runSDKAgent(l).catch(u=>{ne.failure("WORKER","SDK agent error",{sessionId:t},u);let d=new lt;d.markSessionFailed(t),d.close(),this.sessions.delete(t)}),ne.success("WORKER","Session initialized",{sessionId:t,port:this.port}),r.json({status:"initialized",sessionDbId:t,port:this.port})}handleObservation(e,r){let t=parseInt(e.params.sessionDbId,10),{tool_name:i,tool_input:s,tool_output:n,prompt_number:o}=e.body,p=this.sessions.get(t);if(!p){let u=new lt,d=u.getSessionById(t);u.close(),p={sessionDbId:t,claudeSessionId:d.claude_session_id,sdkSessionId:null,project:d.project,userPrompt:d.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,p),p.generatorPromise=this.runSDKAgent(p).catch(m=>{ne.failure("WORKER","SDK agent error",{sessionId:t},m);let v=new lt;v.markSessionFailed(t),v.close(),this.sessions.delete(t)})}p.observationCounter++;let c=ne.correlationId(t,p.observationCounter),l=ne.formatTool(i,s);ne.dataIn("WORKER",`Observation queued: ${l}`,{correlationId:c,queue:p.pendingMessages.length+1}),p.pendingMessages.push({type:"observation",tool_name:i,tool_input:s,tool_output:n,prompt_number:o}),r.json({status:"queued",queueLength:p.pendingMessages.length})}handleSummarize(e,r){let t=parseInt(e.params.sessionDbId,10),{prompt_number:i}=e.body,s=this.sessions.get(t);if(!s){let n=new lt,o=n.getSessionById(t);n.close(),s={sessionDbId:t,claudeSessionId:o.claude_session_id,sdkSessionId:null,project:o.project,userPrompt:o.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,s),s.generatorPromise=this.runSDKAgent(s).catch(p=>{ne.failure("WORKER","SDK agent error",{sessionId:t},p);let c=new lt;c.markSessionFailed(t),c.close(),this.sessions.delete(t)})}ne.dataIn("WORKER","Summary requested",{sessionId:t,promptNumber:i,queue:s.pendingMessages.length+1}),s.pendingMessages.push({type:"summarize",prompt_number:i}),r.json({status:"queued",queueLength:s.pendingMessages.length})}handleStatus(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}r.json({sessionDbId:t,sdkSessionId:i.sdkSessionId,project:i.project,pendingMessages:i.pendingMessages.length})}async handleDelete(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}ne.warn("WORKER","Session delete requested",{sessionId:t}),i.abortController.abort(),i.generatorPromise&&await Promise.race([i.generatorPromise,new Promise(n=>setTimeout(n,5e3))]);let s=new lt;s.markSessionFailed(t),s.close(),this.sessions.delete(t),ne.info("WORKER","Session deleted",{sessionId:t}),r.json({status:"deleted"})}async runSDKAgent(e){ne.info("SDK","Agent starting",{sessionId:e.sessionDbId});let r=process.env.CLAUDE_CODE_PATH||"/usr/local/bin/claude";try{let t=lg({prompt:this.createMessageGenerator(e),options:{model:K2,disallowedTools:Q2,abortController:e.abortController}});for await(let n of t)if(n.type==="assistant"){let o=n.message.content,p=Array.isArray(o)?o.filter(l=>l.type==="text").map(l=>l.text).join(` +IMPORTANT: This is not the end of the session. You will receive more requests to process, and more tool usages to observe and record. The summary helps keep track of progress. Always write at least a minimal summary explaining where we are at currently, even if you didn't learn anything new or complete any work.`}function xg(a,e){let r=[],t=/([\s\S]*?)<\/observation>/g,i;for(;(i=t.exec(a))!==null;){let s=i[1],n=Vt(s,"type"),o=Vt(s,"title"),p=Vt(s,"subtitle"),c=Vt(s,"narrative"),l=ps(s,"facts","fact"),u=ps(s,"concepts","concept"),d=ps(s,"files_read","file"),m=ps(s,"files_modified","file"),v="change";n?["bugfix","feature","refactor","change","discovery","decision"].includes(n.trim())?v=n.trim():ne.warn("PARSER",`Invalid observation type: ${n}, using "change"`,{correlationId:e}):ne.warn("PARSER",'Observation missing type field, using "change"',{correlationId:e});let f=u.filter(g=>g!==v);f.length!==u.length&&ne.warn("PARSER","Removed observation type from concepts array",{correlationId:e,type:v,originalConcepts:u,cleanedConcepts:f}),r.push({type:v,title:o,subtitle:p,facts:l,narrative:c,concepts:f,files_read:d,files_modified:m})}return r}function yg(a,e){let t=//.exec(a);if(t)return ne.info("PARSER","Summary skipped",{sessionId:e,reason:t[1]}),null;let s=/([\s\S]*?)<\/summary>/.exec(a);if(!s)return null;let n=s[1],o=Vt(n,"request"),p=Vt(n,"investigated"),c=Vt(n,"learned"),l=Vt(n,"completed"),u=Vt(n,"next_steps"),d=Vt(n,"notes");return{request:o,investigated:p,learned:c,completed:l,next_steps:u,notes:d}}function Vt(a,e){let t=new RegExp(`<${e}>([^<]*)`).exec(a);if(!t)return null;let i=t[1].trim();return i===""?null:i}function ps(a,e,r){let t=[],s=new RegExp(`<${e}>(.*?)`,"s").exec(a);if(!s)return t;let n=s[1],o=new RegExp(`<${r}>([^<]+)`,"g"),p;for(;(p=o.exec(n))!==null;)t.push(p[1].trim());return t}var K2=process.env.CLAUDE_MEM_MODEL||"claude-sonnet-4-5",Q2=["Glob","Grep","ListMcpResourcesTool","WebSearch"],ls=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),us=class{app;port=null;sessions=new Map;constructor(){this.app=(0,Vc.default)(),this.app.use(Vc.default.json({limit:"50mb"})),this.app.get("/health",this.handleHealth.bind(this)),this.app.post("/sessions/:sessionDbId/init",this.handleInit.bind(this)),this.app.post("/sessions/:sessionDbId/observations",this.handleObservation.bind(this)),this.app.post("/sessions/:sessionDbId/summarize",this.handleSummarize.bind(this)),this.app.get("/sessions/:sessionDbId/status",this.handleStatus.bind(this)),this.app.delete("/sessions/:sessionDbId",this.handleDelete.bind(this))}async start(){this.port=ls;let e=new lt,r=e.cleanupOrphanedSessions();return e.close(),r>0&&ne.info("SYSTEM",`Cleaned up ${r} orphaned sessions`),new Promise((t,i)=>{this.app.listen(ls,"127.0.0.1",()=>{ne.info("SYSTEM","Worker started",{port:ls,pid:process.pid,activeSessions:this.sessions.size}),t()}).on("error",s=>{s.code==="EADDRINUSE"&&ne.error("SYSTEM",`Port ${ls} already in use - worker may already be running`),i(s)})})}handleHealth(e,r){r.json({status:"ok",port:this.port,pid:process.pid,activeSessions:this.sessions.size,uptime:process.uptime(),memory:process.memoryUsage()})}async handleInit(e,r){let t=parseInt(e.params.sessionDbId,10),{project:i,userPrompt:s}=e.body,n=ne.sessionId(t);ne.info("WORKER","Session init",{correlationId:n,project:i});let o=new lt,p=o.getSessionById(t);if(!p){o.close(),r.status(404).json({error:"Session not found in database"});return}let c=p.claude_session_id,l={sessionDbId:t,claudeSessionId:c,sdkSessionId:null,project:i,userPrompt:s,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()};this.sessions.set(t,l),o.setWorkerPort(t,this.port),o.close(),l.generatorPromise=this.runSDKAgent(l).catch(u=>{ne.failure("WORKER","SDK agent error",{sessionId:t},u);let d=new lt;d.markSessionFailed(t),d.close(),this.sessions.delete(t)}),ne.success("WORKER","Session initialized",{sessionId:t,port:this.port}),r.json({status:"initialized",sessionDbId:t,port:this.port})}handleObservation(e,r){let t=parseInt(e.params.sessionDbId,10),{tool_name:i,tool_input:s,tool_output:n,prompt_number:o}=e.body,p=this.sessions.get(t);if(!p){let u=new lt,d=u.getSessionById(t);u.close(),p={sessionDbId:t,claudeSessionId:d.claude_session_id,sdkSessionId:null,project:d.project,userPrompt:d.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,p),p.generatorPromise=this.runSDKAgent(p).catch(m=>{ne.failure("WORKER","SDK agent error",{sessionId:t},m);let v=new lt;v.markSessionFailed(t),v.close(),this.sessions.delete(t)})}p.observationCounter++;let c=ne.correlationId(t,p.observationCounter),l=ne.formatTool(i,s);ne.dataIn("WORKER",`Observation queued: ${l}`,{correlationId:c,queue:p.pendingMessages.length+1}),p.pendingMessages.push({type:"observation",tool_name:i,tool_input:s,tool_output:n,prompt_number:o}),r.json({status:"queued",queueLength:p.pendingMessages.length})}handleSummarize(e,r){let t=parseInt(e.params.sessionDbId,10),{prompt_number:i}=e.body,s=this.sessions.get(t);if(!s){let n=new lt,o=n.getSessionById(t);n.close(),s={sessionDbId:t,claudeSessionId:o.claude_session_id,sdkSessionId:null,project:o.project,userPrompt:o.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,s),s.generatorPromise=this.runSDKAgent(s).catch(p=>{ne.failure("WORKER","SDK agent error",{sessionId:t},p);let c=new lt;c.markSessionFailed(t),c.close(),this.sessions.delete(t)})}ne.dataIn("WORKER","Summary requested",{sessionId:t,promptNumber:i,queue:s.pendingMessages.length+1}),s.pendingMessages.push({type:"summarize",prompt_number:i}),r.json({status:"queued",queueLength:s.pendingMessages.length})}handleStatus(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}r.json({sessionDbId:t,sdkSessionId:i.sdkSessionId,project:i.project,pendingMessages:i.pendingMessages.length})}async handleDelete(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}ne.warn("WORKER","Session delete requested",{sessionId:t}),i.abortController.abort(),i.generatorPromise&&await Promise.race([i.generatorPromise,new Promise(n=>setTimeout(n,5e3))]);let s=new lt;s.markSessionFailed(t),s.close(),this.sessions.delete(t),ne.info("WORKER","Session deleted",{sessionId:t}),r.json({status:"deleted"})}async runSDKAgent(e){ne.info("SDK","Agent starting",{sessionId:e.sessionDbId});let r=process.env.CLAUDE_CODE_PATH||"/usr/local/bin/claude";try{let t=lg({prompt:this.createMessageGenerator(e),options:{model:K2,disallowedTools:Q2,abortController:e.abortController,pathToClaudeCodeExecutable:""}});for await(let n of t)if(n.type==="assistant"){let o=n.message.content,p=Array.isArray(o)?o.filter(l=>l.type==="text").map(l=>l.text).join(` `):typeof o=="string"?o:"",c=p.length;ne.dataOut("SDK",`Response received (${c} chars)`,{sessionId:e.sessionDbId,promptNumber:e.lastPromptNumber}),ne.debug("SDK","Full response",{sessionId:e.sessionDbId},p),this.handleAgentMessage(e,p,e.lastPromptNumber)}let i=Date.now()-e.startTime;ne.success("SDK","Agent completed",{sessionId:e.sessionDbId,duration:`${(i/1e3).toFixed(1)}s`});let s=new lt;s.markSessionCompleted(e.sessionDbId),s.close(),this.sessions.delete(e.sessionDbId)}catch(t){throw t.name==="AbortError"?ne.warn("SDK","Agent aborted",{sessionId:e.sessionDbId}):ne.failure("SDK","Agent error",{sessionId:e.sessionDbId},t),t}}async*createMessageGenerator(e){let r=hg(e.project,e.claudeSessionId,e.userPrompt);for(ne.dataIn("SDK",`Init prompt sent (${r.length} chars)`,{sessionId:e.sessionDbId,claudeSessionId:e.claudeSessionId,project:e.project}),ne.debug("SDK","Full init prompt",{sessionId:e.sessionDbId},r),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:r}};!e.abortController.signal.aborted;){if(e.pendingMessages.length===0){await new Promise(t=>setTimeout(t,100));continue}for(;e.pendingMessages.length>0;){let t=e.pendingMessages.shift();if(t.type==="summarize"){e.lastPromptNumber=t.prompt_number;let i=new lt,s=i.getSessionById(e.sessionDbId);i.close();let n=gg(s);ne.dataIn("SDK",`Summary prompt sent (${n.length} chars)`,{sessionId:e.sessionDbId,promptNumber:t.prompt_number}),ne.debug("SDK","Full summary prompt",{sessionId:e.sessionDbId},n),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:n}}}else if(t.type==="observation"){e.lastPromptNumber=t.prompt_number;let i=vg({id:0,tool_name:t.tool_name,tool_input:t.tool_input,tool_output:t.tool_output,created_at_epoch:Date.now()}),s=ne.formatTool(t.tool_name,t.tool_input),n=ne.correlationId(e.sessionDbId,e.observationCounter);ne.dataIn("SDK",`Observation prompt: ${s}`,{correlationId:n,promptNumber:t.prompt_number,size:`${i.length} chars`}),ne.debug("SDK","Full observation prompt",{correlationId:n},i),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:i}}}}}}handleAgentMessage(e,r,t){let i=ne.correlationId(e.sessionDbId,e.observationCounter);ne.info("PARSER",`Processing response (${r.length} chars)`,{sessionId:e.sessionDbId,promptNumber:t,preview:r.substring(0,200)});let s=xg(r,i);s.length>0&&ne.info("PARSER",`Parsed ${s.length} observation(s)`,{correlationId:i,promptNumber:t,types:s.map(p=>p.type).join(", ")});let n=new lt;for(let p of s)n.storeObservation(e.claudeSessionId,e.project,p,t),ne.success("DB","Observation stored",{correlationId:i,type:p.type,title:p.title});ne.info("PARSER","Looking for summary tags...",{sessionId:e.sessionDbId});let o=yg(r,e.sessionDbId);o?(ne.success("PARSER","Summary parsed successfully!",{sessionId:e.sessionDbId,promptNumber:t,hasRequest:!!o.request,hasInvestigated:!!o.investigated,hasLearned:!!o.learned,hasCompleted:!!o.completed,hasNextSteps:!!o.next_steps}),n.storeSummary(e.claudeSessionId,e.project,o,t),ne.success("DB","\u{1F4DD} SUMMARY STORED IN DATABASE",{sessionId:e.sessionDbId,promptNumber:t})):ne.warn("PARSER","NO SUMMARY TAGS FOUND in response",{sessionId:e.sessionDbId,promptNumber:t,contentSample:r.substring(0,500)}),n.close()}};async function J2(){await new us().start(),process.on("SIGINT",()=>{ne.warn("SYSTEM","Shutting down (SIGINT)"),process.exit(0)}),process.on("SIGTERM",()=>{ne.warn("SYSTEM","Shutting down (SIGTERM)"),process.exit(0)})}J2().catch(a=>{ne.failure("SYSTEM","Fatal startup error",{},a),process.exit(1)});0&&(module.exports={WorkerService}); /*! Bundled license information: diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 05df1377..561e5450 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -353,8 +353,8 @@ class WorkerService { options: { model: MODEL, disallowedTools: DISALLOWED_TOOLS, - abortController: session.abortController - // pathToClaudeCodeExecutable: claudePath + abortController: session.abortController, + pathToClaudeCodeExecutable: '' } });