diff --git a/thpool.c b/thpool.c index e1d57ea..ef70a12 100644 --- a/thpool.c +++ b/thpool.c @@ -10,6 +10,7 @@ ********************************/ #include +#include #include #include #include @@ -22,15 +23,17 @@ #define POLLING_INTERVAL 1 -static int thpool_keepalive = 1; - - +static int threads_keepalive; +static int threads_hold_flag; /* Initialise thread pool */ thpool_t* thpool_init(int threadsN){ + threads_hold_flag = 0; + threads_keepalive = 1; + if (threadsN < 0){ threadsN = 0; } @@ -58,62 +61,22 @@ thpool_t* thpool_init(int threadsN){ thpool->threadsN=threadsN; int n; for (n=0; nthreads[n]); + (*th).id = n; args_t args; args.arg1 = thpool; args.arg2 = th; - pthread_create(&((*th).pthread), NULL, (void *)thpool_thread_do, (args_t*)&args); - (*th).id = n; + pthread_create(&((*th).pthread), NULL, (void *)thread_do, (args_t*)&args); + pthread_detach((*th).pthread); printf("Created thread %d in pool \n", (*th).id); } - + return thpool; } -/* What each individual thread is doing - * */ -static void thpool_thread_do(args_t* args){ - - thpool_t* thpool; - thread_t* thread; - thpool = (*args).arg1; - thread = (*args).arg2; - (*thread).working = 0; - - while(thpool_keepalive){ - //sleep(1); - - printf("thread %d not working anymore\n", (*thread).id); - bsem_wait(thpool->jobqueue->has_jobs); - - if (thpool_keepalive){ - - /* Read job from queue and execute it */ - void*(*func_buff)(void* arg); - void* arg_buff; - job_t* job; - pthread_mutex_lock(&thpool->rwmutex); - job = jobqueue_pull(thpool); - pthread_mutex_unlock(&thpool->rwmutex); - if (job) { - (*thread).working = 1; - func_buff = job->function; - arg_buff = job->arg; - func_buff(arg_buff); - free(job); - } - (*thread).working = 0; - } - } - - pthread_mutex_lock(&thpool->rwmutex); - thpool->threadsN --; - pthread_mutex_unlock(&thpool->rwmutex); -} - - /* Add work to the thread pool */ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){ job_t* newjob; @@ -139,35 +102,21 @@ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){ /* Wait until all jobs in queue have finished */ void thpool_wait(thpool_t* thpool){ - - int any_threads_working(thpool_t* thpool){ - int n; - for (n=0; n < (thpool->threadsN); n++){ - //printf("thread %d working?: %d\n", thpool->threads[n].id, thpool->threads[n].working); - if (thpool->threads[n].working){ - return 1; - } - } - return 0; - } - - while (any_threads_working(thpool)) { + while (thpool->jobqueue->len) { sleep(POLLING_INTERVAL); } - } /* Destroy the threadpool */ void thpool_destroy(thpool_t* thpool){ - /* End each thread's infinite loop */ - thpool_keepalive = 0; + /* End each thread 's infinite loop */ + threads_keepalive = 0; int any_threads_idle(thpool_t* thpool){ int n; for (n=0; n < (thpool->threadsN); n++){ - //printf("thread %d working?: %d\n", thpool->threads[n].id, thpool->threads[n].working); if (!thpool->threads[n].working){ return 1; } @@ -175,42 +124,21 @@ void thpool_destroy(thpool_t* thpool){ return 0; } - /* Post semaphore untill all threads have exited */ - //while (any_threads_idle(thpool)){ - // bsem_post(thpool->jobqueue->has_jobs); - // sleep(POLLING_INTERVAL); - //} - /* Kill idle threads */ double TIMEOUT = 1.0; time_t start, end; double tpassed; time (&start); - while (any_threads_idle(thpool)){ - while (tpassed < TIMEOUT){ + while (any_threads_idle(thpool)) + { + while (tpassed < TIMEOUT) + { bsem_post(thpool->jobqueue->has_jobs); time (&end); tpassed = difftime(end,start); } - } - - /*double TIMEOUT = 1.0; - time_t start, end; - double tpassed; - time (&start); - while (tpassed < TIMEOUT){ bsem_post(thpool->jobqueue->has_jobs); - time (&end); - tpassed = difftime(end,start); - }*/ - - /* Wait for working threads to finish their work*/ - //int n; - //for (n=0; n < (thpool->threadsN); n++){ - //pthread_join(thpool->threads[n].pthread, NULL); - //} - - //sleep(1); + } /* Job queue cleanup */ jobqueue_destroy(thpool); @@ -219,7 +147,127 @@ void thpool_destroy(thpool_t* thpool){ /* Dealloc */ free(thpool->threads); free(thpool); + +} + + +void thpool_pause(thpool_t* thpool) { + threads_hold(); +} + + +void thpool_continue(thpool_t* thpool) { + threads_unhold(); +} + + + + + +/* ====================== THREAD OPERATIONS ========================= */ + + +static void threads_hold () { + threads_hold_flag = 1; + while (threads_hold_flag){ + sleep(1); + } +} + + +static void threads_unhold () { + threads_hold_flag = 0; +} + + +static void thread_suicide() { + pthread_exit(NULL); +} + + +static void thread_kill(thread_t *th, int now) { + if (!now && (*th).working){ + sleep(1); + } + pthread_kill((*th).pthread, SIGTERM); +} + + +static void signal_handler (int signum) { + switch(signum){ + + case SIGUSR1: + threads_hold(); + break; + + case SIGUSR2: + threads_unhold(); + break; + + case SIGTERM: + thread_suicide(); + break; + } +} + + + +/* + * Init point for each thread + * + * */ +static void thread_do(args_t* args){ + + /* Assure all threads have been created before starting serving */ + thpool_t* thpool; + thread_t* thread; + thpool = (*args).arg1; + thread = (*args).arg2; + (*thread).working = 0; + + /* Register signal handler */ + struct sigaction act; + act.sa_handler = signal_handler; + if (sigaction(SIGUSR1, &act, NULL) == -1) { + perror("Error: cannot handle SIGUSR1"); + } + if (sigaction(SIGUSR2, &act, NULL) == -1) { + perror("Error: cannot handle SIGUSR2"); + } + if (sigaction(SIGTERM, &act, NULL) == -1) { + perror("Error: cannot handle SIGTERM"); + } + + + while(threads_keepalive){ + + bsem_wait(thpool->jobqueue->has_jobs); + (*thread).working = 1; + + if (threads_keepalive){ + + /* Read job from queue and execute it */ + void*(*func_buff)(void* arg); + void* arg_buff; + job_t* job; + pthread_mutex_lock(&thpool->rwmutex); + job = jobqueue_pull(thpool); + pthread_mutex_unlock(&thpool->rwmutex); + if (job) { + func_buff = job->function; + arg_buff = job->arg; + func_buff(arg_buff); + free(job); + } + (*thread).working = 0; + } + } + pthread_mutex_lock(&thpool->rwmutex); + thpool->threadsN --; + pthread_mutex_unlock(&thpool->rwmutex); + printf("Thread %d exiting\n", (*thread).id); + thread_suicide(); }