libsheepy

C library for handling text files, strings, and JSON-like data structures with an object-oriented system
git clone https://spartatek.se/git/libsheepy.git
Log | Files | Refs | README | LICENSE

tpool.c (15530B)


      1 /* ********************************
      2  * Author:       Johan Hanssen Seferidis
      3  * License:      MIT
      4  * Description:  Library providing a threading pool where you can add
      5  *               work. For usage, check the tpool.h file or README.md
      6  *
      7  *//** @file tpool.h *//*
      8  *
      9  ********************************/
     10 
     11 #define _POSIX_C_SOURCE 200809L
     12 #include <unistd.h>
     13 #include <signal.h>
     14 #include <stdio.h>
     15 #include <stdlib.h>
     16 #if defined(__APPLE__) && defined(__MACH__)
     17 #define _DARWIN_C_SOURCE 1
     18 #endif
     19 #include <pthread.h>
     20 #include <time.h>
     21 #if defined(__linux__)
     22 #include <sys/prctl.h>
     23 #endif
     24 
     25 #include "libsheepyObject.h" // for finalizeLibsheepyRecycleContainers
     26 #include "tpool.h"
     27 
     28 #ifdef tpool_DEBUG
     29 #define tpool_DEBUG 1
     30 #else
     31 #define tpool_DEBUG 0
     32 #endif
     33 
     34 #if !defined(DISABLE_PRINT) || defined(tpool_DEBUG)
     35 #define err(str) logE(str)
     36 #else
     37 #define err(str)
     38 #endif
     39 
/* Shutdown/pause flags shared by all worker threads: written by the main
 * thread (tpool_destroy, thread_hold, tpool_resume), polled in thread loops.
 * NOTE(review): volatile is not a substitute for C11 atomics here; this
 * assumes aligned int loads/stores are atomic on the target platform. */
static volatile int threads_keepalive;
static volatile int threads_on_hold;


// locks, max 512 threads
/* General-purpose mutex array used via tpoolLock/tpoolUlock; size is
 * tpoolLockCount (declared in tpool.h).
 * NOTE(review): these mutexes are never pthread_mutex_init'ed — static
 * zero-initialization works on common platforms but is not guaranteed
 * by POSIX; confirm before porting. */
pthread_mutex_t tpoolLocks[tpoolLockCount]; // only one thread can have the lock

/* Global default thread pool handle for library clients. */
threadpool tpool;
     48 
     49 /* ========================== STRUCTURES ============================ */
     50 
     51 
/* Binary semaphore: a 0/1 flag guarded by a mutex + condition variable. */
typedef struct bsem {
  pthread_mutex_t mutex;  /* guards v and the condition variable       */
  pthread_cond_t   cond;  /* signalled/broadcast whenever v becomes 1  */
  int v;                  /* current value: 0 (cleared) or 1 (posted)  */
} bsem;
     58 
     59 
/* Job: one queued unit of work. Jobs form a singly linked list where
 * each element points from the rear toward the front via prev. */
typedef struct job{
  struct job*  prev;                   /* pointer to previous job   */
  void   (*function)(void* arg);       /* function pointer          */
  void*  arg;                          /* function's argument       */
} job;
     66 
     67 
/* Job queue: FIFO of pending jobs. Workers block on has_jobs until a
 * job is pushed (or the pool is shut down). */
typedef struct jobqueue{
  pthread_mutex_t rwmutex;             /* used for queue r/w access */
  job  *front;                         /* pointer to front of queue */
  job  *rear;                          /* pointer to rear  of queue */
  bsem *has_jobs;                      /* flag as binary semaphore  */
  int   len;                           /* number of jobs in queue   */
} jobqueue;
     76 
     77 
/* Thread: per-worker bookkeeping. */
typedef struct thread{
  int       id;                        /* friendly id               */
  pthread_t pthread;                   /* handle of the OS thread   */
  threadpool tpool_p;            /* back-pointer to owning pool */
} thread;
     84 
     85 
