diff --git a/winpr/include/winpr/collections.h b/winpr/include/winpr/collections.h index 0aa4949f6..4446f415f 100644 --- a/winpr/include/winpr/collections.h +++ b/winpr/include/winpr/collections.h @@ -263,19 +263,23 @@ struct _wMessageQueue int tail; int size; int capacity; - wMessage** array; + wMessage* array; HANDLE mutex; HANDLE event; }; typedef struct _wMessageQueue wMessageQueue; +#define WMQ_QUIT 0xFFFF + WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue); +WINPR_API BOOL MessageQueue_Wait(wMessageQueue* queue); WINPR_API void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message); WINPR_API void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam); +WINPR_API void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode); -WINPR_API wMessage* MessageQueue_Get(wMessageQueue* queue); -WINPR_API wMessage* MessageQueue_Peek(wMessageQueue* queue); +WINPR_API int MessageQueue_Get(wMessageQueue* queue, wMessage* message); +WINPR_API int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove); WINPR_API wMessageQueue* MessageQueue_New(); WINPR_API void MessageQueue_Free(wMessageQueue* queue); diff --git a/winpr/libwinpr/utils/collections/MessageQueue.c b/winpr/libwinpr/utils/collections/MessageQueue.c index ebce3ae2b..4ab6e25dd 100644 --- a/winpr/libwinpr/utils/collections/MessageQueue.c +++ b/winpr/libwinpr/utils/collections/MessageQueue.c @@ -47,21 +47,14 @@ HANDLE MessageQueue_Event(wMessageQueue* queue) * Methods */ -void MessageQueue_Clear(wMessageQueue* queue) +BOOL MessageQueue_Wait(wMessageQueue* queue) { - int index; + BOOL status = FALSE; - WaitForSingleObject(queue->mutex, INFINITE); + if (WaitForSingleObject(queue->event, INFINITE) == WAIT_OBJECT_0) + status = TRUE; - for (index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity) - { - queue->array[index] = NULL; - } - - queue->size = 0; - queue->head = queue->tail = 0; - - ReleaseMutex(queue->mutex); + return status; } void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message) @@ -77,17 +70,17 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message) new_capacity = queue->capacity * 2; queue->capacity = new_capacity; - queue->array = (wMessage**) realloc(queue->array, sizeof(wMessage*) * queue->capacity); - ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage*)); + queue->array = (wMessage*) realloc(queue->array, sizeof(wMessage) * queue->capacity); + ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage)); if (queue->tail < (old_capacity - 1)) { - CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage*)); + CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage)); queue->tail += old_capacity; } } - queue->array[queue->tail] = message; + CopyMemory(&(queue->array[queue->tail]), message, sizeof(wMessage)); queue->tail = (queue->tail + 1) % queue->capacity; queue->size++; @@ -98,33 +91,38 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message) void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam) { - wMessage* message; + wMessage message; - message = (wMessage*) malloc(sizeof(wMessage)); + message.context = context; + message.type = type; + message.wParam = wParam; + message.lParam = lParam; - if (message) - { - message->context = context; - message->type = type; - message->wParam = wParam; - message->lParam = lParam; - - MessageQueue_Dispatch(queue, message); - } + MessageQueue_Dispatch(queue, &message); } -wMessage* MessageQueue_Get(wMessageQueue* queue) +void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode) { - wMessage* message = NULL; + MessageQueue_Post(queue, NULL, WMQ_QUIT, (void*) (size_t) nExitCode, NULL); +} + +int MessageQueue_Get(wMessageQueue* queue, wMessage* message) +{ + int status = -1; + + if (!MessageQueue_Wait(queue)) + return status; WaitForSingleObject(queue->mutex, INFINITE); if (queue->size > 0) { - message = queue->array[queue->head]; - queue->array[queue->head] = NULL; + CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); + ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); queue->head = (queue->head + 1) % queue->capacity; queue->size--; + + status = (message->type != WMQ_QUIT) ? 1 : 0; } if (queue->size < 1) @@ -132,21 +130,31 @@ wMessage* MessageQueue_Get(wMessageQueue* queue) ReleaseMutex(queue->mutex); - return message; + return status; } -wMessage* MessageQueue_Peek(wMessageQueue* queue) +int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove) { - wMessage* message = NULL; + int status = 0; WaitForSingleObject(queue->mutex, INFINITE); if (queue->size > 0) - message = queue->array[queue->head]; + { + CopyMemory(message, &(queue->array[queue->head]), sizeof(wMessage)); + status = 1; + + if (remove) + { + ZeroMemory(&(queue->array[queue->head]), sizeof(wMessage)); + queue->head = (queue->head + 1) % queue->capacity; + queue->size--; + } + } ReleaseMutex(queue->mutex); - return message; + return status; } /** @@ -166,7 +174,7 @@ wMessageQueue* MessageQueue_New() queue->size = 0; queue->capacity = 32; - queue->array = (wMessage**) malloc(sizeof(wMessage*) * queue->capacity); + queue->array = (wMessage*) malloc(sizeof(wMessage) * queue->capacity); queue->mutex = CreateMutex(NULL, FALSE, NULL); queue->event = CreateEvent(NULL, TRUE, FALSE, NULL); @@ -177,8 +185,6 @@ wMessageQueue* MessageQueue_New() void MessageQueue_Free(wMessageQueue* queue) { - MessageQueue_Clear(queue); - CloseHandle(queue->event); CloseHandle(queue->mutex); diff --git a/winpr/libwinpr/utils/test/CMakeLists.txt b/winpr/libwinpr/utils/test/CMakeLists.txt index e5d6dc04c..97117735c 100644 --- a/winpr/libwinpr/utils/test/CMakeLists.txt +++ b/winpr/libwinpr/utils/test/CMakeLists.txt @@ -7,7 +7,8 @@ set(${MODULE_PREFIX}_DRIVER ${MODULE_NAME}.c) set(${MODULE_PREFIX}_TESTS TestQueue.c TestArrayList.c - TestCmdLine.c) + TestCmdLine.c + TestMessageQueue.c) create_test_sourcelist(${MODULE_PREFIX}_SRCS ${${MODULE_PREFIX}_DRIVER} @@ -18,7 +19,7 @@ add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS}) set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS MONOLITHIC ${MONOLITHIC_BUILD} MODULE winpr - MODULES winpr-crt winpr-utils) + MODULES winpr-crt winpr-thread winpr-utils) target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS}) diff --git a/winpr/libwinpr/utils/test/TestMessageQueue.c b/winpr/libwinpr/utils/test/TestMessageQueue.c new file mode 100644 index 000000000..fccf80ff7 --- /dev/null +++ b/winpr/libwinpr/utils/test/TestMessageQueue.c @@ -0,0 +1,48 @@ + +#include +#include +#include + +static void* message_queue_consumer_thread(void* arg) +{ + wMessage message; + wMessageQueue* queue; + + queue = (wMessageQueue*) arg; + + while (MessageQueue_Wait(queue)) + { + if (MessageQueue_Peek(queue, &message, TRUE)) + { + if (message.type == WMQ_QUIT) + break; + + printf("Message.Type: %d\n", message.type); + } + } + + return NULL; +} + +int TestMessageQueue(int argc, char* argv[]) +{ + HANDLE thread; + wMessageQueue* queue; + + printf("Message Queue\n"); + + queue = MessageQueue_New(); + + thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_queue_consumer_thread, (void*) queue, 0, NULL); + + MessageQueue_Post(queue, NULL, 123, NULL, NULL); + MessageQueue_Post(queue, NULL, 456, NULL, NULL); + MessageQueue_Post(queue, NULL, 789, NULL, NULL); + MessageQueue_PostQuit(queue, 0); + + WaitForSingleObject(thread, INFINITE); + + MessageQueue_Free(queue); + + return 0; +}