20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,26 @@

All notable changes to this project will be documented in this file.

## 0.5.5 - 2026-02-24

### Added

- **`batchLimit` option on `CollectionVectorizeOption`** – caps the number of documents fetched per bulk-embed worker job (default: 1000). Each page of results queues a continuation job for the next page, preventing serverless time-limit failures on large collections.
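A minimal configuration sketch, with the option shape taken from this PR's `batchLimit.spec.ts` (the embedding config and other plugin options are abbreviated):

```typescript
// Hedged sketch: enabling batchLimit on one collection in a knowledge pool.
// Only `batchLimit` is new in 0.5.5; the rest mirrors the test fixture in this PR.
const pluginOpts = {
  knowledgePools: {
    default: {
      collections: {
        posts: {
          toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
          // Each worker job fetches at most 2 posts, then queues a continuation job.
          batchLimit: 2,
        },
      },
      // embeddingConfig (version, queryFn, bulkEmbeddingsFns) omitted for brevity
    },
  },
}
```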

### Changed

- **Coordinator / worker architecture for `prepare-bulk-embedding`** – the initial job now acts as a coordinator that fans out one worker job per collection. Each worker processes a single page of documents, making bulk embedding parallelizable and more resilient to timeouts.
- **Per-batch polling via `poll-or-complete-single-batch`** – replaced the monolithic `poll-or-complete-bulk-embedding` task. Each provider batch now has its own polling job, improving observability and reducing memory usage.
- **Memory-efficient incremental aggregation** – `finalizeRunIfComplete` now scans batch records page-by-page instead of loading all batches into memory at once.
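The incremental aggregation can be sketched as a pure function over pages of batch records. This is not the plugin's implementation — the real `finalizeRunIfComplete` issues paginated queries against the batches collection — and any status name beyond `succeeded` and `failed` (which appear in this PR's specs) is an assumption:

```typescript
// Hedged sketch of page-by-page aggregation: counters are updated per page,
// so no more than one page of batch records is held at a time.
type BatchStatus = 'processing' | 'succeeded' | 'failed'

function aggregateBatchPages(pages: { status: BatchStatus }[][]) {
  let total = 0
  let terminal = 0
  let failed = 0
  for (const page of pages) {
    for (const batch of page) {
      total += 1
      if (batch.status !== 'processing') terminal += 1
      if (batch.status === 'failed') failed += 1
    }
  }
  // A run can be finalized only once every batch has reached a terminal status.
  return { total, failed, complete: total > 0 && terminal === total }
}
```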

### Removed

- `poll-or-complete-bulk-embedding` task (replaced by `poll-or-complete-single-batch`).

### Upgrade Notes

- **Ensure no bulk embedding run is in progress when upgrading.** The `poll-or-complete-bulk-embedding` task has been removed and replaced by `poll-or-complete-single-batch`. Any in-flight bulk run that still has pending `poll-or-complete-bulk-embedding` jobs will fail because the task slug no longer exists. Wait for all active runs to complete (or cancel them) before deploying this version.
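The pre-deploy check can be sketched as a pure predicate over run statuses. `succeeded` and `failed` appear in this PR's specs; `processing` is an assumed in-flight status — substitute whatever status values your runs collection actually uses:

```typescript
// Hedged sketch: is it safe to deploy 0.5.5?
// Safe only when every bulk embedding run has reached a terminal status.
type RunStatus = 'processing' | 'succeeded' | 'failed'

function safeToUpgrade(runStatuses: RunStatus[]): boolean {
  return runStatuses.every((status) => status === 'succeeded' || status === 'failed')
}
```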

## 0.5.4 - 2026-02-20

### Added
5 changes: 3 additions & 2 deletions README.md
@@ -436,8 +436,8 @@ type OnBulkErrorArgs = {

The plugin uses separate Payload jobs for reliability with long-running providers:

- **`prepare-bulk-embedding`**: Streams through documents, calls your `addChunk` for each chunk, creates batch records.
- **`poll-or-complete-bulk-embedding`**: Polls all batches, requeues itself until done, then writes all successful embeddings (partial chunk failures are allowed).
- **`prepare-bulk-embedding`**: A coordinator job fans out one worker per collection. Each worker streams through documents, calls your `addChunk` for each chunk, and creates batch records. When `batchLimit` is set on a collection, workers paginate and queue continuation jobs.
- **`poll-or-complete-single-batch`**: Polls a single batch, requeues itself until done, then writes successful embeddings. When all batches for a run are terminal, the run is finalized (partial chunk failures are allowed).
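The coordinator / worker split above implies a simple fan-out arithmetic, which this PR's `batchLimit.spec.ts` pins down (5 docs at `batchLimit: 2` → 3 worker jobs; 2 docs at `batchLimit: 2` → 1 job, no extra continuation). A sketch of that expectation — the zero-document floor of one job is an assumption, not confirmed by the specs:

```typescript
// Hedged sketch: expected worker jobs for one collection under batchLimit.
// Each job handles one page of up to batchLimit documents and queues a
// continuation job while more pages remain.
function expectedWorkerJobs(totalDocs: number, batchLimit: number): number {
  if (batchLimit <= 0) throw new Error('batchLimit must be positive')
  // At least the initial worker job runs even for an empty collection (assumption).
  return Math.max(1, Math.ceil(totalDocs / batchLimit))
}
```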

### Queue Configuration

@@ -624,6 +624,7 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \

- `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted.
- `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`.
- `batchLimit? (number)` – max documents to fetch per bulk-embed worker job (default: 1000). Each page of results becomes a separate job that queues a continuation for the next page, keeping large collections within serverless time limits.
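The three options above can be combined as follows. This is an illustrative sketch: the option names follow this section, but the document fields (`_status`, `title`, `summary`) are hypothetical:

```typescript
// Hedged sketch of a per-collection vectorize config.
const postsVectorizeOptions = {
  // Skip drafts entirely: no job is created and toKnowledgePool never runs.
  shouldEmbedFn: async (doc: any) => doc._status !== 'draft',
  // One embedding row per returned object; the array index becomes chunkIndex.
  toKnowledgePool: async (doc: any) => [
    { chunk: doc.title },
    { chunk: doc.summary },
  ],
  // Fetch at most 500 documents per bulk-embed worker job.
  batchLimit: 500,
}
```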

Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`.

7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/basic.spec.ts
@@ -1,5 +1,5 @@
import type { Payload, SanitizedConfig } from 'payload'
import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
clearAllCollections,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -54,6 +55,10 @@ describe('Bulk embed - basic tests', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

beforeEach(async () => {
await clearAllCollections(payload)
})
115 changes: 115 additions & 0 deletions dev/specs/bulkEmbed/batchLimit.spec.ts
@@ -0,0 +1,115 @@
import type { Payload } from 'payload'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize'
import { expectGoodResult } from '../utils.vitest.js'

const DIMS = DEFAULT_DIMS
const dbName = `bulk_batchlimit_${Date.now()}`

describe('Bulk embed - batchLimit', () => {
let payload: Payload
let vectorizedPayload: VectorizedPayload | null = null

beforeAll(async () => {
await createTestDb({ dbName })
const built = await buildPayloadWithIntegration({
dbName,
pluginOpts: {
knowledgePools: {
default: {
collections: {
posts: {
toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
batchLimit: 2,
},
},
embeddingConfig: {
version: testEmbeddingVersion,
queryFn: makeDummyEmbedQuery(DIMS),
bulkEmbeddingsFns: createMockBulkEmbeddings({
statusSequence: ['succeeded'],
}),
},
},
},
bulkQueueNames: BULK_QUEUE_NAMES,
},
key: `batchlimit-${Date.now()}`,
})
payload = built.payload
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('batchLimit splits docs across continuation jobs and all get embedded', async () => {
// Create 5 posts with batchLimit: 2
// Should result in 3 prepare jobs (2 docs, 2 docs, 1 doc)
for (let i = 0; i < 5; i++) {
await payload.create({ collection: 'posts', data: { title: `BatchLimit Post ${i}` } as any })
}

const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
expectGoodResult(result)

await waitForBulkJobs(payload, 30000)

// All 5 posts should have embeddings
const embeds = await payload.find({ collection: 'default' })
expect(embeds.totalDocs).toBe(5)

// Run should be succeeded
const runDoc = (
await (payload as any).find({
collection: BULK_EMBEDDINGS_RUNS_SLUG,
where: { id: { equals: result!.runId } },
})
).docs[0]
expect(runDoc.status).toBe('succeeded')
expect(runDoc.inputs).toBe(5)
})

test('batchLimit equal to doc count does not create extra continuations', async () => {
// Clean up from prior test: delete all posts and embeddings
await payload.delete({ collection: 'posts', where: {} })
await payload.delete({ collection: 'default' as any, where: {} })

// Create exactly 2 posts (matching batchLimit: 2)
for (let i = 0; i < 2; i++) {
await payload.create({
collection: 'posts',
data: { title: `Exact Post ${i}` } as any,
})
}

const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
expectGoodResult(result)

await waitForBulkJobs(payload, 20000)

const embeds = await payload.find({ collection: 'default' })
expect(embeds.totalDocs).toBe(2)

const runDoc = (
await (payload as any).find({
collection: BULK_EMBEDDINGS_RUNS_SLUG,
where: { id: { equals: result!.runId } },
})
).docs[0]
expect(runDoc.status).toBe('succeeded')
})
})
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/canceledBatch.spec.ts
@@ -1,11 +1,12 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -49,6 +50,10 @@ describe('Bulk embed - canceled batch', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('canceled batch marks entire run as failed', async () => {
const post = await payload.create({ collection: 'posts', data: { title: 'Cancel' } as any })
const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/concurrentRuns.spec.ts
@@ -1,5 +1,5 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { getVectorizedPayload } from '../../../src/types.js'
import {
@@ -8,6 +8,7 @@ import {
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'

@@ -45,6 +46,10 @@ describe('Bulk embed - concurrent runs prevention', () => {
payload = built.payload
})

afterAll(async () => {
await destroyPayload(payload)
})

test('cannot start concurrent bulk embed runs for the same pool', async () => {
const vectorizedPayload = getVectorizedPayload<'default'>(payload)!
// Create a test post first
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/extensionFields.spec.ts
@@ -1,12 +1,13 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -53,6 +54,10 @@ describe('Bulk embed - extension fields', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('extension fields are merged when writing embeddings', async () => {
const post = await payload.create({
collection: 'posts',
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/failedBatch.spec.ts
@@ -1,5 +1,5 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -49,6 +50,10 @@ describe('Bulk embed - failed batch', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('failed batch marks entire run as failed', async () => {
const post = await payload.create({ collection: 'posts', data: { title: 'Fail' } as any })

7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/ingestionFailure.spec.ts
@@ -1,5 +1,5 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import {
@@ -8,6 +8,7 @@ import {
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -24,6 +25,10 @@ describe('Bulk embed - ingestion validation failures', () => {
await createTestDb({ dbName })
})

afterAll(async () => {
await destroyPayload(payload)
})

test('malformed chunk entry fails the bulk embedding run', async () => {
// Use unique version to ensure this test only processes its own data
const testVersion = `${testEmbeddingVersion}-ingestion-fail-${Date.now()}`
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/multipleBatches.spec.ts
@@ -1,5 +1,5 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
import {
@@ -8,6 +8,7 @@ import {
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -51,6 +52,10 @@ describe('Bulk embed - multiple batches', () => {
vectorizedPayload = getVectorizedPayload(payload)
})

afterAll(async () => {
await destroyPayload(payload)
})

test('multiple batches are created when flushing after N chunks', async () => {
// Create 5 posts (should result in 3 batches: 2, 2, 1)
for (let i = 0; i < 5; i++) {
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/multipleChunks.spec.ts
@@ -1,11 +1,12 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -51,6 +52,10 @@ describe('Bulk embed - multiple chunks with extension fields', () => {
payload = built.payload
})

afterAll(async () => {
await destroyPayload(payload)
})

test('multiple chunks keep their respective extension fields', async () => {
const post = await payload.create({
collection: 'posts',
7 changes: 6 additions & 1 deletion dev/specs/bulkEmbed/onError.spec.ts
@@ -1,12 +1,13 @@
import type { Payload } from 'payload'
import { beforeAll, describe, expect, test } from 'vitest'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
import {
BULK_QUEUE_NAMES,
DEFAULT_DIMS,
buildPayloadWithIntegration,
createMockBulkEmbeddings,
createTestDb,
destroyPayload,
waitForBulkJobs,
} from '../utils.js'
import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -56,6 +57,10 @@ describe('Bulk embed - onError callback', () => {
payload = built.payload
})

afterAll(async () => {
await destroyPayload(payload)
})

test('onError callback is called when batch fails', async () => {
await payload.create({ collection: 'posts', data: { title: 'Error Test' } as any })
