Refactor thread condition logic

* Add better return value checks
* Combine logic blocks in structs
* Add (easier to read) static functions for blocks
* Use condition variables only in combination with BOOL
This commit is contained in:
Armin Novak 2022-04-27 13:33:08 +02:00 committed by akallabeth
parent ad20e431cc
commit 833a445e4e
2 changed files with 292 additions and 117 deletions

View File

@ -127,14 +127,190 @@ static int ThreadGetFd(HANDLE handle)
return pThread->event.fds[0];
}
#define run_mutex_init(fkt, mux, arg) run_mutex_init_(fkt, #fkt, mux, arg)
static BOOL run_mutex_init_(int (*fkt)(pthread_mutex_t*, const pthread_mutexattr_t*),
const char* name, pthread_mutex_t* mutex,
const pthread_mutexattr_t* mutexattr)
{
int rc;
WINPR_ASSERT(fkt);
WINPR_ASSERT(mutex);
rc = fkt(mutex, mutexattr);
if (rc != 0)
{
WLog_WARN(TAG, "[%s] failed with [%s]", name, strerror(rc));
}
return rc == 0;
}
#define run_mutex_fkt(fkt, mux) run_mutex_fkt_(fkt, #fkt, mux)
static BOOL run_mutex_fkt_(int (*fkt)(pthread_mutex_t* mux), const char* name,
pthread_mutex_t* mutex)
{
int rc;
WINPR_ASSERT(fkt);
WINPR_ASSERT(mutex);
rc = fkt(mutex);
if (rc != 0)
{
WLog_WARN(TAG, "[%s] failed with [%s]", name, strerror(rc));
}
return rc == 0;
}
#define run_cond_init(fkt, cond, arg) run_cond_init_(fkt, #fkt, cond, arg)
static BOOL run_cond_init_(int (*fkt)(pthread_cond_t*, const pthread_condattr_t*), const char* name,
pthread_cond_t* condition, const pthread_condattr_t* conditionattr)
{
int rc;
WINPR_ASSERT(fkt);
WINPR_ASSERT(condition);
rc = fkt(condition, conditionattr);
if (rc != 0)
{
WLog_WARN(TAG, "[%s] failed with [%s]", name, strerror(rc));
}
return rc == 0;
}
#define run_cond_fkt(fkt, cond) run_cond_fkt_(fkt, #fkt, cond)
static BOOL run_cond_fkt_(int (*fkt)(pthread_cond_t* mux), const char* name,
pthread_cond_t* condition)
{
int rc;
WINPR_ASSERT(fkt);
WINPR_ASSERT(condition);
rc = fkt(condition);
if (rc != 0)
{
WLog_WARN(TAG, "[%s] failed with [%s]", name, strerror(rc));
}
return rc == 0;
}
static int pthread_mutex_checked_unlock(pthread_mutex_t* mutex)
{
WINPR_ASSERT(mutex);
WINPR_ASSERT(pthread_mutex_trylock(mutex) == EBUSY);
return pthread_mutex_unlock(mutex);
}
static BOOL mux_condition_bundle_init(mux_condition_bundle* bundle)
{
WINPR_ASSERT(bundle);
bundle->val = FALSE;
if (!run_mutex_init(pthread_mutex_init, &bundle->mux, NULL))
return FALSE;
if (!run_cond_init(pthread_cond_init, &bundle->cond, NULL))
return FALSE;
return TRUE;
}
static void mux_condition_bundle_uninit(mux_condition_bundle* bundle)
{
mux_condition_bundle empty = { 0 };
WINPR_ASSERT(bundle);
run_cond_fkt(pthread_cond_destroy, &bundle->cond);
run_mutex_fkt(pthread_mutex_destroy, &bundle->mux);
*bundle = empty;
}
static BOOL mux_condition_bundle_signal(mux_condition_bundle* bundle)
{
BOOL rc = TRUE;
WINPR_ASSERT(bundle);
if (!run_mutex_fkt(pthread_mutex_lock, &bundle->mux))
return FALSE;
bundle->val = TRUE;
if (!run_cond_fkt(pthread_cond_signal, &bundle->cond))
rc = FALSE;
if (!run_mutex_fkt(pthread_mutex_checked_unlock, &bundle->mux))
rc = FALSE;
return rc;
}
static BOOL mux_condition_bundle_lock(mux_condition_bundle* bundle)
{
WINPR_ASSERT(bundle);
return run_mutex_fkt(pthread_mutex_lock, &bundle->mux);
}
static BOOL mux_condition_bundle_unlock(mux_condition_bundle* bundle)
{
WINPR_ASSERT(bundle);
return run_mutex_fkt(pthread_mutex_checked_unlock, &bundle->mux);
}
static BOOL mux_condition_bundle_wait(mux_condition_bundle* bundle, const char* name)
{
BOOL rc = FALSE;
WINPR_ASSERT(bundle);
WINPR_ASSERT(name);
WINPR_ASSERT(pthread_mutex_trylock(&bundle->mux) == EBUSY);
while (!bundle->val)
{
int r = pthread_cond_wait(&bundle->cond, &bundle->mux);
if (r != 0)
{
WLog_ERR(TAG, "failed to wait for %s [%s]", name, strerror(r));
switch (r)
{
case ENOTRECOVERABLE:
case EPERM:
case ETIMEDOUT:
case EINVAL:
goto fail;
default:
break;
}
}
}
rc = bundle->val;
fail:
return rc;
}
static BOOL signal_thread_ready(WINPR_THREAD* thread)
{
WINPR_ASSERT(thread);
return mux_condition_bundle_signal(&thread->isCreated);
}
static BOOL signal_thread_is_running(WINPR_THREAD* thread)
{
WINPR_ASSERT(thread);
return mux_condition_bundle_signal(&thread->isRunning);
}
static DWORD ThreadCleanupHandle(HANDLE handle)
{
DWORD status = WAIT_FAILED;
WINPR_THREAD* thread = (WINPR_THREAD*)handle;
if (!ThreadIsHandled(handle))
return WAIT_FAILED;
if (pthread_mutex_lock(&thread->mutex))
if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
return WAIT_FAILED;
if (!thread->joined)
@ -145,17 +321,19 @@ static DWORD ThreadCleanupHandle(HANDLE handle)
if (status != 0)
{
WLog_ERR(TAG, "pthread_join failure: [%d] %s", status, strerror(status));
pthread_mutex_unlock(&thread->mutex);
return WAIT_FAILED;
goto fail;
}
else
thread->joined = TRUE;
}
if (pthread_mutex_unlock(&thread->mutex))
status = WAIT_OBJECT_0;
fail:
if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
return WAIT_FAILED;
return WAIT_OBJECT_0;
return status;
}
static HANDLE_OPS ops = { ThreadIsHandled,
@ -286,12 +464,43 @@ out:
return TRUE;
}
static BOOL signal_and_wait_for_ready(WINPR_THREAD* thread)
{
BOOL res = FALSE;
WINPR_ASSERT(thread);
if (!mux_condition_bundle_lock(&thread->isRunning))
return FALSE;
if (!signal_thread_ready(thread))
goto fail;
if (!mux_condition_bundle_wait(&thread->isRunning, "threadIsRunning"))
goto fail;
#if defined(WITH_THREAD_LIST)
if (!ListDictionary_Contains(thread_list, &thread->thread))
{
WLog_ERR(TAG, "Thread not in thread_list, startup failed!");
goto fail;
}
#endif
res = TRUE;
fail:
if (!mux_condition_bundle_unlock(&thread->isRunning))
return FALSE;
return res;
}
/* Thread launcher function responsible for registering
* cleanup handlers and calling pthread_exit, if not done
* in thread function. */
static void* thread_launcher(void* arg)
{
struct timespec waittime = { 1, 0 };
DWORD rc = 0;
WINPR_THREAD* thread = (WINPR_THREAD*)arg;
LPTHREAD_START_ROUTINE fkt;
@ -314,27 +523,7 @@ static void* thread_launcher(void* arg)
goto exit;
}
if (pthread_mutex_lock(&thread->threadIsReadyMutex))
goto exit;
if (pthread_cond_signal(&thread->threadReady) != 0)
{
WLog_ERR(TAG, "The thread could not be made ready");
pthread_mutex_unlock(&thread->threadIsReadyMutex);
goto exit;
}
pthread_cond_timedwait(&thread->threadIsReady, &thread->threadIsReadyMutex, &waittime);
#if defined(WITH_THREAD_LIST)
if (!ListDictionary_Contains(thread_list, &thread->thread))
{
WLog_ERR(TAG, "Thread not in thread_list, startup failed!");
pthread_mutex_unlock(&thread->threadIsReadyMutex);
goto exit;
}
#endif
if (pthread_mutex_unlock(&thread->threadIsReadyMutex))
if (!signal_and_wait_for_ready(thread))
goto exit;
rc = fkt(thread->lpParameter);
@ -349,7 +538,8 @@ exit:
set_event(thread);
pthread_cond_signal(&thread->threadReady);
signal_thread_ready(thread);
if (thread->detached || !thread->started)
cleanup_handle(thread);
}
@ -359,8 +549,14 @@ exit:
static BOOL winpr_StartThread(WINPR_THREAD* thread)
{
struct timespec waittime = { 1, 0 };
pthread_attr_t attr;
BOOL rc = FALSE;
BOOL locked = FALSE;
pthread_attr_t attr = { 0 };
if (!mux_condition_bundle_lock(&thread->isCreated))
return FALSE;
locked = TRUE;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
@ -370,41 +566,44 @@ static BOOL winpr_StartThread(WINPR_THREAD* thread)
thread->started = TRUE;
reset_event(thread);
if (pthread_mutex_lock(&thread->threadReadyMutex))
goto error;
#if defined(WITH_THREAD_LIST)
if (!ListDictionary_Add(thread_list, &thread->thread, thread))
{
WLog_ERR(TAG, "failed to add the thread to the thread list");
pthread_mutex_unlock(&thread->threadReadyMutex);
goto error;
}
#endif
if (pthread_create(&thread->thread, &attr, thread_launcher, thread))
{
pthread_mutex_unlock(&thread->threadReadyMutex);
goto error;
}
pthread_cond_timedwait(&thread->threadReady, &thread->threadReadyMutex, &waittime);
if (pthread_mutex_unlock(&thread->threadReadyMutex))
goto error;
if (pthread_cond_signal(&thread->threadIsReady) != 0)
if (!mux_condition_bundle_wait(&thread->isCreated, "threadIsCreated"))
goto error;
locked = FALSE;
if (!mux_condition_bundle_unlock(&thread->isCreated))
goto error;
if (!signal_thread_is_running(thread))
{
WLog_ERR(TAG, "failed to signal the thread was ready");
goto error;
}
pthread_attr_destroy(&attr);
dump_thread(thread);
return TRUE;
rc = TRUE;
error:
if (locked)
{
if (!mux_condition_bundle_unlock(&thread->isCreated))
rc = FALSE;
}
pthread_attr_destroy(&attr);
return FALSE;
if (rc)
dump_thread(thread);
return rc;
}
HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize,
@ -433,7 +632,7 @@ HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize
goto fail;
}
if (pthread_mutex_init(&thread->mutex, NULL) != 0)
if (!run_mutex_init(pthread_mutex_init, &thread->mutex, NULL))
{
WLog_ERR(TAG, "failed to initialize thread mutex");
goto fail;
@ -445,28 +644,10 @@ HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize
goto fail;
}
if (pthread_mutex_init(&thread->threadIsReadyMutex, NULL) != 0)
{
WLog_ERR(TAG, "failed to initialize a mutex for a condition variable");
if (!mux_condition_bundle_init(&thread->isCreated))
goto fail;
}
if (pthread_mutex_init(&thread->threadReadyMutex, NULL) != 0)
{
WLog_ERR(TAG, "failed to initialize a mutex for a condition variable");
if (!mux_condition_bundle_init(&thread->isRunning))
goto fail;
}
if (pthread_cond_init(&thread->threadIsReady, NULL) != 0)
{
WLog_ERR(TAG, "failed to initialize a condition variable");
goto fail;
}
if (pthread_cond_init(&thread->threadReady, NULL) != 0)
{
WLog_ERR(TAG, "failed to initialize a condition variable");
goto fail;
}
WINPR_HANDLE_SET_TYPE_AND_MODE(thread, HANDLE_TYPE_THREAD, WINPR_FD_READ);
handle = (HANDLE)thread;
@ -492,7 +673,6 @@ fail:
void cleanup_handle(void* obj)
{
int rc;
WINPR_THREAD* thread = (WINPR_THREAD*)obj;
if (!thread)
return;
@ -500,29 +680,9 @@ void cleanup_handle(void* obj)
if (!apc_uninit(&thread->apc))
WLog_ERR(TAG, "failed to destroy APC");
rc = pthread_cond_destroy(&thread->threadIsReady);
if (rc)
WLog_ERR(TAG, "failed to destroy thread->threadIsReady [%d] %s (%d)", rc, strerror(errno),
errno);
rc = pthread_cond_destroy(&thread->threadReady);
if (rc)
WLog_ERR(TAG, "failed to destroy thread->threadReady [%d] %s (%d)", rc, strerror(errno),
errno);
rc = pthread_mutex_destroy(&thread->threadIsReadyMutex);
if (rc)
WLog_ERR(TAG, "failed to destroy thread->threadIsReadyMutex [%d] %s (%d)", rc,
strerror(errno), errno);
rc = pthread_mutex_destroy(&thread->threadReadyMutex);
if (rc)
WLog_ERR(TAG, "failed to destroy thread->threadReadyMutex [%d] %s (%d)", rc,
strerror(errno), errno);
rc = pthread_mutex_destroy(&thread->mutex);
if (rc)
WLog_ERR(TAG, "failed to destroy thread->mutex [%d] %s (%d)", rc, strerror(errno), errno);
mux_condition_bundle_uninit(&thread->isCreated);
mux_condition_bundle_uninit(&thread->isRunning);
run_mutex_fkt(pthread_mutex_destroy, &thread->mutex);
winpr_event_uninit(&thread->event);
@ -747,21 +907,21 @@ DWORD ResumeThread(HANDLE hThread)
thread = (WINPR_THREAD*)Object;
if (pthread_mutex_lock(&thread->mutex))
if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
return (DWORD)-1;
if (!thread->started)
{
if (!winpr_StartThread(thread))
{
pthread_mutex_unlock(&thread->mutex);
run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex);
return (DWORD)-1;
}
}
else
WLog_WARN(TAG, "Thread already started!");
if (pthread_mutex_unlock(&thread->mutex))
if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
return (DWORD)-1;
return 0;
@ -799,7 +959,7 @@ BOOL TerminateThread(HANDLE hThread, DWORD dwExitCode)
thread->exited = TRUE;
thread->dwExitCode = dwExitCode;
if (pthread_mutex_lock(&thread->mutex))
if (!run_mutex_fkt(pthread_mutex_lock, &thread->mutex))
return FALSE;
#ifndef ANDROID
@ -808,7 +968,7 @@ BOOL TerminateThread(HANDLE hThread, DWORD dwExitCode)
WLog_ERR(TAG, "Function not supported on this platform!");
#endif
if (pthread_mutex_unlock(&thread->mutex))
if (!run_mutex_fkt(pthread_mutex_checked_unlock, &thread->mutex))
return FALSE;
set_event(thread);

