Multithreading and Concurrency
Multithreading and Concurrency¶
Overview¶
To utilize the multicore performance of modern processors, multithreading programming is essential. Threading support was added to the standard library starting from C++11. In this chapter, we will learn about std::thread, synchronization techniques, and asynchronous programming.
Difficulty: ****
Prerequisites: Functions, lambdas, smart pointers
Table of Contents¶
- Thread Basics
- Mutex and Locks
- Condition Variables
- Atomic Operations
- Asynchronous Programming
- Thread Pool
- Common Problems and Solutions
Thread Basics¶
std::thread¶
#include <iostream>
#include <thread>
void hello() {
std::cout << "Hello from thread!\n";
}
int main() {
std::thread t(hello); // Create and start thread
t.join(); // Wait for thread completion
return 0;
}
Compilation¶
# Linux/macOS
g++ -std=c++17 -pthread program.cpp -o program
# Windows (MSVC)
cl /std:c++17 program.cpp
Creating Thread with Lambda¶
#include <iostream>
#include <thread>
int main() {
int value = 42;
// Capture by value
std::thread t1([value]() {
std::cout << "Value: " << value << "\n";
});
// Capture by reference
std::thread t2([&value]() {
value = 100;
});
t1.join();
t2.join();
std::cout << "After: " << value << "\n";
return 0;
}
Passing Arguments to Thread¶
#include <iostream>
#include <thread>
#include <string>
void print_message(const std::string& msg, int count) {
for (int i = 0; i < count; ++i) {
std::cout << msg << "\n";
}
}
void modify_value(int& x) {
x *= 2;
}
int main() {
// Pass by value
std::thread t1(print_message, "Hello", 3);
// Pass by reference (std::ref required)
int num = 10;
std::thread t2(modify_value, std::ref(num));
t1.join();
t2.join();
std::cout << "num: " << num << "\n"; // 20
return 0;
}
join vs detach¶
#include <iostream>
#include <thread>
#include <chrono>
void task() {
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "Task completed\n";
}
int main() {
std::thread t(task);
// join: Wait until thread completes
// t.join();
// detach: Separate thread (independent execution)
t.detach();
std::cout << "Main continues\n";
// Cannot join after detach
// Note: If main exits first, thread is forcefully terminated
std::this_thread::sleep_for(std::chrono::seconds(3));
return 0;
}
Thread ID and Hardware Concurrency¶
#include <iostream>
#include <thread>
void show_id() {
std::cout << "Thread ID: " << std::this_thread::get_id() << "\n";
}
int main() {
std::cout << "Main thread ID: " << std::this_thread::get_id() << "\n";
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << "\n";
std::thread t(show_id);
t.join();
return 0;
}
RAII Thread Wrapper¶
#include <thread>
class ThreadGuard {
std::thread& t;
public:
explicit ThreadGuard(std::thread& t_) : t(t_) {}
~ThreadGuard() {
if (t.joinable()) {
t.join();
}
}
// Disable copy
ThreadGuard(const ThreadGuard&) = delete;
ThreadGuard& operator=(const ThreadGuard&) = delete;
};
// C++20: std::jthread (auto join)
#include <thread>
void task() { /* ... */ }
int main() {
std::jthread t(task); // Automatically joins in destructor
// No need to call join()
return 0;
}
Mutex and Locks¶
Data Race Problem¶
#include <iostream>
#include <thread>
#include <vector>
int counter = 0;
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // Data race!
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
// Expected: 200000, Actual: varies each time
std::cout << "Counter: " << counter << "\n";
return 0;
}
std::mutex¶
#include <iostream>
#include <thread>
#include <mutex>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 100000; ++i) {
mtx.lock();
++counter;
mtx.unlock();
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter << "\n"; // Always 200000
return 0;
}
std::lock_guard (RAII)¶
#include <iostream>
#include <thread>
#include <mutex>
int counter = 0;
std::mutex mtx;
void increment() {
for (int i = 0; i < 100000; ++i) {
std::lock_guard<std::mutex> lock(mtx);
++counter;
// lock is automatically released at end of scope
}
}
std::unique_lock (Flexible Lock)¶
#include <mutex>
std::mutex mtx;
void flexible_locking() {
std::unique_lock<std::mutex> lock(mtx);
// Perform work...
lock.unlock(); // Manual unlock
// Other work...
lock.lock(); // Re-acquire
// lock is automatically released in destructor (if locked)
}
// Deferred lock
void deferred_locking() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
// Lock not acquired yet
// ... preparation work ...
lock.lock(); // Now acquire lock
}
std::scoped_lock (C++17, Multiple Mutexes)¶
#include <mutex>
std::mutex mtx1, mtx2;
void transfer() {
// Acquire multiple mutexes without deadlock
std::scoped_lock lock(mtx1, mtx2);
// Perform work...
}
std::shared_mutex (Read-Write Lock, C++17)¶
#include <shared_mutex>
#include <mutex>
class ThreadSafeCounter {
int value = 0;
mutable std::shared_mutex mtx;
public:
// Read: Multiple threads can access simultaneously
int get() const {
std::shared_lock lock(mtx);
return value;
}
// Write: Exclusive access
void increment() {
std::unique_lock lock(mtx);
++value;
}
};
Condition Variables¶
std::condition_variable¶
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::queue<int> dataQueue;
std::mutex mtx;
std::condition_variable cv;
bool finished = false;
void producer() {
for (int i = 0; i < 10; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
dataQueue.push(i);
std::cout << "Produced: " << i << "\n";
}
cv.notify_one(); // Wake up waiting thread
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all(); // Wake up all waiting threads
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// Wait with predicate (prevents spurious wakeup)
cv.wait(lock, [] {
return !dataQueue.empty() || finished;
});
if (dataQueue.empty() && finished) {
break;
}
int value = dataQueue.front();
dataQueue.pop();
std::cout << "Consumed: " << value << "\n";
}
}
int main() {
std::thread prod(producer);
std::thread cons(consumer);
prod.join();
cons.join();
return 0;
}
wait_for and wait_until¶
#include <condition_variable>
#include <chrono>
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
void waiter() {
std::unique_lock<std::mutex> lock(mtx);
// Wait with timeout
if (cv.wait_for(lock, std::chrono::seconds(5), [] { return ready; })) {
std::cout << "Ready!\n";
} else {
std::cout << "Timeout!\n";
}
}
Atomic Operations¶
std::atomic¶
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<int> counter(0);
void increment() {
for (int i = 0; i < 100000; ++i) {
++counter; // Atomic increment
}
}
int main() {
std::thread t1(increment);
std::thread t2(increment);
t1.join();
t2.join();
std::cout << "Counter: " << counter << "\n"; // Always 200000
return 0;
}
Atomic Operations¶
#include <atomic>
std::atomic<int> value(0);
void atomic_operations() {
// Basic operations
value.store(10); // Store
int v = value.load(); // Load
int old = value.exchange(20); // Exchange
// Arithmetic operations
value++;
value--;
value += 5;
value.fetch_add(3); // Add and return previous value
value.fetch_sub(2);
// Compare and swap (CAS)
int expected = 20;
value.compare_exchange_strong(expected, 30);
// If value equals expected, change to 30
// Otherwise, store current value in expected
}
Memory Order¶
#include <atomic>
std::atomic<int> x(0);
std::atomic<int> y(0);
void thread1() {
x.store(1, std::memory_order_release);
}
void thread2() {
while (x.load(std::memory_order_acquire) == 0);
// If x is 1, all previous writes from thread1 are visible
}
| Memory Order | Description |
|---|---|
relaxed |
Only atomicity guaranteed, no ordering |
acquire |
Reads/writes before this operation won't be reordered |
release |
Reads/writes after this operation won't be reordered |
acq_rel |
acquire + release |
seq_cst |
Sequential consistency (default, strongest) |
Asynchronous Programming¶
std::async¶
#include <iostream>
#include <future>
#include <chrono>
int compute(int x) {
std::this_thread::sleep_for(std::chrono::seconds(2));
return x * x;
}
int main() {
// Asynchronous execution
std::future<int> result = std::async(std::launch::async, compute, 10);
std::cout << "Computing...\n";
// Can perform other work
// Wait and get result
int value = result.get();
std::cout << "Result: " << value << "\n";
return 0;
}
Launch Policies¶
#include <future>
// async: Execute immediately in new thread
auto f1 = std::async(std::launch::async, task);
// deferred: Execute in current thread when get() is called
auto f2 = std::async(std::launch::deferred, task);
// Default: System decides
auto f3 = std::async(task);
std::future and std::promise¶
#include <iostream>
#include <thread>
#include <future>
void producer(std::promise<int>& prom) {
std::this_thread::sleep_for(std::chrono::seconds(1));
prom.set_value(42); // Set value
}
void consumer(std::future<int>& fut) {
std::cout << "Waiting for value...\n";
int value = fut.get(); // Wait for value
std::cout << "Received: " << value << "\n";
}
int main() {
std::promise<int> prom;
std::future<int> fut = prom.get_future();
std::thread t1(producer, std::ref(prom));
std::thread t2(consumer, std::ref(fut));
t1.join();
t2.join();
return 0;
}
std::packaged_task¶
#include <iostream>
#include <thread>
#include <future>
int add(int a, int b) {
return a + b;
}
int main() {
std::packaged_task<int(int, int)> task(add);
std::future<int> result = task.get_future();
std::thread t(std::move(task), 10, 20);
std::cout << "Result: " << result.get() << "\n";
t.join();
return 0;
}
Checking Future Status¶
#include <future>
#include <chrono>
auto fut = std::async(std::launch::async, task);
// Wait with timeout
auto status = fut.wait_for(std::chrono::seconds(1));
if (status == std::future_status::ready) {
std::cout << "Ready!\n";
} else if (status == std::future_status::timeout) {
std::cout << "Timeout\n";
} else if (status == std::future_status::deferred) {
std::cout << "Deferred\n";
}
Thread Pool¶
Simple Thread Pool Implementation¶
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop = false;
public:
explicit ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::invoke_result<F, Args...>::type>
{
using return_type = typename std::invoke_result<F, Args...>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(mtx);
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return result;
}
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
cv.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
};
// Usage example
int main() {
ThreadPool pool(4);
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
results.emplace_back(
pool.enqueue([i] {
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
})
);
}
for (auto& result : results) {
std::cout << result.get() << " ";
}
std::cout << "\n";
return 0;
}
Common Problems and Solutions¶
Deadlock¶
// Problem: Circular wait
std::mutex m1, m2;
void thread1() {
std::lock_guard<std::mutex> l1(m1);
std::lock_guard<std::mutex> l2(m2); // Waiting for m2
}
void thread2() {
std::lock_guard<std::mutex> l2(m2);
std::lock_guard<std::mutex> l1(m1); // Waiting for m1
}
// Solution: Use std::scoped_lock
void thread1_fixed() {
std::scoped_lock lock(m1, m2); // Acquire simultaneously
}
void thread2_fixed() {
std::scoped_lock lock(m1, m2);
}
Livelock¶
// Situation where two threads keep yielding to each other
// Solution: Random backoff, priority setting
Starvation¶
// Situation where a specific thread never acquires resources
// Solution: Use fair locks, priority queues
Thread-Safe Singleton¶
#include <mutex>
class Singleton {
static Singleton* instance;
static std::once_flag initFlag;
Singleton() = default;
public:
static Singleton& getInstance() {
std::call_once(initFlag, [] {
instance = new Singleton();
});
return *instance;
}
};
Singleton* Singleton::instance = nullptr;
std::once_flag Singleton::initFlag;
// Or use C++11 static local variable (thread-safe)
class Singleton2 {
Singleton2() = default;
public:
static Singleton2& getInstance() {
static Singleton2 instance;
return instance;
}
};
Exercises¶
Problem 1: Parallel Sum¶
Calculate the sum of a vector in parallel using multiple threads.
Show Answer
#include <iostream>
#include <vector>
#include <thread>
#include <numeric>
long long parallelSum(const std::vector<int>& data, int numThreads) {
std::vector<long long> partialSums(numThreads);
std::vector<std::thread> threads;
size_t chunkSize = data.size() / numThreads;
for (int i = 0; i < numThreads; ++i) {
size_t start = i * chunkSize;
size_t end = (i == numThreads - 1) ? data.size() : start + chunkSize;
threads.emplace_back([&data, &partialSums, i, start, end] {
partialSums[i] = std::accumulate(
data.begin() + start, data.begin() + end, 0LL
);
});
}
for (auto& t : threads) {
t.join();
}
return std::accumulate(partialSums.begin(), partialSums.end(), 0LL);
}
int main() {
std::vector<int> data(10000000, 1);
std::cout << "Sum: " << parallelSum(data, 4) << "\n";
return 0;
}
Problem 2: Producer-Consumer¶
Implement a thread-safe queue with multiple producers and consumers.
Show Answer
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
template<typename T>
class ThreadSafeQueue {
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
public:
void push(T value) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(std::move(value));
cv.notify_one();
}
std::optional<T> pop() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return !queue.empty(); });
T value = std::move(queue.front());
queue.pop();
return value;
}
bool tryPop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) return false;
value = std::move(queue.front());
queue.pop();
return true;
}
};
Next Step¶
- 17_CPP20_Advanced.md - Concepts, Ranges, Coroutines