libwinpr-pool: improve thread pool API on Linux

This commit is contained in:
Marc-André Moreau 2013-01-22 16:19:32 -05:00
parent a98b8a1390
commit 74bba0e767
7 changed files with 95 additions and 19 deletions

View File

@ -24,6 +24,8 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#include "pool.h"
#ifdef _WIN32
static BOOL module_initialized = FALSE;

View File

@ -24,6 +24,8 @@
#include <winpr/crt.h>
#include <winpr/pool.h>
#include "pool.h"
#ifdef _WIN32
static BOOL module_initialized = FALSE;
@ -56,14 +58,16 @@ static void module_init()
PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup()
{
PTP_CLEANUP_GROUP cleanupGroup = NULL;
#ifdef _WIN32
module_init();
if (pCreateThreadpoolCleanupGroup)
return pCreateThreadpoolCleanupGroup();
#else
cleanupGroup = (PTP_CLEANUP_GROUP) malloc(sizeof(TP_CLEANUP_GROUP));
#endif
return NULL;
return cleanupGroup;
}
VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPendingCallbacks, PVOID pvCleanupContext)
@ -74,6 +78,7 @@ VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPen
if (pCloseThreadpoolCleanupGroupMembers)
pCloseThreadpoolCleanupGroupMembers(ptpcg, fCancelPendingCallbacks, pvCleanupContext);
#else
#endif
}
@ -85,6 +90,7 @@ VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg)
if (pCloseThreadpoolCleanupGroup)
pCloseThreadpoolCleanupGroup(ptpcg);
#else
free(ptpcg);
#endif
}

View File

@ -68,14 +68,27 @@ static TP_POOL DEFAULT_POOL =
static void* thread_pool_work_func(void* arg)
{
DWORD status;
PTP_POOL pool;
PTP_WORK work;
HANDLE events[2];
PTP_CALLBACK_INSTANCE callbackInstance;
pool = (PTP_POOL) arg;
while (WaitForSingleObject(Queue_Event(pool->PendingQueue), INFINITE) == WAIT_OBJECT_0)
events[0] = pool->TerminateEvent;
events[1] = Queue_Event(pool->PendingQueue);
while (1)
{
status = WaitForMultipleObjects(2, events, FALSE, INFINITE);
if (status == WAIT_OBJECT_0)
break;
if (status != (WAIT_OBJECT_0 + 1))
break;
callbackInstance = (PTP_CALLBACK_INSTANCE) Queue_Dequeue(pool->PendingQueue);
if (callbackInstance)
@ -90,28 +103,41 @@ static void* thread_pool_work_func(void* arg)
return NULL;
}
PTP_POOL GetDefaultThreadpool()
void InitializeThreadpool(PTP_POOL pool)
{
int index;
PTP_POOL pool = NULL;
pool = &DEFAULT_POOL;
HANDLE thread;
if (!pool->Threads)
{
pool->ThreadCount = 4;
pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE));
pool->Minimum = 0;
pool->Maximum = 500;
pool->Threads = ArrayList_New(TRUE);
pool->PendingQueue = Queue_New(TRUE, -1, -1);
pool->WorkComplete = CountdownEvent_New(0);
for (index = 0; index < pool->ThreadCount; index++)
pool->TerminateEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
for (index = 0; index < 4; index++)
{
pool->Threads[index] = CreateThread(NULL, 0,
thread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) thread_pool_work_func,
(void*) pool, 0, NULL);
ArrayList_Add(pool->Threads, thread);
}
}
}
PTP_POOL GetDefaultThreadpool()
{
PTP_POOL pool = NULL;
pool = &DEFAULT_POOL;
InitializeThreadpool(pool);
return pool;
}
@ -131,10 +157,7 @@ PTP_POOL CreateThreadpool(PVOID reserved)
pool = (PTP_POOL) malloc(sizeof(TP_POOL));
if (pool)
{
pool->Minimum = 0;
pool->Maximum = 500;
}
InitializeThreadpool(pool);
#endif
return pool;
@ -148,6 +171,25 @@ VOID CloseThreadpool(PTP_POOL ptpp)
if (pCloseThreadpool)
pCloseThreadpool(ptpp);
#else
int index;
HANDLE thread;
SetEvent(ptpp->TerminateEvent);
index = ArrayList_Count(ptpp->Threads) - 1;
while (index >= 0)
{
thread = (HANDLE) ArrayList_GetItem(ptpp->Threads, index);
WaitForSingleObject(thread, INFINITE);
index--;
}
ArrayList_Free(ptpp->Threads);
Queue_Free(ptpp->PendingQueue);
CountdownEvent_Free(ptpp->WorkComplete);
CloseHandle(ptpp->TerminateEvent);
free(ptpp);
#endif
}

View File

@ -34,9 +34,9 @@ struct _TP_POOL
{
DWORD Minimum;
DWORD Maximum;
HANDLE* Threads;
DWORD ThreadCount;
wArrayList* Threads;
wQueue* PendingQueue;
HANDLE TerminateEvent;
wCountdownEvent* WorkComplete;
};
@ -62,6 +62,11 @@ struct _TP_IO
void* dummy;
};
struct _TP_CLEANUP_GROUP
{
void* dummy;
};
#ifndef _WIN32
PTP_POOL GetDefaultThreadpool();

View File

@ -6,7 +6,23 @@ static int count = 0;
void test_WorkCallback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
{
printf("Hello %s: %d\n", context, count++);
int index;
BYTE a[1024];
BYTE b[1024];
BYTE c[1024];
printf("Hello %s: %d (thread: %d)\n", context, count++, GetCurrentThreadId());
for (index = 0; index < 100; index++)
{
ZeroMemory(a, 1024);
ZeroMemory(b, 1024);
ZeroMemory(c, 1024);
FillMemory(a, 1024, 0xAA);
FillMemory(b, 1024, 0xBB);
CopyMemory(c, a, 1024);
CopyMemory(c, b, 1024);
}
}
int TestPoolWork(int argc, char* argv[])

View File

@ -50,14 +50,19 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
if (Type == HANDLE_TYPE_THREAD)
{
int status;
WINPR_THREAD* thread;
void* thread_status = NULL;
if (dwMilliseconds != INFINITE)
printf("WaitForSingleObject: timeout not implemented for thread wait\n");
thread = (WINPR_THREAD*) Object;
pthread_join(thread->thread, NULL);
status = pthread_join(thread->thread, &thread_status);
if (status != 0)
printf("WaitForSingleObject: pthread_join failure: %d\n", status);
}
if (Type == HANDLE_TYPE_MUTEX)
{

View File

@ -81,7 +81,7 @@ void winpr_StartThread(WINPR_THREAD* thread)
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if (thread->dwStackSize > 0)
pthread_attr_setstacksize(&attr, (size_t) thread->dwStackSize);