Add checks for 'overcommitting'

This scales the number of active threads back when threads have to
wait for work too often
This commit is contained in:
Martijn van Beurden 2023-07-25 13:41:54 +02:00
parent bcc37540aa
commit 021e82bfcc
1 changed files with 40 additions and 1 deletions

View File

@ -497,15 +497,18 @@ typedef struct FLAC__StreamEncoderPrivate {
uint32_t num_created_threads;
uint32_t next_thread; /* This is the next thread that needs start, or needs to finish and be restarted */
uint32_t num_started_threadtasks;
uint32_t num_available_threadtasks;
uint32_t num_available_threadtasks; /* Number of threadtasks that are available to work on */
uint32_t num_running_threads;
uint32_t next_threadtask; /* Next threadtask that is available to work on */
pthread_mutex_t mutex_md5_fifo;
pthread_mutex_t mutex_md5_active;
pthread_mutex_t mutex_work_queue; /* To lock work related variables in this struct */
pthread_cond_t cond_md5_emptied; /* To signal to main thread that MD5 queue has been emptied */
pthread_cond_t cond_work_available; /* To signal to threads that work is available */
pthread_cond_t cond_wake_up_thread; /* To signal that one sleeping thread can wake up */
FLAC__bool md5_work_available; /* To signal to threads that work is available */
FLAC__bool finish_work_threads;
int32_t overcommitted_indicator;
verify_input_fifo md5_fifo;
#endif
} FLAC__StreamEncoderPrivate;
@ -1188,6 +1191,15 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
if(pthread_cond_init(&encoder->private_->cond_wake_up_thread, NULL)) {
pthread_mutex_destroy(&encoder->private_->mutex_md5_fifo);
pthread_mutex_destroy(&encoder->private_->mutex_md5_active);
pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
pthread_cond_destroy(&encoder->private_->cond_work_available);
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
if(encoder->protected_->do_md5) {
encoder->private_->md5_fifo.size = (encoder->protected_->blocksize+OVERREAD_) * (encoder->private_->num_threadtasks + 2);
for(i = 0; i < encoder->protected_->channels; i++) {
@ -1197,6 +1209,7 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
pthread_cond_destroy(&encoder->private_->cond_work_available);
pthread_cond_destroy(&encoder->private_->cond_wake_up_thread);
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
@ -1719,6 +1732,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
for(t = 1; t < encoder->private_->num_created_threads; t++)
encoder->private_->finish_work_threads = true;
pthread_cond_broadcast(&encoder->private_->cond_wake_up_thread);
pthread_cond_broadcast(&encoder->private_->cond_work_available);
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
@ -2666,8 +2680,10 @@ void set_defaults_(FLAC__StreamEncoder *encoder)
#ifdef HAVE_PTHREAD
encoder->private_->num_created_threads = 1;
encoder->private_->next_thread = 1;
encoder->private_->num_running_threads = 1;
encoder->private_->num_started_threadtasks = 1;
encoder->private_->num_available_threadtasks = 0;
encoder->private_->overcommitted_indicator = 0;
encoder->private_->next_threadtask = 1;
encoder->private_->md5_work_available = false;
encoder->private_->finish_work_threads = false;
@ -2777,6 +2793,7 @@ void free_(FLAC__StreamEncoder *encoder)
pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
pthread_cond_destroy(&encoder->private_->cond_work_available);
pthread_cond_destroy(&encoder->private_->cond_wake_up_thread);
if(encoder->protected_->do_md5) {
for(i = 0; i < encoder->protected_->channels; i++) {
if(0 != encoder->private_->md5_fifo.data[i]) {
@ -3547,8 +3564,30 @@ void * process_frame_thread_(void * args) {
FLAC__StreamEncoder * encoder = args;
uint32_t channel;
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
encoder->private_->num_running_threads++;
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
while(1) {
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
if(encoder->private_->finish_work_threads) {
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
return NULL;
}
if(encoder->private_->num_available_threadtasks == 0)
encoder->private_->overcommitted_indicator++;
else if(encoder->private_->num_available_threadtasks > encoder->private_->num_running_threads)
encoder->private_->overcommitted_indicator--;
if(encoder->private_->overcommitted_indicator < -20) {
encoder->private_->overcommitted_indicator = 0;
pthread_cond_signal(&encoder->private_->cond_wake_up_thread);
}
else if(encoder->private_->overcommitted_indicator > 20 && encoder->private_->num_running_threads > 2) {
encoder->private_->overcommitted_indicator = 0;
encoder->private_->num_running_threads--;
pthread_cond_wait(&encoder->private_->cond_wake_up_thread, &encoder->private_->mutex_work_queue);
encoder->private_->num_running_threads++;
}
while(encoder->private_->num_available_threadtasks == 0 && !encoder->private_->md5_work_available) {
if(encoder->private_->finish_work_threads) {
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);