Atomic Data Types
C++ has a set of simple atomic data types: booleans, characters, numbers and pointers in many variants. They need the header <atomic>. You can define your own atomic data type with the class template std::atomic, but there are serious restrictions on your type in std::atomic<MyType>.
For MyType there are the following restrictions:
- The copy assignment operator for MyType, for all base classes of MyType and for all non-static members of MyType, must be trivial. Only a compiler-generated copy assignment operator is trivial.
- MyType must not have virtual methods or base classes.
- MyType must be bitwise copyable and comparable, so that the C functions memcpy or memcmp can be applied.
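As an illustration (not part of the original text), here is a minimal sketch of a user-defined type that satisfies these restrictions and can therefore be wrapped in std::atomic; the type Counters and its use of compare_exchange_strong are assumptions for this example.
#include <atomic>
#include <type_traits>

struct Counters {      // no virtual functions, no user-defined copy assignment
    int a;
    int b;
};

static_assert(std::is_trivially_copyable<Counters>::value,
              "Counters must be trivially copyable for std::atomic");

std::atomic<Counters> counters{Counters{0, 0}};

int main() {
    Counters expected = counters.load();
    Counters desired{expected.a + 1, expected.b};
    // compare_exchange_strong compares bitwise (like memcmp) and copies bitwise (like memcpy)
    while (!counters.compare_exchange_strong(expected, desired)) {
        desired = Counters{expected.a + 1, expected.b};
    }
}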
Atomic data types have atomic operations, for example load and store. Incrementing a plain int concurrently from several threads is a data race and therefore undefined behavior; incrementing a std::atomic<int> is well-defined:
int cnt = 0;
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // undefined behavior
std::atomic<int> cnt{0};
auto f = [&]{cnt++;};
std::thread t1{f}, t2{f}, t3{f}; // OK
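Since the text names load and store, here is a small sketch (not from the original) that uses them explicitly; the variable names are assumptions for the example.
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> value{0};

int main() {
    std::thread writer([]{ value.store(42); });     // atomic write
    std::thread reader([]{
        while (value.load() == 0) {}                // atomic read; busy-wait until the writer is done
        std::cout << value.load() << std::endl;     // 42
    });
    writer.join();
    reader.join();
}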
What is trivial?
A trivial type is a type whose storage is contiguous (trivially copyable) and which only supports static default initialization (trivially default constructible), either cv-qualified or not. It includes scalar types, trivial classes and arrays of any such types.
A trivial class is a class (defined with class, struct or union) that is both trivially default constructible and trivially copyable, which implies that:
- uses the implicitly defined default, copy and move constructors, copy and move assignments, and destructor.
- has no virtual members.
- has no non-static data members with brace- or equal- initializers.
- its base class and non-static data members (if any) are themselves also trivial types.
is_trivial inherits from integral_constant as being either true_type or false_type, depending on whether T is a trivial type.
#include <iostream>
#include <type_traits>
class A {};
class B { B() {} }; // user-provided constructor: not trivially default constructible
class C : B {}; // base class B is not trivial
class D { virtual void fn() {} }; // virtual member function: not trivial
int main() {
std::cout << std::boolalpha;
std::cout << "is_trivial:" << std::endl;
std::cout << "int: " << std::is_trivial<int>::value << std::endl;
std::cout << "A: " << std::is_trivial<A>::value << std::endl;
std::cout << "B: " << std::is_trivial<B>::value << std::endl;
std::cout << "C: " << std::is_trivial<C>::value << std::endl;
std::cout << "D: " << std::is_trivial<D>::value << std::endl;
return 0;
}
is_trivial:
int: true
A: true
B: false
C: false
D: false
Threads
A std::thread represents an executable unit. This executable unit, which the thread starts immediately, gets its work package as a callable unit. A callable unit can be a function, a function object or a lambda function:
#include <thread>
...
using namespace std;
void helloFunction(){
cout << "function" << endl;
}
class HelloFunctionObject {
public:
void operator()() const {
cout << "function object" << endl;
}
};
thread t1(helloFunction); // function
HelloFunctionObject helloFunctionObject;
thread t2(helloFunctionObject); // function object
thread t3([]{ cout << "lambda function"; }); // lambda function
The creator of a thread has to take care of the lifetime of its created thread. The executable unit of the created thread ends with the end of the callable. The creator either waits until the created thread t is done (t.join()) or detaches itself from the created thread (t.detach()). A thread t is joinable if neither t.join() nor t.detach() has been performed on it. The destructor of a joinable thread calls std::terminate, and the program terminates.
thread t1(helloFunction); // function
HelloFunctionObject helloFunctionObject;
thread t2(helloFunctionObject); // function object
thread t3([]{ cout << "lambda function"; }); // lambda function
t1.join();
t2.join();
t3.join();
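As a small illustration (not from the original text), a common way to avoid the std::terminate call is to guard the join with joinable():
#include <thread>

int main() {
    std::thread t([]{ /* work */ });
    // if the thread might already have been joined or detached, guard the call
    if (t.joinable()) t.join();   // join only if neither join() nor detach() has run yet
}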
You can move a callable from one thread to another.
#include <thread>
...
std::thread t([]{ cout << "lambda function"; });
std::thread t2;
t2= std::move(t);
std::thread t3([]{ cout << "lambda function"; });
t2= std::move(t3); // std::terminate
After t2= std::move(t), thread t2 has the callable of thread t. If t2 had already owned a callable and were still joinable, the C++ runtime would call std::terminate. This is exactly what happens with t2= std::move(t3), because t2 has executed neither t2.join() nor t2.detach() before.
The constructor of std::thread is a variadic template, so you can pass an arbitrary number of arguments to the callable, either by copy or by reference (see the std::ref sketch after the example).
void printStringCopy(string s){ cout << s; }
void printStringRef(const string& s){ cout << s; }
string s{"C++"};
thread tPerCopy([=]{ cout << s; }); // C++
thread tPerCopy2(printStringCopy, s); // C++
tPerCopy.join();
tPerCopy2.join();
thread tPerReference([&]{ cout << s; }); // C++
thread tPerReference2(printStringRef, s); // C++
tPerReference.join();
tPerReference2.join();
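Note that the thread constructor copies its arguments by default; even printStringRef above receives a reference to the thread's internal copy of s. To pass a genuine reference, wrap the argument in std::ref. A small sketch (not from the original text); the function appendSuffix is an assumption for the example.
#include <functional>   // std::ref
#include <iostream>
#include <string>
#include <thread>

void appendSuffix(std::string& s) { s += "11"; }

int main() {
    std::string s{"C++"};
    std::thread t(appendSuffix, std::ref(s));   // pass s by reference into the thread
    t.join();
    std::cout << s << std::endl;                // C++11
}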
Method | Description |
---|---|
t.join() | Waits until thread t has finished its executable unit. |
t.detach() | Executes the created thread t independently of its creator. |
t.joinable() | Checks whether thread t supports join or detach. |
t.get_id() and this_thread::get_id() | Returns the identity of the thread. |
thread::hardware_concurrency() | Indicates the number of threads that can be run in parallel. |
this_thread::sleep_until(absTime) | Puts the thread to sleep until the time point absTime. |
this_thread::sleep_for(relTime) | Puts the thread to sleep for the duration relTime. |
this_thread::yield() | Offers the system the chance to run another thread. |
t.swap(t2) and swap(t1, t2) | Swaps the threads. |
You can call t.join() or t.detach() only once on a thread t. If you attempt to call one of them a second time, you get a std::system_error exception. std::thread::hardware_concurrency returns the number of cores, or 0 if the runtime cannot determine the number. sleep_until needs a time point and sleep_for a time duration as argument.
Threads cannot be copied but can be moved. A swap operation performs a move when possible.
using std::this_thread::get_id;
std::thread::hardware_concurrency(); // 4
std::thread t1([]{ get_id(); }); // 139783038650112
std::thread t2([]{ get_id(); }); // 139783030257408
t1.get_id(); // 139783038650112
t2.get_id(); // 139783030257408
t1.swap(t2);
t1.get_id(); // 139783030257408
t2.get_id(); // 139783038650112
get_id(); // 140159896602432
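The table above also lists sleep_for, sleep_until and yield; here is a brief sketch of their use (not from the original text).
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    auto wakeUp = std::chrono::steady_clock::now() + std::chrono::milliseconds(5);
    std::this_thread::sleep_for(std::chrono::milliseconds(5));   // sleep for a duration
    std::this_thread::sleep_until(wakeUp);                       // sleep until a time point
    std::this_thread::yield();                                   // hint: let another thread run
    std::cout << "done" << std::endl;
}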
Shared Variables
mutex
#include <mutex>
#include <thread>
...
using namespace std;
std::mutex mutexCout;
struct Worker{
Worker(string n):name(n){};
void operator() (){
for (int i= 1; i <= 3; ++i){
this_thread::sleep_for(chrono::milliseconds(200));
mutexCout.lock();
cout << name << ": " << "Work " << i << endl;
mutexCout.unlock();
}
}
private:
string name;
};
thread herb= thread(Worker("Herb"));
thread andrei= thread(Worker(" Andrei"));
thread scott= thread(Worker (" Scott"));
thread bjarne= thread(Worker(" Bjarne"));
Method | mutex | recursive_mutex | timed_mutex | recursive_timed_mutex | shared_timed_mutex |
---|---|---|---|---|---|
m.lock | yes | yes | yes | yes | yes |
m.unlock | yes | yes | yes | yes | yes |
m.try_lock | yes | yes | yes | yes | yes |
m.try_lock_for | | | yes | yes | yes |
m.try_lock_until | | | yes | yes | yes |
The std::shared_timed_mutex enables you to implement reader-writer locks. The method m.try_lock_for(relTime) needs a relative time duration; the method m.try_lock_until(absTime) needs an absolute time point.
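A brief sketch (not from the original text) of a timed lock attempt with std::timed_mutex:
#include <chrono>
#include <iostream>
#include <mutex>

std::timed_mutex m;

int main() {
    if (m.try_lock_for(std::chrono::milliseconds(10))) {   // give up after 10 ms
        std::cout << "got the lock" << std::endl;
        m.unlock();
    } else {
        std::cout << "timed out" << std::endl;
    }
}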
Deadlocks
A deadlock is a state in which two or more threads are blocked because each thread waits for the release of a resource before it releases its own resource.
You can get a deadlock very quickly if you forget to call m.unlock(). That happens, for example, in case of an exception in the function getVar():
m.lock();
sharedVar= getVar(); // don't lock m inside getVar; if getVar throws, the unlock is never reached
m.unlock();
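As an additional illustration (not from the original text), here is the classic deadlock from the definition above: two threads acquire two mutexes in opposite order, so each waits for the other. The sketch will almost always hang by design; the std::unique_lock/std::lock example further below shows how to avoid it.
#include <chrono>
#include <mutex>
#include <thread>

std::mutex m1, m2;

int main() {
    std::thread t1([]{
        m1.lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        m2.lock();            // waits forever: t2 holds m2 and wants m1
        m2.unlock();
        m1.unlock();
    });
    std::thread t2([]{
        m2.lock();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        m1.lock();            // waits forever: t1 holds m1 and wants m2
        m1.unlock();
        m2.unlock();
    });
    t1.join();
    t2.join();                // the program hangs: a deadlock
}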
Locks
You should encapsulate a mutex in a lock to release the mutex automatically. A lock is an implementation of the RAII idiom, because it binds the locking of the mutex to its own lifetime. C++11 has std::lock_guard for the simple use case and std::unique_lock for the advanced use case. Both need the header <mutex>. With C++14, C++ has a std::shared_lock, which in combination with the mutex std::shared_timed_mutex is the base for reader-writer locks; a sketch follows below.
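A minimal reader-writer sketch (not from the original text), assuming C++14: many readers may hold a std::shared_lock at the same time, while a writer needs exclusive access through a std::unique_lock. The names table, readEntry and writeEntry are assumptions for the example.
#include <iostream>
#include <map>
#include <mutex>
#include <shared_mutex>   // std::shared_timed_mutex, std::shared_lock (C++14)
#include <string>
#include <thread>

std::map<std::string, int> table{{"answer", 42}};
std::shared_timed_mutex tableMutex;

int readEntry(const std::string& key) {
    std::shared_lock<std::shared_timed_mutex> lock(tableMutex);   // shared: many readers at once
    return table.at(key);
}

void writeEntry(const std::string& key, int value) {
    std::unique_lock<std::shared_timed_mutex> lock(tableMutex);   // exclusive: one writer
    table[key] = value;
}

int main() {
    std::thread writer(writeEntry, "question", 6 * 7);
    std::thread reader([]{ std::cout << readEntry("answer") << std::endl; });   // 42
    writer.join();
    reader.join();
}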
std::lock_guard supports only the simple use case; therefore it can only acquire its mutex in the constructor and release it in the destructor.
std::mutex coutMutex;
struct Worker{
Worker(std::string n):name(n){};
void operator() (){
for (int i= 1; i <= 3; ++i){
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::lock_guard<std::mutex> myLock(coutMutex);
std::cout << name << ": " << "Work " << i << std::endl;
}
}
private:
std::string name;
};
unique_lock is a general-purpose mutex ownership wrapper allowing deferred locking, time-constrained attempts at locking, recursive locking, transfer of lock ownership, and use with condition variables.
#include <mutex>
#include <thread>
#include <chrono>
struct Box {
explicit Box(int num) : num_things{num} {}
int num_things;
std::mutex m;
};
void transfer(Box &from, Box &to, int num)
{
// don't actually take the locks yet
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
// lock both unique_locks without deadlock
std::lock(lock1, lock2);
from.num_things -= num;
to.num_things += num;
// 'from.m' and 'to.m' mutexes unlocked in 'unique_lock' dtors
}
int main()
{
Box acc1(100);
Box acc2(50);
std::thread t1(transfer, std::ref(acc1), std::ref(acc2), 10);
std::thread t2(transfer, std::ref(acc2), std::ref(acc1), 5);
t1.join();
t2.join();
}
Thread Local Data
By using the keyword thread_local, you get thread-local data, also known as thread-local storage. Each thread has its own copy of the data. Thread-local data behaves like a static variable: it is created at its first usage, and its lifetime is bound to the lifetime of its thread.
std::mutex coutMutex;
thread_local std::string s("hello from ");
void addThreadLocal(std::string const& s2){
s+= s2;
std::lock_guard<std::mutex> guard(coutMutex);
std::cout << s << std::endl;
std::cout << "&s: " << &s << std::endl;
std::cout << std::endl;
}
std::thread t1(addThreadLocal, "t1");
std::thread t2(addThreadLocal, "t2");
std::thread t3(addThreadLocal, "t3");
std::thread t4(addThreadLocal, "t4");
Condition Variables
Condition variables enable threads to be synchronised via messages. They need the header <condition_variable>. One thread acts as the sender and another as the receiver of the message. The receiver waits for the notification of the sender.
Method | Description |
---|---|
cv.notify_one() | Notifies a waiting thread. |
cv.notify_all() | Notifies all waiting threads. |
cv.wait(lock, ...) | Waits for the notification while holding a std::unique_lock. |
cv.wait_for(lock, relTime, ...) | Waits for a time duration for the notification while holding a std::unique_lock. |
cv.wait_until(lock, absTime, ...) | Waits until a time point for the notification while holding a std::unique_lock. |
Sender and receiver need a lock. For the sender a std::lock_guard is sufficient, because it calls lock and unlock only once. For the receiver a std::unique_lock is necessary, because it typically locks and unlocks its mutex repeatedly.
// conditionVariable.cpp
...
#include <condition_variable>
...
std::mutex mutex_;
std::condition_variable condVar;
bool dataReady= false;
void doTheWork(){
std::cout << "Processing shared data." << std::endl;
}
void waitingForWork(){
std::cout << "Worker: Waiting for work." << std::endl;
std::unique_lock<std::mutex> lck(mutex_);
condVar.wait(lck, []{ return dataReady; }); // wait for the notification; the predicate guards against spurious wakeups
doTheWork();
std::cout << "Work done." << std::endl;
}
void setDataReady(){
std::lock_guard<std::mutex> lck(mutex_);
dataReady=true;
std::cout << "Sender: Data is ready." << std::endl;
condVar.notify_one();
}
std::thread t1(waitingForWork);
std::thread t2(setDataReady);
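The table also lists wait_for; here is a small sketch (not from the original text) of waiting with a timeout. Since nobody notifies in this sketch, the predicate overload returns false after the timeout.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

std::mutex m;
std::condition_variable cv;
bool ready = false;

int main() {
    std::unique_lock<std::mutex> lck(m);
    // wait at most 100 ms for ready to become true; returns false on timeout
    if (cv.wait_for(lck, std::chrono::milliseconds(100), []{ return ready; })) {
        std::cout << "got the notification" << std::endl;
    } else {
        std::cout << "timeout" << std::endl;   // nobody notified: prints timeout
    }
}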
Tasks
In addition to threads, C++ has tasks to perform work asynchronously. Tasks need the header <future>. A task is parameterised with a work package and consists of two associated components: a promise and a future. Both are connected via a data channel. The promise executes the work package and puts the result in the data channel; the associated future picks up the result. Both communication endpoints can run in separate threads. What's special is that the future can pick up the result at a later time. Therefore the calculation of the result by the promise is independent of the query of the result by the associated future.
Threads are very different from tasks:
- For the communication between the creator thread and the created thread, you have to use a shared variable. A task communicates via its data channel, which is implicitly protected. Therefore a task need not use a protection mechanism such as a mutex.
- The creator thread waits for its child with the join call. The future fut uses the fut.get() call, which blocks if the result is not yet there.
- If an exception happens in the created thread, the created thread terminates, and with it the creator and the whole process. In contrast, the promise can send an exception to the future, which then has to handle it (see the sketch below).
A promise can serve one or many futures. It can send a value, an exception or only a notification. You can use a task as a safe replacement for a condition variable.
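As an illustration of exception propagation (not from the original text), here is a sketch in which the promise stores an exception that fut.get() rethrows; the function divide is an assumption for the example.
#include <future>
#include <iostream>
#include <stdexcept>
#include <thread>

void divide(std::promise<int>&& prom, int a, int b) {
    try {
        if (b == 0) throw std::runtime_error("division by zero");
        prom.set_value(a / b);
    } catch (...) {
        prom.set_exception(std::current_exception());   // send the exception to the future
    }
}

int main() {
    std::promise<int> prom;
    std::future<int> fut = prom.get_future();
    std::thread t(divide, std::move(prom), 20, 0);
    try {
        std::cout << fut.get() << std::endl;
    } catch (const std::exception& e) {
        std::cout << "caught: " << e.what() << std::endl;   // caught: division by zero
    }
    t.join();
}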
#include <future>
#include <thread>
...
int res;
std::thread t([&]{ res= 2000+11;});
t.join();
std::cout << res << std::endl; // 2011
auto fut= std::async([]{ return 2000+11; });
std::cout << fut.get() << std::endl; // 2011
std::async behaves like an asynchronous function call. This function call takes a callable and its arguments. std::async is a variadic template and can therefore take an arbitrary number of arguments. The call of std::async returns a future object fut. That's your handle for getting the result via fut.get(). Optionally you can specify a launch policy for std::async. With the launch policy you explicitly determine whether the asynchronous call should be executed in the same thread (std::launch::deferred) or in another thread (std::launch::async).
What's special about the call auto fut= std::async(std::launch::deferred, ... ) is that the promise is not executed immediately; the call fut.get() starts the promise lazily.
Lazy and eager with std::async:
#include <future>
...
using std::chrono::duration;
using std::chrono::system_clock;
using std::launch;
auto begin= system_clock::now();
auto asyncLazy= std::async(launch::deferred, []{ return system_clock::now(); });
auto asyncEager= std::async(launch::async, []{ return system_clock::now(); });
std::this_thread::sleep_for(std::chrono::seconds(1));
auto lazyStart= asyncLazy.get() - begin;
auto eagerStart= asyncEager.get() - begin;
auto lazyDuration= duration<double>(lazyStart).count();
auto eagerDuration= duration<double>(eagerStart).count();
std::cout << lazyDuration << " sec"; // 1.00018 sec.
std::cout << eagerDuration << " sec"; // 0.00015489 sec.
The output of the program shows that the promise associated with the future asyncLazy
is executed one second later than the promise associated with the future asyncEager
. One second is exactly the time duration the creator is sleeping before the future asyncLazy
asks for its result.
std::packaged_task enables you to build a simple wrapper for a callable, which can later be executed on a separate thread. Four steps are necessary:
std::packaged_task<int(int, int)> sumTask([](int a, int b){ return a+b; }); // 1. wrap the callable
std::future<int> sumResult= sumTask.get_future();                           // 2. create the future
sumTask(2000, 11);                                                          // 3. perform the calculation
sumResult.get();                                                            // 4. pick up the result
...
#include <future>
...
using namespace std;
struct SumUp{
int operator()(int beg, int end){
for (int i= beg; i < end; ++i ) sum += i;
return sum;
}
private:
int sum{0}; // running sum; beg and end are taken as call arguments
};
SumUp sumUp1, sumUp2;
packaged_task<int(int, int)> sumTask1(sumUp1);
packaged_task<int(int, int)> sumTask2(sumUp2);
future<int> sum1= sumTask1.get_future();
future<int> sum2= sumTask2.get_future();
deque< packaged_task<int(int, int)>> allTasks;
allTasks.push_back(move(sumTask1));
allTasks.push_back(move(sumTask2));
int begin{1};
int increment{5000};
int end= begin + increment;
while (not allTasks.empty()){
packaged_task<int(int, int)> myTask= move(allTasks.front());
allTasks.pop_front();
thread sumThread(move(myTask), begin, end);
begin= end;
end += increment;
sumThread.detach();
}
auto sum= sum1.get() + sum2.get();
cout << sum; // 50005000
The promises (std::packaged_task) are moved into the std::deque allTasks. The program iterates in the while loop through all promises. Each promise runs in its own thread and performs its addition in the background (sumThread.detach()). The result is the sum of all numbers from 1 to 10000.
The pair std::promise and std::future gives you full control over the task.
Method | Description |
---|---|
prom.swap(prom2) and std::swap(prom, prom2) | Swaps the promises. |
prom.get_future() | Returns the future. |
prom.set_value(val) | Sets the value. |
prom.set_exception(ex) | Sets the exception. |
prom.set_value_at_thread_exit(val) | Stores the value and makes it ready at thread exit. |
prom.set_exception_at_thread_exit(ex) | Stores the exception and makes it ready at thread exit. |
Method | Description |
---|---|
fut.share() | Returns a std::shared_future. |
fut.get() | Returns the result, which can be a value or an exception. |
fut.valid() | Checks whether the result is available; returns false after fut.get() has been called. |
fut.wait() | Waits for the result. |
fut.wait_for(relTime) | Waits for a time duration for the result. |
fut.wait_until(absTime) | Waits until a time for the result. |
If a future fut asks more than once for the result, a std::future_error exception is thrown. The future creates a shared future by the call fut.share(). Shared futures are associated with their promise and can independently ask for the result. A shared future has the same interface as a future.
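A small sketch of a shared future consumed by several threads (not from the original text); the names are assumptions for the example, and the two output lines may interleave.
#include <future>
#include <iostream>
#include <thread>

int main() {
    std::promise<int> prom;
    std::shared_future<int> sharedFut = prom.get_future().share();   // fut.share()

    auto consumer = [sharedFut]{ std::cout << sharedFut.get() << std::endl; };   // each copy may call get()
    std::thread t1(consumer);
    std::thread t2(consumer);

    prom.set_value(2011);   // both consumers print 2011
    t1.join();
    t2.join();
}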
Here is the usage of promise and future.
#include <future>
...
void product(std::promise<int>&& intPromise, int a, int b){
intPromise.set_value(a*b);
}
int a= 20;
int b= 10;
std::promise<int> prodPromise;
std::future<int> prodResult= prodPromise.get_future();
std::thread prodThread(product, std::move(prodPromise), a, b);
std::cout << "20*10= " << prodResult.get(); // blocks until intPromise.set_value; prints 20*10= 200
prodThread.join();
The promise prodPromise is moved into a separate thread and performs its calculation. The future gets the result with prodResult.get().
A future fut can be synchronised with its associated promise by the call fut.wait(). Contrary to condition variables, you need no locks or mutexes, and spurious and lost wakeups are not possible.
// promiseFutureSynchronise.cpp
...
#include <future>
...
void doTheWork(){
std::cout << "Processing shared data." << std::endl;
}
void waitingForWork(std::future<void>&& fut){
std::cout << "Worker: Waiting for work." <<
std::endl;
fut.wait();//wait for set_value
doTheWork();
std::cout << "Work done." << std::endl;
}
void setDataReady(std::promise<void>&& prom){
std::cout << "Sender: Data is ready." <<
std::endl;
prom.set_value();
}
std::promise<void> sendReady;
auto fut= sendReady.get_future();
std::thread t1(waitingForWork, std::move(fut));
std::thread t2(setDataReady, std::move(sendReady));
The call prom.set_value() on the promise wakes up the future, which can then perform its work.