04_threading.cpp

Download
cpp 334 lines 9.0 KB
  1/*
  2 * Multithreading and Concurrency Demo
  3 *
  4 * Demonstrates:
  5 * - std::thread basics
  6 * - std::mutex, std::lock_guard, std::unique_lock
  7 * - std::async and std::future
  8 * - std::condition_variable
  9 * - Thread-safe queue example
 10 * - std::jthread (C++20)
 11 *
 12 * Compile: g++ -std=c++20 -Wall -Wextra -pthread 04_threading.cpp -o threading
 13 */
 14
 15#include <iostream>
 16#include <thread>
 17#include <mutex>
 18#include <condition_variable>
 19#include <future>
 20#include <vector>
 21#include <queue>
 22#include <chrono>
 23#include <string>
 24#include <numeric>
 25
 26// ============ Basic Thread ============
 27void print_numbers(int id, int count) {
 28    for (int i = 0; i < count; i++) {
 29        std::cout << "Thread " << id << ": " << i << "\n";
 30        std::this_thread::sleep_for(std::chrono::milliseconds(100));
 31    }
 32}
 33
 34void demo_basic_thread() {
 35    std::cout << "\n=== Basic std::thread ===\n";
 36
 37    std::thread t1(print_numbers, 1, 3);
 38    std::thread t2(print_numbers, 2, 3);
 39
 40    // Must join or detach
 41    t1.join();
 42    t2.join();
 43
 44    std::cout << "All threads finished\n";
 45}
 46
 47// ============ Mutex and Lock Guard ============
 48std::mutex cout_mutex;
 49int shared_counter = 0;
 50
 51void increment_counter(int id, int iterations) {
 52    for (int i = 0; i < iterations; i++) {
 53        {
 54            std::lock_guard<std::mutex> lock(cout_mutex);
 55            shared_counter++;
 56            std::cout << "Thread " << id << " incremented counter to "
 57                      << shared_counter << "\n";
 58        }
 59        std::this_thread::sleep_for(std::chrono::milliseconds(50));
 60    }
 61}
 62
 63void demo_mutex() {
 64    std::cout << "\n=== Mutex and Lock Guard ===\n";
 65
 66    shared_counter = 0;
 67
 68    std::thread t1(increment_counter, 1, 5);
 69    std::thread t2(increment_counter, 2, 5);
 70
 71    t1.join();
 72    t2.join();
 73
 74    std::cout << "Final counter value: " << shared_counter << "\n";
 75}
 76
 77// ============ std::async and std::future ============
 78int compute_sum(int n) {
 79    std::cout << "Computing sum(1.." << n << ")...\n";
 80    std::this_thread::sleep_for(std::chrono::milliseconds(500));
 81    return n * (n + 1) / 2;
 82}
 83
 84void demo_async() {
 85    std::cout << "\n=== std::async and std::future ===\n";
 86
 87    // Launch async task
 88    std::future<int> result1 = std::async(std::launch::async, compute_sum, 100);
 89    std::future<int> result2 = std::async(std::launch::async, compute_sum, 200);
 90
 91    std::cout << "Doing other work while tasks run...\n";
 92
 93    // Get results (blocks if not ready)
 94    std::cout << "Result 1: " << result1.get() << "\n";
 95    std::cout << "Result 2: " << result2.get() << "\n";
 96}
 97
 98// ============ Condition Variable ============
 99std::queue<int> data_queue;
