From 61df20d5f9670dfbdca298821f4ec747334c7413 Mon Sep 17 00:00:00 2001 From: Paul Bouzian Date: Sun, 12 Apr 2026 20:25:58 +0200 Subject: [PATCH 1/2] fix: repair legacy projection thread migrations Repair upgrades where older desktop builds recorded different migration IDs for 17-22, causing the current thread metadata migrations to be skipped and migration 23 to crash. The patched migration now rebuilds any missing thread/message columns before backfilling the durable associated worktree ref, and adds a regression test that reproduces the incompatible migration ledger. Co-authored-by: Codex --- ...ectionThreadsAssociatedWorktreeRef.test.ts | 202 ++++++++++++++++++ ..._ProjectionThreadsAssociatedWorktreeRef.ts | 116 +++++++++- 2 files changed, 307 insertions(+), 11 deletions(-) create mode 100644 apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.test.ts diff --git a/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.test.ts b/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.test.ts new file mode 100644 index 00000000..a84f477d --- /dev/null +++ b/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.test.ts @@ -0,0 +1,202 @@ +import { assert, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +const legacyMigrationLedgerRows = [ + [17, "ProjectionThreadsArchivedAt"], + [18, "ProjectionThreadsArchivedAtIndex"], + [19, "ProjectionSnapshotLookupIndexes"], + [20, "AuthAccessManagement"], + [21, "AuthSessionClientMetadata"], + [22, "AuthSessionLastConnectedAt"], +] as const; + +layer("023_ProjectionThreadsAssociatedWorktreeRef", (it) => { + it.effect( + "repairs legacy migration ledgers that skipped the current thread metadata columns", + () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 16 }); + + yield* sql` + INSERT INTO projection_projects ( + project_id, + title, + workspace_root, + scripts_json, + default_model_selection_json, + created_at, + updated_at, + deleted_at + ) + VALUES ( + 'project-legacy', + 'Legacy project', + '/tmp/project-legacy', + '[]', + '{"provider":"codex","model":"gpt-5.4"}', + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_threads ( + thread_id, + project_id, + title, + model_selection_json, + runtime_mode, + interaction_mode, + branch, + worktree_path, + latest_turn_id, + created_at, + updated_at, + deleted_at + ) + VALUES + ( + 'thread-worktree', + 'project-legacy', + 'Worktree thread', + '{"provider":"codex","model":"gpt-5.4"}', + 'full-access', + 'default', + 'feature/fix-upgrade', + '/tmp/project-legacy/.worktrees/thread-worktree', + NULL, + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ), + ( + 'thread-local', + 'project-legacy', + 'Local thread', + '{"provider":"codex","model":"gpt-5.4"}', + 'full-access', + 'default', + 'main', + NULL, + NULL, + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z', + NULL + ) + `; + + yield* sql` + INSERT INTO projection_thread_messages ( + message_id, + thread_id, + turn_id, + role, + text, + attachments_json, + is_streaming, + created_at, + updated_at + ) + VALUES ( + 'message-legacy', + 'thread-worktree', + 'turn-legacy', + 'assistant', + 'hello from the past', + NULL, + 0, + '2026-01-01T00:00:00.000Z', + '2026-01-01T00:00:00.000Z' + ) + `; + + for (const [migrationId, name] of legacyMigrationLedgerRows) { + yield* sql` + INSERT INTO effect_sql_migrations (migration_id, created_at, name) + VALUES (${migrationId}, '2026-01-01T00:00:00.000Z', ${name}) + `; + } + + const executedMigrations = yield* runMigrations({ toMigrationInclusive: 23 }); + + assert.deepStrictEqual(executedMigrations, [ + [23, "ProjectionThreadsAssociatedWorktreeRef"], + ]); + + const threadRows = yield* sql<{ + readonly threadId: string; + readonly envMode: string; + readonly associatedWorktreePath: string | null; + readonly associatedWorktreeBranch: string | null; + readonly associatedWorktreeRef: string | null; + readonly forkSourceThreadId: string | null; + readonly handoff: string | null; + }>` + SELECT + thread_id AS "threadId", + env_mode AS "envMode", + associated_worktree_path AS "associatedWorktreePath", + associated_worktree_branch AS "associatedWorktreeBranch", + associated_worktree_ref AS "associatedWorktreeRef", + fork_source_thread_id AS "forkSourceThreadId", + handoff_json AS "handoff" + FROM projection_threads + ORDER BY thread_id ASC + `; + + assert.deepStrictEqual(threadRows, [ + { + threadId: "thread-local", + envMode: "local", + associatedWorktreePath: null, + associatedWorktreeBranch: "main", + associatedWorktreeRef: "main", + forkSourceThreadId: null, + handoff: null, + }, + { + threadId: "thread-worktree", + envMode: "worktree", + associatedWorktreePath: "/tmp/project-legacy/.worktrees/thread-worktree", + associatedWorktreeBranch: "feature/fix-upgrade", + associatedWorktreeRef: "feature/fix-upgrade", + forkSourceThreadId: null, + handoff: null, + }, + ]); + + const messageRows = yield* sql<{ + readonly messageId: string; + readonly skills: string | null; + readonly mentions: string | null; + readonly source: string; + }>` + SELECT + message_id AS "messageId", + skills_json AS "skills", + mentions_json AS "mentions", + source + FROM projection_thread_messages + ORDER BY message_id ASC + `; + + assert.deepStrictEqual(messageRows, [ + { + messageId: "message-legacy", + skills: null, + mentions: null, + source: "native", + }, + ]); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.ts b/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.ts index c34152b5..bd9c7fc6 100644 --- a/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.ts +++ b/apps/server/src/persistence/Migrations/023_ProjectionThreadsAssociatedWorktreeRef.ts @@ -8,15 +8,109 @@ import * as Effect from "effect/Effect"; export default Effect.gen(function* () { const sql = yield* SqlClient.SqlClient; - yield* sql` - ALTER TABLE projection_threads - ADD COLUMN associated_worktree_ref TEXT - `.pipe(Effect.catchTag("SqlError", () => Effect.void)); - - yield* sql` - UPDATE projection_threads - SET associated_worktree_ref = associated_worktree_branch - WHERE associated_worktree_ref IS NULL - AND associated_worktree_branch IS NOT NULL - `; + // Some legacy desktop databases already recorded different 17-22 migration IDs, + // so this repair step must rebuild the current thread metadata shape before 23 can finish. + const projectionThreadsColumnExists = (columnName: string) => + sql<{ readonly exists: number }>` + SELECT EXISTS( + SELECT 1 + FROM pragma_table_info('projection_threads') + WHERE name = ${columnName} + ) AS "exists" + `.pipe(Effect.map(([row]) => row?.exists === 1)); + + const projectionThreadMessagesColumnExists = (columnName: string) => + sql<{ readonly exists: number }>` + SELECT EXISTS( + SELECT 1 + FROM pragma_table_info('projection_thread_messages') + WHERE name = ${columnName} + ) AS "exists" + `.pipe(Effect.map(([row]) => row?.exists === 1)); + + const ensureProjectionThreadsColumn = (columnName: string, definition: string) => + Effect.gen(function* () { + const exists = yield* projectionThreadsColumnExists(columnName); + if (exists) { + return false; + } + + yield* sql.unsafe(` + ALTER TABLE projection_threads + ADD COLUMN ${definition} + `); + return true; + }); + + const ensureProjectionThreadMessagesColumn = (columnName: string, definition: string) => + Effect.gen(function* () { + const exists = yield* projectionThreadMessagesColumnExists(columnName); + if (exists) { + return false; + } + + yield* sql.unsafe(` + ALTER TABLE projection_thread_messages + ADD COLUMN ${definition} + `); + return true; + }); + + yield* ensureProjectionThreadsColumn("handoff_json", "handoff_json TEXT"); + yield* ensureProjectionThreadMessagesColumn("source", "source TEXT NOT NULL DEFAULT 'native'"); + yield* ensureProjectionThreadMessagesColumn("skills_json", "skills_json TEXT"); + yield* ensureProjectionThreadMessagesColumn("mentions_json", "mentions_json TEXT"); + + const addedEnvMode = yield* ensureProjectionThreadsColumn( + "env_mode", + "env_mode TEXT NOT NULL DEFAULT 'local'", + ); + if (addedEnvMode) { + yield* sql` + UPDATE projection_threads + SET env_mode = CASE + WHEN worktree_path IS NOT NULL THEN 'worktree' + ELSE 'local' + END + `; + } + + yield* ensureProjectionThreadsColumn("fork_source_thread_id", "fork_source_thread_id TEXT"); + + const addedAssociatedWorktreePath = yield* ensureProjectionThreadsColumn( + "associated_worktree_path", + "associated_worktree_path TEXT", + ); + if (addedAssociatedWorktreePath) { + yield* sql` + UPDATE projection_threads + SET associated_worktree_path = worktree_path + WHERE associated_worktree_path IS NULL + `; + } + + const addedAssociatedWorktreeBranch = yield* ensureProjectionThreadsColumn( + "associated_worktree_branch", + "associated_worktree_branch TEXT", + ); + if (addedAssociatedWorktreeBranch) { + yield* sql` + UPDATE projection_threads + SET associated_worktree_branch = branch + WHERE associated_worktree_branch IS NULL + `; + } + + const addedAssociatedWorktreeRef = yield* ensureProjectionThreadsColumn( + "associated_worktree_ref", + "associated_worktree_ref TEXT", + ); + if (addedAssociatedWorktreeRef) { + yield* sql` + UPDATE projection_threads + SET associated_worktree_ref = COALESCE(associated_worktree_branch, branch) + WHERE associated_worktree_ref IS NULL + AND COALESCE(associated_worktree_branch, branch) IS NOT NULL + `; + } }); From bda7c5edaa15fe24eb400e0d9e3b4d95f8032ec1 Mon Sep 17 00:00:00 2001 From: Paul Bouzian Date: Sun, 12 Apr 2026 20:44:03 +0200 Subject: [PATCH 2/2] fix: normalize legacy archived thread events Repair desktop upgrades where older databases contain thread.archived or thread.unarchived orchestration events that the current release cannot decode during startup replay. Add a migration that rewrites those legacy rows into thread.meta-updated events, and cover the normalization with a regression test. Co-authored-by: Codex --- apps/server/src/persistence/Migrations.ts | 2 + ...ormalizeLegacyArchivedThreadEvents.test.ts | 191 ++++++++++++++++++ ...024_NormalizeLegacyArchivedThreadEvents.ts | 29 +++ 3 files changed, 222 insertions(+) create mode 100644 apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.test.ts create mode 100644 apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.ts diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index aa607081..832e0810 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -36,6 +36,7 @@ import Migration0020 from "./Migrations/020_ProjectionThreadsForkSource.ts"; import Migration0021 from "./Migrations/021_ProjectionThreadsAssociatedWorktree.ts"; import Migration0022 from "./Migrations/022_ProjectionThreadsAssociatedWorktreeBranch.ts"; import Migration0023 from "./Migrations/023_ProjectionThreadsAssociatedWorktreeRef.ts"; +import Migration0024 from "./Migrations/024_NormalizeLegacyArchivedThreadEvents.ts"; /** * Migration loader with all migrations defined inline. @@ -71,6 +72,7 @@ export const migrationEntries = [ [21, "ProjectionThreadsAssociatedWorktree", Migration0021], [22, "ProjectionThreadsAssociatedWorktreeBranch", Migration0022], [23, "ProjectionThreadsAssociatedWorktreeRef", Migration0023], + [24, "NormalizeLegacyArchivedThreadEvents", Migration0024], ] as const; export const makeMigrationLoader = (throughId?: number) => diff --git a/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.test.ts b/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.test.ts new file mode 100644 index 00000000..b5723fa8 --- /dev/null +++ b/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.test.ts @@ -0,0 +1,191 @@ +import { CommandId, EventId, OrchestrationEvent, ProjectId, ThreadId } from "@t3tools/contracts"; +import { assert, it } from "@effect/vitest"; +import { Effect, Layer, Schema } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +import { runMigrations } from "../Migrations.ts"; +import * as NodeSqliteClient from "../NodeSqliteClient.ts"; + +const layer = it.layer(Layer.mergeAll(NodeSqliteClient.layerMemory())); + +const decodeEvent = Schema.decodeUnknownSync(OrchestrationEvent); + +layer("024_NormalizeLegacyArchivedThreadEvents", (it) => { + it.effect("rewrites legacy archived thread events into decodable meta updates", () => + Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* runMigrations({ toMigrationInclusive: 23 }); + + yield* sql` + INSERT INTO orchestration_events ( + event_id, + aggregate_kind, + stream_id, + stream_version, + event_type, + occurred_at, + command_id, + causation_event_id, + correlation_id, + actor_kind, + payload_json, + metadata_json + ) + VALUES + ( + ${EventId.makeUnsafe("evt-thread-archived")}, + ${"thread"}, + ${ThreadId.makeUnsafe("thread-archived")}, + ${0}, + ${"thread.archived"}, + ${"2026-01-01T00:00:00.000Z"}, + ${CommandId.makeUnsafe("cmd-thread-archived")}, + ${null}, + ${null}, + ${"client"}, + ${JSON.stringify({ + threadId: "thread-archived", + archivedAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + })}, + ${"{}"} + ), + ( + ${EventId.makeUnsafe("evt-thread-unarchived")}, + ${"thread"}, + ${ThreadId.makeUnsafe("thread-unarchived")}, + ${0}, + ${"thread.unarchived"}, + ${"2026-01-02T00:00:00.000Z"}, + ${CommandId.makeUnsafe("cmd-thread-unarchived")}, + ${null}, + ${null}, + ${"client"}, + ${JSON.stringify({ + threadId: "thread-unarchived", + unarchivedAt: "2026-01-02T00:00:00.000Z", + })}, + ${"{}"} + ) + `; + + yield* runMigrations({ toMigrationInclusive: 24 }); + + const normalizedRows = yield* sql<{ + readonly sequence: number; + readonly eventId: string; + readonly type: string; + readonly aggregateKind: "project" | "thread"; + readonly aggregateId: string; + readonly occurredAt: string; + readonly commandId: string | null; + readonly causationEventId: string | null; + readonly correlationId: string | null; + readonly payload: string; + readonly metadata: string; + }>` + SELECT + sequence, + event_id AS "eventId", + event_type AS "type", + aggregate_kind AS "aggregateKind", + stream_id AS "aggregateId", + occurred_at AS "occurredAt", + command_id AS "commandId", + causation_event_id AS "causationEventId", + correlation_id AS "correlationId", + payload_json AS "payload", + metadata_json AS "metadata" + FROM orchestration_events + WHERE event_id IN ( + ${EventId.makeUnsafe("evt-thread-archived")}, + ${EventId.makeUnsafe("evt-thread-unarchived")} + ) + ORDER BY event_id ASC + `; + + assert.deepStrictEqual( + normalizedRows.map((row) => ({ + eventId: row.eventId, + type: row.type, + payload: JSON.parse(row.payload), + })), + [ + { + eventId: "evt-thread-archived", + type: "thread.meta-updated", + payload: { + threadId: "thread-archived", + updatedAt: "2026-01-01T00:00:00.000Z", + }, + }, + { + eventId: "evt-thread-unarchived", + type: "thread.meta-updated", + payload: { + threadId: "thread-unarchived", + updatedAt: "2026-01-02T00:00:00.000Z", + }, + }, + ], + ); + + const decodedEvents = normalizedRows.map((row) => + decodeEvent({ + sequence: row.sequence, + eventId: EventId.makeUnsafe(row.eventId), + type: row.type, + aggregateKind: row.aggregateKind, + aggregateId: + row.aggregateKind === "project" + ? ProjectId.makeUnsafe(row.aggregateId) + : ThreadId.makeUnsafe(row.aggregateId), + occurredAt: row.occurredAt, + commandId: row.commandId === null ? null : CommandId.makeUnsafe(row.commandId), + causationEventId: + row.causationEventId === null ? null : EventId.makeUnsafe(row.causationEventId), + correlationId: + row.correlationId === null ? null : CommandId.makeUnsafe(row.correlationId), + payload: JSON.parse(row.payload), + metadata: JSON.parse(row.metadata), + }), + ); + + assert.deepStrictEqual( + decodedEvents.map((event) => { + assert.equal(event.type, "thread.meta-updated"); + if (event.type !== "thread.meta-updated") { + throw new Error(`Expected thread.meta-updated, got ${event.type}`); + } + return { + eventId: String(event.eventId), + type: event.type, + payload: { + threadId: String(event.payload.threadId), + updatedAt: event.payload.updatedAt, + }, + }; + }), + [ + { + eventId: "evt-thread-archived", + type: "thread.meta-updated", + payload: { + threadId: "thread-archived", + updatedAt: "2026-01-01T00:00:00.000Z", + }, + }, + { + eventId: "evt-thread-unarchived", + type: "thread.meta-updated", + payload: { + threadId: "thread-unarchived", + updatedAt: "2026-01-02T00:00:00.000Z", + }, + }, + ], + ); + }), + ); +}); diff --git a/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.ts b/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.ts new file mode 100644 index 00000000..d466990c --- /dev/null +++ b/apps/server/src/persistence/Migrations/024_NormalizeLegacyArchivedThreadEvents.ts @@ -0,0 +1,29 @@ +/** + * Older desktop builds persisted thread.archived/thread.unarchived domain + * events, but the current released schema only supports thread.meta-updated. + * Normalize those legacy rows so replay stays decodable across upgrades. + */ +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as Effect from "effect/Effect"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + UPDATE orchestration_events + SET + event_type = 'thread.meta-updated', + payload_json = json_object( + 'threadId', + json_extract(payload_json, '$.threadId'), + 'updatedAt', + COALESCE( + json_extract(payload_json, '$.updatedAt'), + json_extract(payload_json, '$.unarchivedAt'), + json_extract(payload_json, '$.archivedAt'), + occurred_at + ) + ) + WHERE event_type IN ('thread.archived', 'thread.unarchived') + `; +});