/* Threadpool: the opaque struct behind the public `threadpool` handle. */
typedef struct tpool_{
  thread**   threads;                  /* pointer to threads        */
  volatile int num_threads_alive;      /* threads currently alive   */
  volatile int num_threads_working;    /* threads currently working */
  pthread_mutex_t  thcount_lock;       /* used for thread count etc */
  pthread_cond_t  threads_all_idle;    /* signal to tpool_wait      */
  jobqueue  jobqueue;                  /* job queue                 */
} tpool_;
     95 
     96 
     97 
     98 
     99 
    100 /* ========================== PROTOTYPES ============================ */
    101 
    102 
    103 static int  thread_init(threadpool tpool_p, struct thread** thread_p, int id);
    104 static void* thread_do(struct thread* thread_p);
    105 static void  thread_hold(int sig_id);
    106 static void  thread_destroy(struct thread* thread_p);
    107 
    108 static int   jobqueue_init(jobqueue* jobqueue_p);
    109 static void  jobqueue_clear(jobqueue* jobqueue_p);
    110 static void  jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
    111 static struct job* jobqueue_pull(jobqueue* jobqueue_p);
    112 static void  jobqueue_destroy(jobqueue* jobqueue_p);
    113 
    114 static void  bsem_init(struct bsem *bsem_p, int value);
    115 static void  bsem_reset(struct bsem *bsem_p);
    116 static void  bsem_post(struct bsem *bsem_p);
    117 static void  bsem_post_all(struct bsem *bsem_p);
    118 static void  bsem_wait(struct bsem *bsem_p);
    119 
    120 
    121 
    122 
    123 
    124 /* ========================== THREADPOOL ============================ */
    125 
// use lock in the tpoolLocks array
/* Acquire mutex `position` of the global tpoolLocks array (blocking).
 * NOTE(review): no bounds check — caller must guarantee
 * 0 <= position < tpoolLockCount. */
void tpoolLock(int position) {
  pthread_mutex_lock(&tpoolLocks[position]);
}
    130 
// unlock a lock in the tpoolLocks array
/* Release mutex `position` of the global tpoolLocks array.
 * Must be called by the thread that acquired it via tpoolLock. */
