diff --git a/.changeset/multi-root-local-adaptors.md b/.changeset/multi-root-local-adaptors.md new file mode 100644 index 000000000..85e0925b6 --- /dev/null +++ b/.changeset/multi-root-local-adaptors.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': minor +--- + +Support colon-separated `OPENFN_ADAPTORS_REPO` so a private adaptor monorepo can be loaded alongside the canonical OpenFn adaptors monorepo. When a job pins an adaptor to `@local`, the worker now walks the configured roots in order and resolves to the first root that contains `packages//package.json`. Single-path values continue to work unchanged. Earlier paths win on collision, mirroring Lightning's `AdaptorRegistry` precedence rules so the registry view and the worker's execution path stay consistent. diff --git a/packages/ws-worker/src/util/cli.ts b/packages/ws-worker/src/util/cli.ts index f8d27a126..c98b9c6bd 100644 --- a/packages/ws-worker/src/util/cli.ts +++ b/packages/ws-worker/src/util/cli.ts @@ -132,7 +132,7 @@ export default function parseArgs(argv: string[]): Args { .option('monorepo-dir', { alias: 'm', description: - 'Path to the adaptors monorepo, from where @local adaptors will be loaded. Env: OPENFN_ADAPTORS_REPO', + 'Path to the adaptors monorepo, from where @local adaptors will be loaded. Accepts a colon-separated list to merge multiple monorepos; the first path containing a given adaptor wins. Env: OPENFN_ADAPTORS_REPO', }) .option('secret', { alias: 's', diff --git a/packages/ws-worker/src/util/convert-lightning-plan.ts b/packages/ws-worker/src/util/convert-lightning-plan.ts index 2d24190da..fd19e0716 100644 --- a/packages/ws-worker/src/util/convert-lightning-plan.ts +++ b/packages/ws-worker/src/util/convert-lightning-plan.ts @@ -1,4 +1,5 @@ import crypto from 'node:crypto'; +import fs from 'node:fs'; import path from 'node:path'; import type { Step, @@ -50,18 +51,49 @@ export default ( ): { plan: ExecutionPlan; options: WorkerRunOptions; input: Lazy } => { const { collectionsVersion, monorepoPath } = options; + // monorepoPath is a colon-separated list of monorepo roots, mirroring how + // Lightning's AdaptorRegistry parses OPENFN_ADAPTORS_REPO. A single path + // (the common case) just becomes a one-element list. Order is precedence: + // when a `packages/` directory exists in more than one root, the + // earlier entry wins, so a private adaptor repo can be listed before the + // canonical OpenFn monorepo to override individual adaptors locally. + const monorepoRoots = (monorepoPath ?? '') + .split(':') + .map((p) => p.trim()) + .filter((p) => p.length > 0); + + const resolveLocalAdaptorPath = (shortName: string): string | undefined => { + if (monorepoRoots.length === 0) return undefined; + + for (const root of monorepoRoots) { + const candidate = path.resolve(root, 'packages', shortName); + if (fs.existsSync(path.join(candidate, 'package.json'))) { + return candidate; + } + } + // Fall back to the first root's resolved candidate path. The directory + // does not exist, but this surfaces a recognisable "missing local + // adaptor" path to the runtime instead of an unresolved colon-joined + // string. It also preserves the single-path behaviour from before + // multi-root support was added (the path was returned without an + // existence check). + return path.resolve(monorepoRoots[0], 'packages', shortName); + }; + const appendLocalVersions = (job: Job) => { - if (monorepoPath && job.adaptors!) { + if (monorepoRoots.length && job.adaptors!) { for (const adaptor of job.adaptors) { const { name, version } = getNameAndVersion(adaptor); - if (monorepoPath && version === 'local') { + if (version === 'local') { const shortName = name.replace('@openfn/language-', ''); - const localPath = path.resolve(monorepoPath, 'packages', shortName); - job.linker ??= {}; - job.linker[name] = { - path: localPath, - version: 'local', - }; + const localPath = resolveLocalAdaptorPath(shortName); + if (localPath) { + job.linker ??= {}; + job.linker[name] = { + path: localPath, + version: 'local', + }; + } } } } diff --git a/packages/ws-worker/test/util/convert-lightning-plan.test.ts b/packages/ws-worker/test/util/convert-lightning-plan.test.ts index 7cf1e3132..554ff45ba 100644 --- a/packages/ws-worker/test/util/convert-lightning-plan.test.ts +++ b/packages/ws-worker/test/util/convert-lightning-plan.test.ts @@ -1,4 +1,7 @@ import test from 'ava'; +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; import type { LightningPlan, LightningJob, @@ -7,6 +10,17 @@ import type { import convertPlan from '../../src/util/convert-lightning-plan'; import { Job } from '@openfn/lexicon'; +// Builds a temporary monorepo root with package.json files in each named adaptor. +const makeMonorepo = (adaptors: string[]) => { + const root = fs.mkdtempSync(path.join(os.tmpdir(), 'multi-root-')); + for (const adaptor of adaptors) { + const pkgDir = path.join(root, 'packages', adaptor); + fs.mkdirSync(pkgDir, { recursive: true }); + fs.writeFileSync(path.join(pkgDir, 'package.json'), '{}'); + } + return root; +}; + // Creates a lightning node (job or trigger) const createNode = (props = {}) => ({ @@ -585,6 +599,112 @@ test('Use local paths', (t) => { }); }); +test('Use local paths: resolves @local against a single existing root', (t) => { + const root = makeMonorepo(['common']); + + const run: Partial = { + id: 'w', + jobs: [createNode({ id: 'a', adaptor: 'common@local' })], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { monorepoPath: root }); + const [, a] = plan.workflow.steps as any[]; + + t.deepEqual(a.linker.common, { + path: path.resolve(root, 'packages', 'common'), + version: 'local', + }); +}); + +test('Use local paths: walks colon-separated roots in order, first match wins', (t) => { + const privateRoot = makeMonorepo(['publicschema']); + const canonicalRoot = makeMonorepo(['common', 'publicschema']); + + const run: Partial = { + id: 'w', + jobs: [ + createNode({ id: 'a', adaptor: 'common@local' }), + createNode({ id: 'b', adaptor: 'publicschema@local' }), + ], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a'), createEdge('a', 'b')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + monorepoPath: `${privateRoot}:${canonicalRoot}`, + }); + const [, a, b] = plan.workflow.steps as any[]; + + // common only exists in the canonical root, so it falls through. + t.is(a.linker.common.path, path.resolve(canonicalRoot, 'packages', 'common')); + // publicschema exists in both; the private (earlier) root wins. + t.is( + b.linker.publicschema.path, + path.resolve(privateRoot, 'packages', 'publicschema') + ); +}); + +test('Use local paths: ignores roots that do not contain the adaptor', (t) => { + const emptyRoot = makeMonorepo([]); + const realRoot = makeMonorepo(['http']); + + const run: Partial = { + id: 'w', + jobs: [createNode({ id: 'a', adaptor: 'http@local' })], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + monorepoPath: `${emptyRoot}:${realRoot}`, + }); + const [, a] = plan.workflow.steps as any[]; + + t.is(a.linker.http.path, path.resolve(realRoot, 'packages', 'http')); +}); + +test('Use local paths: trims whitespace and drops empty segments', (t) => { + const root = makeMonorepo(['common']); + + const run: Partial = { + id: 'w', + jobs: [createNode({ id: 'a', adaptor: 'common@local' })], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + monorepoPath: ` : ${root} : `, + }); + const [, a] = plan.workflow.steps as any[]; + + t.is(a.linker.common.path, path.resolve(root, 'packages', 'common')); +}); + +test('Use local paths: falls back to the first root when no root has the adaptor', (t) => { + const rootA = makeMonorepo([]); + const rootB = makeMonorepo([]); + + const run: Partial = { + id: 'w', + jobs: [createNode({ id: 'a', adaptor: 'mystery@local' })], + triggers: [{ id: 't', type: 'cron' }], + edges: [createEdge('t', 'a')], + }; + + const { plan } = convertPlan(run as LightningPlan, { + monorepoPath: `${rootA}:${rootB}`, + }); + const [, a] = plan.workflow.steps as any[]; + + // The candidate path under the first root is surfaced even though the + // adaptor is missing, so the runtime emits a clean "missing adaptor" + // error instead of crashing on a malformed colon-joined path. + t.is(a.linker.mystery.path, path.resolve(rootA, 'packages', 'mystery')); +}); + test('pass globals from lightning run to plan', (t) => { const GLOBALS_CONTENT = "export const prefixer = (v) => 'prefix-' + v"; const run: Partial = {