diff --git a/packages/test/src/test-workflows.ts b/packages/test/src/test-workflows.ts index 7330e0bf9..c7e66e9bc 100644 --- a/packages/test/src/test-workflows.ts +++ b/packages/test/src/test-workflows.ts @@ -32,6 +32,8 @@ export interface Context { startTime: number; runId: string; workflowCreator: TestVMWorkflowCreator | TestReusableVMWorkflowCreator; + workflowNonOtelCreator: TestVMWorkflowCreator | TestReusableVMWorkflowCreator; + workflowOtelCreator: TestVMWorkflowCreator | TestReusableVMWorkflowCreator; } const test = anyTest as TestFn; @@ -63,25 +65,63 @@ class TestReusableVMWorkflowCreator extends ReusableVMWorkflowCreator { } } +const OTEL_TAG = '[otel]'; +const NON_OTEL_TAG = '[no-otel]'; +/** + * Helper function for generating 2 tests. One using a bundle with otel interceoptors and one without. + */ +function testWithOtel( + name: string, + impl: (t: ExecutionContext, args: any[]) => Promise, + ...args: any[] +) { + test(`${NON_OTEL_TAG} ${name}`, impl, args); + test(`${OTEL_TAG} ${name}`, impl, args); +} + test.before(async (t) => { const workflowsPath = path.join(__dirname, 'workflows'); const bundler = new WorkflowCodeBundler({ workflowsPath }); const workflowBundle = parseWorkflowCode((await bundler.createBundle()).code); + const otelBundler = new WorkflowCodeBundler({ + workflowsPath, + workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')], + }); + const workflowOtelBundle = parseWorkflowCode((await otelBundler.createBundle()).code); // FIXME: isolateExecutionTimeoutMs used to be 200 ms, but that's causing // lot of flakes on CI. Revert this after investigation / resolution. - t.context.workflowCreator = REUSE_V8_CONTEXT + t.context.workflowNonOtelCreator = REUSE_V8_CONTEXT ? await TestReusableVMWorkflowCreator.create(workflowBundle, 400, new Set()) : await TestVMWorkflowCreator.create(workflowBundle, 400, new Set()); + t.context.workflowOtelCreator = REUSE_V8_CONTEXT + ? await TestReusableVMWorkflowCreator.create(workflowOtelBundle, 400, new Set()) + : await TestVMWorkflowCreator.create(workflowOtelBundle, 400, new Set()); }); test.after.always(async (t) => { - await t.context.workflowCreator.destroy(); + await t.context.workflowNonOtelCreator.destroy(); + await t.context.workflowOtelCreator.destroy(); + if (t.context.workflowCreator) { + await t.context.workflowCreator.destroy(); + } }); test.beforeEach(async (t) => { - const { workflowCreator } = t.context; + const { workflowOtelCreator, workflowNonOtelCreator } = t.context; const workflowType = t.title.match(/\S+$/)![0]; const runId = t.title; + let useOtel: boolean; + if (runId.includes(OTEL_TAG)) { + useOtel = true; + } else if (runId.includes(NON_OTEL_TAG)) { + useOtel = false; + } else { + t.fail( + `'${runId}' did not include an otel prefix. Use 'testPlainAndOtel' to write new tests instead of raw 'test'` + ); + return; + } + const workflowCreator = useOtel ? workflowOtelCreator : workflowNonOtelCreator; const logs = new Array(); workflowCreator.logs[runId] = logs; const startTime = Date.now(); @@ -94,6 +134,8 @@ test.beforeEach(async (t) => { workflowCreator, startTime, workflow, + workflowOtelCreator, + workflowNonOtelCreator, }; }); @@ -164,6 +206,15 @@ function compareCompletion( expected: coresdk.workflow_completion.IWorkflowActivationCompletion ) { const stackTraces = extractFailureStackTraces(req, expected); + if (req.successful?.commands && expected.successful?.commands) { + const numCommands = req.successful.commands.length; + const reqPayload = req.successful.commands[numCommands - 1]?.completeWorkflowExecution?.result; + const expPayload = expected.successful.commands[numCommands - 1]?.completeWorkflowExecution?.result; + if (reqPayload && expPayload) { + t.deepEqual(defaultPayloadConverter.fromPayload(reqPayload), defaultPayloadConverter.fromPayload(expPayload)); + } + } + removeOtelMetadata(req); t.deepEqual( coresdk.workflow_completion.WorkflowActivationCompletion.create(req).toJSON(), coresdk.workflow_completion.WorkflowActivationCompletion.create({ @@ -210,6 +261,27 @@ function extractFailureStackTraces( return stackTraces; } +// Filter out metadata produced by OTEL interceoptors +function removeOtelMetadata(req: coresdk.workflow_completion.IWorkflowActivationCompletion) { + if (!req.successful) { + return; + } + if (req.successful.usedInternalFlags) { + req.successful.usedInternalFlags = req.successful.usedInternalFlags.filter((flag) => ![3, 4, 5].includes(flag)); + } + if (!req.successful.commands) { + return; + } + for (const command of req.successful.commands) { + if (command.scheduleActivity?.headers) { + delete command.scheduleActivity.headers; + } + if (command.continueAsNewWorkflowExecution?.headers) { + delete command.continueAsNewWorkflowExecution.headers; + } + } +} + function makeSuccess( commands: coresdk.workflow_commands.IWorkflowCommand[] = [makeCompleteWorkflowExecution()], usedInternalFlags: SdkFlag[] = [] @@ -437,7 +509,7 @@ function makeUpdateCompleteResponse(id: string, result: unknown): coresdk.workfl }; } -test('random', async (t) => { +testWithOtel('random', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -457,13 +529,13 @@ test('random', async (t) => { t.deepEqual(logs, [[0.8380154962651432], ['a50eca73-ff3e-4445-a512-2330c2f4f86e'], [0.18803317612037063]]); }); -test('successString', async (t) => { +testWithOtel('successString', async (t) => { const { workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('success'))])); }); -test('continueAsNewSuggested', async (t) => { +testWithOtel('continueAsNewSuggested', async (t) => { const { workflowType } = t.context; const activation = makeStartWorkflow(workflowType); activation.continueAsNewSuggested = true; @@ -491,7 +563,7 @@ function cleanWorkflowQueryFailureStackTrace( return req; } -test('throwAsync', async (t) => { +testWithOtel('throwAsync', async (t) => { const { workflowType } = t.context; const req = cleanWorkflowFailureStackTrace(await activate(t, makeStartWorkflow(workflowType))); compareCompletion( @@ -510,27 +582,27 @@ test('throwAsync', async (t) => { ); }); -test('date', async (t) => { +testWithOtel('date', async (t) => { const { startTime, logs, workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType, undefined, startTime)); compareCompletion(t, req, makeSuccess()); t.deepEqual(logs, [[startTime], [startTime], [true], [true], [true], [true], [true]]); }); -test('asyncWorkflow', async (t) => { +testWithOtel('asyncWorkflow', async (t) => { const { workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess([makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload('async'))])); }); -test('deferredResolve', async (t) => { +testWithOtel('deferredResolve', async (t) => { const { logs, workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess()); t.deepEqual(logs, [[1], [2]]); }); -test('sleeper', async (t) => { +testWithOtel('sleeper', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -543,7 +615,7 @@ test('sleeper', async (t) => { t.deepEqual(logs, [['slept']]); }); -test('with ms string - sleeper', async (t) => { +testWithOtel('with ms string - sleeper', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType, [defaultPayloadConverter.toPayload('10s')])); @@ -556,7 +628,7 @@ test('with ms string - sleeper', async (t) => { t.deepEqual(logs, [['slept']]); }); -test('setTimeoutAfterMicroTasks', async (t) => { +testWithOtel('setTimeoutAfterMicroTasks', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -569,28 +641,28 @@ test('setTimeoutAfterMicroTasks', async (t) => { t.deepEqual(logs, [['slept']]); }); -test('promiseThenPromise', async (t) => { +testWithOtel('promiseThenPromise', async (t) => { const { logs, workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess()); t.deepEqual(logs, [[2]]); }); -test('rejectPromise', async (t) => { +testWithOtel('rejectPromise', async (t) => { const { logs, workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess()); t.deepEqual(logs, [[true], [true]]); }); -test('promiseAll', async (t) => { +testWithOtel('promiseAll', async (t) => { const { logs, workflowType } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, req, makeSuccess()); t.deepEqual(logs, [[1, 2, 3], [1, 2, 3], [1, 2, 3], ['wow']]); }); -test('tasksAndMicrotasks', async (t) => { +testWithOtel('tasksAndMicrotasks', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -614,7 +686,7 @@ test('tasksAndMicrotasks', async (t) => { t.deepEqual(logs, [['script start'], ['script end'], ['promise1'], ['promise2'], ['setTimeout']]); }); -test('trailingTimer', async (t) => { +testWithOtel('trailingTimer', async (t) => { const { logs, workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -643,7 +715,7 @@ test('trailingTimer', async (t) => { t.deepEqual(logs, []); }); -test('promiseRace', async (t) => { +testWithOtel('promiseRace', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -663,7 +735,7 @@ test('promiseRace', async (t) => { t.deepEqual(logs, [[1], [1], [1], [1], [20], ['wow']]); }); -test('race', async (t) => { +testWithOtel('race', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -687,7 +759,7 @@ test('race', async (t) => { t.deepEqual(logs, [[1], [2], [3]]); }); -test('importer', async (t) => { +testWithOtel('importer', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -711,7 +783,7 @@ test('importer', async (t) => { t.deepEqual(logs, [['slept']]); }); -test('argsAndReturn', async (t) => { +testWithOtel('argsAndReturn', async (t) => { const { workflowType } = t.context; const req = await activate( t, @@ -741,7 +813,7 @@ test('argsAndReturn', async (t) => { ); }); -test('invalidOrFailedQueries', async (t) => { +testWithOtel('invalidOrFailedQueries', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -796,7 +868,7 @@ test('invalidOrFailedQueries', async (t) => { } }); -test('interruptableWorkflow', async (t) => { +testWithOtel('interruptableWorkflow', async (t) => { const { workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -830,7 +902,7 @@ test('interruptableWorkflow', async (t) => { } }); -test('failSignalWorkflow', async (t) => { +testWithOtel('failSignalWorkflow', async (t) => { const { workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -859,7 +931,7 @@ test('failSignalWorkflow', async (t) => { } }); -test('asyncFailSignalWorkflow', async (t) => { +testWithOtel('asyncFailSignalWorkflow', async (t) => { const { workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -898,7 +970,7 @@ test('asyncFailSignalWorkflow', async (t) => { } }); -test('cancelWorkflow', async (t) => { +testWithOtel('cancelWorkflow', async (t) => { const url = 'https://temporal.io'; const { workflowType } = t.context; { @@ -970,7 +1042,7 @@ test('cancelWorkflow', async (t) => { } }); -test('cancel - unblockOrCancel', async (t) => { +testWithOtel('cancel - unblockOrCancel', async (t) => { const { workflowType, logs } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -983,7 +1055,7 @@ test('cancel - unblockOrCancel', async (t) => { t.deepEqual(logs, [['Blocked'], ['Cancelled']]); }); -test('unblock - unblockOrCancel', async (t) => { +testWithOtel('unblock - unblockOrCancel', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1024,7 +1096,7 @@ test('unblock - unblockOrCancel', async (t) => { } }); -test('cancelTimer', async (t) => { +testWithOtel('cancelTimer', async (t) => { const { workflowType, logs } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion( @@ -1039,7 +1111,7 @@ test('cancelTimer', async (t) => { t.deepEqual(logs, [['Timer cancelled 👍']]); }); -test('cancelTimerAltImpl', async (t) => { +testWithOtel('cancelTimerAltImpl', async (t) => { const { workflowType, logs } = t.context; const req = await activate(t, makeStartWorkflow(workflowType)); compareCompletion( @@ -1054,7 +1126,7 @@ test('cancelTimerAltImpl', async (t) => { t.deepEqual(logs, [['Timer cancelled 👍']]); }); -test('nonCancellable', async (t) => { +testWithOtel('nonCancellable', async (t) => { const { workflowType } = t.context; const url = 'https://temporal.io'; const result = defaultPayloadConverter.toPayload({ test: true }); @@ -1083,7 +1155,7 @@ test('nonCancellable', async (t) => { } }); -test('resumeAfterCancellation', async (t) => { +testWithOtel('resumeAfterCancellation', async (t) => { const { workflowType } = t.context; const url = 'https://temporal.io'; const result = defaultPayloadConverter.toPayload({ test: true }); @@ -1116,7 +1188,7 @@ test('resumeAfterCancellation', async (t) => { } }); -test('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => { +testWithOtel('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => { const { workflowType } = t.context; const url = 'https://temporal.io'; const data = { content: 'new HTML content' }; @@ -1178,7 +1250,7 @@ test('handleExternalWorkflowCancellationWhileActivityRunning', async (t) => { } }); -test('nestedCancellation', async (t) => { +testWithOtel('nestedCancellation', async (t) => { const { workflowType } = t.context; const url = 'https://temporal.io'; { @@ -1265,7 +1337,7 @@ test('nestedCancellation', async (t) => { } }); -test('sharedCancellationScopes', async (t) => { +testWithOtel('sharedCancellationScopes', async (t) => { const { workflowType } = t.context; const result = { some: 'data' }; { @@ -1304,7 +1376,7 @@ test('sharedCancellationScopes', async (t) => { } }); -test('nonCancellableAwaitedInRootScope', async (t) => { +testWithOtel('nonCancellableAwaitedInRootScope', async (t) => { const { workflowType } = t.context; const result = { some: 'data' }; { @@ -1344,7 +1416,7 @@ test('nonCancellableAwaitedInRootScope', async (t) => { } }); -test('cancellationScopesWithCallbacks', async (t) => { +testWithOtel('cancellationScopesWithCallbacks', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1370,7 +1442,7 @@ test('cancellationScopesWithCallbacks', async (t) => { } }); -test('cancellationScopes', async (t) => { +testWithOtel('cancellationScopes', async (t) => { const { workflowType, logs } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1405,7 +1477,7 @@ test('cancellationScopes', async (t) => { ]); }); -test('childAndNonCancellable', async (t) => { +testWithOtel('childAndNonCancellable', async (t) => { const { workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1417,7 +1489,7 @@ test('childAndNonCancellable', async (t) => { } }); -test('partialNonCancellable', async (t) => { +testWithOtel('partialNonCancellable', async (t) => { const { workflowType, logs } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1456,7 +1528,7 @@ test('partialNonCancellable', async (t) => { t.deepEqual(logs, [['Workflow cancelled']]); }); -test('nonCancellableInNonCancellable', async (t) => { +testWithOtel('nonCancellableInNonCancellable', async (t) => { const { workflowType, logs } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1480,7 +1552,7 @@ test('nonCancellableInNonCancellable', async (t) => { t.deepEqual(logs, [['Timer 1 finished 👍'], ['Timer 0 finished 👍']]); }); -test('cancellationErrorIsPropagated', async (t) => { +testWithOtel('cancellationErrorIsPropagated', async (t) => { const { workflowType, logs } = t.context; const req = cleanWorkflowFailureStackTrace(await activate(t, makeStartWorkflow(workflowType)), 2); compareCompletion( @@ -1512,7 +1584,7 @@ test('cancellationErrorIsPropagated', async (t) => { t.deepEqual(logs, []); }); -test('cancelActivityAfterFirstCompletion', async (t) => { +testWithOtel('cancelActivityAfterFirstCompletion', async (t) => { const url = 'https://temporal.io'; const { workflowType, logs } = t.context; { @@ -1574,7 +1646,7 @@ test('cancelActivityAfterFirstCompletion', async (t) => { t.deepEqual(logs, [['Workflow cancelled while waiting on non cancellable scope']]); }); -test('multipleActivitiesSingleTimeout', async (t) => { +testWithOtel('multipleActivitiesSingleTimeout', async (t) => { const urls = ['https://slow-site.com/', 'https://slow-site.org/']; const { workflowType } = t.context; { @@ -1623,7 +1695,7 @@ test('multipleActivitiesSingleTimeout', async (t) => { } }); -test('resolve activity with result - http', async (t) => { +testWithOtel('resolve activity with result - http', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1659,7 +1731,7 @@ test('resolve activity with result - http', async (t) => { } }); -test('resolve activity with failure - http', async (t) => { +testWithOtel('resolve activity with failure - http', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1703,7 +1775,7 @@ test('resolve activity with failure - http', async (t) => { } }); -test('globalOverrides', async (t) => { +testWithOtel('globalOverrides', async (t) => { const { workflowType, logs } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1717,7 +1789,7 @@ test('globalOverrides', async (t) => { ); }); -test('logAndTimeout', async (t) => { +testWithOtel('logAndTimeout', async (t) => { const { workflowType, workflow } = t.context; const completion = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, completion, { @@ -1734,6 +1806,9 @@ test('logAndTimeout', async (t) => { // Ignore LogTimestamp and workflowInfo for the purpose of this comparison calls.forEach((call) => { delete call.args[1]?.[LogTimestamp]; + delete call.args[1]?.['span_id']; + delete call.args[1]?.['trace_id']; + delete call.args[1]?.['trace_flags']; delete (call as any).workflowInfo; }); t.deepEqual(calls, [ @@ -1744,7 +1819,7 @@ test('logAndTimeout', async (t) => { 'Workflow started', { namespace: 'default', - runId: 'beforeEach hook for logAndTimeout', + runId: t.context.runId, taskQueue: 'test', workflowId: 'test-workflowId', workflowType: 'logAndTimeout', @@ -1759,7 +1834,7 @@ test('logAndTimeout', async (t) => { 'logging before getting stuck', { namespace: 'default', - runId: 'beforeEach hook for logAndTimeout', + runId: t.context.runId, taskQueue: 'test', workflowId: 'test-workflowId', workflowType: 'logAndTimeout', @@ -1770,7 +1845,7 @@ test('logAndTimeout', async (t) => { ]); }); -test('continueAsNewSameWorkflow', async (t) => { +testWithOtel('continueAsNewSameWorkflow', async (t) => { const { workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1791,7 +1866,7 @@ test('continueAsNewSameWorkflow', async (t) => { } }); -test('not-replay patchedWorkflow', async (t) => { +testWithOtel('not-replay patchedWorkflow', async (t) => { const { logs, workflowType } = t.context; { const req = await activate(t, makeStartWorkflow(workflowType)); @@ -1811,7 +1886,7 @@ test('not-replay patchedWorkflow', async (t) => { t.deepEqual(logs, [['has change'], ['has change 2']]); }); -test('replay-no-marker patchedWorkflow', async (t) => { +testWithOtel('replay-no-marker patchedWorkflow', async (t) => { const { logs, workflowType } = t.context; { const act: coresdk.workflow_activation.IWorkflowActivation = { @@ -1836,7 +1911,7 @@ test('replay-no-marker patchedWorkflow', async (t) => { t.deepEqual(logs, [['no change'], ['no change 2']]); }); -test('replay-no-marker-then-not-replay patchedWorkflow', async (t) => { +testWithOtel('replay-no-marker-then-not-replay patchedWorkflow', async (t) => { const { logs, workflowType } = t.context; { const act: coresdk.workflow_activation.IWorkflowActivation = { @@ -1860,7 +1935,7 @@ test('replay-no-marker-then-not-replay patchedWorkflow', async (t) => { t.deepEqual(logs, [['no change'], ['has change 2']]); }); -test('replay-with-marker patchedWorkflow', async (t) => { +testWithOtel('replay-with-marker patchedWorkflow', async (t) => { const { logs, workflowType } = t.context; { const act: coresdk.workflow_activation.IWorkflowActivation = { @@ -1886,7 +1961,7 @@ test('replay-with-marker patchedWorkflow', async (t) => { t.deepEqual(logs, [['has change'], ['has change 2']]); }); -test('deprecatePatchWorkflow', async (t) => { +testWithOtel('deprecatePatchWorkflow', async (t) => { const { logs, workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1899,14 +1974,14 @@ test('deprecatePatchWorkflow', async (t) => { t.deepEqual(logs, [['has change']]); }); -test('patchedTopLevel', async (t) => { +testWithOtel('patchedTopLevel', async (t) => { const { workflowType, logs } = t.context; const completion = await activate(t, makeStartWorkflow(workflowType)); compareCompletion(t, completion, makeSuccess()); t.deepEqual(logs, [[['Patches cannot be used before Workflow starts']]]); }); -test('tryToContinueAfterCompletion', async (t) => { +testWithOtel('tryToContinueAfterCompletion', async (t) => { const { workflowType } = t.context; { const completion = cleanWorkflowFailureStackTrace(await activate(t, makeStartWorkflow(workflowType))); @@ -1927,7 +2002,7 @@ test('tryToContinueAfterCompletion', async (t) => { } }); -test('failUnlessSignaledBeforeStart', async (t) => { +testWithOtel('failUnlessSignaledBeforeStart', async (t) => { const { workflowType } = t.context; const completion = await activate( t, @@ -1938,7 +2013,7 @@ test('failUnlessSignaledBeforeStart', async (t) => { compareCompletion(t, completion, makeSuccess(undefined, [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch])); }); -test('conditionWaiter', async (t) => { +testWithOtel('conditionWaiter', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1958,7 +2033,7 @@ test('conditionWaiter', async (t) => { } }); -test('conditionRacer', async (t) => { +testWithOtel('conditionRacer', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -1990,7 +2065,7 @@ test('conditionRacer', async (t) => { } }); -test('signalHandlersCanBeCleared', async (t) => { +testWithOtel('signalHandlersCanBeCleared', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -2053,7 +2128,7 @@ test('signalHandlersCanBeCleared', async (t) => { } }); -test('waitOnUser', async (t) => { +testWithOtel('waitOnUser', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -2069,7 +2144,7 @@ test('waitOnUser', async (t) => { } }); -test('scopeCancelledWhileWaitingOnExternalWorkflowCancellation', async (t) => { +testWithOtel('scopeCancelledWhileWaitingOnExternalWorkflowCancellation', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeStartWorkflow(workflowType)); @@ -2094,7 +2169,7 @@ test('scopeCancelledWhileWaitingOnExternalWorkflowCancellation', async (t) => { } }); -test('query not found - successString', async (t) => { +testWithOtel('query not found - successString', async (t) => { const { workflowType } = t.context; { const completion = await activate(t, makeActivation(undefined, makeInitializeWorkflowJob(workflowType))); @@ -2129,7 +2204,7 @@ test('query not found - successString', async (t) => { } }); -test('Buffered signals are dispatched to correct handler and in correct order - signalsOrdering', async (t) => { +testWithOtel('Buffered signals are dispatched to correct handler and in correct order - signalsOrdering', async (t) => { const { workflowType } = t.context; { const completion = await activate( @@ -2178,7 +2253,7 @@ test('Buffered signals are dispatched to correct handler and in correct order - } }); -test('Buffered signals dispatch is reentrant - signalsOrdering2', async (t) => { +testWithOtel('Buffered signals dispatch is reentrant - signalsOrdering2', async (t) => { const { workflowType } = t.context; { const completion = await activate( @@ -2219,340 +2294,292 @@ test('Buffered signals dispatch is reentrant - signalsOrdering2', async (t) => }); // Validate that issue #1474 is fixed in 1.11.0+ -test("Pending promises can't unblock between signals and updates - 1.11.0+ - signalUpdateOrderingWorkflow", async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType), { - doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, - }), - isReplaying: false, - }); - compareCompletion( - t, - completion, - makeSuccess([ - { updateResponse: { protocolInstanceId: '1', accepted: {} } }, - { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, - ]) - ); - } +testWithOtel( + "Pending promises can't unblock between signals and updates - 1.11.0+ - signalUpdateOrderingWorkflow", + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType), { + doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, + }), + isReplaying: false, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, + ]) + ); + } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - { signalWorkflow: { signalName: 'fooSignal', input: [] } }, - { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } - ), - isReplaying: false, - }); - compareCompletion( - t, - completion, - makeSuccess( - [ - { updateResponse: { protocolInstanceId: '2', accepted: {} } }, - { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, - { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(3) } }, - ], - [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] - ) - ); + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { signalWorkflow: { signalName: 'fooSignal', input: [] } }, + { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } + ), + isReplaying: false, + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, + { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(3) } }, + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } } -}); +); // Validate that issue #1474 legacy behavior is maintained when replaying from pre-1.11.0 history -test("Pending promises can't unblock between signals and updates - pre-1.11.0 - signalUpdateOrderingWorkflow", async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType), { - doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, - }), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - { updateResponse: { protocolInstanceId: '1', accepted: {} } }, - { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, - ]) - ); - } - - { - const completion = await activate(t, { - ...makeActivation( - undefined, - { signalWorkflow: { signalName: 'fooSignal', input: [] } }, - { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } - ), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(2) } }, - { updateResponse: { protocolInstanceId: '2', accepted: {} } }, - { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, - ]) - ); - } -}); - -test('Signals/Updates/Activities/Timers have coherent promise completion ordering (no signal) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), - makeScheduleActivityCommand({ - seq: 1, - activityId: '1', - activityType: 'myActivity', - scheduleToCloseTimeout: msToTs('10s'), - taskQueue: 'test-activity', - doNotEagerlyExecute: false, - versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, +testWithOtel( + "Pending promises can't unblock between signals and updates - pre-1.11.0 - signalUpdateOrderingWorkflow", + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType), { + doUpdate: { name: 'fooUpdate', protocolInstanceId: '1', runValidator: false, id: 'first' }, }), - ]) - ); - } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, - makeFireTimerJob(1), - makeResolveActivityJob(1, { completed: {} }) - ), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - { updateResponse: { protocolInstanceId: '1', accepted: {} } }, - { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, - makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([false, true, true, true])), - ]) - ); - } -}); + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(1) } }, + ]) + ); + } -test('Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), - makeScheduleActivityCommand({ - seq: 1, - activityId: '1', - activityType: 'myActivity', - scheduleToCloseTimeout: msToTs('10s'), - taskQueue: 'test-activity', - doNotEagerlyExecute: false, - versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, - }), - ]) - ); - } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - makeSignalWorkflowJob('aaSignal', []), - { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, - makeFireTimerJob(1), - makeResolveActivityJob(1, { completed: {} }) - ), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - // Note the missing update responses here; this is due to #1474. The fact that the activity - // and timer completions have not been observed before the workflow completed is a related but - // distinct issue. But are resolved by the ProcessWorkflowActivationJobsAsSingleBatch fix. - makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, false, false, false])), - { updateResponse: { protocolInstanceId: '1', accepted: {} } }, - { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, - ]) - ); + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { signalWorkflow: { signalName: 'fooSignal', input: [] } }, + { doUpdate: { name: 'fooUpdate', protocolInstanceId: '2', id: 'second' } } + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + { completeWorkflowExecution: { result: defaultPayloadConverter.toPayload(2) } }, + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(3) } }, + ]) + ); + } } -}); +); -test('Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - signalsActivitiesTimersPromiseOrdering', async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), - }); - compareCompletion( - t, - completion, - makeSuccess([ - makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), - makeScheduleActivityCommand({ - seq: 1, - activityId: '1', - activityType: 'myActivity', - scheduleToCloseTimeout: msToTs('10s'), - taskQueue: 'test-activity', - doNotEagerlyExecute: false, - versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, - }), - ]) - ); - } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - makeSignalWorkflowJob('aaSignal', []), - { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, - makeFireTimerJob(1), - makeResolveActivityJob(1, { completed: {} }) - ), - }); - compareCompletion( - t, - completion, - makeSuccess( - [ +testWithOtel( + 'Signals/Updates/Activities/Timers have coherent promise completion ordering (no signal) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('10s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ { updateResponse: { protocolInstanceId: '1', accepted: {} } }, { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, - makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, true, true, true])), - ], - [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] - ) - ); + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([false, true, true, true])), + ]) + ); + } } -}); +); -test('Signals/Updates/Activities/Timers - Trace promises completion order - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrderingTracer', async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), - makeScheduleActivityCommand({ - seq: 1, - activityId: '1', - activityType: 'myActivity', - scheduleToCloseTimeout: msToTs('1s'), - taskQueue: 'test', - doNotEagerlyExecute: false, - versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, - }), - ]) - ); - } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - makeSignalWorkflowJob('aaSignal', ['signal1']), - makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), - makeSignalWorkflowJob('aaSignal', ['signal2']), - makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), - makeFireTimerJob(1), - makeResolveActivityJob(1, { completed: {} }) - ), - isReplaying: true, - }); - compareCompletion( - t, - completion, - makeSuccess([ - { updateResponse: { protocolInstanceId: '1', accepted: {} } }, - { updateResponse: { protocolInstanceId: '2', accepted: {} } }, - { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, - { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(undefined) } }, - makeCompleteWorkflowExecution( - defaultPayloadConverter.toPayload( - [ - // Signals first (sync part, then microtasks) - 'signal1.sync, signal2.sync', - 'signal1.1, signal2.1, signal1.2, signal2.2, signal1.3, signal2.3, signal1.4, signal2.4', - - // Then update (sync part first), then microtasks for update+timers+activities - 'update1.sync, update2.sync', - 'update1.1, update2.1, timer.1, activity.1', - 'update1.2, update2.2, timer.2, activity.2', - 'update1.3, update2.3, timer.3, activity.3', - 'update1.4, update2.4, timer.4, activity.4', - ].flatMap((x) => x.split(', ')) - ) +testWithOtel( + 'Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrdering', + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('10s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', []), + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) ), - ]) - ); + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + // Note the missing update responses here; this is due to #1474. The fact that the activity + // and timer completions have not been observed before the workflow completed is a related but + // distinct issue. But are resolved by the ProcessWorkflowActivationJobsAsSingleBatch fix. + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, false, false, false])), + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + ]) + ); + } } -}); +); -test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11.0+ - signalsActivitiesTimersPromiseOrderingTracer', async (t) => { - const { workflowType } = t.context; - { - const completion = await activate(t, { - ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), - }); - compareCompletion( - t, - completion, - makeSuccess([ - makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), - makeScheduleActivityCommand({ - seq: 1, - activityId: '1', - activityType: 'myActivity', - scheduleToCloseTimeout: msToTs('1s'), - taskQueue: 'test', - doNotEagerlyExecute: false, - versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, - }), - ]) - ); +testWithOtel( + 'Signals/Updates/Activities/Timers have coherent promise completion ordering (w/ signals) - signalsActivitiesTimersPromiseOrdering', + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(100) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('10s'), + taskQueue: 'test-activity', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', []), + { doUpdate: { id: 'first', name: 'aaUpdate', protocolInstanceId: '1' } }, + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + makeCompleteWorkflowExecution(defaultPayloadConverter.toPayload([true, true, true, true])), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } } - { - const completion = await activate(t, { - ...makeActivation( - undefined, - makeSignalWorkflowJob('aaSignal', ['signal1']), - makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), - makeSignalWorkflowJob('aaSignal', ['signal2']), - makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), - makeFireTimerJob(1), - makeResolveActivityJob(1, { completed: {} }) - ), - }); - compareCompletion( - t, - completion, - makeSuccess( - [ +); + +testWithOtel( + 'Signals/Updates/Activities/Timers - Trace promises completion order - pre-1.11.0 compatibility - signalsActivitiesTimersPromiseOrderingTracer', + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', ['signal1']), + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), + makeSignalWorkflowJob('aaSignal', ['signal2']), + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + isReplaying: true, + }); + compareCompletion( + t, + completion, + makeSuccess([ { updateResponse: { protocolInstanceId: '1', accepted: {} } }, { updateResponse: { protocolInstanceId: '2', accepted: {} } }, { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, @@ -2560,22 +2587,91 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11 makeCompleteWorkflowExecution( defaultPayloadConverter.toPayload( [ - 'signal1.sync, update1.sync, signal2.sync, update2.sync', - 'signal1.1, update1.1, signal2.1, update2.1, timer.1, activity.1', - 'signal1.2, update1.2, signal2.2, update2.2, timer.2, activity.2', - 'signal1.3, update1.3, signal2.3, update2.3, timer.3, activity.3', - 'signal1.4, update1.4, signal2.4, update2.4, timer.4, activity.4', + // Signals first (sync part, then microtasks) + 'signal1.sync, signal2.sync', + 'signal1.1, signal2.1, signal1.2, signal2.2, signal1.3, signal2.3, signal1.4, signal2.4', + + // Then update (sync part first), then microtasks for update+timers+activities + 'update1.sync, update2.sync', + 'update1.1, update2.1, timer.1, activity.1', + 'update1.2, update2.2, timer.2, activity.2', + 'update1.3, update2.3, timer.3, activity.3', + 'update1.4, update2.4, timer.4, activity.4', ].flatMap((x) => x.split(', ')) ) ), - ], - [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] - ) - ); + ]) + ); + } } -}); +); + +testWithOtel( + 'Signals/Updates/Activities/Timers - Trace promises completion order - 1.11.0+ - signalsActivitiesTimersPromiseOrderingTracer', + async (t) => { + const { workflowType } = t.context; + { + const completion = await activate(t, { + ...makeActivation(undefined, makeInitializeWorkflowJob(workflowType)), + }); + compareCompletion( + t, + completion, + makeSuccess([ + makeStartTimerCommand({ seq: 1, startToFireTimeout: msToTs(1) }), + makeScheduleActivityCommand({ + seq: 1, + activityId: '1', + activityType: 'myActivity', + scheduleToCloseTimeout: msToTs('1s'), + taskQueue: 'test', + doNotEagerlyExecute: false, + versioningIntent: coresdk.common.VersioningIntent.UNSPECIFIED, + }), + ]) + ); + } + { + const completion = await activate(t, { + ...makeActivation( + undefined, + makeSignalWorkflowJob('aaSignal', ['signal1']), + makeUpdateActivationJob('first', '1', 'aaUpdate', ['update1']), + makeSignalWorkflowJob('aaSignal', ['signal2']), + makeUpdateActivationJob('second', '2', 'aaUpdate', ['update2']), + makeFireTimerJob(1), + makeResolveActivityJob(1, { completed: {} }) + ), + }); + compareCompletion( + t, + completion, + makeSuccess( + [ + { updateResponse: { protocolInstanceId: '1', accepted: {} } }, + { updateResponse: { protocolInstanceId: '2', accepted: {} } }, + { updateResponse: { protocolInstanceId: '1', completed: defaultPayloadConverter.toPayload(undefined) } }, + { updateResponse: { protocolInstanceId: '2', completed: defaultPayloadConverter.toPayload(undefined) } }, + makeCompleteWorkflowExecution( + defaultPayloadConverter.toPayload( + [ + 'signal1.sync, update1.sync, signal2.sync, update2.sync', + 'signal1.1, update1.1, signal2.1, update2.1, timer.1, activity.1', + 'signal1.2, update1.2, signal2.2, update2.2, timer.2, activity.2', + 'signal1.3, update1.3, signal2.3, update2.3, timer.3, activity.3', + 'signal1.4, update1.4, signal2.4, update2.4, timer.4, activity.4', + ].flatMap((x) => x.split(', ')) + ) + ), + ], + [SdkFlags.ProcessWorkflowActivationJobsAsSingleBatch] + ) + ); + } + } +); -test('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => { +testWithOtel('Buffered updates are dispatched in the correct order - updatesOrdering', async (t) => { const { workflowType } = t.context; { const completion = await activate( @@ -2642,7 +2738,7 @@ test('Buffered updates are dispatched in the correct order - updatesOrdering', a } }); -test('Buffered updates are reentrant - updatesAreReentrant', async (t) => { +testWithOtel('Buffered updates are reentrant - updatesAreReentrant', async (t) => { const { workflowType } = t.context; { const completion = await activate(