Michal Kohutek

Merging many rarely active threads into one

It often happens that a program needs to run a lot of short functions periodically, with various periods. An easy way to do it is to create a thread for each that performs the function and sleeps the rest of the time.

This approach, while relatively convenient, has a big problem: it doesn't scale. Operating systems enforce a maximum number of threads a process can have, and each thread is fairly resource-heavy, needing its own stack and paying for a context switch on every wakeup.

Here is a relatively simple solution to the problem in C++.

The use case

There is some number of functions that have to be run with certain periods, or a single function that is run on a number of objects with certain periods. Usually, it's not necessary for them to run exactly on time, which can be exploited to reduce the number of CPU wakeups.

The solution

The solution itself is not particularly difficult. All that’s needed is to implement a suitable cooperative scheduler.

The scheduler keeps its tasks ordered by their deadlines; C++ has a convenient data structure for this, std::map. In each iteration it takes the earliest task, executes it, inserts a new task for the same function scheduled one period later, and sleeps until the next task's deadline.
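To make the idea concrete, here is a standalone sketch of that loop body. The names Task, entries and runEarliest are illustrative, not the article's, and a std::multimap is used here so that two tasks may share a deadline:

#include <chrono>
#include <functional>
#include <map>
#include <thread>

struct Task {
    std::function<void()> work;
    std::chrono::steady_clock::duration period;
};

// Deadline-ordered tasks; begin() is always the task that is due first.
std::multimap<std::chrono::steady_clock::time_point, Task> entries;

void runEarliest() {
    if (entries.empty()) return;
    auto it = entries.begin();
    // Sleep until the earliest deadline and run that task.
    std::this_thread::sleep_until(it->first);
    it->second.work();
    // Reschedule it one period later and drop the old entry.
    auto next = it->first + it->second.period;
    entries.insert(std::make_pair(next, std::move(it->second)));
    entries.erase(it);
}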

This alone would still lead to a large number of CPU wakeups, so their number is reduced further by also executing, before going back to sleep, any additional tasks whose deadlines are close to the current one (the maximum tolerated delay is held in _imprecisionPermitted).

For the sleeping mechanism, a library called looping_thread is used. It calls a single routine periodically with an adjustable period and is capable of interrupting the wait when shutting down (which happens when it goes out of scope). Its instance is called _worker.

The tasks are represented by objects held in the structure _entries (the exact type of these objects is a template argument; they can be lambdas) and are processed by a function held in the variable _merger, which accepts a vector of pointers to them.
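The class around all of this might be organised roughly as follows. This is a sketch inferred from the snippet below: the member names and types follow the article, but the exact layout of Subscription and the name of the looping_thread class are assumptions:

#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <vector>

class LoopingThread; // stand-in for the looping_thread library's class (exact name assumed)

template <typename T>
class RoutineStreamliner {
    // One scheduled task: the payload handed to the merger plus its period.
    struct Subscription {
        T data;
        std::chrono::steady_clock::duration period;
    };

    // Tasks keyed by their next deadline; begin() is the earliest one.
    std::map<std::chrono::steady_clock::time_point,
             std::unique_ptr<Subscription>> _entries;
    std::mutex _lock; // guards _entries
    std::chrono::steady_clock::duration _imprecisionPermitted;
    std::function<void(const std::vector<T*>&)> _merger;
    std::unique_ptr<LoopingThread> _worker;
    // constructor, add(), the worker routine shown below, ...
};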

Here is the periodically called code managed by looping_thread:

// Collect every task whose deadline falls within the tolerated window
// and reschedule each of them one period later.
std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point until = start + _imprecisionPermitted;

std::vector<T*> merged;
{
    std::lock_guard<std::mutex> lock(_lock);
    for (auto it = _entries.begin(); it != _entries.end();) {
        // Entries are ordered by deadline, so the first one past the
        // window ends the sweep.
        if (it->first > until) break;

        std::unique_ptr<Subscription> contents = std::move(it->second);
        merged.push_back(&contents->data);

        // Reinsert the task with its deadline moved one period forward,
        // then drop the old entry.
        std::chrono::steady_clock::time_point next = it->first + contents->period;
        _entries.insert(std::make_pair(next, std::move(contents)));
        it = _entries.erase(it);
    }
}

// Run all the collected tasks in one batch, outside the lock.
if (!merged.empty())
    _merger(merged);

// Sleep until the next deadline.
if (!_entries.empty()) {
    std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
    _worker->setPeriod(_entries.begin()->first - now);
}
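The add method used in the next section's example isn't part of the snippet above. Here is a minimal sketch of what it could look like as a member of the class, assuming the first run is scheduled one period after insertion and ignoring the possibility of waking the worker early:

void add(T data, std::chrono::steady_clock::duration period) {
    auto contents = std::unique_ptr<Subscription>(
        new Subscription{std::move(data), period});

    std::lock_guard<std::mutex> lock(_lock);
    // Schedule the first run one period from now; the worker picks it up
    // on one of its next wakeups. A real implementation would probably
    // also shorten the worker's current wait if this deadline is earlier.
    _entries.insert(std::make_pair(
        std::chrono::steady_clock::now() + period, std::move(contents)));
}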

Usage

Here is an example of its usage:

RoutineStreamliner<std::string> repeater(std::chrono::milliseconds(300),
        [] (const std::vector<std::string*>& lines) {
    // Print all strings whose deadlines fell into the same 300 ms window
    // on a single line.
    for (std::string* line : lines)
        std::cout << *line << " ";
    std::cout << std::endl;
});
repeater.add("1.1s", std::chrono::milliseconds(1100));
repeater.add("0.4s", std::chrono::milliseconds(400));
repeater.add("0.5s", std::chrono::milliseconds(500));
repeater.add("0.7s", std::chrono::milliseconds(700));
repeater.add("1.3s", std::chrono::milliseconds(1300));
std::this_thread::sleep_for(std::chrono::seconds(10));

The entire source code can be found on GitHub.

Extension: Throttling

Slow tasks or a generally overloaded CPU can delay the worker past the deadlines of some short-period tasks. With the implementation above, those tasks are still executed the correct number of times, just not always at the right time. In practice, it's rarely useful to execute the same task several times at practically the same moment.

To deal with this, there is an additional parameter called throttling (defaulting to false) that causes each run to skip such duplicate executions of a task.
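The throttling code itself isn't shown in the article. One possible way to do it, as a sketch: when rescheduling a collected task in the sweep above, advance its deadline past the current window so it cannot be collected again in the same run (the check could live right before the _entries.insert call; _throttling is an assumed member name):

std::chrono::steady_clock::time_point next = it->first + contents->period;
if (_throttling)
    // Skip any runs that were missed entirely: advance the deadline until
    // it lies beyond the current window, so the task is executed only once
    // in this sweep instead of once per missed period.
    while (next <= until)
        next += contents->period;
_entries.insert(std::make_pair(next, std::move(contents)));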

2 thoughts on “Merging many rarely active threads into one”

    1. Yes, but in many cases, it's overkill. The idea behind this isn't to provide a general-purpose tool to handle complex cases, but to offer a small tool for simple cases whose use requires very few lines of code and can be easily attached to a project. Simple cases are very common and need solutions that take little time to learn and little code to use.
