1/*
2 * Multithreading and Concurrency Demo
3 *
4 * Demonstrates:
5 * - std::thread basics
6 * - std::mutex, std::lock_guard, std::unique_lock
7 * - std::async and std::future
8 * - std::condition_variable
9 * - Thread-safe queue example
10 * - std::jthread (C++20)
11 *
12 * Compile: g++ -std=c++20 -Wall -Wextra -pthread 04_threading.cpp -o threading
13 */
14
15#include <iostream>
16#include <thread>
17#include <mutex>
18#include <condition_variable>
19#include <future>
20#include <vector>
21#include <queue>
22#include <chrono>
23#include <string>
24#include <numeric>
25
26// ============ Basic Thread ============
27void print_numbers(int id, int count) {
28 for (int i = 0; i < count; i++) {
29 std::cout << "Thread " << id << ": " << i << "\n";
30 std::this_thread::sleep_for(std::chrono::milliseconds(100));
31 }
32}
33
34void demo_basic_thread() {
35 std::cout << "\n=== Basic std::thread ===\n";
36
37 std::thread t1(print_numbers, 1, 3);
38 std::thread t2(print_numbers, 2, 3);
39
40 // Must join or detach
41 t1.join();
42 t2.join();
43
44 std::cout << "All threads finished\n";
45}
46
47// ============ Mutex and Lock Guard ============
48std::mutex cout_mutex;
49int shared_counter = 0;
50
51void increment_counter(int id, int iterations) {
52 for (int i = 0; i < iterations; i++) {
53 {
54 std::lock_guard<std::mutex> lock(cout_mutex);
55 shared_counter++;
56 std::cout << "Thread " << id << " incremented counter to "
57 << shared_counter << "\n";
58 }
59 std::this_thread::sleep_for(std::chrono::milliseconds(50));
60 }
61}
62
63void demo_mutex() {
64 std::cout << "\n=== Mutex and Lock Guard ===\n";
65
66 shared_counter = 0;
67
68 std::thread t1(increment_counter, 1, 5);
69 std::thread t2(increment_counter, 2, 5);
70
71 t1.join();
72 t2.join();
73
74 std::cout << "Final counter value: " << shared_counter << "\n";
75}
76
77// ============ std::async and std::future ============
78int compute_sum(int n) {
79 std::cout << "Computing sum(1.." << n << ")...\n";
80 std::this_thread::sleep_for(std::chrono::milliseconds(500));
81 return n * (n + 1) / 2;
82}
83
84void demo_async() {
85 std::cout << "\n=== std::async and std::future ===\n";
86
87 // Launch async task
88 std::future<int> result1 = std::async(std::launch::async, compute_sum, 100);
89 std::future<int> result2 = std::async(std::launch::async, compute_sum, 200);
90
91 std::cout << "Doing other work while tasks run...\n";
92
93 // Get results (blocks if not ready)
94 std::cout << "Result 1: " << result1.get() << "\n";
95 std::cout << "Result 2: " << result2.get() << "\n";
96}
97
98// ============ Condition Variable ============
99std::queue<int> data_queue;
100std::mutex queue_mutex;
101std::condition_variable data_cond;
102bool finished = false;
103
104void producer() {
105 for (int i = 0; i < 5; i++) {
106 std::this_thread::sleep_for(std::chrono::milliseconds(200));
107
108 {
109 std::lock_guard<std::mutex> lock(queue_mutex);
110 data_queue.push(i);
111 std::cout << "Produced: " << i << "\n";
112 }
113
114 data_cond.notify_one();
115 }
116
117 {
118 std::lock_guard<std::mutex> lock(queue_mutex);
119 finished = true;
120 }
121 data_cond.notify_all();
122}
123
124void consumer(int id) {
125 while (true) {
126 std::unique_lock<std::mutex> lock(queue_mutex);
127
128 // Wait for data or finish signal
129 data_cond.wait(lock, [] { return !data_queue.empty() || finished; });
130
131 if (!data_queue.empty()) {
132 int value = data_queue.front();
133 data_queue.pop();
134 lock.unlock();
135
136 std::cout << "Consumer " << id << " consumed: " << value << "\n";
137 std::this_thread::sleep_for(std::chrono::milliseconds(300));
138 } else if (finished) {
139 break;
140 }
141 }
142}
143
144void demo_condition_variable() {
145 std::cout << "\n=== Condition Variable (Producer-Consumer) ===\n";
146
147 // Clear state
148 while (!data_queue.empty()) data_queue.pop();
149 finished = false;
150
151 std::thread prod(producer);
152 std::thread cons1(consumer, 1);
153 std::thread cons2(consumer, 2);
154
155 prod.join();
156 cons1.join();
157 cons2.join();
158
159 std::cout << "Producer-consumer finished\n";
160}
161
162// ============ Thread-Safe Queue ============
163template<typename T>
164class ThreadSafeQueue {
165private:
166 std::queue<T> queue_;
167 mutable std::mutex mutex_;
168 std::condition_variable cond_;
169
170public:
171 void push(T value) {
172 std::lock_guard<std::mutex> lock(mutex_);
173 queue_.push(std::move(value));
174 cond_.notify_one();
175 }
176
177 bool try_pop(T& value) {
178 std::lock_guard<std::mutex> lock(mutex_);
179 if (queue_.empty()) {
180 return false;
181 }
182 value = std::move(queue_.front());
183 queue_.pop();
184 return true;
185 }
186
187 void wait_and_pop(T& value) {
188 std::unique_lock<std::mutex> lock(mutex_);
189 cond_.wait(lock, [this] { return !queue_.empty(); });
190 value = std::move(queue_.front());
191 queue_.pop();
192 }
193
194 bool empty() const {
195 std::lock_guard<std::mutex> lock(mutex_);
196 return queue_.empty();
197 }
198
199 size_t size() const {
200 std::lock_guard<std::mutex> lock(mutex_);
201 return queue_.size();
202 }
203};
204
205void demo_thread_safe_queue() {
206 std::cout << "\n=== Thread-Safe Queue ===\n";
207
208 ThreadSafeQueue<int> queue;
209
210 auto producer_task = [&queue]() {
211 for (int i = 0; i < 5; i++) {
212 queue.push(i);
213 std::cout << "Pushed: " << i << "\n";
214 std::this_thread::sleep_for(std::chrono::milliseconds(100));
215 }
216 };
217
218 auto consumer_task = [&queue](int id) {
219 for (int i = 0; i < 3; i++) {
220 int value;
221 queue.wait_and_pop(value);
222 std::cout << "Consumer " << id << " popped: " << value << "\n";
223 }
224 };
225
226 std::thread prod(producer_task);
227 std::thread cons1(consumer_task, 1);
228 std::thread cons2(consumer_task, 2);
229
230 prod.join();
231 cons1.join();
232 cons2.join();
233
234 std::cout << "Thread-safe queue demo finished\n";
235}
236
237// ============ std::jthread (C++20) ============
238void demo_jthread() {
239 std::cout << "\n=== std::jthread (C++20 - Auto-joining) ===\n";
240
241 {
242 std::jthread t([](std::stop_token stoken) {
243 int count = 0;
244 while (!stoken.stop_requested() && count < 5) {
245 std::cout << "jthread working... " << count << "\n";
246 std::this_thread::sleep_for(std::chrono::milliseconds(200));
247 count++;
248 }
249 std::cout << "jthread stopping\n";
250 });
251
252 std::this_thread::sleep_for(std::chrono::milliseconds(600));
253 std::cout << "Requesting stop...\n";
254 t.request_stop();
255
256 // No need to call join() - automatic on destruction
257 }
258
259 std::cout << "jthread automatically joined\n";
260}
261
262// ============ Parallel Accumulation ============
263template<typename Iterator, typename T>
264T parallel_accumulate(Iterator first, Iterator last, T init) {
265 unsigned long const length = std::distance(first, last);
266
267 if (length == 0) return init;
268
269 unsigned long const min_per_thread = 25;
270 unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
271 unsigned long const hardware_threads = std::thread::hardware_concurrency();
272 unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
273 unsigned long const block_size = length / num_threads;
274
275 std::vector<std::future<T>> futures(num_threads - 1);
276
277 Iterator block_start = first;
278 for (unsigned long i = 0; i < (num_threads - 1); ++i) {
279 Iterator block_end = block_start;
280 std::advance(block_end, block_size);
281
282 futures[i] = std::async(std::launch::async,
283 [block_start, block_end]() {
284 return std::accumulate(block_start, block_end, T{});
285 });
286
287 block_start = block_end;
288 }
289
290 T result = std::accumulate(block_start, last, init);
291
292 for (auto& f : futures) {
293 result += f.get();
294 }
295
296 return result;
297}
298
299void demo_parallel_accumulate() {
300 std::cout << "\n=== Parallel Accumulation ===\n";
301
302 std::vector<int> data(1000);
303 for (size_t i = 0; i < data.size(); i++) {
304 data[i] = i + 1;
305 }
306
307 auto start = std::chrono::high_resolution_clock::now();
308 int result = parallel_accumulate(data.begin(), data.end(), 0);
309 auto end = std::chrono::high_resolution_clock::now();
310
311 std::cout << "Sum: " << result << "\n";
312 std::cout << "Time: "
313 << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
314 << " μs\n";
315}
316
317// ============ Main ============
318int main() {
319 std::cout << "Multithreading and Concurrency Demo\n";
320 std::cout << "====================================\n";
321 std::cout << "Hardware concurrency: " << std::thread::hardware_concurrency() << "\n";
322
323 demo_basic_thread();
324 demo_mutex();
325 demo_async();
326 demo_condition_variable();
327 demo_thread_safe_queue();
328 demo_jthread();
329 demo_parallel_accumulate();
330
331 std::cout << "\nAll demos completed!\n";
332 return 0;
333}