Worker
Web Worker
线程池
线程池中线程的数量到底多少合适并没有标准答案,但navigator.hardwareConcurrency
属性(系统可用的核心数量)是个不错的参考。
当每个线程都执行同样的任务时,线程池的一种简单实现是定义 TaskWorker 类(单个工作线程)
及其管理类 WorkerPool
。
ts
/**
 * A `Worker` that handles one task at a time and calls back into its owner
 * every time it becomes idle.
 */
class TaskWorker extends Worker {
  /** Callback used to tell the pool that this worker is free again. */
  notifyAvailable: () => void;
  /** Availability flag; stays false until the worker posts its first message. */
  available: boolean = false;
  resolve: null = null;
  reject: null = null;

  /**
   * @param notifyAvailable Invoked each time this worker becomes available.
   * @param workerArgs Arguments handed straight to the `Worker` constructor.
   */
  constructor(
    notifyAvailable: () => void,
    ...workerArgs: ConstructorParameters<typeof Worker>
  ) {
    super(...workerArgs);
    this.notifyAvailable = notifyAvailable;
    // Until a task is dispatched, any message from the worker marks it as available.
    this.onmessage = () => {
      this.setAvailable();
    };
  }

  /**
   * Sends one task to the worker and wires the task's callbacks to the reply.
   *
   * @param task `resolve` receives the `data` of the worker's next message,
   *   `reject` receives the worker's next error event, and `postMessageArgs`
   *   are spread into `postMessage`.
   */
  dispatch(task: {
    resolve: (data: any) => void;
    reject: (ev: ErrorEvent) => void;
    postMessageArgs: Parameters<Worker["postMessage"]>;
  }) {
    const { resolve, reject, postMessageArgs } = task;
    this.available = false;
    // Replace the previous handlers so the reply settles this task.
    this.onmessage = (messageEvent) => {
      resolve(messageEvent.data);
      this.setAvailable();
    };
    this.onerror = (errorEvent) => {
      reject(errorEvent);
      this.setAvailable();
    };
    this.postMessage(...postMessageArgs);
  }

  /** Marks the worker as available and notifies the owner. */
  setAvailable() {
    this.resolve = null;
    this.reject = null;
    this.available = true;
    this.notifyAvailable();
  }
}
export { TaskWorker };
/**
 * A `Worker` that handles one task at a time and calls back into its owner
 * every time it becomes idle.
 */
class TaskWorker extends Worker {
  /** Callback used to tell the pool that this worker is free again. */
  notifyAvailable: () => void;
  /** Availability flag; stays false until the worker posts its first message. */
  available: boolean = false;
  resolve: null = null;
  reject: null = null;

  /**
   * @param notifyAvailable Invoked each time this worker becomes available.
   * @param workerArgs Arguments handed straight to the `Worker` constructor.
   */
  constructor(
    notifyAvailable: () => void,
    ...workerArgs: ConstructorParameters<typeof Worker>
  ) {
    super(...workerArgs);
    this.notifyAvailable = notifyAvailable;
    // Until a task is dispatched, any message from the worker marks it as available.
    this.onmessage = () => {
      this.setAvailable();
    };
  }

  /**
   * Sends one task to the worker and wires the task's callbacks to the reply.
   *
   * @param task `resolve` receives the `data` of the worker's next message,
   *   `reject` receives the worker's next error event, and `postMessageArgs`
   *   are spread into `postMessage`.
   */
  dispatch(task: {
    resolve: (data: any) => void;
    reject: (ev: ErrorEvent) => void;
    postMessageArgs: Parameters<Worker["postMessage"]>;
  }) {
    const { resolve, reject, postMessageArgs } = task;
    this.available = false;
    // Replace the previous handlers so the reply settles this task.
    this.onmessage = (messageEvent) => {
      resolve(messageEvent.data);
      this.setAvailable();
    };
    this.onerror = (errorEvent) => {
      reject(errorEvent);
      this.setAvailable();
    };
    this.postMessage(...postMessageArgs);
  }

  /** Marks the worker as available and notifies the owner. */
  setAvailable() {
    this.resolve = null;
    this.reject = null;
    this.available = true;
    this.notifyAvailable();
  }
}
export { TaskWorker };
ts
import { TaskWorker } from "./TaskWorker";
/**
 * A fixed-size pool of `TaskWorker`s that all run the same script. Tasks are
 * queued and handed to the first worker that reports itself available.
 */
class WorkerPool {
  /** Tasks waiting for an available worker, oldest first. */
  taskQueue: Parameters<TaskWorker["dispatch"]>[0][] = [];
  /** Every worker owned by this pool. */
  workers: TaskWorker[] = [];

