Multithreading

Atomic Data Types

C++ has a set of simple atomic data types: booleans, characters, numbers and pointers in many variants. They need the header <atomic>. You can define your own atomic data type with the class template std::atomic, but there are serious restrictions for the type MyType in std::atomic<MyType>:

  • The copy assignment operator for MyType, for all base classes of MyType and all non-static members of MyType, must be trivial. Only a compiler-generated copy assignment operator is trivial.
  • MyType must not have virtual methods or virtual base classes.
  • MyType must be bitwise copyable and comparable so that the C functions memcpy or memcmp can be applied.
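
The restrictions above can be checked at compile time. The following is a minimal sketch, assuming a hypothetical user-defined type Point and a hypothetical helper storeAndLoad; neither name comes from the text.

```cpp
#include <atomic>
#include <cassert>
#include <type_traits>

// Hypothetical user-defined type: two ints, compiler-generated copy
// operations, no virtual functions, no base classes.
struct Point {
    int x;
    int y;
};

// std::atomic<T> requires T to be trivially copyable.
static_assert(std::is_trivially_copyable<Point>::value,
              "Point must be trivially copyable for std::atomic<Point>");

// Stores a new value atomically and returns the value read back.
Point storeAndLoad(std::atomic<Point>& target, Point next) {
    target.store(next);      // atomic write of the whole struct
    return target.load();    // atomic read of the whole struct
}
```

A type with a user-provided copy assignment operator or a virtual method would fail the static_assert instead of compiling silently.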

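Atomic data types have atomic operations such as load, store and increment. Incrementing a plain int from several threads is a data race and therefore undefined behavior; the same increment on a std::atomic<int> is well defined: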

int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // undefined behavior

std::atomic<int> cnt{0};
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // OK
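
The snippet above omits the joins and the final read. A complete sketch of the atomic variant might look like this; the function name countWithAtomic and its parameters are assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Starts numThreads threads that each increment a shared atomic counter
// incrementsPerThread times, joins them, and returns the final value.
int countWithAtomic(int numThreads, int incrementsPerThread) {
    assert(numThreads >= 0 && incrementsPerThread >= 0);
    std::atomic<int> cnt{0};
    std::vector<std::thread> threads;
    for (int t = 0; t < numThreads; ++t) {
        threads.emplace_back([&cnt, incrementsPerThread] {
            for (int i = 0; i < incrementsPerThread; ++i) {
                cnt.fetch_add(1);   // same effect as cnt++
            }
        });
    }
    for (auto& th : threads) th.join();   // wait before reading the result
    return cnt.load();
}
```

Because every increment is atomic, no update is lost: countWithAtomic(4, 10000) yields 40000 regardless of how the threads interleave.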

What is trivial?

A trivial type is a type whose storage is contiguous (trivially copyable) and which only supports static default initialization (trivially default constructible), either cv-qualified or not. It includes scalar types, trivial classes and arrays of any such types.

A trivial class is a class (defined with class, struct or union) that is both trivially default constructible and trivially copyable, which implies that:

  • uses the implicitly defined default, copy and move constructors, copy and move assignments, and destructor.
  • has no virtual members.
  • has no non-static data members with brace- or equal- initializers.
  • its base class and non-static data members (if any) are themselves also trivial types.

std::is_trivial<T> inherits from std::integral_constant as either std::true_type or std::false_type, depending on whether T is a trivial type. It needs the header <type_traits>.

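#include <iostream>
#include <type_traits>

class A {};                          // trivial
class B { B() {} };                  // user-provided constructor: not trivial
class C : B {};                      // non-trivial base class: not trivial
class D { virtual void fn() {} };    // virtual member: not trivial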

int main() {
  std::cout << std::boolalpha;
  std::cout << "is_trivial:" << std::endl;
  std::cout << "int: " << std::is_trivial<int>::value << std::endl;
  std::cout << "A: " << std::is_trivial<A>::value << std::endl;
  std::cout << "B: " << std::is_trivial<B>::value << std::endl;
  std::cout << "C: " << std::is_trivial<C>::value << std::endl;
  std::cout << "D: " << std::is_trivial<D>::value << std::endl;
  return 0;
}

Output:

is_trivial:
int: true
A: true
B: false
C: false
D: false

Threads

A std::thread represents an executable unit. This executable unit, which the thread starts immediately, gets its work package as a callable unit. A callable unit can be a function, a function object or a lambda function.

#include <thread>
...
using namespace std;

void helloFunction(){
  cout << "function" << endl;
}

class HelloFunctionObject {
public:
  void operator()() const {
    cout << "function object" << endl;
  }
};

thread t1(helloFunction);                      // function

HelloFunctionObject helloFunctionObject;
thread t2(helloFunctionObject);                // function object

thread t3([]{ cout << "lambda function"; });   // lambda function

The creator of a thread has to take care of the lifetime of the thread it created. The executable unit of the created thread ends with the end of the callable. Either the creator waits until the created thread t is done (t.join()), or the creator detaches itself from the created thread (t.detach()). A thread t is joinable if neither t.join() nor t.detach() was performed on it. The destructor of a joinable thread calls std::terminate, and the program terminates.

thread t1(helloFunction);                    // function

HelloFunctionObject helloFunctionObject;
thread t2(helloFunctionObject);              // function object

thread t3([]{ cout << "lambda function"; }); // lambda function

t1.join();
t2.join();
t3.join();
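
The joinable state can be observed directly with t.joinable(). The sketch below records it at three points; the struct JoinableStates and the function observeJoinable are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <thread>

// Hypothetical record of what joinable() reports at three points in time.
struct JoinableStates {
    bool defaultConstructed;
    bool beforeJoin;
    bool afterJoin;
};

JoinableStates observeJoinable() {
    JoinableStates s{};

    std::thread empty;                        // owns no executable unit
    s.defaultConstructed = empty.joinable();  // false: nothing to join

    std::thread t([] {});                     // starts immediately
    s.beforeJoin = t.joinable();              // true, even if the callable is already done
    t.join();
    s.afterJoin = t.joinable();               // false: destructor is now safe

    return s;
}
```

Note that a thread stays joinable after its callable has finished; only join() or detach() changes that.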

You can move a callable, together with its running thread of execution, from one std::thread object to another.

#include <thread>
...
std::thread t([]{ cout << "lambda function"; });  
std::thread t2;
t2= std::move(t);

std::thread t3([]{ cout << "lambda function"; });
t2= std::move(t3);                              // std::terminate

By performing t2= std::move(t), thread t2 takes over the callable of thread t. If t2 already had a callable and is still joinable, the C++ runtime calls std::terminate. This is exactly what happens in t2= std::move(t3), because t2 executed neither t2.join() nor t2.detach() before.
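
One way to avoid the call to std::terminate is to join the target before the move assignment. This is a minimal sketch; the helper safeMoveAssign is a hypothetical name, not part of the standard library.

```cpp
#include <cassert>
#include <thread>
#include <utility>

// Moves `source` into `target`. If `target` still owns a thread, it is
// joined first, so the move assignment cannot trigger std::terminate.
void safeMoveAssign(std::thread& target, std::thread& source) {
    if (target.joinable()) {
        target.join();
    }
    target = std::move(source);   // source no longer owns a thread afterwards
}
```

After the call the moved-from thread object is no longer joinable, and the caller remains responsible for joining or detaching the target.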

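The constructor of std::thread is a variadic template. It takes a callable and an arbitrary number of arguments, which are copied or moved into the new thread.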

void printStringCopy(string s){ cout << s; }
void printStringRef(const string& s){ cout << s; }

string s{"C++"};

thread tPerCopy([=]{ cout << s; });           // C++
thread tPerCopy2(printStringCopy, s);         // C++
tPerCopy.join();
tPerCopy2.join();

thread tPerReference([&]{ cout << s; });       // C++
thread tPerReference2(printStringRef, std::ref(s)); // C++ (std::ref from <functional>; without it the thread gets a copy of s)
tPerReference.join();
tPerReference2.join();
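
Arguments are copied into the thread by default, so a function that modifies its argument through a reference needs std::ref. The following sketch assumes two hypothetical functions, appendMark and appendInThread.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <thread>

// Modifies the caller's string through a non-const reference.
void appendMark(std::string& s) { s += "!"; }

// Runs appendMark in a separate thread on the local string `text`.
std::string appendInThread(std::string text) {
    // std::ref wraps the reference; passing `text` directly would not
    // compile here, because the thread's internal copy is an rvalue.
    std::thread t(appendMark, std::ref(text));
    t.join();   // the referenced object must outlive the thread
    return text;
}
```

The join before the return matters: with a detached thread the reference could outlive the string it refers to.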

Method                               Description
t.join()                             Waits until thread t has finished its executable unit.
t.detach()                           Executes the created thread t independently of the creator.
t.joinable()                         Checks if thread t supports the calls join or detach.
t.get_id() and                       Returns the identity of the thread.
this_thread::get_id()
thread::hardware_concurrency()       Indicates the number of threads that can be run in parallel.
this_thread::sleep_until(absTime)    Puts the current thread to sleep until the time point absTime.
this_thread::sleep_for(relTime)      Puts the current thread to sleep for the duration relTime.
this_thread::yield()                 Offers the system the chance to run another thread.
t.swap(t2) and                       Swaps the threads.
swap(t1, t2)

You can call t.join() or t.detach() only once on a thread t. If you attempt to call them more than once, you get the exception std::system_error. std::thread::hardware_concurrency returns the number of cores, or 0 if the runtime cannot determine the number. The sleep_until and sleep_for operations need a time point or a time duration as argument, respectively.

Threads cannot be copied but can be moved. A swap operation performs a move when possible.

using std::this_thread::get_id;

std::thread::hardware_concurrency();      // 4

std::thread t1([]{ get_id(); });            // 139783038650112
std::thread t2([]{ get_id(); });            // 139783030257408
t1.get_id();                                // 139783038650112
t2.get_id();                                // 139783030257408

t1.swap(t2);

t1.get_id();                                // 139783030257408
t2.get_id();                                // 139783038650112
get_id();                                   // 140159896602432
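
The effect of swap on the thread identities can also be checked in code rather than by comparing printed numbers. A small sketch, with the hypothetical function name swapExchangesIds:

```cpp
#include <cassert>
#include <thread>

// Returns true if t1.swap(t2) exchanged the identities of the two threads.
bool swapExchangesIds() {
    std::thread t1([] {});
    std::thread t2([] {});

    const std::thread::id id1 = t1.get_id();
    const std::thread::id id2 = t2.get_id();

    t1.swap(t2);   // t1 now manages the second thread, t2 the first

    const bool swapped = (t1.get_id() == id2) && (t2.get_id() == id1);

    t1.join();
    t2.join();
    return swapped;
}
```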

Shared Variables

mutex

#include <mutex>
#include <thread>
...

using namespace std;

std::mutex mutexCout;

struct Worker{
  Worker(string n):name(n){};
  void operator() (){
    for (int i= 1; i <= 3; ++i){
      this_thread::sleep_for(chrono::milliseconds(200));
      mutexCout.lock();
      cout << name << ": " << "Work " << i << endl;
      mutexCout.unlock();
    }
  }
private:
  string name; 
};

thread herb= thread(Worker("Herb"));
thread andrei= thread(Worker(" Andrei"));
thread scott= thread(Worker ("    Scott"));
thread bjarne= thread(Worker("      Bjarne"));

Method             mutex   recursive_mutex   timed_mutex   recursive_timed_mutex   shared_timed_mutex
m.lock             yes     yes               yes           yes                     yes
m.unlock           yes     yes               yes           yes                     yes
m.try_lock         yes     yes               yes           yes                     yes
m.try_lock_for                               yes           yes                     yes
m.try_lock_until                             yes           yes                     yes

The std::shared_timed_mutex enables you to implement reader-writer locks. The method m.try_lock_for(relTime) needs a relative time duration; the method m.try_lock_until(absTime) needs an absolute time point.
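
To illustrate m.try_lock_for, the sketch below lets a second thread try to acquire a std::timed_mutex that the calling thread holds for the whole attempt. The function name secondThreadGetsLock is an assumption for this example.

```cpp
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Returns whether a second thread could acquire the mutex within `timeout`
// while the calling thread keeps it locked.
bool secondThreadGetsLock(std::chrono::milliseconds timeout) {
    std::timed_mutex m;
    bool acquired = false;

    m.lock();                          // the calling thread owns the mutex
    std::thread t([&] {
        if (m.try_lock_for(timeout)) { // gives up after the relative duration
            acquired = true;
            m.unlock();
        }
    });
    t.join();                          // the attempt ends while m is still locked
    m.unlock();

    return acquired;
}
```

Since the mutex is never released during the attempt, the second thread gives up after the timeout instead of blocking forever.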

Deadlocks

A deadlock is a state in which two or more threads are blocked because each thread waits for the release of a resource before it releases its own resource.

You can get a deadlock very quickly if you forget to call m.unlock(). That happens for example in case of an exception in the function getVar().

m.lock();
sharedVar= getVar();   // if getVar() throws, m stays locked; getVar() must not lock m itself
m.unlock();

Locks

You should encapsulate a mutex in a lock to release the mutex automatically. A lock is an implementation of the RAII idiom because the lock binds the ownership of the mutex to its own lifetime. C++11 has std::lock_guard for the simple and std::unique_lock for the advanced use case. Both need the header <mutex>. Since C++14 there is also std::shared_lock (header <shared_mutex>), which in combination with the mutex std::shared_timed_mutex is the basis for reader-writer locks.
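
A reader-writer lock built from these parts could be sketched as follows. It assumes C++14 or later; the class SharedCounter and the function runReadersAndWriters are hypothetical names chosen for this illustration.

```cpp
#include <cassert>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <vector>

// Hypothetical counter: many readers may hold the mutex at the same time,
// a writer needs it exclusively.
class SharedCounter {
public:
    int read() const {
        std::shared_lock<std::shared_timed_mutex> lock(mutex_);   // shared (reader) lock
        return value_;
    }
    void increment() {
        std::lock_guard<std::shared_timed_mutex> lock(mutex_);    // exclusive (writer) lock
        ++value_;
    }
private:
    mutable std::shared_timed_mutex mutex_;
    int value_{0};
};

// Runs `writers` writer threads plus one reader thread and returns the final value.
int runReadersAndWriters(int writers, int incrementsPerWriter) {
    SharedCounter counter;
    std::vector<std::thread> threads;
    for (int w = 0; w < writers; ++w) {
        threads.emplace_back([&counter, incrementsPerWriter] {
            for (int i = 0; i < incrementsPerWriter; ++i) counter.increment();
        });
    }
    threads.emplace_back([&counter] {
        for (int i = 0; i < 100; ++i) (void)counter.read();   // concurrent reads
    });
    for (auto& t : threads) t.join();
    return counter.read();
}
```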

std::lock_guard supports only the simple use case. Therefore it can only bind its mutex in the constructor and release it in the destructor.

std::mutex coutMutex;

struct Worker{
  Worker(std::string n):name(n){};
  void operator() (){
    for (int i= 1; i <= 3; ++i){
      std::this_thread::sleep_for(std::chrono::milliseconds(200));
      std::lock_guard<std::mutex> myLock(coutMutex);
      std::cout << name << ": " << "Work " << i << std::endl;
    }
  }
private:
  std::string name;
};
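
The benefit of std::lock_guard shows when the protected code throws: the mutex is released during stack unwinding. A minimal sketch, assuming a hypothetical getVarThatThrows that stands in for a failing getVar():

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for a getVar() that fails.
int getVarThatThrows() { throw std::runtime_error("getVar failed"); }

// Returns true if the mutex can be acquired again after the exception.
bool mutexFreeAfterException(std::mutex& m) {
    try {
        std::lock_guard<std::mutex> guard(m);   // locks m
        int sharedVar = getVarThatThrows();     // throws while m is locked
        (void)sharedVar;
    } catch (const std::runtime_error&) {
        // guard was destroyed during unwinding, so m is unlocked here
    }
    // try_lock may in principle fail spuriously; on an uncontended mutex
    // it is expected to succeed.
    if (m.try_lock()) {
        m.unlock();
        return true;
    }
    return false;
}
```

With the manual m.lock() and m.unlock() pair, the same exception would leave the mutex locked.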

unique_lock is a general-purpose mutex ownership wrapper allowing deferred locking, time-constrained attempts at locking, recursive locking, transfer of lock ownership, and use with condition variables.

#include <mutex>
#include <thread>
#include <chrono>
 
struct Box {
    explicit Box(int num) : num_things{num} {}
 
    int num_things;
    std::mutex m;
};
 
void transfer(Box &from, Box &to, int num)
{
    // don't actually take the locks yet
    std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
    std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
 
    // lock both unique_locks without deadlock
    std::lock(lock1, lock2);
 
    from.num_things -= num;
    to.num_things += num;
 
    // 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
}
 
int main()
{
    Box acc1(100);
    Box acc2(50);
 
    std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
    std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);
 
    t1.join();
    t2.join();
}

Thread Local Data

By using the keyword thread_local, you get thread-local data, also known as thread-local storage. Each thread has its own copy of the data. Thread-local data behaves like a static variable: it is created at its first usage, and its lifetime is bound to the lifetime of the thread.

std::mutex coutMutex;
thread_local std::string s("hello from ");

void addThreadLocal(std::string const& s2){
  s+= s2;
  std::lock_guard<std::mutex> guard(coutMutex);
  std::cout << s << std::endl;
  std::cout << "&s: " << &s << std::endl;
  std::cout << std::endl;
}

std::thread t1(addThreadLocal, "t1");
std::thread t2(addThreadLocal, "t2");
std::thread t3(addThreadLocal, "t3");
std::thread t4(addThreadLocal, "t4");
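
That each thread really works on its own copy can be verified by returning the modified string to the caller. In this sketch the variable greeting and the function greetFrom are hypothetical names.

```cpp
#include <cassert>
#include <string>
#include <thread>

// Each thread gets its own copy, initialised with the same text.
thread_local std::string greeting("hello from ");

// Appends `name` to the new thread's copy and returns that copy.
std::string greetFrom(const std::string& name) {
    std::string result;
    std::thread t([&] {
        greeting += name;    // modifies only the copy of thread t
        result = greeting;
    });
    t.join();
    return result;
}
```

Every call starts from the unmodified initial value, and the copy of the calling thread stays "hello from " throughout.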

Condition Variables

Condition variables enable threads to be synchronised via messages. They need the header <condition_variable>. One thread acts as a sender, and the other as a receiver of the message. The receiver waits for the notification of the sender.

Method                               Description
cv.notify_one()                      Notifies a waiting thread.
cv.notify_all()                      Notifies all waiting threads.
cv.wait(lock, ...)                   Waits for the notification while holding a std::unique_lock.
cv.wait_for(lock, relTime, ...)      Waits for a time duration for the notification while holding a std::unique_lock.
cv.wait_until(lock, absTime, ...)    Waits until a time point for the notification while holding a std::unique_lock.

Sender and receiver need a lock. For the sender a std::lock_guard is sufficient, because it calls lock and unlock only once. The receiver needs a std::unique_lock, because the wait call has to unlock and relock the mutex, typically several times.

// conditionVariable.cpp 
...
#include <condition_variable>
...

std::mutex mutex_;
std::condition_variable condVar;
bool dataReady= false;

void doTheWork(){
  std::cout << "Processing shared data." << std::endl;
}

void waitingForWork(){
  std::cout << "Worker: Waiting for work." << std::endl;
  std::unique_lock<std::mutex> lck(mutex_);
  condVar.wait(lck, []{ return dataReady; });  // blocks until notified and dataReady is true
  doTheWork();
  std::cout << "Work done." << std::endl;
}

void setDataReady(){
  std::lock_guard<std::mutex> lck(mutex_);
  dataReady=true;
  std::cout << "Sender: Data is ready." << std::endl;
  condVar.notify_one();
}

std::thread t1(waitingForWork);
std::thread t2(setDataReady);
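
The timed variant cv.wait_for reports through its return value whether the predicate became true in time. The following sketch assumes a hypothetical function waitForData that optionally starts a sender thread.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Waits up to `timeout` for dataReady to become true. If `notify` is true,
// a sender thread sets the flag and notifies; otherwise nobody does.
bool waitForData(bool notify, std::chrono::milliseconds timeout) {
    std::mutex m;
    std::condition_variable cv;
    bool dataReady = false;

    std::thread sender;
    if (notify) {
        sender = std::thread([&] {
            {
                std::lock_guard<std::mutex> lck(m);   // one lock/unlock is enough
                dataReady = true;
            }
            cv.notify_one();
        });
    }

    bool ready = false;
    {
        std::unique_lock<std::mutex> lck(m);
        // Returns the value of the predicate when the wait ends.
        ready = cv.wait_for(lck, timeout, [&] { return dataReady; });
    }

    if (sender.joinable()) sender.join();
    return ready;
}
```

The predicate protects against both spurious and lost wakeups: if the sender has already set the flag before the receiver starts waiting, wait_for returns true immediately.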

Tasks

In addition to threads, C++ has tasks to perform work asynchronously. Tasks need the header <future>. A task is parameterised with a work package and consists of the two associated components, a promise and a future. Both are connected via a data channel. The promise executes the work packages and puts the result in the data channel; the associated future picks up the result. Both communication endpoints can run in separate threads. What’s special is that the future can pick up the result at a later time. Therefore the calculation of the result by the promise is independent of the query of the result by the associated future.

Tasks behave like data channels. The promise puts its result in the data channel. The future waits for it and picks it up.

Threads are very different from tasks.

For the communication between the creator thread and the created thread, you have to use a shared variable. The task communicates via its data channel, which is implicitly protected. Therefore a task does not need a protection mechanism like a mutex.

The creator thread waits for its child with the join call. The future fut uses the call fut.get(), which blocks if no result is available yet.

If an exception escapes the callable of the created thread, std::terminate is called and the whole process ends, including the creator. In contrast, the promise can send the exception to the future, which has to handle it.

A promise can serve one or many futures. It can send a value, an exception or only a notification. You can use a task as a safe replacement for a condition variable.

#include <future>
#include <thread>
...

int res;
std::thread t([&]{ res= 2000+11;});
t.join();
std::cout << res << std::endl;             // 2011

auto fut= std::async([]{ return 2000+11; });
std::cout << fut.get() << std::endl;       // 2011
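
How an exception travels from the promise to the future can be sketched with std::async. The function name runFailingTask and the error text are assumptions for this example.

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>

// Starts a task that throws and returns the message that arrives at the future.
std::string runFailingTask() {
    auto fut = std::async(std::launch::async, []() -> int {
        throw std::runtime_error("task failed");   // stored in the data channel
    });
    try {
        fut.get();                                  // rethrows the stored exception
        return "no exception";
    } catch (const std::runtime_error& e) {
        return e.what();
    }
}
```

The same exception thrown from the callable of a plain std::thread would end the program with std::terminate.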

std::async behaves like an asynchronous function call. This function call takes a callable and its arguments. std::async is a variadic template and can, therefore, take an arbitrary number of arguments. The call of std::async returns a future object fut. That’s your handle for getting the result via fut.get(). Optionally you can specify a start policy for std::async. You can explicitly determine with the start policy if the asynchronous call should be executed in the same thread (std::launch::deferred) or in another thread (std::launch::async).

What’s special about the call auto fut= std::async(std::launch::deferred, ... ) is that the promise is not executed immediately. The call fut.get() starts the promise lazily.

Lazy and eager with std::async

#include <future>
...
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;

auto begin= system_clock::now();

auto asyncLazy= std::async(launch::deferred, []{ return system_clock::now(); });
auto asyncEager= std::async(launch::async, []{ return system_clock::now(); });
std::this_thread::sleep_for(std::chrono::seconds(1));

auto lazyStart= asyncLazy.get() - begin;
auto eagerStart= asyncEager.get() - begin;

auto lazyDuration= duration<double>(lazyStart).count();
auto eagerDuration= duration<double>(eagerStart).count();

std::cout << lazyDuration << " sec";                 // 1.00018 sec.
std::cout << eagerDuration << " sec";                // 0.00015489 sec.

The output of the program shows that the promise associated with the future asyncLazy is executed one second later than the promise associated with the future asyncEager. One second is exactly the time duration the creator is sleeping before the future asyncLazy asks for its result.
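
Besides measuring time, the lazy policy can be observed in two other ways: fut.wait_for reports the status std::future_status::deferred, and the callable runs in the thread that calls fut.get(). A small sketch with the hypothetical function names isDeferred and deferredRunsInCallingThread:

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// A deferred task has not started yet; wait_for reports that without running it.
bool isDeferred() {
    auto fut = std::async(std::launch::deferred, [] { return 2011; });
    return fut.wait_for(std::chrono::seconds(0)) == std::future_status::deferred;
}

// A deferred task is executed by the thread that asks for the result.
bool deferredRunsInCallingThread() {
    auto fut = std::async(std::launch::deferred,
                          [] { return std::this_thread::get_id(); });
    return fut.get() == std::this_thread::get_id();
}
```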

std::packaged_task enables you to build a simple wrapper for a callable, which can later be executed on a separate thread.

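Four steps are necessary: wrap the work, create the future, perform the calculation and query the result.

std::packaged_task<int(int, int)> sumTask([](int a, int b){ return a+b; });  // 1. wrap the work
std::future<int> sumResult= sumTask.get_future();                            // 2. create the future
sumTask(2000, 11);                                                           // 3. perform the calculation
sumResult.get();                                                             // 4. query the result

...
#include <deque>
#include <future>
#include <thread>
...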

using namespace std;

struct SumUp{
  int operator()(int beg, int end){
    for (int i= beg; i < end; ++i ) sum += i;  // adds beg, ..., end-1
    return sum;
  }
private:
  int sum{0};
};

SumUp sumUp1, sumUp2;

packaged_task<int(int, int)> sumTask1(sumUp1);
packaged_task<int(int, int)> sumTask2(sumUp2);

future<int> sum1= sumTask1.get_future();
future<int> sum2= sumTask2.get_future();

deque< packaged_task<int(int, int)>> allTasks;
allTasks.push_back(move(sumTask1));
allTasks.push_back(move(sumTask2));

int begin{1};
int increment{5000};
int end= begin + increment;

while (not allTasks.empty()){
  packaged_task<int(int, int)> myTask= move(allTasks.front());
  allTasks.pop_front();
  thread sumThread(move(myTask), begin, end);
  begin= end;
  end += increment;
  sumThread.detach();
}

auto sum= sum1.get() + sum2.get();
cout << sum;                                   // 50005000

The promises (std::packaged_task) are moved into the std::deque allTasks. The program iterates in the while loop through all promises. Each promise runs in its own thread and performs its addition in the background (sumThread.detach()). The result is the sum of all numbers from 1 to 10000.

The pair std::promise and std::future gives you full control over the task.

Method                                   Description
prom.swap(prom2) and                     Swaps the promises.
std::swap(prom, prom2)
prom.get_future()                        Returns the future.
prom.set_value(val)                      Sets the value.
prom.set_exception(ex)                   Sets the exception.
prom.set_value_at_thread_exit(val)       Stores the value and makes it ready when the thread exits.
prom.set_exception_at_thread_exit(ex)    Stores the exception and makes it ready when the thread exits.

Method                                   Description
fut.share()                              Returns a std::shared_future.
fut.get()                                Returns the result, which can be a value or an exception.
fut.valid()                              Checks if the future has a shared state. Returns false after the call fut.get().
fut.wait()                               Waits for the result.
fut.wait_for(relTime)                    Waits for a time duration for the result.
fut.wait_until(absTime)                  Waits until a time point for the result.

A future fut can ask for its result only once. After the first fut.get() the future is no longer valid, and a second call is undefined behavior; implementations are encouraged to throw a std::future_error exception in this case. The future creates a shared future by the call fut.share(). Shared futures are associated with their promise and can independently ask for the result. A shared future has the same interface as a future.
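
When several threads need the same result, fut.share() is the way to go. The sketch below lets two reader threads ask the same promise; the function name sumOfTwoReaders is an assumption.

```cpp
#include <cassert>
#include <future>
#include <thread>

// Two threads each ask a copy of the same shared future for the result.
int sumOfTwoReaders(int value) {
    std::promise<int> prom;
    std::shared_future<int> shared = prom.get_future().share();

    int first = 0;
    int second = 0;
    // Each lambda captures its own copy of the shared future.
    std::thread r1([shared, &first] { first = shared.get(); });
    std::thread r2([shared, &second] { second = shared.get(); });

    prom.set_value(value);   // wakes up both readers

    r1.join();
    r2.join();
    return first + second;
}
```

Unlike std::future, a std::shared_future is copyable, and get() may be called on it more than once.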

Here is the usage of promise and future.

#include <future>
...

void product(std::promise<int>&& intPromise, int a, int b){
  intPromise.set_value(a*b);
}

int a= 20;
int b= 10;

std::promise<int> prodPromise;
std::future<int> prodResult= prodPromise.get_future();

std::thread prodThread(product, std::move(prodPromise), a, b);
std::cout << "20*10= " << prodResult.get();   // blocks until set_value; prints 20*10= 200
prodThread.join();

The promise prodPromise is moved into a separate thread and performs its calculation. The future gets the result by prodResult.get().
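
A promise can send an exception instead of a value with prom.set_exception(ex). The following sketch uses a hypothetical division example; the names divide and divideInThread and the fallback parameter are assumptions made for illustration.

```cpp
#include <cassert>
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>
#include <utility>

// Sends either the quotient or an exception through the promise.
void divide(std::promise<int>&& prom, int a, int b) {
    if (b == 0) {
        prom.set_exception(
            std::make_exception_ptr(std::invalid_argument("division by zero")));
    } else {
        prom.set_value(a / b);
    }
}

// Returns the quotient computed in another thread, or `fallback`
// if the promise sent an exception.
int divideInThread(int a, int b, int fallback) {
    std::promise<int> prom;
    std::future<int> result = prom.get_future();
    std::thread t(divide, std::move(prom), a, b);

    int value = fallback;
    try {
        value = result.get();            // value or rethrown exception
    } catch (const std::invalid_argument&) {
        // keep the fallback
    }
    t.join();
    return value;
}
```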

A future fut can be synchronised with its associated promise by the call fut.wait(). Contrary to condition variables, you need no locks and mutexes, and spurious and lost wakeups are not possible.

// promiseFutureSynchronise.cpp
...
#include <future>
...

void doTheWork(){
 std::cout << "Processing shared data." << std::endl;
}

void waitingForWork(std::future<void>&& fut){
 std::cout << "Worker: Waiting for work." << std::endl;
 fut.wait();//wait for set_value
 doTheWork();
 std::cout << "Work done." << std::endl;
}

void setDataReady(std::promise<void>&& prom){
 std::cout << "Sender: Data is ready." << std::endl;
 prom.set_value();
}

std::promise<void> sendReady;
auto fut= sendReady.get_future();

std::thread t1(waitingForWork, std::move(fut));
std::thread t2(setDataReady, std::move(sendReady));

The call prom.set_value() of the promise wakes up the future, which can then perform its work.