In the previous article, we talked about Atomics in Node.js and the problems they solve in multithreaded programs. While the Atomics API is powerful, it is not always convenient to work with because it is so low level.
Other programming languages like Golang have higher-level synchronization primitives built into their standard libraries to control access to shared resources between multiple threads.
Although JavaScript doesn't have any such primitives besides Atomics, we can build them from scratch based on what we see in other languages.
In this article, we'll do exactly that. We'll implement two of the most popular synchronization primitives: Semaphore and Mutex. We'll explore the differences between these primitives and discuss when to use each one.
Understanding what a critical section is
One of the main problems in multithreaded programs is shared resources that can be directly accessed from multiple threads. This is where the concept of a critical section becomes increasingly important.
Any code that accesses the shared resources is considered a critical section. Note that not only write operations but even read operations can be considered as a part of a critical section.
It is important to clearly identify a critical section to write code that runs safely in a multithreaded environment.
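To make the risk concrete, here is a minimal sketch that replays a lost update by hand. It runs on a single thread and simply performs the steps in the order two threads could interleave them, so the names readByA and readByB are illustrative assumptions rather than real threads.

```javascript
// One shared cell, visible to every view created over the same buffer.
const shared = new Int32Array(new SharedArrayBuffer(4));

// An unsafe increment is really two steps: read, then write.
// Here the steps of two imaginary threads A and B are interleaved by hand.
const readByA = shared[0]; // A reads 0
const readByB = shared[0]; // B reads 0 before A has written anything
shared[0] = readByA + 1;   // A writes 1
shared[0] = readByB + 1;   // B also writes 1, so A's update is lost
const afterUnsafe = shared[0];

// The same two increments as indivisible operations.
Atomics.store(shared, 0, 0);
Atomics.add(shared, 0, 1);
Atomics.add(shared, 0, 1);
const afterAtomic = Atomics.load(shared, 0);

console.log({ afterUnsafe, afterAtomic }); // { afterUnsafe: 1, afterAtomic: 2 }
```

Both the read and the write belong to the critical section here, because the bug only appears when another thread slips in between them.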
Let's look at a code example from the previous article about Atomics in Node.js.
import { isMainThread, Worker, workerData, threadId } from 'node:worker_threads';
if (isMainThread) {
const buffer = new SharedArrayBuffer(10);
new Worker(import.meta.filename, { workerData: buffer });
new Worker(import.meta.filename, { workerData: buffer });
} else {
// Critical section
const typedArray = new Int8Array(workerData);
const value = Atomics.store(typedArray, 0, threadId);
console.dir({ threadId, value });
// End of critical section
}
In this example, we can consider the whole else block as a critical section, and here is why:
We create a view into the shared resources, which is basically an indicator of intention to interact with the shared resources. If you're not familiar with views, I recommend reading one of the previous articles about what buffer views are and how to use them in JavaScript.
We directly mutate the shared buffer using the Atomics.store function.
We read from the buffer and print this value into the console.
If no thread ever modifies the shared buffer, the code that reads it is not considered a critical section, simply because concurrent reads alone cannot lead to a race condition.
I think you'll agree with me that it is kind of hard to track such critical sections simply by the type of operations we perform on a specific set of resources.
In other languages, it is fairly easy to mark some sections as critical using thread locks. Let's see how it works and compare it to Node.js.
Compare thread locks from Golang with Node.js
The problem of access to shared resources across multiple execution contexts is not new, and other languages and platforms have their own way of dealing with this problem. Let's look at an example of how we can do it in Golang.
Thread locks in Golang
Here's an example of how Golang uses a mutex to protect shared resources:
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key]++
c.mu.Unlock()
}
Even if you're not familiar with Golang, the logic should be pretty clear:
c.mu.Lock() marks the beginning of a critical section
c.v[key]++ is the actual operation over the shared resources
c.mu.Unlock() marks the end of a critical section
When you look at this code, it is fairly easy to identify where operations over shared resources are taking place. You don't have to think about what kind of operations you're running and over what resources.
Atomics in Node.js
Node.js has no such locks built in, simply because of how JavaScript and Node.js as a platform are designed. The closest thing we can get out of the box is to mark such sections with Atomics.
import { isMainThread, Worker, workerData, threadId } from 'node:worker_threads';
if (isMainThread) {
const buffer = new SharedArrayBuffer(12);
new Worker(import.meta.filename, { workerData: buffer });
new Worker(import.meta.filename, { workerData: buffer });
} else {
const typedArray = new Int32Array(workerData);
const itemIndex = 0;
if (threadId !== 1) {
Atomics.wait(typedArray, itemIndex, 0);
}
const value = Atomics.store(typedArray, itemIndex, threadId);
Atomics.notify(typedArray, itemIndex);
console.dir({ threadId, value });
}
As you can see, it is not even close to Golang. There are simply too many things you have to be aware of to make things work properly in a very basic use case.
Use Atomics.wait every time you need to stop a thread's execution until some condition has been met.
Create an Int32Array to manage concurrent access to shared resources.
Call Atomics.notify to let other sleeping threads know they can proceed with execution.
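The three points above can be observed without any workers. The sketch below assumes it runs in Node.js, where Atomics.wait is allowed on the main thread, and the variable names are illustrative only.

```javascript
const flags = new Int32Array(new SharedArrayBuffer(4));

// The cell holds 0, so waiting "while it equals 1" returns immediately.
const mismatch = Atomics.wait(flags, 0, 1);

// The cell does hold 0, so this call really sleeps.
// Nobody will notify us, so the 10 ms timeout ends the wait.
const timedOut = Atomics.wait(flags, 0, 0, 10);

// Nobody is sleeping on the cell, so notify reports zero woken threads.
const woken = Atomics.notify(flags, 0);

// Atomics.wait rejects views other than Int32Array and BigInt64Array.
let rejectedView = false;
try {
  Atomics.wait(new Int8Array(new SharedArrayBuffer(4)), 0, 0, 10);
} catch (error) {
  rejectedView = error instanceof TypeError;
}

console.log({ mismatch, timedOut, woken, rejectedView });
// { mismatch: 'not-equal', timedOut: 'timed-out', woken: 0, rejectedView: true }
```

The 'not-equal' result is what keeps the earlier example from hanging when the waiting thread arrives after the value has already been stored.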
On the one hand, this might look like a reasonable limitation for a high-level language that runs in an end-user environment such as a browser.
However, compared with what Golang or other languages have to offer, this API is not nearly as convenient as it could be.
We can fix that by building abstractions similar to those in other languages ourselves. While they still may not be 100% perfect because of the language and the platform limitations, we can get pretty close to what we see in other languages.
That said, let's move forward and implement 2 of the most popular synchronization primitives: Semaphore and Mutex.
Building Semaphore and Mutex
Both semaphores and mutexes are meant to solve the same problem: limiting access to shared resources. They differ in how they do it, which we'll dive into next.
Semaphore
In simple words, a semaphore is an abstraction that allows multiple threads to work with the same shared resources. Semaphores are based on a counting mechanism. You can specify how many threads you're willing to have access to the shared resources.
It could be as few as 1 or as many as you want. A semaphore where only 1 thread is allowed to access the resources is called a binary semaphore.
Let's walk through the creation of a semaphore step by step to better understand how it works. First, we create the class.
class Semaphore {
constructor(buffer, maxCount) {
this.typedArray = new Int32Array(buffer);
}
}
We want to pass only 2 arguments: an instance of SharedArrayBuffer and a number that limits how many threads can access the buffer at the same time.
Here is how we'll use the Semaphore class:
import { Worker, isMainThread, workerData, threadId } from 'node:worker_threads';
if (isMainThread) {
const buffer = new SharedArrayBuffer(12);
const semaphore = new Semaphore(buffer, 5);
new Worker(import.meta.filename, { workerData: buffer });
new Worker(import.meta.filename, { workerData: buffer });
} else {
const semaphore = new Semaphore(workerData);
// Rest of the code
}
We create a Semaphore instance in every thread simply because there is no way to share the same object reference across multiple threads by design.
The difference is in how we create each Semaphore instance. We only want to pass the maxCount argument in the main thread because only the main thread must dictate how many threads can access the underlying buffer.
The next step is to set up the shared counter:
class Semaphore {
constructor(buffer, maxCount) {
this.typedArray = new Int32Array(buffer);
if (maxCount !== undefined) {
Atomics.store(this.typedArray, 0, maxCount);
}
}
}
The idea here is to have the first element of the array as some sort of counter. Whenever a new thread comes in, we want to decrement this counter. When this thread releases the semaphore, we want to increment this counter.
It is important to have this counter as the first element of the buffer because the buffer is the only thing that is shared between all threads and changes made in one thread are visible in the other.
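As a small illustration of why the counter has to live in the buffer, the sketch below creates two independent views over one SharedArrayBuffer. The names mainView and workerView are assumptions that stand in for the views each thread would create on its own.

```javascript
const buffer = new SharedArrayBuffer(12);

// Each thread builds its own view, but both views point at the same memory.
const mainView = new Int32Array(buffer);
const workerView = new Int32Array(buffer);

// The main thread sets the limit in the first element.
Atomics.store(mainView, 0, 5);
const seenByWorker = Atomics.load(workerView, 0);

// A worker takes one slot, and the main thread observes the change.
Atomics.sub(workerView, 0, 1);
const seenByMain = Atomics.load(mainView, 0);

console.log({ seenByWorker, seenByMain }); // { seenByWorker: 5, seenByMain: 4 }
```

A plain instance property such as this.maxCount would not behave this way, because every thread gets its own copy of the object.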
It is time to implement the acquire method. When using this method, the shared counter should be decremented. If the number of threads that can access the shared resources has reached the limit, we want to completely stop the thread execution until the semaphore signals that we can move forward.
class Semaphore {
constructor(buffer, maxCount) {
this.typedArray = new Int32Array(buffer);
if (maxCount !== undefined) {
Atomics.store(this.typedArray, 0, maxCount);
}
}
acquire() {
while (true) {
const value = Atomics.load(this.typedArray, 0);
if (value === 0) {
Atomics.wait(this.typedArray, 0, 0);
continue;
}
if (Atomics.compareExchange(this.typedArray, 0, value, value - 1) === value) {
return;
}
}
}
}
Let's walk through the implementation step by step and understand how it works.
We'll come back to the while loop at the end.
After we enter the loop, the first thing we do is load the current counter using the Atomics.load function.
If the counter is 0, it means that there is no room for one more thread to access the resources. Therefore, we want to wait until the counter has a value higher than 0. We do so by using the Atomics.wait function.
If the value is not 0, we want to ensure that the value we've loaded with the Atomics.load function is still the value that the counter contains.
Using Atomics.compareExchange, we write the new value (value - 1) at the given index only if the expected value (value) is equal to the value stored in the counter at the moment of calling this function.
We do this because multiple threads can go through the same steps at the same time. They can enter the function at the same time, load the value at the same time, and call the compareExchange function at the same time.
function acquire() {
while (true) {
// Multiple threads can load the
// current counter at the same time
const value = Atomics.load(this.typedArray, 0);
// Those multiple threads can then run
// this check at the same time and pass it
if (value === 0) {
Atomics.wait(this.typedArray, 0, 0);
continue;
}
// However, only one of those threads can actually set
// the value using `compareExchange` at a time
if (Atomics.compareExchange(this.typedArray, 0, value, value - 1) === value) {
return;
}
}
}
Even when multiple threads are running compareExchange, only one of them can actually write the value. After the first thread is done writing the value, all the others will fail to match the condition in the if block.
After that, the while loop comes into play. Every thread that failed repeats the same steps until its own compareExchange call succeeds and it exits the acquire function.
It might sound a bit strange that we need to run an infinite loop to keep things working, but threads spend most of the time in sleep mode, so there is no heavy load on the CPU.
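The race around compareExchange can be replayed by hand. This is a single-threaded sketch in which two imaginary threads, A and B, load the same counter value before either of them writes; the variable names are illustrative assumptions.

```javascript
const counter = new Int32Array(new SharedArrayBuffer(4));
Atomics.store(counter, 0, 3);

// Both threads load the counter before anyone has written to it.
const valueSeenByA = Atomics.load(counter, 0); // 3
const valueSeenByB = Atomics.load(counter, 0); // 3

// A goes first: the counter still holds 3, so the swap happens.
const resultA = Atomics.compareExchange(counter, 0, valueSeenByA, valueSeenByA - 1);
const aWon = resultA === valueSeenByA;

// B expects 3 as well, but the counter is now 2, so nothing is written.
const resultB = Atomics.compareExchange(counter, 0, valueSeenByB, valueSeenByB - 1);
const bWon = resultB === valueSeenByB;

// B goes around the loop again with a fresh value and succeeds this time.
const freshValue = Atomics.load(counter, 0); // 2
const bWonOnRetry =
  Atomics.compareExchange(counter, 0, freshValue, freshValue - 1) === freshValue;

const finalCount = Atomics.load(counter, 0);
console.log({ aWon, bWon, bWonOnRetry, finalCount });
// { aWon: true, bWon: false, bWonOnRetry: true, finalCount: 1 }
```

Without the retry, B would either give up or overwrite the counter with a stale value, and the semaphore would admit more threads than it should.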
Coming back to the implementation, the last step is to implement the release method.
export class Semaphore {
constructor(buffer, maxCount) {
this.typedArray = new Int32Array(buffer);
if (maxCount != null) {
this.maxCount = maxCount;
Atomics.store(this.typedArray, 0, this.maxCount);
}
}
acquire() {
while (true) {
const value = Atomics.load(this.typedArray, 0);
if (value === 0) {
Atomics.wait(this.typedArray, 0, 0);
continue;
}
if (Atomics.compareExchange(this.typedArray, 0, value, value - 1) === value) {
return;
}
}
}
release() {
Atomics.add(this.typedArray, 0, 1);
Atomics.notify(this.typedArray, 0, 1);
}
}
Whenever the release method is called, we increment the shared counter by 1 and notify only a single sleeping thread that the value has changed, so it can try to access the shared resources.
The semaphore is completed, and we can now use it.
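Before moving to worker threads, it can help to watch the counter change. The sketch below repeats the class with positional constructor arguments so that it runs on its own, and uses two instances on a single thread as stand-ins for instances created in different threads. It never acquires more slots than are available, because that call would put the only thread to sleep forever.

```javascript
class Semaphore {
  constructor(buffer, maxCount) {
    this.typedArray = new Int32Array(buffer);
    if (maxCount !== undefined) {
      Atomics.store(this.typedArray, 0, maxCount);
    }
  }

  acquire() {
    while (true) {
      const value = Atomics.load(this.typedArray, 0);
      if (value === 0) {
        Atomics.wait(this.typedArray, 0, 0);
        continue;
      }
      if (Atomics.compareExchange(this.typedArray, 0, value, value - 1) === value) {
        return;
      }
    }
  }

  release() {
    Atomics.add(this.typedArray, 0, 1);
    Atomics.notify(this.typedArray, 0, 1);
  }
}

const buffer = new SharedArrayBuffer(12);
const inMain = new Semaphore(buffer, 2); // sets the limit
const inWorker = new Semaphore(buffer);  // only attaches to the counter
const counter = new Int32Array(buffer);

const trace = [Atomics.load(counter, 0)];
inMain.acquire();
trace.push(Atomics.load(counter, 0));
inWorker.acquire();
trace.push(Atomics.load(counter, 0));
inWorker.release();
trace.push(Atomics.load(counter, 0));

console.log(trace); // [ 2, 1, 0, 1 ]
```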
import { isMainThread, Worker, workerData, threadId } from 'node:worker_threads';
if (isMainThread) {
// 12 bytes: the Int32Array inside Semaphore needs a byte length divisible by 4
const buffer = new SharedArrayBuffer(12);
const semaphore = new Semaphore(buffer, 1);
new Worker(import.meta.filename, { workerData: buffer });
new Worker(import.meta.filename, { workerData: buffer });
} else {
const typedArray = new Int8Array(workerData);
const semaphore = new Semaphore(workerData);
// Critical section
semaphore.acquire();
typedArray[4] = threadId;
console.dir({ threadId, value: typedArray[4] });
semaphore.release();
// End of critical section
}
Notice how we're using it. It is almost the same API as we've seen in Golang. Just by calling the acquire and release functions, we can clearly mark the boundaries of a critical section.
And the best part is that we don't even need to explicitly use Atomics. Semaphore guarantees a thread-safe environment between the acquire and release calls.
Of course, if we raise the maximum number of threads that can operate over the resources from 1 to 2 or more, then we're prone to race conditions again.
Mutex
Mutex stands for "mutual exclusion". It is a synchronization mechanism that allows only one thread to access shared resources at a time, unlike a semaphore, where we can configure the number of threads that have access to the shared resources.
Another feature of a mutex is ownership. If a thread locks a mutex, it means that this thread becomes the owner of the shared resources, and only this thread can unblock other threads and make the shared resources available.
When working with semaphores, it is not important who is unblocking the locked resources.
The last thing to mention is that a mutex is generally easier to manage, because it is always about a binary state. On the other hand, the more threads we allow a semaphore to admit, the more complexity we introduce into a system.
Since a mutex and a semaphore share a similar goal, which is to limit access to the shared resources, their implementations are quite similar except for the details that we just mentioned.
We'll start with the constructor.
class Mutex {
constructor(buffer) {
this.typedArray = new Int32Array(buffer);
this.isOwner = false;
}
}
We're not passing the maxCount argument because a mutex is always binary. We're also adding a new property, isOwner, to keep track of the mutex ownership.
The next step is to implement the lock method.
const unlocked = 0;
const locked = 1;
class Mutex {
constructor(buffer) {
this.typedArray = new Int32Array(buffer);
this.isOwner = false;
}
lock() {
while (true) {
const value = Atomics.load(this.typedArray, 0);
if (value === locked) {
Atomics.wait(this.typedArray, 0, locked);
continue;
}
if (Atomics.compareExchange(this.typedArray, 0, unlocked, locked) === unlocked) {
this.isOwner = true;
return;
}
}
}
}
It is pretty similar to the semaphore's implementation of the acquire method, except that we set the isOwner property to true and implement our logic in a way where only 2 states are possible: locked and unlocked.
The final step is to implement the unlock method.
const unlocked = 0;
const locked = 1;
class Mutex {
constructor(buffer) {
this.typedArray = new Int32Array(buffer);
this.isOwner = false;
}
lock() {
while (true) {
const value = Atomics.load(this.typedArray, 0);
if (value === locked) {
Atomics.wait(this.typedArray, 0, locked);
continue;
}
if (Atomics.compareExchange(this.typedArray, 0, unlocked, locked) === unlocked) {
this.isOwner = true;
return;
}
}
}
unlock() {
if (!this.isOwner) {
throw new Error('Thread that tries to unlock the mutex is not the owner of the mutex');
}
Atomics.store(this.typedArray, 0, unlocked);
Atomics.notify(this.typedArray, 0, 1);
this.isOwner = false;
}
}
The crucial part here is to check whether the current thread is the owner of the mutex before unlocking it. We're throwing an error if that's not the case, because a thread is not allowed to unlock a mutex it doesn't own.
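The ownership check can be exercised on a single thread. In this sketch the class is repeated so that the snippet runs on its own, and the instances first and second are assumptions that stand in for mutex objects created in two different threads over the same buffer.

```javascript
const unlocked = 0;
const locked = 1;

class Mutex {
  constructor(buffer) {
    this.typedArray = new Int32Array(buffer);
    this.isOwner = false;
  }

  lock() {
    while (true) {
      if (Atomics.load(this.typedArray, 0) === locked) {
        Atomics.wait(this.typedArray, 0, locked);
        continue;
      }
      if (Atomics.compareExchange(this.typedArray, 0, unlocked, locked) === unlocked) {
        this.isOwner = true;
        return;
      }
    }
  }

  unlock() {
    if (!this.isOwner) {
      throw new Error('Thread that tries to unlock the mutex is not the owner of the mutex');
    }
    Atomics.store(this.typedArray, 0, unlocked);
    Atomics.notify(this.typedArray, 0, 1);
    this.isOwner = false;
  }
}

const buffer = new SharedArrayBuffer(4);
const state = new Int32Array(buffer);
const first = new Mutex(buffer);
const second = new Mutex(buffer);

first.lock();

// The second instance never acquired the lock, so it may not release it.
let rejected = false;
try {
  second.unlock();
} catch {
  rejected = true;
}
const stillLocked = Atomics.load(state, 0) === locked;

// The owner releases the lock as usual.
first.unlock();
const released = Atomics.load(state, 0) === unlocked;

console.log({ rejected, stillLocked, released });
// { rejected: true, stillLocked: true, released: true }
```

Note that isOwner lives on the instance, not in the shared buffer, which works because every thread creates its own Mutex object.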
Here is how you use it:
import { isMainThread, Worker, workerData, threadId } from 'node:worker_threads';
if (isMainThread) {
// 12 bytes: the Int32Array inside Mutex needs a byte length divisible by 4
const buffer = new SharedArrayBuffer(12);
new Worker(import.meta.filename, { workerData: buffer });
new Worker(import.meta.filename, { workerData: buffer });
} else {
const typedArray = new Int8Array(workerData);
const mutex = new Mutex(workerData);
// Critical section start
mutex.lock();
typedArray[4] = threadId;
console.dir({ threadId, value: typedArray[4] });
mutex.unlock();
// End of critical section
}
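Notice that we're not creating a mutex inside of the main thread as we did with the semaphore. A new SharedArrayBuffer is zero-filled, and 0 is our unlocked state, so the mutex needs no initial setup.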
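One thing the usage example leaves open is what happens if the code between lock and unlock throws: the unlock call is skipped and every other thread keeps sleeping. A possible guard is a small wrapper built on try and finally. The helper name withLock is hypothetical and not part of the classes in this article, and the sketch uses a stand-in lock object that only records calls, so it runs without workers.

```javascript
// Hypothetical helper: runs the critical section and always unlocks afterwards.
function withLock(lock, criticalSection) {
  lock.lock();
  try {
    return criticalSection();
  } finally {
    lock.unlock();
  }
}

// Stand-in for a Mutex instance that records the order of calls.
const calls = [];
const recordingLock = {
  lock: () => calls.push('lock'),
  unlock: () => calls.push('unlock'),
};

let errorReachedCaller = false;
try {
  withLock(recordingLock, () => {
    calls.push('work');
    throw new Error('failure inside the critical section');
  });
} catch {
  errorReachedCaller = true;
}

console.log(calls, errorReachedCaller); // [ 'lock', 'work', 'unlock' ] true
```

Any object with lock and unlock methods fits this wrapper, so it should work with the Mutex class from this article in place of the recording object.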
Conclusion
We've covered a lot in this article:
What a critical section is and what it looks like in Node.js
How other programming languages work with critical sections and shared resources across multiple threads
How to implement custom Semaphore and Mutex classes in Node.js to abstract away low-level work with the Atomics API
While things like Semaphore and Mutex definitely make our lives easier, they also create new problems like deadlocks and livelocks.
In the upcoming article, we'll see how we can run into common multithreading problems in Node.js and how to solve them.