Implemented thread handling for WaitForMultipleObjects.

Implemented thread specific functions.
This commit is contained in:
Armin Novak 2014-07-14 19:36:31 +02:00 committed by Armin Novak
parent 2de73e0243
commit c304f457cf
8 changed files with 426 additions and 101 deletions

View File

@ -109,6 +109,7 @@ option(WITH_DEBUG_SCARD "Print smartcard debug messages" ${DEFAULT_DEBUG_OPTION}
option(WITH_DEBUG_SND "Print rdpsnd debug messages" ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_SVC "Print static virtual channel debug messages." ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_TRANSPORT "Print transport debug messages." ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_THREADS "Print thread debug messages, enables handle dump" ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_TIMEZONE "Print timezone debug messages." ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_WND "Print window order debug messages" ${DEFAULT_DEBUG_OPTION})
option(WITH_DEBUG_X11_CLIPRDR "Print X11 clipboard redirection debug messages" ${DEFAULT_DEBUG_OPTION})

View File

@ -80,6 +80,7 @@
#cmakedefine WITH_DEBUG_SVC
#cmakedefine WITH_DEBUG_RDPEI
#cmakedefine WITH_DEBUG_TIMEZONE
#cmakedefine WITH_DEBUG_THREADS
#cmakedefine WITH_DEBUG_TRANSPORT
#cmakedefine WITH_DEBUG_WND
#cmakedefine WITH_DEBUG_X11

View File

@ -200,7 +200,11 @@ WINPR_API BOOL TlsFree(DWORD dwTlsIndex);
/* CommandLineToArgvA is not present in the original Windows API, WinPR always exports it */
WINPR_API LPSTR* CommandLineToArgvA(LPCSTR lpCmdLine, int* pNumArgs);
WINPR_API LPSTR *CommandLineToArgvA(LPCSTR lpCmdLine, int *pNumArgs);
#if defined(WITH_DEBUG_THREADS)
WINPR_API VOID DumpThreadHandles(void);
#endif
#ifdef __cplusplus
}

View File

@ -127,20 +127,7 @@ BOOL CloseHandle(HANDLE hObject)
LeaveCriticalSection(&_HandleCloseCbsLock);
if (Type == HANDLE_TYPE_THREAD)
{
WINPR_THREAD* thread;
thread = (WINPR_THREAD*) Object;
if (thread->started)
{
pthread_detach(thread->thread);
}
free(thread);
return TRUE;
}
else if (Type == HANDLE_TYPE_PROCESS)
if (Type == HANDLE_TYPE_PROCESS)
{
WINPR_PROCESS* process;
process = (WINPR_PROCESS*) Object;

View File

@ -100,11 +100,13 @@ static void* thread_pool_work_func(void* arg)
}
}
ExitThread(0);
return NULL;
}
static void threads_close(void *thread)
{
WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
}
@ -184,15 +186,6 @@ VOID CloseThreadpool(PTP_POOL ptpp)
SetEvent(ptpp->TerminateEvent);
index = ArrayList_Count(ptpp->Threads) - 1;
while (index >= 0)
{
thread = (HANDLE) ArrayList_GetItem(ptpp->Threads, index);
WaitForSingleObject(thread, INFINITE);
index--;
}
ArrayList_Free(ptpp->Threads);
Queue_Free(ptpp->PendingQueue);
CountdownEvent_Free(ptpp->WorkComplete);

View File

