Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"@hugomrdias/docs": "^0.2.0",
"@types/react": "catalog:",
"@types/react-dom": "catalog:",
"astro": "^6.1.6",
"astro": "^6.4.2",
"astro-mermaid": "^2.0.1",
"expressive-code-twoslash": "^0.6.1",
"mermaid": "^11.12.2",
Expand Down
2 changes: 1 addition & 1 deletion docs/src/content/docs/developer-guides/synapse-core.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ await sp.uploadPiece({
await sp.findPiece({
pieceCid,
serviceURL: provider.pdp.serviceURL,
retry: true,
poll: true,
})

console.log(`Piece ${pieceCid.toString()} uploaded to provider ${provider.pdp.serviceURL}`)
Expand Down
2 changes: 1 addition & 1 deletion examples/cli/src/commands/upload-dataset.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export const uploadDataset: Command = command(
await SP.findPiece({
pieceCid,
serviceURL: provider.pdp.serviceURL,
retry: true,
poll: true,
})

const rsp = await SP.createDataSetAndAddPieces(client, {
Expand Down
38 changes: 33 additions & 5 deletions packages/synapse-core/src/piece/download.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import { type AbortError, HttpError, type NetworkError, request, type TimeoutError } from 'iso-web/http'
import { HttpError, type RequestErrors, request } from 'iso-web/http'
import { DownloadPieceError } from '../errors/pdp.ts'
import { InvalidPieceCIDError } from '../errors/piece.ts'
import { RETRY_CONSTANTS } from '../utils/constants.ts'
import { transformStream } from './calculate.ts'
import { tryFrom } from './parse.ts'
import type { PieceCID } from './piece-cid.ts'

export namespace download {
export type OptionsType = {
url: string
/** The number of retries. Defaults to 2. */
retryCount?: number
/** The delay with exponential backoff between retries in milliseconds. Defaults to {@link RETRY_CONSTANTS.RETRY_DELAY}. */
retryDelay?: number
/** The signal to abort the request. */
signal?: AbortSignal
}
export type ReturnType = Uint8Array
export type ErrorType = DownloadPieceError | TimeoutError | NetworkError | AbortError
export type ErrorType = DownloadPieceError | RequestErrors
}

/**
Expand All @@ -21,7 +28,14 @@ export namespace download {
* @throws Errors {@link download.ErrorType}
*/
export async function download(options: download.OptionsType): Promise<download.ReturnType> {
const response = await request.get(options.url)
const response = await request.get(options.url, {
timeout: false,
retry: {
retries: options.retryCount,
minTimeout: options.retryDelay ?? RETRY_CONSTANTS.RETRY_DELAY,
},
signal: options.signal,
})
if (response.error) {
if (HttpError.is(response.error)) {
throw new DownloadPieceError(await response.error.response.text())
Expand All @@ -35,9 +49,15 @@ export namespace downloadAndValidate {
export type OptionsType = {
url: string
expectedPieceCid: string | PieceCID
/** The number of retries. Defaults to 2. */
retryCount?: number
/** The delay with exponential backoff between retries in milliseconds. Defaults to {@link RETRY_CONSTANTS.RETRY_DELAY}. */
retryDelay?: number
/** The signal to abort the request. */
signal?: AbortSignal
}
export type ReturnType = Uint8Array
export type ErrorType = DownloadPieceError | TimeoutError | NetworkError | AbortError | InvalidPieceCIDError
export type ErrorType = DownloadPieceError | RequestErrors | InvalidPieceCIDError
}

/**
Expand Down Expand Up @@ -71,7 +91,15 @@ export async function downloadAndValidate(options: downloadAndValidate.OptionsTy
throw new InvalidPieceCIDError(expectedPieceCid)
}

const rsp = await request.get(url)
const rsp = await request.get(url, {
timeout: false,
retry: {
retries: options.retryCount,
minTimeout: options.retryDelay ?? RETRY_CONSTANTS.RETRY_DELAY,
},
signal: options.signal,
})

if (rsp.error) {
if (HttpError.is(rsp.error)) {
throw new DownloadPieceError(await rsp.error.response.text())
Expand Down
47 changes: 28 additions & 19 deletions packages/synapse-core/src/piece/resolve-piece-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import pLocate from 'p-locate'
import pSome from 'p-some'
import type { Address, Chain, Client, Transport } from 'viem'
import { asChain } from '../chains.ts'
import { findPiece } from '../sp/find-piece.ts'
import type { PDPProvider } from '../sp-registry/types.ts'
import { createPieceUrlPDP } from '../utils/piece-url.ts'
import { getPdpDataSets } from '../warm-storage/get-pdp-data-sets.ts'
Expand Down Expand Up @@ -132,13 +131,17 @@ export async function chainResolver(options: resolvePieceUrl.ResolverFnOptionsTy
}, new Map<bigint, (typeof dataSets)[number]['provider']>())
const providers = [...providersById.values()]

const result = await findPieceOnProviders(providers, pieceCid, signal)
const result = await findPieceOnProviders(
providers.map((p) => p.pdp.serviceURL),
pieceCid,
signal
)
if (result == null) {
throw new Error('No provider found')
}
return createPieceUrlPDP({
cid: pieceCid.toString(),
serviceURL: result.pdp.serviceURL,
serviceURL: result,
})
}

Expand All @@ -160,43 +163,49 @@ export async function chainResolver(options: resolvePieceUrl.ResolverFnOptionsTy
export function providersResolver(providers: PDPProvider[]): resolvePieceUrl.ResolverFnType {
return async (options: resolvePieceUrl.ResolverFnOptionsType) => {
const { pieceCid, signal } = options
const result = await findPieceOnProviders(providers, pieceCid, signal)
const result = await findPieceOnProviders(
providers.map((p) => p.pdp.serviceURL),
pieceCid,
signal
)
if (result == null) {
throw new Error('No provider found')
}

return createPieceUrlPDP({
cid: pieceCid.toString(),
serviceURL: result.pdp.serviceURL,
serviceURL: result,
})
}
}

/**
* Find the piece on the providers
*
* @param providers - {@link PDPProvider[]}
* @param serviceURLs - {@link string[]}
* @param pieceCid - {@link PieceCID}
* @param signal - {@link AbortSignal}
* @returns The piece URL
* @returns The Service URL
*/
export async function findPieceOnProviders(providers: PDPProvider[], pieceCid: PieceCID, signal?: AbortSignal) {
export async function findPieceOnProviders(serviceURLs: string[], pieceCid: PieceCID, signal?: AbortSignal) {
Comment thread
hugomrdias marked this conversation as resolved.
const controller = new AbortController()
const _signal = signal ? AbortSignal.any([controller.signal, signal]) : controller.signal

async function headPiece(serviceURL: string) {
const result = await request.head(new URL(`piece/${pieceCid.toString()}`, serviceURL), {
signal: _signal,
retry: true,
})
if (result.error) {
throw result.error
}
return serviceURL
}

const result = await pLocate(
providers.map((p) =>
findPiece({
serviceURL: p.pdp.serviceURL,
pieceCid,
signal: _signal,
}).then(
() => p,
() => null
)
),
serviceURLs.map((p) => headPiece(p).catch(() => undefined)),
(p) => {
if (p !== null) {
if (p != null) {
controller.abort()
return true
}
Expand Down
65 changes: 40 additions & 25 deletions packages/synapse-core/src/sp/add-pieces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ export namespace addPiecesApiRequest {
pieces: PieceCID[]
/** The extra data for the add pieces. {@link TypedData.signAddPieces} */
extraData: Hex
/** The number of retries. Defaults to 2. */
retryCount?: number
/** The delay with exponential backoff between retries in milliseconds. Defaults to {@link RETRY_CONSTANTS.RETRY_DELAY}. */
retryDelay?: number
}
export type OutputType = {
/** The transaction hash. */
Expand Down Expand Up @@ -52,17 +56,19 @@ export async function addPiecesApiRequest(
): Promise<addPiecesApiRequest.OutputType> {
const { serviceURL, dataSetId, pieces, extraData } = options
const response = await request.post(new URL(`pdp/data-sets/${dataSetId}/pieces`, serviceURL), {
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
json: {
pieces: pieces.map((piece) => ({
pieceCid: piece.toString(),
subPieces: [{ subPieceCid: piece.toString() }],
})),
extraData: extraData,
}),
timeout: RETRY_CONSTANTS.MAX_RETRY_TIME,
},
timeout: RETRY_CONSTANTS.TIMEOUT,
retry: {
retries: options.retryCount,
minTimeout: options.retryDelay ?? RETRY_CONSTANTS.RETRY_DELAY,
shouldRetry: (ctx) => HttpError.is(ctx.error) && ctx.error.code === 429,
},
})

if (response.error) {
Expand Down Expand Up @@ -101,6 +107,10 @@ export namespace addPieces {
nonce?: bigint
/** Pre-built signed extraData. When provided, skips internal EIP-712 signing. */
extraData?: Hex
/** The number of retries. Defaults to 2. */
retryCount?: number
/** The delay with exponential backoff between retries in milliseconds. Defaults to {@link RETRY_CONSTANTS.RETRY_DELAY}. */
retryDelay?: number
}

export type OutputType = addPiecesApiRequest.OutputType
Expand Down Expand Up @@ -154,6 +164,8 @@ export async function addPieces(
dataSetId: options.dataSetId,
pieces: options.pieces.map((piece) => piece.pieceCid),
extraData,
retryCount: options.retryCount,
retryDelay: options.retryDelay,
})
}

Expand Down Expand Up @@ -199,7 +211,11 @@ export namespace waitForAddPieces {
statusUrl: string
/** The timeout in milliseconds. Defaults to 5 minutes. */
timeout?: number
/** The polling interval in milliseconds. Defaults to 4 seconds. */
/** The number of retries. Defaults to 2. */
retryCount?: number
/** The delay with exponential backoff between retries in milliseconds. Defaults to {@link RETRY_CONSTANTS.RETRY_DELAY}. */
retryDelay?: number
/** The poll interval in milliseconds. Defaults to {@link RETRY_CONSTANTS.POLL_INTERVAL}. */
pollInterval?: number
}
export type OutputType = AddPiecesOutput
Expand All @@ -221,32 +237,31 @@ export namespace waitForAddPieces {
* @throws Errors {@link waitForAddPieces.ErrorType}
*/
export async function waitForAddPieces(options: waitForAddPieces.OptionsType): Promise<waitForAddPieces.OutputType> {
const response = await request.json.get<AddPiecesResponse>(options.statusUrl, {
async onResponse(response) {
if (response.ok) {
const data = (await response.clone().json()) as AddPiecesResponse
if (data.piecesAdded === false) {
throw new Error('Still pending')
}
}
},
const response = await request.json.get(options.statusUrl, {
retry: {
shouldRetry: (ctx) => ctx.error.message === 'Still pending',
retries: RETRY_CONSTANTS.RETRIES,
factor: RETRY_CONSTANTS.FACTOR,
minTimeout: options.pollInterval ?? RETRY_CONSTANTS.DELAY_TIME,
retries: options.retryCount,
minTimeout: options.retryDelay ?? RETRY_CONSTANTS.RETRY_DELAY,
},
poll: {
limit: RETRY_CONSTANTS.POLL_LIMIT,
interval: options.pollInterval ?? RETRY_CONSTANTS.POLL_INTERVAL,
statusCodes: [202, 200], // 202 is processing, 200 is success
shouldPoll: async (ctx) => {
const data = (await ctx.response.clone().json()) as AddPiecesResponse
return data.piecesAdded === false
},
},
Comment thread
hugomrdias marked this conversation as resolved.
timeout: options.timeout ?? RETRY_CONSTANTS.MAX_RETRY_TIME,
timeout: options.timeout ?? RETRY_CONSTANTS.TIMEOUT,
schema,
})
if (response.error) {
if (HttpError.is(response.error)) {
throw new WaitForAddPiecesError(await response.error.response.text())
}
throw response.error
}
const data = schema.parse(response.result)
if (data.txStatus === 'rejected') {
throw new WaitForAddPiecesRejectedError(data)
if (response.result.txStatus === 'rejected') {
throw new WaitForAddPiecesRejectedError(response.result)
}
return data
return response.result
}
Loading
Loading