Interprocess message passing is an important tool, especially when developing large, complex systems like robots. ROS is commonly used for small and medium scale robots, and it’s no different at Project MANAS, where we use ROS across the board. ROS uses a publish-subscribe paradigm for communication between processes, and internally it uses either TCP/IP or UDP, both of which are quite slow compared to using, say, the system’s shared memory.
I decided to build a small, simple IPC library that uses the same pub-sub pattern, but with shared memory instead. I call this library Shadesmar; it’s mostly used for transporting large messages (pointclouds, images) between processes, and it works quite well with our pre-existing ROS codebase.
The reason I implemented the library from scratch instead of using Boost’s message_queue was that we needed the ability to do memory copies from GPU to shared memory and vice versa, without an intermediate copy to the process’s memory space. So we can achieve:

GPU -> Shared Memory -> GPU

instead of

GPU -> Process Memory -> Shared Memory -> Process Memory -> GPU
The implementation of Shadesmar (shm) is quite simple. We allocate a large chunk of system memory using the topic name as the key, and the memory is used as a circular buffer. Each publisher (writer) writes at the last index position, overwriting older pre-existing data. The subscribers regularly poll for a new message, and copy the message if one exists. Alternatively, I provide the ability to do serialization using the excellent library msgpack.
You can find the full details of the implementation in the GitHub repo, but moving on to the title of the blog post: Interprocess Locks
Since multiple publishers and subscribers can read and write the same memory buffer, there’s a possibility of data corruption if the reads and writes overlap. To prevent this, each publisher acquires an exclusive lock on the buffer, writes the data, and unlocks it. Since the subscribers are read-only, they acquire a shared lock on the buffer, read the data, and unlock it.
Boost has an interprocess sharable mutex that can be stored on shared memory, and it is perfect for this.
Pseudo-code for reading and writing:
// buffer is the shared memory circular queue
void read(MessageType *msg, int idx) {
  ipc_mut->lock_sharable();
  memcpy(msg, buffer[idx], sizeof(MessageType));
  ipc_mut->unlock_sharable();
}

void write(MessageType *msg) {
  int idx = get_last_idx();
  ipc_mut->lock();
  memcpy(buffer[idx], msg, sizeof(MessageType));
  ipc_mut->unlock();
}
This would work fine as long as we can assume that no process involved in this communication will crash or be killed. That, unfortunately, is not something we can guarantee: simply Ctrl+C-ing a running process will cause this to fail. If a process dies while holding exclusive or shared access to ipc_mut, then any other process trying to acquire the mutex will starve, causing the system to halt. We could assume that if a process fails to acquire ipc_mut within a small time span, the process previously holding ipc_mut is now dead, and forcefully acquire access and proceed with the read/write operation. However, this could lead to overlaps if the previous process’s operation took longer than the time span we waited.
There are a number of ways to tackle this problem; I’ll talk about my approach first, followed by the others.
I create a new lock type called robust_ipc_mutex that can handle the death of participating processes. It keeps track of the PID of the last process that acquired it; if another process can’t acquire the lock, it checks whether that PID is still alive using stat(). If the owner is dead, it unlocks the mutex and resets the owner field.
class robust_ipc_mutex {
public:
  robust_ipc_mutex() = default;

  void lock() {
    // spin until we get access
    while (!mut.try_lock()) {
      if (exclusive_owner != 0) {
        if (proc_dead(exclusive_owner)) {
          exclusive_owner = 0;
          mut.unlock();
          continue;
        }
      }
      std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
    }
    exclusive_owner = getpid();
  }