@ -107,33 +107,6 @@ static long long ts_difftime(const struct timespec* o,
return newValue - oldValue;
}
static int pthread_timedjoin_np(pthread_t td, void** res,
struct timespec* timeout)
{
struct timespec timenow;
struct timespec sleepytime;
/* This is just to avoid a completely busy wait */
sleepytime.tv_sec = 0;
sleepytime.tv_nsec = 10000000; /* 10ms */
do
{
if (pthread_kill(td, 0))
return pthread_join(td, res);
nanosleep(&sleepytime, NULL);
clock_gettime(CLOCK_MONOTONIC, &timenow);
if (ts_difftime(timeout, &timenow) >= 0)
{
return ETIMEDOUT;
}
}
while (TRUE);
return ETIMEDOUT;
}
#if defined(__FreeBSD__)
/*the only way to get it work is to remove the static*/
int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec* timeout)
@ -143,10 +116,13 @@ static int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec
{
struct timespec timenow;
struct timespec sleepytime;
unsigned long long diff;
int retcode;
/* This is just to avoid a completely busy wait */
sleepytime.tv_sec = 0;
sleepytime.tv_nsec = 10000000; /* 10ms */
clock_gettime(CLOCK_MONOTONIC, &timenow);
diff = ts_difftime(&timenow, timeout);
sleepytime.tv_sec = diff / 1000000000LL;
sleepytime.tv_nsec = diff % 1000000000LL;
while ((retcode = pthread_mutex_trylock(mutex)) == EBUSY)
{
@ -223,43 +199,30 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
if (Type == HANDLE_TYPE_THREAD)
{
int status = 0;
WINPR_THREAD* thread;
void* thread_status = NULL;
thread = (WINPR_THREAD*) Object;
void *thread_status;
int status;
WINPR_THREAD *thread = (WINPR_THREAD *)Object;
status = waitOnFd(thread->pipe_fd[0], dwMilliseconds);
if (thread->started)
if (status < 0)
{
if (dwMilliseconds != INFINITE)
{
struct timespec timeout;
/* pthread_timedjoin_np returns ETIMEDOUT in case the timeout is 0,
* so set it to the smallest value to get a proper return value. */
if (dwMilliseconds == 0)
dwMilliseconds ++;
clock_gettime(CLOCK_MONOTONIC, &timeout);
ts_add_ms(&timeout, dwMilliseconds);
status = pthread_timedjoin_np(thread->thread, &thread_status, &timeout);
if (ETIMEDOUT == status)
return WAIT_TIMEOUT;
}
else
status = pthread_join(thread->thread, &thread_status);
thread->started = FALSE;
if (status != 0)
{
WLog_ERR(TAG, "pthread_join failure: [%d] %s",
status, strerror(status));
}
if (thread_status)
thread->dwExitCode = ((DWORD)(size_t) thread_status);
WLog_ERR(TAG, "waitOnFd() failure [%d] %s", errno, strerror(errno));
return WAIT_FAILED;
}
>
if (status != 1)
return WAIT_TIMEOUT;
status = pthread_join(thread->thread, &thread_status);
if (status != 0)
{
WLog_ERR(TAG, "pthread_join failure: [%d] %s",
status, strerror(status));
}
if (thread_status)
thread->dwExitCode = ((DWORD)(size_t) thread_status);
}
else if (Type == HANDLE_TYPE_PROCESS)
{
@ -517,6 +480,17 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
return WAIT_FAILED;
}
}
else if (Type == HANDLE_TYPE_THREAD)
{
WINPR_THREAD *thread = (WINPR_THREAD *) Object;
fd = thread->pipe_fd[0];
if (fd == -1)
{
WLog_ERR(TAG, "invalid thread file descriptor");
return WAIT_FAILED;
}
}
else if (Type == HANDLE_TYPE_NAMED_PIPE)
{
WINPR_NAMED_PIPE* pipe = (WINPR_NAMED_PIPE*) Object;
@ -604,6 +578,11 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
WINPR_TIMER* timer = (WINPR_TIMER*) Object;
fd = timer->fd;
}
else if (Type == HANDLE_TYPE_THREAD)
{
WINPR_THREAD *thread = (WINPR_THREAD *) Object;
fd = thread->pipe_fd[0];
}
else if (Type == HANDLE_TYPE_NAMED_PIPE)
{
WINPR_NAMED_PIPE* pipe = (WINPR_NAMED_PIPE*) Object;
@ -651,6 +630,23 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
return WAIT_FAILED;
}
}
else if (Type == HANDLE_TYPE_THREAD)
{
void *thread_status;
int status;
WINPR_THREAD *thread = (WINPR_THREAD *)Object;
status = pthread_join(thread->thread, &thread_status);
if (status != 0)
{
WLog_ERR(TAG, " pthread_join failure: [%d] %s",
status, strerror(status));
return WAIT_FAILED;
}
if (thread_status)
thread->dwExitCode = ((DWORD)(size_t) thread_status);
}
return (WAIT_OBJECT_0 + index);
}

View File