View File

@ -32,34 +32,49 @@
#include "../synch/event.h"
#include "apc.h"
#ifdef __GNUC__
#define ALIGN64 __attribute__((aligned(8)))
#else
#ifdef _WIN32
#define ALIGN64 __declspec(align(8))
#else
#define ALIGN64
#endif
#endif
typedef void* (*pthread_start_routine)(void*);
typedef struct winpr_APC_item WINPR_APC_ITEM;
typedef struct
{
ALIGN64 pthread_mutex_t mux;
ALIGN64 pthread_cond_t cond;
ALIGN64 BOOL val;
} mux_condition_bundle;
struct winpr_thread
{
WINPR_HANDLE_DEF();
BOOL started;
WINPR_EVENT_IMPL event;
BOOL mainProcess;
BOOL detached;
BOOL joined;
BOOL exited;
DWORD dwExitCode;
pthread_t thread;
SIZE_T dwStackSize;
LPVOID lpParameter;
pthread_mutex_t mutex;
pthread_mutex_t threadIsReadyMutex;
pthread_cond_t threadIsReady;
pthread_mutex_t threadReadyMutex;
pthread_cond_t threadReady;
LPTHREAD_START_ROUTINE lpStartAddress;
LPSECURITY_ATTRIBUTES lpThreadAttributes;
APC_QUEUE apc;
ALIGN64 BOOL started;
ALIGN64 WINPR_EVENT_IMPL event;
ALIGN64 BOOL mainProcess;
ALIGN64 BOOL detached;
ALIGN64 BOOL joined;
ALIGN64 BOOL exited;
ALIGN64 DWORD dwExitCode;
ALIGN64 pthread_t thread;
ALIGN64 SIZE_T dwStackSize;
ALIGN64 LPVOID lpParameter;
ALIGN64 pthread_mutex_t mutex;
mux_condition_bundle isRunning;
mux_condition_bundle isCreated;
ALIGN64 LPTHREAD_START_ROUTINE lpStartAddress;
ALIGN64 LPSECURITY_ATTRIBUTES lpThreadAttributes;
ALIGN64 APC_QUEUE apc;
#if defined(WITH_DEBUG_THREADS)
void* create_stack;
void* exit_stack;
ALIGN64 void* create_stack;
ALIGN64 void* exit_stack;
#endif
};