libfreerdp-core: make use of message queues for server-side virtual channels

This commit is contained in:
Marc-André Moreau 2014-02-15 18:41:40 -05:00
parent 8a81208059
commit 6a04a7b43e
2 changed files with 54 additions and 98 deletions

View File

@ -82,26 +82,15 @@ static void wts_queue_receive_data(rdpPeerChannel* channel, const BYTE* buffer,
item->buffer = malloc(length);
CopyMemory(item->buffer, buffer, length);
WaitForSingleObject(channel->mutex, INFINITE);
list_enqueue(channel->receiveQueue, item);
ReleaseMutex(channel->mutex);
SetEvent(channel->receiveEvent);
MessageQueue_Post(channel->MsgPipe->In, (void*) channel, 0, (void*) item, NULL);
}
static void wts_queue_send_item(rdpPeerChannel* channel, wts_data_item* item)
{
WTSVirtualChannelManager* vcm;
vcm = channel->vcm;
WTSVirtualChannelManager* vcm = channel->vcm;
item->channelId = channel->channelId;
WaitForSingleObject(vcm->mutex, INFINITE);
list_enqueue(vcm->sendQueue, item);
ReleaseMutex(vcm->mutex);
SetEvent(vcm->sendEvent);
MessageQueue_Post(vcm->MsgPipe->Out, (void*) channel, 0, (void*) item, NULL);
}
static int wts_read_variable_uint(wStream* s, int cbLen, UINT32* val)
@ -162,8 +151,6 @@ static void wts_read_drdynvc_create_response(rdpPeerChannel* channel, wStream* s
DEBUG_DVC("ChannelId %d creation succeeded", channel->channelId);
channel->dvc_open_state = DVC_OPEN_STATE_SUCCEEDED;
}
SetEvent(channel->receiveEvent);
}
static void wts_read_drdynvc_data_first(rdpPeerChannel* channel, wStream* s, int cbLen, UINT32 length)
@ -335,7 +322,7 @@ static void wts_write_drdynvc_create_request(wStream *s, UINT32 ChannelId, const
Stream_Write(s, ChannelName, len);
}
static void WTSProcessChannelData(rdpPeerChannel* channel, int channelId, BYTE* data, int size, int flags, int total_size)
static void WTSProcessChannelData(rdpPeerChannel* channel, UINT16 channelId, BYTE* data, int size, int flags, int totalSize)
{
if (flags & CHANNEL_FLAG_FIRST)
{
@ -347,7 +334,7 @@ static void WTSProcessChannelData(rdpPeerChannel* channel, int channelId, BYTE*
if (flags & CHANNEL_FLAG_LAST)
{
if (Stream_GetPosition(channel->receiveData) != total_size)
if (Stream_GetPosition(channel->receiveData) != totalSize)
{
fprintf(stderr, "WTSProcessChannelData: read error\n");
}
@ -403,9 +390,8 @@ WTSVirtualChannelManager* WTSCreateVirtualChannelManager(freerdp_peer* client)
vcm->client = client;
vcm->rdp = client->context->rdp;
vcm->sendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
vcm->sendQueue = list_new();
vcm->mutex = CreateMutex(NULL, FALSE, NULL);
vcm->MsgPipe = MessagePipe_New();
vcm->dvc_channel_id_seq = 1;
vcm->dvc_channel_list = list_new();
@ -417,10 +403,9 @@ WTSVirtualChannelManager* WTSCreateVirtualChannelManager(freerdp_peer* client)
void WTSDestroyVirtualChannelManager(WTSVirtualChannelManager* vcm)
{
wts_data_item* item;
rdpPeerChannel* channel;
if (vcm != NULL)
if (vcm)
{
while ((channel = (rdpPeerChannel*) list_dequeue(vcm->dvc_channel_list)) != NULL)
{
@ -429,21 +414,14 @@ void WTSDestroyVirtualChannelManager(WTSVirtualChannelManager* vcm)
list_free(vcm->dvc_channel_list);
if (vcm->drdynvc_channel != NULL)
if (vcm->drdynvc_channel)
{
WTSVirtualChannelClose(vcm->drdynvc_channel);
vcm->drdynvc_channel = NULL;
}
CloseHandle(vcm->sendEvent);
MessagePipe_Free(vcm->MsgPipe);
while ((item = (wts_data_item*) list_dequeue(vcm->sendQueue)) != NULL)
{
wts_data_item_free(item);
}
list_free(vcm->sendQueue);
CloseHandle(vcm->mutex);
free(vcm);
}
}
@ -452,7 +430,7 @@ void WTSVirtualChannelManagerGetFileDescriptor(WTSVirtualChannelManager* vcm, vo
{
void* fd;
fd = GetEventWaitObject(vcm->sendEvent);
fd = GetEventWaitObject(MessageQueue_Event(vcm->MsgPipe->Out));
if (fd)
{
@ -460,6 +438,7 @@ void WTSVirtualChannelManagerGetFileDescriptor(WTSVirtualChannelManager* vcm, vo
(*fds_count)++;
}
#if 0
if (vcm->drdynvc_channel)
{
fd = GetEventWaitObject(vcm->drdynvc_channel->receiveEvent);
@ -470,16 +449,18 @@ void WTSVirtualChannelManagerGetFileDescriptor(WTSVirtualChannelManager* vcm, vo
(*fds_count)++;
}
}
#endif
}
BOOL WTSVirtualChannelManagerCheckFileDescriptor(WTSVirtualChannelManager* vcm)
{
BOOL result = TRUE;
wMessage message;
BOOL status = TRUE;
wts_data_item* item;
rdpPeerChannel* channel;
UINT32 dynvc_caps;
if (vcm->drdynvc_state == DRDYNVC_STATE_NONE && vcm->client->activated)
if ((vcm->drdynvc_state == DRDYNVC_STATE_NONE) && vcm->client->activated)
{
/* Initialize drdynvc channel once and only once. */
vcm->drdynvc_state = DRDYNVC_STATE_INITIALIZED;
@ -494,31 +475,27 @@ BOOL WTSVirtualChannelManagerCheckFileDescriptor(WTSVirtualChannelManager* vcm)
}
}
ResetEvent(vcm->sendEvent);
WaitForSingleObject(vcm->mutex, INFINITE);
while ((item = (wts_data_item*) list_dequeue(vcm->sendQueue)) != NULL)
while (MessageQueue_Peek(vcm->MsgPipe->Out, &message, TRUE))
{
item = (wts_data_item*) message.wParam;
if (vcm->client->SendChannelData(vcm->client, item->channelId, item->buffer, item->length) == FALSE)
{
result = FALSE;
status = FALSE;
}
wts_data_item_free(item);
if (result == FALSE)
if (!status)
break;
}
ReleaseMutex(vcm->mutex);
return result;
return status;
}
HANDLE WTSVirtualChannelManagerGetEventHandle(WTSVirtualChannelManager* vcm)
{
return vcm->sendEvent;
return MessageQueue_Event(vcm->MsgPipe->Out);
}
BOOL WTSVirtualChannelManagerIsChannelJoined(WTSVirtualChannelManager* vcm, const char* name)
@ -559,6 +536,7 @@ HANDLE WTSVirtualChannelManagerOpenEx(WTSVirtualChannelManager* vcm, LPSTR pVirt
break;
}
}
if (i >= mcs->channelCount)
{
DEBUG_DVC("Dynamic virtual channel not registered.");
@ -580,14 +558,10 @@ HANDLE WTSVirtualChannelManagerOpenEx(WTSVirtualChannelManager* vcm, LPSTR pVirt
channel->client = client;
channel->channelType = RDP_PEER_CHANNEL_TYPE_DVC;
channel->receiveData = Stream_New(NULL, client->settings->VirtualChannelChunkSize);
channel->receiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
channel->receiveQueue = list_new();
channel->mutex = CreateMutex(NULL, FALSE, NULL);
channel->MsgPipe = MessagePipe_New();
WaitForSingleObject(vcm->mutex, INFINITE);
channel->channelId = vcm->dvc_channel_id_seq++;
list_enqueue(vcm->dvc_channel_list, channel);
ReleaseMutex(vcm->mutex);
s = Stream_New(NULL, 64);
wts_write_drdynvc_create_request(s, channel->channelId, pVirtualName);
@ -633,9 +607,7 @@ HANDLE WTSVirtualChannelManagerOpenEx(WTSVirtualChannelManager* vcm, LPSTR pVirt
channel->index = i;
channel->channelType = RDP_PEER_CHANNEL_TYPE_SVC;
channel->receiveData = Stream_New(NULL, client->settings->VirtualChannelChunkSize);
channel->receiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
channel->receiveQueue = list_new();
channel->mutex = CreateMutex(NULL, FALSE, NULL);
channel->MsgPipe = MessagePipe_New();
mcs->channels[i].handle = channel;
}
@ -649,16 +621,19 @@ BOOL WTSVirtualChannelQuery(HANDLE hChannelHandle, WTS_VIRTUAL_CLASS WtsVirtualC
void* pfd;
BOOL bval;
void* fds[10];
HANDLE hEvent;
int fds_count = 0;
BOOL result = FALSE;
BOOL status = FALSE;
rdpPeerChannel* channel = (rdpPeerChannel*) hChannelHandle;
ZeroMemory(fds, sizeof(fds));
hEvent = MessageQueue_Event(channel->MsgPipe->In);
switch (WtsVirtualClass)
{
case WTSVirtualFileHandle:
pfd = GetEventWaitObject(channel->receiveEvent);
pfd = GetEventWaitObject(hEvent);
if (pfd)
{
@ -669,21 +644,21 @@ BOOL WTSVirtualChannelQuery(HANDLE hChannelHandle, WTS_VIRTUAL_CLASS WtsVirtualC
*ppBuffer = malloc(sizeof(void*));
CopyMemory(*ppBuffer, &fds[0], sizeof(void*));
*pBytesReturned = sizeof(void*);
result = TRUE;
status = TRUE;
break;
case WTSVirtualEventHandle:
*ppBuffer = malloc(sizeof(HANDLE));
CopyMemory(*ppBuffer, &(channel->receiveEvent), sizeof(HANDLE));
CopyMemory(*ppBuffer, &(hEvent), sizeof(HANDLE));
*pBytesReturned = sizeof(void*);
result = TRUE;
status = TRUE;
break;
case WTSVirtualChannelReady:
if (channel->channelType == RDP_PEER_CHANNEL_TYPE_SVC)
{
bval = TRUE;
result = TRUE;
status = TRUE;
}
else
{
@ -691,17 +666,17 @@ BOOL WTSVirtualChannelQuery(HANDLE hChannelHandle, WTS_VIRTUAL_CLASS WtsVirtualC
{
case DVC_OPEN_STATE_NONE:
bval = FALSE;
result = TRUE;
status = TRUE;
break;
case DVC_OPEN_STATE_SUCCEEDED:
bval = TRUE;
result = TRUE;
status = TRUE;
break;
default:
bval = FALSE;
result = FALSE;
status = FALSE;
break;
}
}
@ -714,7 +689,7 @@ BOOL WTSVirtualChannelQuery(HANDLE hChannelHandle, WTS_VIRTUAL_CLASS WtsVirtualC
default:
break;
}
return result;
return status;
}
VOID WTSFreeMemory(PVOID pMemory)
@ -724,14 +699,20 @@ VOID WTSFreeMemory(PVOID pMemory)
BOOL WTSVirtualChannelRead(HANDLE hChannelHandle, ULONG TimeOut, PCHAR Buffer, ULONG BufferSize, PULONG pBytesRead)
{
wMessage message;
wts_data_item* item;
rdpPeerChannel* channel = (rdpPeerChannel*) hChannelHandle;
item = (wts_data_item*) list_peek(channel->receiveQueue);
if (!MessageQueue_Peek(channel->MsgPipe->In, &message, TRUE))
{
*pBytesRead = 0;
return TRUE;
}
item = (wts_data_item*) message.wParam;
if (!item)
{
ResetEvent(channel->receiveEvent);
*pBytesRead = 0;
return TRUE;
}
@ -741,15 +722,6 @@ BOOL WTSVirtualChannelRead(HANDLE hChannelHandle, ULONG TimeOut, PCHAR Buffer, U
if (item->length > BufferSize)
return FALSE;
/* remove the first element (same as what we just peek) */
WaitForSingleObject(channel->mutex, INFINITE);
list_dequeue(channel->receiveQueue);
if (list_size(channel->receiveQueue) == 0)
ResetEvent(channel->receiveEvent);
ReleaseMutex(channel->mutex);
CopyMemory(Buffer, item->buffer, item->length);
wts_data_item_free(item);
@ -780,7 +752,7 @@ BOOL WTSVirtualChannelWrite(HANDLE hChannelHandle, PCHAR Buffer, ULONG Length, P
wts_queue_send_item(channel, item);
}
else if (channel->vcm->drdynvc_channel == NULL || channel->vcm->drdynvc_state != DRDYNVC_STATE_READY)
else if (!channel->vcm->drdynvc_channel || (channel->vcm->drdynvc_state != DRDYNVC_STATE_READY))
{
DEBUG_DVC("drdynvc not ready");
return FALSE;
@ -837,7 +809,6 @@ BOOL WTSVirtualChannelClose(HANDLE hChannelHandle)
{
wStream* s;
rdpMcs* mcs;
wts_data_item* item;
WTSVirtualChannelManager* vcm;
rdpPeerChannel* channel = (rdpPeerChannel*) hChannelHandle;
@ -853,9 +824,7 @@ BOOL WTSVirtualChannelClose(HANDLE hChannelHandle)
}
else
{
WaitForSingleObject(vcm->mutex, INFINITE);
list_remove(vcm->dvc_channel_list, channel);
ReleaseMutex(vcm->mutex);
if (channel->dvc_open_state == DVC_OPEN_STATE_SUCCEEDED)
{
@ -869,22 +838,12 @@ BOOL WTSVirtualChannelClose(HANDLE hChannelHandle)
if (channel->receiveData)
Stream_Free(channel->receiveData, TRUE);
if (channel->receiveEvent)
CloseHandle(channel->receiveEvent);
if (channel->receiveQueue)
if (channel->MsgPipe)
{
while ((item = (wts_data_item*) list_dequeue(channel->receiveQueue)) != NULL)
{
wts_data_item_free(item);
}
list_free(channel->receiveQueue);
MessagePipe_Free(channel->MsgPipe);
channel->MsgPipe = NULL;
}
if (channel->mutex)
CloseHandle(channel->mutex);
free(channel);
}

View File

@ -27,6 +27,7 @@
#include <winpr/synch.h>
#include <winpr/stream.h>
#include <winpr/collections.h>
#include "rdp.h"
@ -68,9 +69,7 @@ struct rdp_peer_channel
UINT16 index;
wStream* receiveData;
HANDLE receiveEvent;
LIST* receiveQueue;
HANDLE mutex;
wMessagePipe* MsgPipe;
BYTE dvc_open_state;
UINT32 dvc_total_length;
@ -81,9 +80,7 @@ struct WTSVirtualChannelManager
rdpRdp* rdp;
freerdp_peer* client;
HANDLE sendEvent;
LIST* sendQueue;
HANDLE mutex;
wMessagePipe* MsgPipe;
rdpPeerChannel* drdynvc_channel;
BYTE drdynvc_state;