Merge pull request #1588 from akallabeth/message_queue_clear

Message queue clear
This commit is contained in:
Marc-André Moreau 2013-11-12 09:59:38 -08:00
commit b1bcbea483
12 changed files with 631 additions and 100 deletions

View File

@ -661,7 +661,7 @@ void drive_register_drive_path(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints, char*
drive->files = ListDictionary_New(TRUE);
ListDictionary_Object(drive->files)->fnObjectFree = (OBJECT_FREE_FN) drive_file_free;
drive->IrpQueue = MessageQueue_New();
drive->IrpQueue = MessageQueue_New(NULL);
drive->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) drive_thread_func, drive, CREATE_SUSPENDED, NULL);
pEntryPoints->RegisterDevice(pEntryPoints->devman, (DEVICE*) drive);

View File

@ -327,7 +327,7 @@ int DeviceServiceEntry(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints)
parallel->path = path;
parallel->queue = MessageQueue_New();
parallel->queue = MessageQueue_New(NULL);
pEntryPoints->RegisterDevice(pEntryPoints->devman, (DEVICE*) parallel);

View File

@ -208,6 +208,10 @@ BOOL freerdp_check_fds(freerdp* instance)
int status;
rdpRdp* rdp;
assert(instance);
assert(instance->context);
assert(instance->context->rdp);
rdp = instance->context->rdp;
status = rdp_check_fds(rdp);

View File

@ -486,8 +486,15 @@ int input_process_events(rdpInput* input)
return input_message_queue_process_pending_messages(input);
}
static void input_free_queued_message(void *obj)
{
wMessage *msg = (wMessage*)obj;
input_message_queue_free_message(msg);
}
rdpInput* input_new(rdpRdp* rdp)
{
const wObject cb = { .fnObjectFree = input_free_queued_message };
rdpInput* input;
input = (rdpInput*) malloc(sizeof(rdpInput));
@ -496,7 +503,7 @@ rdpInput* input_new(rdpRdp* rdp)
{
ZeroMemory(input, sizeof(rdpInput));
input->queue = MessageQueue_New();
input->queue = MessageQueue_New(&cb);
}
return input;

File diff suppressed because it is too large Load Diff

View File

@ -123,6 +123,8 @@ struct rdp_update_proxy
};
int update_message_queue_process_message(rdpUpdate* update, wMessage* message);
int update_message_queue_free_message(wMessage* message);
int update_message_queue_process_pending_messages(rdpUpdate* update);
rdpUpdateProxy* update_message_proxy_new(rdpUpdate* update);
@ -148,6 +150,7 @@ struct rdp_input_proxy
};
int input_message_queue_process_message(rdpInput* input, wMessage* message);
int input_message_queue_free_message(wMessage* message);
int input_message_queue_process_pending_messages(rdpInput* input);
rdpInputProxy* input_message_proxy_new(rdpInput* input);

View File

