Skip to content
29 changes: 29 additions & 0 deletions src/engine/universal/core/src/engine/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ class Engine {
* @param {string} the version of the process to deploy
*/
async deployProcessVersion(definitionId, versionId) {
const otherVersions = this.versions.filter((version) => version !== versionId);
otherVersions.forEach((version) => {
const process = this._versionProcessMapping[version];

// deactivate other versions so they don't keep spawning new instances automatically
if (process) {
process.undeploy();
Comment thread
Zayn-Javed marked this conversation as resolved.
}
});

if (!this._versionProcessMapping[versionId]) {
// Fetch the stored BPMN
const bpmn = await distribution.db.getProcessVersion(definitionId, versionId);
Expand Down Expand Up @@ -213,6 +223,22 @@ class Engine {
this._versionProcessMapping[versionId] = process;
this._versionBpmnMapping[versionId] = bpmn;
this.versions.push(versionId);
} else if (!this._versionProcessMapping[versionId].isDeployed()) {
// activate the process so auto-start events like timer events are allowed to trigger new
// instances
this._versionProcessMapping[versionId].deploy();
}
}

/**
* Removes the deployed state from the process version in the NeoBPMN Engine preventing it from starting instances
*
* @param {string} the version of the process to undeploy
*/
undeployProcessVersion(versionId) {
const process = this._versionProcessMapping[versionId];
if (process && process.isDeployed()) {
process.undeploy();
}
}

Expand Down Expand Up @@ -1014,6 +1040,9 @@ class Engine {
* Clean up some data when the engine is supposed to be removed
*/
destroy() {
for (const version of this.versions) {
this._versionProcessMapping[version].undeploy();
}
for (const instanceId of this.instanceIDs) {
this.deleteInstance(instanceId);
}
Expand Down
4 changes: 1 addition & 3 deletions src/engine/universal/core/src/management.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ const Management = {
const engine = this.ensureProcessEngine(definitionId);

// ensure that the version is deployed
if (!engine.versions.includes(version)) {
await engine.deployProcessVersion(definitionId, version);
}
await engine.deployProcessVersion(definitionId, version);

return engine;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,4 +393,71 @@ module.exports = (path, management) => {
});
},
);

network.put(`${path}/:definitionId/active`, { cors: true }, async (req) => {
const { definitionId } = req.params;

Comment thread
jjoderis marked this conversation as resolved.
const {
body: { active },
} = req;

if (active === true) {
return {
statusCode: 400,
mimeType: 'text/plain',
response:
'Cannot set active true on a process. Please select a specific version to activate.',
};
} else if (active === false) {
const engine = await management.getEngineWithDefinitionId(definitionId);

if (engine) {
Comment thread
Zayn-Javed marked this conversation as resolved.
engine.versions.forEach((version) => engine.undeployProcessVersion(version));
}

return {
statusCode: 200,
mimeType: 'application/json',
response: '{}',
};
} else {
return {
statusCode: 400,
mimeType: 'text/plain',
response:
'This endpoint expects the request body to contain an entry called active with a boolean value of "false".',
};
}
});

network.put(`${path}/:definitionId/versions/:version/active`, { cors: true }, async (req) => {
const { definitionId, version } = req.params;

const {
body: { active },
} = req;

if (active === true) {
await management.ensureProcessEngineWithVersion(definitionId, version);
} else if (active === false) {
const engine = await management.getEngineWithDefinitionId(definitionId);

if (engine) {
engine.undeployProcessVersion(version);
}
} else {
return {
statusCode: 400,
mimeType: 'text/plain',
response:
'This endpoint expects the request body to contain an entry called active with a boolean value',
};
}

return {
statusCode: 200,
mimeType: 'application/json',
response: '{}',
};
});
};
44 changes: 42 additions & 2 deletions src/management-system-v2/lib/engines/deployment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ async function deployProcessToMachines(
});

await Promise.all(allMachineRequests);

