/* ********************************
 * Author:       Johan Hanssen Seferidis
 * Date:         12/08/2011
 * License:      MIT
 * Description:  Library providing a threading pool where you can add
 *               work. For an example on usage refer to the main file
 *               found in the same package
 *
 *//** @file thpool.h *//*
 ********************************/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#include <unistd.h>

#include "thpool.h" /* here you can also find the interface to each function */

/* Number of seconds thpool_wait() sleeps between two checks of the job
 * queue length */
#define POLLING_INTERVAL 1

/* Run flag for the worker loop: workers keep looping while it is non-zero
 * and thpool_destroy() clears it. It has file scope, so every pool created
 * through this file shares the same flag, and it is read and written
 * without holding any lock. */
static int thpool_keepalive = 1;

/* Initialise thread pool */
thpool_t* thpool_init(int threadsN){
2014-12-30 15:32:15 +03:00
thpool_t* thpool;
2011-11-05 16:35:31 +04:00
2014-11-23 14:38:34 +03:00
if (threadsN<0) threadsN=0;
2011-11-05 16:35:31 +04:00
/* Make new thread pool */
2014-12-30 15:32:15 +03:00
thpool=(thpool_t*)malloc(sizeof(thpool_t));
if (thpool==NULL){
2011-11-05 16:35:31 +04:00
fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
}
2014-12-30 15:32:15 +03:00
thpool->threads=(pthread_t*)malloc(threadsN*sizeof(pthread_t));
if (thpool->threads==NULL){
2011-11-05 16:35:31 +04:00
fprintf(stderr, "thpool_init(): Could not allocate memory for thread IDs\n");
return NULL;
}
2014-12-30 15:32:15 +03:00
thpool->threadsN=threadsN;
2011-11-05 16:35:31 +04:00
/* Initialise the job queue */
2014-12-30 15:32:15 +03:00
if (jobqueue_init(thpool)==-1){
2011-11-05 16:35:31 +04:00
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
return NULL;
}
/* Make threads in pool */
int t;
for (t=0; t<threadsN; t++){
printf("Created thread %d in pool \n", t);
2014-12-30 15:32:15 +03:00
pthread_create(&(thpool->threads[t]), NULL, (void *)thpool_thread_do, (void *)thpool);
2011-11-05 16:35:31 +04:00
}
2014-12-30 15:32:15 +03:00
return thpool;
2011-11-05 16:35:31 +04:00
}
/* What each individual thread is doing
2014-12-29 19:42:27 +03:00
*
* There are two scenarios here. One is everything works as it should and second if
2011-11-05 16:35:31 +04:00
* the thpool is to be killed. In that manner we try to BYPASS sem_wait and end each thread. */
2014-12-30 15:32:15 +03:00
static void thpool_thread_do(thpool_t* thpool){
2011-11-05 16:35:31 +04:00
while(thpool_keepalive){
2014-12-29 16:14:45 +03:00
2014-12-30 15:32:15 +03:00
//printf("**** pre bsem_wait: bsem: %d, len: %d\n", thpool->jobqueue->has_jobs->v, thpool->jobqueue->len);
bsem_wait(thpool->jobqueue->has_jobs);
2011-11-05 16:35:31 +04:00
if (thpool_keepalive){
2014-12-29 16:14:45 +03:00
2011-11-05 16:35:31 +04:00
/* Read job from queue and execute it */
void*(*func_buff)(void* arg);
void* arg_buff;
2014-12-30 15:32:15 +03:00
job_t* job;
pthread_mutex_lock(&thpool->rwmutex);
job = jobqueue_pull(thpool);
pthread_mutex_unlock(&thpool->rwmutex);
if (job) {
func_buff=job->function;
arg_buff =job->arg;
func_buff(arg_buff);
2014-12-30 15:32:15 +03:00
free(job);
2014-12-29 16:14:45 +03:00
}
2011-11-05 16:35:31 +04:00
}
else
{
2014-12-30 15:28:43 +03:00
exit(0); /* EXIT thread*/
2011-11-05 16:35:31 +04:00
}
}
2014-12-30 15:28:43 +03:00
exit(0);
2011-11-05 16:35:31 +04:00
}
/* Add work to the thread pool */
2014-12-30 15:32:15 +03:00
int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){
2014-11-23 14:38:34 +03:00
job_t* newJob;
2011-11-05 16:35:31 +04:00
2014-12-29 19:42:27 +03:00
newJob=(job_t*)malloc(sizeof(job_t));
2011-11-05 16:35:31 +04:00
if (newJob==NULL){
fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
exit(1);
}
/* add function and argument */
newJob->function=function_p;
newJob->arg=arg_p;
/* add job to queue */
2014-12-30 15:32:15 +03:00
pthread_mutex_lock(&thpool->rwmutex);
jobqueue_push(thpool, newJob);
pthread_mutex_unlock(&thpool->rwmutex);
return 0;
}
2014-12-29 19:42:27 +03:00
/* Wait until all jobs in queue have finished */
2014-12-30 15:32:15 +03:00
void thpool_wait(thpool_t* thpool){
while (thpool->jobqueue->len > 0) {
sleep(POLLING_INTERVAL);
}
2011-11-05 16:35:31 +04:00
}
/* Destroy the threadpool */
2014-12-30 15:32:15 +03:00
void thpool_destroy(thpool_t* thpool){
2014-12-30 15:28:43 +03:00
2011-11-05 16:35:31 +04:00
int t;
2014-12-30 15:28:43 +03:00
/* End each thread's infinite loop */
thpool_keepalive = 0;
2014-12-29 19:42:27 +03:00
2014-12-30 15:32:15 +03:00
jobqueue_empty(thpool);
2014-12-30 15:28:43 +03:00
2014-12-30 15:32:15 +03:00
//for (t=0; t < (thpool->threadsN); t++){
2014-12-30 15:28:43 +03:00
// sleep(1);
2014-12-30 15:32:15 +03:00
/// bsem_post(thpool->jobqueue->has_jobs);
// pthread_join(thpool->threads[t], NULL);
2014-12-29 19:42:27 +03:00
//}
2014-12-30 15:28:43 +03:00
2011-11-05 16:35:31 +04:00
/* Wait for threads to finish */
2014-12-30 15:32:15 +03:00
//for (t=0; t < (thpool->threadsN); t++){
// pthread_join(thpool->threads[t], NULL);
2014-12-30 15:28:43 +03:00
//}
2014-12-29 19:42:27 +03:00
2014-12-30 15:28:43 +03:00
/* Awake idle threads waiting at semaphore */
2014-12-30 15:32:15 +03:00
//bsem_post(thpool->jobqueue->has_jobs);
2014-12-30 15:28:43 +03:00
2014-12-30 15:32:15 +03:00
//jobqueue_empty(thpool);
2014-12-29 19:42:27 +03:00
2011-11-05 16:35:31 +04:00
/* Dealloc */
2014-12-30 15:32:15 +03:00
//free(thpool->threads);
//free(thpool->jobqueue);
//free(thpool);
2011-11-05 16:35:31 +04:00
}
2014-12-30 15:56:19 +03:00
/* ===================== JOB QUEUE OPERATIONS ======================= */
2011-11-05 16:35:31 +04:00
/* Initialise queue */
2014-12-30 15:32:15 +03:00
static int jobqueue_init(thpool_t* thpool){
thpool->jobqueue=(jobqueue_t*)malloc(sizeof(jobqueue_t));
if (thpool->jobqueue==NULL) return -1;
thpool->jobqueue->tail=NULL;
thpool->jobqueue->head=NULL;
2014-12-30 15:28:43 +03:00
2014-12-30 15:32:15 +03:00
thpool->jobqueue->has_jobs = (bsem_t*)malloc(sizeof(bsem_t));
thpool->jobqueue->has_jobs->v = 0;
2014-12-30 15:28:43 +03:00
2014-12-30 15:32:15 +03:00
thpool->jobqueue->len = 0;
2011-11-05 16:35:31 +04:00
return 0;
}
/* Add job to queue */
2014-12-30 15:32:15 +03:00
static void jobqueue_push(thpool_t* thpool, job_t* newjob){ /* remember that job prev and next point to NULL */
2014-12-30 15:32:15 +03:00
newjob->next=NULL;
2011-11-05 16:35:31 +04:00
2014-12-30 15:32:15 +03:00
switch(thpool->jobqueue->len){
2014-11-23 14:38:34 +03:00
case 0: /* if there are no jobs in queue */
2014-12-30 15:32:15 +03:00
thpool->jobqueue->tail=newjob;
thpool->jobqueue->head=newjob;
2011-11-05 16:35:31 +04:00
break;
2014-11-23 14:38:34 +03:00
default: /* if there are already jobs in queue */
2014-12-30 15:32:15 +03:00
thpool->jobqueue->tail->next=newjob;
newjob->prev=thpool->jobqueue->tail;
thpool->jobqueue->tail=newjob;
2014-11-23 14:38:34 +03:00
}
2014-12-30 15:32:15 +03:00
thpool->jobqueue->len++;
bsem_post(thpool->jobqueue->has_jobs);
2011-11-05 16:35:31 +04:00
}
2014-11-23 14:38:34 +03:00
/* Get first element from queue */
2014-12-30 15:32:15 +03:00
static job_t* jobqueue_pull(thpool_t* thpool){
2014-12-30 15:28:43 +03:00
/* get first job */
2014-12-30 15:32:15 +03:00
job_t* job;
job = thpool->jobqueue->head;
2014-11-23 14:38:34 +03:00
/* remove job from queue */
2014-12-30 15:32:15 +03:00
switch(thpool->jobqueue->len){
2011-11-05 16:35:31 +04:00
2014-11-23 14:38:34 +03:00
case 0: /* if there are no jobs in queue */
return NULL;
2011-11-05 16:35:31 +04:00
break;
2014-11-23 14:38:34 +03:00
case 1: /* if there is only one job in queue */
2014-12-30 15:32:15 +03:00
thpool->jobqueue->tail=NULL;
thpool->jobqueue->head=NULL;
2011-11-05 16:35:31 +04:00
break;
2014-11-23 14:38:34 +03:00
default: /* if there are more than one jobs in queue */
2014-12-30 15:32:15 +03:00
thpool->jobqueue->head=job->next;
job->next->prev=thpool->jobqueue->head;
2011-11-05 16:35:31 +04:00
}
2014-12-30 15:32:15 +03:00
thpool->jobqueue->len--;
// Make sure has_jobs has right value
2014-12-30 15:32:15 +03:00
if (thpool->jobqueue->len > 0) {
bsem_post(thpool->jobqueue->has_jobs);
}
2014-12-30 15:28:43 +03:00
2014-12-30 15:32:15 +03:00
return job;
2011-11-05 16:35:31 +04:00
}
2014-11-23 14:38:34 +03:00
2011-11-05 16:35:31 +04:00
/* Remove and deallocate all jobs in queue */
2014-12-30 15:32:15 +03:00
static void jobqueue_empty(thpool_t* thpool){
2014-11-23 14:38:34 +03:00
job_t* curjob;
2014-12-30 15:32:15 +03:00
curjob=thpool->jobqueue->tail;
2011-11-05 16:35:31 +04:00
2014-12-30 15:32:15 +03:00
while(thpool->jobqueue->len){
thpool->jobqueue->tail=curjob->prev;
2011-11-05 16:35:31 +04:00
free(curjob);
2014-12-30 15:32:15 +03:00
curjob=thpool->jobqueue->tail;
2011-11-05 16:35:31 +04:00
}
/* Fix head and tail */
2014-12-30 15:32:15 +03:00
thpool->jobqueue->tail=NULL;
thpool->jobqueue->head=NULL;
2014-12-29 16:14:45 +03:00
/* Deallocs */
2014-12-30 15:32:15 +03:00
free(thpool->jobqueue->has_jobs);
2011-11-05 16:35:31 +04:00
}
2014-12-30 15:56:19 +03:00
2014-12-29 19:42:27 +03:00
/* ======================== SYNCHRONISATION ========================= */
2014-12-30 15:56:19 +03:00
/* Binary semaphore post */
2014-12-29 19:42:27 +03:00
static void bsem_post(bsem_t *bsem) {
2014-12-30 15:28:43 +03:00
pthread_mutex_lock(&bsem->mutex);
bsem->v = 1;
pthread_cond_signal(&bsem->cond);
pthread_mutex_unlock(&bsem->mutex);
}
2014-12-30 15:56:19 +03:00
/* Binary semaphore wait */
2014-12-29 19:42:27 +03:00
static void bsem_wait(bsem_t *bsem) {
2014-12-30 15:28:43 +03:00
pthread_mutex_lock(&bsem->mutex);
while (bsem->v != 1) {
pthread_cond_wait(&bsem->cond, &bsem->mutex);
}
bsem->v = 0;
pthread_mutex_unlock(&bsem->mutex);
}