void tpoolUlock(int position) {
  pthread_mutex_unlock(&tpoolLocks[position]);
}
    135 
    136 /* Initialise thread pool */
    137 threadpool tpool_init(int num_threads){
    138 
    139   threads_on_hold   = 0;
    140   threads_keepalive = 1;
    141 
    142   if (num_threads < 0){
    143     num_threads = 0;
    144   }
    145 
    146   /* Make new thread pool */
    147   threadpool tpool_p;
    148   tpool_p = (threadpool)malloc(sizeof(struct tpool_));
    149   if (tpool_p == NULL){
    150     err("tpool_init(): Could not allocate memory for thread pool\n");
    151     return NULL;
    152   }
    153   tpool_p->num_threads_alive   = 0;
    154   tpool_p->num_threads_working = 0;
    155 
    156   /* Initialise the job queue */
    157   if (jobqueue_init(&tpool_p->jobqueue) == -1){
    158     err("tpool_init(): Could not allocate memory for job queue\n");
    159     free(tpool_p);
    160     return NULL;
    161   }
    162 
    163   /* Make threads in pool */
    164   tpool_p->threads = (struct thread**)malloc((size_t)num_threads * sizeof(struct thread *));
    165   if (tpool_p->threads == NULL){
    166     err("tpool_init(): Could not allocate memory for threads\n");
    167     jobqueue_destroy(&tpool_p->jobqueue);
    168     free(tpool_p);
    169     return NULL;
    170   }
    171 
    172   pthread_mutex_init(&(tpool_p->thcount_lock), NULL);
    173   pthread_cond_init(&tpool_p->threads_all_idle, NULL);
    174 
    175   /* Thread init */
    176   int n;
    177   for (n=0; n<num_threads; n++){
    178     thread_init(tpool_p, &tpool_p->threads[n], n);
    179 #if tpool_DEBUG
    180       printf("tpool_DEBUG: Created thread %d in pool \n", n);
    181 #endif
    182   }
    183 
    184   /* Wait for threads to initialize */
    185   while (tpool_p->num_threads_alive != num_threads) {}
    186 
    187   return tpool_p;
    188 }
    189 
    190 
    191 /* Add work to the thread pool */
    192 int tpool_add_work(threadpool tpool_p, void (*function_p)(void*), void* arg_p){
    193   job* newjob;
    194 
    195   if (!tpool_p){
    196     err("tpool_add_work(): tpool_p is NULL\n");
    197     return -1;
    198   }
    199   if (!function_p){
    200     err("tpool_add_work(): function_p is NULL\n");
    201     return -1;
    202   }
    203   newjob=(struct job*)malloc(sizeof(struct job));
    204   if (newjob==NULL){
    205     err("tpool_add_work(): Could not allocate memory for new job\n");
    206     return -1;
    207   }
    208 
    209   /* add function and argument */
    210   newjob->function=function_p;
    211   newjob->arg=arg_p;
    212 
    213   /* add job to queue */
    214   jobqueue_push(&tpool_p->jobqueue, newjob);
    215 
    216   return 0;
    217 }
    218 
    219 
    220 /* Wait until all jobs have finished */
    221 void tpool_wait(threadpool tpool_p){
    222   if (!tpool_p) return;
    223   pthread_mutex_lock(&tpool_p->thcount_lock);
    224   while (tpool_p->jobqueue.len || tpool_p->num_threads_working) {
    225     pthread_cond_wait(&tpool_p->threads_all_idle, &tpool_p->thcount_lock);
    226   }
    227   pthread_mutex_unlock(&tpool_p->thcount_lock);
    228 }
    229 
    230 
/* Destroy the threadpool: ask workers to exit, join them, free everything.
 * Pending (unstarted) jobs are discarded by jobqueue_destroy. */
