| Line | Branch | Exec | Source |
|---|---|---|---|
| 1 | #include "thread_pool.h" | ||
| 2 | #include <pthread.h> | ||
| 3 | #include <stdlib.h> | ||
| 4 | #include <stdio.h> | ||
| 5 | #include <time.h> | ||
| 6 | #include <errno.h> | ||
| 7 | |||
| 8 | #define TASK_QUEUE_INITIAL_CAPACITY 256 | ||
| 9 | #define THREAD_IDLE_TIMEOUT 5 // Idle timeout in seconds | ||
| 10 | |||
| 11 | typedef struct { | ||
| 12 | Task task; | ||
| 13 | char padding[64 - sizeof(Task)]; | ||
| 14 | } CacheAlignedTask; | ||
| 15 | |||
| 16 | typedef struct { | ||
| 17 | CacheAlignedTask *tasks; | ||
| 18 | size_t capacity; | ||
| 19 | volatile size_t count; | ||
| 20 | volatile size_t front; | ||
| 21 | volatile size_t rear; | ||
| 22 | char padding[64]; | ||
| 23 | } TaskQueue; | ||
| 24 | |||
| 25 | // The ThreadPool structure. | ||
| 26 | struct ThreadPool { | ||
| 27 | pthread_mutex_t lock; // Mutex to protect the pool and queue. | ||
| 28 | pthread_cond_t cond; // Condition variable to signal new tasks. | ||
| 29 | TaskQueue queue; // Task queue. | ||
| 30 | pthread_t *threads; // Array of worker thread IDs. | ||
| 31 | size_t num_threads; // Current number of worker threads. | ||
| 32 | size_t min_threads; // Minimum number of worker threads. | ||
| 33 | size_t max_threads; // Maximum number of worker threads. | ||
| 34 | bool shutdown; // Flag to signal shutdown. | ||
| 35 | }; | ||
| 36 | |||
| 37 | // Initializes the task queue. | ||
| 38 | ✗ | static void task_queue_init(TaskQueue *queue) { | |
| 39 | ✗ | queue->capacity = TASK_QUEUE_INITIAL_CAPACITY; | |
| 40 | ✗ | queue->tasks = malloc(queue->capacity * sizeof(CacheAlignedTask)); | |
| 41 | ✗ | queue->count = 0; | |
| 42 | ✗ | queue->front = 0; | |
| 43 | ✗ | queue->rear = 0; | |
| 44 | ✗ | } | |
| 45 | |||
| 46 | // Destroys the task queue. | ||
| 47 | ✗ | static void task_queue_destroy(TaskQueue *queue) { | |
| 48 | ✗ | free(queue->tasks); | |
| 49 | ✗ | } | |
| 50 | |||
| 51 | // Pushes a task onto the task queue; returns true on success. | ||
| 52 | ✗ | static bool task_queue_push(TaskQueue *queue, Task task) { | |
| 53 | ✗ | if (queue->count == queue->capacity) { | |
| 54 | ✗ | size_t new_capacity = queue->capacity * 2; | |
| 55 | ✗ | CacheAlignedTask *new_tasks = malloc(new_capacity * sizeof(CacheAlignedTask)); | |
| 56 | ✗ | if (!new_tasks) return false; | |
| 57 | |||
| 58 | ✗ | if (queue->front > 0) { | |
| 59 | ✗ | for (size_t i = 0; i < queue->count; i++) { | |
| 60 | ✗ | new_tasks[i].task = queue->tasks[(queue->front + i) % queue->capacity].task; | |
| 61 | } | ||
| 62 | ✗ | queue->front = 0; | |
| 63 | ✗ | queue->rear = queue->count; | |
| 64 | } | ||
| 65 | ✗ | free(queue->tasks); | |
| 66 | ✗ | queue->tasks = new_tasks; | |
| 67 | ✗ | queue->capacity = new_capacity; | |
| 68 | } | ||
| 69 | |||
| 70 | ✗ | queue->tasks[queue->rear].task = task; | |
| 71 | ✗ | queue->rear = (queue->rear + 1) % queue->capacity; | |
| 72 | ✗ | queue->count++; | |
| 73 | ✗ | return true; | |
| 74 | } | ||
| 75 | |||
| 76 | // Pops a task from the task queue; returns true if a task was retrieved. | ||
| 77 | ✗ | static bool task_queue_pop(TaskQueue *queue, Task *task) { | |
| 78 | ✗ | if (queue->count == 0) return false; | |
| 79 | |||
| 80 | ✗ | *task = queue->tasks[queue->front].task; | |
| 81 | ✗ | queue->front = (queue->front + 1) % queue->capacity; | |
| 82 | ✗ | queue->count--; | |
| 83 | ✗ | return true; | |
| 84 | } | ||
| 85 | |||
| 86 | // Worker thread function. Each thread waits for tasks and executes them. | ||
| 87 | ✗ | static void *worker_thread(void *arg) { | |
| 88 | ✗ | ThreadPool *pool = (ThreadPool *)arg; | |
| 89 | Task task; | ||
| 90 | ✗ | while (1) { | |
| 91 | ✗ | pthread_mutex_lock(&pool->lock); | |
| 92 | // Wait for a task if the queue is empty. | ||
| 93 | ✗ | while (pool->queue.count == 0 && !pool->shutdown) { | |
| 94 | // Wait with a timeout to allow dynamic thread reduction. | ||
| 95 | struct timespec ts; | ||
| 96 | ✗ | clock_gettime(CLOCK_REALTIME, &ts); | |
| 97 | ✗ | ts.tv_sec += THREAD_IDLE_TIMEOUT; | |
| 98 | ✗ | int ret = pthread_cond_timedwait(&pool->cond, &pool->lock, &ts); | |
| 99 | // If timed out and still no task, consider exiting. | ||
| 100 | ✗ | if (ret == ETIMEDOUT && pool->queue.count == 0) { | |
| 101 | ✗ | if (pool->num_threads > pool->min_threads) { | |
| 102 | ✗ | pool->num_threads--; | |
| 103 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 104 | ✗ | pthread_exit(NULL); | |
| 105 | } | ||
| 106 | } | ||
| 107 | } | ||
| 108 | // If shutdown has been signaled, exit the thread. | ||
| 109 | ✗ | if (pool->shutdown) { | |
| 110 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 111 | ✗ | pthread_exit(NULL); | |
| 112 | } | ||
| 113 | // Retrieve the next task from the queue. | ||
| 114 | ✗ | bool has_task = task_queue_pop(&pool->queue, &task); | |
| 115 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 116 | ✗ | if (has_task) { | |
| 117 | // Execute the task. | ||
| 118 | ✗ | task.function(task.arg); | |
| 119 | } | ||
| 120 | } | ||
| 121 | return NULL; | ||
| 122 | } | ||
| 123 | |||
| 124 | // Creates a new thread pool with dynamic resizing capabilities. | ||
| 125 | ✗ | ThreadPool *thread_pool_create(size_t min_threads, size_t max_threads) { | |
| 126 | ✗ | ThreadPool *pool = malloc(sizeof(ThreadPool)); | |
| 127 | ✗ | if (!pool) | |
| 128 | ✗ | return NULL; | |
| 129 | ✗ | pool->min_threads = min_threads; | |
| 130 | ✗ | pool->max_threads = max_threads; | |
| 131 | ✗ | pool->num_threads = min_threads; | |
| 132 | ✗ | pool->shutdown = false; | |
| 133 | ✗ | pthread_mutex_init(&pool->lock, NULL); | |
| 134 | ✗ | pthread_cond_init(&pool->cond, NULL); | |
| 135 | ✗ | task_queue_init(&pool->queue); | |
| 136 | ✗ | pool->threads = malloc(max_threads * sizeof(pthread_t)); | |
| 137 | ✗ | if (!pool->threads) { | |
| 138 | ✗ | free(pool); | |
| 139 | ✗ | return NULL; | |
| 140 | } | ||
| 141 | // Create the initial minimum number of worker threads. | ||
| 142 | ✗ | for (size_t i = 0; i < min_threads; i++) { | |
| 143 | ✗ | pthread_create(&pool->threads[i], NULL, worker_thread, pool); | |
| 144 | } | ||
| 145 | ✗ | return pool; | |
| 146 | } | ||
| 147 | |||
| 148 | // Adds a task to the thread pool. If the task queue length exceeds the current number | ||
| 149 | // of threads and we have not reached max_threads, a new thread is spawned. | ||
| 150 | ✗ | bool thread_pool_add_task(ThreadPool *pool, void (*function)(void *), void *arg) { | |
| 151 | ✗ | pthread_mutex_lock(&pool->lock); | |
| 152 | |||
| 153 | ✗ | if (pool->shutdown) { | |
| 154 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 155 | ✗ | return false; | |
| 156 | } | ||
| 157 | |||
| 158 | // Pre-allocazione della memoria per evitare allocazioni durante l'esecuzione | ||
| 159 | ✗ | if (pool->queue.count >= pool->queue.capacity * 0.8) { | |
| 160 | ✗ | size_t new_capacity = pool->queue.capacity * 2; | |
| 161 | ✗ | CacheAlignedTask *new_tasks = realloc(pool->queue.tasks, | |
| 162 | new_capacity * sizeof(CacheAlignedTask)); | ||
| 163 | ✗ | if (new_tasks) { | |
| 164 | ✗ | pool->queue.tasks = new_tasks; | |
| 165 | ✗ | pool->queue.capacity = new_capacity; | |
| 166 | } | ||
| 167 | } | ||
| 168 | |||
| 169 | ✗ | Task task = {function, arg}; | |
| 170 | ✗ | if (!task_queue_push(&pool->queue, task)) { | |
| 171 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 172 | ✗ | return false; | |
| 173 | } | ||
| 174 | |||
| 175 | ✗ | pthread_cond_signal(&pool->cond); | |
| 176 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 177 | ✗ | return true; | |
| 178 | } | ||
| 179 | |||
| 180 | // Destroys the thread pool by signaling shutdown and joining all threads. | ||
| 181 | ✗ | void thread_pool_destroy(ThreadPool *pool) { | |
| 182 | ✗ | pthread_mutex_lock(&pool->lock); | |
| 183 | ✗ | pool->shutdown = true; | |
| 184 | ✗ | pthread_cond_broadcast(&pool->cond); | |
| 185 | ✗ | pthread_mutex_unlock(&pool->lock); | |
| 186 | // Join all threads. We use the maximum number as an upper bound. | ||
| 187 | ✗ | for (size_t i = 0; i < pool->max_threads; i++) { | |
| 188 | ✗ | pthread_join(pool->threads[i], NULL); | |
| 189 | } | ||
| 190 | ✗ | free(pool->threads); | |
| 191 | ✗ | task_queue_destroy(&pool->queue); | |
| 192 | ✗ | pthread_mutex_destroy(&pool->lock); | |
| 193 | ✗ | pthread_cond_destroy(&pool->cond); | |
| 194 | ✗ | free(pool); | |
| 195 | ✗ | } | |
| 196 |