// TODO: if we handle static deployment with machine mapping of process elements in the future
// we might need to make sure that we only activate the process in a way that start events that are not mapped to a machine are not triggered on that machine
// (maybe we need to split the process into multiple sections and deploy them seperately
await asyncForEach(machines, async (machine) => {
await asyncForEach(processesExportData, async (process) => {
await asyncForEach(Object.keys(process.versions), async (version) => {
await changeDeploymentActivation(machine, process.definitionId, version, true);
});
});
});
} catch (error) {
// TODO: don't remove the whole process when deploying a single version fails
await asyncForEach(Object.values(processesExportData), async ({ definitionId }) => {
Expand Down Expand Up @@ -153,6 +164,8 @@ async function dynamicDeployment(
}

await deployProcessToMachines([preferredMachine], processesExportData);

return [preferredMachine];
}

async function staticDeployment(
Expand Down Expand Up @@ -198,6 +211,8 @@ async function staticDeployment(
// targetedMachines.push(forceMachine);

await deployProcessToMachines(targetedMachines, processesExportData);

return targetedMachines;
}

export async function deployProcess(
Expand Down Expand Up @@ -230,9 +245,9 @@ export async function deployProcess(
);

if (method === 'static') {
await staticDeployment(definitionId, version, processesExportData, machines);
return await staticDeployment(definitionId, version, processesExportData, machines);
} else {
await dynamicDeployment(definitionId, version, processesExportData, machines);
return await dynamicDeployment(definitionId, version, processesExportData, machines);
}
}
export type ImportInformation = { definitionId: string; processId: string; version: number };
Expand Down Expand Up @@ -338,6 +353,31 @@ export async function getDeployment(engine: Engine, definitionId: string) {
return deployment as DeployedProcessInfo;
}

export async function changeDeploymentActivation(
engine: Engine,
definitionId: string,
version: string | undefined,
value: boolean,
) {
if (version) {
await engineRequest({
method: 'put',
endpoint: '/process/:definitionId/versions/:version/active',
engine,
pathParams: { definitionId, version },
body: { active: value },
});
} else {
await engineRequest({
method: 'put',
endpoint: '/process/:definitionId/active',
engine,
pathParams: { definitionId },
body: { active: value },
});
}
}

export async function getProcessImageFromMachine(
engine: Engine,
definitionId: string,
Expand Down
2 changes: 2 additions & 0 deletions src/management-system-v2/lib/engines/endpoints/endpoints.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
"/process/:definitionId/instance/:instanceID/instanceState": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId": {},
"/process/:definitionId/instance/:instanceId/tokens/:tokenId/currentFlowNodeState": {},
"/process/:definitionId/active": {},
"/process/:definitionId/versions/:version/active": {},
"/process/:definitionId/versions/:version/start-form": {},
"/process/:definitionId/user-tasks/:fileName": {},
"/process/:definitionId/script-tasks/:fileName": {},
Expand Down
79 changes: 75 additions & 4 deletions src/management-system-v2/lib/engines/server-actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

import { UserFacingError, getErrorMessage, userError } from '../user-error';
import {
DeployedProcessInfo,
deployProcess as _deployProcess,
getDeployments as fetchDeployments,
getDeployment as fetchDeployment,
getProcessImageFromMachine,
removeDeploymentFromMachines,
changeDeploymentActivation as _changeDeploymentActivation,
DeployedProcessInfo,
} from './deployment';
import { Engine, SpaceEngine } from './machines';
import { savedEnginesToEngines } from './saved-engines-helpers';
import { getCurrentEnvironment, getCurrentUser } from '@/components/auth';
import { getCurrentEnvironment } from '@/components/auth';
import { enableUseDB } from 'FeatureFlags';
import { getDbEngines, getDbEngineByAddress } from '@/lib/data/db/engines';
import { asyncFilter, asyncMap, asyncForEach } from '../helpers/javascriptHelpers';
Expand Down Expand Up @@ -46,7 +47,6 @@ import {
import { getFileFromMachine, submitFileToMachine, updateVariablesOnMachine } from './instances';
import { getProcessIds, getVariablesFromElementById } from '@proceed/bpmn-helper';
import { Variable } from '@proceed/bpmn-helper/src/getters';
import { getUsersInSpace } from '../data/db/iam/memberships';
import Ability from '../ability/abilityHelper';
import { getUserById } from '../data/db/iam/users';

Expand Down Expand Up @@ -107,7 +107,51 @@ export async function deployProcess(

if (engines.length === 0) throw new UserFacingError('No fitting engine found.');

await _deployProcess(definitionId, versionId, spaceId, method, engines);
const processAlreadyDeployedInfo = await asyncMap(engines, async (engine) => {
let deployment;
try {
deployment = await fetchDeployment(engine, definitionId);
// ignore not found errors on engines that don't have a deployment of the process
} catch (err) {
deployment = undefined;
}
return [engine, deployment] as const;
});

function withDeployment(
info: (typeof processAlreadyDeployedInfo)[number],
): info is readonly [Engine, DeployedProcessInfo] {
return !!info[1];
}
const enginesWithDeployment = processAlreadyDeployedInfo.filter(withDeployment);

// check if the version is already deployed to some engine since we don't
// need to redeploy it in that case
if (
!_forceEngine &&
enginesWithDeployment.some(([_, deployment]) =>
deployment.versions.some((version) => version.versionId === versionId),
)
) {
return;
}

if (!_forceEngine && enginesWithDeployment.length) {
// check if an engine already has another version in which case that engine is selected
engines = enginesWithDeployment.map(([engine]) => engine);
}

const deployedTo = await _deployProcess(definitionId, versionId, spaceId, method, engines);

// deactivate the process on all engines that have a deployment but which were not target of the
// new deployment
await Promise.allSettled(
enginesWithDeployment.map(async ([engine]) => {
if (!deployedTo.some((dE) => dE.id === engine.id)) {
await _changeDeploymentActivation(engine, definitionId, undefined, false);
}
}),
);
} catch (e) {
const message = getErrorMessage(e);
return userError(message);
Expand All @@ -131,6 +175,33 @@ export async function removeDeployment(definitionId: string, spaceId: string) {
}
}

export async function changeDeploymentActivation(
definitionId: string,
spaceId: string,
version: string,
value: boolean,
) {
try {
const engines = await getCorrectTargetEngines(spaceId, false, async (engine: Engine) => {
const deployments = await fetchDeployments([engine]);

return deployments.some(
(deployment) =>
deployment.definitionId === definitionId &&
deployment.versions.some((v) => v.versionId === version),
);
});

if (!engines.length)
throw new Error('There is no available engine with the requested process version.');

await _changeDeploymentActivation(engines[0], definitionId, version, value);
} catch (e) {
const message = getErrorMessage(e);
return userError(message);
}
}

export async function getAvailableTaskListEntries(spaceId: string, engines: Engine[]) {
try {
if (!enableUseDB)
Expand Down
Loading