Removed job info from threadpool struct

This commit is contained in:
pithikos 2014-12-29 13:23:44 +00:00
parent 47eac3da93
commit 66fd978dd4
3 changed files with 8 additions and 18 deletions

BIN
test

Binary file not shown.

View File

@ -60,10 +60,6 @@ thpool_t* thpool_init(int threadsN){
return NULL;
}
/* Initialise semaphore*/
tp_p->queued_jobsN=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC */
sem_init(tp_p->queued_jobsN, 0, 0); /* no shared, initial value */
/* Make threads in pool */
int t;
for (t=0; t<threadsN; t++){
@ -87,7 +83,7 @@ void thpool_thread_do(thpool_t* tp_p){
/* WAITING until there is work in the queue.
* Notice that sem_wait will decrement the jobqueue by 1 */
if (sem_wait(tp_p->queued_jobsN)) {
if (sem_wait(tp_p->jobqueue->has_jobs)) {
perror("thpool_thread_do(): Waiting for semaphore");
exit(1);
}
@ -135,7 +131,7 @@ int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
/* add job to queue */
jobqueue_push(tp_p, newJob);
sem_post(tp_p->queued_jobsN);
printf("pushed job. queue len: %d\n", jobqueue_len(tp_p));
return 0;
@ -151,15 +147,10 @@ void thpool_destroy(thpool_t* tp_p){
/* Awake idle threads waiting at semaphore */
for (t=0; t<(tp_p->threadsN); t++){
if (sem_post(tp_p->queued_jobsN)){
if (sem_post(tp_p->jobqueue->has_jobs)){
fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
}
}
/* Kill semaphore */
if (sem_destroy(tp_p->queued_jobsN)!=0){
fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n");
}
/* Wait for threads to finish */
for (t=0; t<(tp_p->threadsN); t++){
@ -170,7 +161,6 @@ void thpool_destroy(thpool_t* tp_p){
/* Dealloc */
free(tp_p->threads); /* DEALLOC threads */
free(tp_p->queued_jobsN); /* DEALLOC job queue semaphore */
free(tp_p->jobqueue); /* DEALLOC job queue */
free(tp_p); /* DEALLOC thread pool */
}
@ -195,9 +185,7 @@ int jobqueue_init(thpool_t* tp_p){
/* How many jobs currently in queue */
int jobqueue_len(thpool_t* tp_p){
int val;
sem_getvalue(tp_p->queued_jobsN, &val);
return val;
return tp_p->jobqueue->len;
}
@ -219,6 +207,7 @@ void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev a
newjob_p->prev=tp_p->jobqueue->tail;
tp_p->jobqueue->tail=newjob_p;
}
tp_p->jobqueue->len++;
pthread_mutex_unlock(&mutex);
}
@ -248,6 +237,7 @@ job_t* jobqueue_pull(thpool_t* tp_p){
tp_p->jobqueue->head=job_p->next;
job_p->next->prev=tp_p->jobqueue->head;
}
tp_p->jobqueue->len--;
pthread_mutex_unlock(&mutex);
return job_p;
}

View File

@ -89,6 +89,7 @@ typedef struct thpool_jobqueue{
job_t *head; /* pointer to head of queue */
job_t *tail; /* pointer to tail of queue */
sem_t *has_jobs; /* binary semaphore */
int len; /* number of jobs in queue */
} thpool_jobqueue;
@ -96,8 +97,7 @@ typedef struct thpool_jobqueue{
typedef struct thpool_t{
pthread_t* threads; /* pointer to threads' ID */
int threadsN; /* amount of threads */
thpool_jobqueue* jobqueue; /* pointer to the job queue */
sem_t *queued_jobsN; /* number of jobs in queue */
thpool_jobqueue* jobqueue; /* pointer to the job queue */
} thpool_t;