Skip to content

Commit a402585

Browse files
authored
feat: support lazy environment activation in NodeEnvManager (#2841)
1 parent b0a6c96 commit a402585

6 files changed

Lines changed: 85 additions & 8 deletions

File tree

packages/runtime-node/src/micro-rpc.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ export function rpcCall<T>(worker: Worker, type: string, timeout = 10000): Promi
3030
return result;
3131
}
3232

33+
export function rpcPost(worker: Worker, type: string, value?: unknown) {
34+
const outgoingMessage = { type, id: getNextMessageId(), value };
35+
worker.postMessage(outgoingMessage);
36+
}
37+
3338
export function bindRpcListener<T>(type: string, customFetcher: () => Promise<T> | T) {
3439
const handler = async (message: unknown) => {
3540
if (isValidRpcMessage(message) && message.type === type) {

packages/runtime-node/src/node-env-manager.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export interface RunningNodeEnvironment {
1919
id: string;
2020
dispose(): Promise<void>;
2121
getMetrics(): Promise<PerformanceMetrics>;
22+
activate?(): Promise<void>;
2223
}
2324

2425
export interface NodeEnvConfig extends Pick<AnyEnvironment, 'env' | 'endpointType'> {
@@ -55,6 +56,7 @@ export class NodeEnvManager implements IDisposable {
5556
}: ILaunchHttpServerOptions & {
5657
connectionHandlers?: ConnectionHandlers;
5758
} = {},
59+
lazy = false,
5860
) {
5961
process.env.ENGINE_FLOW_V2_DIST_URL = this.importMeta.url;
6062
const disposeMetricsListener = bindMetricsListener(() => this.collectMetricsFromAllOpenEnvironments());
@@ -95,6 +97,10 @@ export class NodeEnvManager implements IDisposable {
9597
}
9698
await this.runFeatureEnvironments(verbose, runtimeOptions, forwardingCom);
9799

100+
if (!lazy) {
101+
await this.activateEnvs();
102+
}
103+
98104
app.get('/health', (_req, res) => {
99105
res.status(200).end();
100106
});
@@ -120,6 +126,15 @@ export class NodeEnvManager implements IDisposable {
120126
return { port };
121127
}
122128

129+
async activateEnvs() {
130+
const activatedEnvs: Promise<void>[] = [];
131+
for (const env of this.openEnvironments.values()) {
132+
if (!env.activate) continue;
133+
activatedEnvs.push(env.activate());
134+
}
135+
await Promise.all(activatedEnvs);
136+
}
137+
123138
async closeAll() {
124139
await Promise.all([...this.openEnvironments.values()].map((env) => this.closeEnv(env)));
125140
}
@@ -186,7 +201,7 @@ export class NodeEnvManager implements IDisposable {
186201
workerURL: this.createEnvironmentFileUrl(envName),
187202
runtimeOptions: runtimeOptions,
188203
});
189-
await envWithInit.initialize();
204+
envWithInit.preLoad();
190205
runningEnv = envWithInit;
191206
}
192207

packages/runtime-node/src/worker-thread-initializer2.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import { Worker } from '@dazl/isomorphic-worker/worker';
33
import { type UniversalWorkerOptions } from '@dazl/isomorphic-worker/types';
44
import { createDisposables } from '@dazl/patterns';
55
import { getMetricsFromWorker } from './metrics-utils.js';
6-
import { rpcCall } from './micro-rpc.js';
6+
import { rpcCall, rpcPost } from './micro-rpc.js';
77
import type { RunningNodeEnvironment } from './node-env-manager.js';
88

99
export interface WorkerThreadInitializer2 extends RunningNodeEnvironment {
1010
initialize: () => Promise<void>;
11+
preLoad: () => void;
12+
activate: () => Promise<void>;
1113
}
1214

1315
export interface WorkerThreadInitializerOptions2 {
@@ -37,7 +39,8 @@ export function workerThreadInitializer2({
3739
const instanceId = communication.getEnvironmentInstanceId(env.env, env.endpointType);
3840
const envIsReady = communication.envReady(instanceId);
3941
let worker: Worker;
40-
const initialize = async () => {
42+
43+
const preLoad = () => {
4144
const envRuntimeOptions = new Map(runtimeOptions?.entries());
4245
envRuntimeOptions.set('environment_id', instanceId);
4346

@@ -73,13 +76,21 @@ export function workerThreadInitializer2({
7376
communication.clearEnvironment(instanceId);
7477
communication.removeMessageHandler(host);
7578
});
79+
};
7680

81+
const activate = async () => {
82+
rpcPost(worker, 'activate');
7783
await envIsReady;
7884
};
7985

8086
return {
8187
id: instanceId,
82-
initialize,
88+
initialize: async () => {
89+
preLoad();
90+
await activate();
91+
},
92+
preLoad,
93+
activate,
8394
dispose: () => disposables.dispose(),
8495
getMetrics: () => getMetricsFromWorker(worker),
8596
};

packages/runtime-node/test-kit/entrypoints/a.node.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ export function runEnv({
3838

3939
if (workerData) {
4040
const unbindMetricsListener = bindMetricsListener();
41+
let running: ReturnType<typeof runEnv>;
42+
const unbindActivateListener = bindRpcListener('activate', () => {
43+
unbindActivateListener();
44+
running = runEnv();
45+
});
4146
const unbindTerminationListener = bindRpcListener('terminate', async () => {
4247
if (verbose) {
4348
console.log(`[${env.env}]: Termination Requested. Waiting for engine.`);
@@ -55,8 +60,6 @@ if (workerData) {
5560
return;
5661
}
5762
});
58-
// used by dispose to wait for engine to be ready
59-
const running = runEnv();
6063
} else {
6164
console.log('running engine in test mode');
6265
}

packages/runtime-node/test-kit/entrypoints/b.node.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ export function runEnv({
3838

3939
if (workerData) {
4040
const unbindMetricsListener = bindMetricsListener();
41+
let running: ReturnType<typeof runEnv>;
42+
const unbindActivateListener = bindRpcListener('activate', () => {
43+
unbindActivateListener();
44+
running = runEnv();
45+
});
4146
const unbindTerminationListener = bindRpcListener('terminate', async () => {
4247
if (verbose) {
4348
console.log(`[${env.env}]: Termination Requested. Waiting for engine.`);
@@ -55,8 +60,6 @@ if (workerData) {
5560
return;
5661
}
5762
});
58-
// used by dispose to wait for engine to be ready
59-
const running = runEnv();
6063
} else {
6164
console.log('running engine in test mode');
6265
}

packages/runtime-node/test/node-env.manager.unit.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { expect } from 'chai';
22
import sinon from 'sinon';
3+
import { timeout } from 'promise-assist';
34
import { BaseHost, COM, Communication, WsClientHost } from '@dazl/engine-core';
45
import {
56
IConnectionHandler,
@@ -155,6 +156,45 @@ describe('NodeEnvManager', () => {
155156
});
156157
});
157158

159+
describe('NodeEnvManager with lazy activation', () => {
160+
it('should not respond before activateEnvs and work after it', async () => {
161+
const featureEnvironmentsMapping: NodeEnvsFeatureMapping = {
162+
featureToEnvironments: {
163+
'test-feature': [aEnv.env, bEnv.env],
164+
},
165+
availableEnvironments: {
166+
a: {
167+
env: aEnv.env,
168+
endpointType: 'single',
169+
envType: 'node',
170+
},
171+
b: {
172+
env: bEnv.env,
173+
endpointType: 'single',
174+
envType: 'node',
175+
},
176+
},
177+
};
178+
179+
const manager = disposeAfterTest(new NodeEnvManager(meta, featureEnvironmentsMapping));
180+
const { port } = await manager.autoLaunch(new Map([['feature', 'test-feature']]), {}, true);
181+
const communication = disposeAfterTest(getClientCom(port));
182+
const api = communication.apiProxy<EchoService>({ id: aEnv.env }, { id: 'test-feature.echoAService' });
183+
184+
// env should not respond before activation
185+
const error = await timeout(api.echo(), 500).catch((e: Error) => e);
186+
expect(error, 'uninitialized env')
187+
.to.be.instanceOf(Error)
188+
.with.property('message')
189+
.that.includes('timed out');
190+
191+
// activate and verify env responds
192+
await manager.activateEnvs();
193+
194+
expect(await api.echo()).to.equal('a');
195+
});
196+
});
197+
158198
describe('NodeEnvManager with connection handlers', () => {
159199
it('should call connection, reconnection and disconnection handlers', async () => {
160200
const connectionHandler = sinon.spy();

0 commit comments

Comments
 (0)