void tpool_destroy(threadpool tpool_p){
  /* No need to destroy if it's NULL */
  if (!tpool_p) return;

  /* Snapshot the live count before flipping the kill switch. */
  volatile int threads_total = tpool_p->num_threads_alive;

  /* End each thread 's infinite loop */
  threads_keepalive = 0;

  /* Give one second to kill idle threads: keep broadcasting has_jobs so
   * workers blocked in bsem_wait wake up, see keepalive == 0 and exit. */
  double TIMEOUT = 1.0;
  time_t start, end;
  double tpassed = 0.0;
  time (&start);
  while (tpassed < TIMEOUT && tpool_p->num_threads_alive){
    bsem_post_all(tpool_p->jobqueue.has_jobs);
    time (&end);
    tpassed = difftime(end,start);
  }

  /* Poll remaining threads (still running a job) once per second. */
  while (tpool_p->num_threads_alive){
    bsem_post_all(tpool_p->jobqueue.has_jobs);
    sleep(1);
  }

  /* Job queue cleanup */
  jobqueue_destroy(&tpool_p->jobqueue);
  int n;
  /* wait until all threads are finished because when recycleContainers is enabled
   * all threads must finish before running dArrayFree in finalizeLibsheepyAtExit
   * in the main process */
  for (n=0; n < threads_total; n++){
    /* ignore error code from pthread_join because the threads might already be finished at this time */
    /* NOTE(review): EINVAL comes from <errno.h>, which is not included
     * directly here — presumably pulled in via libsheepyObject.h; verify. */
    int result_code;
    result_code = pthread_join(tpool_p->threads[n]->pthread, NULL);
    if (result_code == EINVAL) {
      logE("not a joinable thread or another thread is already waiting to join with this thread.");
    }
  }
  /* Deallocs */
  for (n=0; n < threads_total; n++){
    thread_destroy(tpool_p->threads[n]);
  }
  free(tpool_p->threads);
  free(tpool_p);
}
    279 
    280 
    281 /* Pause all threads in threadpool */
    282 void tpool_pause(threadpool tpool_p) {
    283   if (!tpool_p) return;
    284   int n;
    285   for (n=0; n < tpool_p->num_threads_alive; n++){
    286     pthread_kill(tpool_p->threads[n]->pthread, SIGUSR2);
    287   }
    288 }
    289 
    290 
    291 /* Resume all threads in threadpool */
    292 void tpool_resume(threadpool tpool_p) {
    293   if (!tpool_p) return;
    294 
    295     // resuming a single threadpool hasn't been
    296     // implemented yet, meanwhile this supresses
    297     // the warnings
    298     (void)tpool_p;
    299 
    300   threads_on_hold = 0;
    301 }
    302 
    303 
    304 int tpool_num_threads_working(threadpool tpool_p){
    305   if (!tpool_p) return -1;
    306   return tpool_p->num_threads_working;
    307 }
    308 
    309 
    310 
    311 
    312 
    313 /* ============================ THREAD ============================== */
    314 
    315 
    316 /* Initialize a thread in the thread pool
    317  *
    318  * @param thread        address to the pointer of the thread to be created
    319  * @param id            id to be given to the thread
    320  * @return 0 on success, -1 otherwise.
    321  */
    322 static int thread_init (threadpool tpool_p, struct thread** thread_p, int id){
    323 
    324   *thread_p = (struct thread*)malloc(sizeof(struct thread));
    325   if (thread_p == NULL){
    326     err("thread_init(): Could not allocate memory for thread\n");
    327     return -1;
    328   }
    329 
    330   (*thread_p)->tpool_p = tpool_p;
    331   (*thread_p)->id       = id;
    332 
    333   pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p));
    334   // keep thread attached to be able to join them in tpool_destroy
    335   // all threads must finish before running dArrayFree in finalizeLibsheepyAtExit
    336   /* pthread_detach((*thread_p)->pthread); */
    337   return 0;
    338 }
    339 
    340 
    341 /* Sets the calling thread on hold */
    342 static void thread_hold(int sig_id) {
    343     (void)sig_id;
    344   threads_on_hold = 1;
    345   while (threads_on_hold){
    346     sleep(1);
    347   }
    348 }
    349 
    350 
