mirror of
https://github.com/Pithikos/C-Thread-Pool
synced 2025-02-19 23:33:59 +03:00
Deadlocks resolved
This commit is contained in:
parent
84e9a02a34
commit
8fb75e9a82
116
thpool.c
116
thpool.c
@ -16,17 +16,11 @@
|
||||
#include <semaphore.h>
|
||||
#include <errno.h>
|
||||
|
||||
|
||||
#include "thpool.h" /* here you can also find the interface to each function */
|
||||
|
||||
#define POLLING_INTERVAL 1
|
||||
|
||||
|
||||
|
||||
|
||||
static int thpool_keepalive = 1;
|
||||
|
||||
/* Create mutex variable */
|
||||
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
|
||||
|
||||
|
||||
/* Initialise thread pool */
|
||||
@ -73,6 +67,7 @@ static void thpool_thread_do(thpool_t* tp_p){
|
||||
|
||||
while(thpool_keepalive){
|
||||
|
||||
//printf("**** pre bsem_wait: bsem: %d, len: %d\n", tp_p->jobqueue->has_jobs->v, tp_p->jobqueue->len);
|
||||
bsem_wait(tp_p->jobqueue->has_jobs);
|
||||
|
||||
if (thpool_keepalive){
|
||||
@ -81,7 +76,9 @@ static void thpool_thread_do(thpool_t* tp_p){
|
||||
void*(*func_buff)(void* arg);
|
||||
void* arg_buff;
|
||||
job_t* job_p;
|
||||
pthread_mutex_lock(&tp_p->rwmutex);
|
||||
job_p = jobqueue_pull(tp_p);
|
||||
pthread_mutex_unlock(&tp_p->rwmutex);
|
||||
if (job_p) {
|
||||
func_buff=job_p->function;
|
||||
arg_buff =job_p->arg;
|
||||
@ -92,10 +89,10 @@ static void thpool_thread_do(thpool_t* tp_p){
|
||||
}
|
||||
else
|
||||
{
|
||||
return; /* EXIT thread*/
|
||||
exit(0); /* EXIT thread*/
|
||||
}
|
||||
}
|
||||
return;
|
||||
exit(0);
|
||||
}
|
||||
|
||||
|
||||
@ -114,7 +111,9 @@ int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
|
||||
newJob->arg=arg_p;
|
||||
|
||||
/* add job to queue */
|
||||
pthread_mutex_lock(&tp_p->rwmutex);
|
||||
jobqueue_push(tp_p, newJob);
|
||||
pthread_mutex_unlock(&tp_p->rwmutex);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -130,43 +129,41 @@ void thpool_wait(thpool_t* tp_p){
|
||||
|
||||
/* Destroy the threadpool */
|
||||
void thpool_destroy(thpool_t* tp_p){
|
||||
|
||||
int t;
|
||||
|
||||
|
||||
/* End each thread's infinite loop */
|
||||
thpool_keepalive=0;
|
||||
|
||||
/* Awake idle threads waiting at semaphore */
|
||||
thpool_keepalive = 0;
|
||||
|
||||
//for (t=0; t<(tp_p->threadsN); t++){
|
||||
//bsem_post(tp_p->jobqueue->has_jobs);
|
||||
/*if (){
|
||||
fprintf(stderr, "thpool_destroy(): Could not bypass sem_wait()\n");
|
||||
}*/
|
||||
//}
|
||||
|
||||
/* Wait for threads to finish */
|
||||
for (t=0; t < (tp_p->threadsN); t++){
|
||||
pthread_join(tp_p->threads[t], NULL);
|
||||
}
|
||||
|
||||
jobqueue_empty(tp_p);
|
||||
|
||||
//for (t=0; t < (tp_p->threadsN); t++){
|
||||
// sleep(1);
|
||||
/// bsem_post(tp_p->jobqueue->has_jobs);
|
||||
// pthread_join(tp_p->threads[t], NULL);
|
||||
//}
|
||||
|
||||
/* Wait for threads to finish */
|
||||
//for (t=0; t < (tp_p->threadsN); t++){
|
||||
// pthread_join(tp_p->threads[t], NULL);
|
||||
//}
|
||||
|
||||
/* Awake idle threads waiting at semaphore */
|
||||
|
||||
//bsem_post(tp_p->jobqueue->has_jobs);
|
||||
|
||||
//jobqueue_empty(tp_p);
|
||||
|
||||
/* Dealloc */
|
||||
free(tp_p->threads);
|
||||
free(tp_p->jobqueue);
|
||||
free(tp_p);
|
||||
//free(tp_p->threads);
|
||||
//free(tp_p->jobqueue);
|
||||
//free(tp_p);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* ===================== JOB QUEUE OPERATIONS ======================= */
|
||||
|
||||
/*
|
||||
* Requirements for job queue:
|
||||
* - Thread-safe push/pull
|
||||
* - Binary semaphore for has_jobs must be 1 if there are jobs, 0 if not
|
||||
*
|
||||
* */
|
||||
|
||||
|
||||
/* Initialise queue */
|
||||
@ -175,25 +172,22 @@ static int jobqueue_init(thpool_t* tp_p){
|
||||
if (tp_p->jobqueue==NULL) return -1;
|
||||
tp_p->jobqueue->tail=NULL;
|
||||
tp_p->jobqueue->head=NULL;
|
||||
tp_p->jobqueue->has_jobs=(bsem_t*)malloc(sizeof(bsem_t));
|
||||
|
||||
tp_p->jobqueue->has_jobs = (bsem_t*)malloc(sizeof(bsem_t));
|
||||
tp_p->jobqueue->has_jobs->v = 0;
|
||||
|
||||
tp_p->jobqueue->len = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* How many jobs currently in queue */
|
||||
static int jobqueue_len(thpool_t* tp_p){
|
||||
return tp_p->jobqueue->len;
|
||||
}
|
||||
|
||||
|
||||
/* Add job to queue */
|
||||
static void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev and next point to NULL */
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
newjob_p->next=NULL;
|
||||
|
||||
switch(jobqueue_len(tp_p)){
|
||||
switch(tp_p->jobqueue->len){
|
||||
|
||||
case 0: /* if there are no jobs in queue */
|
||||
tp_p->jobqueue->tail=newjob_p;
|
||||
@ -207,24 +201,20 @@ static void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job
|
||||
}
|
||||
tp_p->jobqueue->len++;
|
||||
bsem_post(tp_p->jobqueue->has_jobs);
|
||||
pthread_mutex_unlock(&mutex);
|
||||
}
|
||||
|
||||
|
||||
/* Get first element from queue */
|
||||
static job_t* jobqueue_pull(thpool_t* tp_p){
|
||||
|
||||
pthread_mutex_lock(&mutex);
|
||||
//puts("**** PULLING");
|
||||
|
||||
|
||||
/* get first job */
|
||||
job_t* job_p;
|
||||
job_p = tp_p->jobqueue->head;
|
||||
printf(" queue length: %d\n", jobqueue_len(tp_p));
|
||||
printf(" has jobs: %d\n", tp_p->jobqueue->has_jobs->v);
|
||||
|
||||
/* remove job from queue */
|
||||
switch(jobqueue_len(tp_p)){
|
||||
switch(tp_p->jobqueue->len){
|
||||
|
||||
case 0: /* if there are no jobs in queue */
|
||||
return NULL;
|
||||
@ -245,22 +235,19 @@ static job_t* jobqueue_pull(thpool_t* tp_p){
|
||||
if (tp_p->jobqueue->len > 0) {
|
||||
bsem_post(tp_p->jobqueue->has_jobs);
|
||||
}
|
||||
puts(" (after pull)");
|
||||
printf(" queue length: %d\n", jobqueue_len(tp_p));
|
||||
printf(" has jobs: %d\n", tp_p->jobqueue->has_jobs->v);
|
||||
|
||||
pthread_mutex_unlock(&mutex);
|
||||
|
||||
return job_p;
|
||||
}
|
||||
|
||||
|
||||
/* Remove and deallocate all jobs in queue */
|
||||
static void jobqueue_empty(thpool_t* tp_p){
|
||||
//pthread_mutex_lock(&tp_p->jobqueue->mutex);
|
||||
|
||||
job_t* curjob;
|
||||
curjob=tp_p->jobqueue->tail;
|
||||
|
||||
while(jobqueue_len(tp_p)){
|
||||
while(tp_p->jobqueue->len){
|
||||
tp_p->jobqueue->tail=curjob->prev;
|
||||
free(curjob);
|
||||
curjob=tp_p->jobqueue->tail;
|
||||
@ -272,6 +259,8 @@ static void jobqueue_empty(thpool_t* tp_p){
|
||||
|
||||
/* Deallocs */
|
||||
free(tp_p->jobqueue->has_jobs);
|
||||
|
||||
//pthread_mutex_unlock(&tp_p->jobqueue->mutex);
|
||||
}
|
||||
|
||||
|
||||
@ -280,17 +269,18 @@ static void jobqueue_empty(thpool_t* tp_p){
|
||||
|
||||
|
||||
static void bsem_post(bsem_t *bsem) {
|
||||
pthread_mutex_lock(&bsem->mutex);
|
||||
bsem->v = 1;
|
||||
pthread_cond_signal(&bsem->cond);
|
||||
pthread_mutex_unlock(&bsem->mutex);
|
||||
pthread_mutex_lock(&bsem->mutex);
|
||||
bsem->v = 1;
|
||||
pthread_cond_signal(&bsem->cond);
|
||||
pthread_mutex_unlock(&bsem->mutex);
|
||||
}
|
||||
|
||||
|
||||
static void bsem_wait(bsem_t *bsem) {
|
||||
pthread_mutex_lock(&bsem->mutex);
|
||||
while (!bsem->v)
|
||||
pthread_cond_wait(&bsem->cond, &bsem->mutex);
|
||||
bsem->v = 0;
|
||||
pthread_mutex_unlock(&bsem->mutex);
|
||||
pthread_mutex_lock(&bsem->mutex);
|
||||
while (bsem->v != 1) {
|
||||
pthread_cond_wait(&bsem->cond, &bsem->mutex);
|
||||
}
|
||||
bsem->v = 0;
|
||||
pthread_mutex_unlock(&bsem->mutex);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user