Queue operations are thread-safe

This commit is contained in:
pithikos 2014-12-29 13:14:45 +00:00
parent 720c3ba68c
commit cfff149a5c
4 changed files with 53 additions and 56 deletions

BIN
test Executable file

Binary file not shown.

25
test.c
View File

@ -5,22 +5,23 @@ pthread_mutex_t mutex_sum = PTHREAD_MUTEX_INITIALIZER;
int sum=0;
void add_num(int num) {
/*void add_num(int num) {
// serialize access to sum variable (solves race conditions)
pthread_mutex_lock(&mutex_sum);
printf("%u: sum is %d\n", pthread_self(), sum);
sum+=num;
printf("%u: added %d\n", pthread_self(), num);
pthread_mutex_unlock(&mutex_sum);
}
}*/
void task1() {
pthread_mutex_lock(&mutex_sum);
sum++;
printf("%u: sum+1=%d\n", pthread_self(), sum);
pthread_mutex_unlock(&mutex_sum);
puts("&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&");
pthread_mutex_lock(&mutex_sum);
sum++;
printf("%u: sum+1=%d\n", (int)pthread_self(), sum);
pthread_mutex_unlock(&mutex_sum);
}
@ -30,22 +31,16 @@ int main(){
threadpool=thpool_init(1);
// Call a bunch of tasks
int i;
for (i=1; i<=5; i++){
printf("added work %d\n", i);
//thpool_add_work(threadpool, (void*)add_num, (void*)1);
thpool_add_work(threadpool, (void*)task1, NULL);
};
thpool_add_work(threadpool, (void*)task1, NULL);
printf("there are %d jobs in queue\n", jobqueue_len(threadpool));
//puts("Waiting for work to finish");
puts("MAIN THREAD SLEEPING 2 SECS");
sleep(2);
//thpool_wait(threadpool);
//thpool_destroy(threadpool);
//thpool_destroy(threadpool);
printf("Sum is: %d\n", sum);
return 0;
return 0;
}

View File

@ -29,7 +29,7 @@
#include "thpool.h" /* here you can also find the interface to each function */
static int thpool_keepalive=1;
static int thpool_keepalive = 1;
/* Create mutex variable */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* used to serialize queue access */
@ -81,11 +81,13 @@ thpool_t* thpool_init(int threadsN){
* the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */
void thpool_thread_do(thpool_t* tp_p){
sleep(1);
while(thpool_keepalive){
if (sem_wait(tp_p->queued_jobsN)) {/* WAITING until there is work in the queue */
printf("LEN: %d jobs in queue\n", jobqueue_len(tp_p));
/* WAITING until there is work in the queue.
* Notice that sem_wait will decrement the jobqueue by 1 */
if (sem_wait(tp_p->queued_jobsN)) {
perror("thpool_thread_do(): Waiting for semaphore");
exit(1);
}
@ -98,16 +100,17 @@ void thpool_thread_do(thpool_t* tp_p){
job_t* job_p;
pthread_mutex_lock(&mutex); /* LOCK */
puts("pulling job");
puts("pulling job");
job_p = jobqueue_pull(tp_p);
pthread_mutex_unlock(&mutex); /* UNLOCK */
pthread_mutex_unlock(&mutex); /* UNLOCK */
if (job_p) {
func_buff=job_p->function;
arg_buff =job_p->arg;
func_buff(arg_buff); /* run function */
free(job_p); /* DEALLOC job */
}
if (job_p) {
puts("WILL RUN THE JOB NOW");
func_buff=job_p->function;
arg_buff =job_p->arg;
func_buff(arg_buff); /* run function */
free(job_p); /* DEALLOC job */
}
}
else
{
@ -133,11 +136,9 @@ int thpool_add_work(thpool_t* tp_p, void *(*function_p)(void*), void* arg_p){
newJob->arg=arg_p;
/* add job to queue */
pthread_mutex_lock(&mutex); /* LOCK */
jobqueue_push(tp_p, newJob);
sem_post(tp_p->queued_jobsN);
printf("pushed job. queue len: %d\n", jobqueue_len(tp_p));
pthread_mutex_unlock(&mutex); /* UNLOCK */
sem_post(tp_p->queued_jobsN);
printf("pushed job. queue len: %d\n", jobqueue_len(tp_p));
return 0;
}
@ -171,7 +172,7 @@ void thpool_destroy(thpool_t* tp_p){
/* Dealloc */
free(tp_p->threads); /* DEALLOC threads */
free(tp_p->queued_jobsN); /* DEALLOC job queue semaphore */
free(tp_p->queued_jobsN); /* DEALLOC job queue semaphore */
free(tp_p->jobqueue); /* DEALLOC job queue */
free(tp_p); /* DEALLOC thread pool */
}
@ -184,10 +185,12 @@ void thpool_destroy(thpool_t* tp_p){
/* Initialise queue */
int jobqueue_init(thpool_t* tp_p){
tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC job queue */
tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue)); /* MALLOC */
if (tp_p->jobqueue==NULL) return -1;
tp_p->jobqueue->tail=NULL;
tp_p->jobqueue->head=NULL;
tp_p->jobqueue->has_jobs=(sem_t*)malloc(sizeof(sem_t)); /* MALLOC */
sem_init(tp_p->jobqueue->has_jobs, 0, 0);
return 0;
}
@ -202,10 +205,10 @@ int jobqueue_len(thpool_t* tp_p){
/* Add job to queue */
void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev and next point to NULL */
pthread_mutex_lock(&mutex);
newjob_p->next=NULL;
newjob_p->next=NULL;
//printf("jobqueue_push() ---> sem_jobs: %d\n", jobqueue_len(tp_p));
switch(jobqueue_len(tp_p)){
case 0: /* if there are no jobs in queue */
@ -217,8 +220,8 @@ void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev a
tp_p->jobqueue->tail->next=newjob_p;
newjob_p->prev=tp_p->jobqueue->tail;
tp_p->jobqueue->tail=newjob_p;
}
pthread_mutex_unlock(&mutex);
}
@ -226,9 +229,9 @@ void jobqueue_push(thpool_t* tp_p, job_t* newjob_p){ /* remember that job prev a
job_t* jobqueue_pull(thpool_t* tp_p){
/* get first job */
pthread_mutex_lock(&mutex);
job_t* job_p;
job_p = tp_p->jobqueue->head;
printf("jobqueue_pull(): queue length: %d\n", jobqueue_len(tp_p));
/* remove job from queue */
@ -247,7 +250,7 @@ job_t* jobqueue_pull(thpool_t* tp_p){
tp_p->jobqueue->head=job_p->next;
job_p->next->prev=tp_p->jobqueue->head;
}
pthread_mutex_unlock(&mutex);
return job_p;
}
@ -267,4 +270,7 @@ void jobqueue_empty(thpool_t* tp_p){
/* Fix head and tail */
tp_p->jobqueue->tail=NULL;
tp_p->jobqueue->head=NULL;
/* Deallocs */
free(tp_p->jobqueue->has_jobs);
}

View File

@ -62,6 +62,7 @@
* |___________| | |..
*/
#ifndef _THPOOL_
#define _THPOOL_
@ -74,36 +75,30 @@
/* ========================== STRUCTURES ============================ */
/* Individual job */
/* Job */
typedef struct job_t{
void* (*function)(void* arg); /* function pointer */
void* arg; /* function's argument */
struct job_t* next; /* pointer to next job */
struct job_t* prev; /* pointer to previous job */
}job_t;
} job_t;
/* Job queue as doubly linked list */
/* Job queue */
typedef struct thpool_jobqueue{
job_t *head; /* pointer to head of queue */
job_t *tail; /* pointer to tail of queue */
}thpool_jobqueue;
sem_t *has_jobs; /* binary semaphore */
} thpool_jobqueue;
/* The threadpool */
/* Threadpool */
typedef struct thpool_t{
pthread_t* threads; /* pointer to threads' ID */
int threadsN; /* amount of threads */
thpool_jobqueue* jobqueue; /* pointer to the job queue */
sem_t *queued_jobsN; /* number of jobs in queue */
}thpool_t;
/* Container for all things that each thread is going to need */
typedef struct thread_data{
pthread_mutex_t *mutex_p;
thpool_t *tp_p;
}thread_data;
sem_t *queued_jobsN; /* number of jobs in queue */
} thpool_t;
@ -183,7 +178,7 @@ int jobqueue_init(thpool_t* tp_p);
*
* A new job will be added to the queue. The new job MUST be allocated
* before passed to this function or else other functions like jobqueue_empty()
* will be broken.
* will be broken. NOTICE: This function is thread-safe.
*
* @param pointer to threadpool
* @param pointer to the new job(MUST BE ALLOCATED)
@ -197,6 +192,7 @@ void jobqueue_push(thpool_t* tp_p, job_t* newjob_p);
*
* This does not free allocated memory so be sure to have peeked() \n
* before invoking this as else there will result lost memory pointers.
* NOTICE: This function is thread-safe.
*
* @param pointer to threadpool
* @return point to job on success,