  void unlock() {
    exclusive_owner = 0;
    mut.unlock();
  }

private:
  interprocess_sharable_mutex mut;
  __pid_t exclusive_owner{0};
};
Although the above code looks correct, it can lead to faults by unlocking mut more than necessary. Assume the following order of execution by two processes calling lock() while exclusive_owner is dead:
// TIMESTEP 1
while (!mut.try_lock()) {      // <-- proc 1 (enters the loop)
  if (exclusive_owner != 0) {  // <-- proc 2 (enters the if)
    if (proc_dead(exclusive_owner)) {
      exclusive_owner = 0;
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}

// TIMESTEP 2
while (!mut.try_lock()) {
  if (exclusive_owner != 0) {          // <-- proc 1 (enters the if)
    if (proc_dead(exclusive_owner)) {  // <-- proc 2 (enters the if, since exclusive_owner is dead)
      exclusive_owner = 0;
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}

// TIMESTEP 3
while (!mut.try_lock()) {
  if (exclusive_owner != 0) {
    if (proc_dead(exclusive_owner)) {  // <-- proc 1 (enters the if, if proc 1 runs before proc 2)
      exclusive_owner = 0;             // <-- proc 2 (might run after proc 1)
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}
After timestep 3, both proc 1 and proc 2 call mut.unlock(), which is incorrect. Further, exclusive_owner is reset to 0 twice instead of once, as expected. This condition arises due to the lack of atomicity of the operations, leading to interleaved execution. We can tackle the issue using C++’s atomic functionality.
Instead of simply assigning exclusive_owner = 0, we use a CAS (compare-and-swap) to ensure atomicity across processes.
class robust_ipc_mutex {
public:
  robust_ipc_mutex() = default;

  void lock() {
    // spin until we get access
    while (!mut.try_lock()) {
      if (exclusive_owner != 0) {
        auto ex_proc = exclusive_owner.load(); // atomic load
        if (proc_dead(ex_proc)) {
          if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
            // ensures that the process we checked for liveness
            // is the same as the value we're replacing;
            // if the CAS returns false,
            // exclusive_owner was reset by some other process
            mut.unlock();
            continue;
          }
        }
      }
      std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
    }
    exclusive_owner = getpid();
  }

  void unlock() {
    auto current_pid = getpid();
    if (exclusive_owner.compare_exchange_strong(current_pid, 0)) {
      mut.unlock();
    }
  }

private:
  interprocess_sharable_mutex mut;
  std::atomic<__pid_t> exclusive_owner{0};
};
Using atomic, we can guarantee that lock() and unlock() work even if a process dies. This only tackles exclusive access, where a single process holds the lock. We also need to add the ability to have shared access (lock_sharable(), unlock_sharable()). This is a little tougher, since we need to keep track of multiple shared processes instead of a single exclusive one, and, as above, we need to ensure that the data structure we use to store the shared processes is correct under concurrent use without any locks. I’ll build a simple lock-less linear set for this. This is a great video on building a lock-free hash table, which is where I started off when learning how to build this data structure.
The set needs to be able to insert PIDs, delete PIDs, and traverse through the set.
template <uint32_t max_size>
class LocklessSet {
public:
  LocklessSet();
  bool insert(uint32_t elem);
  bool remove(uint32_t elem);

  std::atomic<uint32_t> pids[max_size];
};
(NOTE: I used a linear set since max_size
is very small (32), but the
implementation can be easily expanded to make a hash-set with closed hashing.)
The constructor initializes pids
to the default value of 0
.
LocklessSet() {
  // memset on atomics is UB; store 0 into each slot explicitly
  for (auto &pid : pids) pid.store(0);
}
insert() and remove() use atomic CAS to modify the set concurrently.
bool insert(uint32_t elem) {
  for (uint32_t idx = 0; idx < max_size; ++idx) {
    auto probedElem = pids[idx].load();
    if (probedElem != elem) {
      // The entry is either free or contains another key
      if (probedElem != 0) {
        continue; // contains another key
      }
      // Entry is free, time for CAS
      // pids[idx] is expected to be zero
      uint32_t exp = 0;
      if (pids[idx].compare_exchange_strong(exp, elem)) {
        // successfully inserted the element
        return true;
      } else {
        // some other proc got to it before us, continue searching
        continue;
      }
    } else {
      // element already exists in the set
      return false;
    }
  }
  // no space in the set
  return false;
}
bool remove(uint32_t elem) {
  for (uint32_t idx = 0; idx < max_size; ++idx) {
    auto probedElem = pids[idx].load();
    if (probedElem == elem) {
      // if true, the CAS succeeded and the element was deleted by this proc
      // if false, some other proc deleted the element first
      return pids[idx].compare_exchange_strong(elem, 0);
    }
  }
  // full pass through the array without finding the element;
  // it may have already been deleted
  return false;
}
We can use LocklessSet to build lock_sharable() and unlock_sharable() into robust_ipc_mutex. We also need to modify lock() to take the shared processes into account.
class robust_ipc_mutex {
public:
  robust_ipc_mutex() = default;

  void lock();
  void unlock();
  void lock_sharable();
  void unlock_sharable();

private:
  void prune_sharable_procs();

  LocklessSet<32> shared_owners;
  interprocess_sharable_mutex mutex_;
  std::atomic<__pid_t> exclusive_owner{0};
};
prune_sharable_procs() iterates through the set and atomically removes dead processes that hold shared access on the lock.
void prune_sharable_procs() {
  for (auto &i : shared_owners.pids) {
    uint32_t shared_owner = i.load();
    if (shared_owner == 0)
      continue;
    if (proc_dead(shared_owner)) {
      if (shared_owners.remove(shared_owner)) {
        // removal of the element was a success;
        // this ensures no duplicate unlock
        mutex_.unlock_sharable();
      }
    }
  }
}
In lock(), if we fail to get exclusive access and exclusive_owner == 0, we call prune_sharable_procs() to remove any dead processes holding shared access.
void lock() {
  while (!mutex_.try_lock()) {
    // mutex_ is either held properly
    // or a process which held it has died
    if (exclusive_owner != 0) {
      // exclusive_owner is not the default value, some other proc
      // has exclusive access already
      auto ex_proc = exclusive_owner.load();
      if (proc_dead(ex_proc)) {
        // ex_proc is dead; we unlock
        // and continue immediately to the next loop
        if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
          mutex_.unlock();
          continue;
        }
      }
    } else {
      // exclusive_owner == 0, so the readers are blocking us
      prune_sharable_procs();
    }
    std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
  }
  // only a single proc can get here at a time
  exclusive_owner = getpid();
}
lock_sharable() is similar to lock(), but we only need to check for exclusive_owner.
void lock_sharable() {
  while (!mutex_.try_lock_sharable()) {
    // the only reason for failure is that an exclusive lock is held
    if (exclusive_owner != 0) {
      auto ex_proc = exclusive_owner.load();
      if (proc_dead(ex_proc)) {
        // exclusive_owner is dead
        if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
          mutex_.unlock();
          continue;
        }
      }
    }
    std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
  }
  // loop until we insert our PID into the set
  while (!shared_owners.insert(getpid()));
}
We use LocklessSet’s remove() to delete the current PID when unlock_sharable() is called.
void unlock_sharable() {
  if (shared_owners.remove(getpid())) {
    mutex_.unlock_sharable();
  }
}
robust_ipc_mutex can be placed in the shared memory segment that you’re trying to synchronize.
I created a benchmark comparing the time for various operations against a basic interprocess_sharable_mutex. robust_ipc_mutex can handle up to 64 shared processes, and TIMEOUT is set to 1 μs.
Locking and unlocking continuously 1,000,000 times:

Base mutex: 95656.8 ± 9585.832 μs
Robust mutex: 1176544.0 ± 19633.354 μs

Shared locking and unlocking continuously 1,000,000 times:

Base mutex: 80629.0 ± 3651.181 μs
Robust mutex: 1172940.6 ± 7039.073 μs
robust_ipc_mutex is 12-15x slower than the underlying base mutex.
Within shadesmar, we replaced the base mutex with the robust mutex. The number of messages of size 7 bytes written in 1 s (1 publisher, 1 subscriber) is reported below:

Base mutex: 206335.2 ± 5148.852
Robust mutex: 167751.4 ± 1717.818
robust_ipc_mutex is around 1.25x slower than the base mutex.
From synthetic benchmarks, robust_ipc_mutex seems quite slow compared to the base mutex, but in a more realistic scenario the difference nearly disappears.
Publishing a message of size 10,000,000 bytes from 1 publisher to 8 subscribers with message serialization (using msgpack). Below is the number of messages sent in 1 s:
Base mutex: 533.0 ± 34.438
Robust mutex: 521.0 ± 10.358
The difference between the two is far smaller, and since the typical message Shadesmar transports is on the order of several megabytes (uncompressed images/pointclouds/tensors), the time trade-off is worth the robustness.
This isn’t the only way to solve the problem. While researching whether someone else had already tackled something similar, I stumbled upon other possible solutions:
POSIX’s pthread mutexes can be made robust using pthread_mutexattr_setrobust(): if a thread holding the mutex dies, a waiting thread receives EOWNERDEAD. The waiting thread acquires the mutex, and it is its responsibility to make the mutex consistent using pthread_mutex_consistent(). By default, pthreads can’t be used between processes via shared memory, but they can be made to work in shared memory using pthread_mutexattr_setpshared(). pthreads support only basic sync mechanisms, so a sharable mutex would have to be built on top of a pthread mutex, taking EOWNERDEAD into consideration. I’m currently working on this as an alternative robust IPC lock, and I’ll make a blog post when I’m done. You can read about the solution (and its failure) here.
Using a centralized process for tracking processes: we register each process using the mutex with the master. The master creates a non-blocking TCP socket with each registering process and tries to read data from the socket. If the other process is dead, the read returns 0 bytes; if it is alive, the read fails with EWOULDBLOCK. The master regularly checks that all registered processes are alive, cleans up any dead process, and makes the mutex consistent.
The header file with the actual implementation can be found here: robust_lock.h
Discussion: Hacker News, Lobsters
Written on October 10th, 2019 by Dheeraj R. Reddy