libwinpr-pool: implement WaitForThreadpoolWorkCallbacks

This commit is contained in:
Marc-André Moreau 2013-01-21 19:22:08 -05:00
parent 025b5bab68
commit d4d19d6473
6 changed files with 213 additions and 2 deletions

View File

@ -202,4 +202,27 @@ WINPR_API UINT32 ReferenceTable_Release(wReferenceTable* referenceTable, void* p
WINPR_API wReferenceTable* ReferenceTable_New(BOOL synchronized, void* context, REFERENCE_FREE ReferenceFree); WINPR_API wReferenceTable* ReferenceTable_New(BOOL synchronized, void* context, REFERENCE_FREE ReferenceFree);
WINPR_API void ReferenceTable_Free(wReferenceTable* referenceTable); WINPR_API void ReferenceTable_Free(wReferenceTable* referenceTable);
/* Countdown Event */
struct _wCountdownEvent
{
DWORD count;
HANDLE mutex;
HANDLE event;
DWORD initialCount;
};
typedef struct _wCountdownEvent wCountdownEvent;
WINPR_API DWORD CountdownEvent_CurrentCount(wCountdownEvent* countdown);
WINPR_API DWORD CountdownEvent_InitialCount(wCountdownEvent* countdown);
WINPR_API BOOL CountdownEvent_IsSet(wCountdownEvent* countdown);
WINPR_API HANDLE CountdownEvent_WaitHandle(wCountdownEvent* countdown);
WINPR_API void CountdownEvent_AddCount(wCountdownEvent* countdown, DWORD signalCount);
WINPR_API BOOL CountdownEvent_Signal(wCountdownEvent* countdown, DWORD signalCount);
WINPR_API void CountdownEvent_Reset(wCountdownEvent* countdown, DWORD count);
WINPR_API wCountdownEvent* CountdownEvent_New(DWORD initialCount);
WINPR_API void CountdownEvent_Free(wCountdownEvent* countdown);
#endif /* WINPR_COLLECTIONS_H */ #endif /* WINPR_COLLECTIONS_H */

View File

@ -81,6 +81,7 @@ static void* thread_pool_work_func(void* arg)
{ {
work = callbackInstance->Work; work = callbackInstance->Work;
work->WorkCallback(callbackInstance, work->CallbackParameter, work); work->WorkCallback(callbackInstance, work->CallbackParameter, work);
CountdownEvent_Signal(pool->WorkComplete, 1);
free(callbackInstance); free(callbackInstance);
} }
} }
@ -101,6 +102,7 @@ PTP_POOL GetDefaultThreadpool()
pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE)); pool->Threads = (HANDLE*) malloc(pool->ThreadCount * sizeof(HANDLE));
pool->PendingQueue = Queue_New(TRUE, -1, -1); pool->PendingQueue = Queue_New(TRUE, -1, -1);
pool->WorkComplete = CountdownEvent_New(0);
for (index = 0; index < pool->ThreadCount; index++) for (index = 0; index < pool->ThreadCount; index++)
{ {

View File

@ -37,6 +37,7 @@ struct _TP_POOL
HANDLE* Threads; HANDLE* Threads;
DWORD ThreadCount; DWORD ThreadCount;
wQueue* PendingQueue; wQueue* PendingQueue;
wCountdownEvent* WorkComplete;
}; };
struct _TP_WORK struct _TP_WORK

View File

@ -116,6 +116,7 @@ VOID SubmitThreadpoolWork(PTP_WORK pwk)
if (callbackInstance) if (callbackInstance)
{ {
callbackInstance->Work = pwk; callbackInstance->Work = pwk;
CountdownEvent_AddCount(pool->WorkComplete, 1);
Queue_Enqueue(pool->PendingQueue, callbackInstance); Queue_Enqueue(pool->PendingQueue, callbackInstance);
} }
#endif #endif
@ -141,10 +142,13 @@ VOID WaitForThreadpoolWorkCallbacks(PTP_WORK pwk, BOOL fCancelPendingCallbacks)
if (pWaitForThreadpoolWorkCallbacks) if (pWaitForThreadpoolWorkCallbacks)
pWaitForThreadpoolWorkCallbacks(pwk, fCancelPendingCallbacks); pWaitForThreadpoolWorkCallbacks(pwk, fCancelPendingCallbacks);
#else #else
HANDLE event;
PTP_POOL pool; PTP_POOL pool;
pool = pwk->CallbackEnvironment->Pool; pool = pwk->CallbackEnvironment->Pool;
event = CountdownEvent_WaitHandle(pool->WorkComplete);
if (WaitForSingleObject(event, INFINITE) != WAIT_OBJECT_0)
printf("WaitForThreadpoolWorkCallbacks: error waiting on work completion\n");
#endif #endif
} }