100std::mutex queue_mutex;
101std::condition_variable data_cond;
102bool finished = false;
103
104void producer() {
105    for (int i = 0; i < 5; i++) {
106        std::this_thread::sleep_for(std::chrono::milliseconds(200));
107
108        {
109            std::lock_guard<std::mutex> lock(queue_mutex);
110            data_queue.push(i);
111            std::cout << "Produced: " << i << "\n";
112        }
113
114        data_cond.notify_one();
115    }
116
117    {
118        std::lock_guard<std::mutex> lock(queue_mutex);
119        finished = true;
120    }
121    data_cond.notify_all();
122}
123
124void consumer(int id) {
125    while (true) {
126        std::unique_lock<std::mutex> lock(queue_mutex);
127
128        // Wait for data or finish signal
129        data_cond.wait(lock, [] { return !data_queue.empty() || finished; });
130
131        if (!data_queue.empty()) {
132            int value = data_queue.front();
133            data_queue.pop();
134            lock.unlock();
135
136            std::cout << "Consumer " << id << " consumed: " << value << "\n";
137            std::this_thread::sleep_for(std::chrono::milliseconds(300));
138        } else if (finished) {
139            break;
140        }
141    }
142}
143
144void demo_condition_variable() {
145    std::cout << "\n=== Condition Variable (Producer-Consumer) ===\n";
146
147    // Clear state
148    while (!data_queue.empty()) data_queue.pop();
149    finished = false;
150
151    std::thread prod(producer);
152    std::thread cons1(consumer, 1);
153    std::thread cons2(consumer, 2);
154
155    prod.join();
156    cons1.join();
157    cons2.join();
158
159    std::cout << "Producer-consumer finished\n";
160}
161
162// ============ Thread-Safe Queue ============
163template<typename T>
164class ThreadSafeQueue {
165private:
166    std::queue<T> queue_;
167    mutable std::mutex mutex_;
168    std::condition_variable cond_;
169
170public:
171    void push(T value) {
172        std::lock_guard<std::mutex> lock(mutex_);
173        queue_.push(std::move(value));
174        cond_.notify_one();
175    }
176
177    bool try_pop(T& value) {
178        std::lock_guard<std::mutex> lock(mutex_);
179        if (queue_.empty()) {
180            return false;
181        }
182        value = std::move(queue_.front());
183        queue_.pop();
184        return true;
185    }
186
187    void wait_and_pop(T& value) {
188        std::unique_lock<std::mutex> lock(mutex_);
189        cond_.wait(lock, [this] { return !queue_.empty(); });
190        value = std::move(queue_.front());
191        queue_.pop();
192    }
193
194    bool empty() const {
195        std::lock_guard<std::mutex> lock(mutex_);
196        return queue_.empty();
197    }
198
199    size_t size() const {
200        std::lock_guard<std::mutex> lock(mutex_);
201        return queue_.size();
202    }
203};
204
205void demo_thread_safe_queue() {
206    std::cout << "\n=== Thread-Safe Queue ===\n";
207
208    ThreadSafeQueue<int> queue;
209
210    auto producer_task = [&queue]() {
211        for (int i = 0; i < 5; i++) {
212            queue.push(i);
213            std::cout << "Pushed: " << i << "\n";
214            std::this_thread::sleep_for(std::chrono::milliseconds(100));
215        }
216    };
217
218    auto consumer_task = [&queue](int id) {
219        for (int i = 0; i < 3; i++) {
220            int value;
221            queue.wait_and_pop(value);
222            std::cout << "Consumer " << id << " popped: " << value << "\n";
223        }
224    };
225
226    std::thread prod(producer_task);
227    std::thread cons1(consumer_task, 1);
228    std::thread cons2(consumer_task, 2);
229
230    prod.join();
231    cons1.join();
232    cons2.join();
233
234    std::cout << "Thread-safe queue demo finished\n";
235}
236
237// ============ std::jthread (C++20) ============
238void demo_jthread() {
239    std::cout << "\n=== std::jthread (C++20 - Auto-joining) ===\n";
240
241    {
242        std::jthread t([](std::stop_token stoken) {
243            int count = 0;
244            while (!stoken.stop_requested() && count < 5) {
245                std::cout << "jthread working... " << count << "\n";
246                std::this_thread::sleep_for(std::chrono::milliseconds(200));
247                count++;
248            }
249            std::cout << "jthread stopping\n";
250        });
251
252        std::this_thread::sleep_for(std::chrono::milliseconds(600));
253        std::cout << "Requesting stop...\n";
254        t.request_stop();
255
256        // No need to call join() - automatic on destruction
257    }
258
259    std::cout << "jthread automatically joined\n";
260}
261
262// ============ Parallel Accumulation ============
263template<typename Iterator, typename T>
264T parallel_accumulate(Iterator first, Iterator last, T init) {
265    unsigned long const length = std::distance(first, last);
266
267    if (length == 0) return init;
268
269    unsigned long const min_per_thread = 25;
270    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
271    unsigned long const hardware_threads = std::thread::hardware_concurrency();
272    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
273    unsigned long const block_size = length / num_threads;
274
275    std::vector<std::future<T>> futures(num_threads - 1);
276
277    Iterator block_start = first;
278    for (unsigned long i = 0; i < (num_threads - 1); ++i) {
279        Iterator block_end = block_start;
280        std::advance(block_end, block_size);
281
282        futures[i] = std::async(std::launch::async,
283                                [block_start, block_end]() {
284                                    return std::accumulate(block_start, block_end, T{});
285                                });
286
287        block_start = block_end;
288    }
289
290    T result = std::accumulate(block_start, last, init);
291
292    for (auto& f : futures) {
293        result += f.get();
294    }
295
296    return result;
297}
298
299void demo_parallel_accumulate() {
300    std::cout << "\n=== Parallel Accumulation ===\n";
301
302    std::vector<int> data(1000);
303    for (size_t i = 0; i < data.size(); i++) {
304        data[i] = i + 1;
305    }
306
307    auto start = std::chrono::high_resolution_clock::now();
308    int result = parallel_accumulate(data.begin(), data.end(), 0);
309    auto end = std::chrono::high_resolution_clock::now();
310
311    std::cout << "Sum: " << result << "\n";
312    std::cout << "Time: "
313              << std::chrono::duration_cast<std::chrono::microseconds>(end - start).count()
314              << " μs\n";
315}
316
317// ============ Main ============
318int main() {
319    std::cout << "Multithreading and Concurrency Demo\n";
320    std::cout << "====================================\n";
321    std::cout << "Hardware concurrency: " << std::thread::hardware_concurrency() << "\n";
322
323    demo_basic_thread();
324    demo_mutex();
325    demo_async();
326    demo_condition_variable();
327    demo_thread_safe_queue();
328    demo_jthread();
329    demo_parallel_accumulate();
330
331    std::cout << "\nAll demos completed!\n";
332    return 0;
333}