diff --git a/test b/test index 4fd7bf6..26c84a7 100755 Binary files a/test and b/test differ diff --git a/thpool.c b/thpool.c index f6c95ab..6ad8263 100644 --- a/thpool.c +++ b/thpool.c @@ -60,10 +60,6 @@ thpool_t* thpool_init(int threadsN){ return NULL; } - /* Initialise semaphore*/ - tp_p->queued_jobsN=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC */ - sem_init(tp_p->queued_jobsN, 0, 0); /* no shared, initial value */ - /* Make threads in pool */ int t; for (t=0; tqueued_jobsN)) { + if (sem_wait(tp_p->jobqueue->has_jobs)) { perror("thpool_thread_do(): Waiting for semaphore"); exit(1); } @@ -135,7 +131,7 @@ int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){ /* add job to queue */ jobqueue_push(tp_p, newJob); - sem_post(tp_p->queued_jobsN); + printf("pushed job. queue len: %d\n", jobqueue_len(tp_p)); return 0; @@ -151,15 +147,10 @@ void thpool_destroy(thpool_t* tp_p){ /* Awake idle threads waiting at semaphore */ for (t=0; t<(tp_p->threadsN); t++){ - if (sem_post(tp_p->queued_jobsN)){ + if (sem_post(tp_p->jobqueue->has_jobs)){ fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n"); } } - - /* Kill semaphore */ - if (sem_destroy(tp_p->queued_jobsN)!=0){ - fprintf(stderr, "thpool_destroy(): Could not destroy semaphore\n"); - } /* Wait for threads to finish */ for (t=0; t<(tp_p->threadsN); t++){ @@ -170,7 +161,6 @@ void thpool_destroy(thpool_t* tp_p){ /* Dealloc */ free(tp_p->threads); /* DEALLOC threads */ - free(tp_p->queued_jobsN); /* DEALLOC job queue semaphore */ free(tp_p->jobqueue); /* DEALLOC job queue */ free(tp_p); /* DEALLOC thread pool */ } @@ -195,9 +185,7 @@ int jobqueue_init(thpool_t* tp_p){ /* How many jobs currently in queue */ int jobqueue_len(thpool_t* tp_p){ - int val; - sem_getvalue(tp_p->queued_jobsN, &val); - return val; + return tp_p->jobqueue->len; } @@ -219,6 +207,7 @@ void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev a newjob_p->prev=tp_p->jobqueue->tail; tp_p->jobqueue->tail=newjob_p; } + tp_p->jobqueue->len++; pthread_mutex_unlock(&mutex); } @@ -248,6 +237,7 @@ job_t* jobqueue_pull(thpool_t* tp_p){ tp_p->jobqueue->head=job_p->next; job_p->next->prev=tp_p->jobqueue->head; } + tp_p->jobqueue->len--; pthread_mutex_unlock(&mutex); return job_p; } diff --git a/thpool.h b/thpool.h index bb7609a..3ee03f2 100644 --- a/thpool.h +++ b/thpool.h @@ -89,6 +89,7 @@ typedef struct thpool_jobqueue{ job_t *head; /* pointer to head of queue */ job_t *tail; /* pointer to tail of queue */ sem_t *has_jobs; /* binary semaphore */ + int len; /* number of jobs in queue */ } thpool_jobqueue; @@ -96,8 +97,7 @@ typedef struct thpool_jobqueue{ typedef struct thpool_t{ pthread_t* threads; /* pointer to threads' ID */ int threadsN; /* amount of threads */ - thpool_jobqueue* jobqueue; /* pointer to the job queue */ - sem_t *queued_jobsN; /* number of jobs in queue */ + thpool_jobqueue* jobqueue; /* pointer to the job queue */ } thpool_t;