Merge pull request #19 from marioli/master

Fixes for jobqqeue_init and removed exit()
This commit is contained in:
Johan 2015-06-28 19:05:47 +01:00
commit ab2c0bb8ad

View File

@ -125,26 +125,31 @@ struct thpool_* thpool_init(int num_threads){
/* Make new thread pool */ /* Make new thread pool */
thpool_* thpool_p; thpool_* thpool_p;
thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
if (thpool_p==NULL){ if (thpool_p == NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
exit(1); return NULL;
} }
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
thpool_p->num_threads_alive = 0; thpool_p->num_threads_alive = 0;
thpool_p->num_threads_working = 0; thpool_p->num_threads_working = 0;
/* Initialise the job queue */ /* Initialise the job queue */
if (jobqueue_init(thpool_p)==-1){ if (jobqueue_init(thpool_p) == -1){
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
exit(1); free(thpool_p);
return NULL;
} }
/* Make threads in pool */ /* Make threads in pool */
thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread)); thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread));
if (thpool_p->threads==NULL){ if (thpool_p->threads == NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n"); fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
exit(1); jobqueue_destroy(thpool_p);
free(thpool_p->jobqueue_p);
free(thpool_p);
return NULL;
} }
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
/* Thread init */ /* Thread init */
int n; int n;
@ -400,18 +405,21 @@ static void thread_destroy (thread* thread_p){
static int jobqueue_init(thpool_* thpool_p){ static int jobqueue_init(thpool_* thpool_p){
thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue)); thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue));
pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
if (thpool_p->jobqueue_p == NULL){ if (thpool_p->jobqueue_p == NULL){
return -1; return -1;
} }
thpool_p->jobqueue_p->len = 0;
thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL;
thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem));
if (thpool_p->jobqueue_p->has_jobs == NULL){ if (thpool_p->jobqueue_p->has_jobs == NULL){
return -1; return -1;
} }
pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
bsem_init(thpool_p->jobqueue_p->has_jobs, 0); bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
thpool_p->jobqueue_p->len = 0;
jobqueue_clear(thpool_p);
return 0; return 0;
} }
@ -469,24 +477,22 @@ static struct job* jobqueue_pull(thpool_* thpool_p){
switch(thpool_p->jobqueue_p->len){ switch(thpool_p->jobqueue_p->len){
case 0: /* if no jobs in queue */ case 0: /* if no jobs in queue */
return NULL; break;
case 1: /* if one job in queue */ case 1: /* if one job in queue */
thpool_p->jobqueue_p->front = NULL; thpool_p->jobqueue_p->front = NULL;
thpool_p->jobqueue_p->rear = NULL; thpool_p->jobqueue_p->rear = NULL;
thpool_p->jobqueue_p->len = 0;
break; break;
default: /* if >1 jobs in queue */ default: /* if >1 jobs in queue */
thpool_p->jobqueue_p->front = job_p->prev; thpool_p->jobqueue_p->front = job_p->prev;
thpool_p->jobqueue_p->len--;
/* more than one job in queue -> post it */
bsem_post(thpool_p->jobqueue_p->has_jobs);
} }
thpool_p->jobqueue_p->len--;
/* Make sure has_jobs has right value */
if (thpool_p->jobqueue_p->len > 0) {
bsem_post(thpool_p->jobqueue_p->has_jobs);
}
return job_p; return job_p;
} }