libwinpr-utils: test MessageQueue

This commit is contained in:
Marc-André Moreau 2013-01-24 15:08:49 -05:00
parent fa30eeaef9
commit 470defa4af
4 changed files with 103 additions and 44 deletions

View File

@ -263,19 +263,23 @@ struct _wMessageQueue
int tail;
int size;
int capacity;
wMessage** array;
wMessage* array;
HANDLE mutex;
HANDLE event;
};
typedef struct _wMessageQueue wMessageQueue;
#define WMQ_QUIT 0xFFFF
WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue);
WINPR_API BOOL MessageQueue_Wait(wMessageQueue* queue);
WINPR_API void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message);
WINPR_API void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam);
WINPR_API void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode);
WINPR_API wMessage* MessageQueue_Get(wMessageQueue* queue);
WINPR_API wMessage* MessageQueue_Peek(wMessageQueue* queue);
WINPR_API int MessageQueue_Get(wMessageQueue* queue, wMessage* message);
WINPR_API int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove);
WINPR_API wMessageQueue* MessageQueue_New();
WINPR_API void MessageQueue_Free(wMessageQueue* queue);

View File

@ -47,21 +47,14 @@ HANDLE MessageQueue_Event(wMessageQueue* queue)
* Methods
*/
void MessageQueue_Clear(wMessageQueue* queue)
BOOL MessageQueue_Wait(wMessageQueue* queue)
{
int index;
BOOL status = FALSE;
WaitForSingleObject(queue->mutex, INFINITE);
if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0)
status = TRUE;
for (index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity)
{
queue->array[index] = NULL;
}
queue->size = 0;
queue->head = queue->tail = 0;
ReleaseMutex(queue->mutex);
return status;
}
void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
@ -77,17 +70,17 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
new_capacity = queue->capacity * 2;
queue->capacity = new_capacity;
queue->array = (wMessage**) realloc(queue->array, sizeof(wMessage*) * queue->capacity);
ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage*));
queue->array = (wMessage*) realloc(queue->array, sizeof(wMessage) * queue->capacity);
ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage));
if (queue->tail < (old_capacity - 1))
{
CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage*));
CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
queue->tail += old_capacity;
}
}
queue->array[queue->tail] = message;
CopyMemory(&(queue->array[queue->tail]), message, sizeof(wMessage));
queue->tail = (queue->tail + 1) % queue->capacity;
queue->size++;
@ -98,33 +91,38 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
{
wMessage* message;
wMessage message;
message = (wMessage*) malloc(sizeof(wMessage));
message.context = context;
message.type = type;
message.wParam = wParam;
message.lParam = lParam;
if (message)
{
message->context = context;
message->type = type;
message->wParam = wParam;
message->lParam = lParam;
MessageQueue_Dispatch(queue, message);
}
MessageQueue_Dispatch(queue, &message);
}
wMessage* MessageQueue_Get(wMessageQueue* queue)
void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode)
{
wMessage* message = NULL;
MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*) (size_t) nExitCode, NULL);
}
int MessageQueue_Get(wMessageQueue* queue, wMessage* message)
{
int status = -1;
if (!MessageQueue_Wait(queue))
return status;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
{
message = queue->array[queue->head];
queue->array[queue->head] = NULL;
CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
status = (message->type != WMQ_QUIT) ? 1 : 0;
}
if (queue->size < 1)
@ -132,21 +130,31 @@ wMessage* MessageQueue_Get(wMessageQueue* queue)
ReleaseMutex(queue->mutex);
return message;
return status;
}
wMessage* MessageQueue_Peek(wMessageQueue* queue)
int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
{
wMessage* message = NULL;
int status = 0;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
message = queue->array[queue->head];
{
CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage));
status = 1;
if (remove)
{
ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage));
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
}
}
ReleaseMutex(queue->mutex);
return message;
return status;
}
/**
@ -166,7 +174,7 @@ wMessageQueue* MessageQueue_New()
queue->size = 0;
queue->capacity = 32;
queue->array = (wMessage**) malloc(sizeof(wMessage*) * queue->capacity);
queue->array = (wMessage*) malloc(sizeof(wMessage) * queue->capacity);
queue->mutex = CreateMutex(NULL, FALSE, NULL);
queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
@ -177,8 +185,6 @@ wMessageQueue* MessageQueue_New()
void MessageQueue_Free(wMessageQueue* queue)
{
MessageQueue_Clear(queue);
CloseHandle(queue->event);
CloseHandle(queue->mutex);

View File

@ -7,7 +7,8 @@ set(${MODULE_PREFIX}_DRIVER ${MODULE_NAME}.c)
set(${MODULE_PREFIX}_TESTS
TestQueue.c
TestArrayList.c
TestCmdLine.c)
TestCmdLine.c
TestMessageQueue.c)
create_test_sourcelist(${MODULE_PREFIX}_SRCS
${${MODULE_PREFIX}_DRIVER}
@ -18,7 +19,7 @@ add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS})
set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
MONOLITHIC ${MONOLITHIC_BUILD}
MODULE winpr
MODULES winpr-crt winpr-utils)
MODULES winpr-crt winpr-thread winpr-utils)
target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS})

View File

@ -0,0 +1,48 @@
#include <winpr/crt.h>
#include <winpr/thread.h>
#include <winpr/collections.h>
static void* message_queue_consumer_thread(void* arg)
{
wMessage message;
wMessageQueue* queue;
queue = (wMessageQueue*) arg;
while (MessageQueue_Wait(queue))
{
if (MessageQueue_Peek(queue, &message, TRUE))
{
if (message.type == WMQ_QUIT)
break;
printf("Message.Type: %d\n", message.type);
}
}
return NULL;
}
int TestMessageQueue(int argc, char* argv[])
{
HANDLE thread;
wMessageQueue* queue;
printf("Message Queue\n");
queue = MessageQueue_New();
thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_queue_consumer_thread, (void*) queue, 0, NULL);
MessageQueue_Post(queue, NULL, 123, NULL, NULL);
MessageQueue_Post(queue, NULL, 456, NULL, NULL);
MessageQueue_Post(queue, NULL, 789, NULL, NULL);
MessageQueue_PostQuit(queue, 0);
WaitForSingleObject(thread, INFINITE);
MessageQueue_Free(queue);
return 0;
}