  /**
   * @param poolSize Number of workers to create.
   * @param workerArgs Arguments forwarded to every `TaskWorker` after the
   *   availability callback.
   */
  constructor(
    poolSize: number,
    ...workerArgs: ConstructorParameters<typeof Worker>
  ) {
    // The worker invokes this callback as one of its own methods, so passing
    // `this.dispatchIfAvailable` directly would run it with `this` bound to the
    // worker instead of the pool. The arrow function keeps `this` as the pool.
    const notifyAvailable = () => this.dispatchIfAvailable();
    for (let index = 0; index < poolSize; index++) {
      this.workers.push(new TaskWorker(notifyAvailable, ...workerArgs));
    }
  }

  /**
   * Pushes a task onto the queue and tries to dispatch it immediately.
   *
   * @param postMessageArgs Arguments for the worker's `postMessage` call.
   * @returns A promise whose `resolve`/`reject` are handed to the worker that
   *   picks the task up.
   */
  enqueue(
    ...postMessageArgs: Parameters<Worker["postMessage"]>
  ): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      this.taskQueue.push({
        resolve,
        reject,
        postMessageArgs,
      });
      this.dispatchIfAvailable();
    });
  }

  /**
   * Hands the oldest queued task to the first available worker, if there is
   * both a task and an available worker. At most one task is dispatched per call.
   */
  dispatchIfAvailable() {
    if (this.taskQueue.length === 0) {
      return;
    }
    const idleWorker = this.workers.find((worker) => worker.available);
    if (!idleWorker) {
      return;
    }
    const task = this.taskQueue.shift();
    // `shift()` is typed as possibly undefined; the length check above makes
    // that unreachable, but the guard keeps the call type-safe.
    if (task) {
      idleWorker.dispatch(task);
    }
  }

  /** Terminates every worker in the pool. */
  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}
export { WorkerPool };
import { TaskWorker } from "./TaskWorker";
/**
 * A fixed-size pool of `TaskWorker`s that all run the same script. Tasks are
 * queued and handed to the first worker that reports itself available.
 */
class WorkerPool {
  /** Tasks waiting for an available worker, oldest first. */
  taskQueue: Parameters<TaskWorker["dispatch"]>[0][] = [];
  /** Every worker owned by this pool. */
  workers: TaskWorker[] = [];

  /**
   * @param poolSize Number of workers to create.
   * @param workerArgs Arguments forwarded to every `TaskWorker` after the
   *   availability callback.
   */
  constructor(
    poolSize: number,
    ...workerArgs: ConstructorParameters<typeof Worker>
  ) {
    // The worker invokes this callback as one of its own methods, so passing
    // `this.dispatchIfAvailable` directly would run it with `this` bound to the
    // worker instead of the pool. The arrow function keeps `this` as the pool.
    const notifyAvailable = () => this.dispatchIfAvailable();
    for (let index = 0; index < poolSize; index++) {
      this.workers.push(new TaskWorker(notifyAvailable, ...workerArgs));
    }
  }

  /**
   * Pushes a task onto the queue and tries to dispatch it immediately.
   *
   * @param postMessageArgs Arguments for the worker's `postMessage` call.
   * @returns A promise whose `resolve`/`reject` are handed to the worker that
   *   picks the task up.
   */
  enqueue(
    ...postMessageArgs: Parameters<Worker["postMessage"]>
  ): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      this.taskQueue.push({
        resolve,
        reject,
        postMessageArgs,
      });
      this.dispatchIfAvailable();
    });
  }

  /**
   * Hands the oldest queued task to the first available worker, if there is
   * both a task and an available worker. At most one task is dispatched per call.
   */
  dispatchIfAvailable() {
    if (this.taskQueue.length === 0) {
      return;
    }
    const idleWorker = this.workers.find((worker) => worker.available);
    if (!idleWorker) {
      return;
    }
    const task = this.taskQueue.shift();
    // `shift()` is typed as possibly undefined; the length check above makes
    // that unreachable, but the guard keeps the call type-safe.
    if (task) {
      idleWorker.dispatch(task);
    }
  }

  /** Terminates every worker in the pool. */
  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}
export { WorkerPool };
Shared Worker
连接
ts
// Collects the first port of every `connect` event this shared worker receives.
const connectedPorts = new Set();
self.onconnect = (connectEvent) => {
  const clientPort = connectEvent.ports[0];
  connectedPorts.add(clientPort);
};
// Collects the first port of every `connect` event this shared worker receives.
const connectedPorts = new Set();
self.onconnect = (connectEvent) => {
  const clientPort = connectEvent.ports[0];
  connectedPorts.add(clientPort);
};