/* Worker-thread main loop.
*
* In principle this is an endless loop. The only time this loop gets interrupted is once
* tpool_destroy() is invoked or the program exits.
*
* @param  thread_p      bookkeeping struct of the thread running this function
* @return NULL always
*/
static void* thread_do(struct thread* thread_p){

  /* Set thread name for profiling and debuging */
  char thread_name[128] = {0};
  sprintf(thread_name, "thread-pool-%d", thread_p->id);

#if defined(__linux__)
  /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
  prctl(PR_SET_NAME, thread_name);
#elif defined(__APPLE__) && defined(__MACH__)
  pthread_setname_np(thread_name);
#else
  #warning "thread_do(): pthread_setname_np is not supported on this system"
#endif

  /* Assure all threads have been created before starting serving */
  threadpool tpool_p = thread_p->tpool_p;

  /* Register signal handler so tpool_pause (SIGUSR2) parks this thread
   * inside thread_hold. */
  struct sigaction act;
  sigemptyset(&act.sa_mask);
  act.sa_flags = 0;
  act.sa_handler = thread_hold;
  if (sigaction(SIGUSR2, &act, NULL) == -1) {
    err("thread_do(): cannot handle SIGUSR2");
  }

  #if recycleContainers
  /* the containers are recycled in the thread local storage, the unused containers are freed at thread exit */
  pthread_key_t key;
  if (pthread_key_create(&key, finalizeLibsheepyRecycleContainers)) {
    err("pthread_key_create, finalizeLibsheepyRecycleContainers not registered, there will be memory leaks when the thread exits");
  }
  else {
    // set value to call the destructor at exit
    /* any non-NULL value makes pthreads invoke the key destructor at thread exit */
    pthread_setspecific(key, (const void *) 1);
  }
  #endif // recycleContainers

  /* Mark thread as alive (initialized); tpool_init busy-waits on this. */
  pthread_mutex_lock(&tpool_p->thcount_lock);
  tpool_p->num_threads_alive += 1;
  pthread_mutex_unlock(&tpool_p->thcount_lock);

  while(threads_keepalive){

    /* Block until a job is posted, or tpool_destroy broadcasts has_jobs. */
    bsem_wait(tpool_p->jobqueue.has_jobs);

    if (threads_keepalive){

      pthread_mutex_lock(&tpool_p->thcount_lock);
      tpool_p->num_threads_working++;
      pthread_mutex_unlock(&tpool_p->thcount_lock);

      /* Read job from queue and execute it.  The pull may return NULL
       * when another worker raced us to the job. */
      void (*func_buff)(void*);
      void*  arg_buff;
      job* job_p = jobqueue_pull(&tpool_p->jobqueue);
      if (job_p) {
        func_buff = job_p->function;
        arg_buff  = job_p->arg;
        func_buff(arg_buff);
        free(job_p);
      }

      pthread_mutex_lock(&tpool_p->thcount_lock);
      tpool_p->num_threads_working--;
      if (!tpool_p->num_threads_working) {
        /* last busy worker: wake anyone blocked in tpool_wait */
        pthread_cond_signal(&tpool_p->threads_all_idle);
      }
      pthread_mutex_unlock(&tpool_p->thcount_lock);

    }
  }
  /* Loop ended: account this thread as dead so tpool_destroy can join/free. */
  pthread_mutex_lock(&tpool_p->thcount_lock);
  tpool_p->num_threads_alive --;
  pthread_mutex_unlock(&tpool_p->thcount_lock);

  return NULL;
}
    439 
    440 
/* Frees a thread  */
/* Releases the per-thread bookkeeping struct only; the OS thread must
 * already have been joined (done in tpool_destroy). */
