Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions core/base/crdt-clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,24 @@ function sortClockHead(clockHead: ClockHead) {
return clockHead.sort((a, b) => a.toString().localeCompare(b.toString()));
}

/**
* Validates that all blocks in the clock head exist in the blockstore.
* Ensures validation completes before returning to prevent silent data corruption.
* @param logger - Logger instance for error reporting
* @param newHead - Array of CIDs representing the new clock head to validate
* @param blockstore - Blockstore to validate blocks against
* @throws Error if blockstore is missing or any block doesn't exist
*/
async function validateBlocks(logger: Logger, newHead: ClockHead, blockstore?: BaseBlockstore) {
if (!blockstore) throw logger.Error().Msg("missing blockstore");
newHead.map(async (cid) => {
const got = await blockstore.get(cid);
if (!got) {
throw logger.Error().Str("cid", cid.toString()).Msg("int_applyHead missing block").AsError();
}
});
await Promise.all(
newHead.map(async (cid) => {
const got = await blockstore.get(cid);
if (!got) {
throw logger.Error().Str("cid", cid.toString()).Msg("int_applyHead missing block").AsError();
}
})
);
}

function compareClockHeads(head1: ClockHead, head2: ClockHead) {
Expand Down
107 changes: 107 additions & 0 deletions core/tests/fireproof/crdt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,110 @@ describe("Loader with many committed transactions", function () {
expect(parsed.meta.head).toBeTruthy();
});
});

/**
* Test suite for ROBUST-05 fix: validateBlocks async bug.
* Verifies that block validation is properly awaited before operations complete.
* Before the fix, .map() without Promise.all() allowed fire-and-forget validation.
*/
describe("Block validation (ROBUST-05 fix)", function () {
let crdt: CRDT;
const sthis = ensureSuperThis();

afterEach(async () => {
await crdt.close();
await crdt.destroy();
});

beforeEach(async () => {
await sthis.start();
const dbOpts: LedgerOpts = {
name: "test-block-validation",
writeQueue: defaultWriteQueueOpts({}),
keyBag: defaultKeyBagOpts(sthis),
storeUrls: toStoreURIRuntime(sthis, `test-validation@${sthis.nextId().str}`),
storeEnDe: ensureStoreEnDeFile({}),
ctx: new AppContext(),
tracer,
};
crdt = new CRDTImpl(sthis, dbOpts);
await crdt.ready();
});

/** Verifies bulk() awaits validation and documents are readable immediately after. */
it("should complete block validation before bulk returns", async () => {
// This test verifies that validateBlocks is properly awaited.
// Before the fix, .map() without Promise.all() would return immediately
// and validation would run in the background (fire-and-forget).
// After the fix, bulk() only returns after all blocks are validated.
const result = await crdt.bulk([
{ id: "doc1", value: { data: "test1" } },
{ id: "doc2", value: { data: "test2" } },
{ id: "doc3", value: { data: "test3" } },
]);

// If we get here, validation completed synchronously (awaited properly)
expect(result.head.length).toBe(1);

// Validation should complete before returning (not be a fire-and-forget)
// We verify by checking the operation completed and returned valid head
expect(result.head[0]).toBeTruthy();

// Verify all documents are readable immediately after bulk returns
// Before the fix, reading too soon could race with background validation
const doc1 = await crdt.get("doc1");
const doc2 = await crdt.get("doc2");
const doc3 = await crdt.get("doc3");
expect(doc1).toBeTruthy();
expect(doc2).toBeTruthy();
expect(doc3).toBeTruthy();
});

/** Verifies sequential bulk operations each await their own validation. */
it("should validate all blocks in a multi-document commit atomically", async () => {
// Write multiple documents in parallel batches
const batch1 = await crdt.bulk([
{ id: "a1", value: { v: 1 } },
{ id: "a2", value: { v: 2 } },
]);
const batch2 = await crdt.bulk([
{ id: "b1", value: { v: 3 } },
{ id: "b2", value: { v: 4 } },
]);

// Both batches should have validated their blocks before returning
expect(batch1.head.length).toBe(1);
expect(batch2.head.length).toBe(1);

// The heads should be different (sequential commits)
expect(batch1.head[0].toString()).not.toBe(batch2.head[0].toString());

// All documents should be accessible
for (const id of ["a1", "a2", "b1", "b2"]) {
const doc = await crdt.get(id);
expect(doc).toBeTruthy();
}
});

/** Verifies concurrent bulk operations all await validation before resolving. */
it("should properly await validation in concurrent writes", async () => {
// Launch multiple concurrent bulk operations
const writes = await Promise.all([
crdt.bulk([{ id: "c1", value: { concurrent: 1 } }]),
crdt.bulk([{ id: "c2", value: { concurrent: 2 } }]),
crdt.bulk([{ id: "c3", value: { concurrent: 3 } }]),
]);

// All writes should have completed with valid heads
for (const write of writes) {
expect(write.head.length).toBe(1);
expect(write.head[0]).toBeTruthy();
}

// All documents should be readable
for (let i = 1; i <= 3; i++) {
const doc = await crdt.get(`c${i}`);
expect(doc).toBeTruthy();
}
});
});