emme coverage


Directory: src/
File: src/thread_pool.c
Date: 2026-03-27 20:24:50
Exec Total Coverage
Lines: 105 154 68.2%
Functions: 9 9 100.0%
Branches: 36 68 52.9%

Line Branch Exec Source
1 #include "thread_pool.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <time.h>
6 #include <errno.h>
7
8 #define TASK_QUEUE_INITIAL_CAPACITY 256
9 #define THREAD_IDLE_TIMEOUT 5 // Idle timeout in seconds
10
11 typedef struct {
12 Task task;
13 char padding[64 - sizeof(Task)];
14 } CacheAlignedTask;
15
16 typedef struct {
17 CacheAlignedTask *tasks;
18 size_t capacity;
19 volatile size_t count;
20 volatile size_t front;
21 volatile size_t rear;
22 char padding[64];
23 } TaskQueue;
24
25 typedef struct {
26 struct ThreadPool *pool;
27 size_t index;
28 } WorkerArg;
29
30 // The ThreadPool structure.
31 struct ThreadPool {
32 pthread_mutex_t lock; // Mutex to protect the pool and queue.
33 pthread_cond_t cond; // Condition variable to signal new tasks.
34 TaskQueue queue; // Task queue.
35 pthread_t *threads; // Array of worker thread IDs.
36 WorkerArg *worker_args; // Worker thread arguments.
37 size_t num_threads; // Current number of worker threads.
38 size_t threads_created; // Total threads created (upper bound for joins).
39 size_t min_threads; // Minimum number of worker threads.
40 size_t max_threads; // Maximum number of worker threads.
41 size_t *free_indices; // Stack of reusable thread slots.
42 size_t free_count; // Number of reusable slots.
43 bool shutdown; // Flag to signal shutdown.
44 };
45
46 // Initializes the task queue.
47 11 static bool task_queue_init(TaskQueue *queue) {
48 11 queue->capacity = TASK_QUEUE_INITIAL_CAPACITY;
49 11 queue->tasks = malloc(queue->capacity * sizeof(CacheAlignedTask));
50 11 queue->count = 0;
51 11 queue->front = 0;
52 11 queue->rear = 0;
53 11 return queue->tasks != NULL;
54 }
55
56 // Destroys the task queue.
57 11 static void task_queue_destroy(TaskQueue *queue) {
58 11 free(queue->tasks);
59 11 }
60
61 // Pushes a task onto the task queue; returns true on success.
62 20 static bool task_queue_push(TaskQueue *queue, Task task) {
63
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 if (queue->count == queue->capacity) {
64 size_t new_capacity = queue->capacity * 2;
65 CacheAlignedTask *new_tasks = malloc(new_capacity * sizeof(CacheAlignedTask));
66 if (!new_tasks) return false;
67
68 if (queue->front > 0) {
69 for (size_t i = 0; i < queue->count; i++) {
70 new_tasks[i].task = queue->tasks[(queue->front + i) % queue->capacity].task;
71 }
72 queue->front = 0;
73 queue->rear = queue->count;
74 }
75 free(queue->tasks);
76 queue->tasks = new_tasks;
77 queue->capacity = new_capacity;
78 }
79
80 20 queue->tasks[queue->rear].task = task;
81 20 queue->rear = (queue->rear + 1) % queue->capacity;
82 20 queue->count++;
83 20 return true;
84 }
85
86 // Pops a task from the task queue; returns true if a task was retrieved.
87 20 static bool task_queue_pop(TaskQueue *queue, Task *task) {
88
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 if (queue->count == 0) return false;
89
90 20 *task = queue->tasks[queue->front].task;
91 20 queue->front = (queue->front + 1) % queue->capacity;
92 20 queue->count--;
93 20 return true;
94 }
95
96 262 static void mark_thread_exit(ThreadPool *pool, size_t index) {
97
1/2
✓ Branch 0 taken 262 times.
✗ Branch 1 not taken.
262 if (pool->free_count < pool->max_threads) {
98 262 pool->free_indices[pool->free_count++] = index;
99 }
100
1/2
✓ Branch 0 taken 262 times.
✗ Branch 1 not taken.
262 if (pool->num_threads > 0) {
101 262 pool->num_threads--;
102 }
103 262 }
104
105 // Worker thread function. Each thread waits for tasks and executes them.
106 262 static void *worker_thread(void *arg) {
107 262 WorkerArg *warg = (WorkerArg *)arg;
108 262 ThreadPool *pool = warg->pool;
109 262 size_t index = warg->index;
110 Task task;
111 20 while (1) {
112 282 pthread_mutex_lock(&pool->lock);
113 // Wait for a task if the queue is empty.
114
4/4
✓ Branch 0 taken 533 times.
✓ Branch 1 taken 20 times.
✓ Branch 2 taken 271 times.
✓ Branch 3 taken 262 times.
553 while (pool->queue.count == 0 && !pool->shutdown) {
115 // Wait with a timeout to allow dynamic thread reduction.
116 struct timespec ts;
117 271 clock_gettime(CLOCK_REALTIME, &ts);
118 271 ts.tv_sec += THREAD_IDLE_TIMEOUT;
119 271 int ret = pthread_cond_timedwait(&pool->cond, &pool->lock, &ts);
120 // If timed out and still no task, consider exiting.
121
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 271 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
271 if (ret == ETIMEDOUT && pool->queue.count == 0) {
122 if (pool->num_threads > pool->min_threads) {
123 mark_thread_exit(pool, index);
124 pthread_mutex_unlock(&pool->lock);
125 pthread_exit(NULL);
126 }
127 }
128 }
129 // If shutdown has been signaled, exit the thread.
130
2/2
✓ Branch 0 taken 262 times.
✓ Branch 1 taken 20 times.
282 if (pool->shutdown) {
131 262 mark_thread_exit(pool, index);
132 262 pthread_mutex_unlock(&pool->lock);
133 262 pthread_exit(NULL);
134 }
135 // Retrieve the next task from the queue.
136 20 bool has_task = task_queue_pop(&pool->queue, &task);
137 20 pthread_mutex_unlock(&pool->lock);
138
1/2
✓ Branch 0 taken 20 times.
✗ Branch 1 not taken.
20 if (has_task) {
139 // Execute the task.
140 20 task.function(task.arg);
141 }
142 }
143 return NULL;
144 }
145
146 // Creates a new thread pool with dynamic resizing capabilities.
147 11 ThreadPool *thread_pool_create(size_t min_threads, size_t max_threads) {
148 11 ThreadPool *pool = malloc(sizeof(ThreadPool));
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
11 if (!pool)
150 return NULL;
151
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
11 if (max_threads == 0) {
152 free(pool);
153 return NULL;
154 }
155
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 11 times.
11 if (min_threads == 0)
156 min_threads = 1;
157
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 10 times.
11 if (min_threads > max_threads)
158 1 min_threads = max_threads;
159 11 pool->min_threads = min_threads;
160 11 pool->max_threads = max_threads;
161 11 pool->num_threads = min_threads;
162 11 pool->threads_created = 0;
163 11 pool->shutdown = false;
164 11 pthread_mutex_init(&pool->lock, NULL);
165 11 pthread_cond_init(&pool->cond, NULL);
166
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
11 if (!task_queue_init(&pool->queue)) {
167 pthread_mutex_destroy(&pool->lock);
168 pthread_cond_destroy(&pool->cond);
169 free(pool);
170 return NULL;
171 }
172 11 pool->threads = malloc(max_threads * sizeof(pthread_t));
173 11 pool->worker_args = malloc(max_threads * sizeof(WorkerArg));
174 11 pool->free_indices = malloc(max_threads * sizeof(size_t));
175 11 pool->free_count = 0;
176
3/6
✓ Branch 0 taken 11 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 11 times.
✗ Branch 3 not taken.
✗ Branch 4 not taken.
✓ Branch 5 taken 11 times.
11 if (!pool->threads || !pool->worker_args || !pool->free_indices) {
177 free(pool->threads);
178 free(pool->worker_args);
179 free(pool->free_indices);
180 task_queue_destroy(&pool->queue);
181 pthread_mutex_destroy(&pool->lock);
182 pthread_cond_destroy(&pool->cond);
183 free(pool);
184 return NULL;
185 }
186 // Create the initial minimum number of worker threads.
187
2/2
✓ Branch 0 taken 260 times.
✓ Branch 1 taken 11 times.
271 for (size_t i = 0; i < min_threads; i++) {
188 260 pool->worker_args[i].pool = pool;
189 260 pool->worker_args[i].index = i;
190
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 260 times.
260 if (pthread_create(&pool->threads[i], NULL, worker_thread, &pool->worker_args[i]) != 0) {
191 pool->shutdown = true;
192 pthread_cond_broadcast(&pool->cond);
193 for (size_t j = 0; j < pool->threads_created; j++)
194 pthread_join(pool->threads[j], NULL);
195 free(pool->threads);
196 free(pool->worker_args);
197 free(pool->free_indices);
198 task_queue_destroy(&pool->queue);
199 pthread_mutex_destroy(&pool->lock);
200 pthread_cond_destroy(&pool->cond);
201 free(pool);
202 return NULL;
203 }
204 260 pool->threads_created++;
205 }
206 11 return pool;
207 }
208
209 // Adds a task to the thread pool. If the task queue length exceeds the current number
210 // of threads and we have not reached max_threads, a new thread is spawned.
211 20 bool thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *arg) {
212 20 pthread_mutex_lock(&pool->lock);
213
214
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 20 times.
20 if (pool->shutdown) {
215 pthread_mutex_unlock(&pool->lock);
216 return false;
217 }
218
219 20 Task task = {function, arg};
220
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 20 times.
20 if (!task_queue_push(&pool->queue, task)) {
221 pthread_mutex_unlock(&pool->lock);
222 return false;
223 }
224
225
4/4
✓ Branch 0 taken 8 times.
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 6 times.
20 if (pool->queue.count > pool->num_threads && pool->num_threads < pool->max_threads) {
226 size_t index;
227
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (pool->free_count > 0) {
228 index = pool->free_indices[--pool->free_count];
229 } else {
230
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (pool->threads_created >= pool->max_threads) {
231 index = pool->max_threads;
232 } else {
233 2 index = pool->threads_created++;
234 }
235 }
236
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (index < pool->max_threads) {
237 2 pool->worker_args[index].pool = pool;
238 2 pool->worker_args[index].index = index;
239
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (pthread_create(&pool->threads[index], NULL, worker_thread, &pool->worker_args[index]) == 0) {
240 2 pool->num_threads++;
241 }
242 }
243 }
244
245 20 pthread_cond_signal(&pool->cond);
246 20 pthread_mutex_unlock(&pool->lock);
247 20 return true;
248 }
249
250 // Destroys the thread pool by signaling shutdown and joining all threads.
251 11 void thread_pool_destroy(ThreadPool *pool) {
252 11 pthread_mutex_lock(&pool->lock);
253 11 pool->shutdown = true;
254 11 pthread_cond_broadcast(&pool->cond);
255 11 pthread_mutex_unlock(&pool->lock);
256 // Join all threads that were created.
257
2/2
✓ Branch 0 taken 262 times.
✓ Branch 1 taken 11 times.
273 for (size_t i = 0; i < pool->threads_created; i++) {
258 262 pthread_join(pool->threads[i], NULL);
259 }
260 11 free(pool->threads);
261 11 free(pool->worker_args);
262 11 free(pool->free_indices);
263 11 task_queue_destroy(&pool->queue);
264 11 pthread_mutex_destroy(&pool->lock);
265 11 pthread_cond_destroy(&pool->cond);
266 11 free(pool);
267 11 }
268