refactoring jobqueue interface, jobqueue embedded in thpool struct

This commit is contained in:
Drew Weymouth 2015-12-21 19:36:36 -08:00
parent d6a9c83c3a
commit 04cf949f9f

115
thpool.c
View File

@ -75,7 +75,7 @@ typedef struct thpool_{
volatile int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
pthread_cond_t threads_all_idle; /* signal to thpool_wait */
jobqueue* jobqueue_p; /* pointer to the job queue */
jobqueue jobqueue; /* job queue */
} thpool_;
@ -90,11 +90,11 @@ static void* thread_do(struct thread* thread_p);
static void thread_hold();
static void thread_destroy(struct thread* thread_p);
static int jobqueue_init(thpool_* thpool_p);
static void jobqueue_clear(thpool_* thpool_p);
static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p);
static struct job* jobqueue_pull(thpool_* thpool_p);
static void jobqueue_destroy(thpool_* thpool_p);
static int jobqueue_init(jobqueue* jobqueue_p);
static void jobqueue_clear(jobqueue* jobqueue_p);
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob_p);
static struct job* jobqueue_pull(jobqueue* jobqueue_p);
static void jobqueue_destroy(jobqueue* jobqueue_p);
static void bsem_init(struct bsem *bsem_p, int value);
static void bsem_reset(struct bsem *bsem_p);
@ -130,7 +130,7 @@ struct thpool_* thpool_init(int num_threads){
thpool_p->num_threads_working = 0;
/* Initialise the job queue */
if (jobqueue_init(thpool_p) == -1){
if (jobqueue_init(&thpool_p->jobqueue) == -1){
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
free(thpool_p);
return NULL;
@ -140,8 +140,7 @@ struct thpool_* thpool_init(int num_threads){
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread));
if (thpool_p->threads == NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
jobqueue_destroy(thpool_p);
free(thpool_p->jobqueue_p);
jobqueue_destroy(&thpool_p->jobqueue);
free(thpool_p);
return NULL;
}
@ -179,9 +178,9 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
newjob->arg=arg_p;
/* add job to queue */
pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
jobqueue_push(thpool_p, newjob);
pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
pthread_mutex_lock(&thpool_p->jobqueue.rwmutex);
jobqueue_push(&thpool_p->jobqueue, newjob);
pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex);
return 0;
}
@ -190,7 +189,7 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
pthread_mutex_lock(&thpool_p->thcount_lock);
while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working) {
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
}
pthread_mutex_unlock(&thpool_p->thcount_lock);
@ -211,20 +210,19 @@ void thpool_destroy(thpool_* thpool_p){
double tpassed = 0.0;
time (&start);
while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue_p->has_jobs);
bsem_post_all(thpool_p->jobqueue.has_jobs);
time (&end);
tpassed = difftime(end,start);
}
/* Poll remaining threads */
while (thpool_p->num_threads_alive){
bsem_post_all(thpool_p->jobqueue_p->has_jobs);
bsem_post_all(thpool_p->jobqueue.has_jobs);
sleep(1);
}
/* Job queue cleanup */
jobqueue_destroy(thpool_p);
free(thpool_p->jobqueue_p);
jobqueue_destroy(&thpool_p->jobqueue);
/* Deallocs */
int n;
@ -322,7 +320,7 @@ static void* thread_do(struct thread* thread_p){
while(threads_keepalive){
bsem_wait(thpool_p->jobqueue_p->has_jobs);
bsem_wait(thpool_p->jobqueue.has_jobs);
if (threads_keepalive){
@ -334,9 +332,9 @@ static void* thread_do(struct thread* thread_p){
void*(*func_buff)(void* arg);
void* arg_buff;
job* job_p;
pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
job_p = jobqueue_pull(thpool_p);
pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
pthread_mutex_lock(&thpool_p->jobqueue.rwmutex);
job_p = jobqueue_pull(&thpool_p->jobqueue);
pthread_mutex_unlock(&thpool_p->jobqueue.rwmutex);
if (job_p) {
func_buff = job_p->function;
arg_buff = job_p->arg;
@ -374,39 +372,34 @@ static void thread_destroy (thread* thread_p){
/* Initialize queue */
static int jobqueue_init(thpool_* thpool_p){
thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue));
if (thpool_p->jobqueue_p == NULL){
return -1;
}
thpool_p->jobqueue_p->len = 0;
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
static int jobqueue_init(jobqueue* jobqueue_p){
jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (thpool_p->jobqueue_p->has_jobs == NULL){
jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (jobqueue_p->has_jobs == NULL){
return -1;
}
pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
pthread_mutex_init(&(jobqueue_p->rwmutex), NULL);
bsem_init(jobqueue_p->has_jobs, 0);
return 0;
}
/* Clear the queue */
static void jobqueue_clear(thpool_* thpool_p){
static void jobqueue_clear(jobqueue* jobqueue_p){
while(thpool_p->jobqueue_p->len){
free(jobqueue_pull(thpool_p));
while(jobqueue_p->len){
free(jobqueue_pull(jobqueue_p));
}
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
bsem_reset(thpool_p->jobqueue_p->has_jobs);
thpool_p->jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
bsem_reset(jobqueue_p->has_jobs);
jobqueue_p->len = 0;
}
@ -415,25 +408,25 @@ static void jobqueue_clear(thpool_* thpool_p){
*
* Notice: Caller MUST hold a mutex
*/
static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
newjob->prev = NULL;
switch(thpool_p->jobqueue_p->len){
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
thpool_p->jobqueue_p->front = newjob;
thpool_p->jobqueue_p->rear = newjob;
jobqueue_p->front = newjob;
jobqueue_p->rear = newjob;
break;
default: /* if jobs in queue */
thpool_p->jobqueue_p->rear->prev = newjob;
thpool_p->jobqueue_p->rear = newjob;
jobqueue_p->rear->prev = newjob;
jobqueue_p->rear = newjob;
}
thpool_p->jobqueue_p->len++;
jobqueue_p->len++;
bsem_post(thpool_p->jobqueue_p->has_jobs);
bsem_post(jobqueue_p->has_jobs);
}
@ -441,27 +434,27 @@ static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
*
* Notice: Caller MUST hold a mutex
*/
static struct job* jobqueue_pull(thpool_* thpool_p){
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
job* job_p;
job_p = thpool_p->jobqueue_p->front;
job_p = jobqueue_p->front;
switch(thpool_p->jobqueue_p->len){
switch(jobqueue_p->len){
case 0: /* if no jobs in queue */
break;
case 1: /* if one job in queue */
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
thpool_p->jobqueue_p->len = 0;
jobqueue_p->front = NULL;
jobqueue_p->rear = NULL;
jobqueue_p->len = 0;
break;
default: /* if >1 jobs in queue */
thpool_p->jobqueue_p->front = job_p->prev;
thpool_p->jobqueue_p->len--;
jobqueue_p->front = job_p->prev;
jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(thpool_p->jobqueue_p->has_jobs);
bsem_post(jobqueue_p->has_jobs);
}
@ -470,9 +463,9 @@ static struct job* jobqueue_pull(thpool_* thpool_p){
/* Free all queue resources back to the system */
static void jobqueue_destroy(thpool_* thpool_p){
jobqueue_clear(thpool_p);
free(thpool_p->jobqueue_p->has_jobs);
static void jobqueue_destroy(jobqueue* jobqueue_p){
jobqueue_clear(jobqueue_p);
free(jobqueue_p->has_jobs);
}