From 04cf949f9fb0ff640ddcb9888a3c5fec8b50a4c2 Mon Sep 17 00:00:00 2001 From: Drew Weymouth Date: Mon, 21 Dec 2015 19:36:36 -0800 Subject: [PATCH 1/2] refactoring jobqueue interface, jobqueue embedded in thpool struct --- thpool.c | 115 ++++++++++++++++++++++++++----------------------------- 1 file changed, 54 insertions(+), 61 deletions(-) diff --git a/thpool.c b/thpool.c index 3d682cd..ba9963a 100644 --- a/thpool.c +++ b/thpool.c @@ -75,7 +75,7 @@ typedef struct thpool_{ volatile int num_threads_working; /* threads currently working */ pthread_mutex_t thcount_lock; /* used for thread count etc */ pthread_cond_t threads_all_idle; /* signal to thpool_wait */ - jobqueue* jobqueue_p; /* pointer to the job queue */ + jobqueue jobqueue; /* job queue */ } thpool_; @@ -90,11 +90,11 @@ static void* thread_do(struct thread* thread_p); static void thread_hold(); static void thread_destroy(struct thread* thread_p); -static int jobqueue_init(thpool_* thpool_p); -static void jobqueue_clear(thpool_* thpool_p); -static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p); -static struct job* jobqueue_pull(thpool_* thpool_p); -static void jobqueue_destroy(thpool_* thpool_p); +static int jobqueue_init(jobqueue* jobqueue_p); +static void jobqueue_clear(jobqueue* jobqueue_p); +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p); +static struct job* jobqueue_pull(jobqueue* jobqueue_p); +static void jobqueue_destroy(jobqueue* jobqueue_p); static void bsem_init(struct bsem *bsem_p, int value); static void bsem_reset(struct bsem *bsem_p); @@ -130,7 +130,7 @@ struct thpool_* thpool_init(int num_threads){ thpool_p->num_threads_working = 0; /* Initialise the job queue */ - if (jobqueue_init(thpool_p) == -1){ + if (jobqueue_init(&thpool_p->jobqueue) == -1){ fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); free(thpool_p); return NULL; @@ -140,8 +140,7 @@ struct thpool_* thpool_init(int num_threads){ thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread)); if (thpool_p->threads == NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n"); - jobqueue_destroy(thpool_p); - free(thpool_p->jobqueue_p); + jobqueue_destroy(&thpool_p->jobqueue); free(thpool_p); return NULL; } @@ -179,9 +178,9 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ newjob->arg=arg_p; /* add job to queue */ - pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); - jobqueue_push(thpool_p, newjob); - pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); + pthread_mutex_lock(&thpool_p->jobqueue.rwmutex); + jobqueue_push(&thpool_p->jobqueue, newjob); + pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex); return 0; } @@ -190,7 +189,7 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ /* Wait until all jobs have finished */ void thpool_wait(thpool_* thpool_p){ pthread_mutex_lock(&thpool_p->thcount_lock); - while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working) { + while (thpool_p->jobqueue.len || thpool_p->num_threads_working) { pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock); } pthread_mutex_unlock(&thpool_p->thcount_lock); @@ -211,20 +210,19 @@ void thpool_destroy(thpool_* thpool_p){ double tpassed = 0.0; time (&start); while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue_p->has_jobs); + bsem_post_all(thpool_p->jobqueue.has_jobs); time (&end); tpassed = difftime(end,start); } /* Poll remaining threads */ while (thpool_p->num_threads_alive){ - bsem_post_all(thpool_p->jobqueue_p->has_jobs); + bsem_post_all(thpool_p->jobqueue.has_jobs); sleep(1); } /* Job queue cleanup */ - jobqueue_destroy(thpool_p); - free(thpool_p->jobqueue_p); + jobqueue_destroy(&thpool_p->jobqueue); /* Deallocs */ int n; @@ -322,7 +320,7 @@ static void* thread_do(struct thread* thread_p){ while(threads_keepalive){ - bsem_wait(thpool_p->jobqueue_p->has_jobs); + bsem_wait(thpool_p->jobqueue.has_jobs); if (threads_keepalive){ @@ -334,9 +332,9 @@ static void* thread_do(struct thread* thread_p){ void*(*func_buff)(void* arg); void* arg_buff; job* job_p; - pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); - job_p = jobqueue_pull(thpool_p); - pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); + pthread_mutex_lock(&thpool_p->jobqueue.rwmutex); + job_p = jobqueue_pull(&thpool_p->jobqueue); + pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex); if (job_p) { func_buff = job_p->function; arg_buff = job_p->arg; @@ -374,39 +372,34 @@ static void thread_destroy (thread* thread_p){ /* Initialize queue */ -static int jobqueue_init(thpool_* thpool_p){ - - thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue)); - if (thpool_p->jobqueue_p == NULL){ - return -1; - } - thpool_p->jobqueue_p->len = 0; - thpool_p->jobqueue_p->front = NULL; - thpool_p->jobqueue_p->rear = NULL; +static int jobqueue_init(jobqueue* jobqueue_p){ + jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; - thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); - if (thpool_p->jobqueue_p->has_jobs == NULL){ + jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); + if (jobqueue_p->has_jobs == NULL){ return -1; } - pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL); - bsem_init(thpool_p->jobqueue_p->has_jobs, 0); + pthread_mutex_init(&(jobqueue_p->rwmutex), NULL); + bsem_init(jobqueue_p->has_jobs, 0); return 0; } /* Clear the queue */ -static void jobqueue_clear(thpool_* thpool_p){ +static void jobqueue_clear(jobqueue* jobqueue_p){ - while(thpool_p->jobqueue_p->len){ - free(jobqueue_pull(thpool_p)); + while(jobqueue_p->len){ + free(jobqueue_pull(jobqueue_p)); } - thpool_p->jobqueue_p->front = NULL; - thpool_p->jobqueue_p->rear = NULL; - bsem_reset(thpool_p->jobqueue_p->has_jobs); - thpool_p->jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + bsem_reset(jobqueue_p->has_jobs); + jobqueue_p->len = 0; } @@ -415,25 +408,25 @@ static void jobqueue_clear(thpool_* thpool_p){ * * Notice: Caller MUST hold a mutex */ -static void jobqueue_push(thpool_* thpool_p, struct job* newjob){ +static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ newjob->prev = NULL; - switch(thpool_p->jobqueue_p->len){ + switch(jobqueue_p->len){ case 0: /* if no jobs in queue */ - thpool_p->jobqueue_p->front = newjob; - thpool_p->jobqueue_p->rear = newjob; + jobqueue_p->front = newjob; + jobqueue_p->rear = newjob; break; default: /* if jobs in queue */ - thpool_p->jobqueue_p->rear->prev = newjob; - thpool_p->jobqueue_p->rear = newjob; + jobqueue_p->rear->prev = newjob; + jobqueue_p->rear = newjob; } - thpool_p->jobqueue_p->len++; + jobqueue_p->len++; - bsem_post(thpool_p->jobqueue_p->has_jobs); + bsem_post(jobqueue_p->has_jobs); } @@ -441,27 +434,27 @@ static void jobqueue_push(thpool_* thpool_p, struct job* newjob){ * * Notice: Caller MUST hold a mutex */ -static struct job* jobqueue_pull(thpool_* thpool_p){ +static struct job* jobqueue_pull(jobqueue* jobqueue_p){ job* job_p; - job_p = thpool_p->jobqueue_p->front; + job_p = jobqueue_p->front; - switch(thpool_p->jobqueue_p->len){ + switch(jobqueue_p->len){ case 0: /* if no jobs in queue */ break; case 1: /* if one job in queue */ - thpool_p->jobqueue_p->front = NULL; - thpool_p->jobqueue_p->rear = NULL; - thpool_p->jobqueue_p->len = 0; + jobqueue_p->front = NULL; + jobqueue_p->rear = NULL; + jobqueue_p->len = 0; break; default: /* if >1 jobs in queue */ - thpool_p->jobqueue_p->front = job_p->prev; - thpool_p->jobqueue_p->len--; + jobqueue_p->front = job_p->prev; + jobqueue_p->len--; /* more than one job in queue -> post it */ - bsem_post(thpool_p->jobqueue_p->has_jobs); + bsem_post(jobqueue_p->has_jobs); } @@ -470,9 +463,9 @@ static struct job* jobqueue_pull(thpool_* thpool_p){ /* Free all queue resources back to the system */ -static void jobqueue_destroy(thpool_* thpool_p){ - jobqueue_clear(thpool_p); - free(thpool_p->jobqueue_p->has_jobs); +static void jobqueue_destroy(jobqueue* jobqueue_p){ + jobqueue_clear(jobqueue_p); + free(jobqueue_p->has_jobs); } From da2c0fe45e43ce0937f272c8cd2704bdc0afb490 Mon Sep 17 00:00:00 2001 From: Drew Weymouth Date: Mon, 21 Dec 2015 19:45:06 -0800 Subject: [PATCH 2/2] move locking inside jobqueue_push and pull --- thpool.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/thpool.c b/thpool.c index ba9963a..4f4c618 100644 --- a/thpool.c +++ b/thpool.c @@ -178,9 +178,7 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ newjob->arg=arg_p; /* add job to queue */ - pthread_mutex_lock(&thpool_p->jobqueue.rwmutex); jobqueue_push(&thpool_p->jobqueue, newjob); - pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex); return 0; } @@ -331,10 +329,7 @@ static void* thread_do(struct thread* thread_p){ /* Read job from queue and execute it */ void*(*func_buff)(void* arg); void* arg_buff; - job* job_p; - pthread_mutex_lock(&thpool_p->jobqueue.rwmutex); - job_p = jobqueue_pull(&thpool_p->jobqueue); - pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex); + job* job_p = jobqueue_pull(&thpool_p->jobqueue); if (job_p) { func_buff = job_p->function; arg_buff = job_p->arg; @@ -405,11 +400,10 @@ static void jobqueue_clear(jobqueue* jobqueue_p){ /* Add (allocated) job to queue - * - * Notice: Caller MUST hold a mutex */ static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ + pthread_mutex_lock(&jobqueue_p->rwmutex); newjob->prev = NULL; switch(jobqueue_p->len){ @@ -427,17 +421,16 @@ static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){ jobqueue_p->len++; bsem_post(jobqueue_p->has_jobs); + pthread_mutex_unlock(&jobqueue_p->rwmutex); } /* Get first job from queue(removes it from queue) - * - * Notice: Caller MUST hold a mutex */ static struct job* jobqueue_pull(jobqueue* jobqueue_p){ - job* job_p; - job_p = jobqueue_p->front; + pthread_mutex_lock(&jobqueue_p->rwmutex); + job* job_p = jobqueue_p->front; switch(jobqueue_p->len){ @@ -457,7 +450,8 @@ static struct job* jobqueue_pull(jobqueue* jobqueue_p){ bsem_post(jobqueue_p->has_jobs); } - + + pthread_mutex_unlock(&jobqueue_p->rwmutex); return job_p; }