Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/commands/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ async function runDeltaSync(dbPath: string): Promise<DeltaSyncResult> {
const deltaSyncService = new DeltaSyncService({
dbPath,
localApiClient: client,
logger,
});

try {
Expand Down Expand Up @@ -706,6 +707,7 @@ export function registerSyncCommands(program: Command): void {
const deltaSyncService = new DeltaSyncService({
dbPath: resolvedPaths.dbPath,
localApiClient: client,
logger,
});

const watchOptions = {
Expand Down
73 changes: 73 additions & 0 deletions src/services/delta-sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,30 @@ import type {
/** Page size for API pagination */
const PAGE_SIZE = 100;

/**
 * Fraction of `sync_metadata.total_nodes` that a single delta may reach
 * before we abort. The failure mode we guard against (Local API ignoring
 * `edited.since`) returns ~100% of the workspace, so any threshold below
 * 100% catches it — and the lower the threshold, the less work wasted
 * before the abort fires. 25% is well below that ~100% failure signature,
 * yet still leaves room for a large legitimate delta (e.g. months of
 * accumulated edits); a real delta exceeding this is better served by a
 * full re-sync anyway.
 *
 * The ratio is expressed in nodes; it is converted to a page count by
 * dividing by `PAGE_SIZE`.
 */
const MAX_PAGES_RATIO = 0.25;

/**
 * Fallback page cap used when `total_nodes` is unavailable or not positive
 * (e.g. missing metadata row, malformed metadata).
 * Conservative — 1000 pages × `PAGE_SIZE` (100) = 100K nodes, which is what
 * the 25% ratio would yield for a ~400K-node workspace.
 */
const FALLBACK_MAX_PAGES = 1000;

/**
 * Progress-log cadence, in pages. A progress line is emitted for page 1 and
 * for every page whose 1-based number is a multiple of this value
 * (10, 20, …). Avoids one-info-line-per-page noise on normal deltas while
 * still proving liveness on long runs.
 */
const PROGRESS_INTERVAL_PAGES = 10;

/**
* DeltaSyncService handles incremental sync of Tana nodes
* from the local API into the SQLite database.
Expand All @@ -31,6 +55,8 @@ export class DeltaSyncService {
private localApiClient: DeltaSyncOptions["localApiClient"];
private embeddingConfig?: DeltaSyncOptions["embeddingConfig"];
private logger: NonNullable<DeltaSyncOptions["logger"]>;
/** Explicit override from constructor options; if undefined, auto-scaled per sync from `sync_metadata.total_nodes`. */
private maxPagesOverride: number | undefined;
private syncing = false;

constructor(options: DeltaSyncOptions) {
Expand All @@ -44,6 +70,28 @@ export class DeltaSyncService {
warn: () => {},
error: () => {},
};
this.maxPagesOverride = options.maxPages;
}

/**
 * Compute the per-sync abort cap, in pages.
 *
 * Resolution order:
 * - An explicit `maxPages` from the constructor wins and is returned as-is
 *   (a NaN override is ignored, because every `pages > NaN` comparison is
 *   false and would silently disable the guard).
 * - Otherwise the cap is scaled from `sync_metadata.total_nodes`:
 *   `ceil(total_nodes * MAX_PAGES_RATIO / PAGE_SIZE)`.
 * - If `total_nodes` is missing, non-numeric, non-finite, or not positive,
 *   `FALLBACK_MAX_PAGES` is used instead.
 *
 * Note: because the scaled value is rounded UP, any positive `total_nodes`
 * yields a cap of at least 1 page. For very small workspaces the cap is
 * therefore exactly 1 page; that is inherent to page-quantized pagination,
 * not a bug.
 *
 * @returns The maximum number of pages a single delta-sync may process.
 */
private resolveMaxPages(): number {
  const override = this.maxPagesOverride;
  if (override !== undefined && !Number.isNaN(override)) return override;

  // The row may be absent (null/undefined depending on the driver) and the
  // column value is not trusted to be a number — metadata can be malformed.
  const row = this.db
    .query("SELECT total_nodes FROM sync_metadata WHERE id = 1")
    .get() as { total_nodes: unknown } | null | undefined;

  // Number(undefined) is NaN and Number(null) is 0; both land on the fallback.
  const totalNodes = Number(row?.total_nodes);
  if (!Number.isFinite(totalNodes) || totalNodes <= 0) return FALLBACK_MAX_PAGES;

  return Math.ceil((totalNodes * MAX_PAGES_RATIO) / PAGE_SIZE);
}

/**
Expand Down Expand Up @@ -325,6 +373,9 @@ export class DeltaSyncService {
// Use 0 as fallback watermark if null (first delta after full sync with no delta timestamp)
const sinceMs = watermarkBefore ?? 0;

// Resolve the per-sync abort cap from current workspace size.
const maxPages = this.resolveMaxPages();

// Step 3: Page through changed nodes
let nodesFound = 0;
let nodesInserted = 0;
Expand All @@ -336,8 +387,30 @@ export class DeltaSyncService {

for await (const page of this.fetchChangedNodes(sinceMs)) {
pages++;

if (pages > maxPages) {
// Rows merged on pages 1..maxPages are already committed to SQLite;
// the watermark is NOT advanced because we throw before Step 5.
// The next delta-sync will replay from the same `sinceMs` and
// re-merge those rows idempotently. Recovery is `supertag sync index`.
// Check fires BEFORE merging this page, so we don't do work we're
// about to throw away.
throw new Error(
`Delta-sync aborted after ${maxPages} pages (${nodesFound} nodes merged so far; watermark NOT advanced). ` +
`This usually means the Local API is not honoring 'edited.since' ` +
`and is returning the entire workspace. Run 'supertag sync index' ` +
`for a full sync instead, or raise maxPages if this is a legitimately large delta.`,
);
}

nodesFound += page.length;

if (pages === 1 || pages % PROGRESS_INTERVAL_PAGES === 0) {
this.logger.info(
`delta-sync progress: ${pages} page(s), ${nodesFound} nodes (latest page: ${page.length})`,
);
}

for (const node of page) {
const result = this.mergeNode(node);
if (result.inserted) nodesInserted++;
Expand Down
7 changes: 7 additions & 0 deletions src/types/local-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ export interface DeltaSyncOptions {
warn(message: string, data?: Record<string, unknown>): void;
error(message: string, data?: Record<string, unknown>): void;
};
/**
* Explicit override for the abort cap on pages fetched per delta-sync.
* When omitted, the cap auto-scales from `sync_metadata.total_nodes`
* (25% of the graph). Guards against runaway loops when the Local API
* ignores `edited.since` and returns the entire workspace as "changed".
*/
maxPages?: number;
}

/**
Expand Down
184 changes: 182 additions & 2 deletions tests/unit/delta-sync-pagination.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ function createDbWithFullSync(dbPath: string): void {
total_nodes INTEGER NOT NULL DEFAULT 0
)
`);
// Insert a full sync record so delta-sync can proceed
// Insert a full sync record so delta-sync can proceed. total_nodes is set high
// enough that the auto-scaled abort cap (25% * total / PAGE_SIZE) doesn't trip
// unrelated tests that happen to page through dozens of results.
db.run(
"INSERT INTO sync_metadata (id, last_export_file, last_sync_timestamp, total_nodes) VALUES (1, 'export.json', ?, 1000)",
"INSERT INTO sync_metadata (id, last_export_file, last_sync_timestamp, total_nodes) VALUES (1, 'export.json', ?, 1000000)",
[Date.now() - 60000]
);
db.close();
Expand Down Expand Up @@ -381,6 +383,184 @@ describe("DeltaSyncService - Pagination + Sync Orchestration (T-2.2)", () => {
expect(result.embeddingsGenerated).toBe(0);
});

it("logs a progress line on page 1 (Bug 6)", async () => {
  // Fixture: one full page (100 nodes) followed by a short page (50 nodes).
  const firstPage = Array.from({ length: 100 }, (_, i) => createTestNode(`p1-${i}`, `Page1 Node ${i}`));
  const secondPage = Array.from({ length: 50 }, (_, i) => createTestNode(`p2-${i}`, `Page2 Node ${i}`));
  const captured: string[] = [];

  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      // Serve the fixture pages by offset; anything else is an empty page.
      searchNodes: async (_query, options) => {
        switch (options?.offset ?? 0) {
          case 0:
            return firstPage;
          case 100:
            return secondPage;
          default:
            return [];
        }
      },
      health: async () => true,
    },
    logger: {
      info: (line) => captured.push(line),
      warn: () => {},
      error: () => {},
    },
  });

  await service.sync();

  // Only page 1 should log: page 2 is below the progress interval.
  const progressLines = captured.filter((line) => line.startsWith("delta-sync progress:"));
  expect(progressLines).toHaveLength(1);

  const [onlyLine] = progressLines;
  expect(onlyLine).toContain("1 page(s)");
  expect(onlyLine).toContain("100 nodes");
});

it("emits a progress line every 10 pages (Bug 6)", async () => {
  const TOTAL_PAGES = 25;
  const captured: string[] = [];
  let requestCount = 0;

  // One full page whose ids embed the request number, so rows from
  // different requests never collide in the DB.
  const fullPage = (request: number) =>
    Array.from({ length: 100 }, (_, i) => createTestNode(`hb-${request}-${i}`, `Heartbeat ${request}-${i}`));

  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      // Serve TOTAL_PAGES full pages, then an empty page to end pagination.
      searchNodes: async () => {
        requestCount += 1;
        return requestCount > TOTAL_PAGES ? [] : fullPage(requestCount);
      },
      health: async () => true,
    },
    logger: {
      info: (line) => captured.push(line),
      warn: () => {},
      error: () => {},
    },
  });

  await service.sync();

  // Expect lines for pages 1, 10 and 20; page 25 is not on the interval.
  const progressLines = captured.filter((line) => line.startsWith("delta-sync progress:"));
  expect(progressLines).toHaveLength(3);
  ["1 page(s)", "10 page(s)", "20 page(s)"].forEach((marker, idx) => {
    expect(progressLines[idx]).toContain(marker);
  });
});

it("auto-scales the abort cap from total_nodes when no override is given (Bug 6)", async () => {
  // Re-seed the workspace size: 24000 nodes * 0.25 / 100 per page → 60 pages.
  const metadataDb = new Database(dbPath);
  metadataDb.run("UPDATE sync_metadata SET total_nodes = 24000 WHERE id = 1");
  metadataDb.close();

  let requestNo = 0;
  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      // Never-ending stream of full pages with unique ids per request.
      searchNodes: async () => {
        requestNo += 1;
        const page = Array.from({ length: 100 }, (_, i) =>
          createTestNode(`scale-${requestNo}-${i}`, `Scale ${requestNo}-${i}`),
        );
        return page;
      },
      health: async () => true,
    },
    // maxPages intentionally omitted so the cap is derived from total_nodes.
  });

  const outcome = service.sync();
  await expect(outcome).rejects.toThrow(/aborted after 60 pages/);
});

it("auto-scaled cap scales down to small workspaces without a floor (Bug 6)", async () => {
  // Small workspace: 4000 nodes * 0.25 / 100 per page → 10 pages.
  // The cap must stay proportional instead of being lifted to some minimum,
  // so a whole-workspace return (40 pages here) still trips the abort.
  const metadataDb = new Database(dbPath);
  metadataDb.run("UPDATE sync_metadata SET total_nodes = 4000 WHERE id = 1");
  metadataDb.close();

  let requestNo = 0;
  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      // Never-ending stream of full pages with unique ids per request.
      searchNodes: async () => {
        requestNo += 1;
        const page = Array.from({ length: 100 }, (_, i) =>
          createTestNode(`tiny-${requestNo}-${i}`, `Tiny ${requestNo}-${i}`),
        );
        return page;
      },
      health: async () => true,
    },
  });

  const outcome = service.sync();
  await expect(outcome).rejects.toThrow(/aborted after 10 pages/);
});

it("explicit maxPages overrides the auto-scaled cap (Bug 6)", async () => {
  // total_nodes = 24000 would auto-scale to 60 pages; the explicit
  // maxPages below must take precedence over that.
  const metadataDb = new Database(dbPath);
  metadataDb.run("UPDATE sync_metadata SET total_nodes = 24000 WHERE id = 1");
  metadataDb.close();

  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      // Always a full page (same ids each time), so pagination never ends on its own.
      searchNodes: async () =>
        Array.from({ length: 100 }, (_, i) => createTestNode(`ov-${i}`, `Override ${i}`)),
      health: async () => true,
    },
    maxPages: 3,
  });

  const outcome = service.sync();
  await expect(outcome).rejects.toThrow(/aborted after 3 pages/);
});

it("aborts with a clear error when maxPages is hit and leaves watermark unchanged (Bug 6)", async () => {
  // Runs a single-row query on a short-lived connection and always closes
  // the handle, even if the query itself throws.
  const queryOne = <T>(sql: string): T => {
    const db = new Database(dbPath);
    try {
      return db.query(sql).get() as T;
    } finally {
      db.close();
    }
  };

  // NOTE(review): this assumes `last_sync_timestamp` is the column delta-sync
  // advances on success. If the delta watermark lives in a different column,
  // the "unchanged" assertion below is vacuous — confirm against the sync code.
  const readWatermark = (): number =>
    queryOne<{ last_sync_timestamp: number }>(
      "SELECT last_sync_timestamp FROM sync_metadata WHERE id = 1",
    ).last_sync_timestamp;

  // Capture the seeded watermark so we can assert it is NOT advanced on abort.
  const watermarkBefore = readWatermark();

  service = new DeltaSyncService({
    dbPath,
    localApiClient: {
      searchNodes: async (_q, options) => {
        const offset = options?.offset ?? 0;
        // Always return a full page → loop never stops naturally
        return Array.from({ length: 100 }, (_, i) =>
          createTestNode(`cap-${offset}-${i}`, `Cap ${offset}-${i}`),
        );
      },
      health: async () => true,
    },
    maxPages: 3,
  });

  await expect(service.sync()).rejects.toThrow(
    /Delta-sync aborted after 3 pages.*watermark NOT advanced.*supertag sync index/s,
  );

  // Watermark must NOT have advanced — next delta should replay from the same point.
  expect(readWatermark()).toBe(watermarkBefore);

  // And rows from the pages that did complete should have been merged (idempotently replayable).
  const rowCount = queryOne<{ c: number }>("SELECT COUNT(*) as c FROM nodes").c;
  expect(rowCount).toBeGreaterThan(0);
});

it("paginates through multiple pages", async () => {
let callIndex = 0;
const page1 = Array.from({ length: 100 }, (_, i) => createTestNode(`p1-${i}`, `Page1 Node ${i}`));
Expand Down