libfreerdp-core: fix locking with TS Gateway

This commit is contained in:
Marc-André Moreau 2013-03-27 20:06:10 -04:00
parent 92114d1d38
commit 8b9ea43d78
8 changed files with 52 additions and 13 deletions

View File

@ -426,6 +426,23 @@ RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
return pdu;
}
RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc)
{
RPC_PDU* pdu;
DWORD dwMilliseconds;
pdu = NULL;
dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0;
if (WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds) == WAIT_OBJECT_0)
{
pdu = (RPC_PDU*) Queue_Peek(rpc->client->ReceiveQueue);
return pdu;
}
return pdu;
}
static void* rpc_client_thread(void* arg)
{
rdpRpc* rpc;

View File

@ -40,6 +40,7 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc);
int rpc_recv_enqueue_pdu(rdpRpc* rpc);
RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc);
RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc);
int rpc_client_new(rdpRpc* rpc);
int rpc_client_start(rdpRpc* rpc);

View File

@ -1399,6 +1399,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
if (tsg->BytesAvailable < 1)
{
tsg->PendingPdu = FALSE;
rpc_recv_dequeue_pdu(rpc);
rpc_client_receive_pool_return(rpc, tsg->pdu);
}
@ -1406,7 +1407,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
}
else
{
tsg->pdu = rpc_recv_dequeue_pdu(rpc);
tsg->pdu = rpc_recv_peek_pdu(rpc);
if (!tsg->pdu)
{
@ -1429,6 +1430,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
if (tsg->BytesAvailable < 1)
{
tsg->PendingPdu = FALSE;
rpc_recv_dequeue_pdu(rpc);
rpc_client_receive_pool_return(rpc, tsg->pdu);
}
@ -1446,6 +1448,8 @@ BOOL tsg_set_blocking_mode(rdpTsg* tsg, BOOL blocking)
tsg->rpc->client->SynchronousSend = TRUE;
tsg->rpc->client->SynchronousReceive = blocking;
tsg->transport->GatewayEvent = Queue_Event(tsg->rpc->client->ReceiveQueue);
return TRUE;
}

View File

@ -599,6 +599,17 @@ void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount)
rfds[*rcount] = pfd;
(*rcount)++;
}
if (transport->GatewayEvent)
{
pfd = GetEventWaitObject(transport->GatewayEvent);
if (pfd)
{
rfds[*rcount] = pfd;
(*rcount)++;
}
}
}
int transport_check_fds(rdpTransport** ptransport)
@ -769,8 +780,6 @@ static void* transport_client_thread(void* arg)
while (1)
{
printf("transport_client_thread\n");
status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE);
if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0)
@ -805,10 +814,11 @@ rdpTransport* transport_new(rdpSettings* settings)
rdpTransport* transport;
transport = (rdpTransport*) malloc(sizeof(rdpTransport));
ZeroMemory(transport, sizeof(rdpTransport));
if (transport != NULL)
{
ZeroMemory(transport, sizeof(rdpTransport));
transport->TcpIn = tcp_new(settings);
transport->settings = settings;

View File

@ -66,6 +66,7 @@ struct rdp_transport
wStream* ReceiveBuffer;
TransportRecv ReceiveCallback;
HANDLE ReceiveEvent;
HANDLE GatewayEvent;
BOOL blocking;
BOOL SplitInputOutput;
wObjectPool* ReceivePool;

View File

@ -61,8 +61,10 @@ struct _wQueue
typedef struct _wQueue wQueue;
WINPR_API int Queue_Count(wQueue* queue);
WINPR_API BOOL Queue_IsSynchronized(wQueue* queue);
WINPR_API HANDLE Queue_SyncRoot(wQueue* queue);
WINPR_API BOOL Queue_Lock(wQueue* queue);
WINPR_API BOOL Queue_Unlock(wQueue* queue);
WINPR_API HANDLE Queue_Event(wQueue* queue);
#define Queue_Object(_queue) (&_queue->object)

View File

@ -64,6 +64,7 @@ HANDLE CreateEventW(LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManualReset,
#ifdef HAVE_EVENTFD_H
event->pipe_fd[0] = eventfd(0, EFD_NONBLOCK);
if (event->pipe_fd[0] < 0)
{
printf("CreateEventW: failed to create event\n");
@ -286,10 +287,13 @@ void* GetEventWaitObject(HANDLE hEvent)
{
#ifndef _WIN32
int fd;
void* obj;
fd = GetEventFileDescriptor(hEvent);
return ((void*) (long) fd);
obj = ((void*) (long) fd);
return obj;
#else
return hEvent;
#endif

View File

@ -44,21 +44,21 @@ int Queue_Count(wQueue* queue)
}
/**
* Gets a value indicating whether access to the Queue is synchronized (thread safe).
* Lock access to the ArrayList
*/
BOOL Queue_IsSynchronized(wQueue* queue)
BOOL Queue_Lock(wQueue* queue)
{
return queue->synchronized;
return (WaitForSingleObject(queue->mutex, INFINITE) == WAIT_OBJECT_0) ? TRUE : FALSE;
}
/**
* Gets an object that can be used to synchronize access to the Queue.
* Unlock access to the ArrayList
*/
HANDLE Queue_SyncRoot(wQueue* queue)
BOOL Queue_Unlock(wQueue* queue)
{
return queue->mutex;
return ReleaseMutex(queue->mutex);
}
/**