Skip to content

Commit 3dbd877

Browse files
authored
Created runWorker (#288)
* refactor: Renamed function * breaking: Made runTask private * feature: Created runWorker * chore: Updated tests * chore: Updated docs * chore: Added changesets
1 parent a424bcf commit 3dbd877

File tree

19 files changed

+152
-194
lines changed

19 files changed

+152
-194
lines changed

.changeset/chubby-books-argue.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@codemod-utils/threads": minor
3+
"docs-app-for-codemod-utils": patch
4+
---
5+
6+
Made runTask private

.changeset/empty-carrots-invent.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@codemod-utils/threads": minor
3+
"docs-app-for-codemod-utils": patch
4+
---
5+
6+
Created runWorker

docs/src/docs/packages/codemod-utils-threads.md

Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ function parallelize<T extends unknown[], U>(
5555
): Promise<U[]>;
5656
```
5757

58-
```ts [Example (src/steps/analyze-files.ts)]
58+
```ts [Example (src/steps/analyze-files.ts)]{12-15}
5959
import type { Options } from '../types/index.js';
6060
import { task } from './analyze-files/task.js';
6161

@@ -83,33 +83,19 @@ export function task(filePath: string, projectRoot: string): Result {
8383
```
8484

8585
```ts [Example (src/steps/analyze-files/worker.ts)]
86-
import { parentPort, workerData } from 'node:worker_threads';
87-
88-
import { runTask } from '@codemod-utils/threads';
86+
import { runWorker } from '@codemod-utils/threads';
8987

9088
import { task } from './task.js';
9189

92-
type WorkerData = {
93-
datasets: Parameters<typeof task>[];
94-
};
95-
96-
const { datasets } = workerData as WorkerData;
97-
98-
runTask(task, datasets)
99-
.then((result) => {
100-
parentPort?.postMessage(result);
101-
})
102-
.catch((error) => {
103-
throw error;
104-
});
90+
runWorker(task);
10591
```
10692

10793
:::
10894

10995

110-
### runTask {#api-run-task}
96+
### runWorker {#api-run-worker}
11197

112-
Runs a task on many datasets. The size of datasets should be moderate. Primarily used to create a worker file for a task.
98+
Runs a task on a worker.
11399

114100
> [!TIP]
115101
>
@@ -122,43 +108,18 @@ Runs a task on many datasets. The size of datasets should be moderate. Primarily
122108
* @param task
123109
*
124110
* Some function to call.
125-
*
126-
* @param datasets
127-
*
128-
* An array of dataset's.
129-
*
130-
* @return
131-
*
132-
* An array of the task's return value.
133111
*/
134112
type Task<T extends unknown[], U> = (...dataset: T) => U | Promise<U>;
135113

136-
function runTask<T extends unknown[], U>(
137-
task: Task<T, U>,
138-
datasets: T[],
139-
): Promise<U[]>;
114+
function runWorker<T extends unknown[], U>(task: Task<T, U>): void;
140115
```
141116

142117
```ts [Example (Worker)]{5}
143-
import { parentPort, workerData } from 'node:worker_threads';
144-
145-
import { runTask } from '@codemod-utils/threads';
118+
import { runWorker } from '@codemod-utils/threads';
146119

147120
import { task } from './task.js';
148121

149-
type WorkerData = {
150-
datasets: Parameters<typeof task>[];
151-
};
152-
153-
const { datasets } = workerData as WorkerData;
154-
155-
runTask(task, datasets)
156-
.then((result) => {
157-
parentPort?.postMessage(result);
158-
})
159-
.catch((error) => {
160-
throw error;
161-
});
122+
runWorker(task);
162123
```
163124

164125
:::

packages/threads/src/-private/parallelize.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ export function batchDatasets<T>(
2222
return [datasetsForMainThread, datasetsForWorkerThreads];
2323
}
2424

25-
export function createRunWorker<U>(
25+
export function getCreateWorker<U>(
2626
workerOptions: WorkerOptions,
2727
): <T>(datasets: T[]) => Promise<U[]> {
2828
const { importMetaUrl, workerFilePath } = workerOptions;
2929

30-
function runWorker<T>(datasets: T[]): Promise<U[]> {
30+
function createWorker<T>(datasets: T[]): Promise<U[]> {
3131
return new Promise((resolve, reject) => {
3232
const workerUrl = new URL(workerFilePath, importMetaUrl);
3333

@@ -48,7 +48,7 @@ export function createRunWorker<U>(
4848
});
4949
}
5050

51-
return runWorker;
51+
return createWorker;
5252
}
5353

5454
export function validateWorkerFilePath(workerOptions: WorkerOptions): void {

packages/threads/src/-private/run-task.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export class Queue<U> {
1+
class Queue<U> {
22
private error: Error | undefined = undefined;
33
declare private maxNumTasksRunning: number;
44
private results: U[] = [];
@@ -62,3 +62,34 @@ export class Queue<U> {
6262
await Promise.race(this.tasksRunning);
6363
}
6464
}
65+
66+
const MAX_NUM_TASKS_RUNNING = 10;
67+
68+
export type Task<T extends unknown[], U> = (...dataset: T) => U | Promise<U>;
69+
70+
export async function runTask<T extends unknown[], U>(
71+
task: Task<T, U>,
72+
datasets: T[],
73+
): Promise<U[]> {
74+
const queue = new Queue<U>(MAX_NUM_TASKS_RUNNING);
75+
76+
for (const dataset of datasets) {
77+
if (queue.hasTooManyTasks) {
78+
await queue.wait();
79+
}
80+
81+
await queue.runTask(task(...dataset));
82+
}
83+
84+
if (queue.hasTasks) {
85+
await queue.finishTasks();
86+
}
87+
88+
const { output } = queue;
89+
90+
if (output.status === 'failed') {
91+
throw output.error;
92+
}
93+
94+
return output.results;
95+
}

packages/threads/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
export * from './parallelize.js';
2-
export * from './run-task.js';
2+
export * from './run-worker.js';

packages/threads/src/parallelize.ts

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ import { availableParallelism } from 'node:os';
22

33
import {
44
batchDatasets,
5-
createRunWorker,
5+
getCreateWorker,
66
validateWorkerFilePath,
77
type WorkerOptions,
88
} from './-private/parallelize.js';
9-
import { runTask, type Task } from './run-task.js';
9+
import { runTask, type Task } from './-private/run-task.js';
1010

1111
const MIN_NUM_TASKS_PER_WORKER = 100;
1212

@@ -40,6 +40,8 @@ const MIN_NUM_TASKS_PER_WORKER = 100;
4040
*
4141
* ```ts
4242
* // src/steps/analyze-files.ts
43+
* import { parallelize } from '@codemod-utils/threads';
44+
*
4345
* import type { Options } from '../types/index.js';
4446
* import { task } from './analyze-files/task.js';
4547
*
@@ -69,25 +71,11 @@ const MIN_NUM_TASKS_PER_WORKER = 100;
6971
*
7072
* ```ts
7173
* // src/steps/analyze-files/worker.ts
72-
* import { parentPort, workerData } from 'node:worker_threads';
73-
*
74-
* import { runTask } from '@codemod-utils/threads';
74+
* import { runWorker } from '@codemod-utils/threads';
7575
*
7676
* import { task } from './task.js';
7777
*
78-
* type WorkerData = {
79-
* datasets: Parameters<typeof task>[];
80-
* };
81-
*
82-
* const { datasets } = workerData as WorkerData;
83-
*
84-
* runTask(task, datasets)
85-
* .then((result) => {
86-
* parentPort?.postMessage(result);
87-
* })
88-
* .catch((error) => {
89-
* throw error;
90-
* });
78+
* runWorker(task);
9179
* ```
9280
*/
9381
export async function parallelize<T extends unknown[], U>(
@@ -115,11 +103,11 @@ export async function parallelize<T extends unknown[], U>(
115103
numTasksPerWorker,
116104
);
117105

118-
const runWorker = createRunWorker<U>(workerOptions);
106+
const createWorker = getCreateWorker<U>(workerOptions);
119107

120108
const [mainThreadResults, ...workerResults] = await Promise.all([
121109
runTask(task, datasetsForMainThread),
122-
...datasetsForWorkerThreads.map(runWorker),
110+
...datasetsForWorkerThreads.map(createWorker),
123111
]);
124112

125113
return [...mainThreadResults, ...workerResults.flat()];

packages/threads/src/run-task.ts

Lines changed: 0 additions & 77 deletions
This file was deleted.

packages/threads/src/run-worker.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { parentPort, workerData } from 'node:worker_threads';
2+
3+
import { runTask, type Task } from './-private/run-task.js';
4+
5+
/**
6+
* Runs a task on a worker.
7+
*
8+
* Note, a worker file always uses the code shown below. You just need to
9+
* import the task from the right file.
10+
*
11+
* @param task
12+
*
13+
* Some function to call.
14+
*
15+
* @example
16+
*
17+
* Create a worker file for a task.
18+
*
19+
* ```ts
20+
* import { runWorker } from '@codemod-utils/threads';
21+
*
22+
* import { task } from './task.js';
23+
*
24+
* runWorker(task);
25+
* ```
26+
*/
27+
export function runWorker<T extends unknown[], U>(task: Task<T, U>): void {
28+
const { datasets } = workerData as {
29+
datasets: T[];
30+
};
31+
32+
runTask(task, datasets)
33+
.then((result) => {
34+
parentPort?.postMessage(result);
35+
})
36+
.catch((error) => {
37+
throw error;
38+
});
39+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { test } from '@codemod-utils/tests';
2+
3+
import { runTask } from '../../../src/-private/run-task.js';
4+
import { assertOutput, getDatasets, task } from '../../helpers/vector/setup.js';
5+
6+
test('-private | run-task > vector (1)', async function () {
7+
const numTasks = 0;
8+
const datasets = getDatasets(numTasks);
9+
10+
const output = await runTask(task, datasets);
11+
12+
assertOutput(output);
13+
});

0 commit comments

Comments
 (0)