OSDN Git Service

Some code refactoring in the thread pool module.
[slunkcrypt/SlunkCrypt.git] / libslunkcrypt / src / thread.c
1 /******************************************************************************/
2 /* SlunkCrypt, by LoRd_MuldeR <MuldeR2@GMX.de>                                */
3 /* This work has been released under the CC0 1.0 Universal license!           */
4 /******************************************************************************/
5
6 /* Internal */
7 #include "thread.h"
8 #include "compiler.h"
9
10 /* CRT */
11 #include <stdlib.h>
12 #include <string.h>
13
14 /* PThread */
15 #if defined(_MSC_VER) && !defined(_DLL)
16 #  define PTW32_STATIC_LIB 1
17 #endif
18 #include <pthread.h>
19
20 /* System info */
21 #ifdef __unix__
22 #  include <sys/sysinfo.h>
23 #endif
24
25 /* States */
/* Worker life-cycle states, stored in thrdpl_thread_t::state */
#define TSTATE_IDLE  0U  /* no task assigned; slunkcrypt_thrdpl_exec() may hand one over */
#define TSTATE_WORK  1U  /* a task has been assigned and is waiting for / being run by the worker */
#define TSTATE_EXIT  2U  /* set by destroy_worker(); the worker thread returns when it sees this */
29
30 // ==========================================================================
31 // Data types
32 // ==========================================================================
33
34 typedef struct
35 {
36         thrdpl_worker_t worker;
37         void *context;
38         size_t length;
39         uint8_t *buffer;
40 }
41 thrdpl_task_t;
42
/*
 * State that is shared between all worker threads of one pool.
 */
typedef struct
{
        size_t thread_count;           /* number of worker threads; set once when the pool is created */
        size_t pending;                /* tasks submitted via exec() that have not finished yet */
        pthread_mutex_t mutex;         /* held while "pending" or any worker's "state" is accessed */
        pthread_cond_t cond_pending;   /* broadcast by a worker when "pending" drops to zero */
} thrdpl_shared_t;
50
51 typedef struct
52 {
53         thrdpl_shared_t *shared;
54         size_t state;
55         pthread_cond_t cond_state;
56         pthread_t thread;
57         thrdpl_task_t task;
58 }
59 thrdpl_thread_t;
60
61 struct thrdpl_data_t
62 {
63         thrdpl_shared_t shared;
64         thrdpl_thread_t thread_data[MAX_THREADS];
65 };
66
67 // ==========================================================================
68 // Utilities
69 // ==========================================================================
70
71 static INLINE size_t bound(const size_t min, const size_t value, const size_t max)
72 {
73         return (value < min) ? min : ((value > max) ? max : value);
74 }
75
/*
 * Evaluates the given pthread call exactly once and terminates the process
 * via abort() if it reports an error, i.e. returns a non-zero value.
 */
#define PTHRD_ABORT_ON_ERROR(CALL) do \
{ \
        if ((CALL) != 0) \
        { \
                abort(); \
        } \
} \
while(0)

/* Checked wrappers around the pthread primitives: any failure is fatal */
#define PTHRD_MUTEX_LOCK(X)     PTHRD_ABORT_ON_ERROR(pthread_mutex_lock((X)))
#define PTHRD_MUTEX_UNLOCK(X)   PTHRD_ABORT_ON_ERROR(pthread_mutex_unlock((X)))
#define PTHRD_COND_SIGNAL(X)    PTHRD_ABORT_ON_ERROR(pthread_cond_signal((X)))
#define PTHRD_COND_BROADCAST(X) PTHRD_ABORT_ON_ERROR(pthread_cond_broadcast((X)))
#define PTHRD_COND_WAIT(X,Y)    PTHRD_ABORT_ON_ERROR(pthread_cond_wait((X), (Y)))
120
/*
 * Leaves the enclosing thread function if this worker has been told to exit.
 * Relies on the locals "data" and "shared" of the enclosing function, and must
 * be expanded while shared->mutex is held: on exit the mutex is released and
 * the function returns NULL.
 */
