libsheepy
tpool.c
Go to the documentation of this file.
1 /* ********************************
2  * Author: Johan Hanssen Seferidis
3  * License: MIT
4  * Description: Library providing a threading pool where you can add
5  * work. For usage, check the tpool.h file or README.md
6  *
7  *//*
8  *
9  ********************************/
10 
11 #define _POSIX_C_SOURCE 200809L
12 #include <unistd.h>
13 #include <signal.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #if defined(__APPLE__) && defined(__MACH__)
17 #define _DARWIN_C_SOURCE 1
18 #endif
19 #include <pthread.h>
20 #include <time.h>
21 #if defined(__linux__)
22 #include <sys/prctl.h>
23 #endif
24 
25 #include "libsheepyObject.h" // for finalizeLibsheepyRecycleContainers
26 #include "tpool.h"
27 
28 #ifdef tpool_DEBUG
29 #define tpool_DEBUG 1
30 #else
31 #define tpool_DEBUG 0
32 #endif
33 
/* Error reporting macro.
 * tpool_DEBUG is always defined at this point (forced to 0 or 1 just above),
 * so its value has to be tested: defined(tpool_DEBUG) is always true and
 * would make DISABLE_PRINT ineffective. */
#if !defined(DISABLE_PRINT) || tpool_DEBUG
#define err(str) logE(str)
#else
#define err(str)
#endif
39 
40 static volatile int threads_keepalive;
41 static volatile int threads_on_hold;
42 
43 
44 // locks, max 512 threads
45 pthread_mutex_t tpoolLocks[tpoolLockCount]; // only one thread can have the lock
46 
48 
49 /* ========================== STRUCTURES ============================ */
50 
51 
/* Binary semaphore: a 0/1 flag with the mutex and condition variable
 * used to change it and to wait for it */
