Concurrency in JavaScript and the power of Web Workers

Concurrency in JavaScript and the power of Web Workers

Introduction

Recently I stumbled upon a problem while working on a browser application I was building where it seemed I ran into the limits of working in JavaScript’s single threaded execution environment.

My application needed to process potentially thousands of files all in the browser to scan their metadata to save locally to IndexedDB. The problem was that the library function I was using to process the files with was quite an expensive call which consequently blocked the browser and most of the interactivity with the UI. This is not acceptable in a UI where the user may want to continue using the application in a performant way or in my case the ability to cancel the file processing.

Now the first thing all JavaScript developers learn is that JavaScript runs in a single execution/threaded environment. Whether they are starting out in Node.js or writing front-end browser applications we are told that this is by design as the event loop (in Node.js) or the browser engine will handle long running tasks and queue them accordindly. This is the reason the page blocks on expensive synchronous tasks.

Since I needed to process the files locally, and uploading them to a server to process them was not an option, this is where I thought of Web Workers. They allow you to launch a seperate thread where you can run your expensive function calls freeing the main window thread.

Web Workers Introduction

Here are the basics of using web workers in the browser with an implmentation left out:

// main.ts

// Create a new worker
const worker = new Worker(my-worker.js);

// Listen for responses from the worker
worker.addEventListener(message, event => {
const data = event.data as MyResponse;

// do something with the response
});

let message: MyCustomMessage;

// …

// Sending a message to the worker thread
worker.postMessage(message);

// … later in the program
worker.terminate();

// my-worker.ts

addEventListener(message, event => {
const data = event.data as MyMessage

// …process result

let response: MyCustomRepsonse;

// Send the response back to the main window thread
postMessage(response);
});

The best way to think about Web Workers and how to utilize them is to think of them as just a plain old function. You send the worker a message (function arguments), the worker processes the input (worker file), then returns a response (function return statement).

You can combine the web worker creation, sending, and message receiving into one function that returns a promise. Or you can launch a web worker that sits in the background waiting for messages. We should note that initiating a web worker (new Worker(…)) is a costly operation but either way just make sure we terminate the worker after use.

Here is a example of using a Web Worker that checks if a number is prime of not.

// main.ts

function isPrime(value: number) {
return new Promise<boolean>(resolve => {
const worker = new Worker(is-prime-worker.js);

worker.addEventListener(message, event => {
const data = event.data as boolean;

resolve(data);

worker.terminate();
});

worker.postMessage(value);
});
}

// is-prime-worker.ts

addEventListener(message, event => {
const data = event.data as number;

postMessage(isPrime(data));
});

function isPrime(value: number) {
for (let i = 2, s = Math.sqrt(num); i <= s; i++) {
if (num % i === 0) return false;
}
return num > 1;
}

Concurrency

Concurrency is therefore possible in JavaScript if we use multiple web workers, with the main window thread acting as the communicator for all worker threads.

When building my application I needed to send an array of ArrayBuffer’s to the web workers for them to process and send back a result.

Since the time to process is dependent on an external thread we cannot use a loop but a recursive function. What I mean is the worker.onmessage event will how we trigger the next item to be processed. Let me explain below:

Concurrent web worker setup

First of all we need a pool of workers. The first question I had was how many to create? Well for optimal performance we should utilize all threads the user has available. Luckily the browser provides a global property for this and can be accessed via navigator.hardwareConcurrency. Since we already have one being used and we wish not to block it, use navigator.hardwareConcurrency – 1.

const MAX_WORKERS = navigator.hardwareConcurrency 1;

Now let’s create the pool with each worker having a flag that specifies when it is currently processing.

interface WorkerPoolItem {
value: Worker;
isProcessing: boolean;
}

const pool: WorkerPoolItem[] = [];

for (let i = 0; i < MAX_WORKERS; i += 1) {
const worker: WorkerPoolItem = {
isProcessing: false,
value: new Worker(workerURL),
};

pool.push(worker);
}

Now we will create a function called batchProcessor. This function will be responsible for finding free workers in the pool and sending work to be processed.

function batchProcessor<T>(
pool: WorkerPoolItem[],
work: T[],
onComplete: () => void,
) {
const batch: [T, WorkerPoolItem][] = [];

// find available workers
const availableWorkers =
pool.filter(worker => !worker.isProcessing);

// assign work to available workers
for (const worker of availableWorkers) {
const value = work.pop();

// if value is undefined this means
// there is no more work to do
if (value === undefined) break;

batch.push([value, worker]);
}

// no work left?
if (batch.length === 0) {
// are any workers still processing?
if (pool.every(worker => !worker.isProcessing)) {
// now we are done
onComplete();
}

// always return as there is no more work to do
return;
}

// send work to workers
for (const item of batch) {
const [value, worker] = item;

// mark worker as processing
worker.isProcessing = true;

// send work to worker
worker.value.postMessage(value);
}
}

The first time this function is called on the pool it will get all workers started with work to do. As we will get to see this function will be called recursively and it has to handle the initial case (where all workers are free) and the ending case (where there is more workers free than work left to do). It also handles another scenario which will get into where we may want to do some processing before we send out a message to a worker and during this time multiple workers may have completed and we have a batch of work to send out.