static void thread_destroy (thread* thread_p){
  free(thread_p);
}
    445 
    446 
    447 
    448 
    449 
    450 /* ============================ JOB QUEUE =========================== */
    451 
    452 
    453 /* Initialize queue */
    454 static int jobqueue_init(jobqueue* jobqueue_p){
    455   jobqueue_p->len = 0;
    456   jobqueue_p->front = NULL;
    457   jobqueue_p->rear  = NULL;
    458 
    459   jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
    460   if (jobqueue_p->has_jobs == NULL){
    461     return -1;
    462   }
    463 
    464   pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
    465   bsem_init(jobqueue_p->has_jobs, 0);
    466 
    467   return 0;
    468 }
    469 
    470 
    471 /* Clear the queue */
    472 static void jobqueue_clear(jobqueue* jobqueue_p){
    473 
    474   while(jobqueue_p->len){
    475     free(jobqueue_pull(jobqueue_p));
    476   }
    477 
    478   jobqueue_p->front = NULL;
    479   jobqueue_p->rear  = NULL;
    480   bsem_reset(jobqueue_p->has_jobs);
    481   jobqueue_p->len = 0;
    482 
    483 }
    484 
    485 
    486 /* Add (allocated) job to queue
    487  */
    488 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
    489 
    490   pthread_mutex_lock(&jobqueue_p->rwmutex);
    491   newjob->prev = NULL;
    492 
    493   switch(jobqueue_p->len){
    494 
    495     case 0:  /* if no jobs in queue */
    496           jobqueue_p->front = newjob;
    497           jobqueue_p->rear  = newjob;
    498           break;
    499 
    500     default: /* if jobs in queue */
    501           jobqueue_p->rear->prev = newjob;
    502           jobqueue_p->rear = newjob;
    503 
    504   }
    505   jobqueue_p->len++;
    506 
    507   bsem_post(jobqueue_p->has_jobs);
    508   pthread_mutex_unlock(&jobqueue_p->rwmutex);
    509 }
    510 
    511 
    512 /* Get first job from queue(removes it from queue)
    513  *
    514  * Notice: Caller MUST hold a mutex
    515  */
    516 static struct job* jobqueue_pull(jobqueue* jobqueue_p){
    517 
    518   pthread_mutex_lock(&jobqueue_p->rwmutex);
    519   job* job_p = jobqueue_p->front;
    520 
    521   switch(jobqueue_p->len){
    522 
    523     case 0:  /* if no jobs in queue */
    524             break;
    525 
    526     case 1:  /* if one job in queue */
    527           jobqueue_p->front = NULL;
    528           jobqueue_p->rear  = NULL;
    529           jobqueue_p->len = 0;
    530           break;
    531 
    532     default: /* if >1 jobs in queue */
    533           jobqueue_p->front = job_p->prev;
    534           jobqueue_p->len--;
    535           /* more than one job in queue -> post it */
    536           bsem_post(jobqueue_p->has_jobs);
    537 
    538   }
    539 
    540   pthread_mutex_unlock(&jobqueue_p->rwmutex);
    541   return job_p;
    542 }
    543 
    544 
    545 /* Free all queue resources back to the system */
    546 static void jobqueue_destroy(jobqueue* jobqueue_p){
    547   jobqueue_clear(jobqueue_p);
    548   free(jobqueue_p->has_jobs);
    549 }
    550 
    551 
    552 
    553 
    554 
    555 /* ======================== SYNCHRONISATION ========================= */
    556 
    557 
    558 /* Init semaphore to 1 or 0 */
    559 static void bsem_init(bsem *bsem_p, int value) {
    560   if (value < 0 || value > 1) {
    561     err("bsem_init(): Binary semaphore can take only values 1 or 0");
    562     exit(1);
    563   }
    564   pthread_mutex_init(&(bsem_p->mutex), NULL);
    565   pthread_cond_init(&(bsem_p->cond), NULL);
    566   bsem_p->v = value;
    567 }
    568 
    569 
/* Reset semaphore to 0 */
/* NOTE(review): delegates to bsem_init, which re-runs
 * pthread_mutex_init/pthread_cond_init on already-initialised primitives.
 * That works on common platforms but is formally undefined per POSIX —
 * confirm before porting. */
static void bsem_reset(bsem *bsem_p) {
  bsem_init(bsem_p, 0);
}
    574 
    575 
/* Post to at least one thread */
/* Sets v under the mutex, then signals the condition variable so one
 * waiter in bsem_wait can proceed.  The lock→set→signal→unlock order
 * is what makes the post reliable; do not reorder. */
static void bsem_post(bsem *bsem_p) {
  pthread_mutex_lock(&bsem_p->mutex);
  bsem_p->v = 1;
  pthread_cond_signal(&bsem_p->cond);
  pthread_mutex_unlock(&bsem_p->mutex);
}
    583 
    584 
/* Post to all threads */
/* Like bsem_post but broadcasts, waking every waiter — used during
 * shutdown (tpool_destroy) to flush all idle workers out of bsem_wait. */
static void bsem_post_all(bsem *bsem_p) {
  pthread_mutex_lock(&bsem_p->mutex);
  bsem_p->v = 1;
  pthread_cond_broadcast(&bsem_p->cond);
  pthread_mutex_unlock(&bsem_p->mutex);
}
    592 
    593 
    594 /* Wait on semaphore until semaphore has value 0 */
    595 static void bsem_wait(bsem* bsem_p) {
    596   pthread_mutex_lock(&bsem_p->mutex);
    597   while (bsem_p->v != 1) {
    598     pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
    599   }
    600   bsem_p->v = 0;
    601   pthread_mutex_unlock(&bsem_p->mutex);
    602 }