struct bsem {
  pthread_mutex_t mutex; /* held while v is read or written */
  pthread_cond_t cond;   /* signalled or broadcast when v is set to 1 */
  int v;                 /* semaphore value, 0 or 1 */
};
typedef struct bsem bsem;
58 
59 
/* Job: one queued unit of work, node of the job queue linked list */
struct job {
  struct job *prev;            /* job queued right after this one, NULL for the rear job */
  void (*function)(void *arg); /* function executed by a worker thread */
  void *arg;                   /* argument handed to function */
};
typedef struct job job;
66 
67 
68 /* Job queue */
69 typedef struct jobqueue{
70  pthread_mutex_t rwmutex; /* used for queue r/w access */
71  job *front; /* pointer to front of queue */
72  job *rear; /* pointer to rear of queue */
73  bsem *has_jobs; /* flag as binary semaphore */
74  int len; /* number of jobs in queue */
75 } jobqueue;
76 
77 
78 /* Thread */
79 typedef struct thread{
80  int id; /* friendly id */
81  pthread_t pthread; /* pointer to actual thread */
82  threadpool tpool_p; /* access to tpool */
83 } thread;
84 
85 
86 /* Threadpool */
87 typedef struct tpool_{
88  thread** threads; /* pointer to threads */
89  volatile int num_threads_alive; /* threads currently alive */
90  volatile int num_threads_working; /* threads currently working */
91  pthread_mutex_t thcount_lock; /* used for thread count etc */
92  pthread_cond_t threads_all_idle; /* signal to tpool_wait */
93  jobqueue jobqueue; /* job queue */
94 } tpool_;
95 
96 
97 
98 
99 
100 /* ========================== PROTOTYPES ============================ */
101 
102 
103 static int thread_init(threadpool tpool_p, struct thread** thread_p, int id);
104 static void* thread_do(struct thread* thread_p);
105 static void thread_hold(int sig_id);
106 static void thread_destroy(struct thread* thread_p);
107 
108 static int jobqueue_init(jobqueue* jobqueue_p);
109 static void jobqueue_clear(jobqueue* jobqueue_p);
110 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
111 static struct job* jobqueue_pull(jobqueue* jobqueue_p);
112 static void jobqueue_destroy(jobqueue* jobqueue_p);
113 
114 static void bsem_init(struct bsem *bsem_p, int value);
115 static void bsem_reset(struct bsem *bsem_p);
116 static void bsem_post(struct bsem *bsem_p);
117 static void bsem_post_all(struct bsem *bsem_p);
118 static void bsem_wait(struct bsem *bsem_p);
119 
120 
121 
122 
123 
124 /* ========================== THREADPOOL ============================ */
125 
126 // use lock in the tpoolLocks array
127 void tpoolLock(int position) {
128  pthread_mutex_lock(&tpoolLocks[position]);
129 }
130 
131 // unlock a lock in the tpoolLocks array
132 void tpoolUlock(int position) {
133  pthread_mutex_unlock(&tpoolLocks[position]);
134 }
135 
136 /* Initialise thread pool */
137 threadpool tpool_init(int num_threads){
138 
139  threads_on_hold = 0;
140  threads_keepalive = 1;
141 
142  if (num_threads < 0){
143  num_threads = 0;
144  }
145 
146  /* Make new thread pool */
147  threadpool tpool_p;
148  tpool_p = (threadpool)malloc(sizeof(struct tpool_));
149  if (tpool_p == NULL){
150  err("tpool_init(): Could not allocate memory for thread pool\n");
151  return NULL;
152  }
153  tpool_p->num_threads_alive = 0;
154  tpool_p->num_threads_working = 0;
155 
156  /* Initialise the job queue */
157  if (jobqueue_init(&tpool_p->jobqueue) == -1){
158  err("tpool_init(): Could not allocate memory for job queue\n");
159  free(tpool_p);
160  return NULL;
161  }
162 
163  /* Make threads in pool */
164  tpool_p->threads = (struct thread**)malloc((size_t)num_threads * sizeof(struct thread *));
165  if (tpool_p->threads == NULL){
166  err("tpool_init(): Could not allocate memory for threads\n");
167  jobqueue_destroy(&tpool_p->jobqueue);
168  free(tpool_p);
169  return NULL;
170  }
171 
172  pthread_mutex_init(&(tpool_p->thcount_lock), NULL);
173  pthread_cond_init(&tpool_p->threads_all_idle, NULL);
174 
175  /* Thread init */
176  int n;
177  for (n=0; n<num_threads; n++){
178  thread_init(tpool_p, &tpool_p->threads[n], n);
179 #if tpool_DEBUG
180  printf("tpool_DEBUG: Created thread %d in pool \n", n);
181 #endif
182  }
183 
184  /* Wait for threads to initialize */
185  while (tpool_p->num_threads_alive != num_threads) {}
186 
187  return tpool_p;
188 }
189 
190 
191 /* Add work to the thread pool */
192 int tpool_add_work(threadpool tpool_p, void (*function_p)(void*), void* arg_p){
193  job* newjob;
194 
195  if (!tpool_p){
196  err("tpool_add_work(): tpool_p is NULL\n");
197  return -1;
198  }
199  if (!function_p){
200  err("tpool_add_work(): function_p is NULL\n");
201  return -1;
202  }
203  newjob=(struct job*)malloc(sizeof(struct job));
204  if (newjob==NULL){
205  err("tpool_add_work(): Could not allocate memory for new job\n");
206  return -1;
207  }
208 
209  /* add function and argument */
210  newjob->function=function_p;
211  newjob->arg=arg_p;
212 
213  /* add job to queue */
214  jobqueue_push(&tpool_p->jobqueue, newjob);
215 
216  return 0;
217 }
218 
219 
220 /* Wait until all jobs have finished */
221 void tpool_wait(threadpool tpool_p){
222  if (!tpool_p) return;
223  pthread_mutex_lock(&tpool_p->thcount_lock);
224  while (tpool_p->jobqueue.len || tpool_p->num_threads_working) {
225  pthread_cond_wait(&tpool_p->threads_all_idle, &tpool_p->thcount_lock);
226  }
227  pthread_mutex_unlock(&tpool_p->thcount_lock);
228 }
229 
230 
231 /* Destroy the threadpool */
232 void tpool_destroy(threadpool tpool_p){
233  /* No need to destory if it's NULL */
234  if (!tpool_p) return;
235 
236  volatile int threads_total = tpool_p->num_threads_alive;
237 
238  /* End each thread 's infinite loop */
239  threads_keepalive = 0;
240 
241  /* Give one second to kill idle threads */
242  double TIMEOUT = 1.0;
243  time_t start, end;
244  double tpassed = 0.0;
245  time (&start);
246  while (tpassed < TIMEOUT && tpool_p->num_threads_alive){
247  bsem_post_all(tpool_p->jobqueue.has_jobs);
248  time (&end);
249  tpassed = difftime(end,start);
250  }
251 
252  /* Poll remaining threads */
253  while (tpool_p->num_threads_alive){
254  bsem_post_all(tpool_p->jobqueue.has_jobs);
255  sleep(1);
256  }
257 
258  /* Job queue cleanup */
259  jobqueue_destroy(&tpool_p->jobqueue);
260  int n;
261  /* wait until all threads are finished because when recycleContainers is enabled
262  * all threads must finish before running dArrayFree in finalizeLibsheepyAtExit
263  * in the main process */
264  for (n=0; n < threads_total; n++){
265  /* ignore error code from pthread_join because the threads might already be finished at this time */
266  int result_code;
267  result_code = pthread_join(tpool_p->threads[n]->pthread, NULL);
268  if (result_code == EINVAL) {
269  logE("not a joinable thread or another thread is already waiting to join with this thread.");
270  }
271  }
272  /* Deallocs */
273  for (n=0; n < threads_total; n++){
274  thread_destroy(tpool_p->threads[n]);
275  }
276  free(tpool_p->threads);
277  free(tpool_p);
278 }
279 
280 
281 /* Pause all threads in threadpool */
282 void tpool_pause(threadpool tpool_p) {
283  if (!tpool_p) return;
284  int n;
285  for (n=0; n < tpool_p->num_threads_alive; n++){
286  pthread_kill(tpool_p->threads[n]->pthread, SIGUSR2);
287  }
288 }
289 
290 
291 /* Resume all threads in threadpool */
292 void tpool_resume(threadpool tpool_p) {
293  if (!tpool_p) return;
294 
295  // resuming a single threadpool hasn't been
296  // implemented yet, meanwhile this supresses
297  // the warnings
298  (void)tpool_p;
299 
300  threads_on_hold = 0;
301 }
302 
303 
305  if (!tpool_p) return -1;
306  return tpool_p->num_threads_working;
307 }
308 
309 
310 
311 
312 
313 /* ============================ THREAD ============================== */
314 
315 
316 /* Initialize a thread in the thread pool
317  *
318  * @param thread address to the pointer of the thread to be created
319  * @param id id to be given to the thread
320  * @return 0 on success, -1 otherwise.
321  */
322 static int thread_init (threadpool tpool_p, struct thread** thread_p, int id){
323 
324  *thread_p = (struct thread*)malloc(sizeof(struct thread));
325  if (thread_p == NULL){
326  err("thread_init(): Could not allocate memory for thread\n");
327  return -1;
328  }
329 
330  (*thread_p)->tpool_p = tpool_p;
331  (*thread_p)->id = id;
332 
333  pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p));
334  // keep thread attached to be able to join them in tpool_destroy
335  // all threads must finish before running dArrayFree in finalizeLibsheepyAtExit
336  /* pthread_detach((*thread_p)->pthread); */
337  return 0;
338 }
339 
340 
341 /* Sets the calling thread on hold */
342 static void thread_hold(int sig_id) {
343  (void)sig_id;
344  threads_on_hold = 1;
345  while (threads_on_hold){
346  sleep(1);
347  }
348 }
349 
350 
351 /* What each thread is doing
352 *
353 * In principle this is an endless loop. The only time this loop gets interuppted is once
354 * tpool_destroy() is invoked or the program exits.
355 *
356 * @param thread thread that will run this function
357 * @return nothing
358 */
359 static void* thread_do(struct thread* thread_p){
360 
361  /* Set thread name for profiling and debuging */
362  char thread_name[128] = {0};
363  sprintf(thread_name, "thread-pool-%d", thread_p->id);
364 
365 #if defined(__linux__)
366  /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
367  prctl(PR_SET_NAME, thread_name);
368 #elif defined(__APPLE__) && defined(__MACH__)
369  pthread_setname_np(thread_name);
370 #else
371  #warning "thread_do(): pthread_setname_np is not supported on this system"
372 #endif
373 
374  /* Assure all threads have been created before starting serving */
375  threadpool tpool_p = thread_p->tpool_p;
376 
377  /* Register signal handler */
378  struct sigaction act;
379  sigemptyset(&act.sa_mask);
380  act.sa_flags = 0;
381  act.sa_handler = thread_hold;
382  if (sigaction(SIGUSR2, &act, NULL) == -1) {
383  err("thread_do(): cannot handle SIGUSR2");
384  }
385 
386  #if recycleContainers
387  /* the containers are recycled in the thread local storage, the unused containers are freed at thread exit */
388  pthread_key_t key;
389  if (pthread_key_create(&key, finalizeLibsheepyRecycleContainers)) {
390  err("pthread_key_create, finalizeLibsheepyRecycleContainers not registered, there will be memory leaks when the thread exits");
391  }
392  else {
393  // set value to call the destructor at exit
394  pthread_setspecific(key, (const void *) 1);
395  }
396  #endif // recycleContainers
397 
398  /* Mark thread as alive (initialized) */
399  pthread_mutex_lock(&tpool_p->thcount_lock);
400  tpool_p->num_threads_alive += 1;
401  pthread_mutex_unlock(&tpool_p->thcount_lock);
402 
403  while(threads_keepalive){
404 
405  bsem_wait(tpool_p->jobqueue.has_jobs);
406 
407  if (threads_keepalive){
408 
409  pthread_mutex_lock(&tpool_p->thcount_lock);
410  tpool_p->num_threads_working++;
411  pthread_mutex_unlock(&tpool_p->thcount_lock);
412 
413  /* Read job from queue and execute it */
414  void (*func_buff)(void*);
415  void* arg_buff;
416  job* job_p = jobqueue_pull(&tpool_p->jobqueue);
417  if (job_p) {
418  func_buff = job_p->function;
419  arg_buff = job_p->arg;
420  func_buff(arg_buff);
421  free(job_p);
422  }
423 
424  pthread_mutex_lock(&tpool_p->thcount_lock);
425  tpool_p->num_threads_working--;
426  if (!tpool_p->num_threads_working) {
427  pthread_cond_signal(&tpool_p->threads_all_idle);
428  }
429  pthread_mutex_unlock(&tpool_p->thcount_lock);
430 
431  }
432  }
433  pthread_mutex_lock(&tpool_p->thcount_lock);
434  tpool_p->num_threads_alive --;
435  pthread_mutex_unlock(&tpool_p->thcount_lock);
436 
437  return NULL;
438 }
439 
440 
441 /* Frees a thread */
442 static void thread_destroy (thread* thread_p){
443  free(thread_p);
444 }
445 
446 
447 
448 
449 
450 /* ============================ JOB QUEUE =========================== */
451 
452 
453 /* Initialize queue */
454 static int jobqueue_init(jobqueue* jobqueue_p){
455  jobqueue_p->len = 0;
456  jobqueue_p->front = NULL;
457  jobqueue_p->rear = NULL;
458 
459  jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
460  if (jobqueue_p->has_jobs == NULL){
461  return -1;
462  }
463 
464  pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
465  bsem_init(jobqueue_p->has_jobs, 0);
466 
467  return 0;
468 }
469 
470 
471 /* Clear the queue */
472 static void jobqueue_clear(jobqueue* jobqueue_p){
473 
474  while(jobqueue_p->len){
475  free(jobqueue_pull(jobqueue_p));
476  }
477 
478  jobqueue_p->front = NULL;
479  jobqueue_p->rear = NULL;
480  bsem_reset(jobqueue_p->has_jobs);
481  jobqueue_p->len = 0;
482 
483 }
484 
485 
486 /* Add (allocated) job to queue
487  */
488 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
489 
490  pthread_mutex_lock(&jobqueue_p->rwmutex);
491  newjob->prev = NULL;
492 
493  switch(jobqueue_p->len){
494 
495  case 0: /* if no jobs in queue */
496  jobqueue_p->front = newjob;
497  jobqueue_p->rear = newjob;
498  break;
499 
500  default: /* if jobs in queue */
501  jobqueue_p->rear->prev = newjob;
502  jobqueue_p->rear = newjob;
503 
504  }
505  jobqueue_p->len++;
506 
507  bsem_post(jobqueue_p->has_jobs);
508  pthread_mutex_unlock(&jobqueue_p->rwmutex);
509 }
510 
511 
512 /* Get first job from queue(removes it from queue)
513  *
514  * Notice: Caller MUST hold a mutex
515  */
516 static struct job* jobqueue_pull(jobqueue* jobqueue_p){
517 
518  pthread_mutex_lock(&jobqueue_p->rwmutex);
519  job* job_p = jobqueue_p->front;
520 
521  switch(jobqueue_p->len){
522 
523  case 0: /* if no jobs in queue */
524  break;
525 
526  case 1: /* if one job in queue */
527  jobqueue_p->front = NULL;
528  jobqueue_p->rear = NULL;
529  jobqueue_p->len = 0;
530  break;
531 
532  default: /* if >1 jobs in queue */
533  jobqueue_p->front = job_p->prev;
534  jobqueue_p->len--;
535  /* more than one job in queue -> post it */
536  bsem_post(jobqueue_p->has_jobs);
537 
538  }
539 
540  pthread_mutex_unlock(&jobqueue_p->rwmutex);
541  return job_p;
542 }
543 
544 
545 /* Free all queue resources back to the system */
546 static void jobqueue_destroy(jobqueue* jobqueue_p){
547  jobqueue_clear(jobqueue_p);
548  free(jobqueue_p->has_jobs);
549 }
550 
551 
552 
553 
554 
555 /* ======================== SYNCHRONISATION ========================= */
556 
557 
558 /* Init semaphore to 1 or 0 */
559 static void bsem_init(bsem *bsem_p, int value) {
560  if (value < 0 || value > 1) {
561  err("bsem_init(): Binary semaphore can take only values 1 or 0");
562  exit(1);
563  }
564  pthread_mutex_init(&(bsem_p->mutex), NULL);
565  pthread_cond_init(&(bsem_p->cond), NULL);
566  bsem_p->v = value;
567 }
568 
569 
570 /* Reset semaphore to 0 */
571 static void bsem_reset(bsem *bsem_p) {
572  bsem_init(bsem_p, 0);
573 }
574 
575 
576 /* Post to at least one thread */
577 static void bsem_post(bsem *bsem_p) {
578  pthread_mutex_lock(&bsem_p->mutex);
579  bsem_p->v = 1;
580  pthread_cond_signal(&bsem_p->cond);
581  pthread_mutex_unlock(&bsem_p->mutex);
582 }
583 
584 
585 /* Post to all threads */
586 static void bsem_post_all(bsem *bsem_p) {
587  pthread_mutex_lock(&bsem_p->mutex);
588  bsem_p->v = 1;
589  pthread_cond_broadcast(&bsem_p->cond);
590  pthread_mutex_unlock(&bsem_p->mutex);
591 }
592 
593 
594 /* Wait on semaphore until semaphore has value 0 */
595 static void bsem_wait(bsem* bsem_p) {
596  pthread_mutex_lock(&bsem_p->mutex);
597  while (bsem_p->v != 1) {
598  pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
599  }
600  bsem_p->v = 0;
601  pthread_mutex_unlock(&bsem_p->mutex);
602 }
struct job * prev
Definition: tpool.c:62
void * arg
Definition: tpool.c:64
void tpool_resume(threadpool tpool_p)
Definition: tpool.c:292
This file is the API for the classes in libsheepy.
threadpool tpool
Definition: tpool.c:47
free(s)
volatile int num_threads_working
Definition: tpool.c:90
int tpool_num_threads_working(threadpool tpool_p)
Definition: tpool.c:304
int len
Definition: tpool.c:74
struct thread thread
threadpool tpool_p
Definition: tpool.c:82
bsem * has_jobs
Definition: tpool.c:73
#define tpoolLockCount
Definition: tpool.h:19
void tpoolLock(int position)
Definition: tpool.c:127
struct tpool_ tpool_
Definition: tpool.c:61
#define err(str)
Definition: tpool.c:35
Definition: tpool.c:79
void finalizeLibsheepyRecycleContainers(void *arg UNUSED)
function called automatically (registered in tpool) when a thread exits This function calls all the r...
job * front
Definition: tpool.c:71
#define logE(...)
Definition: libsheepy.h:1060
void tpoolUlock(int position)
Definition: tpool.c:132
pthread_mutex_t thcount_lock
Definition: tpool.c:91
pthread_t pthread
Definition: tpool.c:81
void tpool_destroy(threadpool tpool_p)
Definition: tpool.c:232
pthread_cond_t threads_all_idle
Definition: tpool.c:92
struct bsem bsem
jobqueue jobqueue
Definition: tpool.c:93
volatile int num_threads_alive
Definition: tpool.c:89
pthread_mutex_t rwmutex
Definition: tpool.c:70
thread ** threads
Definition: tpool.c:88
pthread_mutex_t tpoolLocks[tpoolLockCount]
Definition: tpool.c:45
pthread_cond_t cond
Definition: tpool.c:55
Definition: tpool.c:87
void(* function)(void *arg)
Definition: tpool.c:63
in value value
job * rear
Definition: tpool.c:72
int v
Definition: tpool.c:56
int id
Definition: tpool.c:80
Definition: tpool.c:53
struct jobqueue jobqueue
struct job job
Definition: tpool.c:69
void tpool_wait(threadpool tpool_p)
Definition: tpool.c:221
void tpool_pause(threadpool tpool_p)
Definition: tpool.c:282
int tpool_add_work(threadpool tpool_p, void(*function_p)(void *), void *arg_p)
Definition: tpool.c:192
threadpool tpool_init(int num_threads)
Definition: tpool.c:137
pthread_mutex_t mutex
Definition: tpool.c:54
struct tpool_ * threadpool
Definition: tpool.h:54