@ -21,6 +21,8 @@
#include "config.h"
#endif
#include <assert.h>
#include <winpr/handle.h>
#include <winpr/thread.h>
@ -70,22 +72,154 @@
#include <winpr/crt.h>
#include <winpr/platform.h>
#if defined(__linux__) && !defined(__ANDROID__)
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>
#endif
#ifdef HAVE_EVENTFD_H
#include <sys/eventfd.h>
#endif
#ifdef HAVE_EXECINFO_H
#include <execinfo.h>
#endif
#include <errno.h>
#include <winpr/collections.h>
#include "thread.h"
#include "../handle/handle.h"
static pthread_once_t thread_initialized = PTHREAD_ONCE_INIT;
static HANDLE_CLOSE_CB _ThreadHandleCloseCb;
static wListDictionary *thread_list = NULL;
static BOOL ThreadCloseHandle(HANDLE handle);
static void cleanup_handle(WINPR_THREAD *thread);
static BOOL ThreadIsHandled(HANDLE handle)
{
WINPR_THREAD *pThread = (WINPR_THREAD *)handle;
if (!pThread || pThread->Type != HANDLE_TYPE_THREAD)
{
SetLastError(ERROR_INVALID_HANDLE);
return FALSE;
}
return TRUE;
}
static void ThreadInitialize(void)
{
_ThreadHandleCloseCb.IsHandled = ThreadIsHandled;
_ThreadHandleCloseCb.CloseHandle = ThreadCloseHandle;
RegisterHandleCloseCb(&_ThreadHandleCloseCb);
}
static void dump_thread(WINPR_THREAD *thread)
{
#if defined(WITH_DEBUG_THREADS) && defined(HAVE_EXECINFO_H)
void *stack[20];
fprintf(stderr, "Called from:\n");
backtrace_symbols_fd(stack, 20, STDERR_FILENO);
fprintf(stderr, "Thread handle created still not closed!\n");
backtrace_symbols_fd(thread->create_stack, 20, STDERR_FILENO);
if (thread->started)
fprintf(stderr, "Thread still running!\n");
else if (!thread->exit_stack)
fprintf(stderr, "Thread suspended.\n");
else
{
fprintf(stderr, "Thread exited at:\n");
backtrace_symbols_fd(thread->exit_stack, 20, STDERR_FILENO);
}
#endif
}
/**
* TODO: implement thread suspend/resume using pthreads
* http://stackoverflow.com/questions/3140867/suspend-pthreads-without-using-condition
*/
static BOOL set_event(WINPR_THREAD *thread)
{
int length;
BOOL status = FALSE;
#ifdef HAVE_EVENTFD_H
eventfd_t val = 1;
void winpr_StartThread(WINPR_THREAD* thread)
do
{
length = eventfd_write(thread->pipe_fd[0], val);
}
while ((length < 0) && (errno == EINTR));
status = (length == 0) ? TRUE : FALSE;
#else
if (WaitForSingleObject(thread, 0) != WAIT_OBJECT_0)
{
length = write(thread->pipe_fd[1], "-", 1);
if (length == 1)
status = TRUE;
}
else
{
status = TRUE;
}
#endif
thread->started = FALSE;
return status;
}
static BOOL reset_event(WINPR_THREAD *thread)
{
int length;
BOOL status = FALSE;
while (WaitForSingleObject(thread, 0) == WAIT_OBJECT_0)
{
#ifdef HAVE_EVENTFD_H
eventfd_t value;
do
{
length = eventfd_read(thread->pipe_fd[0], &value);
}
while ((length < 0) && (errno == EINTR));
if ((length > 0) && (!status))
status = TRUE;
#else
length = read(thread->pipe_fd[0], &length, 1);
if ((length == 1) && (!status))
status = TRUE;
#endif
}
thread->started = TRUE;
return status;
}
static int thread_compare(void *a, void *b)
{
pthread_t *p1 = a;
pthread_t *p2 = b;
int rc = pthread_equal(*p1, *p2);
return rc;
}
void winpr_StartThread(WINPR_THREAD *thread)
{
pthread_attr_t attr;
@ -95,10 +229,12 @@ void winpr_StartThread(WINPR_THREAD* thread)
if (thread->dwStackSize > 0)
pthread_attr_setstacksize(&attr, (size_t) thread->dwStackSize);
thread->started = TRUE;
pthread_create(&thread->thread, &attr, (pthread_start_routine) thread->lpStartAddress, thread->lpParameter);
pthread_attr_destroy(&attr);
reset_event(thread);
ListDictionary_Add(thread_list, &thread->thread, thread);
dump_thread(thread);
}
HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize,
@ -112,32 +248,162 @@ HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize
if (!thread)
return NULL;
thread->started = FALSE;
pthread_once(&thread_initialized, ThreadInitialize);
thread->dwStackSize = dwStackSize;
thread->lpParameter = lpParameter;
thread->lpStartAddress = lpStartAddress;
thread->lpThreadAttributes = lpThreadAttributes;
#if defined(WITH_DEBUG_THREADS) && defined(HAVE_EXECINFO_H)
backtrace(thread->create_stack, 20);
dump_thread(thread);
#endif
#ifdef HAVE_EVENTFD_H
thread->pipe_fd[0] = eventfd(0, EFD_NONBLOCK);
if (thread->pipe_fd[0] < 0)
{
fprintf(stderr, "[%s]: failed to create thread\n", __FUNCTION__);
free(thread);
return NULL;
}
#else
if (pipe(thread->pipe_fd) < 0)
{
fprintf(stderr, "[%s]: failed to create thread\n", __FUNCTION__);
free(thread);
return NULL;
}
#endif
pthread_mutex_init(&thread->mutex, 0);
WINPR_HANDLE_SET_TYPE(thread, HANDLE_TYPE_THREAD);
handle = (HANDLE) thread;
if (NULL == thread_list)
{
thread_list = ListDictionary_New(TRUE);
thread_list->objectKey.fnObjectEquals = thread_compare;
}
if (!(dwCreationFlags & CREATE_SUSPENDED))
winpr_StartThread(thread);
else
set_event(thread);
return handle;
}
void cleanup_handle(WINPR_THREAD *thread)
{
ListDictionary_Remove(thread_list, &thread->thread);
int rc = pthread_mutex_destroy(&thread->mutex);
if (rc)
{
fprintf(stderr, "[%s]: failed to destroy mutex [%d] %s (%d)\n", __FUNCTION__, rc, strerror(errno), errno);
}
if (thread->pipe_fd[0])
close(thread->pipe_fd[0]);
if (thread->pipe_fd[1])
close(thread->pipe_fd[1]);
free(thread);
}
BOOL ThreadCloseHandle(HANDLE handle)
{
WINPR_THREAD *thread = (WINPR_THREAD *)handle;
if (!thread_list)
{
fprintf(stderr, "[%s]: Thread list does not exist, check call!\n", __FUNCTION__);
dump_thread(thread);
}
else if (!ListDictionary_Contains(thread_list, &thread->thread))
{
fprintf(stderr, "[%s]: Thread list does not contain this thread! check call!\n", __FUNCTION__);
dump_thread(thread);
}
else
{
ListDictionary_Lock(thread_list);
dump_thread(thread);
if (!thread->started)
cleanup_handle(thread);
else if (WaitForSingleObject(thread, 0) == WAIT_OBJECT_0)
cleanup_handle(thread);
else
{
fprintf(stderr, "[%s]: Thread running, setting to detached state!\n", __FUNCTION__);
thread->detached = TRUE;
pthread_detach(thread->thread);
}
ListDictionary_Unlock(thread_list);
if (ListDictionary_Count(thread_list) < 1)
{
ListDictionary_Free(thread_list);
thread_list = NULL;
}
}
return TRUE;
}
HANDLE CreateRemoteThread(HANDLE hProcess, LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize,
LPTHREAD_START_ROUTINE lpStartAddress, LPVOID lpParameter, DWORD dwCreationFlags, LPDWORD lpThreadId)
{
fprintf(stderr, "[%s]: not implemented\n", __FUNCTION__);
return NULL;
}
VOID ExitThread(DWORD dwExitCode)
{
pthread_exit((void*) (size_t) dwExitCode);
pthread_t tid = pthread_self();
if (NULL == thread_list)
{
fprintf(stderr, "[%s]: function called without existing thread list!\n", __FUNCTION__);
#if defined(WITH_DEBUG_THREADS)
DumpThreadHandles();
#endif
}
else if (!ListDictionary_Contains(thread_list, &tid))
{
fprintf(stderr, "[%s]: function called, but no matching entry in thread list!\n", __FUNCTION__);
#if defined(WITH_DEBUG_THREADS)
DumpThreadHandles();
#endif
}
else
{
WINPR_THREAD *thread;
ListDictionary_Lock(thread_list);
thread = ListDictionary_GetItemValue(thread_list, &tid);
assert(thread);
#if defined(WITH_DEBUG_THREADS) && defined(HAVE_EXECINFO_H)
backtrace(thread->exit_stack, 20);
#endif
if (!thread->detached)
{
set_event(thread);
thread->dwExitCode = dwExitCode;
}
else
cleanup_handle(thread);
ListDictionary_Unlock(thread_list);
}
pthread_exit((void *)(size_t) dwExitCode);
}
BOOL GetExitCodeThread(HANDLE hThread, LPDWORD lpExitCode)
@ -158,20 +424,36 @@ BOOL GetExitCodeThread(HANDLE hThread, LPDWORD lpExitCode)
HANDLE _GetCurrentThread(VOID)
{
return NULL;
HANDLE hdl = NULL;
pthread_t tid = pthread_self();
if (NULL == thread_list)
{
fprintf(stderr, "[%s]: function called without existing thread list!\n", __FUNCTION__);
#if defined(WITH_DEBUG_THREADS)
DumpThreadHandles();
#endif
}
else if (!ListDictionary_Contains(thread_list, &tid))
{
fprintf(stderr, "[%s]: function called, but no matching entry in thread list!\n", __FUNCTION__);
#if defined(WITH_DEBUG_THREADS)
DumpThreadHandles();
#endif
}
else
{
hdl = ListDictionary_GetItemValue(thread_list, &tid);
}
return hdl;
}
DWORD GetCurrentThreadId(VOID)
{
#if defined(__linux__) && !defined(__ANDROID__)
pid_t tid;
tid = syscall(SYS_gettid);
return (DWORD) tid;
#else
pthread_t tid;
tid = pthread_self();
return (DWORD) tid;
#endif
}
DWORD ResumeThread(HANDLE hThread)
@ -189,6 +471,8 @@ DWORD ResumeThread(HANDLE hThread)
if (!thread->started)
winpr_StartThread(thread);
else
fprintf(stderr, "[%s]: Thread already started!\n", __FUNCTION__);
pthread_mutex_unlock(&thread->mutex);
@ -197,6 +481,7 @@ DWORD ResumeThread(HANDLE hThread)
DWORD SuspendThread(HANDLE hThread)
{
fprintf(stderr, "[%s]: Function not implemented!\n", __FUNCTION__);
return 0;
}
@ -220,6 +505,8 @@ BOOL TerminateThread(HANDLE hThread, DWORD dwExitCode)
#ifndef ANDROID
pthread_cancel(thread->thread);
#else
fprintf(stderr, "[%s]: Function not supported on this platform!\n", __FUNCTION__);
#endif
pthread_mutex_unlock(&thread->mutex);
@ -227,5 +514,55 @@ BOOL TerminateThread(HANDLE hThread, DWORD dwExitCode)
return TRUE;
}
#if defined(WITH_DEBUG_THREADS)
VOID DumpThreadHandles(void)
{
void *stack[20];
#if defined(HAVE_EXECINFO_H)
backtrace(stack, 20);
#endif
fprintf(stderr, "---------------- Called from ----------------------------\n");
#if defined(HAVE_EXECINFO_H)
backtrace_symbols_fd(stack, 20, STDERR_FILENO);
#endif
fprintf(stderr, "---------------- Start Dumping thread handles -----------\n");
if (!thread_list)
fprintf(stderr, "All threads properly shut down and disposed of.\n");
else
{
ULONG_PTR *keys = NULL;
ListDictionary_Lock(thread_list);
int x, count = ListDictionary_GetKeys(thread_list, &keys);
fprintf(stderr, "Dumping %d elements\n", count);
for (x=0; x<count; x++)
{
WINPR_THREAD *thread = ListDictionary_GetItemValue(thread_list, (void *)keys[x]);
fprintf(stderr, "Thread [%d] handle created still not closed!\n", x);
#if defined(HAVE_EXECINFO_H)
backtrace_symbols_fd(thread->create_stack, 20, STDERR_FILENO);
#endif
if (thread->started)
fprintf(stderr, "Thread [%d] still running!\n", x);
else
{
fprintf(stderr, "Thread [%d] exited at:\n", x);
#if defined(HAVE_EXECINFO_H)
backtrace_symbols_fd(thread->exit_stack, 20, STDERR_FILENO);
#endif
}
}
if (keys)
free(keys);
ListDictionary_Unlock(thread_list);
}
fprintf(stderr, "---------------- End Dumping thread handles -------------\n");
}
#endif
#endif

View File

@ -35,7 +35,9 @@ struct winpr_thread
WINPR_HANDLE_DEF();
BOOL started;
int pipe_fd[2];
BOOL mainProcess;
BOOL detached;
DWORD dwExitCode;
pthread_t thread;
SIZE_T dwStackSize;
@ -43,6 +45,10 @@ struct winpr_thread
pthread_mutex_t mutex;
LPTHREAD_START_ROUTINE lpStartAddress;
LPSECURITY_ATTRIBUTES lpThreadAttributes;
#if defined(WITH_DEBUG_THREADS)
void *create_stack[20];
void *exit_stack[20];
#endif
};
typedef struct winpr_thread WINPR_THREAD;