Clean working

This commit is contained in:
pithikos 2015-01-01 12:02:06 +00:00
parent 2a70d4ddf8
commit d42a436bfc

230
thpool.c
View File

@ -10,6 +10,7 @@
********************************/
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
@ -22,15 +23,17 @@
#define POLLING_INTERVAL 1
static int thpool_keepalive = 1;
static int threads_keepalive;
static int threads_hold_flag;
/* Initialise thread pool */
thpool_t* thpool_init(int threadsN){
threads_hold_flag = 0;
threads_keepalive = 1;
if (threadsN < 0){
threadsN = 0;
}
@ -58,62 +61,22 @@ thpool_t* thpool_init(int threadsN){
thpool->threadsN=threadsN;
int n;
for (n=0; n<threadsN; n++){
thread_t* th;
th = &(thpool->threads[n]);
(*th).id = n;
args_t args;
args.arg1 = thpool;
args.arg2 = th;
pthread_create(&((*th).pthread), NULL, (void *)thpool_thread_do, (args_t*)&args);
(*th).id = n;
pthread_create(&((*th).pthread), NULL, (void *)thread_do, (args_t*)&args);
pthread_detach((*th).pthread);
printf("Created thread %d in pool \n", (*th).id);
}
return thpool;
}
/* What each individual thread is doing
* */
static void thpool_thread_do(args_t* args){
thpool_t* thpool;
thread_t* thread;
thpool = (*args).arg1;
thread = (*args).arg2;
(*thread).working = 0;
while(thpool_keepalive){
//sleep(1);
printf("thread %d not working anymore\n", (*thread).id);
bsem_wait(thpool->jobqueue->has_jobs);
if (thpool_keepalive){
/* Read job from queue and execute it */
void*(*func_buff)(void* arg);
void* arg_buff;
job_t* job;
pthread_mutex_lock(&thpool->rwmutex);
job = jobqueue_pull(thpool);
pthread_mutex_unlock(&thpool->rwmutex);
if (job) {
(*thread).working = 1;
func_buff = job->function;
arg_buff = job->arg;
func_buff(arg_buff);
free(job);
}
(*thread).working = 0;
}
}
pthread_mutex_lock(&thpool->rwmutex);
thpool->threadsN --;
pthread_mutex_unlock(&thpool->rwmutex);
}
/* Add work to the thread pool */
int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){
job_t* newjob;
@ -139,35 +102,21 @@ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){
/* Wait until all jobs in queue have finished */
void thpool_wait(thpool_t* thpool){
int any_threads_working(thpool_t* thpool){
int n;
for (n=0; n < (thpool->threadsN); n++){
//printf("thread %d working?: %d\n", thpool->threads[n].id, thpool->threads[n].working);
if (thpool->threads[n].working){
return 1;
}
}
return 0;
}
while (any_threads_working(thpool)) {
while (thpool->jobqueue->len) {
sleep(POLLING_INTERVAL);
}
}
/* Destroy the threadpool */
void thpool_destroy(thpool_t* thpool){
/* End each thread's infinite loop */
thpool_keepalive = 0;
/* End each thread 's infinite loop */
threads_keepalive = 0;
int any_threads_idle(thpool_t* thpool){
int n;
for (n=0; n < (thpool->threadsN); n++){
//printf("thread %d working?: %d\n", thpool->threads[n].id, thpool->threads[n].working);
if (!thpool->threads[n].working){
return 1;
}
@ -175,42 +124,21 @@ void thpool_destroy(thpool_t* thpool){
return 0;
}
/* Post semaphore untill all threads have exited */
//while (any_threads_idle(thpool)){
// bsem_post(thpool->jobqueue->has_jobs);
// sleep(POLLING_INTERVAL);
//}
/* Kill idle threads */
double TIMEOUT = 1.0;
time_t start, end;
double tpassed;
time (&start);
while (any_threads_idle(thpool)){
while (tpassed < TIMEOUT){
while (any_threads_idle(thpool))
{
while (tpassed < TIMEOUT)
{
bsem_post(thpool->jobqueue->has_jobs);
time (&end);
tpassed = difftime(end,start);
}
}
/*double TIMEOUT = 1.0;
time_t start, end;
double tpassed;
time (&start);
while (tpassed < TIMEOUT){
bsem_post(thpool->jobqueue->has_jobs);
time (&end);
tpassed = difftime(end,start);
}*/
/* Wait for working threads to finish their work*/
//int n;
//for (n=0; n < (thpool->threadsN); n++){
//pthread_join(thpool->threads[n].pthread, NULL);
//}
//sleep(1);
}
/* Job queue cleanup */
jobqueue_destroy(thpool);
@ -219,7 +147,127 @@ void thpool_destroy(thpool_t* thpool){
/* Dealloc */
free(thpool->threads);
free(thpool);
}
void thpool_pause(thpool_t* thpool) {
threads_hold();
}
void thpool_continue(thpool_t* thpool) {
threads_unhold();
}
/* ====================== THREAD OPERATIONS ========================= */
static void threads_hold () {
threads_hold_flag = 1;
while (threads_hold_flag){
sleep(1);
}
}
static void threads_unhold () {
threads_hold_flag = 0;
}
static void thread_suicide() {
pthread_exit(NULL);
}
static void thread_kill(thread_t *th, int now) {
if (!now && (*th).working){
sleep(1);
}
pthread_kill((*th).pthread, SIGTERM);
}
static void signal_handler (int signum) {
switch(signum){
case SIGUSR1:
threads_hold();
break;
case SIGUSR2:
threads_unhold();
break;
case SIGTERM:
thread_suicide();
break;
}
}
/*
* Init point for each thread
*
* */
static void thread_do(args_t* args){
/* Assure all threads have been created before starting serving */
thpool_t* thpool;
thread_t* thread;
thpool = (*args).arg1;
thread = (*args).arg2;
(*thread).working = 0;
/* Register signal handler */
struct sigaction act;
act.sa_handler = signal_handler;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
perror("Error: cannot handle SIGUSR1");
}
if (sigaction(SIGUSR2, &act, NULL) == -1) {
perror("Error: cannot handle SIGUSR2");
}
if (sigaction(SIGTERM, &act, NULL) == -1) {
perror("Error: cannot handle SIGTERM");
}
while(threads_keepalive){
bsem_wait(thpool->jobqueue->has_jobs);
(*thread).working = 1;
if (threads_keepalive){
/* Read job from queue and execute it */
void*(*func_buff)(void* arg);
void* arg_buff;
job_t* job;
pthread_mutex_lock(&thpool->rwmutex);
job = jobqueue_pull(thpool);
pthread_mutex_unlock(&thpool->rwmutex);
if (job) {
func_buff = job->function;
arg_buff = job->arg;
func_buff(arg_buff);
free(job);
}
(*thread).working = 0;
}
}
pthread_mutex_lock(&thpool->rwmutex);
thpool->threadsN --;
pthread_mutex_unlock(&thpool->rwmutex);
printf("Thread %d exiting\n", (*thread).id);
thread_suicide();
}