thread_pool.c

Download
c 199 lines 4.3 KB
// thread_pool.c
// Thread pool implementation
  3#include <stdio.h>
  4#include <stdlib.h>
  5#include <pthread.h>
  6#include <stdbool.h>
  7#include <unistd.h>
  8#include <time.h>
  9
 10#define POOL_SIZE 4
 11#define QUEUE_SIZE 100
 12
 13// μž‘μ—… μ •μ˜
 14typedef struct Task {
 15    void (*function)(void* arg);
 16    void* arg;
 17} Task;
 18
 19// μž‘μ—… 큐
 20typedef struct {
 21    Task tasks[QUEUE_SIZE];
 22    int front;
 23    int rear;
 24    int count;
 25
 26    pthread_mutex_t mutex;
 27    pthread_cond_t not_empty;
 28    pthread_cond_t not_full;
 29
 30    bool shutdown;
 31} TaskQueue;
 32
 33// μŠ€λ ˆλ“œ ν’€
 34typedef struct {
 35    pthread_t threads[POOL_SIZE];
 36    TaskQueue queue;
 37    int thread_count;
 38} ThreadPool;
 39
 40// μž‘μ—… 큐 μ΄ˆκΈ°ν™”
 41void queue_init(TaskQueue* q) {
 42    q->front = 0;
 43    q->rear = 0;
 44    q->count = 0;
 45    q->shutdown = false;
 46
 47    pthread_mutex_init(&q->mutex, NULL);
 48    pthread_cond_init(&q->not_empty, NULL);
 49    pthread_cond_init(&q->not_full, NULL);
 50}
 51
 52// μž‘μ—… 큐 정리
 53void queue_destroy(TaskQueue* q) {
 54    pthread_mutex_destroy(&q->mutex);
 55    pthread_cond_destroy(&q->not_empty);
 56    pthread_cond_destroy(&q->not_full);
 57}
 58
 59// μž‘μ—… μΆ”κ°€
 60bool queue_push(TaskQueue* q, Task task) {
 61    pthread_mutex_lock(&q->mutex);
 62
 63    while (q->count == QUEUE_SIZE && !q->shutdown) {
 64        pthread_cond_wait(&q->not_full, &q->mutex);
 65    }
 66
 67    if (q->shutdown) {
 68        pthread_mutex_unlock(&q->mutex);
 69        return false;
 70    }
 71
 72    q->tasks[q->rear] = task;
 73    q->rear = (q->rear + 1) % QUEUE_SIZE;
 74    q->count++;
 75
 76    pthread_cond_signal(&q->not_empty);
 77    pthread_mutex_unlock(&q->mutex);
 78
 79    return true;
 80}
 81
 82// μž‘μ—… κ°€μ Έμ˜€κΈ°
 83bool queue_pop(TaskQueue* q, Task* task) {
 84    pthread_mutex_lock(&q->mutex);
 85
 86    while (q->count == 0 && !q->shutdown) {
 87        pthread_cond_wait(&q->not_empty, &q->mutex);
 88    }
 89
 90    if (q->count == 0 && q->shutdown) {
 91        pthread_mutex_unlock(&q->mutex);
 92        return false;
 93    }
 94
 95    *task = q->tasks[q->front];
 96    q->front = (q->front + 1) % QUEUE_SIZE;
 97    q->count--;
 98
 99    pthread_cond_signal(&q->not_full);
100    pthread_mutex_unlock(&q->mutex);
101
102    return true;
103}
104
105// μ›Œμ»€ μŠ€λ ˆλ“œ ν•¨μˆ˜
106void* worker_thread(void* arg) {
107    ThreadPool* pool = (ThreadPool*)arg;
108    Task task;
109
110    printf("[μ›Œμ»€] μŠ€λ ˆλ“œ μ‹œμž‘ (TID: %lu)\n", pthread_self());
111
112    while (queue_pop(&pool->queue, &task)) {
113        printf("[μ›Œμ»€ %lu] μž‘μ—… μ‹€ν–‰\n", pthread_self());
114        task.function(task.arg);
115    }
116
117    printf("[μ›Œμ»€ %lu] μŠ€λ ˆλ“œ μ’…λ£Œ\n", pthread_self());
118    return NULL;
119}
120
121// μŠ€λ ˆλ“œ ν’€ 생성
122ThreadPool* pool_create(int size) {
123    ThreadPool* pool = malloc(sizeof(ThreadPool));
124    pool->thread_count = size;
125
126    queue_init(&pool->queue);
127
128    for (int i = 0; i < size; i++) {
129        pthread_create(&pool->threads[i], NULL, worker_thread, pool);
130    }
131
132    return pool;
133}
134
135// μž‘μ—… 제좜
136bool pool_submit(ThreadPool* pool, void (*function)(void*), void* arg) {
137    Task task = { .function = function, .arg = arg };
138    return queue_push(&pool->queue, task);
139}
140
141// μŠ€λ ˆλ“œ ν’€ μ’…λ£Œ
142void pool_shutdown(ThreadPool* pool) {
143    pthread_mutex_lock(&pool->queue.mutex);
144    pool->queue.shutdown = true;
145    pthread_cond_broadcast(&pool->queue.not_empty);
146    pthread_mutex_unlock(&pool->queue.mutex);
147
148    for (int i = 0; i < pool->thread_count; i++) {
149        pthread_join(pool->threads[i], NULL);
150    }
151
152    queue_destroy(&pool->queue);
153    free(pool);
154}
155
// ============ Test ============
157
/**
 * Payload of one demo task. Instances are heap-allocated by main() and
 * freed by process_work().
 */
typedef struct {
    int id;     /* sequence number of the task */
    int value;  /* number printed while the task is processed */
} WorkItem;
162
163void process_work(void* arg) {
164    WorkItem* item = (WorkItem*)arg;
165
166    printf("μž‘μ—… %d 처리 쀑 (κ°’: %d)...\n", item->id, item->value);
167    usleep((rand() % 500 + 100) * 1000);  // 100~600ms 처리
168    printf("μž‘μ—… %d μ™„λ£Œ!\n", item->id);
169
170    free(item);
171}
172
173int main(void) {
174    srand(time(NULL));
175
176    printf("μŠ€λ ˆλ“œ ν’€ 생성 (크기: %d)\n\n", POOL_SIZE);
177    ThreadPool* pool = pool_create(POOL_SIZE);
178
179    // μž‘μ—… 제좜
180    for (int i = 0; i < 10; i++) {
181        WorkItem* item = malloc(sizeof(WorkItem));
182        item->id = i;
183        item->value = rand() % 100;
184
185        printf("μž‘μ—… %d 제좜 (κ°’: %d)\n", i, item->value);
186        pool_submit(pool, process_work, item);
187
188        usleep(100000);  // 100ms 간격
189    }
190
191    printf("\nλͺ¨λ“  μž‘μ—… 제좜 μ™„λ£Œ. ν’€ μ’…λ£Œ λŒ€κΈ°...\n\n");
192    sleep(2);  // μž‘μ—… 처리 λŒ€κΈ°
193
194    pool_shutdown(pool);
195    printf("\nν”„λ‘œκ·Έλž¨ μ’…λ£Œ\n");
196
197    return 0;
198}