Michal Kohutek

A class to conveniently perform tasks on a secondary thread

How to use a single command to perform an action asynchronously

The arrival of C++11 brought threading support to the standard C++ library, along with many useful tools for managing concurrency without going into low-level details. These are very useful for implementing parallel algorithms such as a parallel for, but they are not well suited to running many small operations asynchronously, because one always has to manage the thread objects created. That gets annoying when slow I/O operations are scattered all around the program and their results are not needed any time soon, so they can be ignored or saved somewhere using callbacks.

To deal with this problem, I decided to create a small library implementing a class that holds a worker thread that executes all the work it's assigned by other threads, in the form of lambdas. For maximum convenience and efficiency, I wanted it to accept tasks from any thread, and I wanted the worker thread to sleep while idle instead of polling for work, which would cause delays and needless wakeups.

External interface

It should be usable with as little code as possible, so I decided to give it a constructor that takes no arguments, to follow the RAII principle and to have only one function for adding work. I called it addTask().

Because it’s not clear if it should exit as soon as possible when destroyed or finish the work it’s been given, I also gave it a discardTasks() method that can be called before the destructor to accelerate the exit.

Implementation: First try

Some parts of the problem’s solution are obvious. The tasks can be represented as std::function<void()> objects and stored in a mutex-protected std::vector.
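As a rough illustration of that storage, here is a minimal sketch. The class name TaskList and its methods add() and takeAll() are made up for this example and are not taken from the library.

```cpp
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper for illustration only: a mutex-protected vector of tasks.
class TaskList {
	std::mutex mutex_;
	std::vector<std::function<void()>> tasks_;

public:
	// May be called from any thread.
	void add(std::function<void()> task) {
		std::lock_guard<std::mutex> lock(mutex_);
		tasks_.push_back(std::move(task));
	}

	// Moves all pending tasks out, so they can be run without holding the mutex.
	std::vector<std::function<void()>> takeAll() {
		std::lock_guard<std::mutex> lock(mutex_);
		std::vector<std::function<void()>> taken;
		taken.swap(tasks_);
		return taken;
	}
};
```

Taking the tasks out before running them keeps the mutex free while a slow task executes, so other threads are never blocked for long when adding work.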

The thread waiting can be implemented using a mutex. Attempting to lock an already locked std::mutex causes the thread to wait until another thread unlocks it.

The worker thread can therefore check for work and, if there is none, lock a mutex. The second time this happens, it tries to lock a mutex that's already locked and gets blocked. Other threads can unblock it by unlocking the mutex. The worker cannot get stalled by a thread that adds work and unlocks the mutex right after the worker has found no work but before it has blocked, because the worker has to find no work twice in a row to get blocked.

The code for blocking/unblocking was fairly simple:

void wait() {
	if (!checkForWork())
		lock_.lock();
}

void awaken() {
	lock_.unlock();
}

However, there's a problem. The mutex gets unlocked by a thread that does not own it, and additional synchronisation is needed to prevent the code from unlocking it twice. Everything worked as intended when compiled with GCC and Clang on Linux or Windows, but the MSVC build threw exceptions. MSVC is within its rights here: according to the standard, unlocking a std::mutex from a thread that doesn't own it is undefined behaviour. It had to be done in a different way.

Implementation: Second try

There's no good way to do it with a mutex without unlocking it from a thread that doesn't own it. So I had to use another synchronisation tool: a condition variable. According to the standard, its wait unlocks a mutex and blocks the thread; when the condition variable is notified, the thread wakes up and reacquires the mutex.

It looks like a solution:

void wait() {
	if (!checkForWork())
		condvar_.wait(lock_);
}

void awaken() {
	condvar_.notify_one();
}

However, there is another problem. It can lead to this sequence of events:

  1. Worker thread checks if there is work and gets false
  2. Calling thread adds a new task
  3. Calling thread notifies the condition variable
  4. Worker thread starts waiting for a signal that never comes

In that case, the worker thread is stuck until it's given another task. At the time, I thought the mutex that protects the vector of tasks could not be used to prevent this, because I assumed the worker could not hold that mutex when starting the wait().
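The underlying reason is that a condition variable does not remember notifications: a notify_one() issued while nobody is waiting has no effect. The following sketch shows this in a single thread; the function name earlyNotificationIsLost is made up for the example, and the 50 ms timeout is an arbitrary choice.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Illustration only: notifies first, waits afterwards, and reports whether
// the wait ran into its timeout instead of being woken by the notification.
bool earlyNotificationIsLost() {
	std::mutex mutex;
	std::condition_variable condvar;

	// Nobody is waiting yet, so this notification is simply dropped.
	condvar.notify_one();

	std::unique_lock<std::mutex> lock(mutex);
	// Waiting without a predicate: only a later notification, the timeout
	// or a (rare) spurious wakeup can end this wait.
	return condvar.wait_for(lock, std::chrono::milliseconds(50)) == std::cv_status::timeout;
}
```

The function is expected to return true; only a spurious wakeup, which the standard permits, could make it return false. In the worker, the same thing happens when the notification arrives between the check for work and the start of the wait.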

Implementation: First published version

Other threads can find out whether the worker thread is in the critical section between the check for work and the call to wait(): that section is protected by a mutex, and the other threads try to lock it and act on the result.

After a lot of thinking, I figured out how to get this working. If the try_lock succeeds, the worker is not in the critical section and cannot enter it until the lock is released. If it fails, the calling thread has to lock the mutex that the condition variable's wait unlocks, which makes it wait until the worker is actually blocked before notifying it. This path is not particularly fast, but it's unlikely to be taken often. Phew!

void wait() {
	std::unique_lock<std::mutex> lock(sleepMutex_);
	if (!checkForWork())
		condvar_.wait(workingLock_); // it's a lock wrap for lock_
}

void awaken() {
	std::unique_lock<std::mutex> lock(sleepMutex_, std::defer_lock);
	if (!lock.try_lock()) {
		std::unique_lock<std::mutex> innerLock(lock_);
		condvar_.notify_one();
	}
}

Implementation: Now less complicated

Shortly after I published this, a user on Reddit noticed that the solution so far was needlessly complicated, because the mutex that protects the tasks can also be used to prevent other threads from signalling the worker thread between the check and the wait.

void wait() {
	std::unique_lock<std::mutex> lock(mutex_);
	if (!checkForWork()) {
		// Cannot receive signal here, mutex is locked
		condvar_.wait(lock);
	}
}

void addTask(std::function<void()> task) {
	std::unique_lock<std::mutex> lock(mutex_);
	tasks_.push_back(task);
	condvar_.notify_one();
}
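To show how these two pieces could fit into a whole class, here is a minimal sketch. It is not the published source: the exiting_ flag, the run() loop and the decision to finish the remaining tasks in the destructor are assumptions made for this example. It also uses the predicate overload of wait(), which re-checks the condition after every wakeup and so copes with spurious wakeups too.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Sketch only, not the library's actual code.
class SeasonalWorker {
	std::mutex mutex_;
	std::condition_variable condvar_;
	std::vector<std::function<void()>> tasks_;
	bool exiting_ = false; // assumed flag, set by the destructor
	std::thread thread_;   // declared last, so it starts after the other members exist

	void run() {
		std::unique_lock<std::mutex> lock(mutex_);
		while (true) {
			// The predicate is checked with mutex_ held, so no task can be
			// added between the check and the start of the wait.
			condvar_.wait(lock, [this] { return exiting_ || !tasks_.empty(); });
			if (tasks_.empty())
				return; // exiting_ is set and nothing is left to do
			std::function<void()> task = std::move(tasks_.front());
			tasks_.erase(tasks_.begin()); // a std::deque would make this cheaper
			lock.unlock(); // run the task without blocking addTask()
			task();
			lock.lock();
		}
	}

public:
	SeasonalWorker() : thread_([this] { run(); }) {}
	SeasonalWorker(const SeasonalWorker&) = delete;
	SeasonalWorker& operator=(const SeasonalWorker&) = delete;

	// Assumption: remaining tasks are finished unless discardTasks() was called.
	~SeasonalWorker() {
		{
			std::lock_guard<std::mutex> lock(mutex_);
			exiting_ = true;
		}
		condvar_.notify_one();
		thread_.join();
	}

	void addTask(std::function<void()> task) {
		{
			std::lock_guard<std::mutex> lock(mutex_);
			tasks_.push_back(std::move(task));
		}
		condvar_.notify_one();
	}

	// Drops tasks that have not been started yet.
	void discardTasks() {
		std::lock_guard<std::mutex> lock(mutex_);
		tasks_.clear();
	}
};
```

In this sketch the notification is sent after the mutex is released, which is also safe: the task is already in the vector by then, so the worker either sees it in the predicate or is woken by the notification.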

Usage

I called the object SeasonalWorker because it's like a worker that works only some of the time. Its usage is as simple as intended: create the object, let RAII handle everything and call addTask() to give it some work.

SeasonalWorker worker;

worker.addTask([] () {
	std::this_thread::sleep_for(std::chrono::milliseconds(200));
	std::cout << "First task done" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
worker.addTask([] () {
	std::this_thread::sleep_for(std::chrono::milliseconds(200));
	std::cout << "Second task done" << std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "All is over" << std::endl;

The entire source code is on my GitHub.

Its usage could be made even shorter by not requiring the object to be declared, but that would need either a global variable or a singleton, and it would be less flexible. Changing it to work that way is easy anyway.
