Smart polling implemented

This commit is contained in:
Pithikos 2015-01-18 12:42:37 +00:00
parent 2eed00fceb
commit e68223a4c5
4 changed files with 72 additions and 21 deletions

View File

@ -17,6 +17,7 @@ This is an updated and heavily refactored version of my original threadpool. The
* Synchronisation control from the user (pause/resume/wait)
* Thorough testing for memory leaks and race conditions
* Cleaner and more opaque API
* Smart polling - polling interval changes on-the-fly
## Compiling

View File

@ -33,8 +33,6 @@ int main(int argc, char *argv[]){
int num_threads = strtol(argv[2], &p, 10);
int wait_each_job = strtol(argv[3], &p, 10);
printf("%d\n", wait_each_job);
threadpool thpool = thpool_init(num_threads);
int n;

View File

@ -2,8 +2,7 @@
* Author: Johan Hanssen Seferidis
* License: MIT
* Description: Library providing a threading pool where you can add
* work. For an example on usage refer to the main file
* found in the same package
* work. For usage, check the thpool.h file or README.md
*
*//** @file thpool.h *//*
*
@ -20,7 +19,8 @@
#include "thpool.h"
#define POLLING_INTERVAL 1
#define MAX_NANOSEC 999999999
#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X))
static volatile int threads_keepalive;
static volatile int threads_on_hold;
@ -42,9 +42,9 @@ typedef struct bsem {
/* Job */
typedef struct job{
struct job* prev; /* pointer to previous job */
void* (*function)(void* arg); /* function pointer */
void* arg; /* function's argument */
struct job* prev; /* pointer to previous job */
void* arg; /* function's argument */
} job;
@ -60,18 +60,19 @@ typedef struct jobqueue{
/* Thread */
typedef struct thread{
int id; /* friendly id */
pthread_t pthread; /* pointer to actual thread */
struct thpool_* thpool_p; /* access to thpool */
int id; /* friendly id */
pthread_t pthread; /* pointer to actual thread */
struct thpool_* thpool_p; /* access to thpool */
} thread;
/* Threadpool */
typedef struct thpool_{
thread** threads; /* pointer to threads */
int num_threads_alive; /* threads currently alive */
pthread_mutex_t thcount_lock; /* used for thread count etc */
jobqueue* jobqueue_p; /* pointer to the job queue */
thread** threads; /* pointer to threads */
int num_threads_alive; /* threads currently alive */
int num_threads_working; /* threads currently working */
pthread_mutex_t thcount_lock; /* used for thread count etc */
jobqueue* jobqueue_p; /* pointer to the job queue */
} thpool_;
@ -122,7 +123,8 @@ struct thpool_* thpool_init(int num_threads){
fprintf(stderr, "thpool_init(): Could not allocate memory for thread pool\n");
exit(1);
}
thpool_p->num_threads_alive = 0;
thpool_p->num_threads_alive = 0;
thpool_p->num_threads_working = 0;
/* Initialise the job queue */
if (jobqueue_init(thpool_p)==-1){
@ -174,10 +176,47 @@ int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* arg_p){
}
/* Wait until all jobs in queue have finished */
/* Wait until all jobs have finished */
void thpool_wait(thpool_* thpool_p){
while (thpool_p->jobqueue_p->len) {
sleep(POLLING_INTERVAL);
/* Continuous polling */
double timeout = 1.0;
time_t start, end;
double tpassed;
time (&start);
while (tpassed < timeout &&
(thpool_p->jobqueue_p->len || thpool_p->num_threads_working))
{
time (&end);
tpassed = difftime(end,start);
}
/* Exponential polling */
long init_nano = 1; /* MUST be above 0 */
long new_nano;
double multiplier = 1.01;
int max_secs = 20;
struct timespec polling_interval;
polling_interval.tv_sec = 0;
polling_interval.tv_nsec = init_nano;
while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)
{
nanosleep(&polling_interval, NULL);
if ( polling_interval.tv_sec < max_secs ){
new_nano = CEIL(polling_interval.tv_nsec * multiplier);
polling_interval.tv_nsec = new_nano % MAX_NANOSEC;
if ( new_nano > MAX_NANOSEC ) {
polling_interval.tv_sec ++;
}
}
else break;
}
/* Fall back to max polling */
while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){
sleep(max_secs);
}
}
@ -305,6 +344,10 @@ static void* thread_do(struct thread* thread_p){
if (threads_keepalive){
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working++;
pthread_mutex_unlock(&thpool_p->thcount_lock);
/* Read job from queue and execute it */
void*(*func_buff)(void* arg);
void* arg_buff;
@ -319,6 +362,10 @@ static void* thread_do(struct thread* thread_p){
free(job_p);
}
pthread_mutex_lock(&thpool_p->thcount_lock);
thpool_p->num_threads_working--;
pthread_mutex_unlock(&thpool_p->thcount_lock);
}
}
pthread_mutex_lock(&thpool_p->thcount_lock);
@ -390,6 +437,7 @@ static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
}
thpool_p->jobqueue_p->len++;
bsem_post(thpool_p->jobqueue_p->has_jobs);
}

View File

@ -72,7 +72,11 @@ int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
* Once the queue is empty and all work has completed, the calling thread
* (probably the main program) will continue.
*
* Polling is used in wait. By default the polling interval is one second.
* Smart polling is used in wait. The polling is initially 0 - meaning that
* there is virtually no polling at all. If after 1 seconds the threads
* haven't finished, the polling interval starts growing exponentially
* untill it reaches max_secs seconds. Then it jumps down to a maximum polling
* interval assuming that heavy processing is being used in the threadpool.
*
* @example
*