From 8f8649c2a8390e5a20f0fb8c9c5cc506a1028646 Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Sat, 25 Oct 2025 14:41:16 -0400 Subject: [PATCH] worker --- plugin/scripts/worker-service.cjs | 2 +- src/sdk/worker.ts | 4 ++-- src/services/worker-service.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/plugin/scripts/worker-service.cjs b/plugin/scripts/worker-service.cjs index 42e75641..875b6a91 100755 --- a/plugin/scripts/worker-service.cjs +++ b/plugin/scripts/worker-service.cjs @@ -487,7 +487,7 @@ Respond in this XML format: [Additional insights] -IMPORTANT: This is not the end of the session. You will receive more requests to process, and more tool usages to observe and record. The summary helps keep track of progress. Always write at least a minimal summary explaining where we are at currently, even if you didn't learn anything new or complete any work.`}function xg(a,e){let r=[],t=/([\s\S]*?)<\/observation>/g,i;for(;(i=t.exec(a))!==null;){let s=i[1],n=Vt(s,"type"),o=Vt(s,"title"),p=Vt(s,"subtitle"),c=Vt(s,"narrative"),l=ps(s,"facts","fact"),u=ps(s,"concepts","concept"),d=ps(s,"files_read","file"),m=ps(s,"files_modified","file"),v="change";n?["bugfix","feature","refactor","change","discovery","decision"].includes(n.trim())?v=n.trim():ne.warn("PARSER",`Invalid observation type: ${n}, using "change"`,{correlationId:e}):ne.warn("PARSER",'Observation missing type field, using "change"',{correlationId:e});let f=u.filter(g=>g!==v);f.length!==u.length&&ne.warn("PARSER","Removed observation type from concepts array",{correlationId:e,type:v,originalConcepts:u,cleanedConcepts:f}),r.push({type:v,title:o,subtitle:p,facts:l,narrative:c,concepts:f,files_read:d,files_modified:m})}return r}function yg(a,e){let t=//.exec(a);if(t)return ne.info("PARSER","Summary skipped",{sessionId:e,reason:t[1]}),null;let s=/([\s\S]*?)<\/summary>/.exec(a);if(!s)return null;let n=s[1],o=Vt(n,"request"),p=Vt(n,"investigated"),c=Vt(n,"learned"),l=Vt(n,"completed"),u=Vt(n,"next_steps"),d=Vt(n,"notes");return{request:o,investigated:p,learned:c,completed:l,next_steps:u,notes:d}}function Vt(a,e){let t=new RegExp(`<${e}>([^<]*)`).exec(a);if(!t)return null;let i=t[1].trim();return i===""?null:i}function ps(a,e,r){let t=[],s=new RegExp(`<${e}>(.*?)`,"s").exec(a);if(!s)return t;let n=s[1],o=new RegExp(`<${r}>([^<]+)`,"g"),p;for(;(p=o.exec(n))!==null;)t.push(p[1].trim());return t}var K2=process.env.CLAUDE_MEM_MODEL||"claude-sonnet-4-5",Q2=["Glob","Grep","ListMcpResourcesTool","WebSearch"],ls=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),us=class{app;port=null;sessions=new Map;constructor(){this.app=(0,Vc.default)(),this.app.use(Vc.default.json({limit:"50mb"})),this.app.get("/health",this.handleHealth.bind(this)),this.app.post("/sessions/:sessionDbId/init",this.handleInit.bind(this)),this.app.post("/sessions/:sessionDbId/observations",this.handleObservation.bind(this)),this.app.post("/sessions/:sessionDbId/summarize",this.handleSummarize.bind(this)),this.app.get("/sessions/:sessionDbId/status",this.handleStatus.bind(this)),this.app.delete("/sessions/:sessionDbId",this.handleDelete.bind(this))}async start(){this.port=ls;let e=new lt,r=e.cleanupOrphanedSessions();return e.close(),r>0&&ne.info("SYSTEM",`Cleaned up ${r} orphaned sessions`),new Promise((t,i)=>{this.app.listen(ls,"127.0.0.1",()=>{ne.info("SYSTEM","Worker started",{port:ls,pid:process.pid,activeSessions:this.sessions.size}),t()}).on("error",s=>{s.code==="EADDRINUSE"&&ne.error("SYSTEM",`Port ${ls} already in use - worker may already be running`),i(s)})})}handleHealth(e,r){r.json({status:"ok",port:this.port,pid:process.pid,activeSessions:this.sessions.size,uptime:process.uptime(),memory:process.memoryUsage()})}async handleInit(e,r){let t=parseInt(e.params.sessionDbId,10),{project:i,userPrompt:s}=e.body,n=ne.sessionId(t);ne.info("WORKER","Session init",{correlationId:n,project:i});let o=new lt,p=o.getSessionById(t);if(!p){o.close(),r.status(404).json({error:"Session not found in database"});return}let c=p.claude_session_id,l={sessionDbId:t,claudeSessionId:c,sdkSessionId:null,project:i,userPrompt:s,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()};this.sessions.set(t,l),o.setWorkerPort(t,this.port),o.close(),l.generatorPromise=this.runSDKAgent(l).catch(u=>{ne.failure("WORKER","SDK agent error",{sessionId:t},u);let d=new lt;d.markSessionFailed(t),d.close(),this.sessions.delete(t)}),ne.success("WORKER","Session initialized",{sessionId:t,port:this.port}),r.json({status:"initialized",sessionDbId:t,port:this.port})}handleObservation(e,r){let t=parseInt(e.params.sessionDbId,10),{tool_name:i,tool_input:s,tool_output:n,prompt_number:o}=e.body,p=this.sessions.get(t);if(!p){let u=new lt,d=u.getSessionById(t);u.close(),p={sessionDbId:t,claudeSessionId:d.claude_session_id,sdkSessionId:null,project:d.project,userPrompt:d.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,p),p.generatorPromise=this.runSDKAgent(p).catch(m=>{ne.failure("WORKER","SDK agent error",{sessionId:t},m);let v=new lt;v.markSessionFailed(t),v.close(),this.sessions.delete(t)})}p.observationCounter++;let c=ne.correlationId(t,p.observationCounter),l=ne.formatTool(i,s);ne.dataIn("WORKER",`Observation queued: ${l}`,{correlationId:c,queue:p.pendingMessages.length+1}),p.pendingMessages.push({type:"observation",tool_name:i,tool_input:s,tool_output:n,prompt_number:o}),r.json({status:"queued",queueLength:p.pendingMessages.length})}handleSummarize(e,r){let t=parseInt(e.params.sessionDbId,10),{prompt_number:i}=e.body,s=this.sessions.get(t);if(!s){let n=new lt,o=n.getSessionById(t);n.close(),s={sessionDbId:t,claudeSessionId:o.claude_session_id,sdkSessionId:null,project:o.project,userPrompt:o.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,s),s.generatorPromise=this.runSDKAgent(s).catch(p=>{ne.failure("WORKER","SDK agent error",{sessionId:t},p);let c=new lt;c.markSessionFailed(t),c.close(),this.sessions.delete(t)})}ne.dataIn("WORKER","Summary requested",{sessionId:t,promptNumber:i,queue:s.pendingMessages.length+1}),s.pendingMessages.push({type:"summarize",prompt_number:i}),r.json({status:"queued",queueLength:s.pendingMessages.length})}handleStatus(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}r.json({sessionDbId:t,sdkSessionId:i.sdkSessionId,project:i.project,pendingMessages:i.pendingMessages.length})}async handleDelete(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}ne.warn("WORKER","Session delete requested",{sessionId:t}),i.abortController.abort(),i.generatorPromise&&await Promise.race([i.generatorPromise,new Promise(n=>setTimeout(n,5e3))]);let s=new lt;s.markSessionFailed(t),s.close(),this.sessions.delete(t),ne.info("WORKER","Session deleted",{sessionId:t}),r.json({status:"deleted"})}async runSDKAgent(e){ne.info("SDK","Agent starting",{sessionId:e.sessionDbId});let r=process.env.CLAUDE_CODE_PATH||"/usr/local/bin/claude";try{let t=lg({prompt:this.createMessageGenerator(e),options:{model:K2,disallowedTools:Q2,abortController:e.abortController,pathToClaudeCodeExecutable:""}});for await(let n of t)if(n.type==="assistant"){let o=n.message.content,p=Array.isArray(o)?o.filter(l=>l.type==="text").map(l=>l.text).join(` +IMPORTANT: This is not the end of the session. You will receive more requests to process, and more tool usages to observe and record. The summary helps keep track of progress. Always write at least a minimal summary explaining where we are at currently, even if you didn't learn anything new or complete any work.`}function xg(a,e){let r=[],t=/([\s\S]*?)<\/observation>/g,i;for(;(i=t.exec(a))!==null;){let s=i[1],n=Vt(s,"type"),o=Vt(s,"title"),p=Vt(s,"subtitle"),c=Vt(s,"narrative"),l=ps(s,"facts","fact"),u=ps(s,"concepts","concept"),d=ps(s,"files_read","file"),m=ps(s,"files_modified","file"),v="change";n?["bugfix","feature","refactor","change","discovery","decision"].includes(n.trim())?v=n.trim():ne.warn("PARSER",`Invalid observation type: ${n}, using "change"`,{correlationId:e}):ne.warn("PARSER",'Observation missing type field, using "change"',{correlationId:e});let f=u.filter(g=>g!==v);f.length!==u.length&&ne.warn("PARSER","Removed observation type from concepts array",{correlationId:e,type:v,originalConcepts:u,cleanedConcepts:f}),r.push({type:v,title:o,subtitle:p,facts:l,narrative:c,concepts:f,files_read:d,files_modified:m})}return r}function yg(a,e){let t=//.exec(a);if(t)return ne.info("PARSER","Summary skipped",{sessionId:e,reason:t[1]}),null;let s=/([\s\S]*?)<\/summary>/.exec(a);if(!s)return null;let n=s[1],o=Vt(n,"request"),p=Vt(n,"investigated"),c=Vt(n,"learned"),l=Vt(n,"completed"),u=Vt(n,"next_steps"),d=Vt(n,"notes");return{request:o,investigated:p,learned:c,completed:l,next_steps:u,notes:d}}function Vt(a,e){let t=new RegExp(`<${e}>([^<]*)`).exec(a);if(!t)return null;let i=t[1].trim();return i===""?null:i}function ps(a,e,r){let t=[],s=new RegExp(`<${e}>(.*?)`,"s").exec(a);if(!s)return t;let n=s[1],o=new RegExp(`<${r}>([^<]+)`,"g"),p;for(;(p=o.exec(n))!==null;)t.push(p[1].trim());return t}var K2=process.env.CLAUDE_MEM_MODEL||"claude-sonnet-4-5",Q2=["Glob","Grep","ListMcpResourcesTool","WebSearch"],ls=parseInt(process.env.CLAUDE_MEM_WORKER_PORT||"37777",10),us=class{app;port=null;sessions=new Map;constructor(){this.app=(0,Vc.default)(),this.app.use(Vc.default.json({limit:"50mb"})),this.app.get("/health",this.handleHealth.bind(this)),this.app.post("/sessions/:sessionDbId/init",this.handleInit.bind(this)),this.app.post("/sessions/:sessionDbId/observations",this.handleObservation.bind(this)),this.app.post("/sessions/:sessionDbId/summarize",this.handleSummarize.bind(this)),this.app.get("/sessions/:sessionDbId/status",this.handleStatus.bind(this)),this.app.delete("/sessions/:sessionDbId",this.handleDelete.bind(this))}async start(){this.port=ls;let e=new lt,r=e.cleanupOrphanedSessions();return e.close(),r>0&&ne.info("SYSTEM",`Cleaned up ${r} orphaned sessions`),new Promise((t,i)=>{this.app.listen(ls,"127.0.0.1",()=>{ne.info("SYSTEM","Worker started",{port:ls,pid:process.pid,activeSessions:this.sessions.size}),t()}).on("error",s=>{s.code==="EADDRINUSE"&&ne.error("SYSTEM",`Port ${ls} already in use - worker may already be running`),i(s)})})}handleHealth(e,r){r.json({status:"ok",port:this.port,pid:process.pid,activeSessions:this.sessions.size,uptime:process.uptime(),memory:process.memoryUsage()})}async handleInit(e,r){let t=parseInt(e.params.sessionDbId,10),{project:i,userPrompt:s}=e.body,n=ne.sessionId(t);ne.info("WORKER","Session init",{correlationId:n,project:i});let o=new lt,p=o.getSessionById(t);if(!p){o.close(),r.status(404).json({error:"Session not found in database"});return}let c=p.claude_session_id,l={sessionDbId:t,claudeSessionId:c,sdkSessionId:null,project:i,userPrompt:s,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()};this.sessions.set(t,l),o.setWorkerPort(t,this.port),o.close(),l.generatorPromise=this.runSDKAgent(l).catch(u=>{ne.failure("WORKER","SDK agent error",{sessionId:t},u);let d=new lt;d.markSessionFailed(t),d.close(),this.sessions.delete(t)}),ne.success("WORKER","Session initialized",{sessionId:t,port:this.port}),r.json({status:"initialized",sessionDbId:t,port:this.port})}handleObservation(e,r){let t=parseInt(e.params.sessionDbId,10),{tool_name:i,tool_input:s,tool_output:n,prompt_number:o}=e.body,p=this.sessions.get(t);if(!p){let u=new lt,d=u.getSessionById(t);u.close(),p={sessionDbId:t,claudeSessionId:d.claude_session_id,sdkSessionId:null,project:d.project,userPrompt:d.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,p),p.generatorPromise=this.runSDKAgent(p).catch(m=>{ne.failure("WORKER","SDK agent error",{sessionId:t},m);let v=new lt;v.markSessionFailed(t),v.close(),this.sessions.delete(t)})}p.observationCounter++;let c=ne.correlationId(t,p.observationCounter),l=ne.formatTool(i,s);ne.dataIn("WORKER",`Observation queued: ${l}`,{correlationId:c,queue:p.pendingMessages.length+1}),p.pendingMessages.push({type:"observation",tool_name:i,tool_input:s,tool_output:n,prompt_number:o}),r.json({status:"queued",queueLength:p.pendingMessages.length})}handleSummarize(e,r){let t=parseInt(e.params.sessionDbId,10),{prompt_number:i}=e.body,s=this.sessions.get(t);if(!s){let n=new lt,o=n.getSessionById(t);n.close(),s={sessionDbId:t,claudeSessionId:o.claude_session_id,sdkSessionId:null,project:o.project,userPrompt:o.user_prompt,pendingMessages:[],abortController:new AbortController,generatorPromise:null,lastPromptNumber:0,observationCounter:0,startTime:Date.now()},this.sessions.set(t,s),s.generatorPromise=this.runSDKAgent(s).catch(p=>{ne.failure("WORKER","SDK agent error",{sessionId:t},p);let c=new lt;c.markSessionFailed(t),c.close(),this.sessions.delete(t)})}ne.dataIn("WORKER","Summary requested",{sessionId:t,promptNumber:i,queue:s.pendingMessages.length+1}),s.pendingMessages.push({type:"summarize",prompt_number:i}),r.json({status:"queued",queueLength:s.pendingMessages.length})}handleStatus(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}r.json({sessionDbId:t,sdkSessionId:i.sdkSessionId,project:i.project,pendingMessages:i.pendingMessages.length})}async handleDelete(e,r){let t=parseInt(e.params.sessionDbId,10),i=this.sessions.get(t);if(!i){r.status(404).json({error:"Session not found"});return}ne.warn("WORKER","Session delete requested",{sessionId:t}),i.abortController.abort(),i.generatorPromise&&await Promise.race([i.generatorPromise,new Promise(n=>setTimeout(n,5e3))]);let s=new lt;s.markSessionFailed(t),s.close(),this.sessions.delete(t),ne.info("WORKER","Session deleted",{sessionId:t}),r.json({status:"deleted"})}async runSDKAgent(e){ne.info("SDK","Agent starting",{sessionId:e.sessionDbId});let r=process.env.CLAUDE_CODE_PATH||"/Users/alexnewman/.nvm/versions/node/v24.5.0/bin/claude";try{let t=lg({prompt:this.createMessageGenerator(e),options:{model:K2,disallowedTools:Q2,abortController:e.abortController,pathToClaudeCodeExecutable:r}});for await(let n of t)if(n.type==="assistant"){let o=n.message.content,p=Array.isArray(o)?o.filter(l=>l.type==="text").map(l=>l.text).join(` `):typeof o=="string"?o:"",c=p.length;ne.dataOut("SDK",`Response received (${c} chars)`,{sessionId:e.sessionDbId,promptNumber:e.lastPromptNumber}),ne.debug("SDK","Full response",{sessionId:e.sessionDbId},p),this.handleAgentMessage(e,p,e.lastPromptNumber)}let i=Date.now()-e.startTime;ne.success("SDK","Agent completed",{sessionId:e.sessionDbId,duration:`${(i/1e3).toFixed(1)}s`});let s=new lt;s.markSessionCompleted(e.sessionDbId),s.close(),this.sessions.delete(e.sessionDbId)}catch(t){throw t.name==="AbortError"?ne.warn("SDK","Agent aborted",{sessionId:e.sessionDbId}):ne.failure("SDK","Agent error",{sessionId:e.sessionDbId},t),t}}async*createMessageGenerator(e){let r=hg(e.project,e.claudeSessionId,e.userPrompt);for(ne.dataIn("SDK",`Init prompt sent (${r.length} chars)`,{sessionId:e.sessionDbId,claudeSessionId:e.claudeSessionId,project:e.project}),ne.debug("SDK","Full init prompt",{sessionId:e.sessionDbId},r),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:r}};!e.abortController.signal.aborted;){if(e.pendingMessages.length===0){await new Promise(t=>setTimeout(t,100));continue}for(;e.pendingMessages.length>0;){let t=e.pendingMessages.shift();if(t.type==="summarize"){e.lastPromptNumber=t.prompt_number;let i=new lt,s=i.getSessionById(e.sessionDbId);i.close();let n=gg(s);ne.dataIn("SDK",`Summary prompt sent (${n.length} chars)`,{sessionId:e.sessionDbId,promptNumber:t.prompt_number}),ne.debug("SDK","Full summary prompt",{sessionId:e.sessionDbId},n),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:n}}}else if(t.type==="observation"){e.lastPromptNumber=t.prompt_number;let i=vg({id:0,tool_name:t.tool_name,tool_input:t.tool_input,tool_output:t.tool_output,created_at_epoch:Date.now()}),s=ne.formatTool(t.tool_name,t.tool_input),n=ne.correlationId(e.sessionDbId,e.observationCounter);ne.dataIn("SDK",`Observation prompt: ${s}`,{correlationId:n,promptNumber:t.prompt_number,size:`${i.length} chars`}),ne.debug("SDK","Full observation prompt",{correlationId:n},i),yield{type:"user",session_id:e.claudeSessionId,parent_tool_use_id:null,message:{role:"user",content:i}}}}}}handleAgentMessage(e,r,t){let i=ne.correlationId(e.sessionDbId,e.observationCounter);ne.info("PARSER",`Processing response (${r.length} chars)`,{sessionId:e.sessionDbId,promptNumber:t,preview:r.substring(0,200)});let s=xg(r,i);s.length>0&&ne.info("PARSER",`Parsed ${s.length} observation(s)`,{correlationId:i,promptNumber:t,types:s.map(p=>p.type).join(", ")});let n=new lt;for(let p of s)n.storeObservation(e.claudeSessionId,e.project,p,t),ne.success("DB","Observation stored",{correlationId:i,type:p.type,title:p.title});ne.info("PARSER","Looking for summary tags...",{sessionId:e.sessionDbId});let o=yg(r,e.sessionDbId);o?(ne.success("PARSER","Summary parsed successfully!",{sessionId:e.sessionDbId,promptNumber:t,hasRequest:!!o.request,hasInvestigated:!!o.investigated,hasLearned:!!o.learned,hasCompleted:!!o.completed,hasNextSteps:!!o.next_steps}),n.storeSummary(e.claudeSessionId,e.project,o,t),ne.success("DB","\u{1F4DD} SUMMARY STORED IN DATABASE",{sessionId:e.sessionDbId,promptNumber:t})):ne.warn("PARSER","NO SUMMARY TAGS FOUND in response",{sessionId:e.sessionDbId,promptNumber:t,contentSample:r.substring(0,500)}),n.close()}};async function J2(){await new us().start(),process.on("SIGINT",()=>{ne.warn("SYSTEM","Shutting down (SIGINT)"),process.exit(0)}),process.on("SIGTERM",()=>{ne.warn("SYSTEM","Shutting down (SIGTERM)"),process.exit(0)})}J2().catch(a=>{ne.failure("SYSTEM","Fatal startup error",{},a),process.exit(1)});0&&(module.exports={WorkerService}); /*! Bundled license information: diff --git a/src/sdk/worker.ts b/src/sdk/worker.ts index db2a543d..1af52766 100644 --- a/src/sdk/worker.ts +++ b/src/sdk/worker.ts @@ -283,7 +283,7 @@ class SDKWorker { */ private async runSDKAgent(): Promise { // Find Claude Code executable - const claudePath = process.env.CLAUDE_CODE_PATH || '/usr/local/bin/claude'; + const claudePath = process.env.CLAUDE_CODE_PATH || '/Users/alexnewman/.nvm/versions/node/v24.5.0/bin/claude'; console.error(`[SDK Worker DEBUG] About to call query with claudePath: ${claudePath}`); const queryResult = query({ @@ -292,7 +292,7 @@ class SDKWorker { model: MODEL, disallowedTools: DISALLOWED_TOOLS, abortController: this.abortController, - pathToClaudeCodeExecutable: '' + pathToClaudeCodeExecutable: claudePath } }); diff --git a/src/services/worker-service.ts b/src/services/worker-service.ts index 561e5450..3d357296 100644 --- a/src/services/worker-service.ts +++ b/src/services/worker-service.ts @@ -345,7 +345,7 @@ class WorkerService { private async runSDKAgent(session: ActiveSession): Promise { logger.info('SDK', 'Agent starting', { sessionId: session.sessionDbId }); - const claudePath = process.env.CLAUDE_CODE_PATH || '/usr/local/bin/claude'; + const claudePath = process.env.CLAUDE_CODE_PATH || '/Users/alexnewman/.nvm/versions/node/v24.5.0/bin/claude'; try { const queryResult = query({ @@ -354,7 +354,7 @@ class WorkerService { model: MODEL, disallowedTools: DISALLOWED_TOOLS, abortController: session.abortController, - pathToClaudeCodeExecutable: '' + pathToClaudeCodeExecutable: claudePath } });