Combining the code for the worker pool initilization and the batchProcessor function we get a function concurrentProcessor that processes an array of arbitrary values by concurrently sending it to a pool of workers.

async function concurrentProcessor<T, V>(
workerURL: string,
work: T[],
onProcess: (value: V) => void,
) {
const pool: WorkerPoolItem[] = [];

try {
// promisify the recursive function
await new Promise<void>(resolve => {
// initialize worker pool
for (let i = 0; i < MAX_WORKERS; i += 1) {
const worker: WorkerPoolItem = {
isProcessing: false,
value: new Worker(workerURL),
};

// on recieve message from worker
worker.value.addEventListener(message, event => {
onProcess(event.data as V);

worker.isProcessing = false;

// Since a worker has just completed processing
// a value, we can start processing another batch
batchProcessor(pool, work, resolve);
});

pool.push(worker);
}

// start processing first batch of work
batchProcessor(pool, work, resolve);
});
} finally {
// terminate workers
for (const worker of pool) {
worker.value.terminate();
}
}
}

Final Implementation

This implementation extends the functionality a bit further by allowing us to process the value before sending it out to the worker. In my case I needed to send the ArrayBuffer of a File object to the worker but I did not want to have an array of ArrayBuffer’s as this would use far to much memory.

// to be passed into options.onRead
function onRead(file: File) {
return file.arrayBuffer();
}
const MAX_WORKERS = navigator.hardwareConcurrency 1;

export async function concurrentProcessor<T, V, R = T>(
options: ConcurrentProcessorOptions<T, V, R>,
) {
const pool: WorkerPoolItem[] = [];

try {
// promisify the recursive function
await new Promise<void>(resolve => {
// initialize worker pool
for (let i = 0; i < MAX_WORKERS; i += 1) {
const worker: WorkerPoolItem = {
isProcessing: false,
value: new Worker(options.workerURL, options.workerOptions),
};

worker.value.addEventListener(message, event => {
options.onProcess(event.data as V);

worker.isProcessing = false;

// Since a worker has just completed processing
// a value, we can start processing another batch
void batchProcessor<T, V, R>(pool, options, resolve);
});

pool.push(worker);
}

// Start processing first batch of work
void batchProcessor<T, V, R>(pool, options, resolve);
});
} finally {
// terminate workers
for (const worker of pool) {
worker.value.terminate();
}
}
}

async function batchProcessor<T, V, R = T>(
pool: WorkerPoolItem[],
options: ConcurrentProcessorOptions<T, V, R>,
onComplete: () => void,
) {
const batch: [T, WorkerPoolItem][] = [];

// find available workers
const availableWorkers = pool.filter(worker => !worker.isProcessing);

// assign work to available workers
for (const worker of availableWorkers) {
const value = options.values.pop();

// if value is undefined this means there is no more work to do
if (value === undefined) break;

batch.push([value, worker]);
}

// no work left?
if (batch.length === 0) {
// are any workers still processing?
if (pool.every(worker => !worker.isProcessing)) {
// now we are done
onComplete();
}

// always return as there is no more work to do
return;
}

// mark workers as processing
// we should do this here as below we may have to wait for promises to resolve
// and we don’t want another batch to be processed in the meantime
for (const item of batch) {
const worker = item[1];

worker.isProcessing = true;
}

let values: [R, WorkerPoolItem][] = [];

if (options.onRead) {
const promises: [Promise<R>, WorkerPoolItem][] = [];

for (const item of batch) {
const [value, worker] = item;

const result = options.onRead(value);

if (result instanceof Promise) {
promises.push([result, worker]);
} else {
values.push([result, worker]);
}
}

if (promises.length > 0) {
const promiseValues = await Promise.all(promises.map(([promise]) => promise));

for (let i = 0; i < promiseValues.length; i += 1) {
// @ts-expect-error
values.push([promiseValues[i], promises[i][1]]);
}
}
} else {
// if there is no onRead function, it just passes the values through
values = batch as unknown as [R, WorkerPoolItem][];
}

// send work to workers
for (const item of values) {
const [value, worker] = item;

if (options.transfer && isTransferable(value)) {
worker.value.postMessage(value, [value]);
} else {
worker.value.postMessage(value);
}
}
}

function isTransferable(value: unknown): value is Transferable {
return (
value instanceof ArrayBuffer ||
value instanceof OffscreenCanvas ||
value instanceof ImageBitmap ||
value instanceof MessagePort ||
value instanceof ReadableStream ||
value instanceof WritableStream ||
value instanceof TransformStream ||
value instanceof VideoFrame
);
}

interface WorkerPoolItem {
value: Worker;
isProcessing: boolean;
}

export interface ConcurrentProcessorOptions<T, V, R = T> {
workerURL: string;
workerOptions?: WorkerOptions;
transfer?: boolean;
values: T[];
onRead?: (value: T) => Promise<R> | R;
onProcess: (value: V) => void;
}

Leave a Reply

Your email address will not be published. Required fields are marked *