tpool.c (15530B)
1 /* ******************************** 2 * Author: Johan Hanssen Seferidis 3 * License: MIT 4 * Description: Library providing a threading pool where you can add 5 * work. For usage, check the tpool.h file or README.md 6 * 7 *//** @file tpool.h *//* 8 * 9 ********************************/ 10 11 #define _POSIX_C_SOURCE 200809L 12 #include <unistd.h> 13 #include <signal.h> 14 #include <stdio.h> 15 #include <stdlib.h> 16 #if defined(__APPLE__) && defined(__MACH__) 17 #define _DARWIN_C_SOURCE 1 18 #endif 19 #include <pthread.h> 20 #include <time.h> 21 #if defined(__linux__) 22 #include <sys/prctl.h> 23 #endif 24 25 #include "libsheepyObject.h" // for finalizeLibsheepyRecycleContainers 26 #include "tpool.h" 27 28 #ifdef tpool_DEBUG 29 #define tpool_DEBUG 1 30 #else 31 #define tpool_DEBUG 0 32 #endif 33 34 #if !defined(DISABLE_PRINT) || defined(tpool_DEBUG) 35 #define err(str) logE(str) 36 #else 37 #define err(str) 38 #endif 39 40 static volatile int threads_keepalive; 41 static volatile int threads_on_hold; 42 43 44 // locks, max 512 threads 45 pthread_mutex_t tpoolLocks[tpoolLockCount]; // only one thread can have the lock 46 47 threadpool tpool; 48 49 /* ========================== STRUCTURES ============================ */ 50 51 52 /* Binary semaphore */ 53 typedef struct bsem { 54 pthread_mutex_t mutex; 55 pthread_cond_t cond; 56 int v; 57 } bsem; 58 59 60 /* Job */ 61 typedef struct job{ 62 struct job* prev; /* pointer to previous job */ 63 void (*function)(void* arg); /* function pointer */ 64 void* arg; /* function's argument */ 65 } job; 66 67 68 /* Job queue */ 69 typedef struct jobqueue{ 70 pthread_mutex_t rwmutex; /* used for queue r/w access */ 71 job *front; /* pointer to front of queue */ 72 job *rear; /* pointer to rear of queue */ 73 bsem *has_jobs; /* flag as binary semaphore */ 74 int len; /* number of jobs in queue */ 75 } jobqueue; 76 77 78 /* Thread */ 79 typedef struct thread{ 80 int id; /* friendly id */ 81 pthread_t pthread; /* pointer to actual thread */ 82 threadpool tpool_p; /* access to tpool */ 83 } thread; 84 85 86 /* Threadpool */ 87 typedef struct tpool_{ 88 thread** threads; /* pointer to threads */ 89 volatile int num_threads_alive; /* threads currently alive */ 90 volatile int num_threads_working; /* threads currently working */ 91 pthread_mutex_t thcount_lock; /* used for thread count etc */ 92 pthread_cond_t threads_all_idle; /* signal to tpool_wait */ 93 jobqueue jobqueue; /* job queue */ 94 } tpool_; 95 96 97 98 99 100 /* ========================== PROTOTYPES ============================ */ 101 102 103 static int thread_init(threadpool tpool_p, struct thread** thread_p, int id); 104 static void* thread_do(struct thread* thread_p); 105 static void thread_hold(int sig_id); 106 static void thread_destroy(struct thread* thread_p); 107 108 static int jobqueue_init(jobqueue* jobqueue_p); 109 static void jobqueue_clear(jobqueue* jobqueue_p); 110 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); 111 static struct job* jobqueue_pull(jobqueue* jobqueue_p); 112 static void jobqueue_destroy(jobqueue* jobqueue_p); 113 114 static void bsem_init(struct bsem *bsem_p, int value); 115 static void bsem_reset(struct bsem *bsem_p); 116 static void bsem_post(struct bsem *bsem_p); 117 static void bsem_post_all(struct bsem *bsem_p); 118 static void bsem_wait(struct bsem *bsem_p); 119 120 121 122 123 124 /* ========================== THREADPOOL ============================ */ 125 126 // use lock in the tpoolLocks array 127 void tpoolLock(int position) { 128 pthread_mutex_lock(&tpoolLocks[position]); 129 } 130 131 // unlock a lock in the tpoolLocks array 132 void tpoolUlock(int position) { 133 pthread_mutex_unlock(&tpoolLocks[position]); 134 } 135 136 /* Initialise thread pool */ 137 threadpool tpool_init(int num_threads){ 138 139 threads_on_hold = 0; 140 threads_keepalive = 1; 141 142 if (num_threads < 0){ 143 num_threads = 0; 144 } 145 146 /* Make new thread pool */ 147 threadpool tpool_p; 148 tpool_p = (threadpool)malloc(sizeof(struct tpool_)); 149 if (tpool_p == NULL){ 150 err("tpool_init(): Could not allocate memory for thread pool\n"); 151 return NULL; 152 } 153 tpool_p->num_threads_alive = 0; 154 tpool_p->num_threads_working = 0; 155 156 /* Initialise the job queue */ 157 if (jobqueue_init(&tpool_p->jobqueue) == -1){ 158 err("tpool_init(): Could not allocate memory for job queue\n"); 159 free(tpool_p); 160 return NULL; 161 } 162 163 /* Make threads in pool */ 164 tpool_p->threads = (struct thread**)malloc((size_t)num_threads * sizeof(struct thread *)); 165 if (tpool_p->threads == NULL){ 166 err("tpool_init(): Could not allocate memory for threads\n"); 167 jobqueue_destroy(&tpool_p->jobqueue); 168 free(tpool_p); 169 return NULL; 170 } 171 172 pthread_mutex_init(&(tpool_p->thcount_lock), NULL); 173 pthread_cond_init(&tpool_p->threads_all_idle, NULL); 174 175 /* Thread init */ 176 int n; 177 for (n=0; n<num_threads; n++){ 178 thread_init(tpool_p, &tpool_p->threads[n], n); 179 #if tpool_DEBUG 180 printf("tpool_DEBUG: Created thread %d in pool \n", n); 181 #endif 182 } 183 184 /* Wait for threads to initialize */ 185 while (tpool_p->num_threads_alive != num_threads) {} 186 187 return tpool_p; 188 } 189 190 191 /* Add work to the thread pool */ 192 int tpool_add_work(threadpool tpool_p, void (*function_p)(void*), void* arg_p){ 193 job* newjob; 194 195 if (!tpool_p){ 196 err("tpool_add_work(): tpool_p is NULL\n"); 197 return -1; 198 } 199 if (!function_p){ 200 err("tpool_add_work(): function_p is NULL\n"); 201 return -1; 202 } 203 newjob=(struct job*)malloc(sizeof(struct job)); 204 if (newjob==NULL){ 205 err("tpool_add_work(): Could not allocate memory for new job\n"); 206 return -1; 207 } 208 209 /* add function and argument */ 210 newjob->function=function_p; 211 newjob->arg=arg_p; 212 213 /* add job to queue */ 214 jobqueue_push(&tpool_p->jobqueue, newjob); 215 216 return 0; 217 } 218 219 220 /* Wait until all jobs have finished */ 221 void tpool_wait(threadpool tpool_p){ 222 if (!tpool_p) return; 223 pthread_mutex_lock(&tpool_p->thcount_lock); 224 while (tpool_p->jobqueue.len || tpool_p->num_threads_working) { 225 pthread_cond_wait(&tpool_p->threads_all_idle, &tpool_p->thcount_lock); 226 } 227 pthread_mutex_unlock(&tpool_p->thcount_lock); 228 } 229 230 231 /* Destroy the threadpool */ 232 void tpool_destroy(threadpool tpool_p){ 233 /* No need to destory if it's NULL */ 234 if (!tpool_p) return; 235 236 volatile int threads_total = tpool_p->num_threads_alive; 237 238 /* End each thread 's infinite loop */ 239 threads_keepalive = 0; 240 241 /* Give one second to kill idle threads */ 242 double TIMEOUT = 1.0; 243 time_t start, end; 244 double tpassed = 0.0; 245 time (&start); 246 while (tpassed < TIMEOUT && tpool_p->num_threads_alive){ 247 bsem_post_all(tpool_p->jobqueue.has_jobs); 248 time (&end); 249 tpassed = difftime(end,start); 250 } 251 252 /* Poll remaining threads */ 253 while (tpool_p->num_threads_alive){ 254 bsem_post_all(tpool_p->jobqueue.has_jobs); 255 sleep(1); 256 } 257 258 /* Job queue cleanup */ 259 jobqueue_destroy(&tpool_p->jobqueue); 260 int n; 261 /* wait until all threads are finished because when recycleContainers is enabled 262 * all threads must finish before running dArrayFree in finalizeLibsheepyAtExit 263 * in the main process */ 264 for (n=0; n < threads_total; n++){ 265 /* ignore error code from pthread_join because the threads might already be finished at this time */ 266 int result_code; 267 result_code = pthread_join(tpool_p->threads[n]->pthread, NULL); 268 if (result_code == EINVAL) { 269 logE("not a joinable thread or another thread is already waiting to join with this thread."); 270 } 271 } 272 /* Deallocs */ 273 for (n=0; n < threads_total; n++){ 274 thread_destroy(tpool_p->threads[n]); 275 } 276 free(tpool_p->threads); 277 free(tpool_p); 278 } 279 280 281 /* Pause all threads in threadpool */ 282 void tpool_pause(threadpool tpool_p) { 283 if (!tpool_p) return; 284 int n; 285 for (n=0; n < tpool_p->num_threads_alive; n++){ 286 pthread_kill(tpool_p->threads[n]->pthread, SIGUSR2); 287 } 288 } 289 290 291 /* Resume all threads in threadpool */ 292 void tpool_resume(threadpool tpool_p) { 293 if (!tpool_p) return; 294 295 // resuming a single threadpool hasn't been 296 // implemented yet, meanwhile this supresses 297 // the warnings 298 (void)tpool_p; 299 300 threads_on_hold = 0; 301 } 302 303 304 int tpool_num_threads_working(threadpool tpool_p){ 305 if (!tpool_p) return -1; 306 return tpool_p->num_threads_working; 307 } 308 309 310 311 312 313 /* ============================ THREAD ============================== */ 314 315 316 /* Initialize a thread in the thread pool 317 * 318 * @param thread address to the pointer of the thread to be created 319 * @param id id to be given to the thread 320 * @return 0 on success, -1 otherwise. 321 */ 322 static int thread_init (threadpool tpool_p, struct thread** thread_p, int id){ 323 324 *thread_p = (struct thread*)malloc(sizeof(struct thread)); 325 if (thread_p == NULL){ 326 err("thread_init(): Could not allocate memory for thread\n"); 327 return -1; 328 } 329 330 (*thread_p)->tpool_p = tpool_p; 331 (*thread_p)->id = id; 332 333 pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p)); 334 // keep thread attached to be able to join them in tpool_destroy 335 // all threads must finish before running dArrayFree in finalizeLibsheepyAtExit 336 /* pthread_detach((*thread_p)->pthread); */ 337 return 0; 338 } 339 340 341 /* Sets the calling thread on hold */ 342 static void thread_hold(int sig_id) { 343 (void)sig_id; 344 threads_on_hold = 1; 345 while (threads_on_hold){ 346 sleep(1); 347 } 348 } 349 350 351 /* What each thread is doing 352 * 353 * In principle this is an endless loop. The only time this loop gets interuppted is once 354 * tpool_destroy() is invoked or the program exits. 355 * 356 * @param thread thread that will run this function 357 * @return nothing 358 */ 359 static void* thread_do(struct thread* thread_p){ 360 361 /* Set thread name for profiling and debuging */ 362 char thread_name[128] = {0}; 363 sprintf(thread_name, "thread-pool-%d", thread_p->id); 364 365 #if defined(__linux__) 366 /* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */ 367 prctl(PR_SET_NAME, thread_name); 368 #elif defined(__APPLE__) && defined(__MACH__) 369 pthread_setname_np(thread_name); 370 #else 371 #warning "thread_do(): pthread_setname_np is not supported on this system" 372 #endif 373 374 /* Assure all threads have been created before starting serving */ 375 threadpool tpool_p = thread_p->tpool_p; 376 377 /* Register signal handler */ 378 struct sigaction act; 379 sigemptyset(&act.sa_mask); 380 act.sa_flags = 0; 381 act.sa_handler = thread_hold; 382 if (sigaction(SIGUSR2, &act, NULL) == -1) { 383 err("thread_do(): cannot handle SIGUSR2"); 384 } 385 386 #if recycleContainers 387 /* the containers are recycled in the thread local storage, the unused containers are freed at thread exit */ 388 pthread_key_t key; 389 if (pthread_key_create(&key, finalizeLibsheepyRecycleContainers)) { 390 err("pthread_key_create, finalizeLibsheepyRecycleContainers not registered, there will be memory leaks when the thread exits"); 391 } 392 else { 393 // set value to call the destructor at exit 394 pthread_setspecific(key, (const void *) 1); 395 } 396 #endif // recycleContainers 397 398 /* Mark thread as alive (initialized) */ 399 pthread_mutex_lock(&tpool_p->thcount_lock); 400 tpool_p->num_threads_alive += 1; 401 pthread_mutex_unlock(&tpool_p->thcount_lock); 402 403 while(threads_keepalive){ 404 405 bsem_wait(tpool_p->jobqueue.has_jobs); 406 407 if (threads_keepalive){ 408 409 pthread_mutex_lock(&tpool_p->thcount_lock); 410 tpool_p->num_threads_working++; 411 pthread_mutex_unlock(&tpool_p->thcount_lock); 412 413 /* Read job from queue and execute it */ 414 void (*func_buff)(void*); 415 void* arg_buff; 416 job* job_p = jobqueue_pull(&tpool_p->jobqueue); 417 if (job_p) { 418 func_buff = job_p->function; 419 arg_buff = job_p->arg; 420 func_buff(arg_buff); 421 free(job_p); 422 } 423 424 pthread_mutex_lock(&tpool_p->thcount_lock); 425 tpool_p->num_threads_working--; 426 if (!tpool_p->num_threads_working) { 427 pthread_cond_signal(&tpool_p->threads_all_idle); 428 } 429 pthread_mutex_unlock(&tpool_p->thcount_lock); 430 431 } 432 } 433 pthread_mutex_lock(&tpool_p->thcount_lock); 434 tpool_p->num_threads_alive --; 435 pthread_mutex_unlock(&tpool_p->thcount_lock); 436 437 return NULL; 438 } 439 440 441 /* Frees a thread */ 442 static void thread_destroy (thread* thread_p){ 443 free(thread_p); 444 } 445 446 447 448 449 450 /* ============================ JOB QUEUE =========================== */ 451 452 453 /* Initialize queue */ 454 static int jobqueue_init(jobqueue* jobqueue_p){ 455 jobqueue_p->len = 0; 456 jobqueue_p->front = NULL; 457 jobqueue_p->rear = NULL; 458 459 jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); 460 if (jobqueue_p->has_jobs == NULL){ 461 return -1; 462 } 463 464 pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); 465 bsem_init(jobqueue_p->has_jobs, 0); 466 467 return 0; 468 } 469 470 471 /* Clear the queue */ 472 static void jobqueue_clear(jobqueue* jobqueue_p){ 473 474 while(jobqueue_p->len){ 475 free(jobqueue_pull(jobqueue_p)); 476 } 477 478 jobqueue_p->front = NULL; 479 jobqueue_p->rear = NULL; 480 bsem_reset(jobqueue_p->has_jobs); 481 jobqueue_p->len = 0; 482 483 } 484 485 486 /* Add (allocated) job to queue 487 */ 488 static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ 489 490 pthread_mutex_lock(&jobqueue_p->rwmutex); 491 newjob->prev = NULL; 492 493 switch(jobqueue_p->len){ 494 495 case 0: /* if no jobs in queue */ 496 jobqueue_p->front = newjob; 497 jobqueue_p->rear = newjob; 498 break; 499 500 default: /* if jobs in queue */ 501 jobqueue_p->rear->prev = newjob; 502 jobqueue_p->rear = newjob; 503 504 } 505 jobqueue_p->len++; 506 507 bsem_post(jobqueue_p->has_jobs); 508 pthread_mutex_unlock(&jobqueue_p->rwmutex); 509 } 510 511 512 /* Get first job from queue(removes it from queue) 513 * 514 * Notice: Caller MUST hold a mutex 515 */ 516 static struct job* jobqueue_pull(jobqueue* jobqueue_p){ 517 518 pthread_mutex_lock(&jobqueue_p->rwmutex); 519 job* job_p = jobqueue_p->front; 520 521 switch(jobqueue_p->len){ 522 523 case 0: /* if no jobs in queue */ 524 break; 525 526 case 1: /* if one job in queue */ 527 jobqueue_p->front = NULL; 528 jobqueue_p->rear = NULL; 529 jobqueue_p->len = 0; 530 break; 531 532 default: /* if >1 jobs in queue */ 533 jobqueue_p->front = job_p->prev; 534 jobqueue_p->len--; 535 /* more than one job in queue -> post it */ 536 bsem_post(jobqueue_p->has_jobs); 537 538 } 539 540 pthread_mutex_unlock(&jobqueue_p->rwmutex); 541 return job_p; 542 } 543 544 545 /* Free all queue resources back to the system */ 546 static void jobqueue_destroy(jobqueue* jobqueue_p){ 547 jobqueue_clear(jobqueue_p); 548 free(jobqueue_p->has_jobs); 549 } 550 551 552 553 554 555 /* ======================== SYNCHRONISATION ========================= */ 556 557 558 /* Init semaphore to 1 or 0 */ 559 static void bsem_init(bsem *bsem_p, int value) { 560 if (value < 0 || value > 1) { 561 err("bsem_init(): Binary semaphore can take only values 1 or 0"); 562 exit(1); 563 } 564 pthread_mutex_init(&(bsem_p->mutex), NULL); 565 pthread_cond_init(&(bsem_p->cond), NULL); 566 bsem_p->v = value; 567 } 568 569 570 /* Reset semaphore to 0 */ 571 static void bsem_reset(bsem *bsem_p) { 572 bsem_init(bsem_p, 0); 573 } 574 575 576 /* Post to at least one thread */ 577 static void bsem_post(bsem *bsem_p) { 578 pthread_mutex_lock(&bsem_p->mutex); 579 bsem_p->v = 1; 580 pthread_cond_signal(&bsem_p->cond); 581 pthread_mutex_unlock(&bsem_p->mutex); 582 } 583 584 585 /* Post to all threads */ 586 static void bsem_post_all(bsem *bsem_p) { 587 pthread_mutex_lock(&bsem_p->mutex); 588 bsem_p->v = 1; 589 pthread_cond_broadcast(&bsem_p->cond); 590 pthread_mutex_unlock(&bsem_p->mutex); 591 } 592 593 594 /* Wait on semaphore until semaphore has value 0 */ 595 static void bsem_wait(bsem* bsem_p) { 596 pthread_mutex_lock(&bsem_p->mutex); 597 while (bsem_p->v != 1) { 598 pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); 599 } 600 bsem_p->v = 0; 601 pthread_mutex_unlock(&bsem_p->mutex); 602 }