Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions indexer/lib/pdp-verifier-handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,24 @@ export async function insertDataSetPiece(
* @param {(number | string)[]} pieceIds
*/
/**
 * Soft-deletes pieces of a data set by upserting rows with `is_deleted = TRUE`.
 *
 * Cloudflare D1 allows at most 100 bound parameters per prepared statement.
 * Each piece binds 2 parameters (pieceId, dataSetId), so pieces are processed
 * in batches of 50 to stay within the limit for any input size. An empty
 * `pieceIds` array is a no-op (the loop never runs), whereas a single
 * unbatched INSERT would have produced an invalid zero-row VALUES clause.
 *
 * @param {{ DB: { prepare: (sql: string) => any } }} env - Worker env with a D1 binding.
 * @param {number | string} dataSetId
 * @param {(number | string)[]} pieceIds
 */
export async function removeDataSetPieces(env, dataSetId, pieceIds) {
  // 2 params per piece × 50 pieces = 100 bound params, the D1 maximum.
  const BATCH_SIZE = 50

  // Batches run sequentially; each batch is its own statement round-trip.
  for (let i = 0; i < pieceIds.length; i += BATCH_SIZE) {
    const batch = pieceIds.slice(i, i + BATCH_SIZE)
    // One "(?, ?, TRUE)" tuple per piece in this batch.
    const placeholders = Array.from(batch, () => '(?, ?, TRUE)').join(', ')
    await env.DB.prepare(
      `
      INSERT INTO pieces (id, data_set_id, is_deleted)
      VALUES ${placeholders}
      ON CONFLICT DO UPDATE set is_deleted = true
      `,
    )
      .bind(...batch.flatMap((pieceId) => [pieceId, dataSetId]))
      .run()
  }
}
37 changes: 37 additions & 0 deletions indexer/test/indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,43 @@ describe('piece-retriever.indexer', () => {
)
}
})

// Regression test: the pieces-removed handler used to build a single INSERT
// binding 2 parameters per piece; Cloudflare D1 caps a statement at 100 bound
// parameters, so 140 pieces must be split into batches by the handler.
it('deletes large number of pieces (140) without exceeding SQL parameter limit', async () => {
const dataSetId = randomId()
// Create 140 piece IDs (as in the issue) — enough to require 3 batches of 50.
const pieceIds = Array.from({ length: 140 }, (_, i) => i.toString())
const pieceCids = Array.from({ length: 140 }, () => randomId())

// Add pieces to database first
await withPieces(env, dataSetId, pieceIds, pieceCids)

// Now remove all 140 pieces via the authenticated webhook endpoint.
const req = new Request('https://host/pdp-verifier/pieces-removed', {
method: 'POST',
headers: {
[env.SECRET_HEADER_KEY]: env.SECRET_HEADER_VALUE,
},
body: JSON.stringify({
data_set_id: dataSetId,
piece_ids: pieceIds,
}),
})

const res = await workerImpl.fetch(req, env, CTX, {})
expect(res.status).toBe(200)
expect(await res.text()).toBe('OK')

// Verify all pieces are marked as deleted (soft delete — rows remain).
const { results: pieces } = await env.DB.prepare(
'SELECT * FROM pieces WHERE data_set_id = ?',
)
.bind(dataSetId)
.all()
expect(pieces.length).toBe(140)
for (const piece of pieces) {
// SQLite has no boolean type; TRUE is stored and returned as integer 1.
expect(piece.is_deleted).toBe(1)
}
})
})

describe('POST /service-provider-registry/product-added', () => {
Expand Down
54 changes: 31 additions & 23 deletions indexer/test/test-helpers.js
Original file line number Diff line number Diff line change
/**
 * Test helper: inserts pieces for a data set, ignoring rows that already
 * exist (`ON CONFLICT DO NOTHING`).
 *
 * Cloudflare D1 allows at most 100 bound parameters per prepared statement.
 * Each piece binds 4 parameters (id, data_set_id, cid, ipfs_root_cid), so
 * inserts are performed in batches of at most 25 rows.
 *
 * @param {{ DB: { prepare: (sql: string) => any } }} env - Worker env with a D1 binding.
 * @param {number | string} dataSetId
 * @param {(number | string)[]} pieceIds
 * @param {string[]} pieceCids - Parallel to `pieceIds`.
 * @param {(string | null | undefined)[]} [ipfsRootCids] - Parallel to `pieceIds`;
 *   missing/falsy entries are stored as NULL.
 */
export async function withPieces(
  env,
  dataSetId,
  pieceIds,
  pieceCids,
  ipfsRootCids = [],
) {
  // 4 params per piece × 25 pieces = 100 bound params, the D1 maximum.
  const BATCH_SIZE = 25

  for (let offset = 0; offset < pieceIds.length; offset += BATCH_SIZE) {
    const batch = pieceIds.slice(offset, offset + BATCH_SIZE)
    await env.DB.prepare(
      `
      INSERT INTO pieces (
        id,
        data_set_id,
        cid,
        ipfs_root_cid
      )
      VALUES ${Array.from(batch, () => '(?, ?, ?, ?)').join(', ')}
      ON CONFLICT DO NOTHING
      `,
    )
      .bind(
        // Index into the full arrays with offset + idx so cids stay aligned
        // with their piece IDs across batches.
        ...batch.flatMap((pieceId, idx) => [
          String(pieceId),
          String(dataSetId),
          pieceCids[offset + idx],
          ipfsRootCids[offset + idx] || null,
        ]),
      )
      .run()
  }
}
Loading