emme coverage


Directory: src/
File: src/thread_pool.c
Date: 2025-08-24 07:42:18
Exec Total Coverage
Lines: 0 102 0.0%
Functions: 0 8 0.0%
Branches: 0 40 0.0%

Line Branch Exec Source
1 #include "thread_pool.h"
2 #include <pthread.h>
3 #include <stdlib.h>
4 #include <stdio.h>
5 #include <time.h>
6 #include <errno.h>
7
8 #define TASK_QUEUE_INITIAL_CAPACITY 256
9 #define THREAD_IDLE_TIMEOUT 5 // Idle timeout in seconds
10
11 typedef struct {
12 Task task;
13 char padding[64 - sizeof(Task)];
14 } CacheAlignedTask;
15
16 typedef struct {
17 CacheAlignedTask *tasks;
18 size_t capacity;
19 volatile size_t count;
20 volatile size_t front;
21 volatile size_t rear;
22 char padding[64];
23 } TaskQueue;
24
25 // The ThreadPool structure.
26 struct ThreadPool {
27 pthread_mutex_t lock; // Mutex to protect the pool and queue.
28 pthread_cond_t cond; // Condition variable to signal new tasks.
29 TaskQueue queue; // Task queue.
30 pthread_t *threads; // Array of worker thread IDs.
31 size_t num_threads; // Current number of worker threads.
32 size_t min_threads; // Minimum number of worker threads.
33 size_t max_threads; // Maximum number of worker threads.
34 bool shutdown; // Flag to signal shutdown.
35 };
36
37 // Initializes the task queue.
38 static void task_queue_init(TaskQueue *queue) {
39 queue->capacity = TASK_QUEUE_INITIAL_CAPACITY;
40 queue->tasks = malloc(queue->capacity * sizeof(CacheAlignedTask));
41 queue->count = 0;
42 queue->front = 0;
43 queue->rear = 0;
44 }
45
46 // Destroys the task queue.
47 static void task_queue_destroy(TaskQueue *queue) {
48 free(queue->tasks);
49 }
50
51 // Pushes a task onto the task queue; returns true on success.
52 static bool task_queue_push(TaskQueue *queue, Task task) {
53 if (queue->count == queue->capacity) {
54 size_t new_capacity = queue->capacity * 2;
55 CacheAlignedTask *new_tasks = malloc(new_capacity * sizeof(CacheAlignedTask));
56 if (!new_tasks) return false;
57
58 if (queue->front > 0) {
59 for (size_t i = 0; i < queue->count; i++) {
60 new_tasks[i].task = queue->tasks[(queue->front + i) % queue->capacity].task;
61 }
62 queue->front = 0;
63 queue->rear = queue->count;
64 }
65 free(queue->tasks);
66 queue->tasks = new_tasks;
67 queue->capacity = new_capacity;
68 }
69
70 queue->tasks[queue->rear].task = task;
71 queue->rear = (queue->rear + 1) % queue->capacity;
72 queue->count++;
73 return true;
74 }
75
76 // Pops a task from the task queue; returns true if a task was retrieved.
77 static bool task_queue_pop(TaskQueue *queue, Task *task) {
78 if (queue->count == 0) return false;
79
80 *task = queue->tasks[queue->front].task;
81 queue->front = (queue->front + 1) % queue->capacity;
82 queue->count--;
83 return true;
84 }
85
86 // Worker thread function. Each thread waits for tasks and executes them.
87 static void *worker_thread(void *arg) {
88 ThreadPool *pool = (ThreadPool *)arg;
89 Task task;
90 while (1) {
91 pthread_mutex_lock(&pool->lock);
92 // Wait for a task if the queue is empty.
93 while (pool->queue.count == 0 && !pool->shutdown) {
94 // Wait with a timeout to allow dynamic thread reduction.
95 struct timespec ts;
96 clock_gettime(CLOCK_REALTIME, &ts);
97 ts.tv_sec += THREAD_IDLE_TIMEOUT;
98 int ret = pthread_cond_timedwait(&pool->cond, &pool->lock, &ts);
99 // If timed out and still no task, consider exiting.
100 if (ret == ETIMEDOUT && pool->queue.count == 0) {
101 if (pool->num_threads > pool->min_threads) {
102 pool->num_threads--;
103 pthread_mutex_unlock(&pool->lock);
104 pthread_exit(NULL);
105 }
106 }
107 }
108 // If shutdown has been signaled, exit the thread.
109 if (pool->shutdown) {
110 pthread_mutex_unlock(&pool->lock);
111 pthread_exit(NULL);
112 }
113 // Retrieve the next task from the queue.
114 bool has_task = task_queue_pop(&pool->queue, &task);
115 pthread_mutex_unlock(&pool->lock);
116 if (has_task) {
117 // Execute the task.
118 task.function(task.arg);
119 }
120 }
121 return NULL;
122 }
123
124 // Creates a new thread pool with dynamic resizing capabilities.
125 ThreadPool *thread_pool_create(size_t min_threads, size_t max_threads) {
126 ThreadPool *pool = malloc(sizeof(ThreadPool));
127 if (!pool)
128 return NULL;
129 pool->min_threads = min_threads;
130 pool->max_threads = max_threads;
131 pool->num_threads = min_threads;
132 pool->shutdown = false;
133 pthread_mutex_init(&pool->lock, NULL);
134 pthread_cond_init(&pool->cond, NULL);
135 task_queue_init(&pool->queue);
136 pool->threads = malloc(max_threads * sizeof(pthread_t));
137 if (!pool->threads) {
138 free(pool);
139 return NULL;
140 }
141 // Create the initial minimum number of worker threads.
142 for (size_t i = 0; i < min_threads; i++) {
143 pthread_create(&pool->threads[i], NULL, worker_thread, pool);
144 }
145 return pool;
146 }
147
148 // Adds a task to the thread pool. If the task queue length exceeds the current number
149 // of threads and we have not reached max_threads, a new thread is spawned.
150 bool thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *arg) {
151 pthread_mutex_lock(&pool->lock);
152
153 if (pool->shutdown) {
154 pthread_mutex_unlock(&pool->lock);
155 return false;
156 }
157
158 // Pre-allocazione della memoria per evitare allocazioni durante l'esecuzione
159 if (pool->queue.count >= pool->queue.capacity * 0.8) {
160 size_t new_capacity = pool->queue.capacity * 2;
161 CacheAlignedTask *new_tasks = realloc(pool->queue.tasks,
162 new_capacity * sizeof(CacheAlignedTask));
163 if (new_tasks) {
164 pool->queue.tasks = new_tasks;
165 pool->queue.capacity = new_capacity;
166 }
167 }
168
169 Task task = {function, arg};
170 if (!task_queue_push(&pool->queue, task)) {
171 pthread_mutex_unlock(&pool->lock);
172 return false;
173 }
174
175 pthread_cond_signal(&pool->cond);
176 pthread_mutex_unlock(&pool->lock);
177 return true;
178 }
179
180 // Destroys the thread pool by signaling shutdown and joining all threads.
181 void thread_pool_destroy(ThreadPool *pool) {
182 pthread_mutex_lock(&pool->lock);
183 pool->shutdown = true;
184 pthread_cond_broadcast(&pool->cond);
185 pthread_mutex_unlock(&pool->lock);
186 // Join all threads. We use the maximum number as an upper bound.
187 for (size_t i = 0; i < pool->max_threads; i++) {
188 pthread_join(pool->threads[i], NULL);
189 }
190 free(pool->threads);
191 task_queue_destroy(&pool->queue);
192 pthread_mutex_destroy(&pool->lock);
193 pthread_cond_destroy(&pool->cond);
194 free(pool);
195 }
196