#define CHECK_IF_CANCELLED() do \
{ \
        if (TSTATE_EXIT == data->state) \
        { \
                PTHRD_MUTEX_UNLOCK(&shared->mutex); \
                return NULL; \
        } \
} \
while(0)
130
131 // ==========================================================================
132 // Thread main
133 // ==========================================================================
134
135 static void *worker_thread_main(void *const arg)
136 {
137         thrdpl_thread_t *const data = (thrdpl_thread_t*) arg;
138         thrdpl_shared_t *const shared = (thrdpl_shared_t*) data->shared;
139         
140         thrdpl_task_t *task;
141
142         for (;;)
143         {
144                 PTHRD_MUTEX_LOCK(&shared->mutex);
145                 CHECK_IF_CANCELLED();
146
147                 while (data->state != TSTATE_WORK)
148                 {
149                         PTHRD_COND_WAIT(&data->cond_state, &shared->mutex);
150                         CHECK_IF_CANCELLED();
151                 }
152
153                 task = &data->task;
154                 PTHRD_MUTEX_UNLOCK(&shared->mutex);
155
156                 task->worker(shared->thread_count, task->context, task->buffer, task->length);
157
158                 PTHRD_MUTEX_LOCK(&shared->mutex);
159                 CHECK_IF_CANCELLED();
160
161                 data->state = TSTATE_IDLE;
162                 if (!(--shared->pending))
163                 {
164                         PTHRD_COND_BROADCAST(&shared->cond_pending);
165                 }
166
167                 PTHRD_MUTEX_UNLOCK(&shared->mutex);
168                 PTHRD_COND_SIGNAL(&data->cond_state);
169         }
170 }
171
172 // ==========================================================================
173 // System info
174 // ==========================================================================
175
/* Pick the platform function that reports the number of processors, if any */
#if defined(__unix__)
#  define NUM_PROCESSORS_FUNC get_nprocs
#elif defined(PTW32_VERSION)
#  define NUM_PROCESSORS_FUNC pthread_num_processors_np
#endif

/*
 * Returns the number of processors reported by the platform, or 1 if no
 * query function is available or the reported value is not positive.
 */
