diff --git a/src/commands/sync.ts b/src/commands/sync.ts index 962ae50..ddaefd1 100644 --- a/src/commands/sync.ts +++ b/src/commands/sync.ts @@ -158,6 +158,7 @@ async function runDeltaSync(dbPath: string): Promise { const deltaSyncService = new DeltaSyncService({ dbPath, localApiClient: client, + logger, }); try { @@ -706,6 +707,7 @@ export function registerSyncCommands(program: Command): void { const deltaSyncService = new DeltaSyncService({ dbPath: resolvedPaths.dbPath, localApiClient: client, + logger, }); const watchOptions = { diff --git a/src/services/delta-sync.ts b/src/services/delta-sync.ts index 62407ad..48bd0b7 100644 --- a/src/services/delta-sync.ts +++ b/src/services/delta-sync.ts @@ -21,6 +21,30 @@ import type { /** Page size for API pagination */ const PAGE_SIZE = 100; +/** + * Fraction of `sync_metadata.total_nodes` that a single delta is allowed to + * exceed before we abort. The failure mode we guard against (Local API + * ignoring `edited.since`) returns ~100% of the workspace, so any threshold + * below 100% catches it — and the lower the threshold, the less work wasted + * before the abort fires. 25% is well below any plausible real delta yet + * still allows for months of accumulated edits; a real delta exceeding this + * is better served by a full re-sync anyway. + */ +const MAX_PAGES_RATIO = 0.25; + +/** + * Fallback cap if `total_nodes` is unavailable (e.g. malformed metadata). + * Conservative — equivalent to a ~400K-node workspace under the ratio. + */ +const FALLBACK_MAX_PAGES = 1000; + +/** + * Log a progress line on page 1 and every Nth page thereafter. Avoids + * one-info-line-per-page noise on normal deltas while still proving liveness + * on long runs. + */ +const PROGRESS_INTERVAL_PAGES = 10; + /** * DeltaSyncService handles incremental sync of Tana nodes * from the local API into the SQLite database. @@ -31,6 +55,8 @@ export class DeltaSyncService { private localApiClient: DeltaSyncOptions["localApiClient"]; private embeddingConfig?: DeltaSyncOptions["embeddingConfig"]; private logger: NonNullable; + /** Explicit override from constructor options; if undefined, auto-scaled per sync from `sync_metadata.total_nodes`. */ + private maxPagesOverride: number | undefined; private syncing = false; constructor(options: DeltaSyncOptions) { @@ -44,6 +70,28 @@ export class DeltaSyncService { warn: () => {}, error: () => {}, }; + this.maxPagesOverride = options.maxPages; + } + + /** + * Compute the per-sync abort cap. + * - If the constructor was given an explicit `maxPages`, that wins. + * - Otherwise scale against `sync_metadata.total_nodes`: cap at + * `MAX_PAGES_RATIO` of the graph. + * - If `total_nodes` is missing or zero, fall back to `FALLBACK_MAX_PAGES`. + * + * Note: for very small workspaces the scaled cap may round down to 1 page; + * that's inherent to page-quantized pagination, not a bug — the broken-API + * signature still trips at page 2. + */ + private resolveMaxPages(): number { + if (this.maxPagesOverride !== undefined) return this.maxPagesOverride; + const row = this.db + .query("SELECT total_nodes FROM sync_metadata WHERE id = 1") + .get() as { total_nodes: number } | undefined; + const totalNodes = row?.total_nodes ?? 0; + if (totalNodes <= 0) return FALLBACK_MAX_PAGES; + return Math.ceil((totalNodes * MAX_PAGES_RATIO) / PAGE_SIZE); } /** @@ -325,6 +373,9 @@ export class DeltaSyncService { // Use 0 as fallback watermark if null (first delta after full sync with no delta timestamp) const sinceMs = watermarkBefore ?? 0; + // Resolve the per-sync abort cap from current workspace size. + const maxPages = this.resolveMaxPages(); + // Step 3: Page through changed nodes let nodesFound = 0; let nodesInserted = 0; @@ -336,8 +387,30 @@ export class DeltaSyncService { for await (const page of this.fetchChangedNodes(sinceMs)) { pages++; + + if (pages > maxPages) { + // Rows merged on pages 1..maxPages are already committed to SQLite; + // the watermark is NOT advanced because we throw before Step 5. + // The next delta-sync will replay from the same `sinceMs` and + // re-merge those rows idempotently. Recovery is `supertag sync index`. + // Check fires BEFORE merging this page, so we don't do work we're + // about to throw away. + throw new Error( + `Delta-sync aborted after ${maxPages} pages (${nodesFound} nodes merged so far; watermark NOT advanced). ` + + `This usually means the Local API is not honoring 'edited.since' ` + + `and is returning the entire workspace. Run 'supertag sync index' ` + + `for a full sync instead, or raise maxPages if this is a legitimately large delta.`, + ); + } + nodesFound += page.length; + if (pages === 1 || pages % PROGRESS_INTERVAL_PAGES === 0) { + this.logger.info( + `delta-sync progress: ${pages} page(s), ${nodesFound} nodes (latest page: ${page.length})`, + ); + } + for (const node of page) { const result = this.mergeNode(node); if (result.inserted) nodesInserted++; diff --git a/src/types/local-api.ts b/src/types/local-api.ts index bcbaa35..192304e 100644 --- a/src/types/local-api.ts +++ b/src/types/local-api.ts @@ -362,6 +362,13 @@ export interface DeltaSyncOptions { warn(message: string, data?: Record): void; error(message: string, data?: Record): void; }; + /** + * Explicit override for the abort cap on pages fetched per delta-sync. + * When omitted, the cap auto-scales from `sync_metadata.total_nodes` + * (25% of the graph). Guards against runaway loops when the Local API + * ignores `edited.since` and returns the entire workspace as "changed". + */ + maxPages?: number; } /** diff --git a/tests/unit/delta-sync-pagination.test.ts b/tests/unit/delta-sync-pagination.test.ts index 0e9c6d8..ad67515 100644 --- a/tests/unit/delta-sync-pagination.test.ts +++ b/tests/unit/delta-sync-pagination.test.ts @@ -50,9 +50,11 @@ function createDbWithFullSync(dbPath: string): void { total_nodes INTEGER NOT NULL DEFAULT 0 ) `); - // Insert a full sync record so delta-sync can proceed + // Insert a full sync record so delta-sync can proceed. total_nodes is set high + // enough that the auto-scaled abort cap (25% * total / PAGE_SIZE) doesn't trip + // unrelated tests that happen to page through dozens of results. db.run( - "INSERT INTO sync_metadata (id, last_export_file, last_sync_timestamp, total_nodes) VALUES (1, 'export.json', ?, 1000)", + "INSERT INTO sync_metadata (id, last_export_file, last_sync_timestamp, total_nodes) VALUES (1, 'export.json', ?, 1000000)", [Date.now() - 60000] ); db.close(); @@ -381,6 +383,184 @@ describe("DeltaSyncService - Pagination + Sync Orchestration (T-2.2)", () => { expect(result.embeddingsGenerated).toBe(0); }); + it("logs a progress line on page 1 (Bug 6)", async () => { + const page1 = Array.from({ length: 100 }, (_, i) => createTestNode(`p1-${i}`, `Page1 Node ${i}`)); + const page2 = Array.from({ length: 50 }, (_, i) => createTestNode(`p2-${i}`, `Page2 Node ${i}`)); + const logs: string[] = []; + + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async (_query, options) => { + const offset = options?.offset ?? 0; + if (offset === 0) return page1; + if (offset === 100) return page2; + return []; + }, + health: async () => true, + }, + logger: { + info: (msg) => logs.push(msg), + warn: () => {}, + error: () => {}, + }, + }); + + await service.sync(); + + const progress = logs.filter((m) => m.startsWith("delta-sync progress:")); + // 2 pages total: page 1 logs, page 2 does not (< interval). + expect(progress).toHaveLength(1); + expect(progress[0]).toContain("1 page(s)"); + expect(progress[0]).toContain("100 nodes"); + }); + + it("emits a progress line every 10 pages (Bug 6)", async () => { + const logs: string[] = []; + let calls = 0; + const TOTAL_PAGES = 25; + + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async () => { + calls++; + if (calls > TOTAL_PAGES) return []; + // Distinct ids per call so DB doesn't collide + return Array.from({ length: 100 }, (_, i) => createTestNode(`hb-${calls}-${i}`, `Heartbeat ${calls}-${i}`)); + }, + health: async () => true, + }, + logger: { + info: (msg) => logs.push(msg), + warn: () => {}, + error: () => {}, + }, + }); + + await service.sync(); + + const progress = logs.filter((m) => m.startsWith("delta-sync progress:")); + // Pages 1, 10, 20 — page 25 doesn't hit the interval. + expect(progress).toHaveLength(3); + expect(progress[0]).toContain("1 page(s)"); + expect(progress[1]).toContain("10 page(s)"); + expect(progress[2]).toContain("20 page(s)"); + }); + + it("auto-scales the abort cap from total_nodes when no override is given (Bug 6)", async () => { + // total_nodes = 24000 → cap = ceil(24000 * 0.25 / 100) = 60 pages. + const reseedDb = new Database(dbPath); + reseedDb.run("UPDATE sync_metadata SET total_nodes = 24000 WHERE id = 1"); + reseedDb.close(); + + let calls = 0; + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async () => { + calls++; + return Array.from({ length: 100 }, (_, i) => + createTestNode(`scale-${calls}-${i}`, `Scale ${calls}-${i}`), + ); + }, + health: async () => true, + }, + // no maxPages — let it auto-scale + }); + + await expect(service.sync()).rejects.toThrow(/aborted after 60 pages/); + }); + + it("auto-scaled cap scales down to small workspaces without a floor (Bug 6)", async () => { + // Small workspace: total_nodes = 4000 → cap = ceil(4000 * 0.25 / 100) = 10 pages. + // Critically the cap stays proportional rather than being raised to a floor; + // the broken-API failure mode (whole-workspace return = 40 pages here) trips. + const reseedDb = new Database(dbPath); + reseedDb.run("UPDATE sync_metadata SET total_nodes = 4000 WHERE id = 1"); + reseedDb.close(); + + let calls = 0; + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async () => { + calls++; + return Array.from({ length: 100 }, (_, i) => + createTestNode(`tiny-${calls}-${i}`, `Tiny ${calls}-${i}`), + ); + }, + health: async () => true, + }, + }); + + await expect(service.sync()).rejects.toThrow(/aborted after 10 pages/); + }); + + it("explicit maxPages overrides the auto-scaled cap (Bug 6)", async () => { + // Even with total_nodes=24000 (auto-scaled cap would be 60), the explicit override wins. + const reseedDb = new Database(dbPath); + reseedDb.run("UPDATE sync_metadata SET total_nodes = 24000 WHERE id = 1"); + reseedDb.close(); + + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async () => { + return Array.from({ length: 100 }, (_, i) => + createTestNode(`ov-${i}`, `Override ${i}`), + ); + }, + health: async () => true, + }, + maxPages: 3, + }); + + await expect(service.sync()).rejects.toThrow(/aborted after 3 pages/); + }); + + it("aborts with a clear error when maxPages is hit and leaves watermark unchanged (Bug 6)", async () => { + // Capture watermark from the test-db seed so we can assert it is NOT advanced on abort. + const seedDb = new Database(dbPath); + const watermarkBefore = (seedDb.query( + "SELECT last_sync_timestamp FROM sync_metadata WHERE id = 1", + ).get() as { last_sync_timestamp: number }).last_sync_timestamp; + seedDb.close(); + + service = new DeltaSyncService({ + dbPath, + localApiClient: { + searchNodes: async (_q, options) => { + const offset = options?.offset ?? 0; + // Always return a full page → loop never stops naturally + return Array.from({ length: 100 }, (_, i) => + createTestNode(`cap-${offset}-${i}`, `Cap ${offset}-${i}`), + ); + }, + health: async () => true, + }, + maxPages: 3, + }); + + await expect(service.sync()).rejects.toThrow( + /Delta-sync aborted after 3 pages.*watermark NOT advanced.*supertag sync index/s, + ); + + // Watermark must NOT have advanced — next delta should replay from the same point. + const verifyDb = new Database(dbPath); + const watermarkAfter = (verifyDb.query( + "SELECT last_sync_timestamp FROM sync_metadata WHERE id = 1", + ).get() as { last_sync_timestamp: number }).last_sync_timestamp; + verifyDb.close(); + expect(watermarkAfter).toBe(watermarkBefore); + + // And rows from the pages that did complete should have been merged (idempotently replayable). + const verifyDb2 = new Database(dbPath); + const rowCount = (verifyDb2.query("SELECT COUNT(*) as c FROM nodes").get() as { c: number }).c; + verifyDb2.close(); + expect(rowCount).toBeGreaterThan(0); + }); + it("paginates through multiple pages", async () => { let callIndex = 0; const page1 = Array.from({ length: 100 }, (_, i) => createTestNode(`p1-${i}`, `Page1 Node ${i}`));