libwinpr-utils: started implementing asynchronous MessageQueue

This commit is contained in:
Marc-André Moreau 2013-01-24 14:09:44 -05:00
parent 8db12370b0
commit fa30eeaef9
5 changed files with 257 additions and 30 deletions

View File

@ -201,6 +201,7 @@ struct rdp_update
HANDLE thread;
wQueue* queue;
BOOL asynchronous;
};
#endif /* FREERDP_UPDATE_H */

View File

@ -378,29 +378,29 @@ void update_reset_state(rdpUpdate* update)
rdpPrimaryUpdate* primary = update->primary;
rdpAltSecUpdate* altsec = update->altsec;
memset(&primary->order_info, 0, sizeof(ORDER_INFO));
memset(&primary->dstblt, 0, sizeof(DSTBLT_ORDER));
memset(&primary->patblt, 0, sizeof(PATBLT_ORDER));
memset(&primary->scrblt, 0, sizeof(SCRBLT_ORDER));
memset(&primary->opaque_rect, 0, sizeof(OPAQUE_RECT_ORDER));
memset(&primary->draw_nine_grid, 0, sizeof(DRAW_NINE_GRID_ORDER));
memset(&primary->multi_dstblt, 0, sizeof(MULTI_DSTBLT_ORDER));
memset(&primary->multi_patblt, 0, sizeof(MULTI_PATBLT_ORDER));
memset(&primary->multi_scrblt, 0, sizeof(MULTI_SCRBLT_ORDER));
memset(&primary->multi_opaque_rect, 0, sizeof(MULTI_OPAQUE_RECT_ORDER));
memset(&primary->multi_draw_nine_grid, 0, sizeof(MULTI_DRAW_NINE_GRID_ORDER));
memset(&primary->line_to, 0, sizeof(LINE_TO_ORDER));
memset(&primary->polyline, 0, sizeof(POLYLINE_ORDER));
memset(&primary->memblt, 0, sizeof(MEMBLT_ORDER));
memset(&primary->mem3blt, 0, sizeof(MEM3BLT_ORDER));
memset(&primary->save_bitmap, 0, sizeof(SAVE_BITMAP_ORDER));
memset(&primary->glyph_index, 0, sizeof(GLYPH_INDEX_ORDER));
memset(&primary->fast_index, 0, sizeof(FAST_INDEX_ORDER));
memset(&primary->fast_glyph, 0, sizeof(FAST_GLYPH_ORDER));
memset(&primary->polygon_sc, 0, sizeof(POLYGON_SC_ORDER));
memset(&primary->polygon_cb, 0, sizeof(POLYGON_CB_ORDER));
memset(&primary->ellipse_sc, 0, sizeof(ELLIPSE_SC_ORDER));
memset(&primary->ellipse_cb, 0, sizeof(ELLIPSE_CB_ORDER));
ZeroMemory(&primary->order_info, sizeof(ORDER_INFO));
ZeroMemory(&primary->dstblt, sizeof(DSTBLT_ORDER));
ZeroMemory(&primary->patblt, sizeof(PATBLT_ORDER));
ZeroMemory(&primary->scrblt, sizeof(SCRBLT_ORDER));
ZeroMemory(&primary->opaque_rect, sizeof(OPAQUE_RECT_ORDER));
ZeroMemory(&primary->draw_nine_grid, sizeof(DRAW_NINE_GRID_ORDER));
ZeroMemory(&primary->multi_dstblt, sizeof(MULTI_DSTBLT_ORDER));
ZeroMemory(&primary->multi_patblt, sizeof(MULTI_PATBLT_ORDER));
ZeroMemory(&primary->multi_scrblt, sizeof(MULTI_SCRBLT_ORDER));
ZeroMemory(&primary->multi_opaque_rect, sizeof(MULTI_OPAQUE_RECT_ORDER));
ZeroMemory(&primary->multi_draw_nine_grid, sizeof(MULTI_DRAW_NINE_GRID_ORDER));
ZeroMemory(&primary->line_to, sizeof(LINE_TO_ORDER));
ZeroMemory(&primary->polyline, sizeof(POLYLINE_ORDER));
ZeroMemory(&primary->memblt, sizeof(MEMBLT_ORDER));
ZeroMemory(&primary->mem3blt, sizeof(MEM3BLT_ORDER));
ZeroMemory(&primary->save_bitmap, sizeof(SAVE_BITMAP_ORDER));
ZeroMemory(&primary->glyph_index, sizeof(GLYPH_INDEX_ORDER));
ZeroMemory(&primary->fast_index, sizeof(FAST_INDEX_ORDER));
ZeroMemory(&primary->fast_glyph, sizeof(FAST_GLYPH_ORDER));
ZeroMemory(&primary->polygon_sc, sizeof(POLYGON_SC_ORDER));
ZeroMemory(&primary->polygon_cb, sizeof(POLYGON_CB_ORDER));
ZeroMemory(&primary->ellipse_sc, sizeof(ELLIPSE_SC_ORDER));
ZeroMemory(&primary->ellipse_cb, sizeof(ELLIPSE_CB_ORDER));
primary->order_info.orderType = ORDER_TYPE_PATBLT;
altsec->switch_surface.bitmapId = SCREEN_BITMAP_SURFACE;
@ -755,9 +755,11 @@ rdpUpdate* update_new(rdpRdp* rdp)
update->SuppressOutput = update_send_suppress_output;
update->queue = Queue_New(TRUE, -1, -1);
update->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) update_thread, update, 0, NULL);
if (update->asynchronous)
{
update->queue = Queue_New(TRUE, -1, -1);
update->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) update_thread, update, 0, NULL);
}
}
return update;
@ -781,9 +783,11 @@ void update_free(rdpUpdate* update)
free(update->altsec);
free(update->window);
CloseHandle(update->thread);
Queue_Free(update->queue);
if (update->asynchronous)
{
CloseHandle(update->thread);
Queue_Free(update->queue);
}
free(update);
}

