From 369c423e5b1dfc777270e5807300e744a211a807 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 27 Jun 2023 10:04:37 -0400 Subject: [PATCH 01/11] runtime: allow globals to be injected --- packages/runtime/src/execute/context.ts | 5 +++- packages/runtime/src/runtime.ts | 5 ++++ packages/runtime/test/runtime.test.ts | 12 ++++++++++ pnpm-lock.yaml | 31 +++++++------------------ 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/packages/runtime/src/execute/context.ts b/packages/runtime/src/execute/context.ts index 10bfeb624..585567199 100644 --- a/packages/runtime/src/execute/context.ts +++ b/packages/runtime/src/execute/context.ts @@ -15,11 +15,14 @@ const freezeAll = ( // Build a safe and helpful execution context // This will be shared by all jobs -export default (state: State, options: Pick) => { +export default (state: State, options: Pick) => { const logger = options.jobLogger ?? console; + const globals = options.globals || {}; const context = vm.createContext( freezeAll( { + ...globals, + // Note that these globals will be overridden console: logger, clearInterval, clearTimeout, diff --git a/packages/runtime/src/runtime.ts b/packages/runtime/src/runtime.ts index d00010901..5830e7e33 100644 --- a/packages/runtime/src/runtime.ts +++ b/packages/runtime/src/runtime.ts @@ -27,6 +27,11 @@ export type Options = { forceSandbox?: boolean; linker?: LinkerOptions; + + // inject stuff into the environment + // aka globals + // Used by unit tests. any security concerns? + globals?: any; }; const defaultState = { data: {}, configuration: {} }; diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 73fd6477d..6db890e09 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -269,3 +269,15 @@ test('stuff written to state before an error is preserved', async (t) => { t.is(result.x, 1); }); + +test('inject globals', async (t) => { + const expression = 'export default [(s) => Object.assign(s, { data: { x } })]'; + + const result: any = await run(expression, {}, { + globals: { + x: 90210 + } + }); + t.is(result.data.x, 90210); +}); + diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5b3e0e7e6..c9696925c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -59,14 +59,6 @@ importers: tslib: 2.4.0 typescript: 4.8.3 - integration-tests/cli/repo: - specifiers: - '@openfn/language-common_1.7.7': npm:@openfn/language-common@^1.7.7 - is-array_1.0.1: npm:is-array@^1.0.1 - dependencies: - '@openfn/language-common_1.7.7': /@openfn/language-common/1.7.7 - is-array_1.0.1: /is-array/1.0.1 - packages/cli: specifiers: '@openfn/compiler': workspace:* @@ -618,17 +610,6 @@ packages: - debug dev: true - /@openfn/language-common/1.7.7: - resolution: {integrity: sha512-GSoAbo6oL0b8jHufhLKvIzHJ271aE2AKv/ibeuiWU3CqN1gRmaHArlA/omlCs/rsfcieSp2VWAvWeGuFY8buZw==} - dependencies: - axios: 1.1.3 - date-fns: 2.29.3 - jsonpath-plus: 4.0.0 - lodash: 4.17.21 - transitivePeerDependencies: - - debug - dev: false - /@openfn/language-common/2.0.0-rc3: resolution: {integrity: sha512-7kwhBnCd1idyTB3MD9dXmUqROAhoaUIkz2AGDKuv9vn/cbZh7egEv9/PzKkRcDJYFV9qyyS+cVT3Xbgsg2ii5g==} bundledDependencies: [] @@ -1072,6 +1053,7 @@ packages: /asynckit/0.4.0: resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} + dev: true /atob/2.1.2: resolution: {integrity: sha512-Wm6ukoaOGJi/73p/cl2GvLjTI5JM1k/O14isD73YML8StrH/7/lRFgmg8nICZgD3bZZvjwCGxtMOD3wWNAu8cg==} @@ -1145,6 +1127,7 @@ packages: proxy-from-env: 1.1.0 transitivePeerDependencies: - debug + dev: true /b4a/1.6.1: resolution: {integrity: sha512-AsKjNhz72yxteo/0EtQEiwkMUgk/tGmycXlbG4g3Ard2/ULtNLUykGOkeK0egmN27h0xMAhb76jYccW+XTBExA==} @@ -1540,6 +1523,7 @@ packages: engines: {node: '>= 0.8'} dependencies: delayed-stream: 1.0.0 + dev: true /commander/4.1.1: resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==} @@ -1810,6 +1794,7 @@ packages: /delayed-stream/1.0.0: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + dev: true /delegates/1.0.0: resolution: {integrity: sha512-bd2L678uiWATM6m5Z1VzNCErI3jiGzt6HGY8OVICs40JQq/HALfbyNJmp0UDakEY4pMMaN0Ly5om/B1VI/+xfQ==} @@ -2849,6 +2834,7 @@ packages: peerDependenciesMeta: debug: optional: true + dev: true /for-in/1.0.2: resolution: {integrity: sha512-7EwmXrOjyL+ChxMhmG5lnW9MPt1aIeZEwKhQzoBUdTV0N3zuwWDZYVJatDvZ2OyzPUvdIAZDsCetk3coyMfcnQ==} @@ -2862,6 +2848,7 @@ packages: asynckit: 0.4.0 combined-stream: 1.0.8 mime-types: 2.1.35 + dev: true /fragment-cache/0.2.1: resolution: {integrity: sha512-GMBAbW9antB8iZRHLoGw0b3HANt57diZYFO/HL1JGIC1MjKrdmhxvrJbupnVvpys0zsz7yBApXdQyfepKly2kA==} @@ -3264,10 +3251,6 @@ packages: kind-of: 6.0.3 dev: true - /is-array/1.0.1: - resolution: {integrity: sha512-gxiZ+y/u67AzpeFmAmo4CbtME/bs7J2C++su5zQzvQyaxUqVzkh69DI+jN+KZuSO6JaH6TIIU6M6LhqxMjxEpw==} - dev: false - /is-arrayish/0.2.1: resolution: {integrity: sha512-zz06S8t0ozoDXMG+ube26zeCTNXcKIPJZJi8hBrF4idCLms4CG9QtK7qBl1boi5ODzFpjswb5JPmHCbMpjaYzg==} dev: true @@ -3584,6 +3567,7 @@ packages: /jsonpath-plus/4.0.0: resolution: {integrity: sha512-e0Jtg4KAzDJKKwzbLaUtinCn0RZseWBVRTRGihSpvFlM3wTR7ExSp+PTdeTsDrLNJUe7L7JYJe8mblHX5SCT6A==} engines: {node: '>=10.0'} + dev: true /keygrip/1.1.0: resolution: {integrity: sha512-iYSchDJ+liQ8iwbSI2QqsQOvqv58eJCEanyJPJi+Khyu8smkcKSFUCbPwzFcL7YVtZ6eONjqRX/38caJ7QjRAQ==} @@ -4533,6 +4517,7 @@ packages: /proxy-from-env/1.1.0: resolution: {integrity: sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==} + dev: true /proxy-middleware/0.15.0: resolution: {integrity: sha512-EGCG8SeoIRVMhsqHQUdDigB2i7qU7fCsWASwn54+nPutYO8n4q6EiwMzyfWlC+dzRFExP+kvcnDFdBDHoZBU7Q==} From 6002a0f58a45315d0fc88daab0baf521e212d110 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 27 Jun 2023 10:13:30 -0400 Subject: [PATCH 02/11] runtime: extra test on global injection --- packages/runtime/test/runtime.test.ts | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/packages/runtime/test/runtime.test.ts b/packages/runtime/test/runtime.test.ts index 6db890e09..0ce9226c3 100644 --- a/packages/runtime/test/runtime.test.ts +++ b/packages/runtime/test/runtime.test.ts @@ -281,3 +281,28 @@ test('inject globals', async (t) => { t.is(result.data.x, 90210); }); +test('injected globals can\'t override special functions', async (t) => { + const panic = () => { throw new Error('illegal override') } + + const globals = { + console: panic, + clearInterval: panic, + clearTimeout: panic, + parseFloat: panic, + parseInt: panic, + setInterval: panic, + setTimeout: panic, + } + const expression = `export default [(s) => { + parseFloat(); + parseInt(); + const i = setInterval(() => {}, 1000); + clearInterval(i); + const t = setTimeout(() => {}, 1000); + clearTimeout(t); + return s; + }]`; + + const result: any = await run(expression, {}, { globals }); + t.falsy(result.errors); +}); From a520e7b7aa7ed65119902efd877300ee62687f32 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 27 Jun 2023 10:46:41 -0400 Subject: [PATCH 03/11] runtime: use global execute from injection if it exists --- packages/runtime/src/execute/expression.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/runtime/src/execute/expression.ts b/packages/runtime/src/execute/expression.ts index 7ac7a49e1..d03d0a978 100644 --- a/packages/runtime/src/execute/expression.ts +++ b/packages/runtime/src/execute/expression.ts @@ -23,7 +23,7 @@ export default ( const { operations, execute } = await prepareJob(expression, context, opts); // Create the main reducer function - const reducer = (execute || defaultExecute)( + const reducer = (execute || opts.globals?.execute || defaultExecute)( ...operations.map((op, idx) => wrapOperation(op, logger, `${idx + 1}`, opts.immutableState) ) From 810397bb386c0ba28fc841173d4c77ae649439fe Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 15:05:12 +0000 Subject: [PATCH 04/11] worker: use real runtime in engine mock --- packages/ws-worker/src/mock/runtime-engine.ts | 272 +++++++++++------- .../test/mock/runtime-engine.test.ts | 60 ++-- 2 files changed, 216 insertions(+), 116 deletions(-) diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 9e4380294..2cc5bd24f 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -1,6 +1,6 @@ import crypto from 'node:crypto'; import { EventEmitter } from 'node:events'; -import type { ExecutionPlan, JobNode } from '@openfn/runtime'; +import run, { ExecutionPlan, JobNode, NotifyEvents } from '@openfn/runtime'; import * as engine from '@openfn/engine-multi'; import mockResolvers from './resolvers'; @@ -26,6 +26,18 @@ export type WorkflowErrorEvent = { message: string; }; +// this is basically a fake adaptor +// these functions will be injected into scope +// maybe +// needs me to add the globals option to the runtime +// (which is fine) +const helpers = {}; + +// The mock runtime engine creates a fake engine interface +// around a real runtime engine +// Note that it does not dispatch runtime logs and only supports console.log +// This gives us real eventing in the worker tests +// TODO - even better would be to re-use the engine's event map or something async function createMock() { const activeWorkflows = {} as Record; const bus = new EventEmitter(); @@ -53,116 +65,176 @@ async function createMock() { listeners[planId] = events; }; - const executeJob = async ( - workflowId: string, - job: JobNode, - initialState = {}, - resolvers: engine.Resolvers = mockResolvers + const execute = async ( + xplan: ExecutionPlan, + options: { resolvers?: engine.Resolvers; throw?: boolean } = { + resolvers: mockResolvers, + } ) => { - const { id, expression, configuration, adaptor } = job; + const { id, jobs } = xplan; + activeWorkflows[id!] = true; - // If no expression or adaptor, this is (probably) a trigger node. - // Silently do nothing - if (!expression && !adaptor) { - return initialState; + // Call the crendtial callback, but don't do anything with it + for (const job of jobs) { + if (typeof job.configuration === 'string') { + job.configuration = await options.resolvers.credential?.( + job.configuration + ); + } } - const runId = crypto.randomUUID(); - - const jobId = id; - if (typeof configuration === 'string') { - // Fetch the credential but do nothing with it - // Maybe later we use it to assemble state - await resolvers.credential?.(configuration); - } + // TODO do I need a more sophisticated solution here? + const jobLogger = { + log: (...args) => { + dispatch('workflow-log', { + workflowId: id, + level: 'info', + json: true, + message: args, + }); + }, + }; - const info = (...message: any[]) => { - dispatch('workflow-log', { - workflowId, - message: message, - level: 'info', - time: (BigInt(Date.now()) * BigInt(1e3)).toString(), - name: 'mck', - }); + const opts = { + strict: false, + // logger? + jobLogger, + // linker? + ...options, + callbacks: { + notify: (name: NotifyEvents, payload: any) => { + // console.log(name, payload); + // TODO events need to be mapped into runtime engine events (noot runtime events) + dispatch(name, { + workflowId: id, + ...payload, // ? + }); + }, + }, }; + setTimeout(async () => { + dispatch('workflow-start', { workflowId: id }); - // Get the job details from lightning - // start instantly and emit as it goes - dispatch('job-start', { workflowId, jobId, runId }); - info('Running job ' + jobId); - let nextState = initialState; - - // @ts-ignore - if (expression?.startsWith?.('wait@')) { - const [_, delay] = (expression as string).split('@'); - nextState = initialState; - await new Promise((resolve) => { - setTimeout(() => resolve(), parseInt(delay)); - }); - } else { - // Try and parse the expression as JSON, in which case we use it as the final state - try { - // @ts-ignore - nextState = JSON.parse(expression); - // What does this look like? Should be a logger object - info('Parsing expression as JSON state'); - info(nextState); - } catch (e) { - // Do nothing, it's fine - nextState = initialState; - } - } + await run(xplan, undefined, opts); - dispatch('job-complete', { - workflowId, - jobId, - state: nextState, - runId, - next: [], // TODO hmm. I think we need to do better than this. - }); + delete activeWorkflows[id!]; + dispatch('workflow-complete', { workflowId: id }); + }, 1); - return nextState; + // Technically the engine should return an event emitter + // But as I don't think we use it, I'm happy to ignore this }; - // Start executing an ExecutionPlan - // The mock uses lots of timeouts to make testing a bit easier and simulate asynchronicity - const execute = ( - xplan: ExecutionPlan, - options: { resolvers?: engine.Resolvers; throw?: boolean } = { - resolvers: mockResolvers, - } - ) => { - // This is just an easy way to test the options gets fed through to execute - // Also lets me test error handling! - if (options.throw) { - throw new Error('test error'); - } - - const { id, jobs, initialState } = xplan; - const workflowId = id; - activeWorkflows[id!] = true; - - // TODO do we want to load a globals dataclip from job.state here? - // This isn't supported right now - // We would need to use resolvers.dataclip if we wanted it - - setTimeout(() => { - dispatch('workflow-start', { workflowId }); - setTimeout(async () => { - let state = initialState || {}; - // Trivial job reducer in our mock - for (const job of jobs) { - state = await executeJob(id!, job, state, options.resolvers); - } - setTimeout(() => { - delete activeWorkflows[id!]; - dispatch('workflow-complete', { workflowId }); - // TODO on workflow complete we should maybe tidy the listeners? - // Doesn't really matter in the mock though - }, 1); - }, 1); - }, 1); - }; + // const executeJob = async ( + // workflowId: string, + // job: JobNode, + // initialState = {}, + // resolvers: engine.Resolvers = mockResolvers + // ) => { + // const { id, expression, configuration, adaptor } = job; + + // // If no expression or adaptor, this is (probably) a trigger node. + // // Silently do nothing + // if (!expression && !adaptor) { + // return initialState; + // } + + // const runId = crypto.randomUUID(); + + // const jobId = id; + // if (typeof configuration === 'string') { + // // Fetch the credential but do nothing with it + // // Maybe later we use it to assemble state + // await resolvers.credential?.(configuration); + // } + + // const info = (...message: any[]) => { + // dispatch('workflow-log', { + // workflowId, + // message: message, + // level: 'info', + // time: (BigInt(Date.now()) * BigInt(1e3)).toString(), + // name: 'mck', + // }); + // }; + + // // Get the job details from lightning + // // start instantly and emit as it goes + // dispatch('job-start', { workflowId, jobId, runId }); + // info('Running job ' + jobId); + // let nextState = initialState; + + // // @ts-ignore + // if (expression?.startsWith?.('wait@')) { + // const [_, delay] = (expression as string).split('@'); + // nextState = initialState; + // await new Promise((resolve) => { + // setTimeout(() => resolve(), parseInt(delay)); + // }); + // } else { + // // Try and parse the expression as JSON, in which case we use it as the final state + // try { + // // @ts-ignore + // nextState = JSON.parse(expression); + // // What does this look like? Should be a logger object + // info('Parsing expression as JSON state'); + // info(nextState); + // } catch (e) { + // // Do nothing, it's fine + // nextState = initialState; + // } + // } + + // dispatch('job-complete', { + // workflowId, + // jobId, + // state: nextState, + // runId, + // next: [], // TODO hmm. I think we need to do better than this. + // }); + + // return nextState; + // }; + + // // Start executing an ExecutionPlan + // // The mock uses lots of timeouts to make testing a bit easier and simulate asynchronicity + // const execute = ( + // xplan: ExecutionPlan, + // options: { resolvers?: engine.Resolvers; throw?: boolean } = { + // resolvers: mockResolvers, + // } + // ) => { + // // This is just an easy way to test the options gets fed through to execute + // // Also lets me test error handling! + // if (options.throw) { + // throw new Error('test error'); + // } + + // const { id, jobs, initialState } = xplan; + // const workflowId = id; + // activeWorkflows[id!] = true; + + // // TODO do we want to load a globals dataclip from job.state here? + // // This isn't supported right now + // // We would need to use resolvers.dataclip if we wanted it + + // setTimeout(() => { + // dispatch('workflow-start', { workflowId }); + // setTimeout(async () => { + // let state = initialState || {}; + // // Trivial job reducer in our mock + // for (const job of jobs) { + // state = await executeJob(id!, job, state, options.resolvers); + // } + // setTimeout(() => { + // delete activeWorkflows[id!]; + // dispatch('workflow-complete', { workflowId }); + // // TODO on workflow complete we should maybe tidy the listeners? + // // Doesn't really matter in the mock though + // }, 1); + // }, 1); + // }, 1); + // }; // return a list of jobs in progress const getStatus = () => { diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index eb207ab4a..fabf88451 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -14,11 +14,17 @@ const sampleWorkflow = { { id: 'j1', adaptor: 'common@1.0.0', - expression: '{ "x": 10 }', + expression: 'export default [() => ({ x: 10 })]', }, ], } as ExecutionPlan; +// rethinking these tests, I think I want +// - fake adaptor functions, fn & wait +// - deeper testing on the engine events +// - fake compilation +// - do we do anything about adaptors? + test('getStatus() should should have no active workflows', async (t) => { const engine = await create(); const { active } = engine.getStatus(); @@ -85,7 +91,11 @@ test('mock should evaluate expressions as JSON', async (t) => { t.deepEqual(evt.state, { x: 10 }); }); -test('mock should wait if expression starts with @wait', async (t) => { +// well, maybe it shouldn't +// We should have real looking expressions +// albiet with no compilation +// we could fake compilation though +test.skip('mock should wait if expression starts with @wait', async (t) => { const engine = await create(); const wf = { id: 'w1', @@ -103,7 +113,8 @@ test('mock should wait if expression starts with @wait', async (t) => { t.true(end > 90); }); -test('mock should return initial state as result state', async (t) => { +// nope +test.skip('mock should return initial state as result state', async (t) => { const engine = await create(); const wf = { @@ -120,7 +131,8 @@ test('mock should return initial state as result state', async (t) => { t.deepEqual(evt.state, { y: 22 }); }); -test('mock prefers JSON state to initial state', async (t) => { +// nope +test.skip('mock prefers JSON state to initial state', async (t) => { const engine = await create(); const wf = { @@ -138,7 +150,8 @@ test('mock prefers JSON state to initial state', async (t) => { t.deepEqual(evt.state, { z: 33 }); }); -test('mock should dispatch log events when evaluating JSON', async (t) => { +// logs yes, json no +test.skip('mock should dispatch log events when evaluating JSON', async (t) => { const engine = await create(); const logs = []; @@ -153,7 +166,8 @@ test('mock should dispatch log events when evaluating JSON', async (t) => { t.deepEqual(logs[1].message, ['Parsing expression as JSON state']); }); -test('mock should throw if the magic option is passed', async (t) => { +// nope, the engine should not throw at all +test.skip('mock should throw if the magic option is passed', async (t) => { const engine = await create(); const logs = []; @@ -198,34 +212,45 @@ test('listen to events', async (t) => { 'workflow-complete': false, }; - engine.listen(sampleWorkflow.id, { + const wf = { + id: 'wibble', + jobs: [ + { + id: 'j1', + adaptor: 'common@1.0.0', + expression: 'export default [() => { console.log("x"); }]', + }, + ], + } as ExecutionPlan; + + engine.listen(wf.id, { 'job-start': ({ workflowId, jobId }) => { called['job-start'] = true; - t.is(workflowId, sampleWorkflow.id); - t.is(jobId, sampleWorkflow.jobs[0].id); + t.is(workflowId, wf.id); + t.is(jobId, wf.jobs[0].id); }, 'job-complete': ({ workflowId, jobId }) => { called['job-complete'] = true; - t.is(workflowId, sampleWorkflow.id); - t.is(jobId, sampleWorkflow.jobs[0].id); + t.is(workflowId, wf.id); + t.is(jobId, wf.jobs[0].id); // TODO includes state? }, 'workflow-log': ({ workflowId, message }) => { called['workflow-log'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); t.truthy(message); }, 'workflow-start': ({ workflowId }) => { called['workflow-start'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); }, 'workflow-complete': ({ workflowId }) => { called['workflow-complete'] = true; - t.is(workflowId, sampleWorkflow.id); + t.is(workflowId, wf.id); }, }); - engine.execute(sampleWorkflow); + engine.execute(wf); await waitForEvent(engine, 'workflow-complete'); t.assert(Object.values(called).every((v) => v === true)); }); @@ -244,12 +269,13 @@ test('only listen to events for the correct workflow', async (t) => { t.pass(); }); -test('do nothing for a job if no expression and adaptor (trigger node)', async (t) => { +test.skip('do nothing for a job if no expression and adaptor (trigger node)', async (t) => { const workflow = { id: 'w1', jobs: [ { id: 'j1', + expression: 'export default [() => console.log("x"); )]', }, ], } as ExecutionPlan; @@ -279,5 +305,7 @@ test('do nothing for a job if no expression and adaptor (trigger node)', async ( engine.execute(workflow); await waitForEvent(engine, 'workflow-complete'); + console.log(); + t.false(didCallEvent); }); From f7f504820fd3b047cba0650ad246d82854e49577 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 15:26:39 +0000 Subject: [PATCH 05/11] worker: fix tests with new mock --- packages/ws-worker/src/mock/runtime-engine.ts | 12 +++- packages/ws-worker/test/api/execute.test.ts | 14 +++-- packages/ws-worker/test/lightning.test.ts | 57 +++++++++++-------- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 2cc5bd24f..65df0398f 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -91,6 +91,7 @@ async function createMock() { level: 'info', json: true, message: args, + time: Date.now(), }); }, }; @@ -115,7 +116,16 @@ async function createMock() { setTimeout(async () => { dispatch('workflow-start', { workflowId: id }); - await run(xplan, undefined, opts); + try { + await run(xplan, undefined, opts); + } catch (e: any) { + // TODO I have no test on this + dispatch('workflow-error', { + workflowId: id, + type: e.name, + message: e.message, + }); + } delete activeWorkflows[id!]; dispatch('workflow-complete', { workflowId: id }); diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index d6af0e965..a74832edb 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -407,7 +407,8 @@ test('execute should pass the final result to onFinish', async (t) => { id: 'a', jobs: [ { - expression: JSON.stringify({ done: true }), + // TODO use new fn helper + expression: 'export default [() => ({ done: true })]', }, ], }; @@ -431,7 +432,8 @@ test('execute should return a context object', async (t) => { id: 'a', jobs: [ { - expression: JSON.stringify({ done: true }), + // TODO use new fn helper + expression: 'export default [() => ({ done: true })]', }, ], }; @@ -476,7 +478,8 @@ test('execute should lazy-load a credential', async (t) => { jobs: [ { configuration: 'abc', - expression: JSON.stringify({ done: true }), + // TODO use new fn helper + expression: 'export default [() => ({ done: true })]', }, ], }; @@ -511,7 +514,8 @@ test('execute should lazy-load initial state', async (t) => { initialState: 'abc', jobs: [ { - expression: JSON.stringify({ done: true }), + // TODO use new fn helper + expression: 'export default [() => ({ done: true })]', }, ], }; @@ -558,8 +562,8 @@ test('execute should call all events on the socket', async (t) => { { id: 'trigger', configuration: 'a', - expression: 'fn(a => a)', adaptor: '@openfn/language-common@1.0.0', + expression: 'export default [() => console.log("x")]', }, ], }; diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index faf004527..bac5b9754 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -30,13 +30,18 @@ test.before(async () => { let rollingAttemptId = 0; +// simulate an fn operation without compilation +// TODO even better to mock cmompilation tbh +const fn = (expression: string) => + `const fn = (f) => (s) => f(s); export default [${expression}]`; + const getAttempt = (ext = {}, jobs?: any) => ({ id: `a${++rollingAttemptId}`, jobs: jobs || [ { id: 'j', adaptor: '@openfn/language-common@1.0.0', - body: JSON.stringify({ answer: 42 }), + body: fn('() => ({ answer: 42 })'), }, ], ...ext, @@ -51,7 +56,7 @@ test.serial( id: 'attempt-1', jobs: [ { - body: JSON.stringify({ count: 122 }), + body: fn('() => ({ count: 122 })'), }, ], }; @@ -69,7 +74,7 @@ test.serial( test.serial('should run an attempt which returns intial state', async (t) => { return new Promise((done) => { lng.addDataclip('x', { - route: 66, + data: 66, }); const attempt = { @@ -77,13 +82,13 @@ test.serial('should run an attempt which returns intial state', async (t) => { dataclip_id: 'x', jobs: [ { - body: 'whatever', + body: fn('(s) => s'), }, ], }; lng.waitForResult(attempt.id).then((result) => { - t.deepEqual(result, { route: 66 }); + t.deepEqual(result, { data: 66 }); done(); }); @@ -173,17 +178,17 @@ test.serial( id: 'some-job', credential_id: 'a', adaptor: '@openfn/language-common@1.0.0', - body: JSON.stringify({ answer: 42 }), + body: fn('() => ({ answer: 42 })'), }, ]); let didCallEvent = false; - lng.onSocketEvent(e.GET_CREDENTIAL, attempt.id, ({ payload }) => { + lng.onSocketEvent(e.GET_CREDENTIAL, attempt.id, () => { // again there's no way to check the right credential was returned didCallEvent = true; }); - lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { + lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, () => { t.true(didCallEvent); done(); }); @@ -268,11 +273,15 @@ test.serial( `events: lightning should receive a ${e.ATTEMPT_LOG} event`, (t) => { return new Promise((done) => { - const attempt = getAttempt(); - - let didCallEvent = false; + const attempt = { + id: 'attempt-1', + jobs: [ + { + body: fn('(s) => { console.log("x"); return s }'), + }, + ], + }; - // The mock runtime will put out a default log lng.onSocketEvent(e.ATTEMPT_LOG, attempt.id, ({ payload }) => { const log = payload; @@ -280,13 +289,10 @@ test.serial( t.truthy(log.attempt_id); t.truthy(log.run_id); t.truthy(log.message); - t.assert(log.message[0].startsWith('Running job')); - - didCallEvent = true; + t.deepEqual(log.message, ['x']); }); lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { - t.true(didCallEvent); done(); }); @@ -300,13 +306,13 @@ test.serial( test.serial.skip(`events: logs should have increasing timestamps`, (t) => { return new Promise((done) => { const attempt = getAttempt({}, [ - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, - { body: '{ x: 1 }', adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, ]); const history: bigint[] = []; @@ -372,7 +378,7 @@ test('should register and de-register attempts to the server', async (t) => { id: 'attempt-1', jobs: [ { - body: JSON.stringify({ count: 122 }), + body: fn('() => ({ count: 122 })'), }, ], }; @@ -398,7 +404,8 @@ test('should register and de-register attempts to the server', async (t) => { // TODO this is a server test // What I am testing here is that the first job completes // before the second job starts -test('should not claim while at capacity', async (t) => { +// TODO add wait helper +test.skip('should not claim while at capacity', async (t) => { return new Promise((done) => { const attempt1 = { id: 'attempt-1', From acf2f7cdbcb127404fce92ad4ac64e083981637a Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 15:27:53 +0000 Subject: [PATCH 06/11] tidyup --- packages/ws-worker/src/mock/runtime-engine.ts | 119 +----------------- packages/ws-worker/test/api/execute.test.ts | 3 +- 2 files changed, 4 insertions(+), 118 deletions(-) diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 65df0398f..ca46b8a13 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -1,6 +1,5 @@ -import crypto from 'node:crypto'; import { EventEmitter } from 'node:events'; -import run, { ExecutionPlan, JobNode, NotifyEvents } from '@openfn/runtime'; +import run, { ExecutionPlan, NotifyEvents } from '@openfn/runtime'; import * as engine from '@openfn/engine-multi'; import mockResolvers from './resolvers'; @@ -74,9 +73,9 @@ async function createMock() { const { id, jobs } = xplan; activeWorkflows[id!] = true; - // Call the crendtial callback, but don't do anything with it for (const job of jobs) { if (typeof job.configuration === 'string') { + // Call the crendtial callback, but don't do anything with it job.configuration = await options.resolvers.credential?.( job.configuration ); @@ -98,13 +97,10 @@ async function createMock() { const opts = { strict: false, - // logger? jobLogger, - // linker? ...options, callbacks: { notify: (name: NotifyEvents, payload: any) => { - // console.log(name, payload); // TODO events need to be mapped into runtime engine events (noot runtime events) dispatch(name, { workflowId: id, @@ -135,117 +131,6 @@ async function createMock() { // But as I don't think we use it, I'm happy to ignore this }; - // const executeJob = async ( - // workflowId: string, - // job: JobNode, - // initialState = {}, - // resolvers: engine.Resolvers = mockResolvers - // ) => { - // const { id, expression, configuration, adaptor } = job; - - // // If no expression or adaptor, this is (probably) a trigger node. - // // Silently do nothing - // if (!expression && !adaptor) { - // return initialState; - // } - - // const runId = crypto.randomUUID(); - - // const jobId = id; - // if (typeof configuration === 'string') { - // // Fetch the credential but do nothing with it - // // Maybe later we use it to assemble state - // await resolvers.credential?.(configuration); - // } - - // const info = (...message: any[]) => { - // dispatch('workflow-log', { - // workflowId, - // message: message, - // level: 'info', - // time: (BigInt(Date.now()) * BigInt(1e3)).toString(), - // name: 'mck', - // }); - // }; - - // // Get the job details from lightning - // // start instantly and emit as it goes - // dispatch('job-start', { workflowId, jobId, runId }); - // info('Running job ' + jobId); - // let nextState = initialState; - - // // @ts-ignore - // if (expression?.startsWith?.('wait@')) { - // const [_, delay] = (expression as string).split('@'); - // nextState = initialState; - // await new Promise((resolve) => { - // setTimeout(() => resolve(), parseInt(delay)); - // }); - // } else { - // // Try and parse the expression as JSON, in which case we use it as the final state - // try { - // // @ts-ignore - // nextState = JSON.parse(expression); - // // What does this look like? Should be a logger object - // info('Parsing expression as JSON state'); - // info(nextState); - // } catch (e) { - // // Do nothing, it's fine - // nextState = initialState; - // } - // } - - // dispatch('job-complete', { - // workflowId, - // jobId, - // state: nextState, - // runId, - // next: [], // TODO hmm. I think we need to do better than this. - // }); - - // return nextState; - // }; - - // // Start executing an ExecutionPlan - // // The mock uses lots of timeouts to make testing a bit easier and simulate asynchronicity - // const execute = ( - // xplan: ExecutionPlan, - // options: { resolvers?: engine.Resolvers; throw?: boolean } = { - // resolvers: mockResolvers, - // } - // ) => { - // // This is just an easy way to test the options gets fed through to execute - // // Also lets me test error handling! - // if (options.throw) { - // throw new Error('test error'); - // } - - // const { id, jobs, initialState } = xplan; - // const workflowId = id; - // activeWorkflows[id!] = true; - - // // TODO do we want to load a globals dataclip from job.state here? - // // This isn't supported right now - // // We would need to use resolvers.dataclip if we wanted it - - // setTimeout(() => { - // dispatch('workflow-start', { workflowId }); - // setTimeout(async () => { - // let state = initialState || {}; - // // Trivial job reducer in our mock - // for (const job of jobs) { - // state = await executeJob(id!, job, state, options.resolvers); - // } - // setTimeout(() => { - // delete activeWorkflows[id!]; - // dispatch('workflow-complete', { workflowId }); - // // TODO on workflow complete we should maybe tidy the listeners? - // // Doesn't really matter in the mock though - // }, 1); - // }, 1); - // }, 1); - // }; - // return a list of jobs in progress const getStatus = () => { return { diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index a74832edb..a091bb68e 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -24,7 +24,8 @@ import { import createMockRTE from '../../src/mock/runtime-engine'; import { mockChannel } from '../../src/mock/sockets'; import { stringify, createAttemptState } from '../../src/util'; -import { ExecutionPlan } from '@openfn/runtime'; + +import type { ExecutionPlan } from '@openfn/runtime'; import type { AttemptState } from '../../src/types'; const enc = new TextEncoder(); From 1ea8e6e707a033fbd592d75c125ce3b212e5e2da Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 16:37:50 +0000 Subject: [PATCH 07/11] worker: update tests to use new helpers --- packages/ws-worker/src/mock/runtime-engine.ts | 15 +- packages/ws-worker/test/api/execute.test.ts | 17 +-- packages/ws-worker/test/lightning.test.ts | 36 ++--- .../test/mock/runtime-engine.test.ts | 132 ++++-------------- 4 files changed, 58 insertions(+), 142 deletions(-) diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index ca46b8a13..06f1e374e 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -27,10 +27,11 @@ export type WorkflowErrorEvent = { // this is basically a fake adaptor // these functions will be injected into scope -// maybe -// needs me to add the globals option to the runtime -// (which is fine) -const helpers = {}; +const helpers = { + fn: (f: Function) => (s: any) => f(s), + wait: (duration: number) => (s: any) => + new Promise((resolve) => setTimeout(resolve, duration)), +}; // The mock runtime engine creates a fake engine interface // around a real runtime engine @@ -80,6 +81,11 @@ async function createMock() { job.configuration ); } + + // Fake compilation + if (job.expression && !job.expression.match(/export default \[/)) { + job.expression = `export default [${job.expression}];`; + } } // TODO do I need a more sophisticated solution here? @@ -99,6 +105,7 @@ async function createMock() { strict: false, jobLogger, ...options, + globals: helpers, callbacks: { notify: (name: NotifyEvents, payload: any) => { // TODO events need to be mapped into runtime engine events (noot runtime events) diff --git a/packages/ws-worker/test/api/execute.test.ts b/packages/ws-worker/test/api/execute.test.ts index a091bb68e..2b7c58e46 100644 --- a/packages/ws-worker/test/api/execute.test.ts +++ b/packages/ws-worker/test/api/execute.test.ts @@ -408,8 +408,7 @@ test('execute should pass the final result to onFinish', async (t) => { id: 'a', jobs: [ { - // TODO use new fn helper - expression: 'export default [() => ({ done: true })]', + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -433,8 +432,7 @@ test('execute should return a context object', async (t) => { id: 'a', jobs: [ { - // TODO use new fn helper - expression: 'export default [() => ({ done: true })]', + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -479,8 +477,7 @@ test('execute should lazy-load a credential', async (t) => { jobs: [ { configuration: 'abc', - // TODO use new fn helper - expression: 'export default [() => ({ done: true })]', + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -515,8 +512,7 @@ test('execute should lazy-load initial state', async (t) => { initialState: 'abc', jobs: [ { - // TODO use new fn helper - expression: 'export default [() => ({ done: true })]', + expression: 'fn(() => ({ done: true }))', }, ], }; @@ -550,7 +546,7 @@ test('execute should call all events on the socket', async (t) => { // GET_DATACLIP, // TODO not really implemented properly yet ATTEMPT_START, RUN_START, - ATTEMPT_LOG, // This won't log with the mock logger + ATTEMPT_LOG, RUN_COMPLETE, ATTEMPT_COMPLETE, ]; @@ -564,7 +560,7 @@ test('execute should call all events on the socket', async (t) => { id: 'trigger', configuration: 'a', adaptor: '@openfn/language-common@1.0.0', - expression: 'export default [() => console.log("x")]', + expression: 'fn(() => console.log("x"))', }, ], }; @@ -573,7 +569,6 @@ test('execute should call all events on the socket', async (t) => { return new Promise((done) => { execute(channel, engine, logger, plan, options, (result) => { - // console.log(events); // Check that events were passed to the socket // This is deliberately crude t.assert(allEvents.every((e) => events[e])); diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index bac5b9754..3fe37558b 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -30,18 +30,13 @@ test.before(async () => { let rollingAttemptId = 0; -// simulate an fn operation without compilation -// TODO even better to mock cmompilation tbh -const fn = (expression: string) => - `const fn = (f) => (s) => f(s); export default [${expression}]`; - const getAttempt = (ext = {}, jobs?: any) => ({ id: `a${++rollingAttemptId}`, jobs: jobs || [ { id: 'j', adaptor: '@openfn/language-common@1.0.0', - body: fn('() => ({ answer: 42 })'), + body: 'fn(() => ({ answer: 42 }))', }, ], ...ext, @@ -56,7 +51,7 @@ test.serial( id: 'attempt-1', jobs: [ { - body: fn('() => ({ count: 122 })'), + body: 'fn(() => ({ count: 122 }))', }, ], }; @@ -82,7 +77,7 @@ test.serial('should run an attempt which returns intial state', async (t) => { dataclip_id: 'x', jobs: [ { - body: fn('(s) => s'), + body: 'fn((s) => s)', }, ], }; @@ -178,7 +173,7 @@ test.serial( id: 'some-job', credential_id: 'a', adaptor: '@openfn/language-common@1.0.0', - body: fn('() => ({ answer: 42 })'), + body: 'fn(() => ({ answer: 42 }))', }, ]); @@ -277,7 +272,7 @@ test.serial( id: 'attempt-1', jobs: [ { - body: fn('(s) => { console.log("x"); return s }'), + body: 'fn((s) => { console.log("x"); return s })', }, ], }; @@ -306,13 +301,14 @@ test.serial( test.serial.skip(`events: logs should have increasing timestamps`, (t) => { return new Promise((done) => { const attempt = getAttempt({}, [ - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, - { body: fn('() => ({ data: 1 })'), adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, + { body: 'fn(() => ({ data: 1 }))', adaptor: 'common' }, ]); const history: bigint[] = []; @@ -378,7 +374,7 @@ test('should register and de-register attempts to the server', async (t) => { id: 'attempt-1', jobs: [ { - body: fn('() => ({ count: 122 })'), + body: 'fn(() => ({ count: 122 }))', }, ], }; @@ -411,7 +407,7 @@ test.skip('should not claim while at capacity', async (t) => { id: 'attempt-1', jobs: [ { - body: 'wait@500', + body: 'wait(500)', }, ], }; @@ -453,5 +449,3 @@ test.skip('should not claim while at capacity', async (t) => { // hmm, i don't even think I can test this in the mock runtime test.skip('should pass the right dataclip when running in parallel', () => {}); - -test.todo(`should run multiple attempts`); diff --git a/packages/ws-worker/test/mock/runtime-engine.test.ts b/packages/ws-worker/test/mock/runtime-engine.test.ts index fabf88451..14dc2b843 100644 --- a/packages/ws-worker/test/mock/runtime-engine.test.ts +++ b/packages/ws-worker/test/mock/runtime-engine.test.ts @@ -14,27 +14,24 @@ const sampleWorkflow = { { id: 'j1', adaptor: 'common@1.0.0', - expression: 'export default [() => ({ x: 10 })]', + expression: 'fn(() => ({ data: { x: 10 } }))', }, ], } as ExecutionPlan; -// rethinking these tests, I think I want -// - fake adaptor functions, fn & wait -// - deeper testing on the engine events -// - fake compilation -// - do we do anything about adaptors? +let engine; + +test.before(async () => { + engine = await create(); +}); test('getStatus() should should have no active workflows', async (t) => { - const engine = await create(); const { active } = engine.getStatus(); t.is(active, 0); }); test('Dispatch start events for a new workflow', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'workflow-start'); t.truthy(evt); @@ -42,7 +39,6 @@ test('Dispatch start events for a new workflow', async (t) => { }); test('getStatus should report one active workflow', async (t) => { - const engine = await create(); engine.execute(sampleWorkflow); const { active } = engine.getStatus(); @@ -51,8 +47,6 @@ test('getStatus should report one active workflow', async (t) => { }); test('Dispatch complete events when a workflow completes', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent( engine, @@ -63,8 +57,6 @@ test('Dispatch complete events when a workflow completes', async (t) => { }); test('Dispatch start events for a job', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'job-start'); t.truthy(evt); @@ -73,114 +65,51 @@ test('Dispatch start events for a job', async (t) => { }); test('Dispatch complete events for a job', async (t) => { - const engine = await create(); - engine.execute(sampleWorkflow); const evt = await waitForEvent(engine, 'job-complete'); t.truthy(evt); t.is(evt.workflowId, 'w1'); t.is(evt.jobId, 'j1'); - t.truthy(evt.state); -}); - -test('mock should evaluate expressions as JSON', async (t) => { - const engine = await create(); - - engine.execute(sampleWorkflow); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { x: 10 }); + t.deepEqual(evt.state, { data: { x: 10 } }); }); -// well, maybe it shouldn't -// We should have real looking expressions -// albiet with no compilation -// we could fake compilation though -test.skip('mock should wait if expression starts with @wait', async (t) => { - const engine = await create(); +test('Dispatch error event for a crash', async (t) => { const wf = { - id: 'w1', + id: 'xyz', jobs: [ { id: 'j1', - expression: 'wait@100', + adaptor: 'common@1.0.0', + expression: 'fn(() => ( @~!"@£!4 )', }, ], } as ExecutionPlan; - engine.execute(wf); - const start = Date.now(); - const evt = await waitForEvent(engine, 'workflow-complete'); - const end = Date.now() - start; - t.true(end > 90); -}); -// nope -test.skip('mock should return initial state as result state', async (t) => { - const engine = await create(); - - const wf = { - initialState: { y: 22 }, - jobs: [ - { - adaptor: 'common@1.0.0', - }, - ], - }; engine.execute(wf); + const evt = await waitForEvent(engine, 'workflow-error'); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { y: 22 }); + t.is(evt.workflowId, 'xyz'); + t.is(evt.type, 'RuntimeCrash'); + t.regex(evt.message, /invalid or unexpected token/i); }); -// nope -test.skip('mock prefers JSON state to initial state', async (t) => { - const engine = await create(); - +test('wait function', async (t) => { const wf = { - initialState: { y: 22 }, + id: 'w1', jobs: [ { - adaptor: 'common@1.0.0', - expression: '{ "z": 33 }', + id: 'j1', + expression: 'wait(100)', }, ], - }; + } as ExecutionPlan; engine.execute(wf); + const start = Date.now(); - const evt = await waitForEvent(engine, 'job-complete'); - t.deepEqual(evt.state, { z: 33 }); -}); - -// logs yes, json no -test.skip('mock should dispatch log events when evaluating JSON', async (t) => { - const engine = await create(); - - const logs = []; - engine.on('workflow-log', (l) => { - logs.push(l); - }); - - engine.execute(sampleWorkflow); - await waitForEvent(engine, 'workflow-complete'); - - t.deepEqual(logs[0].message, ['Running job j1']); - t.deepEqual(logs[1].message, ['Parsing expression as JSON state']); -}); - -// nope, the engine should not throw at all -test.skip('mock should throw if the magic option is passed', async (t) => { - const engine = await create(); + await waitForEvent(engine, 'workflow-complete'); - const logs = []; - engine.on('workflow-log', (l) => { - logs.push(l); - }); - - await t.throwsAsync( - async () => engine.execute(sampleWorkflow, { throw: true }), - { - message: 'test error', - } - ); + const end = Date.now() - start; + t.true(end > 90); }); test('resolve credential before job-start if credential is a string', async (t) => { @@ -193,7 +122,6 @@ test('resolve credential before job-start if credential is a string', async (t) return {}; }; - const engine = await create(); // @ts-ignore engine.execute(wf, { resolvers: { credential } }); @@ -202,8 +130,6 @@ test('resolve credential before job-start if credential is a string', async (t) }); test('listen to events', async (t) => { - const engine = await create(); - const called = { 'job-start': false, 'job-complete': false, @@ -256,8 +182,6 @@ test('listen to events', async (t) => { }); test('only listen to events for the correct workflow', async (t) => { - const engine = await create(); - engine.listen('bobby mcgee', { 'workflow-start': ({ workflowId }) => { throw new Error('should not have called this!!'); @@ -269,19 +193,17 @@ test('only listen to events for the correct workflow', async (t) => { t.pass(); }); -test.skip('do nothing for a job if no expression and adaptor (trigger node)', async (t) => { +test('do nothing for a job if no expression and adaptor (trigger node)', async (t) => { const workflow = { id: 'w1', jobs: [ { id: 'j1', - expression: 'export default [() => console.log("x"); )]', + adaptor: '@openfn/language-common@1.0.0', }, ], } as ExecutionPlan; - const engine = await create(); - let didCallEvent = false; engine.listen(workflow.id, { @@ -305,7 +227,5 @@ test.skip('do nothing for a job if no expression and adaptor (trigger node)', as engine.execute(workflow); await waitForEvent(engine, 'workflow-complete'); - console.log(); - t.false(didCallEvent); }); From c2aa7ce8e4a72c5a5d1ec57b77d5d562237ee7a5 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 17:12:56 +0000 Subject: [PATCH 08/11] lightning-mock: allow to unsubscribe from listeners --- packages/lightning-mock/src/api-dev.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/lightning-mock/src/api-dev.ts b/packages/lightning-mock/src/api-dev.ts index d3a32cbf6..6ddf7260e 100644 --- a/packages/lightning-mock/src/api-dev.ts +++ b/packages/lightning-mock/src/api-dev.ts @@ -111,16 +111,18 @@ const setupDevAPI = ( attemptId: string, fn: (evt: any) => void, once = true - ) => { + ): (() => void) => { + const unsubscribe = () => state.events.removeListener(event, handler); function handler(e: any) { if (e.attemptId && e.attemptId === attemptId) { if (once) { - state.events.removeListener(event, handler); + unsubscribe(); } fn(e); } } state.events.addListener(event, handler); + return unsubscribe; }; }; From 47f491add64befee605b079cb223cb7767e13817 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 17:18:06 +0000 Subject: [PATCH 09/11] worker: epic parallelisation test --- packages/ws-worker/test/lightning.test.ts | 84 ++++++++++++++++++++++- 1 file changed, 82 insertions(+), 2 deletions(-) diff --git a/packages/ws-worker/test/lightning.test.ts b/packages/ws-worker/test/lightning.test.ts index 3fe37558b..72cf22167 100644 --- a/packages/ws-worker/test/lightning.test.ts +++ b/packages/ws-worker/test/lightning.test.ts @@ -447,5 +447,85 @@ test.skip('should not claim while at capacity', async (t) => { }); }); -// hmm, i don't even think I can test this in the mock runtime -test.skip('should pass the right dataclip when running in parallel', () => {}); +test('should pass the right dataclip when running in parallel', (t) => { + return new Promise((done) => { + const job = (id: string, next?: string) => ({ + id, + body: `fn((s) => { s.data.${id} = true; return s; })`, + }); + + const edge = (from: string, to: string) => ({ + id: `${from}-${to}`, + source_job_id: from, + target_job_id: to, + }); + + const outputDataclipIds = {}; + const inputDataclipIds = {}; + const outputs = {}; + const a = { + id: 'a', + body: 'fn(() => ({ data: { a: true } }))', + next: { j: true, k: true }, + }; + + const j = job('j', 'x'); + const k = job('k', 'y'); + const x = job('x'); + const y = job('y'); + + const attempt = { + id: 'p1', + jobs: [a, j, k, x, y], + edges: [edge('a', 'j'), edge('a', 'k'), edge('j', 'x'), edge('k', 'y')], + }; + + // Save all the input dataclip ids for each job + const unsub2 = lng.onSocketEvent( + e.RUN_START, + attempt.id, + ({ payload }) => { + inputDataclipIds[payload.job_id] = payload.input_dataclip_id; + }, + false + ); + + // Save all the output dataclips & ids for each job + const unsub1 = lng.onSocketEvent( + e.RUN_COMPLETE, + attempt.id, + ({ payload }) => { + outputDataclipIds[payload.job_id] = payload.output_dataclip_id; + outputs[payload.job_id] = JSON.parse(payload.output_dataclip); + }, + false + ); + + lng.onSocketEvent(e.ATTEMPT_COMPLETE, attempt.id, (evt) => { + unsub1(); + unsub2(); + + // Now check everything was correct + + // Job a we don't really care about, but check the output anyway + t.deepEqual(outputs.a.data, { a: true }); + + // a feeds in to j and k + t.deepEqual(inputDataclipIds.j, outputDataclipIds.a); + t.deepEqual(inputDataclipIds.k, outputDataclipIds.a); + + // j feeds into x + t.deepEqual(inputDataclipIds.x, outputDataclipIds.j); + + // k feeds into y + t.deepEqual(inputDataclipIds.y, outputDataclipIds.k); + + // x and y should have divergent states + t.deepEqual(outputs.x.data, { a: true, j: true, x: true }); + t.deepEqual(outputs.y.data, { a: true, k: true, y: true }); + done(); + }); + + lng.enqueueAttempt(attempt); + }); +}); From 431b43aaf9003825634dda321adc16f045a1f38c Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 17:35:28 +0000 Subject: [PATCH 10/11] worker: typings and tidies --- packages/ws-worker/src/mock/runtime-engine.ts | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/packages/ws-worker/src/mock/runtime-engine.ts b/packages/ws-worker/src/mock/runtime-engine.ts index 06f1e374e..bbf5324e6 100644 --- a/packages/ws-worker/src/mock/runtime-engine.ts +++ b/packages/ws-worker/src/mock/runtime-engine.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'node:events'; -import run, { ExecutionPlan, NotifyEvents } from '@openfn/runtime'; +import run, { ExecutionPlan } from '@openfn/runtime'; import * as engine from '@openfn/engine-multi'; import mockResolvers from './resolvers'; @@ -30,7 +30,7 @@ export type WorkflowErrorEvent = { const helpers = { fn: (f: Function) => (s: any) => f(s), wait: (duration: number) => (s: any) => - new Promise((resolve) => setTimeout(resolve, duration)), + new Promise((resolve) => setTimeout(() => resolve(s), duration)), }; // The mock runtime engine creates a fake engine interface @@ -77,20 +77,23 @@ async function createMock() { for (const job of jobs) { if (typeof job.configuration === 'string') { // Call the crendtial callback, but don't do anything with it - job.configuration = await options.resolvers.credential?.( + job.configuration = await options.resolvers?.credential?.( job.configuration ); } // Fake compilation - if (job.expression && !job.expression.match(/export default \[/)) { + if ( + typeof job.expression === 'string' && + !(job.expression as string).match(/export default \[/) + ) { job.expression = `export default [${job.expression}];`; } } // TODO do I need a more sophisticated solution here? const jobLogger = { - log: (...args) => { + log: (...args: any[]) => { dispatch('workflow-log', { workflowId: id, level: 'info', @@ -107,11 +110,10 @@ async function createMock() { ...options, globals: helpers, callbacks: { - notify: (name: NotifyEvents, payload: any) => { - // TODO events need to be mapped into runtime engine events (noot runtime events) + notify: (name: any, payload: any) => { dispatch(name, { workflowId: id, - ...payload, // ? + ...payload, }); }, }, @@ -120,9 +122,8 @@ async function createMock() { dispatch('workflow-start', { workflowId: id }); try { - await run(xplan, undefined, opts); + await run(xplan, undefined, opts as any); } catch (e: any) { - // TODO I have no test on this dispatch('workflow-error', { workflowId: id, type: e.name, From 40ffc226edecb728322fdd0c76114a56147625d8 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 17 Nov 2023 17:39:39 +0000 Subject: [PATCH 11/11] runtime: changeset --- .changeset/lovely-dodos-guess.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/lovely-dodos-guess.md diff --git a/.changeset/lovely-dodos-guess.md b/.changeset/lovely-dodos-guess.md new file mode 100644 index 000000000..cf75f4a47 --- /dev/null +++ b/.changeset/lovely-dodos-guess.md @@ -0,0 +1,5 @@ +--- +'@openfn/runtime': minor +--- + +Allow globals to be passed into the execution environment