View File

@ -25,7 +25,8 @@ set(${MODULE_PREFIX}_COLLECTIONS_SRCS
collections/ArrayList.c collections/ArrayList.c
collections/Dictionary.c collections/Dictionary.c
collections/ListDictionary.c collections/ListDictionary.c
collections/KeyValuePair.c) collections/KeyValuePair.c
collections/CountdownEvent.c)
set(${MODULE_PREFIX}_SRCS set(${MODULE_PREFIX}_SRCS
sam.c sam.c

View File

@ -0,0 +1,180 @@
/**
* WinPR: Windows Portable Runtime
* Countdown Event
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <winpr/crt.h>
#include <winpr/collections.h>
/**
* C equivalent of the C# CountdownEvent Class
* http://msdn.microsoft.com/en-us/library/dd235708/
*/
/**
* Properties
*/
/**
* Gets the number of remaining signals required to set the event.
*/
DWORD CountdownEvent_CurrentCount(wCountdownEvent* countdown)
{
return countdown->count;
}
/**
* Gets the numbers of signals initially required to set the event.
*/
DWORD CountdownEvent_InitialCount(wCountdownEvent* countdown)
{
return countdown->initialCount;
}
/**
* Determines whether the event is set.
*/
BOOL CountdownEvent_IsSet(wCountdownEvent* countdown)
{
BOOL status = FALSE;
if (WaitForSingleObject(countdown->event, 0) == WAIT_OBJECT_0)
status = TRUE;
return status;
}
/**
* Gets a WaitHandle that is used to wait for the event to be set.
*/
HANDLE CountdownEvent_WaitHandle(wCountdownEvent* countdown)
{
return countdown->event;
}
/**
* Methods
*/
/**
* Increments the CountdownEvent's current count by a specified value.
*/
void CountdownEvent_AddCount(wCountdownEvent* countdown, DWORD signalCount)
{
WaitForSingleObject(countdown->mutex, INFINITE);
countdown->count += signalCount;
if (countdown->count > 0)
ResetEvent(countdown->event);
ReleaseMutex(countdown->mutex);
}
/**
* Registers multiple signals with the CountdownEvent, decrementing the value of CurrentCount by the specified amount.
*/
BOOL CountdownEvent_Signal(wCountdownEvent* countdown, DWORD signalCount)
{
BOOL status;
BOOL newStatus;
BOOL oldStatus;
status = newStatus = oldStatus = FALSE;
WaitForSingleObject(countdown->mutex, INFINITE);
if (WaitForSingleObject(countdown->event, 0) == WAIT_OBJECT_0)
oldStatus = TRUE;
countdown->count -= signalCount;
if (countdown->count < 0)
{
printf("CountdownEvent_Signal warning: count is less than zero\n");
countdown->count = 0;
}
if (countdown->count == 0)
newStatus = TRUE;
if (newStatus && (!oldStatus))
{
SetEvent(countdown->event);
status = TRUE;
}
ReleaseMutex(countdown->mutex);
return status;
}
/**
* Resets the InitialCount property to a specified value.
*/
void CountdownEvent_Reset(wCountdownEvent* countdown, DWORD count)
{
countdown->initialCount = count;
}
/**
* Construction, Destruction
*/
wCountdownEvent* CountdownEvent_New(DWORD initialCount)
{
wCountdownEvent* countdown = NULL;
countdown = (wCountdownEvent*) malloc(sizeof(wCountdownEvent));
if (countdown)
{
countdown->count = initialCount;
countdown->initialCount = initialCount;
countdown->mutex = CreateMutex(NULL, FALSE, NULL);
countdown->event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (countdown->count == 0)
SetEvent(countdown->event);
}
return countdown;
}
void CountdownEvent_Free(wCountdownEvent* countdown)
{
CloseHandle(countdown->mutex);
CloseHandle(countdown->event);
free(countdown);
}