static size_t detect_cpu_count(void)
{
#ifdef NUM_PROCESSORS_FUNC
        const int reported = NUM_PROCESSORS_FUNC();
        return (reported > 0) ? ((size_t) reported) : 1U;
#else
        return 1U;
#endif
}
193
194 // ==========================================================================
195 // Manage threads
196 // ==========================================================================
197
198 static int create_worker(thrdpl_shared_t *const shared, thrdpl_thread_t *const thread_data)
199 {
200         thread_data->state = TSTATE_IDLE;
201         thread_data->shared = shared;
202
203         if (pthread_cond_init(&thread_data->cond_state, NULL) != 0)
204         {
205                 return -1;
206         }
207
208         if (pthread_create(&thread_data->thread, NULL, worker_thread_main, thread_data) != 0)
209         {
210                 pthread_cond_destroy(&thread_data->cond_state);
211                 return -1;
212         }
213
214         return 0;
215 }
216
217 static int destroy_worker(thrdpl_thread_t *const thread_data)
218 {
219         PTHRD_MUTEX_LOCK(&thread_data->shared->mutex);
220         thread_data->state = TSTATE_EXIT;
221         PTHRD_MUTEX_UNLOCK(&thread_data->shared->mutex);
222
223         PTHRD_COND_BROADCAST(&thread_data->cond_state);
224         pthread_join(thread_data->thread, NULL);
225         pthread_cond_destroy(&thread_data->cond_state);
226
227         return 0;
228 }
229
230 // ==========================================================================
231 // Thread pool API
232 // ==========================================================================
233
234 thrdpl_t *slunkcrypt_thrdpl_create(const size_t count)
235 {
236         size_t i, j;
237         thrdpl_t *thrdpl = NULL;
238
239         const size_t cpu_count = bound(1U, (count > 0U) ? count : detect_cpu_count(), MAX_THREADS);
240         if (cpu_count < 2U)
241         {
242                 return NULL;
243         }
244
245         if (!(thrdpl = (thrdpl_t*)malloc(sizeof(thrdpl_t))))
246         {
247                 return NULL;
248         }
249
250         memset(thrdpl, 0, sizeof(thrdpl_t));
251         thrdpl->shared.thread_count = cpu_count;
252
253         if (pthread_mutex_init(&thrdpl->shared.mutex, NULL) != 0)
254         {
255                 goto failure;
256         }
257         
258         if (pthread_cond_init(&thrdpl->shared.cond_pending, NULL) != 0)
259         {
260                 pthread_mutex_destroy(&thrdpl->shared.mutex);
261                 goto failure;
262         }
263
264         for (i = 0U; i < cpu_count; ++i)
265         {
266                 if (create_worker(&thrdpl->shared, &thrdpl->thread_data[i]) != 0)
267                 {
268                         for (j = 0U; j < i; ++j)
269                         {
270                                 destroy_worker(&thrdpl->thread_data[j]);
271                         }
272                         pthread_cond_destroy(&thrdpl->shared.cond_pending);
273                         pthread_mutex_destroy(&thrdpl->shared.mutex);
274                         goto failure;
275                 }
276         }
277
278         return thrdpl;
279
280 failure:
281         free(thrdpl);
282         return NULL;
283 }
284
285 size_t slunkcrypt_thrdpl_count(const thrdpl_t *const thrdpl)
286 {
287         return thrdpl->shared.thread_count;
288 }
289
290 void slunkcrypt_thrdpl_exec(thrdpl_t *const thrdpl, const size_t index, const thrdpl_worker_t worker, void *const context, uint8_t *const buffer, const size_t length)
291 {
292         thrdpl_thread_t *const thread = &thrdpl->thread_data[index];
293
294         PTHRD_MUTEX_LOCK(&thrdpl->shared.mutex);
295
296         while ((thread->state != TSTATE_IDLE) && (thread->state != TSTATE_EXIT))
297         {
298                 PTHRD_COND_WAIT(&thread->cond_state, &thrdpl->shared.mutex);
299         }
300
301         if (thread->state == TSTATE_EXIT)
302         {
303                 abort(); /*this is not supposed to happen!*/
304         }
305
306         thread->state = TSTATE_WORK;
307         thread->task.worker = worker;
308         thread->task.context = context;
309         thread->task.buffer = buffer;
310         thread->task.length = length;
311
312         ++thrdpl->shared.pending;
313
314         PTHRD_MUTEX_UNLOCK(&thrdpl->shared.mutex);
315         PTHRD_COND_SIGNAL(&thread->cond_state);
316 }
317
318 void slunkcrypt_thrdpl_await(thrdpl_t *const thrdpl)
319 {
320         PTHRD_MUTEX_LOCK(&thrdpl->shared.mutex);
321
322         while (thrdpl->shared.pending)
323         {
324                 PTHRD_COND_WAIT(&thrdpl->shared.cond_pending, &thrdpl->shared.mutex);
325         }
326
327         PTHRD_MUTEX_UNLOCK(&thrdpl->shared.mutex);
328 }
329
330 void slunkcrypt_thrdpl_destroy(thrdpl_t *const thrdpl)
331 {
332         size_t i;
333
334         if (thrdpl)
335         {
336                 for (i = 0U; i < thrdpl->shared.thread_count; ++i)
337                 {
338                         destroy_worker(&thrdpl->thread_data[i]);
339                 }
340                 pthread_cond_destroy(&thrdpl->shared.cond_pending);
341                 pthread_mutex_destroy(&thrdpl->shared.mutex);
342                 free(thrdpl);
343         }
344 }