11 #define _POSIX_C_SOURCE 200809L 16 #if defined(__APPLE__) && defined(__MACH__) 17 #define _DARWIN_C_SOURCE 1 21 #if defined(__linux__) 22 #include <sys/prctl.h> 34 #if !defined(DISABLE_PRINT) || defined(tpool_DEBUG) 35 #define err(str) logE(str) 40 static volatile int threads_keepalive;
41 static volatile int threads_on_hold;
63 void (*
function)(
void*
arg);
103 static int thread_init(
threadpool tpool_p,
struct thread** thread_p,
int id);
104 static void* thread_do(
struct thread* thread_p);
105 static void thread_hold(
int sig_id);
106 static void thread_destroy(
struct thread* thread_p);
108 static int jobqueue_init(
jobqueue* jobqueue_p);
109 static void jobqueue_clear(
jobqueue* jobqueue_p);
110 static void jobqueue_push(
jobqueue* jobqueue_p,
struct job* newjob_p);
111 static struct job* jobqueue_pull(
jobqueue* jobqueue_p);
112 static void jobqueue_destroy(
jobqueue* jobqueue_p);
114 static void bsem_init(
struct bsem *bsem_p,
int value);
115 static void bsem_reset(
struct bsem *bsem_p);
116 static void bsem_post(
struct bsem *bsem_p);
117 static void bsem_post_all(
struct bsem *bsem_p);
118 static void bsem_wait(
struct bsem *bsem_p);
140 threads_keepalive = 1;
142 if (num_threads < 0){
149 if (tpool_p == NULL){
150 err(
"tpool_init(): Could not allocate memory for thread pool\n");
157 if (jobqueue_init(&tpool_p->
jobqueue) == -1){
158 err(
"tpool_init(): Could not allocate memory for job queue\n");
166 err(
"tpool_init(): Could not allocate memory for threads\n");
167 jobqueue_destroy(&tpool_p->
jobqueue);
177 for (n=0; n<num_threads; n++){
178 thread_init(tpool_p, &tpool_p->
threads[n], n);
180 printf(
"tpool_DEBUG: Created thread %d in pool \n", n);
196 err(
"tpool_add_work(): tpool_p is NULL\n");
200 err(
"tpool_add_work(): function_p is NULL\n");
203 newjob=(
struct job*)malloc(
sizeof(
struct job));
205 err(
"tpool_add_work(): Could not allocate memory for new job\n");
214 jobqueue_push(&tpool_p->
jobqueue, newjob);
222 if (!tpool_p)
return;
234 if (!tpool_p)
return;
239 threads_keepalive = 0;
242 double TIMEOUT = 1.0;
244 double tpassed = 0.0;
246 while (tpassed < TIMEOUT && tpool_p->num_threads_alive){
249 tpassed = difftime(end,start);
259 jobqueue_destroy(&tpool_p->
jobqueue);
264 for (n=0; n < threads_total; n++){
268 if (result_code == EINVAL) {
269 logE(
"not a joinable thread or another thread is already waiting to join with this thread.");
273 for (n=0; n < threads_total; n++){
274 thread_destroy(tpool_p->
threads[n]);
283 if (!tpool_p)
return;
293 if (!tpool_p)
return;
305 if (!tpool_p)
return -1;
322 static int thread_init (
threadpool tpool_p,
struct thread** thread_p,
int id){
324 *thread_p = (
struct thread*)malloc(
sizeof(
struct thread));
325 if (thread_p == NULL){
326 err(
"thread_init(): Could not allocate memory for thread\n");
330 (*thread_p)->tpool_p =
tpool_p;
331 (*thread_p)->id =
id;
333 pthread_create(&(*thread_p)->pthread, NULL, (
void *)thread_do, (*thread_p));
342 static void thread_hold(
int sig_id) {
345 while (threads_on_hold){
359 static void* thread_do(
struct thread* thread_p){
362 char thread_name[128] = {0};
363 sprintf(thread_name,
"thread-pool-%d", thread_p->
id);
365 #if defined(__linux__) 367 prctl(PR_SET_NAME, thread_name);
368 #elif defined(__APPLE__) && defined(__MACH__) 369 pthread_setname_np(thread_name);
371 #warning "thread_do(): pthread_setname_np is not supported on this system" 378 struct sigaction act;
379 sigemptyset(&act.sa_mask);
381 act.sa_handler = thread_hold;
382 if (sigaction(SIGUSR2, &act, NULL) == -1) {
383 err(
"thread_do(): cannot handle SIGUSR2");
386 #if recycleContainers 390 err(
"pthread_key_create, finalizeLibsheepyRecycleContainers not registered, there will be memory leaks when the thread exits");
394 pthread_setspecific(key, (
const void *) 1);
396 #endif // recycleContainers 403 while(threads_keepalive){
407 if (threads_keepalive){
414 void (*func_buff)(
void*);
419 arg_buff = job_p->
arg;
442 static void thread_destroy (
thread* thread_p){
454 static int jobqueue_init(
jobqueue* jobqueue_p){
456 jobqueue_p->
front = NULL;
457 jobqueue_p->
rear = NULL;
464 pthread_mutex_init(&(jobqueue_p->
rwmutex), NULL);
472 static void jobqueue_clear(
jobqueue* jobqueue_p){
474 while(jobqueue_p->
len){
475 free(jobqueue_pull(jobqueue_p));
478 jobqueue_p->
front = NULL;
479 jobqueue_p->
rear = NULL;
488 static void jobqueue_push(
jobqueue* jobqueue_p,
struct job* newjob){
490 pthread_mutex_lock(&jobqueue_p->
rwmutex);
493 switch(jobqueue_p->
len){
496 jobqueue_p->
front = newjob;
497 jobqueue_p->
rear = newjob;
502 jobqueue_p->
rear = newjob;
508 pthread_mutex_unlock(&jobqueue_p->
rwmutex);
516 static struct job* jobqueue_pull(
jobqueue* jobqueue_p){
518 pthread_mutex_lock(&jobqueue_p->
rwmutex);
521 switch(jobqueue_p->
len){
527 jobqueue_p->
front = NULL;
528 jobqueue_p->
rear = NULL;
540 pthread_mutex_unlock(&jobqueue_p->
rwmutex);
546 static void jobqueue_destroy(
jobqueue* jobqueue_p){
547 jobqueue_clear(jobqueue_p);
559 static void bsem_init(
bsem *bsem_p,
int value) {
560 if (value < 0 || value > 1) {
561 err(
"bsem_init(): Binary semaphore can take only values 1 or 0");
564 pthread_mutex_init(&(bsem_p->
mutex), NULL);
565 pthread_cond_init(&(bsem_p->
cond), NULL);
571 static void bsem_reset(
bsem *bsem_p) {
572 bsem_init(bsem_p, 0);
577 static void bsem_post(
bsem *bsem_p) {
578 pthread_mutex_lock(&bsem_p->
mutex);
580 pthread_cond_signal(&bsem_p->
cond);
581 pthread_mutex_unlock(&bsem_p->
mutex);
586 static void bsem_post_all(
bsem *bsem_p) {
587 pthread_mutex_lock(&bsem_p->
mutex);
589 pthread_cond_broadcast(&bsem_p->
cond);
590 pthread_mutex_unlock(&bsem_p->
mutex);
595 static void bsem_wait(
bsem* bsem_p) {
596 pthread_mutex_lock(&bsem_p->
mutex);
597 while (bsem_p->
v != 1) {
598 pthread_cond_wait(&bsem_p->
cond, &bsem_p->
mutex);
601 pthread_mutex_unlock(&bsem_p->
mutex);
void tpool_resume(threadpool tpool_p)
This file is the API for the classes in libsheepy.
volatile int num_threads_working
int tpool_num_threads_working(threadpool tpool_p)
void tpoolLock(int position)
void finalizeLibsheepyRecycleContainers(void *arg UNUSED)
function called automatically (registered in tpool) when a thread exits This function calls all the r...
void tpoolUlock(int position)
pthread_mutex_t thcount_lock
void tpool_destroy(threadpool tpool_p)
pthread_cond_t threads_all_idle
volatile int num_threads_alive
pthread_mutex_t tpoolLocks[tpoolLockCount]
void(* function)(void *arg)
void tpool_wait(threadpool tpool_p)
void tpool_pause(threadpool tpool_p)
int tpool_add_work(threadpool tpool_p, void(*function_p)(void *), void *arg_p)
threadpool tpool_init(int num_threads)
struct tpool_ * threadpool