diff --git a/thpool.c b/thpool.c index a67836a..c88b73a 100644 --- a/thpool.c +++ b/thpool.c @@ -18,8 +18,7 @@ #include #include - -#include "thpool.h" /* here you can also find the interface to each function */ +#include "thpool.h" #define POLLING_INTERVAL 1 @@ -34,45 +33,46 @@ static volatile int threads_on_hold; /* Binary semaphore */ -typedef struct bsem_t { +typedef struct bsem { pthread_mutex_t mutex; pthread_cond_t cond; int v; -} bsem_t; +} bsem; /* Job */ -typedef struct job_t{ +typedef struct job{ void* (*function)(void* arg); /* function pointer */ - void* arg; /* function's argument */ - struct job_t* prev; /* pointer to previous job */ -} job_t; + void* arg; /* function's argument */ + struct job* prev; /* pointer to previous job */ +} job; /* Job queue */ -typedef struct jobqueue_t{ - pthread_mutex_t rwmutex; /* used for queue r/w access */ - job_t *front; /* pointer to front of queue */ - job_t *rear; /* pointer to rear of queue */ - bsem_t *has_jobs; /* flag as binary semaphore */ - int len; /* number of jobs in queue */ -} jobqueue_t; +typedef struct jobqueue{ + pthread_mutex_t rwmutex; /* used for queue r/w access */ + job *front; /* pointer to front of queue */ + job *rear; /* pointer to rear of queue */ + bsem *has_jobs; /* flag as binary semaphore */ + int len; /* number of jobs in queue */ +} jobqueue; /* Thread */ -typedef struct thread_t{ - int id; /* friendly id */ - pthread_t pthread; /* pointer to actual thread */ - struct thpool_t* thpool; /* access to thpool */ -} thread_t; +typedef struct thread{ + int id; /* friendly id */ + pthread_t pthread; /* pointer to actual thread */ + struct thpool_* thpool_p; /* access to thpool */ +} thread; + /* Threadpool */ -typedef struct thpool_t{ - thread_t** threads; /* pointer to threads */ - int threads_alive; /* threads currently alive */ - pthread_mutex_t thcount_lock; /* used for thread count etc */ - jobqueue_t* jobqueue; /* pointer to the job queue */ -} thpool_t; +typedef struct thpool_{ + thread** threads; /* pointer to threads */ + int num_threads_alive; /* threads currently alive */ + pthread_mutex_t thcount_lock; /* used for thread count etc */ + jobqueue* jobqueue_p; /* pointer to the job queue */ +} thpool_; @@ -80,22 +80,23 @@ typedef struct thpool_t{ /* ========================== PROTOTYPES ============================ */ -static void thread_init(thpool_t* thpool, thread_t** thread, int id); -static void* thread_do(thread_t* thread); -static void thread_hold(); -static void thread_destroy(thread_t* thread); -static int jobqueue_init(thpool_t* thpool); -static void jobqueue_clear(thpool_t* thpool); -static void jobqueue_push(thpool_t* thpool, job_t* newjob); -static job_t* jobqueue_pull(thpool_t* thpool); -static void jobqueue_destroy(thpool_t* thpool); +static void thread_init(thpool_* thpool_p, struct thread** thread_p, int id); +static void* thread_do(struct thread* thread_p); +static void thread_hold(); +static void thread_destroy(struct thread* thread_p); -static void bsem_init(bsem_t *bsem, int value); -static void bsem_reset(bsem_t *bsem); -static void bsem_post(bsem_t *bsem); -static void bsem_post_all(bsem_t *bsem); -static void bsem_wait(bsem_t *bsem); +static int jobqueue_init(thpool_* thpool_p); +static void jobqueue_clear(thpool_* thpool_p); +static void jobqueue_push(thpool_* thpool_p, struct job* newjob_p); +static struct job* jobqueue_pull(thpool_* thpool_p); +static void jobqueue_destroy(thpool_* thpool_p); + +static void bsem_init(struct bsem *bsem_p, int value); +static void bsem_reset(struct bsem *bsem_p); +static void bsem_post(struct bsem *bsem_p); +static void bsem_post_all(struct bsem *bsem_p); +static void bsem_wait(struct bsem *bsem_p); @@ -103,58 +104,58 @@ static void bsem_wait(bsem_t *bsem); /* ========================== THREADPOOL ============================ */ + /* Initialise thread pool */ -thpool_t* thpool_init(int threadsN){ +struct thpool_* thpool_init(int num_threads){ threads_on_hold = 0; threads_keepalive = 1; - if ( threadsN < 0){ - threadsN = 0; + if ( num_threads < 0){ + num_threads = 0; } - /* Make new thread pool */ - thpool_t* thpool; - thpool = (thpool_t*)malloc(sizeof(thpool_t)); - if (thpool==NULL){ + thpool_* thpool_p; + thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_)); + if (thpool_p==NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n"); exit(1); } - thpool->threads_alive = 0; + thpool_p->num_threads_alive = 0; /* Initialise the job queue */ - if (jobqueue_init(thpool)==-1){ + if (jobqueue_init(thpool_p)==-1){ fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n"); exit(1); } /* Make threads in pool */ - thpool->threads = (thread_t**)malloc(threadsN * sizeof(thread_t)); - if (thpool->threads==NULL){ + thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread)); + if (thpool_p->threads==NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n"); exit(1); } /* Thread init */ int n; - for (n=0; nthreads[n], n); + for (n=0; nthreads[n], n); printf("Created thread %d in pool \n", n); } /* Wait for threads to initialize */ - while (thpool->threads_alive != threadsN) {} + while (thpool_p->num_threads_alive != num_threads) {} - return thpool; + return thpool_p; } /* Add work to the thread pool */ -int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){ - job_t* newjob; +int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){ + job* newjob; - newjob=(job_t*)malloc(sizeof(job_t)); + newjob=(struct job*)malloc(sizeof(struct job)); if (newjob==NULL){ fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n"); return -1; @@ -165,26 +166,26 @@ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){ newjob->arg=arg_p; /* add job to queue */ - pthread_mutex_lock(&thpool->jobqueue->rwmutex); - jobqueue_push(thpool, newjob); - pthread_mutex_unlock(&thpool->jobqueue->rwmutex); + pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); + jobqueue_push(thpool_p, newjob); + pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); return 0; } /* Wait until all jobs in queue have finished */ -void thpool_wait(thpool_t* thpool){ - while (thpool->jobqueue->len) { +void thpool_wait(thpool_* thpool_p){ + while (thpool_p->jobqueue_p->len) { sleep(POLLING_INTERVAL); } } /* Destroy the threadpool */ -void thpool_destroy(thpool_t* thpool){ +void thpool_destroy(thpool_* thpool_p){ - volatile int threads_total = thpool->threads_alive; + volatile int threads_total = thpool_p->num_threads_alive; /* End each thread 's infinite loop */ threads_keepalive = 0; @@ -194,43 +195,43 @@ void thpool_destroy(thpool_t* thpool){ time_t start, end; double tpassed; time (&start); - while (tpassed < TIMEOUT && thpool->threads_alive){ - bsem_post_all(thpool->jobqueue->has_jobs); + while (tpassed < TIMEOUT && thpool_p->num_threads_alive){ + bsem_post_all(thpool_p->jobqueue_p->has_jobs); time (&end); tpassed = difftime(end,start); } /* Poll remaining threads */ - while (thpool->threads_alive){ - bsem_post_all(thpool->jobqueue->has_jobs); + while (thpool_p->num_threads_alive){ + bsem_post_all(thpool_p->jobqueue_p->has_jobs); sleep(1); } /* Job queue cleanup */ - jobqueue_destroy(thpool); - free(thpool->jobqueue); + jobqueue_destroy(thpool_p); + free(thpool_p->jobqueue_p); /* Deallocs */ int n; for (n=0; n < threads_total; n++){ - puts("FREEING THREAD"); - //thread_destroy(thpool->threads[n]); - free(thpool->threads[n]); + thread_destroy(thpool_p->threads[n]); } - free(thpool->threads); - free(thpool); + free(thpool_p->threads); + free(thpool_p); } -void thpool_pause(thpool_t* thpool) { +/* Pause all threads in threadpool */ +void thpool_pause(thpool_* thpool_p) { int n; - for (n=0; n < thpool->threads_alive; n++){ - pthread_kill(thpool->threads[n]->pthread, SIGUSR1); + for (n=0; n < thpool_p->num_threads_alive; n++){ + pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1); } } -void thpool_resume(thpool_t* thpool) { +/* Resume all threads in threadpool */ +void thpool_resume(thpool_* thpool_p) { threads_on_hold = 0; } @@ -241,39 +242,30 @@ void thpool_resume(thpool_t* thpool) { /* ============================ THREAD ============================== */ -/** - * @brief Initialize a thread in the thread pool - * - * Will initialize a new thread for the given threadpool and give the - * the thread an ID - * - * Notice also that the thread's id is not populated automatically. +/* Initialize a thread in the thread pool * * @param thread address to the pointer of the thread to be created * @param id id to be given to the thread * */ -static void thread_init (thpool_t *thpool, thread_t **thread, int id){ +static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){ - *thread = (thread_t*)malloc(sizeof(thread_t)); - if (thread == NULL){ + *thread_p = (struct thread*)malloc(sizeof(struct thread)); + if (thread_p == NULL){ fprintf(stderr, "thpool_init(): Could not allocate memory for thread\n"); exit(1); } - (*thread)->thpool = thpool; - (*thread)->id = id; + (*thread_p)->thpool_p = thpool_p; + (*thread_p)->id = id; - pthread_create(&(*thread)->pthread, NULL, (void *)thread_do, (*thread)); - pthread_detach((*thread)->pthread); + pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p)); + pthread_detach((*thread_p)->pthread); } -/** - * @brief Sets the calling thread on hold until threads_on_hold is set to 1 - * @param thread - */ +/* Sets the calling thread on hold */ static void thread_hold () { threads_on_hold = 1; while (threads_on_hold){ @@ -282,8 +274,7 @@ static void thread_hold () { } -/** -* @brief What each thread is doing +/* What each thread is doing * * In principle this is an endless loop. The only time this loop gets interuppted is once * thpool_destroy() is invoked or the program exits. @@ -291,10 +282,10 @@ static void thread_hold () { * @param thread thread that will run this function * @return nothing */ -static void* thread_do(thread_t* thread){ +static void* thread_do(struct thread* thread_p){ /* Assure all threads have been created before starting serving */ - thpool_t* thpool = thread->thpool; + thpool_* thpool_p = thread_p->thpool_p; /* Register signal handler */ struct sigaction act; @@ -304,172 +295,143 @@ static void* thread_do(thread_t* thread){ } /* Mark thread as alive (initialized) */ - pthread_mutex_lock(&thpool->thcount_lock); - thpool->threads_alive += 1; - pthread_mutex_unlock(&thpool->thcount_lock); + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive += 1; + pthread_mutex_unlock(&thpool_p->thcount_lock); while(threads_keepalive){ - bsem_wait(thpool->jobqueue->has_jobs); + bsem_wait(thpool_p->jobqueue_p->has_jobs); if (threads_keepalive){ /* Read job from queue and execute it */ void*(*func_buff)(void* arg); void* arg_buff; - job_t* job; - pthread_mutex_lock(&thpool->jobqueue->rwmutex); - job = jobqueue_pull(thpool); - pthread_mutex_unlock(&thpool->jobqueue->rwmutex); - if (job) { - func_buff = job->function; - arg_buff = job->arg; + job* job_p; + pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex); + job_p = jobqueue_pull(thpool_p); + pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex); + if (job_p) { + func_buff = job_p->function; + arg_buff = job_p->arg; func_buff(arg_buff); - free(job); + free(job_p); } } } - pthread_mutex_lock(&thpool->thcount_lock); - thpool->threads_alive --; - pthread_mutex_unlock(&thpool->thcount_lock); + pthread_mutex_lock(&thpool_p->thcount_lock); + thpool_p->num_threads_alive --; + pthread_mutex_unlock(&thpool_p->thcount_lock); return NULL; } -/** - * @brief Frees a thread - * @param thread - */ -static void thread_destroy (thread_t* thread){ - //puts("Destroying thread"); - free(thread); +/* Frees a thread */ +static void thread_destroy (thread* thread_p){ + free(thread_p); } + + /* ============================ JOB QUEUE =========================== */ -/** - * @brief Initialize queue - * @return 0 on success, - * -1 on memory allocation error - */ -static int jobqueue_init(thpool_t* thpool){ - thpool->jobqueue=(jobqueue_t*)malloc(sizeof(jobqueue_t)); - if (thpool->jobqueue==NULL){ +/* Initialize queue */ +static int jobqueue_init(thpool_* thpool_p){ + thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct jobqueue)); + if (thpool_p->jobqueue_p == NULL){ return -1; } - thpool->jobqueue->has_jobs = (bsem_t*)malloc(sizeof(bsem_t)); - bsem_init(thpool->jobqueue->has_jobs, 0); - jobqueue_clear(thpool); + thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct bsem)); + bsem_init(thpool_p->jobqueue_p->has_jobs, 0); + jobqueue_clear(thpool_p); return 0; } -/** - * @brief Clears the queue - */ -static void jobqueue_clear(thpool_t* thpool){ +/* Clear the queue */ +static void jobqueue_clear(thpool_* thpool_p){ - while(thpool->jobqueue->len){ - free(jobqueue_pull(thpool)); + while(thpool_p->jobqueue_p->len){ + free(jobqueue_pull(thpool_p)); } - - thpool->jobqueue->front = NULL; - thpool->jobqueue->rear = NULL; - bsem_reset(thpool->jobqueue->has_jobs); - thpool->jobqueue->len = 0; + + thpool_p->jobqueue_p->front = NULL; + thpool_p->jobqueue_p->rear = NULL; + bsem_reset(thpool_p->jobqueue_p->has_jobs); + thpool_p->jobqueue_p->len = 0; } -/** - * @brief Add job to queue - * - * A new job will be added to the queue. The new job MUST be allocated - * before passed to this function or else other functions like jobqueue_empty() - * will be broken. - * - * CALLER MUST HOLD A MUTEX - * - * @param pointer to the new (allocated) job +/* Add (allocated) job to queue + * + * Notice: Caller MUST hold a mutex */ -static void jobqueue_push(thpool_t* thpool, job_t* newjob){ /* remember that job prev and next point to NULL */ +static void jobqueue_push(thpool_* thpool_p, struct job* newjob){ newjob->prev = NULL; - switch(thpool->jobqueue->len){ + switch(thpool_p->jobqueue_p->len){ case 0: /* if no jobs in queue */ - thpool->jobqueue->front = newjob; - thpool->jobqueue->rear = newjob; + thpool_p->jobqueue_p->front = newjob; + thpool_p->jobqueue_p->rear = newjob; break; default: /* if jobs in queue */ - thpool->jobqueue->rear->prev = newjob; - thpool->jobqueue->rear = newjob; + thpool_p->jobqueue_p->rear->prev = newjob; + thpool_p->jobqueue_p->rear = newjob; } - thpool->jobqueue->len++; - bsem_post(thpool->jobqueue->has_jobs); + thpool_p->jobqueue_p->len++; + bsem_post(thpool_p->jobqueue_p->has_jobs); } -/** - * @brief Get first job from queue(removes it from queue) +/* Get first job from queue(removes it from queue) * - * This does not free allocated memory so be sure to have peeked() \n - * before invoking this as else there will result lost memory pointers. - * - * CALLER MUST HOLD A MUTEX - * - * @return point to job on success, - * NULL if there is no job in queue + * Notice: Caller MUST hold a mutex */ -static job_t* jobqueue_pull(thpool_t* thpool){ +static struct job* jobqueue_pull(thpool_* thpool_p){ - job_t* job; - job = thpool->jobqueue->front; + job* job_p; + job_p = thpool_p->jobqueue_p->front; - switch(thpool->jobqueue->len){ + switch(thpool_p->jobqueue_p->len){ case 0: /* if no jobs in queue */ return NULL; case 1: /* if one job in queue */ - thpool->jobqueue->front = NULL; - thpool->jobqueue->rear = NULL; + thpool_p->jobqueue_p->front = NULL; + thpool_p->jobqueue_p->rear = NULL; break; default: /* if >1 jobs in queue */ - thpool->jobqueue->front = job->prev; + thpool_p->jobqueue_p->front = job_p->prev; } - thpool->jobqueue->len--; + thpool_p->jobqueue_p->len--; /* Make sure has_jobs has right value */ - if (thpool->jobqueue->len > 0) { - bsem_post(thpool->jobqueue->has_jobs); + if (thpool_p->jobqueue_p->len > 0) { + bsem_post(thpool_p->jobqueue_p->has_jobs); } - return job; + return job_p; } -/** - * @brief Remove and deallocate all jobs in queue - * - * This function will deallocate all jobs in the queue and set the - * jobqueue to its initialization values, thus tail and head pointing - * to NULL and amount of jobs equal to 0. - * - * */ -static void jobqueue_destroy(thpool_t* thpool){ - jobqueue_clear(thpool); - free(thpool->jobqueue->has_jobs); +/* Free all queue resources back to the system */ +static void jobqueue_destroy(thpool_* thpool_p){ + jobqueue_clear(thpool_p); + free(thpool_p->jobqueue_p->has_jobs); } @@ -479,56 +441,46 @@ static void jobqueue_destroy(thpool_t* thpool){ /* ======================== SYNCHRONISATION ========================= */ -/** - * @brief Inits semaphore to given value (1 or 0) - * */ -static void bsem_init(bsem_t *bsem, int value) { +/* Init semaphore to 1 or 0 */ +static void bsem_init(bsem *bsem_p, int value) { if (value < 0 || value > 1) { printf("ERROR: bsem_init(): Binary semaphore can take only values 1 or 0"); exit(1); } - bsem->v = value; + bsem_p->v = value; } -/** - * @brief Resets semaphore to 0 - * */ -static void bsem_reset(bsem_t *bsem) { - bsem_init(bsem, 0); +/* Reset semaphore to 0 */ +static void bsem_reset(bsem *bsem_p) { + bsem_init(bsem_p, 0); } -/** - * @brief Sets semaphore to one and notifies at least one thread - * */ -static void bsem_post(bsem_t *bsem) { - pthread_mutex_lock(&bsem->mutex); - bsem->v = 1; - pthread_cond_signal(&bsem->cond); - pthread_mutex_unlock(&bsem->mutex); +/* Post to at least one thread */ +static void bsem_post(bsem *bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_signal(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); } -/** - * @brief Sets semaphore to one and notifies all threads - * */ -static void bsem_post_all(bsem_t *bsem) { - pthread_mutex_lock(&bsem->mutex); - bsem->v = 1; - pthread_cond_broadcast(&bsem->cond); - pthread_mutex_unlock(&bsem->mutex); +/* Post to all threads */ +static void bsem_post_all(bsem *bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + bsem_p->v = 1; + pthread_cond_broadcast(&bsem_p->cond); + pthread_mutex_unlock(&bsem_p->mutex); } -/** - * @brief Waits on semaphore until semaphore has value 0 - * */ -static void bsem_wait(bsem_t *bsem) { - pthread_mutex_lock(&bsem->mutex); - while (bsem->v != 1) { - pthread_cond_wait(&bsem->cond, &bsem->mutex); +/* Wait on semaphore until semaphore has value 0 */ +static void bsem_wait(bsem* bsem_p) { + pthread_mutex_lock(&bsem_p->mutex); + while (bsem_p->v != 1) { + pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex); } - bsem->v = 0; - pthread_mutex_unlock(&bsem->mutex); + bsem_p->v = 0; + pthread_mutex_unlock(&bsem_p->mutex); } diff --git a/thpool.h b/thpool.h index c908686..50fd09d 100644 --- a/thpool.h +++ b/thpool.h @@ -13,9 +13,8 @@ #include -/* thpool is a pointer to a thpool_t data structure */ -typedef struct thpool_t* thpool; - +/* thpool is a pointer to a thpool data structure */ +typedef struct thpool_* thpool; /* =========================== FUNCTIONS ============================ */ @@ -27,11 +26,11 @@ typedef struct thpool_t* thpool; * Initializes a threadpool. This function will not return untill all * threads have initialized successfully. * - * @param threadsN number of threads to be created in the threadpool - * @return thpool pointer to created threadpool on success, - * pointer to NULL on error + * @param num_threads number of threads to be created in the threadpool + * @return thpool pointer to created threadpool on success, + * pointer to NULL on error */ -extern thpool* thpool_init(int threadsN); +extern thpool thpool_init(int num_threads); /** @@ -43,9 +42,9 @@ extern thpool* thpool_init(int threadsN); * * NOTICE: You have to cast both the function and argument to not get warnings. * - * @param thpool threadpool to which the work will be added - * @param function function to add as work - * @param argument to the above function + * @param thpool threadpool to which the work will be added + * @param function function to add as work + * @param argument single argument to passed function * @return int */ extern int thpool_add_work(thpool, void *(*function)(void*), void* arg_p);