Simplicity

This commit is contained in:
pithikos 2015-01-02 19:23:22 +00:00
parent d42a436bfc
commit 1c20ad1cde
2 changed files with 249 additions and 146 deletions

248
thpool.c
View File

@ -14,7 +14,6 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <errno.h>
#include <time.h>
@ -24,55 +23,73 @@
#define POLLING_INTERVAL 1
static int threads_keepalive;
static int threads_hold_flag;
static int threads_on_hold;
static int return_value;
/* ========================== THREADPOOL ============================ */
/* Initialise thread pool */
thpool_t* thpool_init(int threadsN){
threads_hold_flag = 0;
threads_on_hold = 0;
threads_keepalive = 1;
if (threadsN < 0){
if ( threadsN < 0){
threadsN = 0;
}
/* Make new thread pool */
thpool_t* thpool;
thpool = (thpool_t*)malloc(sizeof(thpool_t));
if (thpool==NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
return NULL;
exit(1);
}
thpool->threads_alive = 0;
/* Initialise the job queue */
if (jobqueue_init(thpool)==-1){
fprintf(stderr, "thpool_init(): Could not allocate memory for job queue\n");
return NULL;
exit(1);
}
/* Make threads in pool */
thpool->threads = (thread_t*)malloc(threadsN*sizeof(thread_t));
thpool->threads = (thread_t**)malloc(threadsN * sizeof(thread_t));
if (thpool->threads==NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for threads\n");
return NULL;
exit(1);
}
thpool->threadsN=threadsN;
/* Thread init */
int n;
for (n=0; n<threadsN; n++){
thread_t* th;
th = &(thpool->threads[n]);
(*th).id = n;
args_t args;
args.arg1 = thpool;
args.arg2 = th;
pthread_create(&((*th).pthread), NULL, (void *)thread_do, (args_t*)&args);
pthread_detach((*th).pthread);
printf("Created thread %d in pool \n", (*th).id);
}
thread_init(thpool, thpool->threads[n], n);
//thpool->threads[n]->thpool = thpool;
/*puts("next");
thpool->threads[n] = thread_init(thpool);
(*thpool->threads[n]).id = n;
printf("Created thread %d in pool \n", n);*/
//thpool->threads[n] = malloc(sizeof(thread_t));
// thpool->threads[n]->initialized = 0;
//thpool->threads[n]->thpool = thpool;
// pthread_create(&thpool->threads[n]->pthread, NULL, (void *)thread_do, thpool->threads[n]);
// pthread_detach(thpool->threads[n]->pthread);
}
/* Wait for threads to initialize */
while (thpool->threads_alive != threadsN) {}
return thpool;
}
@ -80,21 +97,21 @@ thpool_t* thpool_init(int threadsN){
/* Add work to the thread pool */
int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p){
job_t* newjob;
newjob=(job_t*)malloc(sizeof(job_t));
if (newjob==NULL){
fprintf(stderr, "thpool_add_work(): Could not allocate memory for new job\n");
return -1;
}
/* add function and argument */
newjob->function=function_p;
newjob->arg=arg_p;
/* add job to queue */
pthread_mutex_lock(&thpool->rwmutex);
pthread_mutex_lock(&thpool->jobqueue->rwmutex);
jobqueue_push(thpool, newjob);
pthread_mutex_unlock(&thpool->rwmutex);
pthread_mutex_unlock(&thpool->jobqueue->rwmutex);
return 0;
}
@ -113,31 +130,22 @@ void thpool_destroy(thpool_t* thpool){
/* End each thread 's infinite loop */
threads_keepalive = 0;
int any_threads_idle(thpool_t* thpool){
int n;
for (n=0; n < (thpool->threadsN); n++){
if (!thpool->threads[n].working){
return 1;
}
}
return 0;
}
/* Kill idle threads */
/* Give one second to kill idle threads */
double TIMEOUT = 1.0;
time_t start, end;
double tpassed;
time (&start);
while (any_threads_idle(thpool))
{
while (tpassed < TIMEOUT)
{
bsem_post(thpool->jobqueue->has_jobs);
time (&end);
tpassed = difftime(end,start);
}
while (tpassed < TIMEOUT && thpool->threads_alive){
bsem_post(thpool->jobqueue->has_jobs);
time (&end);
tpassed = difftime(end,start);
}
/* Poll remaining threads */
while (thpool->threads_alive){
bsem_post(thpool->jobqueue->has_jobs);
sleep(1);
}
/* Job queue cleanup */
@ -152,98 +160,81 @@ void thpool_destroy(thpool_t* thpool){
void thpool_pause(thpool_t* thpool) {
threads_hold();
int n;
for (n=0; n < thpool->threads_alive; n++){
//pthread_kill(thpool->threads[n]->pthread, SIGUSR1);
pthread_kill(thpool->threads[n]->pthread, SIGUSR1);
}
}
void thpool_continue(thpool_t* thpool) {
threads_unhold();
threads_on_hold = 0;
}
/* ====================== THREAD OPERATIONS ========================= */
/* ============================ THREAD ============================== */
thread_t* thread_init (thpool_t *thpool, thread_t *thread, int id){
//thread = malloc(sizeof(thread_t));
//thread->initialized = 0;
//thread->thpool = thpool;
//pthread_create(&thread->pthread, NULL, (void *)thread_do, thread);
//pthread_detach(thread->pthread);
thread = malloc(sizeof(thread_t));
if (thread == NULL){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread\n");
exit(1);
}
static void threads_hold () {
threads_hold_flag = 1;
while (threads_hold_flag){
thread->thpool = thpool;
thread->id = id;
pthread_create(&thread->pthread, NULL, (void *)thread_do, thread);
pthread_detach(thread->pthread);
return thread;
}
static void thread_hold () {
threads_on_hold = 1;
while (threads_on_hold){
sleep(1);
}
}
static void threads_unhold () {
threads_hold_flag = 0;
}
static void thread_suicide() {
pthread_exit(NULL);
}
static void thread_kill(thread_t *th, int now) {
if (!now && (*th).working){
sleep(1);
}
pthread_kill((*th).pthread, SIGTERM);
}
static void signal_handler (int signum) {
switch(signum){
case SIGUSR1:
threads_hold();
break;
case SIGUSR2:
threads_unhold();
break;
case SIGTERM:
thread_suicide();
break;
}
}
/*
* Init point for each thread
*
* */
static void thread_do(args_t* args){
static void* thread_do(thread_t* thread){
/* Assure all threads have been created before starting serving */
thpool_t* thpool;
thread_t* thread;
thpool = (*args).arg1;
thread = (*args).arg2;
(*thread).working = 0;
thpool_t* thpool = thread->thpool;
/* Register signal handler */
struct sigaction act;
act.sa_handler = signal_handler;
act.sa_handler = thread_hold;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
perror("Error: cannot handle SIGUSR1");
}
if (sigaction(SIGUSR2, &act, NULL) == -1) {
perror("Error: cannot handle SIGUSR2");
}
if (sigaction(SIGTERM, &act, NULL) == -1) {
perror("Error: cannot handle SIGTERM");
}
/* Mark thread as alive (initialized) */
pthread_mutex_lock(&thpool->thcount_lock);
thpool->threads_alive += 1;
pthread_mutex_unlock(&thpool->thcount_lock);
// puts("ts");
//printf("Thread (%u) initialized init?: %d\n", (int)pthread_self(), (*thread).initialized);
//printf("Thread (%u) init adr: %p\n", (int)pthread_self(), &(*thread).initialized);
//printf("Thread (%u) id: %d\n", (int)pthread_self(), (*thread).id);
while(threads_keepalive){
bsem_wait(thpool->jobqueue->has_jobs);
(*thread).working = 1;
if (threads_keepalive){
@ -251,30 +242,35 @@ static void thread_do(args_t* args){
void*(*func_buff)(void* arg);
void* arg_buff;
job_t* job;
pthread_mutex_lock(&thpool->rwmutex);
pthread_mutex_lock(&thpool->jobqueue->rwmutex);
job = jobqueue_pull(thpool);
pthread_mutex_unlock(&thpool->rwmutex);
pthread_mutex_unlock(&thpool->jobqueue->rwmutex);
if (job) {
func_buff = job->function;
arg_buff = job->arg;
func_buff(arg_buff);
free(job);
}
(*thread).working = 0;
}
}
pthread_mutex_lock(&thpool->rwmutex);
thpool->threadsN --;
pthread_mutex_unlock(&thpool->rwmutex);
pthread_mutex_lock(&thpool->thcount_lock);
thpool->threads_alive --;
pthread_mutex_unlock(&thpool->thcount_lock);
printf("Thread %d exiting\n", (*thread).id);
thread_suicide();
pthread_exit(NULL);
}
static void thread_destroy (thread_t* thread){
free(thread);
}
/* ===================== JOB QUEUE OPERATIONS ======================= */
/* ============================ JOB QUEUE =========================== */
/* Initialise queue */
@ -284,6 +280,7 @@ static int jobqueue_init(thpool_t* thpool){
return -1;
}
thpool->jobqueue->has_jobs = (bsem_t*)malloc(sizeof(bsem_t));
bsem_init(thpool->jobqueue->has_jobs, 0);
jobqueue_clear(thpool);
return 0;
}
@ -371,6 +368,18 @@ static void jobqueue_destroy(thpool_t* thpool){
/* ======================== SYNCHRONISATION ========================= */
/* Binary semaphore init */
static void bsem_init(bsem_t *bsem, int value) {
bsem->v = value;
}
/* Binary semaphore reset */
static void bsem_reset(bsem_t *bsem) {
bsem_init(bsem, 0);
}
/* Binary semaphore post */
static void bsem_post(bsem_t *bsem) {
pthread_mutex_lock(&bsem->mutex);
@ -380,6 +389,15 @@ static void bsem_post(bsem_t *bsem) {
}
/* Binary semaphore post */
static void bsem_post_all(bsem_t *bsem) {
pthread_mutex_lock(&bsem->mutex);
bsem->v = 1;
pthread_cond_broadcast(&bsem->cond);
pthread_mutex_unlock(&bsem->mutex);
}
/* Binary semaphore wait */
static void bsem_wait(bsem_t *bsem) {
pthread_mutex_lock(&bsem->mutex);

147
thpool.h
View File

@ -69,13 +69,6 @@
/* ========================== STRUCTURES ============================ */
/* Generic packed args */
typedef struct args_t {
void* arg1;
void* arg2;
} args_t;
/* Binary semaphore */
typedef struct bsem_t {
pthread_mutex_t mutex;
@ -94,26 +87,28 @@ typedef struct job_t{
/* Job queue */
typedef struct jobqueue_t{
job_t *front; /* pointer to front of queue */
job_t *rear; /* pointer to rear of queue */
bsem_t *has_jobs; /* flag as binary semaphore */
int len; /* number of jobs in queue */
pthread_mutex_t rwmutex; /* used for queue r/w access */
job_t *front; /* pointer to front of queue */
job_t *rear; /* pointer to rear of queue */
bsem_t *has_jobs; /* flag as binary semaphore */
int len; /* number of jobs in queue */
} jobqueue_t;
/* Thread */
typedef struct thread_t{
int id; /* friendly id */
int working; /* is thread idle or working? */
pthread_t pthread; /* pointer to front of queue */
int initialized; /* binary to solve race conds */
pthread_t pthread; /* pointer to actual thread */
struct thpool_t* thpool; /* access to thpool */
} thread_t;
/* Threadpool */
typedef struct thpool_t{
pthread_mutex_t rwmutex; /* used for queue w/r access */
thread_t* threads; /* pointer to threads */
int threadsN; /* amount of threads */
thread_t** threads; /* pointer to threads */
int threads_alive; /* threads currently alive */
pthread_mutex_t thcount_lock; /* used for thread count etc */
jobqueue_t* jobqueue; /* pointer to the job queue */
} thpool_t;
@ -139,18 +134,6 @@ typedef struct thpool_t{
thpool_t* thpool_init(int threadsN);
/**
* @brief What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interuppted is once
* thpool_destroy() is invoked.
*
* @param threadpool to use
* @return nothing
*/
static void thpool_thread_do(args_t* args);
/**
* @brief Add work to the job queue
*
@ -179,6 +162,30 @@ int thpool_add_work(thpool_t* thpool, void *(*function_p)(void*), void* arg_p);
void thpool_wait(thpool_t* thpool);
/**
* @brief Pauses all threads immediately
*
* The threads will be paused no matter if they are idle or working.
* The threads return to their previous states once thpool_continue
* is called.
*
* While the thread is being paused, new work can be added.
*
* @param threadpool where the threads should be paused
* @return nothing
*/
void thpool_pause(thpool_t* thpool);
/**
* @brief Unpauses all threads if they are paused
*
* @param threadpool where the threads should be unpaused
* @return nothing
*/
void thpool_continue(thpool_t* thpool);
/**
* @brief Destroy the threadpool
*
@ -192,6 +199,47 @@ void thpool_destroy(thpool_t* thpool);
/* ----------------------- Thread specific --------------------------- */
/**
* @brief Initialize a thread in the thread pool
*
* Will initialize a new thread for the given threadpool and give the
* the thread an ID
*
* Notice also that the thread's id is not populated automatically.
*
* @param threadpool threadpool to create thread
* @param thread pointer to the thread that will be created
* @param id id to be given to thread
*
* @return the initialized thread
*/
thread_t* thread_init(thpool_t* thpool, thread_t* thread, int id);
/**
* @brief What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interuppted is once
* thpool_destroy() is invoked.
*
* @param threadpool to use
* @return nothing
*/
static void* thread_do(thread_t* thread);
/**
* @brief Sets the calling thread on hold until threads_on_hold is set to 1
* @param nothing
* @return nothing
*/
static void thread_hold();
/* ----------------------- Queue specific --------------------------- */
@ -219,6 +267,8 @@ static void jobqueue_clear(thpool_t* thpool);
* before passed to this function or else other functions like jobqueue_empty()
* will be broken.
*
* MUST HOLD MUTEX WHEN CALLING
*
* @param pointer to threadpool
* @param pointer to the new job(MUST BE ALLOCATED)
* @return nothing
@ -232,6 +282,8 @@ static void jobqueue_push(thpool_t* thpool, job_t* newjob);
* This does not free allocated memory so be sure to have peeked() \n
* before invoking this as else there will result lost memory pointers.
*
* MUST HOLD MUTEX WHEN CALLING
*
* @param pointer to threadpool
* @return point to job on success,
* NULL if there is no job in queue
@ -251,11 +303,44 @@ static job_t* jobqueue_pull(thpool_t* thpool);
static void jobqueue_destroy(thpool_t* thpool);
/**
* Binary semaphore
*
/* ======================== SYNCHRONISATION ========================= */
/**
* @brief Inits semaphore to given value
* @param binary semaphore
* @param value 1 or 0
* */
static void bsem_init(bsem_t *bsem, int value);
/**
* @brief Resets semaphore to 0
* @param binary semaphore
* */
static void bsem_reset(bsem_t *bsem);
/**
* @brief Sets semaphore to one and notifies at least one thread
* @param binary semaphore
* */
static void bsem_post(bsem_t *bsem);
/**
* @brief Sets semaphore to one and notifies all threads
* @param binary semaphore
* */
static void bsem_post_all(bsem_t *bsem);
/**
* @brief Waits on semaphore until semaphore has value 0
* @param binary semaphore
* */
static void bsem_wait(bsem_t *bsem);