Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions core/base/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -261,14 +261,38 @@ function readFileset(blocks: EncryptedBlockstore, files: DocFiles, isPublic = fa
const result = await blocks.ebOpts.storeRuntime.decodeFile(
{
get: async (cid: AnyLink) => {
return await blocks.getFile(throwFalsy(fileMeta.car), cid);
try {
return await blocks.getFile(throwFalsy(fileMeta.car), cid);
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
throw blocks.logger
.Error()
.Str("car", throwFalsy(fileMeta.car).toString())
.Str("cid", cid.toString())
.Str("errorMessage", err.message)
.Str("errorStack", err.stack || "")
.Msg("Error getting file block from CAR")
.AsError();
}
},
},
fileMeta.cid,
fileMeta,
);
if (result.isErr()) {
throw blocks.logger.Error().Any("error", result.Err()).Any("cid", fileMeta.cid).Msg("Error decoding file").AsError();
const err = result.Err();
const errMessage = err instanceof Error ? err.message : String(err);
const errStack = err instanceof Error ? err.stack : "";
throw blocks.logger
.Error()
.Str("errorMessage", errMessage)
.Str("errorStack", errStack || "")
.Str("cid", fileMeta.cid.toString())
.Str("car", throwFalsy(fileMeta.car).toString())
.Str("fileName", fileMeta.type || "unknown")
.Any("error", err)
.Msg("Error decoding file")
.AsError();
}

return result.unwrap();
Expand Down
43 changes: 22 additions & 21 deletions core/blockstore/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
CarLog,
FroozenCarLog,
CarStore,
FileStore,
FPBlock,
CarBlockItem,
BlockFetcher,
Expand Down Expand Up @@ -825,7 +826,8 @@ export class Loader implements Loadable {
private async makeDecoderAndCarReader(carCid: AnyLink, store: CIDActiveStore): Promise<FPBlock<CarBlockItem>> {
const carCidStr = carCid.toString();
let loadedCar: AnyBlock | undefined;
const activeStore = store.active as CarStore;
// store.active can be CarStore or FileStore, both have load() method
let activeStore = store.active as CarStore | FileStore;
try {
//loadedCar now is an array of AnyBlocks
this.logger.Debug().Any("cid", carCidStr).Msg("loading car");
Expand All @@ -836,24 +838,23 @@ export class Loader implements Loadable {
if (!isNotFoundError(e)) {
throw this.logger.Error().Str("cid", carCidStr).Err(e).Msg("loading car");
}
// for (const remote of store.remotes() as CarStore[]) {
// // console.log("makeDecoderAndCarReader:remote:", remote.url().toString());
// try {
// const remoteCar = await remote.load(carCid);
// if (remoteCar) {
// // todo test for this
// this.logger.Debug().Ref("cid", remoteCar.cid).Msg("saving remote car locally");
// await store.local().save(remoteCar);
// loadedCar = remoteCar;
// activeStore = remote;
// break;
// } else {
// this.logger.Error().Str("cid", carCidStr).Err(e).Msg("loading car");
// }
// } catch (e) {
// this.logger.Warn().Str("cid", carCidStr).Url(remote.url()).Err(e).Msg("loading car");
// }
// }
// Try remote stores if local load failed
for (const remote of store.remotes()) {
// console.log("makeDecoderAndCarReader:remote:", remote.url().toString());
try {
const remoteCar = await remote.load(carCid);
if (remoteCar) {
// todo test for this
this.logger.Debug().Ref("cid", remoteCar.cid).Msg("saving remote car locally");
await store.local().save(remoteCar);
loadedCar = remoteCar;
activeStore = remote;
break;
}
} catch (e) {
this.logger.Warn().Str("cid", carCidStr).Url(remote.url()).Err(e).Msg("loading car from remote");
}
}
}
if (!loadedCar) {
return {
Expand All @@ -876,7 +877,7 @@ export class Loader implements Loadable {
//This needs a fix as well as the fromBytes function expects a Uint8Array
//Either we can merge the bytes or return an array of rawReaders
const bytes = await asyncBlockDecode({ bytes: loadedCar.bytes, hasher, codec: (await activeStore.keyedCrypto()).codec() }); // as Uint8Array,
const rawReader = await CarReader.fromBytes(bytes.value.data);
const rawReader = await CarReader.fromBytes((bytes.value as { data: Uint8Array }).data);
// const readerP = Promise.resolve(rawReader);
// const kc = await activeStore.keyedCrypto()
// const readerP = !(kc.isEncrypting ? Promise.resolve(rawReader) : this.ensureDecryptedReader(activeStore, rawReader));
Expand All @@ -898,7 +899,7 @@ export class Loader implements Loadable {
}
return {
cid: carCid,
bytes: bytes.value.data,
bytes: (bytes.value as { data: Uint8Array }).data,
item: {
type: "car",
status: "ready",
Expand Down
46 changes: 28 additions & 18 deletions core/blockstore/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -519,19 +519,21 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore {
noLoaderOps,
async (dbMeta) => {
await retryableUpload(async () => {
// if (!this.loader) {
// return;
// }
if (!this.loader) {
return;
}
for (const cid of dbMeta.cars) {
const car = await this.loader.attachedStores.local().active.car.load(cid);
// .carStore().then((i) => i.load(cid));
if (!car) {
if (carLogIncludesGroup(this.loader.carLog.asArray(), dbMeta.cars)) {
throw this.logger.Error().Ref("cid", cid).Msg("missing local car").AsError();
}
// } else {
// await this.loader.attachedStores.forRemotes((x) => x.active.car.save(car));
// throwFalsy(this.loader.xremoteCarStore).save(car);
} else {
await this.loader.attachedStores.forRemotes(async (x) => {
await x.active.car.save(car);
});
// await throwFalsy(this.loader.xremoteCarStore).save(car);
}
}
// Remove from walState after successful upload
Expand All @@ -546,20 +548,20 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore {
operations,
async (dbMeta) => {
await retryableUpload(async () => {
// if (!this.loader) {
// return;
// }
if (!this.loader) {
return;
}
for (const cid of dbMeta.cars) {
const car = await this.loader.attachedStores.local().active.car.load(cid);
if (!car) {
if (carLogIncludesGroup(this.loader.carLog.asArray(), dbMeta.cars)) {
throw this.logger.Error().Ref("cid", cid).Msg(`missing local car`).AsError();
}
// } else {
// // await throwFalsy(this.loader.xremoteCarStore).save(car);
// await this.loader.attachedStores.forRemotes(async (x) => {
// await x.active.car.save(car);
// });
} else {
// await throwFalsy(this.loader.xremoteCarStore).save(car);
await this.loader.attachedStores.forRemotes(async (x) => {
await x.active.car.save(car);
});
}
}
// Remove from walState after successful upload
Expand All @@ -572,15 +574,19 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore {
// Process fileOperations
await pMap(
fileOperations,
async ({ cid: fileCid }) => {
async ({ cid: fileCid, public: publicFile }) => {
await retryableUpload(async () => {
// if (!this.loader) {
// return;
// }
if (!this.loader) {
return;
}
const fileBlock = await this.loader.attachedStores.local().active.file.load(fileCid);
if (!fileBlock) {
throw this.logger.Error().Ref("cid", fileCid).Msg("missing file block").AsError();
}

await this.loader.attachedStores.forRemotes(async (x) => {
await x.active.file.save(fileBlock, { public: publicFile });
});
// await this.loader.attachedStores.forRemotes((x) => x.active.file.save(fileBlock, { public: publicFile }));
// await this.loader.xremoteFileStore?.save(fileBlock, { public: publicFile });
// Remove from walState after successful upload
Expand All @@ -597,6 +603,10 @@ export class WALStoreImpl extends BaseStoreImpl implements WALStore {
if (!this.loader) {
return;
}

await this.loader.attachedStores.forRemotes(async (x) => {
await x.active.meta.save(lastOp);
});
// await this.loader.xremoteMetaStore?.save(lastOp);
// await this.loader.attachedStores.forRemotes((x) => x.active.meta.save(lastOp));
}, `remoteMetaStore save with dbMeta.cars=${lastOp.cars.toString()}`);
Expand Down