Make it possible for threads to leapfrog each other

This commit is contained in:
Martijn van Beurden 2023-07-17 16:24:19 +02:00
parent a4b8b0f0f2
commit d41127f860
1 changed files with 35 additions and 46 deletions

View File

@ -202,20 +202,12 @@ typedef struct FLAC__StreamEncoderThreadTask {
FLAC__EntropyCodingMethod_PartitionedRiceContents partitioned_rice_contents_extra[2]; /* from find_best_partition_order_() */
FLAC__bool disable_constant_subframes;
#ifdef HAVE_PTHREAD
pthread_t thread;
sem_t sem_work_available; /* To signal to thread that work is available */
sem_t sem_work_done; /* To signal from thread that work is done */
FLAC__bool returnvalue;
#endif
} FLAC__StreamEncoderThreadTask;
#ifdef HAVE_PTHREAD
typedef struct FLAC__StreamEncoderThreadctx {
FLAC__StreamEncoderThreadTask * task[2];
FLAC__StreamEncoder *encoder; /* Add this so this struct is the only thing the thread needs */
} FLAC__StreamEncoderThreadctx;
#endif
/***********************************************************************
*
* Private class method prototypes
@ -423,7 +415,7 @@ static FILE *get_binary_stdout_(void);
typedef struct FLAC__StreamEncoderPrivate {
FLAC__StreamEncoderThreadTask * threadtask[FLAC__STREAM_ENCODER_MAX_THREADTASKS];
#ifdef HAVE_PTHREAD
FLAC__StreamEncoderThreadctx * threadctx[FLAC__STREAM_ENCODER_MAX_THREADS];
pthread_t thread[FLAC__STREAM_ENCODER_MAX_THREADS];
#endif
uint32_t input_capacity; /* current size (in samples) of the signal and residual buffers */
#ifndef FLAC__INTEGER_ONLY_LIBRARY
@ -501,7 +493,8 @@ typedef struct FLAC__StreamEncoderPrivate {
#ifdef HAVE_PTHREAD
uint32_t num_created_threads;
uint32_t num_started_threadtasks;
uint32_t next_thread;
uint32_t next_thread; /* This is the next thread that needs start, or needs to finish and be restarted */
sem_t sem_work_available; /* To signal to threads that work is available */
#endif
} FLAC__StreamEncoderPrivate;
@ -1152,13 +1145,9 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
if(encoder->protected_->num_threads > 1) {
#ifdef HAVE_PTHREAD
for(t = 1; t < encoder->protected_->num_threads; t++) {
encoder->private_->threadctx[t] = safe_calloc_(1, sizeof(FLAC__StreamEncoderThreadctx));
if(encoder->private_->threadctx[t] == NULL) {
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
encoder->private_->threadctx[t]->encoder = encoder;
if(sem_init(&encoder->private_->sem_work_available, 0, 0)) {
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
for(t = 1; t < (encoder->protected_->num_threads * 2 - 1); t++) {
encoder->private_->threadtask[t] = safe_calloc_(1, sizeof(FLAC__StreamEncoderThreadTask));
@ -1651,13 +1640,13 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)
ok = false;
}
for(t = 1; t < encoder->private_->num_created_threads; t++)
pthread_cancel(encoder->private_->threadtask[t]->thread);
pthread_cancel(encoder->private_->thread[t]);
#ifdef __APPLE__
for(t = 1; t < encoder->protected_->num_threads * 2 - 1; t++)
sem_post(&encoder->private_->threadtask[t]->sem_work_available);
#endif
for(t = 1; t < encoder->private_->num_created_threads; t++)
pthread_join(encoder->private_->threadtask[t]->thread, NULL);
pthread_join(encoder->private_->thread[t], NULL);
#else
FLAC__ASSERT(0);
#endif
@ -2636,14 +2625,6 @@ void free_(FLAC__StreamEncoder *encoder)
encoder->private_->window_unaligned[i] = 0;
}
}
#endif
#ifdef HAVE_PTHREAD
for(t = 0; t < encoder->protected_->num_threads; t++) {
if(0 != encoder->private_->threadctx[t]) {
free(encoder->private_->threadctx[t]);
encoder->private_->threadctx[t] = 0;
}
}
#endif
for(t = 0; t < (encoder->protected_->num_threads * 2 - 1); t++) {
if(0 == encoder->private_->threadtask[t])
@ -2717,6 +2698,10 @@ void free_(FLAC__StreamEncoder *encoder)
}
}
#ifdef HAVE_PTHREAD
if(encoder->protected_->num_threads > 1)
sem_destroy(&encoder->private_->sem_work_available);
#endif
if(encoder->protected_->verify) {
for(i = 0; i < encoder->protected_->channels; i++) {
if(0 != encoder->private_->verify.input_fifo.data[i]) {
@ -3386,14 +3371,9 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
else {
#ifdef HAVE_PTHREAD
if(encoder->private_->num_created_threads < encoder->protected_->num_threads) {
/* Prepare threadctx */
encoder->private_->threadctx[encoder->private_->next_thread]->task[0] = encoder->private_->threadtask[encoder->private_->next_thread];
encoder->private_->threadctx[encoder->private_->next_thread]->task[1] = encoder->private_->threadtask[encoder->private_->next_thread+encoder->protected_->num_threads-1];
/* Create a new thread */
pthread_create(&encoder->private_->threadtask[encoder->private_->next_thread]->thread,
NULL,
process_frame_thread_,
encoder->private_->threadctx[encoder->private_->next_thread]);
pthread_create(&encoder->private_->thread[encoder->private_->next_thread],
NULL, process_frame_thread_, encoder);
encoder->private_->num_created_threads++;
}
else if(encoder->private_->num_started_threadtasks == encoder->protected_->num_threads * 2 - 1) {
@ -3412,6 +3392,7 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
encoder->private_->threadtask[encoder->private_->next_thread]->current_frame_number = encoder->private_->current_frame_number;
sem_post(&encoder->private_->threadtask[encoder->private_->next_thread]->sem_work_available);
sem_post(&encoder->private_->sem_work_available);
if(encoder->private_->num_started_threadtasks < encoder->protected_->num_threads * 2 - 1)
encoder->private_->num_started_threadtasks++;
@ -3435,21 +3416,30 @@ FLAC__bool process_frame_(FLAC__StreamEncoder *encoder, FLAC__bool is_last_block
#ifdef HAVE_PTHREAD
void * process_frame_thread_(void * args) {
FLAC__StreamEncoderThreadctx * threadctx = args;
FLAC__StreamEncoder * encoder = threadctx->encoder;
FLAC__bool ticktock = false;
FLAC__StreamEncoder * encoder = args;
FLAC__StreamEncoderThreadTask * task;
FLAC__uint16 crc;
uint32_t t;
while(1) {
FLAC__bool ok = true;
sem_wait(&threadctx->task[ticktock]->sem_work_available);
FLAC__ASSERT_DECLARATION(FLAC__bool got_task = false);
sem_wait(&encoder->private_->sem_work_available);
#ifdef __APPLE__
pthread_testcancel();
#endif
for(t = 1; t < encoder->protected_->num_threads * 2 - 1; t++) {
if(sem_trywait(&encoder->private_->threadtask[t]->sem_work_available) == 0) {
FLAC__ASSERT_DECLARATION(got_task = true);
task = encoder->private_->threadtask[t];
break;
}
}
FLAC__ASSERT(got_task);
/*
* Process the frame header and subframes into the frame bitbuffer
*/
if(ok && !process_subframes_(encoder, threadctx->task[ticktock])) {
if(ok && !process_subframes_(encoder, task)) {
/* the above function sets the state for us in case of an error */
ok = false;
}
@ -3457,7 +3447,7 @@ void * process_frame_thread_(void * args) {
/*
* Zero-pad the frame to a byte_boundary
*/
if(ok && !FLAC__bitwriter_zero_pad_to_byte_boundary(threadctx->task[ticktock]->frame)) {
if(ok && !FLAC__bitwriter_zero_pad_to_byte_boundary(task->frame)) {
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
ok = false;
}
@ -3465,20 +3455,19 @@ void * process_frame_thread_(void * args) {
/*
* CRC-16 the whole thing
*/
FLAC__ASSERT(FLAC__bitwriter_is_byte_aligned(threadctx->task[ticktock]->frame));
FLAC__ASSERT(FLAC__bitwriter_is_byte_aligned(task->frame));
if(
ok &&
(
!FLAC__bitwriter_get_write_crc16(threadctx->task[ticktock]->frame, &crc) ||
!FLAC__bitwriter_write_raw_uint32(threadctx->task[ticktock]->frame, crc, FLAC__FRAME_FOOTER_CRC_LEN)
!FLAC__bitwriter_get_write_crc16(task->frame, &crc) ||
!FLAC__bitwriter_write_raw_uint32(task->frame, crc, FLAC__FRAME_FOOTER_CRC_LEN)
)
) {
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
ok = false;
}
threadctx->task[ticktock]->returnvalue = ok;
sem_post(&threadctx->task[ticktock]->sem_work_done);
ticktock = !ticktock;
task->returnvalue = ok;
sem_post(&task->sem_work_done);
}
}
#endif