View File

@ -246,4 +246,38 @@ WINPR_API void BufferPool_Clear(wBufferPool* pool);
WINPR_API wBufferPool* BufferPool_New(BOOL synchronized, int fixedSize, DWORD alignment);
WINPR_API void BufferPool_Free(wBufferPool* pool);
/* Message Queue */
struct _wMessage
{
UINT32 type;
void* context;
void* wParam;
void* lParam;
};
typedef struct _wMessage wMessage;
struct _wMessageQueue
{
int head;
int tail;
int size;
int capacity;
wMessage** array;
HANDLE mutex;
HANDLE event;
};
typedef struct _wMessageQueue wMessageQueue;
WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue);
WINPR_API void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message);
WINPR_API void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam);
WINPR_API wMessage* MessageQueue_Get(wMessageQueue* queue);
WINPR_API wMessage* MessageQueue_Peek(wMessageQueue* queue);
WINPR_API wMessageQueue* MessageQueue_New();
WINPR_API void MessageQueue_Free(wMessageQueue* queue);
#endif /* WINPR_COLLECTIONS_H */

View File

@ -27,7 +27,8 @@ set(${MODULE_PREFIX}_COLLECTIONS_SRCS
collections/ListDictionary.c
collections/KeyValuePair.c
collections/CountdownEvent.c
collections/BufferPool.c)
collections/BufferPool.c
collections/MessageQueue.c)
set(${MODULE_PREFIX}_SRCS
sam.c

View File

@ -0,0 +1,187 @@
/**
* WinPR: Windows Portable Runtime
* Message Queue
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/collections.h>
/**
* Message Queue inspired from Windows:
* http://msdn.microsoft.com/en-us/library/ms632590/
*/
/**
* Properties
*/
/**
* Gets an event which is set when the queue is non-empty
*/
HANDLE MessageQueue_Event(wMessageQueue* queue)
{
return queue->event;
}
/**
* Methods
*/
void MessageQueue_Clear(wMessageQueue* queue)
{
int index;
WaitForSingleObject(queue->mutex, INFINITE);
for (index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity)
{
queue->array[index] = NULL;
}
queue->size = 0;
queue->head = queue->tail = 0;
ReleaseMutex(queue->mutex);
}
void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
{
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size == queue->capacity)
{
int old_capacity;
int new_capacity;
old_capacity = queue->capacity;
new_capacity = queue->capacity * 2;
queue->capacity = new_capacity;
queue->array = (wMessage**) realloc(queue->array, sizeof(wMessage*) * queue->capacity);
ZeroMemory(&(queue->array[old_capacity]), old_capacity * sizeof(wMessage*));
if (queue->tail < (old_capacity - 1))
{
CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage*));
queue->tail += old_capacity;
}
}
queue->array[queue->tail] = message;
queue->tail = (queue->tail + 1) % queue->capacity;
queue->size++;
SetEvent(queue->event);
ReleaseMutex(queue->mutex);
}
void MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam, void* lParam)
{
wMessage* message;
message = (wMessage*) malloc(sizeof(wMessage));
if (message)
{
message->context = context;
message->type = type;
message->wParam = wParam;
message->lParam = lParam;
MessageQueue_Dispatch(queue, message);
}
}
wMessage* MessageQueue_Get(wMessageQueue* queue)
{
wMessage* message = NULL;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
{
message = queue->array[queue->head];
queue->array[queue->head] = NULL;
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
}
if (queue->size < 1)
ResetEvent(queue->event);
ReleaseMutex(queue->mutex);
return message;
}
wMessage* MessageQueue_Peek(wMessageQueue* queue)
{
wMessage* message = NULL;
WaitForSingleObject(queue->mutex, INFINITE);
if (queue->size > 0)
message = queue->array[queue->head];
ReleaseMutex(queue->mutex);
return message;
}
/**
* Construction, Destruction
*/
wMessageQueue* MessageQueue_New()
{
wMessageQueue* queue = NULL;
queue = (wMessageQueue*) malloc(sizeof(wMessageQueue));
if (queue)
{
queue->head = 0;
queue->tail = 0;
queue->size = 0;
queue->capacity = 32;
queue->array = (wMessage**) malloc(sizeof(wMessage*) * queue->capacity);
queue->mutex = CreateMutex(NULL, FALSE, NULL);
queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
}
return queue;
}
void MessageQueue_Free(wMessageQueue* queue)
{
MessageQueue_Clear(queue);
CloseHandle(queue->event);
CloseHandle(queue->mutex);
free(queue->array);
free(queue);
}