@ -1051,6 +1051,12 @@ void transport_free(rdpTransport* transport)
{
if (transport)
{
if (transport->async)
{
assert(!transport->thread);
assert(!transport->stopEvent);
}
if (transport->ReceiveBuffer)
Stream_Release(transport->ReceiveBuffer);

View File

@ -1544,8 +1544,16 @@ int update_process_messages(rdpUpdate* update)
return update_message_queue_process_pending_messages(update);
}
static void update_free_queued_message(void *obj)
{
wMessage *msg = (wMessage*)obj;
update_message_queue_free_message(msg);
}
rdpUpdate* update_new(rdpRdp* rdp)
{
const wObject cb = { .fnObjectFree = update_free_queued_message };
rdpUpdate* update;
update = (rdpUpdate*) malloc(sizeof(rdpUpdate));
@ -1587,7 +1595,7 @@ rdpUpdate* update_new(rdpRdp* rdp)
update->initialState = TRUE;
update->queue = MessageQueue_New();
update->queue = MessageQueue_New(&cb);
}
return update;

View File

@ -402,6 +402,8 @@ struct _wMessageQueue
wMessage* array;
CRITICAL_SECTION lock;
HANDLE event;
wObject object;
};
typedef struct _wMessageQueue wMessageQueue;
@ -418,7 +420,43 @@ WINPR_API void MessageQueue_PostQuit(wMessageQueue* queue, int nExitCode);
WINPR_API int MessageQueue_Get(wMessageQueue* queue, wMessage* message);
WINPR_API int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove);
WINPR_API wMessageQueue* MessageQueue_New(void);
/*! \brief Clears all elements in a message queue.
*
* \note If dynamically allocated data is part of the messages,
* a custom cleanup handler must be passed in the 'callback'
* argument for MessageQueue_New.
*
* \param queue The queue to clear.
*
* \return 0 in case of success or a error code otherwise.
*/
WINPR_API int MessageQueue_Clear(wMessageQueue *queue);
/*! \brief Creates a new message queue.
* If 'callback' is null, no custom cleanup will be done
* on message queue deallocation.
* If the 'callback' argument contains valid uninit or
* free functions those will be called by
* 'MessageQueue_Clear'.
*
* \param callback a pointer to custom initialization / cleanup functions.
* Can be NULL if not used.
*
* \return A pointer to a newly allocated MessageQueue or NULL.
*/
WINPR_API wMessageQueue* MessageQueue_New(const wObject *callback);
/*! \brief Frees resources allocated by a message queue.
* This function will only free resources allocated
* internally.
*
* \note Empty the queue before calling this function with
* 'MessageQueue_Clear', 'MessageQueue_Get' or
* 'MessageQueue_Peek' to free all resources allocated
* by the message contained.
*
* \param queue A pointer to the queue to be freed.
*/
WINPR_API void MessageQueue_Free(wMessageQueue* queue);
/* Message Pipe */

View File

@ -54,8 +54,8 @@ wMessagePipe* MessagePipe_New()
if (pipe)
{
pipe->In = MessageQueue_New();
pipe->Out = MessageQueue_New();
pipe->In = MessageQueue_New(NULL);
pipe->Out = MessageQueue_New(NULL);
}
return pipe;

View File

@ -178,7 +178,7 @@ int MessageQueue_Peek(wMessageQueue* queue, wMessage* message, BOOL remove)
* Construction, Destruction
*/
wMessageQueue* MessageQueue_New()
wMessageQueue* MessageQueue_New(const wObject *callback)
{
wMessageQueue* queue = NULL;
@ -196,6 +196,11 @@ wMessageQueue* MessageQueue_New()
InitializeCriticalSectionAndSpinCount(&queue->lock, 4000);
queue->event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (callback)
queue->object = *callback;
else
ZeroMemory(&queue->object, sizeof(queue->object));
}
return queue;
@ -209,3 +214,32 @@ void MessageQueue_Free(wMessageQueue* queue)
free(queue->array);
free(queue);
}
int MessageQueue_Clear(wMessageQueue *queue)
{
int status = 0;
EnterCriticalSection(&queue->lock);
while(queue->size > 0)
{
wMessage *msg = &(queue->array[queue->head]);
/* Free resources of message. */
if (queue->object.fnObjectUninit)
queue->object.fnObjectUninit(msg);
if (queue->object.fnObjectFree)
queue->object.fnObjectFree(msg);
ZeroMemory(msg, sizeof(wMessage));
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
}
ResetEvent(queue->event);
LeaveCriticalSection(&queue->lock);
return status;
}

View File

@ -29,7 +29,7 @@ int TestMessageQueue(int argc, char* argv[])
HANDLE thread;
wMessageQueue* queue;
queue = MessageQueue_New();
queue = MessageQueue_New(NULL);
thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) message_queue_consumer_thread, (void*) queue, 0, NULL);