Merge branch 'master' into g88_trying_to_make_keepalive_local

This commit is contained in:
Manos 2023-12-18 15:54:53 +01:00 committed by GitHub
commit 4c62b1ec33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 154 additions and 81 deletions

26
.github/workflows/tests.yml vendored Normal file
View File

@ -0,0 +1,26 @@
name: Tests
on:
push:
pull_request:
jobs:
test-normal-compile:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Valgrind
run: sudo apt-get install -y valgrind
- name: Run tests with standard compilation flags
working-directory: tests/
run: ./normal_compile.sh
test-optimized-compile:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Valgrind
run: sudo apt-get install -y valgrind
- name: Run tests with optimized compilation flags
working-directory: tests/
run: ./optimized_compile.sh

View File

@ -1,4 +1,5 @@
![Build status](http://178.62.170.124:3000/pithikos/c-thread-pool/badge/?branch=master)
[![GitHub Actions](https://github.com/Pithikos/C-Thread-Pool/workflows/tests.yml/badge.svg?branch=master)](https://github.com/Pithikos/C-Thread-Pool/actions?query=workflow%3Atests+branch%3Amaster)
# C Thread Pool
@ -65,3 +66,5 @@ You are very welcome to contribute. If you have a new feature in mind, you can a
* For coding style simply try to stick to the conventions you find in the existing codebase.
* Tests: A new fix or feature should be covered by tests. If the existing tests are not sufficient, we expect an according test to follow with the pull request.
* Documentation: for a new feature please add documentation. For an API change the documentation has to be thorough and super easy to understand.
If you wish to **get access as a collaborator** feel free to mention it in the issue https://github.com/Pithikos/C-Thread-Pool/issues/78

View File

@ -1,47 +1,47 @@
## High level
Description: Library providing a threading pool where you can add work on the fly. The number
of threads in the pool is adjustable when creating the pool. In most cases
this should equal the number of threads supported by your cpu.
For an example on how to use the threadpool, check the main.c file or just read
the documentation found in the README.md file.
In this header file a detailed overview of the functions and the threadpool's logical
scheme is presented in case you wish to tweak or alter something.
_______________________________________________________
scheme is presented in case you wish to tweak or alter something.
_______________________________________________________
/ \
| JOB QUEUE | job1 | job2 | job3 | job4 | .. |
| JOB QUEUE | job1 | job2 | job3 | job4 | .. |
| |
| threadpool | thread1 | thread2 | .. |
\_______________________________________________________/
Description: Jobs are added to the job queue. Once a thread in the pool
is idle, it is assigned with the first job from the queue(and
erased from the queue). It's each thread's job to read from
the queue serially(using lock) and executing each job
is idle, it is assigned the first job from the queue (and that job is
erased from the queue). It is each thread's job to read from
the queue serially (using lock) and executing each job
until the queue is empty.
Scheme:
thpool______ jobqueue____ ______
thpool______ jobqueue____ ______
| | | | .----------->|_job0_| Newly added job
| | | rear ----------' |_job1_|
| jobqueue----------------->| | |_job2_|
| | | front ----------. |__..__|
| | | front ----------. |__..__|
|___________| |___________| '----------->|_jobn_| Job for thread to take
job0________
job0________
| |
| function---->
| |
| arg------->
| | job1________
| | job1________
| next-------------->| |
|___________| | |..

View File

@ -1,7 +1,7 @@
### Why isn't `pthread_exit()` used to exit a thread?
###Why isn't pthread_exit() used to exit a thread?
`thread_do` used to use pthread_exit(). However that resulted in
hard times of testing for memory leaks. The reason is that on pthread_exit()
`thread_do` used to use `pthread_exit()`. However that resulted in
hard times of testing for memory leaks. The reason is that on `pthread_exit()`
not all memory is freed bt pthread (probably for future threads or false
belief that the application is terminating). For these reasons a simple return
is used.
@ -9,27 +9,28 @@ is used.
Interestingly using `pthread_exit()` results in much more memory being allocated.
###Why do you use sleep() after calling thpool_destroy()?
This is needed only in the tests. The reason is that if you call thpool_destroy
and then exit immedietely, maybe the program will exit before all the threads
### Why do you use `sleep()` after calling `thpool_destroy()`?
This is needed only in the tests. The reason is that if you call `thpool_destroy()`
and then exit immediately, maybe the program will exit before all the threads
had the time to deallocate. In that way it is impossible to check for memory
leaks.
In production you don't have to worry about this since if you call exit,
immedietely after you destroyied the pool, the threads will be freed
anyway by the OS. If you eitherway destroy the pool in the middle of your
In production you don't have to worry about this since if you call `exit()`,
immediately after you destroyed the pool, the threads will be freed
anyway by the OS. If you anyway destroy the pool in the middle of your
program it doesn't matter again since the program will not exit immediately
and thus threads will have more than enough time to terminate.
### Why does `wait()` use all my CPU?
###Why does wait() use all my CPU?
Notice: As of 11-Dec-2015 wait() doesn't use polling anymore. Instead a conditional variable is being used so in theory there should not be any CPU overhead.
Notice: As of 11-Dec-2015 `wait()` doesn't use polling anymore. Instead a conditional variable is being used so in theory there should not be any CPU overhead.
Normally `wait()` will spike CPU usage to full when called. This is normal as long as it doesn't last for more than 1 second. The reason this happens is that `wait()` goes through various phases of polling (what is called smart polling).
* Initially there is no interval between polling and hence the 100% use of your CPU.
* After that the polling interval grows exponentially.
* Finally after x seconds, if there is still work, polling falls back to a very big interval.
The reason `wait()` works in this way, is that the function is mostly used when someone wants to wait for some calculation to finish. So if the calculation is assumed to take a long time then we don't want to poll too often. Still we want to poll fast in case the calculation is a simple one. To solve these two problems, this seemingly awkward behaviour is present.

View File

@ -13,16 +13,11 @@
#include <stdio.h>
#include <pthread.h>
#include <stdint.h>
#include "thpool.h"
void task1(){
printf("Thread #%u working on task1\n", (int)pthread_self());
}
void task2(){
printf("Thread #%u working on task2\n", (int)pthread_self());
void task(void *arg){
printf("Thread #%u working on %d\n", (int)pthread_self(), (int) arg);
}
@ -33,11 +28,11 @@ int main(){
puts("Adding 40 tasks to threadpool");
int i;
for (i=0; i<20; i++){
thpool_add_work(thpool, (void*)task1, NULL);
thpool_add_work(thpool, (void*)task2, NULL);
for (i=0; i<40; i++){
thpool_add_work(thpool, task, (void*)(uintptr_t)i);
};
thpool_wait(thpool);
puts("Killing threadpool");
thpool_destroy(thpool);

View File

@ -28,25 +28,48 @@ function test_thread_free { #threads
}
function _test_thread_free_multi { #threads
output=$(valgrind --leak-check=full --track-origins=yes ./test "$1" 2>&1 > /dev/null)
heap_usage=$(echo "$output" | grep "total heap usage")
allocs=$(extract_num "[0-9]* allocs" "$heap_usage")
frees=$(extract_num "[0-9]* frees" "$heap_usage")
if (( "$allocs" == 0 )); then
err "Allocated 0 times. Something is wrong.." "$output"
return 1
fi
if (( "$allocs" != "$frees" )); then
err "Allocated $allocs times but freed only $frees" "$output"
return 1
fi
#echo "Allocs: $allocs Frees: $frees"
}
# This is the same with test_many_thread_allocs but multiplied
function test_thread_free_multi { #threads #times
function test_thread_free_multi { #threads #times #nparallel
echo "Testing multiple threads creation and destruction in pool(threads=$1 times=$2)"
compile src/no_work.c
for ((i = 1; i <= $2; i++)); do
pids=()
nparallel="${3:-10}"
# Run tests in p
for (( i = 1; i <= "$2"; i++ )); do
# Run test in background
python -c "import sys; sys.stdout.write('$i/$2\r')"
output=$(valgrind --leak-check=full --track-origins=yes ./test "$1" 2>&1 > /dev/null)
heap_usage=$(echo "$output" | grep "total heap usage")
allocs=$(extract_num "[0-9]* allocs" "$heap_usage")
frees=$(extract_num "[0-9]* frees" "$heap_usage")
if (( "$allocs" == 0 )); then
err "Allocated 0 times. Something is wrong.." "$output"
exit 1
_test_thread_free_multi "$1" &
pids+=($!)
# Wait for 10 background jobs to finish
if (( "$i" % 10 == 0 )); then
for pid in ${pids[@]}; do
wait $pid
if (( $? != 0 )); then
err "Test failed" "Test failed"
fi
done
pids=()
fi
if (( "$allocs" != "$frees" )); then
err "Allocated $allocs times but freed only $frees" "$output"
exit 1
fi
#echo "Allocs: $allocs Frees: $frees"
done
echo
}
@ -62,7 +85,12 @@ test_thread_free 8
test_thread_free 1
test_thread_free 20
test_thread_free_multi 4 20
test_thread_free_multi 3 1000
test_thread_free_multi 100 100
# test_thread_free_multi 3 1000 # Takes way too long
test_thread_free_multi 3 200
# test_thread_free_multi 100 100 # Takes way too long
test_thread_free_multi 100 20 1
echo "No memory leaks"

View File

@ -20,7 +20,7 @@ int main(int argc, char *argv[]){
char* p;
if (argc != 3){
puts("This testfile needs excactly two arguments");
puts("This testfile needs exactly two arguments");
exit(1);
}
int num_jobs = strtol(argv[1], &p, 10);

View File

@ -24,7 +24,7 @@ int main(int argc, char *argv[]){
char* p;
if (argc != 2){
puts("This testfile needs excactly one arguments");
puts("This testfile needs exactly one arguments");
exit(1);
}
int num_threads = strtol(argv[1], &p, 10);

View File

@ -8,7 +8,16 @@
*
********************************/
#if defined(__APPLE__)
#include <AvailabilityMacros.h>
#else
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L
#endif
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 500
#endif
#endif
#include <unistd.h>
#include <signal.h>
#include <stdio.h>
@ -19,6 +28,9 @@
#if defined(__linux__)
#include <sys/prctl.h>
#endif
#if defined(__FreeBSD__) || defined(__OpenBSD__)
#include <pthread_np.h>
#endif
#include "thpool.h"
@ -34,6 +46,13 @@
#define err(str)
#endif
#ifndef THPOOL_THREAD_NAME
#define THPOOL_THREAD_NAME thpool
#endif
#define STRINGIFY(x) #x
#define TOSTRING(x) STRINGIFY(x)
static volatile int threads_on_hold;
/* ========================== STRUCTURES ============================ */
@ -203,7 +222,7 @@ void thpool_wait(thpool_* thpool_p){
/* Destroy the threadpool */
void thpool_destroy(thpool_* thpool_p){
/* No need to destory if it's NULL */
/* No need to destroy if it's NULL */
if (thpool_p == NULL) return ;
volatile int threads_total = thpool_p->num_threads_alive;
@ -252,7 +271,7 @@ void thpool_pause(thpool_* thpool_p) {
/* Resume all threads in threadpool */
void thpool_resume(thpool_* thpool_p) {
// resuming a single threadpool hasn't been
// implemented yet, meanwhile this supresses
// implemented yet, meanwhile this suppresses
// the warnings
(void)thpool_p;
@ -280,7 +299,7 @@ int thpool_num_threads_working(thpool_* thpool_p){
static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
*thread_p = (struct thread*)malloc(sizeof(struct thread));
if (thread_p == NULL){
if (*thread_p == NULL){
err("thread_init(): Could not allocate memory for thread\n");
return -1;
}
@ -288,7 +307,7 @@ static int thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
(*thread_p)->thpool_p = thpool_p;
(*thread_p)->id = id;
pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, (*thread_p));
pthread_create(&(*thread_p)->pthread, NULL, (void * (*)(void *)) thread_do, (*thread_p));
pthread_detach((*thread_p)->pthread);
return 0;
}
@ -306,7 +325,7 @@ static void thread_hold(int sig_id) {
/* What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interuppted is once
* In principle this is an endless loop. The only time this loop gets interrupted is once
* thpool_destroy() is invoked or the program exits.
*
* @param thread thread that will run this function
@ -314,15 +333,18 @@ static void thread_hold(int sig_id) {
*/
static void* thread_do(struct thread* thread_p){
/* Set thread name for profiling and debuging */
char thread_name[128] = {0};
sprintf(thread_name, "thread-pool-%d", thread_p->id);
/* Set thread name for profiling and debugging */
char thread_name[16] = {0};
snprintf(thread_name, 16, TOSTRING(THPOOL_THREAD_NAME) "-%d", thread_p->id);
#if defined(__linux__)
/* Use prctl instead to prevent using _GNU_SOURCE flag and implicit declaration */
prctl(PR_SET_NAME, thread_name);
#elif defined(__APPLE__) && defined(__MACH__)
pthread_setname_np(thread_name);
#elif defined(__FreeBSD__) || defined(__OpenBSD__)
pthread_set_name_np(thread_p->pthread, thread_name);
#else
err("thread_do(): pthread_setname_np is not supported on this system");
#endif
@ -333,7 +355,7 @@ static void* thread_do(struct thread* thread_p){
/* Register signal handler */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_flags = SA_ONSTACK;
act.sa_handler = thread_hold;
if (sigaction(SIGUSR1, &act, NULL) == -1) {
err("thread_do(): cannot handle SIGUSR1");
@ -454,11 +476,7 @@ static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
/* Get first job from queue(removes it from queue)
<<<<<<< HEAD
*
* Notice: Caller MUST hold a mutex
=======
>>>>>>> da2c0fe45e43ce0937f272c8cd2704bdc0afb490
*/
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
@ -516,6 +534,8 @@ static void bsem_init(bsem *bsem_p, int value) {
/* Reset semaphore to 0 */
static void bsem_reset(bsem *bsem_p) {
pthread_mutex_destroy(&(bsem_p->mutex));
pthread_cond_destroy(&(bsem_p->cond));
bsem_init(bsem_p, 0);
}

View File

@ -20,7 +20,7 @@ typedef struct thpool_* threadpool;
/**
* @brief Initialize threadpool
*
* Initializes a threadpool. This function will not return untill all
* Initializes a threadpool. This function will not return until all
* threads have initialized successfully.
*
* @example
@ -62,7 +62,7 @@ threadpool thpool_init(int num_threads);
* @param threadpool threadpool to which the work will be added
* @param function_p pointer to function to add as work
* @param arg_p pointer to an argument
* @return 0 on successs, -1 otherwise.
* @return 0 on success, -1 otherwise.
*/
int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
@ -77,7 +77,7 @@ int thpool_add_work(threadpool, void (*function_p)(void*), void* arg_p);
* Smart polling is used in wait. The polling is initially 0 - meaning that
* there is virtually no polling at all. If after 1 seconds the threads
* haven't finished, the polling interval starts growing exponentially
* untill it reaches max_secs seconds. Then it jumps down to a maximum polling
* until it reaches max_secs seconds. Then it jumps down to a maximum polling
* interval assuming that heavy processing is being used in the threadpool.
*
* @example