From a001a5ce8a8f70d99df48c087da7dabcf8e9f51f Mon Sep 17 00:00:00 2001 From: Karsten Lehmann Date: Sat, 22 Nov 2025 22:54:29 +0100 Subject: [PATCH 1/2] Fix to store added files in remotes and fetch from remotes if missing locally --- core/base/crdt-helpers.ts | 28 +++++++++++++++++++++-- core/blockstore/loader.ts | 43 ++++++++++++++++++----------------- core/blockstore/store.ts | 48 +++++++++++++++++++++++---------------- 3 files changed, 77 insertions(+), 42 deletions(-) diff --git a/core/base/crdt-helpers.ts b/core/base/crdt-helpers.ts index 5c7ee62f4..c44458a27 100644 --- a/core/base/crdt-helpers.ts +++ b/core/base/crdt-helpers.ts @@ -261,14 +261,38 @@ function readFileset(blocks: EncryptedBlockstore, files: DocFiles, isPublic = fa const result = await blocks.ebOpts.storeRuntime.decodeFile( { get: async (cid: AnyLink) => { - return await blocks.getFile(throwFalsy(fileMeta.car), cid); + try { + return await blocks.getFile(throwFalsy(fileMeta.car), cid); + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + throw blocks.logger + .Error() + .Str("car", throwFalsy(fileMeta.car).toString()) + .Str("cid", cid.toString()) + .Str("errorMessage", err.message) + .Str("errorStack", err.stack || "") + .Msg("Error getting file block from CAR") + .AsError(); + } }, }, fileMeta.cid, fileMeta, ); if (result.isErr()) { - throw blocks.logger.Error().Any("error", result.Err()).Any("cid", fileMeta.cid).Msg("Error decoding file").AsError(); + const err = result.Err(); + const errMessage = err instanceof Error ? err.message : String(err); + const errStack = err instanceof Error ? err.stack : ""; + throw blocks.logger + .Error() + .Str("errorMessage", errMessage) + .Str("errorStack", errStack || "") + .Str("cid", fileMeta.cid.toString()) + .Str("car", throwFalsy(fileMeta.car).toString()) + .Str("fileName", fileMeta.type || "unknown") + .Any("error", err) + .Msg("Error decoding file") + .AsError(); } return result.unwrap(); diff --git a/core/blockstore/loader.ts b/core/blockstore/loader.ts index 0f050f98c..a2ea1b719 100644 --- a/core/blockstore/loader.ts +++ b/core/blockstore/loader.ts @@ -18,6 +18,7 @@ import { CarLog, FroozenCarLog, CarStore, + FileStore, FPBlock, CarBlockItem, BlockFetcher, @@ -825,7 +826,8 @@ export class Loader implements Loadable { private async makeDecoderAndCarReader(carCid: AnyLink, store: CIDActiveStore): Promise> { const carCidStr = carCid.toString(); let loadedCar: AnyBlock | undefined; - const activeStore = store.active as CarStore; + // store.active can be CarStore or FileStore, both have load() method + let activeStore = store.active as CarStore | FileStore; try { //loadedCar now is an array of AnyBlocks this.logger.Debug().Any("cid", carCidStr).Msg("loading car"); @@ -836,24 +838,23 @@ export class Loader implements Loadable { if (!isNotFoundError(e)) { throw this.logger.Error().Str("cid", carCidStr).Err(e).Msg("loading car"); } - // for (const remote of store.remotes() as CarStore[]) { - // // console.log("makeDecoderAndCarReader:remote:", remote.url().toString()); - // try { - // const remoteCar = await remote.load(carCid); - // if (remoteCar) { - // // todo test for this - // this.logger.Debug().Ref("cid", remoteCar.cid).Msg("saving remote car locally"); - // await store.local().save(remoteCar); - // loadedCar = remoteCar; - // activeStore = remote; - // break; - // } else { - // this.logger.Error().Str("cid", carCidStr).Err(e).Msg("loading car"); - // } - // } catch (e) { - // this.logger.Warn().Str("cid", carCidStr).Url(remote.url()).Err(e).Msg("loading car"); - // } - // } + // Try remote stores if local load failed + for (const remote of store.remotes()) { + // console.log("makeDecoderAndCarReader:remote:", remote.url().toString()); + try { + const remoteCar = await remote.load(carCid); + if (remoteCar) { + // todo test for this + this.logger.Debug().Ref("cid", remoteCar.cid).Msg("saving remote car locally"); + await store.local().save(remoteCar); + loadedCar = remoteCar; + activeStore = remote; + break; + } + } catch (e) { + this.logger.Warn().Str("cid", carCidStr).Url(remote.url()).Err(e).Msg("loading car from remote"); + } + } } if (!loadedCar) { return { @@ -876,7 +877,7 @@ export class Loader implements Loadable { //This needs a fix as well as the fromBytes function expects a Uint8Array //Either we can merge the bytes or return an array of rawReaders const bytes = await asyncBlockDecode({ bytes: loadedCar.bytes, hasher, codec: (await activeStore.keyedCrypto()).codec() }); // as Uint8Array, - const rawReader = await CarReader.fromBytes(bytes.value.data); + const rawReader = await CarReader.fromBytes((bytes.value as { data: Uint8Array }).data); // const readerP = Promise.resolve(rawReader); // const kc = await activeStore.keyedCrypto() // const readerP = !(kc.isEncrypting ? Promise.resolve(rawReader) : this.ensureDecryptedReader(activeStore, rawReader)); @@ -898,7 +899,7 @@ export class Loader implements Loadable { } return { cid: carCid, - bytes: bytes.value.data, + bytes: (bytes.value as { data: Uint8Array }).data, item: { type: "car", status: "ready", diff --git a/core/blockstore/store.ts b/core/blockstore/store.ts index 8931a260e..67d1eab8b 100644 --- a/core/blockstore/store.ts +++ b/core/blockstore/store.ts @@ -29,7 +29,7 @@ import { UnsubscribeResult, CommitQueueIf, } from "@fireproof/core-types-blockstore"; -import { Falsy, isNotFoundError, PARAM, StoreType, SuperThis } from "@fireproof/core-types-base"; +import { Falsy, isNotFoundError, PARAM, StoreType, SuperThis, throwFalsy } from "@fireproof/core-types-base"; import { carLogIncludesGroup } from "./loader.js"; import { EventView } from "@web3-storage/pail/clock/api"; import { EventBlock } from "@web3-storage/pail/clock"; @@ -519,9 +519,9 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore { noLoaderOps, async (dbMeta) => { await retryableUpload(async () => { - // if (!this.loader) { - // return; - // } + if (!this.loader) { + return; + } for (const cid of dbMeta.cars) { const car = await this.loader.attachedStores.local().active.car.load(cid); // .carStore().then((i) => i.load(cid)); @@ -529,9 +529,11 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore { if (carLogIncludesGroup(this.loader.carLog.asArray(), dbMeta.cars)) { throw this.logger.Error().Ref("cid", cid).Msg("missing local car").AsError(); } - // } else { - // await this.loader.attachedStores.forRemotes((x) => x.active.car.save(car)); - // throwFalsy(this.loader.xremoteCarStore).save(car); + } else { + await this.loader.attachedStores.forRemotes(async (x) => { + await x.active.car.save(car); + }); + // await throwFalsy(this.loader.xremoteCarStore).save(car); } } // Remove from walState after successful upload @@ -546,20 +548,20 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore { operations, async (dbMeta) => { await retryableUpload(async () => { - // if (!this.loader) { - // return; - // } + if (!this.loader) { + return; + } for (const cid of dbMeta.cars) { const car = await this.loader.attachedStores.local().active.car.load(cid); if (!car) { if (carLogIncludesGroup(this.loader.carLog.asArray(), dbMeta.cars)) { throw this.logger.Error().Ref("cid", cid).Msg(`missing local car`).AsError(); } - // } else { - // // await throwFalsy(this.loader.xremoteCarStore).save(car); - // await this.loader.attachedStores.forRemotes(async (x) => { - // await x.active.car.save(car); - // }); + } else { + // await throwFalsy(this.loader.xremoteCarStore).save(car); + await this.loader.attachedStores.forRemotes(async (x) => { + await x.active.car.save(car); + }); } } // Remove from walState after successful upload @@ -572,15 +574,19 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore { // Process fileOperations await pMap( fileOperations, - async ({ cid: fileCid }) => { + async ({ cid: fileCid, public: publicFile }) => { await retryableUpload(async () => { - // if (!this.loader) { - // return; - // } + if (!this.loader) { + return; + } const fileBlock = await this.loader.attachedStores.local().active.file.load(fileCid); if (!fileBlock) { throw this.logger.Error().Ref("cid", fileCid).Msg("missing file block").AsError(); } + + await this.loader.attachedStores.forRemotes(async (x) => { + await x.active.file.save(fileBlock, { public: publicFile }); + }); // await this.loader.attachedStores.forRemotes((x) => x.active.file.save(fileBlock, { public: publicFile })); // await this.loader.xremoteFileStore?.save(fileBlock, { public: publicFile }); // Remove from walState after successful upload @@ -597,6 +603,10 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore { if (!this.loader) { return; } + + await this.loader.attachedStores.forRemotes(async (x) => { + await x.active.meta.save(lastOp); + }); // await this.loader.xremoteMetaStore?.save(lastOp); // await this.loader.attachedStores.forRemotes((x) => x.active.meta.save(lastOp)); }, `remoteMetaStore save with dbMeta.cars=${lastOp.cars.toString()}`); From 3d0b2ec3b5fdeaab8fed70914380f69ef6954b34 Mon Sep 17 00:00:00 2001 From: Karsten Lehmann Date: Sun, 23 Nov 2025 13:02:59 +0100 Subject: [PATCH 2/2] Import cleanup --- core/blockstore/store.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/blockstore/store.ts b/core/blockstore/store.ts index 67d1eab8b..7504d658d 100644 --- a/core/blockstore/store.ts +++ b/core/blockstore/store.ts @@ -29,7 +29,7 @@ import { UnsubscribeResult, CommitQueueIf, } from "@fireproof/core-types-blockstore"; -import { Falsy, isNotFoundError, PARAM, StoreType, SuperThis, throwFalsy } from "@fireproof/core-types-base"; +import { Falsy, isNotFoundError, PARAM, StoreType, SuperThis } from "@fireproof/core-types-base"; import { carLogIncludesGroup } from "./loader.js"; import { EventView } from "@web3-storage/pail/clock/api"; import { EventBlock } from "@web3-storage/pail/clock";