From 39f110260072a548a4c684bb06f948df36c4805d Mon Sep 17 00:00:00 2001 From: Alex Newman Date: Mon, 4 May 2026 13:08:53 -0700 Subject: [PATCH] fix: remove ONNX/OpenBLAS thread cap from chroma-mcp spawn env MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 2-thread cap was a bandaid for #2220 (Windows) and #2253 (macOS Intel) CPU runaway reports on v12.4.9. The actual root causes (watermark stuck at 0 → continuous re-embed, orphan process trees, fire-and-forget backfill across 80+ projects) were fixed structurally in #2282: per-batch watermark persistence, killProcessTree() + pgid registration, max-3 concurrent backfills with re-entrancy guard, kernel-enforced child cleanup (#2216). With the structural fixes in place, capping ONNX/OpenBLAS/MKL at 2 threads slows initial backfill 3–6× on multi-core machines and provides no steady-state benefit. Defer to the OS scheduler and the user's environment. ANONYMIZED_TELEMETRY=false stays — unrelated to the storm, blocks background HTTP from the embedding subprocess. Co-Authored-By: Claude Opus 4.7 (1M context) --- plugin/scripts/worker-service.cjs | 2 +- src/services/sync/ChromaMcpManager.ts | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/plugin/scripts/worker-service.cjs b/plugin/scripts/worker-service.cjs index ecc2127f..e500cf8f 100755 --- a/plugin/scripts/worker-service.cjs +++ b/plugin/scripts/worker-service.cjs @@ -975,7 +975,7 @@ Set the \`cycles\` parameter to \`"ref"\` to resolve cyclical schemas with defs. `}var Ane=Nf.default.platform==="win32"?["APPDATA","HOMEDRIVE","HOMEPATH","LOCALAPPDATA","PATH","PROCESSOR_ARCHITECTURE","SYSTEMDRIVE","SYSTEMROOT","TEMP","USERNAME","USERPROFILE","PROGRAMFILES"]:["HOME","LOGNAME","PATH","SHELL","TERM","USER"];function Nne(){let t={};for(let e of Ane){let r=Nf.default.env[e];r!==void 0&&(r.startsWith("()")||(t[e]=r))}return t}var rl=class{constructor(e){this._readBuffer=new J_,this._stderrStream=null,this._serverParams=e,(e.stderr==="pipe"||e.stderr==="overlapped")&&(this._stderrStream=new $4.PassThrough)}async start(){if(this._process)throw new Error("StdioClientTransport already started! If using Client class, note that connect() calls start() automatically.");return new Promise((e,r)=>{this._process=(0,T4.default)(this._serverParams.command,this._serverParams.args??[],{env:{...Nne(),...this._serverParams.env},stdio:["pipe","pipe",this._serverParams.stderr??"inherit"],shell:!1,windowsHide:Nf.default.platform==="win32"&&Mne(),cwd:this._serverParams.cwd}),this._process.on("error",n=>{r(n),this.onerror?.(n)}),this._process.on("spawn",()=>{e()}),this._process.on("close",n=>{this._process=void 0,this.onclose?.()}),this._process.stdin?.on("error",n=>{this.onerror?.(n)}),this._process.stdout?.on("data",n=>{this._readBuffer.append(n),this.processReadBuffer()}),this._process.stdout?.on("error",n=>{this.onerror?.(n)}),this._stderrStream&&this._process.stderr&&this._process.stderr.pipe(this._stderrStream)})}get stderr(){return this._stderrStream?this._stderrStream:this._process?.stderr??null}get pid(){return this._process?.pid??null}processReadBuffer(){for(;;)try{let e=this._readBuffer.readMessage();if(e===null)break;this.onmessage?.(e)}catch(e){this.onerror?.(e)}}async close(){if(this._process){let e=this._process;this._process=void 0;let r=new Promise(n=>{e.once("close",()=>{n()})});try{e.stdin?.end()}catch{}if(await Promise.race([r,new Promise(n=>setTimeout(n,2e3).unref())]),e.exitCode===null){try{e.kill("SIGTERM")}catch{}await Promise.race([r,new Promise(n=>setTimeout(n,2e3).unref())])}if(e.exitCode===null)try{e.kill("SIGKILL")}catch{}}this._readBuffer.clear()}send(e){return new Promise(r=>{if(!this._process?.stdin)throw new Error("Not connected");let n=k4(e);this._process.stdin.write(n)?r():this._process.stdin.once("drain",r)})}};function Mne(){return"type"in Nf.default}ln();var Qa=require("fs");re();Me();NR();var MR=Re.envFile(),Tie=["ANTHROPIC_API_KEY","CLAUDECODE","CLAUDE_CODE_OAUTH_TOKEN"];function $ie(t){let e={};for(let r of t.split(` `)){let n=r.trim();if(!n||n.startsWith("#"))continue;let i=n.indexOf("=");if(i===-1)continue;let s=n.slice(0,i).trim(),o=n.slice(i+1).trim();(o.startsWith('"')&&o.endsWith('"')||o.startsWith("'")&&o.endsWith("'"))&&(o=o.slice(1,-1)),s&&(e[s]=o)}return e}function DR(){if(!(0,Qa.existsSync)(MR))return{};try{let t=(0,Qa.readFileSync)(MR,"utf-8"),e=$ie(t),r={};return e.ANTHROPIC_API_KEY&&(r.ANTHROPIC_API_KEY=e.ANTHROPIC_API_KEY),e.ANTHROPIC_BASE_URL&&(r.ANTHROPIC_BASE_URL=e.ANTHROPIC_BASE_URL),e.GEMINI_API_KEY&&(r.GEMINI_API_KEY=e.GEMINI_API_KEY),e.OPENROUTER_API_KEY&&(r.OPENROUTER_API_KEY=e.OPENROUTER_API_KEY),r}catch(t){return h.warn("ENV","Failed to load .env file",{path:MR},t instanceof Error?t:new Error(String(t))),{}}}function Rie(t=!0){let e={};for(let[r,n]of Object.entries(process.env))n!==void 0&&!Tie.includes(r)&&(e[r]=n);if(e.CLAUDE_CODE_ENTRYPOINT="sdk-ts",e.CLAUDE_MEM_INTERNAL="1",t){let r=DR();r.ANTHROPIC_API_KEY&&(e.ANTHROPIC_API_KEY=r.ANTHROPIC_API_KEY),r.ANTHROPIC_BASE_URL&&(e.ANTHROPIC_BASE_URL=r.ANTHROPIC_BASE_URL),r.GEMINI_API_KEY&&(e.GEMINI_API_KEY=r.GEMINI_API_KEY),r.OPENROUTER_API_KEY&&(e.OPENROUTER_API_KEY=r.OPENROUTER_API_KEY)}return e}async function Hf(t=!0){let e=Rie(t);if(delete e.CLAUDE_CODE_OAUTH_TOKEN,!t)return e;if(e.ANTHROPIC_API_KEY)return ib(),e;let r;try{r=await pL()}catch(n){return h.warn("OAUTH","OAuth token read failed unexpectedly; proceeding without token",{},n instanceof Error?n:new Error(String(n))),e}switch(r.kind){case"present":e.CLAUDE_CODE_OAUTH_TOKEN=r.token,h.info("OAUTH","Injected fresh CLAUDE_CODE_OAUTH_TOKEN at spawn-time",{source:r.source,expiresAt:r.expiresAt}),ib();break;case"expired":h.warn("OAUTH",`Refusing to inject expired CLAUDE_CODE_OAUTH_TOKEN: ${r.reason}. Re-login via Claude Desktop to refresh.`,{expiresAt:r.expiresAt}),fL(r.reason);break;case"absent":h.debug("OAUTH",`No OAuth token available: ${r.reason}`),ib();break}return e}function al(t){return DR()[t]}function Iie(){return!!DR().ANTHROPIC_API_KEY}function sb(){return Iie()?"API key (from ~/.claude-mem/.env)":process.env.CLAUDE_CODE_OAUTH_TOKEN?"Claude Code OAuth token (env, refreshed via keychain at spawn)":"Claude Code OAuth token (read from system keychain at spawn)"}re();var Bf=require("child_process"),_L=require("util"),bL=ze(require("os"),1),ec=ze(require("fs"),1);re();Bt();Me();Ko();Yo();var hL=(0,_L.promisify)(Bf.execFile),Oie="claude-mem-chroma",Cie="1.0.0",gL=3e4,vL=1e4,Pie=Re.chroma(),ob="chroma-mcp",yL="0.2.6",hi=class t{static instance=null;client=null;transport=null;connected=!1;lastConnectionFailureTimestamp=0;connecting=null;constructor(){}static getInstance(){return t.instance||(t.instance=new t),t.instance}async ensureConnected(){if(this.connected&&this.client)return;let e=Date.now()-this.lastConnectionFailureTimestamp;if(this.lastConnectionFailureTimestamp>0&&e{a=setTimeout(()=>d(new Error(`MCP connection to chroma-mcp timed out after ${gL}ms`)),gL)});try{await Promise.race([o,c])}catch(l){clearTimeout(a),h.warn("CHROMA_MCP","Connection failed, killing subprocess to prevent zombie",{error:l instanceof Error?l.message:String(l)});try{await this.transport.close()}catch{}try{await this.client.close()}catch{}throw this.client=null,this.transport=null,this.connected=!1,l}clearTimeout(a),this.connected=!0,this.registerManagedProcess(),h.info("CHROMA_MCP","Connected to chroma-mcp successfully");let u=this.transport;this.transport.onclose=()=>{if(this.transport!==u){h.debug("CHROMA_MCP","Ignoring stale onclose from previous transport");return}h.warn("CHROMA_MCP","chroma-mcp subprocess closed unexpectedly, applying reconnect backoff"),this.connected=!1,zr().unregisterProcess(ob),this.client=null,this.transport=null,this.lastConnectionFailureTimestamp=Date.now()}}buildCommandArgs(){let e=ye.loadFromFile(gt),r=e.CLAUDE_MEM_CHROMA_MODE||"local",n=process.env.CLAUDE_MEM_PYTHON_VERSION||e.CLAUDE_MEM_PYTHON_VERSION||"3.13";if(r==="remote"){let i=e.CLAUDE_MEM_CHROMA_HOST||"127.0.0.1",s=e.CLAUDE_MEM_CHROMA_PORT||"8000",o=e.CLAUDE_MEM_CHROMA_SSL==="true",a=e.CLAUDE_MEM_CHROMA_TENANT||"default_tenant",c=e.CLAUDE_MEM_CHROMA_DATABASE||"default_database",u=e.CLAUDE_MEM_CHROMA_API_KEY||"",l=["--python",n,`chroma-mcp==${yL}`,"--client-type","http","--host",i,"--port",s];return l.push("--ssl",o?"true":"false"),a!=="default_tenant"&&l.push("--tenant",a),c!=="default_database"&&l.push("--database",c),u&&l.push("--api-key",u),l}return["--python",n,`chroma-mcp==${yL}`,"--client-type","persistent","--data-dir",Pie.replace(/\\/g,"/")]}async callTool(e,r){await this.ensureConnected(),h.debug("CHROMA_MCP",`Calling tool: ${e}`,{arguments:JSON.stringify(r).slice(0,200)});let n;try{n=await this.client.callTool({name:e,arguments:r})}catch(o){this.connected=!1,this.client=null,this.transport=null,h.warn("CHROMA_MCP",`Transport error during "${e}", reconnecting and retrying once`,{error:o instanceof Error?o.message:String(o)});try{await this.ensureConnected(),n=await this.client.callTool({name:e,arguments:r})}catch(a){throw this.connected=!1,new Error(`chroma-mcp transport error during "${e}" (retry failed): ${a instanceof Error?a.message:String(a)}`)}}if(n.isError){let o=n.content?.find(a=>a.type==="text")?.text||"Unknown chroma-mcp error";throw new Error(`chroma-mcp tool "${e}" returned error: ${o}`)}let i=n.content;if(!i||i.length===0)return null;let s=i.find(o=>o.type==="text"&&o.text);if(!s||!s.text)return null;try{return JSON.parse(s.text)}catch(o){return o instanceof Error&&h.debug("CHROMA_MCP","Non-JSON response from tool, returning null",{toolName:e,textPreview:s.text.slice(0,100)}),null}}async isHealthy(){try{return await this.callTool("chroma_list_collections",{limit:1}),!0}catch(e){return h.warn("CHROMA_MCP","Health check failed",{error:e instanceof Error?e.message:String(e)}),!1}}async probeSemanticSearch(){let e;try{let n=await this.callTool("chroma_list_collections",{limit:100});Array.isArray(n)?e=n.length:n&&Array.isArray(n.collections)?e=n.collections.length:n&&typeof n=="object"&&"length"in n&&(e=n.length)}catch(n){let i=n instanceof Error?n.message:String(n);return h.warn("CHROMA_MCP","Deep probe failed at list stage",{error:i}),{ok:!1,stage:"list",error:i}}let r=Date.now();try{await this.callTool("chroma_query_documents",{collection_name:"cm__claude-mem",query_texts:["ping"],n_results:1});let n=Date.now()-r;return{ok:!0,stage:"done",collections:e,queryLatencyMs:n}}catch(n){let i=Date.now()-r,s=n instanceof Error?n.message:String(n),a=/not exist|missing|empty|no such/i.test(s)?`collection cm__claude-mem missing or empty (${s})`:s;return h.warn("CHROMA_MCP","Deep probe failed at query stage",{error:s,queryLatencyMs:i}),{ok:!1,stage:"query",error:a,collections:e,queryLatencyMs:i}}}async stop(){if(!this.client){h.debug("CHROMA_MCP","No active MCP connection to stop");return}h.info("CHROMA_MCP","Stopping chroma-mcp MCP connection");let e=this.transport?._process;e?.pid&&await t.killProcessTree(e.pid);try{await this.client.close()}catch(r){r instanceof Error?h.debug("CHROMA_MCP","Error during client close (subprocess may already be dead)",{},r):h.debug("CHROMA_MCP","Error during client close (subprocess may already be dead)",{error:String(r)})}zr().unregisterProcess(ob),this.client=null,this.transport=null,this.connected=!1,this.connecting=null,h.info("CHROMA_MCP","chroma-mcp MCP connection stopped")}static async killProcessTree(e){if(h.debug("CHROMA_MCP",`Killing process tree rooted at PID ${e}`),process.platform==="win32"){try{await hL("taskkill",["/PID",String(e),"/T","/F"],{timeout:5e3,windowsHide:!0})}catch(r){h.debug("CHROMA_MCP","taskkill tree-kill finished (may already be dead)",{pid:e,error:r instanceof Error?r.message:String(r)})}return}try{let r=await t.collectDescendantPids(e);for(let s of r)try{process.kill(s,"SIGTERM")}catch{}try{process.kill(e,"SIGTERM")}catch(s){let o=s.code;o!=="ESRCH"&&h.debug("CHROMA_MCP",`Failed to SIGTERM PID ${e}`,{code:o})}await new Promise(s=>setTimeout(s,500));let n=await t.collectDescendantPids(e),i=Array.from(new Set([...r,...n]));for(let s of i)try{process.kill(s,"SIGKILL")}catch{}try{process.kill(e,"SIGKILL")}catch{}}catch(r){h.debug("CHROMA_MCP","Process tree kill completed (best-effort)",{pid:e,error:r instanceof Error?r.message:String(r)})}}static async collectDescendantPids(e){let r=new Set,n=[];async function i(s){let o="";try{o=(await hL("pgrep",["-P",String(s)],{timeout:2e3})).stdout}catch{return}let a=o.split(` `).map(c=>c.trim()).filter(c=>c.length>0).map(c=>Number.parseInt(c,10)).filter(c=>Number.isFinite(c)&&c>0&&!r.has(c));for(let c of a)r.add(c),await i(c),n.push(c)}return await i(e),n}static async reset(){t.instance&&await t.instance.stop(),t.instance=null}getCombinedCertPath(){let e=Re.combinedCerts();if(ec.default.existsSync(e)){let r=ec.default.statSync(e);if(Date.now()-r.mtimeMs<1440*60*1e3)return e}if(process.platform==="darwin")try{let r;try{r=(0,Bf.execSync)('uvx --with certifi python -c "import certifi; print(certifi.where())"',{encoding:"utf8",stdio:["pipe","pipe","pipe"],timeout:1e4}).trim()}catch(o){h.debug("CHROMA_MCP","Failed to resolve certifi path via uvx",{error:o instanceof Error?o.message:String(o)});return}if(!r||!ec.default.existsSync(r))return;let n="";try{n=(0,Bf.execSync)('security find-certificate -a -c "Zscaler" -p /Library/Keychains/System.keychain',{encoding:"utf8",stdio:["pipe","pipe","pipe"],timeout:5e3})}catch(o){h.debug("CHROMA_MCP","No Zscaler certificate found in system keychain",{error:o instanceof Error?o.message:String(o)});return}if(!n||!n.includes("-----BEGIN CERTIFICATE-----")||!n.includes("-----END CERTIFICATE-----"))return;let i=ec.default.readFileSync(r,"utf8"),s=e+".tmp";return ec.default.writeFileSync(s,i+` -`+n),ec.default.renameSync(s,e),h.info("CHROMA_MCP","Created combined SSL certificate bundle for Zscaler",{path:e}),e}catch(r){h.debug("CHROMA_MCP","Could not create combined cert bundle",{},r);return}}getSpawnEnv(){let e={};for(let[i,s]of Object.entries(Vn(process.env)))s!==void 0&&(e[i]=s);let r="2";for(let i of["OMP_NUM_THREADS","ONNX_NUM_THREADS","OPENBLAS_NUM_THREADS","MKL_NUM_THREADS"])e[i]||(e[i]=r);e.ANONYMIZED_TELEMETRY||(e.ANONYMIZED_TELEMETRY="false");let n=this.getCombinedCertPath();return n?(h.info("CHROMA_MCP","Using combined SSL certificates for enterprise compatibility",{certPath:n}),{...e,SSL_CERT_FILE:n,REQUESTS_CA_BUNDLE:n,CURL_CA_BUNDLE:n,NODE_EXTRA_CA_CERTS:n}):e}registerManagedProcess(){let e=this.transport._process;e?.pid&&(zr().registerProcess(ob,{pid:e.pid,type:"chroma",startedAt:new Date().toISOString(),pgid:e.pid},e),e.once("exit",()=>{zr().unregisterProcess(ob)}))}};var gi=require("fs"),xL=require("path");Bt();var SL={observations:0,summaries:0,prompts:0};function LR(){let t=ye.get("CLAUDE_MEM_DATA_DIR");return(0,xL.join)(t,"chroma-sync-state.json")}var ro=null,Zf=!1;function jR(){if(ro)return ro;let t=LR();if(!(0,gi.existsSync)(t))return ro={},ro;let e=(0,gi.readFileSync)(t,"utf8"),r=JSON.parse(e),n={};for(let[i,s]of Object.entries(r))n[i]={observations:Number.isInteger(s.observations)?s.observations:0,summaries:Number.isInteger(s.summaries)?s.summaries:0,prompts:Number.isInteger(s.prompts)?s.prompts:0};return ro=n,ro}function zR(){if(!ro)return;let t=LR(),e=ye.get("CLAUDE_MEM_DATA_DIR");(0,gi.existsSync)(e)||(0,gi.mkdirSync)(e,{recursive:!0});let r=`${t}.tmp`;(0,gi.writeFileSync)(r,JSON.stringify(ro,null,2),"utf8"),(0,gi.renameSync)(r,t),Zf=!1}var vi={exists(){return(0,gi.existsSync)(LR())},get(t){return{...jR()[t]??SL}},bump(t,e,r){if(!Number.isInteger(r)||r<=0)return;let n=jR(),i=n[t]??{...SL};r<=i[e]||(i[e]=r,n[t]=i,Zf=!0,zR())},replace(t,e){let r=jR();r[t]={...e},Zf=!0,zR()},flush(){Zf&&zR()},resetCache(){ro=null,Zf=!1}};lb();re();UR();var Qo=class t{project;collectionName;collectionCreated=!1;BATCH_SIZE=100;constructor(e){this.project=e;let r=e.replace(/[^a-zA-Z0-9._-]/g,"_").replace(/[^a-zA-Z0-9]+$/,"");this.collectionName=`cm__${r||"unknown"}`}async ensureCollectionExists(){if(this.collectionCreated)return;let e=hi.getInstance();try{await e.callTool("chroma_create_collection",{collection_name:this.collectionName})}catch(r){if(!(r instanceof Error?r.message:String(r)).includes("already exists"))throw r}this.collectionCreated=!0,h.debug("CHROMA_SYNC","Collection ready",{collection:this.collectionName})}formatObservationDocs(e){let r=[],n=e.facts?JSON.parse(e.facts):[],i=e.concepts?JSON.parse(e.concepts):[],s=cl(e.files_read),o=cl(e.files_modified),a={sqlite_id:e.id,doc_type:"observation",memory_session_id:e.memory_session_id,project:e.project,merged_into_project:e.merged_into_project??null,created_at_epoch:e.created_at_epoch,type:e.type||"discovery",title:e.title||"Untitled"};return e.subtitle&&(a.subtitle=e.subtitle),i.length>0&&(a.concepts=i.join(",")),s.length>0&&(a.files_read=s.join(",")),o.length>0&&(a.files_modified=o.join(",")),e.narrative&&r.push({id:`obs_${e.id}_narrative`,document:e.narrative,metadata:{...a,field_type:"narrative"}}),e.text&&r.push({id:`obs_${e.id}_text`,document:e.text,metadata:{...a,field_type:"text"}}),n.forEach((c,u)=>{r.push({id:`obs_${e.id}_fact_${u}`,document:c,metadata:{...a,field_type:"fact",fact_index:u}})}),r}formatSummaryDocs(e){let r=[],n={sqlite_id:e.id,doc_type:"session_summary",memory_session_id:e.memory_session_id,project:e.project,merged_into_project:e.merged_into_project??null,created_at_epoch:e.created_at_epoch,prompt_number:e.prompt_number||0};return e.request&&r.push({id:`summary_${e.id}_request`,document:e.request,metadata:{...n,field_type:"request"}}),e.investigated&&r.push({id:`summary_${e.id}_investigated`,document:e.investigated,metadata:{...n,field_type:"investigated"}}),e.learned&&r.push({id:`summary_${e.id}_learned`,document:e.learned,metadata:{...n,field_type:"learned"}}),e.completed&&r.push({id:`summary_${e.id}_completed`,document:e.completed,metadata:{...n,field_type:"completed"}}),e.next_steps&&r.push({id:`summary_${e.id}_next_steps`,document:e.next_steps,metadata:{...n,field_type:"next_steps"}}),e.notes&&r.push({id:`summary_${e.id}_notes`,document:e.notes,metadata:{...n,field_type:"notes"}}),r}async addDocuments(e){if(e.length===0)return 0;await this.ensureCollectionExists();let r=hi.getInstance(),n=0;for(let i=0;iObject.fromEntries(Object.entries(a.metadata).filter(([c,u])=>u!=null&&u!=="")));try{await r.callTool("chroma_add_documents",{collection_name:this.collectionName,ids:s.map(a=>a.id),documents:s.map(a=>a.document),metadatas:o}),n+=s.length}catch(a){if((a instanceof Error?a.message:String(a)).includes("already exist"))try{await r.callTool("chroma_delete_documents",{collection_name:this.collectionName,ids:s.map(u=>u.id)}),await r.callTool("chroma_add_documents",{collection_name:this.collectionName,ids:s.map(u=>u.id),documents:s.map(u=>u.document),metadatas:o}),n+=s.length,h.info("CHROMA_SYNC","Batch reconciled via delete+add after duplicate conflict",{collection:this.collectionName,batchStart:i,batchSize:s.length})}catch(u){h.error("CHROMA_SYNC","Batch reconcile (delete+add) failed \u2014 watermark will not advance for this batch",{collection:this.collectionName,batchStart:i,batchSize:s.length},u)}else h.error("CHROMA_SYNC","Batch add failed \u2014 watermark will not advance for this batch, continuing with remaining batches",{collection:this.collectionName,batchStart:i,batchSize:s.length},a)}}return h.debug("CHROMA_SYNC","Documents added",{collection:this.collectionName,requested:e.length,written:n}),n}async syncObservation(e,r,n,i,s,o,a=0){let c={id:e,memory_session_id:r,project:n,merged_into_project:null,text:null,type:i.type,title:i.title,subtitle:i.subtitle,facts:JSON.stringify(i.facts),narrative:i.narrative,concepts:JSON.stringify(i.concepts),files_read:JSON.stringify(i.files_read),files_modified:JSON.stringify(i.files_modified),prompt_number:s,discovery_tokens:a,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o},u=this.formatObservationDocs(c);h.info("CHROMA_SYNC","Syncing observation",{observationId:e,documentCount:u.length,project:n});let l=await this.addDocuments(u);l===u.length?vi.bump(n,"observations",e):h.warn("CHROMA_SYNC","Observation watermark bump skipped \u2014 partial write",{observationId:e,project:n,requested:u.length,written:l})}async syncSummary(e,r,n,i,s,o,a=0){let c={id:e,memory_session_id:r,project:n,merged_into_project:null,request:i.request,investigated:i.investigated,learned:i.learned,completed:i.completed,next_steps:i.next_steps,notes:i.notes,prompt_number:s,discovery_tokens:a,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o},u=this.formatSummaryDocs(c);h.info("CHROMA_SYNC","Syncing summary",{summaryId:e,documentCount:u.length,project:n});let l=await this.addDocuments(u);l===u.length?vi.bump(n,"summaries",e):h.warn("CHROMA_SYNC","Summary watermark bump skipped \u2014 partial write",{summaryId:e,project:n,requested:u.length,written:l})}formatUserPromptDoc(e){return{id:`prompt_${e.id}`,document:e.prompt_text,metadata:{sqlite_id:e.id,doc_type:"user_prompt",memory_session_id:e.memory_session_id,project:e.project,created_at_epoch:e.created_at_epoch,prompt_number:e.prompt_number}}}async syncUserPrompt(e,r,n,i,s,o){let a={id:e,content_session_id:"",prompt_number:s,prompt_text:i,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o,memory_session_id:r,project:n},c=this.formatUserPromptDoc(a);h.info("CHROMA_SYNC","Syncing user prompt",{promptId:e,project:n});let u=await this.addDocuments([c]);u===1?vi.bump(n,"prompts",e):h.warn("CHROMA_SYNC","Prompt watermark bump skipped \u2014 write failed",{promptId:e,project:n,written:u})}async getExistingChromaIds(e){let r=e??this.project;await this.ensureCollectionExists();let n=hi.getInstance(),i=new Set,s=new Set,o=new Set,a=0,c=1e3;for(h.info("CHROMA_SYNC","Fetching existing Chroma document IDs...",{project:r});;){let l=(await n.callTool("chroma_get_documents",{collection_name:this.collectionName,limit:c,offset:a,where:{project:r},include:["metadatas"]}))?.metadatas||[];if(l.length===0)break;for(let d of l)if(d&&d.sqlite_id){let p=d.sqlite_id;d.doc_type==="observation"?i.add(p):d.doc_type==="session_summary"?s.add(p):d.doc_type==="user_prompt"&&o.add(p)}a+=c,h.debug("CHROMA_SYNC","Fetched batch of existing IDs",{project:r,offset:a,batchSize:l.length})}return h.info("CHROMA_SYNC","Existing IDs fetched",{project:r,observations:i.size,summaries:s.size,prompts:o.size,total:i.size+s.size+o.size}),{observations:i,summaries:s,prompts:o}}async bootstrapWatermarksFromChroma(e){let r=await this.getExistingChromaIds(e),n=i=>{let s=0;for(let o of i)o>s&&(s=o);return s};vi.replace(e,{observations:n(r.observations),summaries:n(r.summaries),prompts:n(r.prompts)}),h.info("CHROMA_SYNC","Bootstrapped watermarks from Chroma",{project:e,watermarks:vi.get(e)})}async ensureBackfilled(e,r){let n=e??this.project;h.info("CHROMA_SYNC","Starting smart backfill",{project:n}),await this.ensureCollectionExists();let i=vi.get(n),s=r??new no;try{await this.runBackfillPipeline(s,n,i)}catch(o){throw h.error("CHROMA_SYNC","Backfill failed",{project:n},o instanceof Error?o:new Error(String(o))),new Error(`Backfill failed: ${o instanceof Error?o.message:String(o)}`)}finally{r||s.close()}}async runBackfillPipeline(e,r,n){let i=await this.backfillObservations(e,r,n.observations),s=await this.backfillSummaries(e,r,n.summaries),o=await this.backfillPrompts(e,r,n.prompts);h.info("CHROMA_SYNC","Smart backfill complete",{project:r,synced:{observationDocs:i.length,summaryDocs:s.length,promptDocs:o.length},watermarks:vi.get(r)})}async backfillObservations(e,r,n){let i=e.db.prepare(` +`+n),ec.default.renameSync(s,e),h.info("CHROMA_MCP","Created combined SSL certificate bundle for Zscaler",{path:e}),e}catch(r){h.debug("CHROMA_MCP","Could not create combined cert bundle",{},r);return}}getSpawnEnv(){let e={};for(let[n,i]of Object.entries(Vn(process.env)))i!==void 0&&(e[n]=i);e.ANONYMIZED_TELEMETRY||(e.ANONYMIZED_TELEMETRY="false");let r=this.getCombinedCertPath();return r?(h.info("CHROMA_MCP","Using combined SSL certificates for enterprise compatibility",{certPath:r}),{...e,SSL_CERT_FILE:r,REQUESTS_CA_BUNDLE:r,CURL_CA_BUNDLE:r,NODE_EXTRA_CA_CERTS:r}):e}registerManagedProcess(){let e=this.transport._process;e?.pid&&(zr().registerProcess(ob,{pid:e.pid,type:"chroma",startedAt:new Date().toISOString(),pgid:e.pid},e),e.once("exit",()=>{zr().unregisterProcess(ob)}))}};var gi=require("fs"),xL=require("path");Bt();var SL={observations:0,summaries:0,prompts:0};function LR(){let t=ye.get("CLAUDE_MEM_DATA_DIR");return(0,xL.join)(t,"chroma-sync-state.json")}var ro=null,Zf=!1;function jR(){if(ro)return ro;let t=LR();if(!(0,gi.existsSync)(t))return ro={},ro;let e=(0,gi.readFileSync)(t,"utf8"),r=JSON.parse(e),n={};for(let[i,s]of Object.entries(r))n[i]={observations:Number.isInteger(s.observations)?s.observations:0,summaries:Number.isInteger(s.summaries)?s.summaries:0,prompts:Number.isInteger(s.prompts)?s.prompts:0};return ro=n,ro}function zR(){if(!ro)return;let t=LR(),e=ye.get("CLAUDE_MEM_DATA_DIR");(0,gi.existsSync)(e)||(0,gi.mkdirSync)(e,{recursive:!0});let r=`${t}.tmp`;(0,gi.writeFileSync)(r,JSON.stringify(ro,null,2),"utf8"),(0,gi.renameSync)(r,t),Zf=!1}var vi={exists(){return(0,gi.existsSync)(LR())},get(t){return{...jR()[t]??SL}},bump(t,e,r){if(!Number.isInteger(r)||r<=0)return;let n=jR(),i=n[t]??{...SL};r<=i[e]||(i[e]=r,n[t]=i,Zf=!0,zR())},replace(t,e){let r=jR();r[t]={...e},Zf=!0,zR()},flush(){Zf&&zR()},resetCache(){ro=null,Zf=!1}};lb();re();UR();var Qo=class t{project;collectionName;collectionCreated=!1;BATCH_SIZE=100;constructor(e){this.project=e;let r=e.replace(/[^a-zA-Z0-9._-]/g,"_").replace(/[^a-zA-Z0-9]+$/,"");this.collectionName=`cm__${r||"unknown"}`}async ensureCollectionExists(){if(this.collectionCreated)return;let e=hi.getInstance();try{await e.callTool("chroma_create_collection",{collection_name:this.collectionName})}catch(r){if(!(r instanceof Error?r.message:String(r)).includes("already exists"))throw r}this.collectionCreated=!0,h.debug("CHROMA_SYNC","Collection ready",{collection:this.collectionName})}formatObservationDocs(e){let r=[],n=e.facts?JSON.parse(e.facts):[],i=e.concepts?JSON.parse(e.concepts):[],s=cl(e.files_read),o=cl(e.files_modified),a={sqlite_id:e.id,doc_type:"observation",memory_session_id:e.memory_session_id,project:e.project,merged_into_project:e.merged_into_project??null,created_at_epoch:e.created_at_epoch,type:e.type||"discovery",title:e.title||"Untitled"};return e.subtitle&&(a.subtitle=e.subtitle),i.length>0&&(a.concepts=i.join(",")),s.length>0&&(a.files_read=s.join(",")),o.length>0&&(a.files_modified=o.join(",")),e.narrative&&r.push({id:`obs_${e.id}_narrative`,document:e.narrative,metadata:{...a,field_type:"narrative"}}),e.text&&r.push({id:`obs_${e.id}_text`,document:e.text,metadata:{...a,field_type:"text"}}),n.forEach((c,u)=>{r.push({id:`obs_${e.id}_fact_${u}`,document:c,metadata:{...a,field_type:"fact",fact_index:u}})}),r}formatSummaryDocs(e){let r=[],n={sqlite_id:e.id,doc_type:"session_summary",memory_session_id:e.memory_session_id,project:e.project,merged_into_project:e.merged_into_project??null,created_at_epoch:e.created_at_epoch,prompt_number:e.prompt_number||0};return e.request&&r.push({id:`summary_${e.id}_request`,document:e.request,metadata:{...n,field_type:"request"}}),e.investigated&&r.push({id:`summary_${e.id}_investigated`,document:e.investigated,metadata:{...n,field_type:"investigated"}}),e.learned&&r.push({id:`summary_${e.id}_learned`,document:e.learned,metadata:{...n,field_type:"learned"}}),e.completed&&r.push({id:`summary_${e.id}_completed`,document:e.completed,metadata:{...n,field_type:"completed"}}),e.next_steps&&r.push({id:`summary_${e.id}_next_steps`,document:e.next_steps,metadata:{...n,field_type:"next_steps"}}),e.notes&&r.push({id:`summary_${e.id}_notes`,document:e.notes,metadata:{...n,field_type:"notes"}}),r}async addDocuments(e){if(e.length===0)return 0;await this.ensureCollectionExists();let r=hi.getInstance(),n=0;for(let i=0;iObject.fromEntries(Object.entries(a.metadata).filter(([c,u])=>u!=null&&u!=="")));try{await r.callTool("chroma_add_documents",{collection_name:this.collectionName,ids:s.map(a=>a.id),documents:s.map(a=>a.document),metadatas:o}),n+=s.length}catch(a){if((a instanceof Error?a.message:String(a)).includes("already exist"))try{await r.callTool("chroma_delete_documents",{collection_name:this.collectionName,ids:s.map(u=>u.id)}),await r.callTool("chroma_add_documents",{collection_name:this.collectionName,ids:s.map(u=>u.id),documents:s.map(u=>u.document),metadatas:o}),n+=s.length,h.info("CHROMA_SYNC","Batch reconciled via delete+add after duplicate conflict",{collection:this.collectionName,batchStart:i,batchSize:s.length})}catch(u){h.error("CHROMA_SYNC","Batch reconcile (delete+add) failed \u2014 watermark will not advance for this batch",{collection:this.collectionName,batchStart:i,batchSize:s.length},u)}else h.error("CHROMA_SYNC","Batch add failed \u2014 watermark will not advance for this batch, continuing with remaining batches",{collection:this.collectionName,batchStart:i,batchSize:s.length},a)}}return h.debug("CHROMA_SYNC","Documents added",{collection:this.collectionName,requested:e.length,written:n}),n}async syncObservation(e,r,n,i,s,o,a=0){let c={id:e,memory_session_id:r,project:n,merged_into_project:null,text:null,type:i.type,title:i.title,subtitle:i.subtitle,facts:JSON.stringify(i.facts),narrative:i.narrative,concepts:JSON.stringify(i.concepts),files_read:JSON.stringify(i.files_read),files_modified:JSON.stringify(i.files_modified),prompt_number:s,discovery_tokens:a,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o},u=this.formatObservationDocs(c);h.info("CHROMA_SYNC","Syncing observation",{observationId:e,documentCount:u.length,project:n});let l=await this.addDocuments(u);l===u.length?vi.bump(n,"observations",e):h.warn("CHROMA_SYNC","Observation watermark bump skipped \u2014 partial write",{observationId:e,project:n,requested:u.length,written:l})}async syncSummary(e,r,n,i,s,o,a=0){let c={id:e,memory_session_id:r,project:n,merged_into_project:null,request:i.request,investigated:i.investigated,learned:i.learned,completed:i.completed,next_steps:i.next_steps,notes:i.notes,prompt_number:s,discovery_tokens:a,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o},u=this.formatSummaryDocs(c);h.info("CHROMA_SYNC","Syncing summary",{summaryId:e,documentCount:u.length,project:n});let l=await this.addDocuments(u);l===u.length?vi.bump(n,"summaries",e):h.warn("CHROMA_SYNC","Summary watermark bump skipped \u2014 partial write",{summaryId:e,project:n,requested:u.length,written:l})}formatUserPromptDoc(e){return{id:`prompt_${e.id}`,document:e.prompt_text,metadata:{sqlite_id:e.id,doc_type:"user_prompt",memory_session_id:e.memory_session_id,project:e.project,created_at_epoch:e.created_at_epoch,prompt_number:e.prompt_number}}}async syncUserPrompt(e,r,n,i,s,o){let a={id:e,content_session_id:"",prompt_number:s,prompt_text:i,created_at:new Date(o*1e3).toISOString(),created_at_epoch:o,memory_session_id:r,project:n},c=this.formatUserPromptDoc(a);h.info("CHROMA_SYNC","Syncing user prompt",{promptId:e,project:n});let u=await this.addDocuments([c]);u===1?vi.bump(n,"prompts",e):h.warn("CHROMA_SYNC","Prompt watermark bump skipped \u2014 write failed",{promptId:e,project:n,written:u})}async getExistingChromaIds(e){let r=e??this.project;await this.ensureCollectionExists();let n=hi.getInstance(),i=new Set,s=new Set,o=new Set,a=0,c=1e3;for(h.info("CHROMA_SYNC","Fetching existing Chroma document IDs...",{project:r});;){let l=(await n.callTool("chroma_get_documents",{collection_name:this.collectionName,limit:c,offset:a,where:{project:r},include:["metadatas"]}))?.metadatas||[];if(l.length===0)break;for(let d of l)if(d&&d.sqlite_id){let p=d.sqlite_id;d.doc_type==="observation"?i.add(p):d.doc_type==="session_summary"?s.add(p):d.doc_type==="user_prompt"&&o.add(p)}a+=c,h.debug("CHROMA_SYNC","Fetched batch of existing IDs",{project:r,offset:a,batchSize:l.length})}return h.info("CHROMA_SYNC","Existing IDs fetched",{project:r,observations:i.size,summaries:s.size,prompts:o.size,total:i.size+s.size+o.size}),{observations:i,summaries:s,prompts:o}}async bootstrapWatermarksFromChroma(e){let r=await this.getExistingChromaIds(e),n=i=>{let s=0;for(let o of i)o>s&&(s=o);return s};vi.replace(e,{observations:n(r.observations),summaries:n(r.summaries),prompts:n(r.prompts)}),h.info("CHROMA_SYNC","Bootstrapped watermarks from Chroma",{project:e,watermarks:vi.get(e)})}async ensureBackfilled(e,r){let n=e??this.project;h.info("CHROMA_SYNC","Starting smart backfill",{project:n}),await this.ensureCollectionExists();let i=vi.get(n),s=r??new no;try{await this.runBackfillPipeline(s,n,i)}catch(o){throw h.error("CHROMA_SYNC","Backfill failed",{project:n},o instanceof Error?o:new Error(String(o))),new Error(`Backfill failed: ${o instanceof Error?o.message:String(o)}`)}finally{r||s.close()}}async runBackfillPipeline(e,r,n){let i=await this.backfillObservations(e,r,n.observations),s=await this.backfillSummaries(e,r,n.summaries),o=await this.backfillPrompts(e,r,n.prompts);h.info("CHROMA_SYNC","Smart backfill complete",{project:r,synced:{observationDocs:i.length,summaryDocs:s.length,promptDocs:o.length},watermarks:vi.get(r)})}async backfillObservations(e,r,n){let i=e.db.prepare(` SELECT * FROM observations WHERE project = ? AND id > ? ORDER BY id ASC diff --git a/src/services/sync/ChromaMcpManager.ts b/src/services/sync/ChromaMcpManager.ts index dfe6ddaf..e65446e4 100644 --- a/src/services/sync/ChromaMcpManager.ts +++ b/src/services/sync/ChromaMcpManager.ts @@ -588,15 +588,6 @@ export class ChromaMcpManager { } } - // Cap embedding-thread fanout. ONNX Runtime / OpenBLAS / MKL all default to - // cpu_count(), so a 12-core box runs 12 threads burning embeddings in - // parallel — the dominant cause of the chroma-mcp CPU storm on Windows - // (#2220). Two threads keeps backfill latency reasonable without saturating - // the box. Only set if the user hasn't pinned them explicitly. - const threadCap = '2'; - for (const key of ['OMP_NUM_THREADS', 'ONNX_NUM_THREADS', 'OPENBLAS_NUM_THREADS', 'MKL_NUM_THREADS']) { - if (!baseEnv[key]) baseEnv[key] = threadCap; - } // Disable Chroma's anonymous telemetry — it issues background HTTP from // the embedding subprocess on every collection touch. if (!baseEnv.ANONYMIZED_TELEMETRY) baseEnv.ANONYMIZED_TELEMETRY = 'false';