
Commit c959bcf

techiejd and claude authored
Merge main into split_db_adapter (beta.5) (#42)
* adds should embed (#38)
  * adds should embed
  * Ups version to get ready for release
* splits the job into one per batch (#41)
  * splits the job into one per batch
  * fix: remove waitUntil delay and persist failedChunkData on batch records
    - Remove 30s waitUntil delay from per-batch task re-queue (was causing test timeouts since the original code had no such delay)
    - Add failedChunkData JSON field to batch collection so per-batch tasks can store chunk-level failure data independently
    - Aggregate failedChunkData from batch records in finalizeRunIfComplete() instead of relying on in-memory accumulation from the old single-task flow
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * feat: add batchLimit to CollectionVectorizeOption with coordinator/worker architecture
    Splits prepare-bulk-embedding into coordinator + per-collection workers. Each worker processes one page of one collection, queuing a continuation job before processing to ensure crash safety. Default batchLimit is 1000 when not explicitly set.
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * fix: rewrite batchLimit test 2 to reuse same Payload instance
    The second test was creating a separate Payload instance sharing the same DB and job queues, causing two crons to compete for jobs. This led to double-execution and mock state inconsistency (expected 4 to be 2). Now both tests use the single beforeAll instance with cleanup between.
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * fix: add payload.destroy() in afterAll to prevent OOM from leaked crons
    Every test file that creates a Payload instance now calls payload.destroy() in afterAll (or try/finally for in-test instances). This stops background cron jobs from accumulating across tests, which was causing heap exhaustion in CI.
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * Trying to not destroy our heap
  * Runs tests in parallel now that each test gets its own db
  * WIP
  * fix: fix OOM, polling test assertions, and add diagnostic logging
    - Add --max-old-space-size=8192 to test:int NODE_OPTIONS (cross-env was overriding the CI env var, so the heap limit never took effect)
    - Fix polling.spec.ts queueSpy assertions: coordinator/worker adds an extra queue call, so poll-or-complete-single-batch is now call 3 and 4 instead of 2 and 3
    - Add extensive [vectorize-debug] console.log throughout task handlers (coordinator, worker, poll-single, finalize, streamAndBatchDocs) to diagnose any remaining CI hangs
    - Remove redundant NODE_OPTIONS from CI workflow (now in the script)
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * refactor: remove poll-or-complete-bulk-embedding task and aggregate incrementally
    Remove the backward-compatible fan-out task since the per-batch architecture hasn't been released yet. Refactor finalizeRunIfComplete to aggregate batch counts incrementally during pagination instead of collecting all batch objects into memory.
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * chore: bump to 0.5.5, update changelog, remove debug logging
    - Bump version 0.5.4 → 0.5.5
    - Add 0.5.5 entry to CHANGELOG.md (coordinator/worker, batchLimit, per-batch polling)
    - Document batchLimit in README CollectionVectorizeOption section
    - Remove all diagnostic console.log statements from bulkEmbedAll.ts
    Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
  * Adds upgrade note

  ---------
  Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
* chore: bump version to 0.6.0-beta.5
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: resolve 4 CI test failures from merge
  - chunkers.spec.ts: remove getPayload() call that crashes on dummy db, pass SanitizedConfig directly to chunkRichText
  - batchLimit.spec.ts: add missing dbAdapter (createMockAdapter) required by split_db_adapter architecture
  - extensionFieldsVectorSearch.spec.ts: pass adapter as second arg to createVectorSearchHandlers (new signature from split_db_adapter)
  - versionBump.spec.ts: destroy payload0 before creating payload1 to prevent cron worker race condition between two instances
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* Cleans a nit double line
* Undoes a weird test fix done by the bot
* fix: harden versionBump test with sequential steps and queue isolation
  - Use test.step() to enforce sequential execution of each phase
  - Add separate realtimeQueueName per payload instance to prevent cron worker cross-talk on the default queue
  - Use dynamic Date.now() keys to avoid cached state interference
  - Increase waitForBulkJobs timeout to 30s for CI
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: prevent waitForBulkJobs from returning prematurely
  waitForBulkJobs could return early in the coordinator/worker fan-out pattern when there's a brief window with 0 pending jobs between job transitions. Now it also checks the bulk embeddings run status — only returns when both no pending jobs exist AND no runs are in queued/running state (see the sketch below).
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: remove test.step() — not available in Vitest
  test.step() is a Playwright API, not Vitest. Reverted to flat sequential code with phase comments for readability.
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
* fix: rewrite versionBump test with single Payload instance
  Instead of creating two Payload instances (which caused cron cross-talk, timeout, and queue isolation issues on CI), use one instance and mutate the knowledgePools config version between bulk embed runs. Tests the same code path (versionMismatch in streamAndBatchDocs) without the multi-instance fragility.
  Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
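For context on the `waitForBulkJobs` hardening described in that last fix, here is a minimal sketch of the check the message describes. This is a hypothetical reconstruction, not the helper's actual source (which lives in the shared test utils and is not part of this diff); the `payload-jobs` field names and the runs collection slug are assumptions.

```ts
// Hypothetical sketch of the hardened wait loop described in the commit message.
// Assumptions: Payload's built-in 'payload-jobs' collection, and the plugin's
// bulk embeddings runs collection (BULK_EMBEDDINGS_RUNS_SLUG in the tests).
import type { Payload } from 'payload'

export const waitForBulkJobs = async (payload: Payload, timeoutMs: number): Promise<void> => {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    // 1) Any queued-but-unfinished jobs?
    const pendingJobs = await (payload as any).find({
      collection: 'payload-jobs',
      where: { completedAt: { exists: false }, hasError: { not_equals: true } },
      limit: 1,
    })
    // 2) Any bulk embeddings run still queued or running? This closes the brief window
    //    in the coordinator/worker fan-out where zero jobs are pending between transitions.
    const activeRuns = await (payload as any).find({
      collection: 'bulk-embeddings-runs', // assumed slug
      where: { status: { in: ['queued', 'running'] } },
      limit: 1,
    })
    if (pendingJobs.totalDocs === 0 && activeRuns.totalDocs === 0) return
    await new Promise((resolve) => setTimeout(resolve, 250))
  }
  throw new Error(`Bulk embedding jobs did not settle within ${timeoutMs}ms`)
}
```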
1 parent 303c939 commit c959bcf

44 files changed

Lines changed: 967 additions & 564 deletions


CHANGELOG.md

Lines changed: 24 additions & 0 deletions
@@ -2,6 +2,10 @@
 
 All notable changes to this project will be documented in this file.
 
+## 0.6.0-beta.5 - 2026-02-25
+
+- Merges main into split_db_adapter (per-batch polling, coordinator/worker architecture, destroyPayload cleanup).
+
 ## 0.6.0-beta.4 - 2026-02-20
 
 - Merges main with should embed changes.
@@ -87,6 +91,26 @@ const score = result.similarity
 const score = result.score
 ```
 
+## 0.5.5 - 2026-02-24
+
+### Added
+
+- **`batchLimit` option on `CollectionVectorizeOption`** – limits the number of documents fetched per bulk-embed worker job. When set, each page of results queues a continuation job for the next page, preventing serverless time-limit issues on large collections. Defaults to 1000.
+
+### Changed
+
+- **Coordinator / worker architecture for `prepare-bulk-embedding`** – the initial job now acts as a coordinator that fans out one worker job per collection. Each worker processes a single page of documents, making bulk embedding parallelizable and more resilient to timeouts.
+- **Per-batch polling via `poll-or-complete-single-batch`** – replaced the monolithic `poll-or-complete-bulk-embedding` task. Each provider batch now has its own polling job, improving observability and reducing memory usage.
+- **Memory-efficient incremental aggregation** – `finalizeRunIfComplete` now scans batch records page-by-page instead of loading all batches into memory at once.
+
+### Removed
+
+- `poll-or-complete-bulk-embedding` task (replaced by `poll-or-complete-single-batch`).
+
+### Upgrade Notes
+
+- **Ensure no bulk embedding run is in progress when upgrading.** The `poll-or-complete-bulk-embedding` task has been removed and replaced by `poll-or-complete-single-batch`. Any in-flight bulk run that still has pending `poll-or-complete-bulk-embedding` jobs will fail because the task slug no longer exists. Wait for all active runs to complete (or cancel them) before deploying this version.
+
 ## 0.5.4 - 2026-02-20
 
 ### Added
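To make the incremental-aggregation entry above concrete, here is a rough sketch of scanning batch records page-by-page. It is an illustration only; the collection slug, field names (`status`, `failedChunkData`), and counters are assumptions drawn from this changelog and the commit message, not the plugin's actual `finalizeRunIfComplete`.

```ts
// Illustration only: aggregate batch results for a run one page at a time instead of
// loading every batch record into memory. Slugs and field names are assumptions.
import type { Payload } from 'payload'

const aggregateBatchesForRun = async (payload: Payload, runId: string) => {
  let page = 1
  let succeeded = 0
  let failed = 0
  const failedChunkData: unknown[] = []

  for (;;) {
    const res = await (payload as any).find({
      collection: 'bulk-embeddings-batches', // assumed slug (BULK_EMBEDDINGS_BATCHES_SLUG in the tests)
      where: { run: { equals: runId } },
      limit: 100,
      page,
    })
    for (const batch of res.docs) {
      if (batch.status === 'succeeded') succeeded += 1
      if (batch.status === 'failed') failed += 1
      // Per-batch chunk failures persisted by the per-batch tasks (see commit message).
      if (Array.isArray(batch.failedChunkData)) failedChunkData.push(...batch.failedChunkData)
    }
    if (!res.hasNextPage) break
    page += 1
  }
  return { succeeded, failed, failedChunkData }
}
```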

README.md

Lines changed: 3 additions & 2 deletions
@@ -426,8 +426,8 @@ type OnBulkErrorArgs = {
 
 The plugin uses separate Payload jobs for reliability with long-running providers:
 
-- **`prepare-bulk-embedding`**: Streams through documents, calls your `addChunk` for each chunk, creates batch records.
-- **`poll-or-complete-bulk-embedding`**: Polls all batches, requeues itself until done, then writes all successful embeddings (partial chunk failures are allowed).
+- **`prepare-bulk-embedding`**: A coordinator job fans out one worker per collection. Each worker streams through documents, calls your `addChunk` for each chunk, and creates batch records. When `batchLimit` is set on a collection, workers paginate and queue continuation jobs.
+- **`poll-or-complete-single-batch`**: Polls a single batch, requeues itself until done, then writes successful embeddings. When all batches for a run are terminal, the run is finalized (partial chunk failures are allowed).
 
 ### Queue Configuration
 
@@ -561,6 +561,7 @@ curl -X POST http://localhost:3000/api/vector-retry-failed-batch \
 
 - `shouldEmbedFn? (doc, payload)` – optional filter that runs **before** the document is queued for embedding. Return `false` to skip the document entirely (no job is created and `toKnowledgePool` is never called). Works for both real-time and bulk embedding. Defaults to embedding all documents when omitted.
 - `toKnowledgePool (doc, payload)` – return an array of `{ chunk, ...extensionFieldValues }`. Each object becomes one embedding row and the index in the array determines `chunkIndex`.
+- `batchLimit? (number)` – max documents to fetch per bulk-embed worker job. When set, each page of results becomes a separate job that queues a continuation for the next page. Useful for large collections that would exceed serverless time limits in a single job. Defaults to 1000.
 
 Reserved column names: `sourceCollection`, `docId`, `chunkIndex`, `chunkText`, `embeddingVersion`. Avoid reusing them in `extensionFields`.
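For reference, a minimal knowledge-pool configuration using `batchLimit`, modeled on the shapes exercised by the `batchLimit.spec.ts` test added in this commit. The adapter and embedding helpers below are placeholders, not exports of the plugin.

```ts
// Sketch of a knowledge pool with batchLimit; the placeholder helpers are declared, not real exports.
import payloadcmsVectorize from 'payloadcms-vectorize'

declare const myDbAdapter: any // e.g. from createPostgresVectorIntegration
declare const myEmbedQuery: any
declare const myBulkEmbeddingsFns: any

export const vectorizePlugin = payloadcmsVectorize({
  dbAdapter: myDbAdapter,
  knowledgePools: {
    default: {
      collections: {
        posts: {
          // One embedding row per returned object; the array index becomes chunkIndex.
          toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
          // Fetch at most 1000 posts per bulk-embed worker job; each page queues a
          // continuation job for the next page (1000 is also the default).
          batchLimit: 1000,
        },
      },
      embeddingConfig: {
        version: 'my-embedding-v1',
        queryFn: myEmbedQuery,
        bulkEmbeddingsFns: myBulkEmbeddingsFns,
      },
    },
  },
})
// Add `vectorizePlugin` to the `plugins` array of your Payload config.
```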

adapters/pg/dev/specs/extensionFields.spec.ts

Lines changed: 6 additions & 2 deletions
@@ -1,8 +1,8 @@
 import type { Payload } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { buildDummyConfig, integration, plugin } from './constants.js'
-import { createTestDb, waitForVectorizationJobs } from './utils.js'
+import { createTestDb, destroyPayload, waitForVectorizationJobs } from './utils.js'
 import { getPayload } from 'payload'
 import { PostgresPayload } from '../../src/types.js'
 import { chunkText, chunkRichTextSimple as chunkRichText } from '@shared-test/helpers/chunkers'
@@ -113,6 +113,10 @@ describe('Extension fields integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('extension fields are added to the embeddings table schema', async () => {
     const db = (payload as PostgresPayload).db
     const sql = `

adapters/pg/dev/specs/integration.spec.ts

Lines changed: 6 additions & 1 deletion
@@ -4,13 +4,14 @@
  * These tests verify Postgres-specific functionality like
  * vector column creation, schema modifications, etc.
  */
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import type { Payload, SanitizedConfig } from 'payload'
 import { buildConfig, getPayload } from 'payload'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { lexicalEditor } from '@payloadcms/richtext-lexical'
 import { Client } from 'pg'
 import { createPostgresVectorIntegration } from '../../src/index.js'
+import { destroyPayload } from './utils.js'
 import payloadcmsVectorize from 'payloadcms-vectorize'
 
 const DIMS = 8
@@ -88,6 +89,10 @@ describe('Postgres-specific integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('adds embeddings collection with vector column', async () => {
     // Check schema for embeddings collection
     const collections = payload.collections

adapters/pg/dev/specs/migrationCli.spec.ts

Lines changed: 12 additions & 2 deletions
@@ -4,7 +4,7 @@ import { postgresAdapter } from '@payloadcms/db-postgres'
 import { buildConfig, getPayload } from 'payload'
 import { createPostgresVectorIntegration } from '../../src/index.js'
 import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
-import { createTestDb } from './utils.js'
+import { createTestDb, destroyPayload } from './utils.js'
 import { DIMS } from './constants.js'
 
 const createVectorizeIntegration = createPostgresVectorIntegration
@@ -77,6 +77,10 @@ describe('Migration CLI integration tests', () => {
     payload = await getPayload({ config, cron: true })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('VectorizedPayload has _staticConfigs via getDbAdapterCustom', async () => {
     const { getVectorizedPayload } = await import('payloadcms-vectorize')
     const vectorizedPayload = getVectorizedPayload(payload)
@@ -159,6 +163,10 @@ describe('Migration CLI integration tests', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('vector search fails with descriptive error when embedding column missing', async () => {
     const { getVectorizedPayload } = await import('payloadcms-vectorize')
     const vectorizedPayload = getVectorizedPayload(payload)
@@ -207,7 +215,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
-    // Cleanup: remove test migrations directory
+    await destroyPayload(autoPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }
@@ -429,6 +437,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
+    await destroyPayload(dimsPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }
@@ -729,6 +738,7 @@ describe('Migration CLI integration tests', () => {
   })
 
   afterAll(async () => {
+    await destroyPayload(multiPayload)
     if (existsSync(migrationsDir)) {
       rmSync(migrationsDir, { recursive: true, force: true })
     }

adapters/pg/dev/specs/multipools.spec.ts

Lines changed: 6 additions & 2 deletions
@@ -1,10 +1,10 @@
 import type { Payload, SanitizedConfig } from 'payload'
 
 import { buildConfig } from 'payload'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 import { lexicalEditor } from '@payloadcms/richtext-lexical'
 import { postgresAdapter } from '@payloadcms/db-postgres'
-import { createTestDb } from './utils.js'
+import { createTestDb, destroyPayload } from './utils.js'
 import { getPayload } from 'payload'
 import { createPostgresVectorIntegration } from '../../src/index.js'
 import payloadcmsVectorize from 'payloadcms-vectorize'
@@ -79,6 +79,10 @@ describe('Multiple knowledge pools', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('creates two embeddings collections with vector columns', async () => {
     const collections = payload.collections
     expect(collections).toHaveProperty('pool1')

adapters/pg/dev/specs/schemaName.spec.ts

Lines changed: 10 additions & 2 deletions
@@ -3,12 +3,16 @@ import type { Payload } from 'payload'
 import { postgresAdapter } from '@payloadcms/db-postgres'
 import { makeDummyEmbedDocs, makeDummyEmbedQuery, testEmbeddingVersion } from '@shared-test/helpers/embed'
 import { Client } from 'pg'
-import { beforeAll, describe, expect, test } from 'vitest'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
 
 import type { PostgresPayload } from '../../src/types.js'
 
 import { buildDummyConfig, DIMS, integration, plugin } from './constants.js'
-import { createTestDb, waitForVectorizationJobs } from './utils.js'
+import {
+  createTestDb,
+  destroyPayload,
+  waitForVectorizationJobs,
+} from './utils.js'
 import { getPayload } from 'payload'
 import { getVectorizedPayload } from 'payloadcms-vectorize'
 const CUSTOM_SCHEMA = 'custom'
@@ -91,6 +95,10 @@ describe('Custom schemaName support', () => {
     })
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   test('embeddings table is created in custom schema', async () => {
     const db = (payload as PostgresPayload).db
     const tablesRes = await db.pool?.query(

adapters/pg/dev/specs/utils.ts

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 import { Client } from 'pg'
 
-export { waitForVectorizationJobs } from '@shared-test/utils'
+export { waitForVectorizationJobs, destroyPayload } from '@shared-test/utils'
 
 export const createTestDb = async ({ dbName }: { dbName: string }) => {
   const adminUri =

dev/specs/bulkEmbed/basic.spec.ts

Lines changed: 6 additions & 1 deletion
@@ -1,5 +1,5 @@
 import type { Payload, SanitizedConfig } from 'payload'
-import { afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
+import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test, vi } from 'vitest'
 import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
 import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
 import { BULK_EMBEDDINGS_INPUT_METADATA_SLUG } from '../../../src/collections/bulkEmbeddingInputMetadata.js'
@@ -10,6 +10,7 @@ import {
   clearAllCollections,
   createMockBulkEmbeddings,
   createTestDb,
+  destroyPayload,
   waitForBulkJobs,
 } from '../utils.js'
 import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
@@ -56,6 +57,10 @@ describe('Bulk embed - basic tests', () => {
     vectorizedPayload = getVectorizedPayload(payload)
   })
 
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
   beforeEach(async () => {
     await clearAllCollections(payload)
   })
dev/specs/bulkEmbed/batchLimit.spec.ts

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+import type { Payload } from 'payload'
+import { afterAll, beforeAll, describe, expect, test } from 'vitest'
+import { BULK_EMBEDDINGS_RUNS_SLUG } from '../../../src/collections/bulkEmbeddingsRuns.js'
+import { BULK_EMBEDDINGS_BATCHES_SLUG } from '../../../src/collections/bulkEmbeddingsBatches.js'
+import {
+  BULK_QUEUE_NAMES,
+  DEFAULT_DIMS,
+  buildPayloadWithIntegration,
+  createMockBulkEmbeddings,
+  createTestDb,
+  destroyPayload,
+  waitForBulkJobs,
+} from '../utils.js'
+import { makeDummyEmbedQuery, testEmbeddingVersion } from 'helpers/embed.js'
+import { getVectorizedPayload, VectorizedPayload } from 'payloadcms-vectorize'
+import { expectGoodResult } from '../utils.vitest.js'
+import { createMockAdapter } from 'helpers/mockAdapter.js'
+
+const DIMS = DEFAULT_DIMS
+const dbName = `bulk_batchlimit_${Date.now()}`
+
+describe('Bulk embed - batchLimit', () => {
+  let payload: Payload
+  let vectorizedPayload: VectorizedPayload | null = null
+
+  beforeAll(async () => {
+    await createTestDb({ dbName })
+    const built = await buildPayloadWithIntegration({
+      dbName,
+      pluginOpts: {
+        dbAdapter: createMockAdapter(),
+        knowledgePools: {
+          default: {
+            collections: {
+              posts: {
+                toKnowledgePool: async (doc: any) => [{ chunk: doc.title }],
+                batchLimit: 2,
+              },
+            },
+            embeddingConfig: {
+              version: testEmbeddingVersion,
+              queryFn: makeDummyEmbedQuery(DIMS),
+              bulkEmbeddingsFns: createMockBulkEmbeddings({
+                statusSequence: ['succeeded'],
+              }),
+            },
+          },
+        },
+        bulkQueueNames: BULK_QUEUE_NAMES,
+      },
+      key: `batchlimit-${Date.now()}`,
+    })
+    payload = built.payload
+    vectorizedPayload = getVectorizedPayload(payload)
+  })
+
+  afterAll(async () => {
+    await destroyPayload(payload)
+  })
+
+  test('batchLimit splits docs across continuation jobs and all get embedded', async () => {
+    // Create 5 posts with batchLimit: 2
+    // Should result in 3 prepare jobs (2 docs, 2 docs, 1 doc)
+    for (let i = 0; i < 5; i++) {
+      await payload.create({ collection: 'posts', data: { title: `BatchLimit Post ${i}` } as any })
+    }
+
+    const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
+    expectGoodResult(result)
+
+    await waitForBulkJobs(payload, 30000)
+
+    // All 5 posts should have embeddings
+    const embeds = await payload.find({ collection: 'default' })
+    expect(embeds.totalDocs).toBe(5)
+
+    // Run should be succeeded
+    const runDoc = (
+      await (payload as any).find({
+        collection: BULK_EMBEDDINGS_RUNS_SLUG,
+        where: { id: { equals: result!.runId } },
+      })
+    ).docs[0]
+    expect(runDoc.status).toBe('succeeded')
+    expect(runDoc.inputs).toBe(5)
+  })
+
+  test('batchLimit equal to doc count does not create extra continuations', async () => {
+    // Clean up from prior test: delete all posts and embeddings
+    await payload.delete({ collection: 'posts', where: {} })
+    await payload.delete({ collection: 'default' as any, where: {} })
+
+    // Create exactly 2 posts (matching batchLimit: 2)
+    for (let i = 0; i < 2; i++) {
+      await payload.create({
+        collection: 'posts',
+        data: { title: `Exact Post ${i}` } as any,
+      })
+    }
+
+    const result = await vectorizedPayload?.bulkEmbed({ knowledgePool: 'default' })
+    expectGoodResult(result)
+
+    await waitForBulkJobs(payload, 20000)
+
+    const embeds = await payload.find({ collection: 'default' })
+    expect(embeds.totalDocs).toBe(2)
+
+    const runDoc = (
+      await (payload as any).find({
+        collection: BULK_EMBEDDINGS_RUNS_SLUG,
+        where: { id: { equals: result!.runId } },
+      })
+    ).docs[0]
+    expect(runDoc.status).toBe('succeeded')
+  })
+})
