libfreerdp-core: refactor RPC receiving as a synchronized queue

This commit is contained in:
Marc-André Moreau 2012-11-28 22:03:18 -05:00
parent 3634a1bbf1
commit dde2e60a56
7 changed files with 35 additions and 21 deletions

View File

@ -738,6 +738,7 @@ rdpRpc* rpc_new(rdpTransport* transport)
rpc_client_new(rpc);
rpc->client->SynchronousSend = TRUE;
rpc->client->SynchronousReceive = TRUE;
}
return rpc;

View File

@ -792,7 +792,6 @@ int rpc_in_write(rdpRpc* rpc, BYTE* data, int length);
BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* header, UINT32* offset, UINT32* length);
int rpc_recv_pdu_header(rdpRpc* rpc, BYTE* header);
int rpc_recv_pdu_fragment(rdpRpc* rpc);
RPC_PDU* rpc_recv_pdu(rdpRpc* rpc);
int rpc_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum);

View File

@ -27,6 +27,8 @@
#include <winpr/crt.h>
#include "rpc_client.h"
#include "rpc_bind.h"
/**
@ -217,7 +219,7 @@ int rpc_recv_bind_ack_pdu(rdpRpc* rpc)
BYTE* auth_data;
rpcconn_hdr_t* header;
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return -1;

View File

@ -113,21 +113,27 @@ int rpc_recv_enqueue_pdu(rdpRpc* rpc)
InterlockedPushEntrySList(rpc->ReceiveQueue, &(pdu->ItemEntry));
ReleaseSemaphore(rpc->client->ReceiveSemaphore, 1, NULL);
if (rpc->client->SynchronousReceive)
{
WaitForSingleObject(rpc->client->PduReceivedEvent, INFINITE);
ResetEvent(rpc->client->PduReceivedEvent);
}
return 0;
}
int rpc_recv_dequeue_pdu(rdpRpc* rpc)
RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
{
if (rpc->client->SynchronousReceive)
SetEvent(rpc->client->PduReceivedEvent);
RPC_PDU* pdu;
DWORD dwMilliseconds;
return 0;
pdu = NULL;
dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0;
if (rpc->client->SynchronousReceive)
rpc_recv_enqueue_pdu(rpc);
if (WaitForSingleObject(rpc->client->ReceiveSemaphore, dwMilliseconds) == WAIT_OBJECT_0)
{
pdu = (RPC_PDU*) InterlockedPopEntrySList(rpc->ReceiveQueue);
return pdu;
}
return pdu;
}
static void* rpc_client_thread(void* arg)
@ -158,7 +164,8 @@ static void* rpc_client_thread(void* arg)
if (WaitForSingleObject(ReadEvent, 0) == WAIT_OBJECT_0)
{
if (!rpc->client->SynchronousReceive)
rpc_recv_enqueue_pdu(rpc);
}
rpc_send_dequeue_pdu(rpc);

View File

@ -27,6 +27,9 @@
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length);
int rpc_send_dequeue_pdu(rdpRpc* rpc);
int rpc_recv_enqueue_pdu(rdpRpc* rpc);
RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc);
int rpc_client_new(rdpRpc* rpc);
int rpc_client_start(rdpRpc* rpc);

View File

@ -24,6 +24,7 @@
#include <winpr/crt.h>
#include "ncacn_http.h"
#include "rpc_client.h"
#include "rts.h"
@ -56,7 +57,6 @@
BOOL rts_connect(rdpRpc* rpc)
{
int status;
RPC_PDU* pdu;
rpcconn_rts_hdr_t* rts;
HttpResponse* http_response;
@ -174,7 +174,7 @@ BOOL rts_connect(rdpRpc* rpc)
*
*/
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return FALSE;
@ -213,7 +213,7 @@ BOOL rts_connect(rdpRpc* rpc)
*
*/
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return FALSE;

View File

@ -35,6 +35,8 @@
#include <winpr/ndr.h>
#include <winpr/error.h>
#include "rpc_client.h"
#include "tsg.h"
/**
@ -212,7 +214,7 @@ BOOL TsProxyCreateTunnelReadResponse(rdpTsg* tsg)
PTSG_PACKET_CAPS_RESPONSE packetCapsResponse;
PTSG_PACKET_QUARENC_RESPONSE packetQuarEncResponse;
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return FALSE;
@ -560,7 +562,7 @@ BOOL TsProxyAuthorizeTunnelReadResponse(rdpTsg* tsg)
rdpRpc* rpc = tsg->rpc;
PTSG_PACKET_RESPONSE packetResponse;
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return FALSE;
@ -793,7 +795,7 @@ BOOL TsProxyCreateChannelReadResponse(rdpTsg* tsg)
UINT32 length;
rdpRpc* rpc = tsg->rpc;
pdu = rpc_recv_pdu(rpc);
pdu = rpc_recv_dequeue_pdu(rpc);
if (!pdu)
return FALSE;
@ -1104,7 +1106,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
}
else
{
tsg->pdu = rpc_recv_pdu(rpc);
tsg->pdu = rpc_recv_dequeue_pdu(rpc);
if ((tsg->pdu->Flags & RPC_PDU_FLAG_STUB) && (tsg->pdu->Length == 4))
{
@ -1114,7 +1116,7 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
}
tsg->PendingPdu = TRUE;
tsg->BytesAvailable = rpc->pdu->Length;
tsg->BytesAvailable = tsg->pdu->Length;
tsg->BytesRead = 0;
CopyLength = (tsg->BytesAvailable > length) ? length : tsg->BytesAvailable;