| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "thread_pool.h" | ||
| 2 | #include <pthread.h> | ||
| 3 | #include <stdlib.h> | ||
| 4 | #include <stdio.h> | ||
| 5 | #include <time.h> | ||
| 6 | #include <errno.h> | ||
| 7 | |||
| 8 | #define TASK_QUEUE_INITIAL_CAPACITY 256 | ||
| 9 | #define THREAD_IDLE_TIMEOUT 5 // Idle timeout in seconds | ||
| 10 | |||
| 11 | typedef struct { | ||
| 12 | Task task; | ||
| 13 | char padding[64 - sizeof(Task)]; | ||
| 14 | } CacheAlignedTask; | ||
| 15 | |||
| 16 | typedef struct { | ||
| 17 | CacheAlignedTask *tasks; | ||
| 18 | size_t capacity; | ||
| 19 | volatile size_t count; | ||
| 20 | volatile size_t front; | ||
| 21 | volatile size_t rear; | ||
| 22 | char padding[64]; | ||
| 23 | } TaskQueue; | ||
| 24 | |||
// Start-up argument handed to a worker thread through pthread_create().
typedef struct {
    struct ThreadPool *pool;  // Pool the worker serves.
    size_t index;             // The worker's slot in the pool's per-thread arrays.
} WorkerArg;
| 29 | |||
| 30 | // The ThreadPool structure. | ||
| 31 | struct ThreadPool { | ||
| 32 | pthread_mutex_t lock; // Mutex to protect the pool and queue. | ||
| 33 | pthread_cond_t cond; // Condition variable to signal new tasks. | ||
| 34 | TaskQueue queue; // Task queue. | ||
| 35 | pthread_t *threads; // Array of worker thread IDs. | ||
| 36 | WorkerArg *worker_args; // Worker thread arguments. | ||
| 37 | size_t num_threads; // Current number of worker threads. | ||
| 38 | size_t threads_created; // Total threads created (upper bound for joins). | ||
| 39 | size_t min_threads; // Minimum number of worker threads. | ||
| 40 | size_t max_threads; // Maximum number of worker threads. | ||
| 41 | size_t *free_indices; // Stack of reusable thread slots. | ||
| 42 | size_t free_count; // Number of reusable slots. | ||
| 43 | bool shutdown; // Flag to signal shutdown. | ||
| 44 | }; | ||
| 45 | |||
| 46 | // Initializes the task queue. | ||
| 47 | 11 | static bool task_queue_init(TaskQueue *queue) { | |
| 48 | 11 | queue->capacity = TASK_QUEUE_INITIAL_CAPACITY; | |
| 49 | 11 | queue->tasks = malloc(queue->capacity * sizeof(CacheAlignedTask)); | |
| 50 | 11 | queue->count = 0; | |
| 51 | 11 | queue->front = 0; | |
| 52 | 11 | queue->rear = 0; | |
| 53 | 11 | return queue->tasks != NULL; | |
| 54 | } | ||
| 55 | |||
| 56 | // Destroys the task queue. | ||
| 57 | 11 | static void task_queue_destroy(TaskQueue *queue) { | |
| 58 | 11 | free(queue->tasks); | |
| 59 | 11 | } | |
| 60 | |||
| 61 | // Pushes a task onto the task queue; returns true on success. | ||
| 62 | 20 | static bool task_queue_push(TaskQueue *queue, Task task) { | |
| 63 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (queue->count == queue->capacity) { |
| 64 | ✗ | size_t new_capacity = queue->capacity * 2; | |
| 65 | ✗ | CacheAlignedTask *new_tasks = malloc(new_capacity * sizeof(CacheAlignedTask)); | |
| 66 | ✗ | if (!new_tasks) return false; | |
| 67 | |||
| 68 | ✗ | if (queue->front > 0) { | |
| 69 | ✗ | for (size_t i = 0; i < queue->count; i++) { | |
| 70 | ✗ | new_tasks[i].task = queue->tasks[(queue->front + i) % queue->capacity].task; | |
| 71 | } | ||
| 72 | ✗ | queue->front = 0; | |
| 73 | ✗ | queue->rear = queue->count; | |
| 74 | } | ||
| 75 | ✗ | free(queue->tasks); | |
| 76 | ✗ | queue->tasks = new_tasks; | |
| 77 | ✗ | queue->capacity = new_capacity; | |
| 78 | } | ||
| 79 | |||
| 80 | 20 | queue->tasks[queue->rear].task = task; | |
| 81 | 20 | queue->rear = (queue->rear + 1) % queue->capacity; | |
| 82 | 20 | queue->count++; | |
| 83 | 20 | return true; | |
| 84 | } | ||
| 85 | |||
| 86 | // Pops a task from the task queue; returns true if a task was retrieved. | ||
| 87 | 20 | static bool task_queue_pop(TaskQueue *queue, Task *task) { | |
| 88 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (queue->count == 0) return false; |
| 89 | |||
| 90 | 20 | *task = queue->tasks[queue->front].task; | |
| 91 | 20 | queue->front = (queue->front + 1) % queue->capacity; | |
| 92 | 20 | queue->count--; | |
| 93 | 20 | return true; | |
| 94 | } | ||
| 95 | |||
| 96 | 262 | static void mark_thread_exit(ThreadPool *pool, size_t index) { | |
| 97 |
1/2✓ Branch 0 taken 262 times.
✗ Branch 1 not taken.
|
262 | if (pool->free_count < pool->max_threads) { |
| 98 | 262 | pool->free_indices[pool->free_count++] = index; | |
| 99 | } | ||
| 100 |
1/2✓ Branch 0 taken 262 times.
✗ Branch 1 not taken.
|
262 | if (pool->num_threads > 0) { |
| 101 | 262 | pool->num_threads--; | |
| 102 | } | ||
| 103 | 262 | } | |
| 104 | |||
| 105 | // Worker thread function. Each thread waits for tasks and executes them. | ||
| 106 | 262 | static void *worker_thread(void *arg) { | |
| 107 | 262 | WorkerArg *warg = (WorkerArg *)arg; | |
| 108 | 262 | ThreadPool *pool = warg->pool; | |
| 109 | 262 | size_t index = warg->index; | |
| 110 | Task task; | ||
| 111 | 20 | while (1) { | |
| 112 | 282 | pthread_mutex_lock(&pool->lock); | |
| 113 | // Wait for a task if the queue is empty. | ||
| 114 |
4/4✓ Branch 0 taken 533 times.
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 271 times.
✓ Branch 3 taken 262 times.
|
553 | while (pool->queue.count == 0 && !pool->shutdown) { |
| 115 | // Wait with a timeout to allow dynamic thread reduction. | ||
| 116 | struct timespec ts; | ||
| 117 | 271 | clock_gettime(CLOCK_REALTIME, &ts); | |
| 118 | 271 | ts.tv_sec += THREAD_IDLE_TIMEOUT; | |
| 119 | 271 | int ret = pthread_cond_timedwait(&pool->cond, &pool->lock, &ts); | |
| 120 | // If timed out and still no task, consider exiting. | ||
| 121 |
1/4✗ Branch 0 not taken.
✓ Branch 1 taken 271 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
|
271 | if (ret == ETIMEDOUT && pool->queue.count == 0) { |
| 122 | ✗ | if (pool->num_threads > pool->min_threads) { | |
| 123 | ✗ | mark_thread_exit(pool, index); | |
| 124 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 125 | ✗ | pthread_exit(NULL); | |
| 126 | } | ||
| 127 | } | ||
| 128 | } | ||
| 129 | // If shutdown has been signaled, exit the thread. | ||
| 130 |
2/2✓ Branch 0 taken 262 times.
✓ Branch 1 taken 20 times.
|
282 | if (pool->shutdown) { |
| 131 | 262 | mark_thread_exit(pool, index); | |
| 132 | 262 | pthread_mutex_unlock(&pool->lock); | |
| 133 | 262 | pthread_exit(NULL); | |
| 134 | } | ||
| 135 | // Retrieve the next task from the queue. | ||
| 136 | 20 | bool has_task = task_queue_pop(&pool->queue, &task); | |
| 137 | 20 | pthread_mutex_unlock(&pool->lock); | |
| 138 |
1/2✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
|
20 | if (has_task) { |
| 139 | // Execute the task. | ||
| 140 | 20 | task.function(task.arg); | |
| 141 | } | ||
| 142 | } | ||
| 143 | return NULL; | ||
| 144 | } | ||
| 145 | |||
| 146 | // Creates a new thread pool with dynamic resizing capabilities. | ||
| 147 | 11 | ThreadPool *thread_pool_create(size_t min_threads, size_t max_threads) { | |
| 148 | 11 | ThreadPool *pool = malloc(sizeof(ThreadPool)); | |
| 149 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
|
11 | if (!pool) |
| 150 | ✗ | return NULL; | |
| 151 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
|
11 | if (max_threads == 0) { |
| 152 | ✗ | free(pool); | |
| 153 | ✗ | return NULL; | |
| 154 | } | ||
| 155 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
|
11 | if (min_threads == 0) |
| 156 | ✗ | min_threads = 1; | |
| 157 |
2/2✓ Branch 0 taken 1 times.
✓ Branch 1 taken 10 times.
|
11 | if (min_threads > max_threads) |
| 158 | 1 | min_threads = max_threads; | |
| 159 | 11 | pool->min_threads = min_threads; | |
| 160 | 11 | pool->max_threads = max_threads; | |
| 161 | 11 | pool->num_threads = min_threads; | |
| 162 | 11 | pool->threads_created = 0; | |
| 163 | 11 | pool->shutdown = false; | |
| 164 | 11 | pthread_mutex_init(&pool->lock, NULL); | |
| 165 | 11 | pthread_cond_init(&pool->cond, NULL); | |
| 166 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
|
11 | if (!task_queue_init(&pool->queue)) { |
| 167 | ✗ | pthread_mutex_destroy(&pool->lock); | |
| 168 | ✗ | pthread_cond_destroy(&pool->cond); | |
| 169 | ✗ | free(pool); | |
| 170 | ✗ | return NULL; | |
| 171 | } | ||
| 172 | 11 | pool->threads = malloc(max_threads * sizeof(pthread_t)); | |
| 173 | 11 | pool->worker_args = malloc(max_threads * sizeof(WorkerArg)); | |
| 174 | 11 | pool->free_indices = malloc(max_threads * sizeof(size_t)); | |
| 175 | 11 | pool->free_count = 0; | |
| 176 |
3/6✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
|
11 | if (!pool->threads || !pool->worker_args || !pool->free_indices) { |
| 177 | ✗ | free(pool->threads); | |
| 178 | ✗ | free(pool->worker_args); | |
| 179 | ✗ | free(pool->free_indices); | |
| 180 | ✗ | task_queue_destroy(&pool->queue); | |
| 181 | ✗ | pthread_mutex_destroy(&pool->lock); | |
| 182 | ✗ | pthread_cond_destroy(&pool->cond); | |
| 183 | ✗ | free(pool); | |
| 184 | ✗ | return NULL; | |
| 185 | } | ||
| 186 | // Create the initial minimum number of worker threads. | ||
| 187 |
2/2✓ Branch 0 taken 260 times.
✓ Branch 1 taken 11 times.
|
271 | for (size_t i = 0; i < min_threads; i++) { |
| 188 | 260 | pool->worker_args[i].pool = pool; | |
| 189 | 260 | pool->worker_args[i].index = i; | |
| 190 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 260 times.
|
260 | if (pthread_create(&pool->threads[i], NULL, worker_thread, &pool->worker_args[i]) != 0) { |
| 191 | ✗ | pool->shutdown = true; | |
| 192 | ✗ | pthread_cond_broadcast(&pool->cond); | |
| 193 | ✗ | for (size_t j = 0; j < pool->threads_created; j++) | |
| 194 | ✗ | pthread_join(pool->threads[j], NULL); | |
| 195 | ✗ | free(pool->threads); | |
| 196 | ✗ | free(pool->worker_args); | |
| 197 | ✗ | free(pool->free_indices); | |
| 198 | ✗ | task_queue_destroy(&pool->queue); | |
| 199 | ✗ | pthread_mutex_destroy(&pool->lock); | |
| 200 | ✗ | pthread_cond_destroy(&pool->cond); | |
| 201 | ✗ | free(pool); | |
| 202 | ✗ | return NULL; | |
| 203 | } | ||
| 204 | 260 | pool->threads_created++; | |
| 205 | } | ||
| 206 | 11 | return pool; | |
| 207 | } | ||
| 208 | |||
| 209 | // Adds a task to the thread pool. If the task queue length exceeds the current number | ||
| 210 | // of threads and we have not reached max_threads, a new thread is spawned. | ||
| 211 | 20 | bool thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *arg) { | |
| 212 | 20 | pthread_mutex_lock(&pool->lock); | |
| 213 | |||
| 214 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
|
20 | if (pool->shutdown) { |
| 215 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 216 | ✗ | return false; | |
| 217 | } | ||
| 218 | |||
| 219 | 20 | Task task = {function, arg}; | |
| 220 |
1/2✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
|
20 | if (!task_queue_push(&pool->queue, task)) { |
| 221 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 222 | ✗ | return false; | |
| 223 | } | ||
| 224 | |||
| 225 |
4/4✓ Branch 0 taken 8 times.
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 6 times.
|
20 | if (pool->queue.count > pool->num_threads && pool->num_threads < pool->max_threads) { |
| 226 | size_t index; | ||
| 227 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (pool->free_count > 0) { |
| 228 | ✗ | index = pool->free_indices[--pool->free_count]; | |
| 229 | } else { | ||
| 230 |
1/2✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
|
2 | if (pool->threads_created >= pool->max_threads) { |
| 231 | ✗ | index = pool->max_threads; | |
| 232 | } else { | ||
| 233 | 2 | index = pool->threads_created++; | |
| 234 | } | ||
| 235 | } | ||
| 236 |
1/2✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
|
2 | if (index < pool->max_threads) { |
| 237 | 2 | pool->worker_args[index].pool = pool; | |
| 238 | 2 | pool->worker_args[index].index = index; | |
| 239 |
1/2✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
|
2 | if (pthread_create(&pool->threads[index], NULL, worker_thread, &pool->worker_args[index]) == 0) { |
| 240 | 2 | pool->num_threads++; | |
| 241 | } | ||
| 242 | } | ||
| 243 | } | ||
| 244 | |||
| 245 | 20 | pthread_cond_signal(&pool->cond); | |
| 246 | 20 | pthread_mutex_unlock(&pool->lock); | |
| 247 | 20 | return true; | |
| 248 | } | ||
| 249 | |||
| 250 | // Destroys the thread pool by signaling shutdown and joining all threads. | ||
| 251 | 11 | void thread_pool_destroy(ThreadPool *pool) { | |
| 252 | 11 | pthread_mutex_lock(&pool->lock); | |
| 253 | 11 | pool->shutdown = true; | |
| 254 | 11 | pthread_cond_broadcast(&pool->cond); | |
| 255 | 11 | pthread_mutex_unlock(&pool->lock); | |
| 256 | // Join all threads that were created. | ||
| 257 |
2/2✓ Branch 0 taken 262 times.
✓ Branch 1 taken 11 times.
|
273 | for (size_t i = 0; i < pool->threads_created; i++) { |
| 258 | 262 | pthread_join(pool->threads[i], NULL); | |
| 259 | } | ||
| 260 | 11 | free(pool->threads); | |
| 261 | 11 | free(pool->worker_args); | |
| 262 | 11 | free(pool->free_indices); | |
| 263 | 11 | task_queue_destroy(&pool->queue); | |
| 264 | 11 | pthread_mutex_destroy(&pool->lock); | |
| 265 | 11 | pthread_cond_destroy(&pool->cond); | |
| 266 | 11 | free(pool); | |
| 267 | 11 | } | |
| 268 |