fix: add missing migration 28 mirror in SessionStore (#2139) (#2140)

* fix: mirror migration 28 in SessionStore so pending_messages.tool_use_id and worker_pid columns are created (#2139)

SessionStore's inline migration list jumped from v27 to v29, skipping
rebuildPendingMessagesForSelfHealingClaim. The worker uses SessionStore
directly via worker/DatabaseManager.ts and bypasses the canonical
MigrationRunner, so fresh installs ended up at "max v29" with neither
column present — every queue claim and observation insert failed.

Adds addPendingMessagesToolUseIdAndWorkerPidColumns following the existing
mirror precedent (addObservationSubagentColumns / addObservationsUniqueContentHashIndex).
Uses ALTER TABLE + column-existence guards so already-broken DBs at v29
self-heal on next worker boot.

Verified on fresh DB and on a synthetic v29-without-v28 broken DB:
both columns and indexes (idx_pending_messages_worker_pid,
ux_pending_session_tool) appear after one boot.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: wrap v28 mirror dedup+index creation in transaction

Addresses Greptile P2 review on PR #2140: matches the existing pattern in
addObservationsUniqueContentHashIndex (v29 mirror at SessionStore.ts:1127)
and runner.ts rebuildPendingMessagesForSelfHealingClaim. A crash between
the dedup DELETE and the schema_versions INSERT no longer leaves the DB
in a half-applied state.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Alex Newman
2026-04-25 18:45:51 -07:00
committed by GitHub
parent 3e01b62f72
commit e8082bb992
3 changed files with 105 additions and 4 deletions
+14 -2
View File
@@ -6,7 +6,7 @@ ${o.stack}`:` ${o.message}`:this.getLevel()===0&&typeof o=="object"?u=`
`,"utf8")}catch(g){process.stderr.write(`[LOGGER] Failed to write to log file: ${g instanceof Error?g.message:String(g)}
`)}else process.stderr.write(p+`
`)}debug(e,t,s,n){this.log(0,e,t,s,n)}info(e,t,s,n){this.log(1,e,t,s,n)}warn(e,t,s,n){this.log(2,e,t,s,n)}error(e,t,s,n){this.log(3,e,t,s,n)}dataIn(e,t,s,n){this.info(e,`\u2192 ${t}`,s,n)}dataOut(e,t,s,n){this.info(e,`\u2190 ${t}`,s,n)}success(e,t,s,n){this.info(e,`\u2713 ${t}`,s,n)}failure(e,t,s,n){this.error(e,`\u2717 ${t}`,s,n)}timing(e,t,s,n){this.info(e,`\u23F1 ${t}`,n,{duration:`${s}ms`})}happyPathError(e,t,s,n,o=""){let m=((new Error().stack||"").split(`
`)[2]||"").match(/at\s+(?:.*\s+)?\(?([^:]+):(\d+):(\d+)\)?/),u=m?`${m[1].split("/").pop()}:${m[2]}`:"unknown",E={...s,location:u};return this.warn(e,`[HAPPY-PATH] ${t}`,E,n),o}},c=new Z;var Ht={};function wt(){return typeof __dirname<"u"?__dirname:(0,S.dirname)((0,Ae.fileURLToPath)(Ht.url))}var Ft=wt();function $t(){if(process.env.CLAUDE_MEM_DATA_DIR)return process.env.CLAUDE_MEM_DATA_DIR;let r=(0,S.join)((0,ee.homedir)(),".claude-mem"),e=(0,S.join)(r,"settings.json");try{if((0,H.existsSync)(e)){let{readFileSync:t}=require("fs"),s=JSON.parse(t(e,"utf-8")),n=s.env??s;if(n.CLAUDE_MEM_DATA_DIR)return n.CLAUDE_MEM_DATA_DIR}}catch{}return r}var R=$t(),y=process.env.CLAUDE_CONFIG_DIR||(0,S.join)((0,ee.homedir)(),".claude"),us=(0,S.join)(y,"plugins","marketplaces","thedotmack"),ms=(0,S.join)(R,"archives"),cs=(0,S.join)(R,"logs"),ls=(0,S.join)(R,"trash"),ps=(0,S.join)(R,"backups"),Es=(0,S.join)(R,"modes"),gs=(0,S.join)(R,"settings.json"),Re=(0,S.join)(R,"claude-mem.db"),Ts=(0,S.join)(R,"vector-db"),Pt=(0,S.join)(R,"observer-sessions"),te=(0,S.basename)(Pt),fs=(0,S.join)(y,"settings.json"),Ss=(0,S.join)(y,"commands"),bs=(0,S.join)(y,"CLAUDE.md");function Ne(r){(0,H.mkdirSync)(r,{recursive:!0})}function Ce(){return(0,S.join)(Ft,"..")}var De=require("crypto");var Le=require("os"),ye=L(require("path"),1);var j=require("fs"),G=L(require("path"),1),U={isWorktree:!1,worktreeName:null,parentRepoPath:null,parentProjectName:null};function Ie(r){let e=G.default.join(r,".git"),t;try{t=(0,j.statSync)(e)}catch(u){return u instanceof Error&&u.code!=="ENOENT"&&console.warn("[worktree] Unexpected error checking .git:",u),U}if(!t.isFile())return U;let s;try{s=(0,j.readFileSync)(e,"utf-8").trim()}catch(u){return console.warn("[worktree] Failed to read .git file:",u instanceof Error?u.message:String(u)),U}let n=s.match(/^gitdir:\s*(.+)$/);if(!n)return U;let i=n[1].match(/^(.+)[/\\]\.git[/\\]worktrees[/\\]([^/\\]+)$/);if(!i)return U;let a=i[1],d=G.default.basename(r),m=G.default.basename(a);return{isWorktree:!0,worktreeName:d,parentRepoPath:a,parentProjectName:m}}function Me(r){return r==="~"||r.startsWith("~/")?r.replace(/^~/,(0,Le.homedir)()):r}function Gt(r){if(!r||r.trim()==="")return c.warn("PROJECT_NAME","Empty cwd provided, using fallback",{cwd:r}),"unknown-project";let e=Me(r),t=ye.default.basename(e);if(t===""){if(process.platform==="win32"){let n=r.match(/^([A-Z]):\\/i);if(n){let i=`drive-${n[1].toUpperCase()}`;return c.info("PROJECT_NAME","Drive root detected",{cwd:r,projectName:i}),i}}return c.warn("PROJECT_NAME","Root directory detected, using fallback",{cwd:r}),"unknown-project"}return t}function se(r){let e=Gt(r);if(!r)return{primary:e,parent:null,isWorktree:!1,allProjects:[e]};let t=Me(r),s=Ie(t);if(s.isWorktree&&s.parentProjectName){let n=`${s.parentProjectName}/${e}`;return{primary:n,parent:s.parentProjectName,isWorktree:!0,allProjects:[s.parentProjectName,n]}}return{primary:e,parent:null,isWorktree:!1,allProjects:[e]}}function X(r,e,t){return(0,De.createHash)("sha256").update([r||"",e||"",t||""].join("\0")).digest("hex").slice(0,16)}function re(r){if(!r)return[];try{let e=JSON.parse(r);return Array.isArray(e)?e:[String(e)]}catch{return[r]}}var h="claude";function jt(r){return r.trim().toLowerCase().replace(/\s+/g,"-")}function M(r){if(!r)return h;let e=jt(r);return e?e==="transcript"||e.includes("codex")?"codex":e.includes("cursor")?"cursor":e.includes("claude")?"claude":e:h}function ve(r){let e=["claude","codex","cursor"];return[...r].sort((t,s)=>{let n=e.indexOf(t),o=e.indexOf(s);return n!==-1||o!==-1?n===-1?1:o===-1?-1:n-o:t.localeCompare(s)})}function Xt(r,e){return{customTitle:r,platformSource:e?M(e):void 0}}var B=class{db;constructor(e=Re){e instanceof ne.Database?this.db=e:(e!==":memory:"&&Ne(R),this.db=new ne.Database(e),this.db.run("PRAGMA journal_mode = WAL"),this.db.run("PRAGMA synchronous = NORMAL"),this.db.run("PRAGMA foreign_keys = ON"),this.db.run("PRAGMA journal_size_limit = 4194304")),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable(),this.ensureDiscoveryTokensColumn(),this.createPendingMessagesTable(),this.renameSessionIdColumns(),this.repairSessionIdColumnRename(),this.addFailedAtEpochColumn(),this.addOnUpdateCascadeToForeignKeys(),this.addObservationContentHashColumn(),this.addSessionCustomTitleColumn(),this.addSessionPlatformSourceColumn(),this.addObservationModelColumns(),this.ensureMergedIntoProjectColumns(),this.addObservationSubagentColumns(),this.addObservationsUniqueContentHashIndex()}initializeSchema(){this.db.run(`
`)[2]||"").match(/at\s+(?:.*\s+)?\(?([^:]+):(\d+):(\d+)\)?/),u=m?`${m[1].split("/").pop()}:${m[2]}`:"unknown",E={...s,location:u};return this.warn(e,`[HAPPY-PATH] ${t}`,E,n),o}},c=new Z;var Ht={};function wt(){return typeof __dirname<"u"?__dirname:(0,S.dirname)((0,Ae.fileURLToPath)(Ht.url))}var Ft=wt();function $t(){if(process.env.CLAUDE_MEM_DATA_DIR)return process.env.CLAUDE_MEM_DATA_DIR;let r=(0,S.join)((0,ee.homedir)(),".claude-mem"),e=(0,S.join)(r,"settings.json");try{if((0,H.existsSync)(e)){let{readFileSync:t}=require("fs"),s=JSON.parse(t(e,"utf-8")),n=s.env??s;if(n.CLAUDE_MEM_DATA_DIR)return n.CLAUDE_MEM_DATA_DIR}}catch{}return r}var R=$t(),y=process.env.CLAUDE_CONFIG_DIR||(0,S.join)((0,ee.homedir)(),".claude"),us=(0,S.join)(y,"plugins","marketplaces","thedotmack"),ms=(0,S.join)(R,"archives"),cs=(0,S.join)(R,"logs"),ls=(0,S.join)(R,"trash"),ps=(0,S.join)(R,"backups"),Es=(0,S.join)(R,"modes"),gs=(0,S.join)(R,"settings.json"),Re=(0,S.join)(R,"claude-mem.db"),Ts=(0,S.join)(R,"vector-db"),Pt=(0,S.join)(R,"observer-sessions"),te=(0,S.basename)(Pt),fs=(0,S.join)(y,"settings.json"),Ss=(0,S.join)(y,"commands"),bs=(0,S.join)(y,"CLAUDE.md");function Ne(r){(0,H.mkdirSync)(r,{recursive:!0})}function Ce(){return(0,S.join)(Ft,"..")}var De=require("crypto");var Le=require("os"),ye=L(require("path"),1);var j=require("fs"),G=L(require("path"),1),U={isWorktree:!1,worktreeName:null,parentRepoPath:null,parentProjectName:null};function Ie(r){let e=G.default.join(r,".git"),t;try{t=(0,j.statSync)(e)}catch(u){return u instanceof Error&&u.code!=="ENOENT"&&console.warn("[worktree] Unexpected error checking .git:",u),U}if(!t.isFile())return U;let s;try{s=(0,j.readFileSync)(e,"utf-8").trim()}catch(u){return console.warn("[worktree] Failed to read .git file:",u instanceof Error?u.message:String(u)),U}let n=s.match(/^gitdir:\s*(.+)$/);if(!n)return U;let i=n[1].match(/^(.+)[/\\]\.git[/\\]worktrees[/\\]([^/\\]+)$/);if(!i)return U;let a=i[1],d=G.default.basename(r),m=G.default.basename(a);return{isWorktree:!0,worktreeName:d,parentRepoPath:a,parentProjectName:m}}function Me(r){return r==="~"||r.startsWith("~/")?r.replace(/^~/,(0,Le.homedir)()):r}function Gt(r){if(!r||r.trim()==="")return c.warn("PROJECT_NAME","Empty cwd provided, using fallback",{cwd:r}),"unknown-project";let e=Me(r),t=ye.default.basename(e);if(t===""){if(process.platform==="win32"){let n=r.match(/^([A-Z]):\\/i);if(n){let i=`drive-${n[1].toUpperCase()}`;return c.info("PROJECT_NAME","Drive root detected",{cwd:r,projectName:i}),i}}return c.warn("PROJECT_NAME","Root directory detected, using fallback",{cwd:r}),"unknown-project"}return t}function se(r){let e=Gt(r);if(!r)return{primary:e,parent:null,isWorktree:!1,allProjects:[e]};let t=Me(r),s=Ie(t);if(s.isWorktree&&s.parentProjectName){let n=`${s.parentProjectName}/${e}`;return{primary:n,parent:s.parentProjectName,isWorktree:!0,allProjects:[s.parentProjectName,n]}}return{primary:e,parent:null,isWorktree:!1,allProjects:[e]}}function X(r,e,t){return(0,De.createHash)("sha256").update([r||"",e||"",t||""].join("\0")).digest("hex").slice(0,16)}function re(r){if(!r)return[];try{let e=JSON.parse(r);return Array.isArray(e)?e:[String(e)]}catch{return[r]}}var h="claude";function jt(r){return r.trim().toLowerCase().replace(/\s+/g,"-")}function M(r){if(!r)return h;let e=jt(r);return e?e==="transcript"||e.includes("codex")?"codex":e.includes("cursor")?"cursor":e.includes("claude")?"claude":e:h}function ve(r){let e=["claude","codex","cursor"];return[...r].sort((t,s)=>{let n=e.indexOf(t),o=e.indexOf(s);return n!==-1||o!==-1?n===-1?1:o===-1?-1:n-o:t.localeCompare(s)})}function Xt(r,e){return{customTitle:r,platformSource:e?M(e):void 0}}var B=class{db;constructor(e=Re){e instanceof ne.Database?this.db=e:(e!==":memory:"&&Ne(R),this.db=new ne.Database(e),this.db.run("PRAGMA journal_mode = WAL"),this.db.run("PRAGMA synchronous = NORMAL"),this.db.run("PRAGMA foreign_keys = ON"),this.db.run("PRAGMA journal_size_limit = 4194304")),this.initializeSchema(),this.ensureWorkerPortColumn(),this.ensurePromptTrackingColumns(),this.removeSessionSummariesUniqueConstraint(),this.addObservationHierarchicalFields(),this.makeObservationsTextNullable(),this.createUserPromptsTable(),this.ensureDiscoveryTokensColumn(),this.createPendingMessagesTable(),this.renameSessionIdColumns(),this.repairSessionIdColumnRename(),this.addFailedAtEpochColumn(),this.addOnUpdateCascadeToForeignKeys(),this.addObservationContentHashColumn(),this.addSessionCustomTitleColumn(),this.addSessionPlatformSourceColumn(),this.addObservationModelColumns(),this.ensureMergedIntoProjectColumns(),this.addObservationSubagentColumns(),this.addPendingMessagesToolUseIdAndWorkerPidColumns(),this.addObservationsUniqueContentHashIndex()}initializeSchema(){this.db.run(`
CREATE TABLE IF NOT EXISTS schema_versions (
id INTEGER PRIMARY KEY,
version INTEGER UNIQUE NOT NULL,
@@ -290,7 +290,19 @@ ${o.stack}`:` ${o.message}`:this.getLevel()===0&&typeof o=="object"?u=`
UPDATE sdk_sessions
SET platform_source = '${h}'
WHERE platform_source IS NULL OR platform_source = ''
`),n||this.db.run("CREATE INDEX IF NOT EXISTS idx_sdk_sessions_platform_source ON sdk_sessions(platform_source)"),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(24,new Date().toISOString()))}addObservationModelColumns(){let e=this.db.query("PRAGMA table_info(observations)").all(),t=e.some(n=>n.name==="generated_by_model"),s=e.some(n=>n.name==="relevance_count");t&&s||(t||this.db.run("ALTER TABLE observations ADD COLUMN generated_by_model TEXT"),s||this.db.run("ALTER TABLE observations ADD COLUMN relevance_count INTEGER DEFAULT 0"),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(26,new Date().toISOString()))}ensureMergedIntoProjectColumns(){this.db.query("PRAGMA table_info(observations)").all().some(s=>s.name==="merged_into_project")||this.db.run("ALTER TABLE observations ADD COLUMN merged_into_project TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_merged_into ON observations(merged_into_project)"),this.db.query("PRAGMA table_info(session_summaries)").all().some(s=>s.name==="merged_into_project")||this.db.run("ALTER TABLE session_summaries ADD COLUMN merged_into_project TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_summaries_merged_into ON session_summaries(merged_into_project)")}addObservationSubagentColumns(){let e=this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(27),t=this.db.query("PRAGMA table_info(observations)").all(),s=t.some(i=>i.name==="agent_type"),n=t.some(i=>i.name==="agent_id");s||this.db.run("ALTER TABLE observations ADD COLUMN agent_type TEXT"),n||this.db.run("ALTER TABLE observations ADD COLUMN agent_id TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_agent_type ON observations(agent_type)"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_agent_id ON observations(agent_id)");let o=this.db.query("PRAGMA table_info(pending_messages)").all();if(o.length>0){let i=o.some(d=>d.name==="agent_type"),a=o.some(d=>d.name==="agent_id");i||this.db.run("ALTER TABLE pending_messages ADD COLUMN agent_type TEXT"),a||this.db.run("ALTER TABLE pending_messages ADD COLUMN agent_id TEXT")}e||this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(27,new Date().toISOString())}addObservationsUniqueContentHashIndex(){if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(29))return;let t=this.db.query("PRAGMA table_info(observations)").all(),s=t.some(o=>o.name==="memory_session_id"),n=t.some(o=>o.name==="content_hash");if(!s||!n){this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(29,new Date().toISOString());return}this.db.run("BEGIN TRANSACTION");try{this.db.run(`
`),n||this.db.run("CREATE INDEX IF NOT EXISTS idx_sdk_sessions_platform_source ON sdk_sessions(platform_source)"),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(24,new Date().toISOString()))}addObservationModelColumns(){let e=this.db.query("PRAGMA table_info(observations)").all(),t=e.some(n=>n.name==="generated_by_model"),s=e.some(n=>n.name==="relevance_count");t&&s||(t||this.db.run("ALTER TABLE observations ADD COLUMN generated_by_model TEXT"),s||this.db.run("ALTER TABLE observations ADD COLUMN relevance_count INTEGER DEFAULT 0"),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(26,new Date().toISOString()))}ensureMergedIntoProjectColumns(){this.db.query("PRAGMA table_info(observations)").all().some(s=>s.name==="merged_into_project")||this.db.run("ALTER TABLE observations ADD COLUMN merged_into_project TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_merged_into ON observations(merged_into_project)"),this.db.query("PRAGMA table_info(session_summaries)").all().some(s=>s.name==="merged_into_project")||this.db.run("ALTER TABLE session_summaries ADD COLUMN merged_into_project TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_summaries_merged_into ON session_summaries(merged_into_project)")}addObservationSubagentColumns(){let e=this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(27),t=this.db.query("PRAGMA table_info(observations)").all(),s=t.some(i=>i.name==="agent_type"),n=t.some(i=>i.name==="agent_id");s||this.db.run("ALTER TABLE observations ADD COLUMN agent_type TEXT"),n||this.db.run("ALTER TABLE observations ADD COLUMN agent_id TEXT"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_agent_type ON observations(agent_type)"),this.db.run("CREATE INDEX IF NOT EXISTS idx_observations_agent_id ON observations(agent_id)");let o=this.db.query("PRAGMA table_info(pending_messages)").all();if(o.length>0){let i=o.some(d=>d.name==="agent_type"),a=o.some(d=>d.name==="agent_id");i||this.db.run("ALTER TABLE pending_messages ADD COLUMN agent_type TEXT"),a||this.db.run("ALTER TABLE pending_messages ADD COLUMN agent_id TEXT")}e||this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(27,new Date().toISOString())}addPendingMessagesToolUseIdAndWorkerPidColumns(){if(this.db.query("SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'").all().length===0){this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(28,new Date().toISOString());return}let t=this.db.query("PRAGMA table_info(pending_messages)").all(),s=t.some(o=>o.name==="tool_use_id"),n=t.some(o=>o.name==="worker_pid");s||this.db.run("ALTER TABLE pending_messages ADD COLUMN tool_use_id TEXT"),n||this.db.run("ALTER TABLE pending_messages ADD COLUMN worker_pid INTEGER"),this.db.run("BEGIN TRANSACTION");try{this.db.run("CREATE INDEX IF NOT EXISTS idx_pending_messages_worker_pid ON pending_messages(worker_pid)"),this.db.run(`
DELETE FROM pending_messages
WHERE tool_use_id IS NOT NULL
AND id NOT IN (
SELECT MIN(id) FROM pending_messages
WHERE tool_use_id IS NOT NULL
GROUP BY content_session_id, tool_use_id
)
`),this.db.run(`
CREATE UNIQUE INDEX IF NOT EXISTS ux_pending_session_tool
ON pending_messages(content_session_id, tool_use_id)
WHERE tool_use_id IS NOT NULL
`),this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(28,new Date().toISOString()),this.db.run("COMMIT")}catch(o){throw this.db.run("ROLLBACK"),o}}addObservationsUniqueContentHashIndex(){if(this.db.prepare("SELECT version FROM schema_versions WHERE version = ?").get(29))return;let t=this.db.query("PRAGMA table_info(observations)").all(),s=t.some(o=>o.name==="memory_session_id"),n=t.some(o=>o.name==="content_hash");if(!s||!n){this.db.prepare("INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)").run(29,new Date().toISOString());return}this.db.run("BEGIN TRANSACTION");try{this.db.run(`
DELETE FROM observations
WHERE id NOT IN (
SELECT MIN(id) FROM observations
File diff suppressed because one or more lines are too long
+77
View File
@@ -73,6 +73,7 @@ export class SessionStore {
this.addObservationModelColumns();
this.ensureMergedIntoProjectColumns();
this.addObservationSubagentColumns();
this.addPendingMessagesToolUseIdAndWorkerPidColumns();
this.addObservationsUniqueContentHashIndex();
}
@@ -1038,6 +1039,82 @@ export class SessionStore {
}
}
/**
* Add tool_use_id and worker_pid columns + indexes to pending_messages (migration 28).
*
* Mirrors MigrationRunner.rebuildPendingMessagesForSelfHealingClaim so bundled
* artifacts that embed SessionStore (e.g. worker-service.cjs, context-generator.cjs)
* stay schema-consistent. Without this, every queue-claim cycle fails with
* "no such column: worker_pid" and every observation insert fails with
* "table pending_messages has no column named tool_use_id" (issue #2139).
*
* Uses ALTER TABLE rather than the full table rebuild from MigrationRunner because:
* - It's safe on populated DBs that already reached v29 without ever applying v28.
* - The legacy stale-reset epoch column the rebuild dropped never existed in
* pending_messages tables created by the SessionStore migration path.
*
* Column existence is checked directly — schema_versions cannot be trusted because
* affected DBs may already have v29 recorded with neither column present (#2139).
*/
private addPendingMessagesToolUseIdAndWorkerPidColumns(): void {
// pending_messages may not exist yet on freshly-created DBs at this point in
// the migration order — createPendingMessagesTable (v16) has already run by
// the time we get here, so this guard is defensive only.
const tables = this.db.query(
"SELECT name FROM sqlite_master WHERE type='table' AND name='pending_messages'"
).all() as TableNameRow[];
if (tables.length === 0) {
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(28, new Date().toISOString());
return;
}
const cols = this.db.query('PRAGMA table_info(pending_messages)').all() as TableColumnInfo[];
const hasToolUseId = cols.some(c => c.name === 'tool_use_id');
const hasWorkerPid = cols.some(c => c.name === 'worker_pid');
if (!hasToolUseId) {
this.db.run('ALTER TABLE pending_messages ADD COLUMN tool_use_id TEXT');
}
if (!hasWorkerPid) {
this.db.run('ALTER TABLE pending_messages ADD COLUMN worker_pid INTEGER');
}
// Wrap dedup DELETE + UNIQUE index creation + version-record in a transaction
// so a crash mid-flight cannot leave duplicates removed without v28 recorded.
// Matches addObservationsUniqueContentHashIndex (v29) at line 1127 and
// runner.ts rebuildPendingMessagesForSelfHealingClaim (v28).
this.db.run('BEGIN TRANSACTION');
try {
// Indexes are idempotent — match runner.ts:1117-1120 + 1134-1138.
this.db.run('CREATE INDEX IF NOT EXISTS idx_pending_messages_worker_pid ON pending_messages(worker_pid)');
// The UNIQUE partial index requires no duplicate (content_session_id, tool_use_id)
// pairs. Dedup before creating it (matches runner.ts:1124-1132). Safe to run
// unconditionally — if tool_use_id was just added, every row has it as NULL
// and the WHERE filter excludes them.
this.db.run(`
DELETE FROM pending_messages
WHERE tool_use_id IS NOT NULL
AND id NOT IN (
SELECT MIN(id) FROM pending_messages
WHERE tool_use_id IS NOT NULL
GROUP BY content_session_id, tool_use_id
)
`);
this.db.run(`
CREATE UNIQUE INDEX IF NOT EXISTS ux_pending_session_tool
ON pending_messages(content_session_id, tool_use_id)
WHERE tool_use_id IS NOT NULL
`);
this.db.prepare('INSERT OR IGNORE INTO schema_versions (version, applied_at) VALUES (?, ?)').run(28, new Date().toISOString());
this.db.run('COMMIT');
} catch (error) {
this.db.run('ROLLBACK');
throw error;
}
}
/**
* Add UNIQUE(memory_session_id, content_hash) on observations (migration 29).
* Mirrors MigrationRunner.addObservationsUniqueContentHashIndex so bundled