Skip to content

Commit 2246dc8

Browse files
committed
Switched to async create & destroy pattern
1 parent 7e5f1b2 commit 2246dc8

5 files changed

Lines changed: 116 additions & 83 deletions

File tree

benches/WorkerManager.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ const logger = new Logger('WorkerManager Bench', LogLevel.WARN, [
1515
async function main() {
1616
const cores = os.cpus().length;
1717
logger.warn(`Cores: ${cores}`);
18-
const workerManager = new WorkerManager<WorkerModule>({ logger });
19-
await workerManager.start({
18+
const workerManager = await WorkerManager.createWorkerManager({
2019
workerFactory: () => spawn(new Worker('../src/worker')),
2120
cores,
21+
logger
2222
});
2323
// 1 MiB worth of data is the ballpark range of data to be worth parallelising
2424
// 1 KiB of data is still too small
@@ -114,7 +114,7 @@ async function main() {
114114
format: 'chart.html',
115115
}),
116116
);
117-
await workerManager.stop();
117+
await workerManager.destroy();
118118
return summary;
119119
}
120120

src/WorkerManager.ts

Lines changed: 82 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,8 @@ import * as errors from './errors';
1010
class WorkerManager<W extends ModuleMethods>
1111
implements WorkerManagerInterface<W>
1212
{
13-
protected pool: Pool<ModuleThread<W>>;
14-
protected logger: Logger;
15-
protected _started: boolean = false;
16-
17-
constructor({
18-
logger,
19-
}: {
20-
logger?: Logger;
21-
} = {}) {
22-
this.logger = logger ?? new Logger(this.constructor.name);
23-
}
24-
25-
get started(): boolean {
26-
return this._started;
27-
}
28-
2913
/**
30-
* Starts the WorkerManager
14+
* Creates the WorkerManager
3115
* The workerFactory needs to be a callback:
3216
* `() => spawn(new Worker(workerPath))`
3317
* The `spawn` and `Worker` can be imported from `threads`
@@ -36,63 +20,118 @@ class WorkerManager<W extends ModuleMethods>
3620
* If it is a relative path, it has to be relative to the file location where
3721
* the function expression is defined
3822
*/
39-
public async start({
23+
public static async createWorkerManager<W extends ModuleMethods>({
4024
workerFactory,
4125
cores,
26+
logger
4227
}: {
4328
workerFactory: () => Promise<ModuleThread<W>>;
4429
cores?: number;
30+
logger?: Logger
31+
}): Promise<WorkerManager<W>> {
32+
const workerManager = new WorkerManager({
33+
workerFactory,
34+
cores,
35+
logger
36+
});
37+
return workerManager;
38+
}
39+
40+
protected pool: Pool<ModuleThread<W>>;
41+
protected logger: Logger;
42+
protected _running: boolean = false;
43+
protected _destroyed: boolean = false;
44+
45+
protected constructor({
46+
workerFactory,
47+
cores,
48+
logger,
49+
}: {
50+
workerFactory: () => Promise<ModuleThread<W>>;
51+
cores?: number;
52+
logger?: Logger;
4553
}) {
46-
try {
47-
if (this._started) {
48-
return;
49-
}
50-
this.logger.info('Starting WorkerManager');
51-
this._started = true;
52-
this.pool = Pool(workerFactory, cores);
53-
this.logger.info(`Started WorkerManager`);
54-
} catch (e) {
55-
this._started = false;
56-
throw e;
57-
}
54+
this.logger = logger ?? new Logger(this.constructor.name);
55+
this.pool = Pool(workerFactory, cores);
56+
this._running = true;
57+
}
58+
59+
get running(): boolean {
60+
return this._running;
5861
}
5962

60-
public async stop() {
61-
if (!this._started) {
63+
get destroyed(): boolean {
64+
return this._destroyed;
65+
}
66+
67+
// public async start({
68+
// workerFactory,
69+
// cores,
70+
// }: {
71+
// workerFactory: () => Promise<ModuleThread<W>>;
72+
// cores?: number;
73+
// }) {
74+
// try {
75+
// if (this._started) {
76+
// return;
77+
// }
78+
// this.logger.info('Starting WorkerManager');
79+
// this._started = true;
80+
// this.pool = Pool(workerFactory, cores);
81+
// this.logger.info(`Started WorkerManager`);
82+
// } catch (e) {
83+
// this._started = false;
84+
// throw e;
85+
// }
86+
// }
87+
88+
// public async stop() {
89+
// if (!this._started) {
90+
// return;
91+
// }
92+
// this.logger.info('Stopping WorkerManager');
93+
// await this.pool.terminate();
94+
// this._started = false;
95+
// this.logger.info('Stopped WorkerManager');
96+
// }
97+
98+
public async destroy(): Promise<void> {
99+
if (this._destroyed) {
62100
return;
63101
}
64-
this.logger.info('Stopping WorkerManager');
102+
this.logger.info('Destroying WorkerManager');
65103
await this.pool.terminate();
66-
this._started = false;
67-
this.logger.info('Stopped WorkerManager');
104+
this._running = false;
105+
this._destroyed = true;
106+
this.logger.info('Destroyed WorkerManager');
68107
}
69108

70109
public async call<T>(f: (worker: ModuleThread<W>) => Promise<T>): Promise<T> {
71-
if (!this._started) {
72-
throw new errors.ErrorWorkerManagerNotStarted();
110+
if (!this._running) {
111+
throw new errors.ErrorWorkerManagerNotRunning();
73112
}
74113
return await this.pool.queue(f);
75114
}
76115

77116
public queue<T>(
78117
f: (worker: ModuleThread<W>) => Promise<T>,
79118
): QueuedTask<ModuleThread<W>, T> {
80-
if (!this._started) {
81-
throw new errors.ErrorWorkerManagerNotStarted();
119+
if (!this._running) {
120+
throw new errors.ErrorWorkerManagerNotRunning();
82121
}
83122
return this.pool.queue(f);
84123
}
85124

86125
public async completed(): Promise<void> {
87-
if (!this._started) {
88-
throw new errors.ErrorWorkerManagerNotStarted();
126+
if (!this._running) {
127+
throw new errors.ErrorWorkerManagerNotRunning();
89128
}
90129
return await this.pool.completed();
91130
}
92131

93132
public async settled() {
94-
if (!this._started) {
95-
throw new errors.ErrorWorkerManagerNotStarted();
133+
if (!this._running) {
134+
throw new errors.ErrorWorkerManagerNotRunning();
96135
}
97136
return await this.pool.settled();
98137
}

src/WorkerManagerInterface.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,7 @@ import type { ModuleMethods } from 'threads/dist/types/master';
33
import type { QueuedTask } from 'threads/dist/master/pool-types';
44

55
interface WorkerManagerInterface<W extends ModuleMethods> {
6-
start(options: {
7-
workerFactory: () => Promise<ModuleThread<W>>;
8-
cores?: number;
9-
}): Promise<void>;
10-
stop(): Promise<void>;
6+
destroy(): Promise<void>;
117
call<T>(f: (worker: ModuleThread<W>) => Promise<T>): Promise<T>;
128
queue<T>(
139
f: (worker: ModuleThread<W>) => Promise<T>,

src/errors.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@ import { CustomError } from 'ts-custom-error';
22

33
class ErrorWorkerManager extends CustomError {}
44

5-
class ErrorWorkerManagerNotStarted extends ErrorWorkerManager {}
5+
class ErrorWorkerManagerNotRunning extends ErrorWorkerManager {}
66

7-
export { ErrorWorkerManager, ErrorWorkerManagerNotStarted };
7+
export { ErrorWorkerManager, ErrorWorkerManagerNotRunning };

tests/WorkerManager.test.ts

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,41 +9,39 @@ describe('WorkerManager', () => {
99
const logger = new Logger('WorkerManager Test', LogLevel.WARN, [
1010
new StreamHandler(),
1111
]);
12-
test('construction has no side effects', async () => {
13-
const workerManager = new WorkerManager<WorkerModule>({ logger });
14-
expect(workerManager.call(async () => undefined)).rejects.toThrow(
15-
errors.ErrorWorkerManagerNotStarted,
16-
);
17-
});
18-
test('async start and async stop', async () => {
19-
const workerManager = new WorkerManager<WorkerModule>({ logger });
20-
await workerManager.start({
12+
test('async construction and async destroy', async () => {
13+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
2114
workerFactory: () => spawn(new Worker('../src/worker')),
15+
logger
2216
});
17+
expect(workerManager.running).toBe(true);
18+
expect(workerManager.destroyed).toBe(false);
2319
expect(await workerManager.call(async () => 1)).toBe(1);
24-
await workerManager.stop();
20+
await workerManager.destroy();
21+
expect(workerManager.running).toBe(false);
22+
expect(workerManager.destroyed).toBe(true);
2523
expect(workerManager.call(async () => 1)).rejects.toThrow(
26-
errors.ErrorWorkerManagerNotStarted,
24+
errors.ErrorWorkerManagerNotRunning,
2725
);
2826
});
2927
test('start with just 1 worker core', async () => {
30-
const workerManager = new WorkerManager<WorkerModule>({ logger });
31-
await workerManager.start({
28+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
3229
workerFactory: () => spawn(new Worker('../src/worker')),
3330
cores: 1,
31+
logger
3432
});
3533
expect(await workerManager.call(async () => 1)).toBe(1);
36-
await workerManager.stop();
34+
await workerManager.destroy();
3735
});
3836
test('call runs in the main thread', async () => {
3937
const mainPid1 = process.pid;
40-
const workerManager = new WorkerManager<WorkerModule>({ logger });
41-
await workerManager.start({
38+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
4239
workerFactory: () => spawn(new Worker('../src/worker')),
4340
cores: 1,
41+
logger
4442
});
45-
let mainPid2;
46-
let mainPid3;
43+
let mainPid2: number;
44+
let mainPid3: number;
4745
// Only `w.f()` functions are running in the worker threads
4846
// the callback passed to `call` is still running in the main thread
4947
expect(
@@ -54,16 +52,16 @@ describe('WorkerManager', () => {
5452
return await w.isRunningInWorker();
5553
}),
5654
).toBe(true);
57-
await workerManager.stop();
58-
expect(mainPid2).toBe(mainPid1);
59-
expect(mainPid3).toBe(mainPid1);
55+
await workerManager.destroy();
56+
expect(mainPid2!).toBe(mainPid1);
57+
expect(mainPid3!).toBe(mainPid1);
6058
});
6159
test('can await a subset of tasks', async () => {
62-
const workerManager = new WorkerManager<WorkerModule>({ logger });
6360
// Use all possible cores
6461
// if you only use 1 core, this test will be much slower
65-
await workerManager.start({
62+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
6663
workerFactory: () => spawn(new Worker('../src/worker')),
64+
logger
6765
});
6866
const task = workerManager.call(async (w) => {
6967
return await w.sleep(500);
@@ -82,14 +80,14 @@ describe('WorkerManager', () => {
8280
expect(rs.every((x) => x === undefined)).toBe(true);
8381
const r = await task;
8482
expect(r).toBeUndefined();
85-
await workerManager.stop();
83+
await workerManager.destroy();
8684
});
8785
test('queueing up tasks', async () => {
88-
const workerManager = new WorkerManager<WorkerModule>({ logger });
8986
// Use all possible cores
9087
// if you only use 1 core, this test will be much slower
91-
await workerManager.start({
88+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
9289
workerFactory: () => spawn(new Worker('../src/worker')),
90+
logger
9391
});
9492
const t1 = workerManager.queue(async (w) => await w.sleep(500));
9593
const t2 = workerManager.queue(async (w) => await w.sleep(500));
@@ -106,13 +104,13 @@ describe('WorkerManager', () => {
106104
workerManager.queue(async (w) => await w.sleep(500));
107105
const es = await workerManager.settled();
108106
expect(es.length).toBe(0);
109-
await workerManager.stop();
107+
await workerManager.destroy();
110108
});
111109
test('zero-copy buffer transfer', async () => {
112-
const workerManager = new WorkerManager<WorkerModule>({ logger });
113-
await workerManager.start({
110+
const workerManager = await WorkerManager.createWorkerManager<WorkerModule>({
114111
workerFactory: () => spawn(new Worker('../src/worker')),
115112
cores: 1,
113+
logger
116114
});
117115
const buffer = await workerManager.call(async (w) => {
118116
// Start with a Node Buffer that is "pooled"
@@ -138,6 +136,6 @@ describe('WorkerManager', () => {
138136
return outputBuffer;
139137
});
140138
expect(buffer).toEqual(Buffer.from('hello 2'));
141-
await workerManager.stop();
139+
await workerManager.destroy();
142140
});
143141
});

0 commit comments

Comments
 (0)