libfreerdp-core: improve receiving of TSG data
This commit is contained in:
parent
8aa1143cc6
commit
dc978a967c
@ -345,186 +345,13 @@ int rpc_in_write(rdpRpc* rpc, BYTE* data, int length)
|
||||
return status;
|
||||
}
|
||||
|
||||
int rpc_recv_pdu_fragment(rdpRpc* rpc)
|
||||
{
|
||||
wStream* fragment;
|
||||
DWORD dwMilliseconds;
|
||||
rpcconn_hdr_t* header;
|
||||
|
||||
dwMilliseconds = (rpc->client->SynchronousReceive) ? INFINITE : 0;
|
||||
|
||||
if (WaitForSingleObject(Queue_Event(rpc->client->FragmentQueue), dwMilliseconds) != WAIT_OBJECT_0)
|
||||
{
|
||||
if (dwMilliseconds == INFINITE)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
fragment = Queue_Dequeue(rpc->client->FragmentQueue);
|
||||
|
||||
rpc->FragBuffer = fragment->buffer;
|
||||
header = (rpcconn_hdr_t*) rpc->FragBuffer;
|
||||
|
||||
if ((rpc->PipeCallId) && (header->common.call_id == rpc->PipeCallId))
|
||||
{
|
||||
/* TsProxySetupReceivePipe response! */
|
||||
|
||||
#if 0
|
||||
printf("ignoring TsProxySetupReceivePipe response\n");
|
||||
|
||||
if (rpc->client->SynchronousReceive)
|
||||
return rpc_recv_pdu_fragment(rpc);
|
||||
else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
if (header->common.ptype == PTYPE_RESPONSE)
|
||||
{
|
||||
UINT32 StubOffset;
|
||||
UINT32 StubLength;
|
||||
|
||||
if (!rpc_get_stub_data_info(rpc, rpc->FragBuffer, &StubOffset, &StubLength))
|
||||
{
|
||||
printf("rpc_recv_pdu_fragment: expected stub\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if 1
|
||||
if (StubLength == 4)
|
||||
{
|
||||
printf("Ignoring TsProxySendToServer Response\n");
|
||||
|
||||
if (rpc->client->SynchronousReceive)
|
||||
return rpc_recv_pdu_fragment(rpc);
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
else if (header->common.ptype == PTYPE_RTS)
|
||||
{
|
||||
if (rpc->VirtualConnection->State < VIRTUAL_CONNECTION_STATE_OPENED)
|
||||
return header->common.frag_length;
|
||||
|
||||
printf("Receiving Out-of-Sequence RTS PDU\n");
|
||||
rts_recv_out_of_sequence_pdu(rpc, rpc->FragBuffer, header->common.frag_length);
|
||||
|
||||
return rpc_recv_pdu_fragment(rpc);
|
||||
}
|
||||
else if (header->common.ptype == PTYPE_FAULT)
|
||||
{
|
||||
rpc_recv_fault_pdu(header);
|
||||
return -1;
|
||||
}
|
||||
|
||||
rpc->VirtualConnection->DefaultOutChannel->BytesReceived += header->common.frag_length;
|
||||
rpc->VirtualConnection->DefaultOutChannel->ReceiverAvailableWindow -= header->common.frag_length;
|
||||
|
||||
#if 0
|
||||
printf("BytesReceived: %d ReceiverAvailableWindow: %d ReceiveWindow: %d\n",
|
||||
rpc->VirtualConnection->DefaultOutChannel->BytesReceived,
|
||||
rpc->VirtualConnection->DefaultOutChannel->ReceiverAvailableWindow,
|
||||
rpc->ReceiveWindow);
|
||||
#endif
|
||||
|
||||
if (rpc->VirtualConnection->DefaultOutChannel->ReceiverAvailableWindow < (rpc->ReceiveWindow / 2))
|
||||
{
|
||||
printf("Sending Flow Control Ack PDU\n");
|
||||
rts_send_flow_control_ack_pdu(rpc);
|
||||
}
|
||||
|
||||
#if 0
|
||||
rpc_pdu_header_print((rpcconn_hdr_t*) header);
|
||||
printf("rpc_recv_pdu_fragment: length: %d\n", header->common.frag_length);
|
||||
freerdp_hexdump(rpc->FragBuffer, header->common.frag_length);
|
||||
printf("\n");
|
||||
#endif
|
||||
|
||||
return header->common.frag_length;
|
||||
}
|
||||
|
||||
RPC_PDU* rpc_recv_pdu(rdpRpc* rpc)
|
||||
{
|
||||
int status;
|
||||
UINT32 StubOffset;
|
||||
UINT32 StubLength;
|
||||
rpcconn_hdr_t* header;
|
||||
RPC_PDU* pdu;
|
||||
|
||||
status = rpc_recv_pdu_fragment(rpc);
|
||||
pdu = rpc_recv_dequeue_pdu(rpc);
|
||||
|
||||
if (status <= 0)
|
||||
return NULL;
|
||||
|
||||
header = (rpcconn_hdr_t*) rpc->FragBuffer;
|
||||
|
||||
if (rpc->State < RPC_CLIENT_STATE_CONTEXT_NEGOTIATED)
|
||||
{
|
||||
rpc->pdu->Flags = 0;
|
||||
rpc->pdu->Buffer = rpc->FragBuffer;
|
||||
rpc->pdu->Size = rpc->FragBufferSize;
|
||||
rpc->pdu->Length = status;
|
||||
rpc->pdu->CallId = header->common.call_id;
|
||||
|
||||
return rpc->pdu;
|
||||
}
|
||||
|
||||
if (header->common.ptype != PTYPE_RESPONSE)
|
||||
{
|
||||
printf("rpc_recv_pdu: unexpected ptype 0x%02X\n", header->common.ptype);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!rpc_get_stub_data_info(rpc, rpc->FragBuffer, &StubOffset, &StubLength))
|
||||
{
|
||||
printf("rpc_recv_pdu: expected stub\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (header->response.alloc_hint > rpc->StubBufferSize)
|
||||
{
|
||||
rpc->StubBufferSize = header->response.alloc_hint;
|
||||
rpc->StubBuffer = (BYTE*) realloc(rpc->StubBuffer, rpc->StubBufferSize);
|
||||
}
|
||||
|
||||
if (rpc->StubFragCount == 0)
|
||||
rpc->StubCallId = header->common.call_id;
|
||||
|
||||
if (rpc->StubCallId != header->common.call_id)
|
||||
{
|
||||
printf("invalid call_id: actual: %d, expected: %d, frag_count: %d\n",
|
||||
rpc->StubCallId, header->common.call_id, rpc->StubFragCount);
|
||||
}
|
||||
|
||||
CopyMemory(&rpc->StubBuffer[rpc->StubOffset], &rpc->FragBuffer[StubOffset], StubLength);
|
||||
rpc->StubOffset += StubLength;
|
||||
rpc->StubFragCount++;
|
||||
|
||||
/**
|
||||
* If alloc_hint is set to a nonzero value and a request or a response is fragmented into multiple
|
||||
* PDUs, implementations of these extensions SHOULD set the alloc_hint field in every PDU to be the
|
||||
* combined stub data length of all remaining fragment PDUs.
|
||||
*/
|
||||
|
||||
if ((header->response.alloc_hint == StubLength))
|
||||
{
|
||||
rpc->pdu->CallId = rpc->StubCallId;
|
||||
rpc->StubLength = rpc->StubOffset;
|
||||
|
||||
rpc->StubOffset = 0;
|
||||
rpc->StubFragCount = 0;
|
||||
rpc->StubCallId = 0;
|
||||
|
||||
rpc->pdu->Flags = RPC_PDU_FLAG_STUB;
|
||||
rpc->pdu->Buffer = rpc->StubBuffer;
|
||||
rpc->pdu->Size = rpc->StubBufferSize;
|
||||
rpc->pdu->Length = rpc->StubLength;
|
||||
|
||||
return rpc->pdu;
|
||||
}
|
||||
|
||||
return rpc_recv_pdu(rpc);
|
||||
return pdu;
|
||||
}
|
||||
|
||||
int rpc_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum)
|
||||
@ -638,8 +465,6 @@ BOOL rpc_connect(rdpRpc* rpc)
|
||||
rpc->TlsIn = rpc->transport->TlsIn;
|
||||
rpc->TlsOut = rpc->transport->TlsOut;
|
||||
|
||||
//rpc_client_start(rpc);
|
||||
|
||||
if (!rts_connect(rpc))
|
||||
{
|
||||
printf("rts_connect error!\n");
|
||||
|
@ -724,6 +724,7 @@ struct rpc_client
|
||||
HANDLE Thread;
|
||||
HANDLE StopEvent;
|
||||
|
||||
wQueue* FragmentPool;
|
||||
wQueue* FragmentQueue;
|
||||
|
||||
HANDLE PduSentEvent;
|
||||
|
@ -30,8 +30,159 @@
|
||||
#include <winpr/thread.h>
|
||||
#include <winpr/stream.h>
|
||||
|
||||
#include "rpc_fault.h"
|
||||
|
||||
#include "rpc_client.h"
|
||||
|
||||
wStream* rpc_client_fragment_pool_take(rdpRpc* rpc)
|
||||
{
|
||||
wStream* fragment = NULL;
|
||||
|
||||
if (WaitForSingleObject(Queue_Event(rpc->client->FragmentPool), 0) == WAIT_OBJECT_0)
|
||||
fragment = Queue_Dequeue(rpc->client->FragmentPool);
|
||||
|
||||
if (!fragment)
|
||||
fragment = Stream_New(NULL, rpc->max_recv_frag);
|
||||
|
||||
return fragment;
|
||||
}
|
||||
|
||||
int rpc_client_fragment_pool_return(rdpRpc* rpc, wStream* fragment)
|
||||
{
|
||||
Queue_Enqueue(rpc->client->FragmentPool, fragment);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rpc_client_on_fragment_received_event(rdpRpc* rpc)
|
||||
{
|
||||
BYTE* buffer;
|
||||
UINT32 StubOffset;
|
||||
UINT32 StubLength;
|
||||
wStream* fragment;
|
||||
rpcconn_hdr_t* header;
|
||||
|
||||
fragment = Queue_Dequeue(rpc->client->FragmentQueue);
|
||||
|
||||
buffer = (BYTE*) Stream_Buffer(fragment);
|
||||
header = (rpcconn_hdr_t*) Stream_Buffer(fragment);
|
||||
rpc->FragBuffer = fragment->buffer;
|
||||
|
||||
if (rpc->State < RPC_CLIENT_STATE_CONTEXT_NEGOTIATED)
|
||||
{
|
||||
rpc->pdu->Flags = 0;
|
||||
rpc->pdu->Buffer = Stream_Buffer(fragment);
|
||||
rpc->pdu->Size = Stream_Length(fragment);
|
||||
rpc->pdu->Length = Stream_Length(fragment);
|
||||
rpc->pdu->CallId = header->common.call_id;
|
||||
|
||||
Queue_Enqueue(rpc->ReceiveQueue, rpc->pdu);
|
||||
rpc->pdu = (RPC_PDU*) malloc(sizeof(RPC_PDU));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (header->common.ptype != PTYPE_RESPONSE)
|
||||
{
|
||||
printf("unexpected ptype: %d\n", header->common.ptype);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (header->common.ptype == PTYPE_RESPONSE)
|
||||
{
|
||||
rpc->VirtualConnection->DefaultOutChannel->BytesReceived += header->common.frag_length;
|
||||
rpc->VirtualConnection->DefaultOutChannel->ReceiverAvailableWindow -= header->common.frag_length;
|
||||
|
||||
if (!rpc_get_stub_data_info(rpc, buffer, &StubOffset, &StubLength))
|
||||
{
|
||||
printf("rpc_recv_pdu_fragment: expected stub\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (StubLength == 4)
|
||||
{
|
||||
printf("Ignoring TsProxySendToServer Response\n");
|
||||
rpc_client_fragment_pool_return(rpc, fragment);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
else if (header->common.ptype == PTYPE_RTS)
|
||||
{
|
||||
if (rpc->VirtualConnection->State < VIRTUAL_CONNECTION_STATE_OPENED)
|
||||
return header->common.frag_length;
|
||||
|
||||
printf("Receiving Out-of-Sequence RTS PDU\n");
|
||||
rts_recv_out_of_sequence_pdu(rpc, buffer, header->common.frag_length);
|
||||
|
||||
rpc_client_fragment_pool_return(rpc, fragment);
|
||||
|
||||
return 0;
|
||||
}
|
||||
else if (header->common.ptype == PTYPE_FAULT)
|
||||
{
|
||||
rpc_recv_fault_pdu(header);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (header->response.alloc_hint > rpc->StubBufferSize)
|
||||
{
|
||||
rpc->StubBufferSize = header->response.alloc_hint;
|
||||
rpc->StubBuffer = (BYTE*) realloc(rpc->StubBuffer, rpc->StubBufferSize);
|
||||
}
|
||||
|
||||
if (rpc->StubFragCount == 0)
|
||||
rpc->StubCallId = header->common.call_id;
|
||||
|
||||
if (rpc->StubCallId != header->common.call_id)
|
||||
{
|
||||
printf("invalid call_id: actual: %d, expected: %d, frag_count: %d\n",
|
||||
rpc->StubCallId, header->common.call_id, rpc->StubFragCount);
|
||||
}
|
||||
|
||||
CopyMemory(&rpc->StubBuffer[rpc->StubOffset], &buffer[StubOffset], StubLength);
|
||||
rpc->StubOffset += StubLength;
|
||||
rpc->StubFragCount++;
|
||||
|
||||
rpc_client_fragment_pool_return(rpc, fragment);
|
||||
|
||||
if (rpc->VirtualConnection->DefaultOutChannel->ReceiverAvailableWindow < (rpc->ReceiveWindow / 2))
|
||||
{
|
||||
printf("Sending Flow Control Ack PDU\n");
|
||||
rts_send_flow_control_ack_pdu(rpc);
|
||||
}
|
||||
|
||||
/**
|
||||
* If alloc_hint is set to a nonzero value and a request or a response is fragmented into multiple
|
||||
* PDUs, implementations of these extensions SHOULD set the alloc_hint field in every PDU to be the
|
||||
* combined stub data length of all remaining fragment PDUs.
|
||||
*/
|
||||
|
||||
if ((header->response.alloc_hint == StubLength))
|
||||
{
|
||||
rpc->pdu->CallId = rpc->StubCallId;
|
||||
rpc->StubLength = rpc->StubOffset;
|
||||
|
||||
rpc->StubOffset = 0;
|
||||
rpc->StubFragCount = 0;
|
||||
rpc->StubCallId = 0;
|
||||
|
||||
rpc->pdu->Flags = RPC_PDU_FLAG_STUB;
|
||||
rpc->pdu->Buffer = rpc->StubBuffer;
|
||||
rpc->pdu->Size = rpc->StubBufferSize;
|
||||
rpc->pdu->Length = rpc->StubLength;
|
||||
|
||||
rpc->StubBufferSize = rpc->max_recv_frag;
|
||||
rpc->StubBuffer = (BYTE*) malloc(rpc->StubBufferSize);
|
||||
|
||||
Queue_Enqueue(rpc->ReceiveQueue, rpc->pdu);
|
||||
rpc->pdu = (RPC_PDU*) malloc(sizeof(RPC_PDU));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rpc_client_on_read_event(rdpRpc* rpc)
|
||||
{
|
||||
int position;
|
||||
@ -97,7 +248,9 @@ int rpc_client_on_read_event(rdpRpc* rpc)
|
||||
Stream_SetPosition(rpc->RecvFrag, 0);
|
||||
|
||||
Queue_Enqueue(rpc->client->FragmentQueue, rpc->RecvFrag);
|
||||
rpc->RecvFrag = Stream_New(NULL, rpc->max_recv_frag);
|
||||
rpc->RecvFrag = rpc_client_fragment_pool_take(rpc);
|
||||
|
||||
rpc_client_on_fragment_received_event(rpc);
|
||||
}
|
||||
|
||||
return status;
|
||||
@ -212,33 +365,6 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
|
||||
return status;
|
||||
}
|
||||
|
||||
int rpc_recv_enqueue_pdu(rdpRpc* rpc)
|
||||
{
|
||||
RPC_PDU* pdu;
|
||||
|
||||
pdu = rpc_recv_pdu(rpc);
|
||||
|
||||
if (!pdu)
|
||||
return 0;
|
||||
|
||||
rpc->pdu = (RPC_PDU*) malloc(sizeof(RPC_PDU));
|
||||
|
||||
if (pdu->Flags & RPC_PDU_FLAG_STUB)
|
||||
{
|
||||
rpc->StubBufferSize = rpc->max_recv_frag;
|
||||
rpc->StubBuffer = (BYTE*) malloc(rpc->StubBufferSize);
|
||||
}
|
||||
else
|
||||
{
|
||||
rpc->FragBufferSize = rpc->max_recv_frag;
|
||||
rpc->FragBuffer = (BYTE*) malloc(rpc->FragBufferSize);
|
||||
}
|
||||
|
||||
Queue_Enqueue(rpc->ReceiveQueue, pdu);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
|
||||
{
|
||||
RPC_PDU* pdu;
|
||||
@ -247,9 +373,6 @@ RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
|
||||
pdu = NULL;
|
||||
dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0;
|
||||
|
||||
if (rpc->client->SynchronousReceive)
|
||||
rpc_recv_enqueue_pdu(rpc);
|
||||
|
||||
if (WaitForSingleObject(Queue_Event(rpc->ReceiveQueue), dwMilliseconds) == WAIT_OBJECT_0)
|
||||
{
|
||||
pdu = (RPC_PDU*) Queue_Dequeue(rpc->ReceiveQueue);
|
||||
@ -288,9 +411,6 @@ static void* rpc_client_thread(void* arg)
|
||||
if (WaitForSingleObject(ReadEvent, 0) == WAIT_OBJECT_0)
|
||||
{
|
||||
rpc_client_on_read_event(rpc);
|
||||
|
||||
if (!rpc->client->SynchronousReceive)
|
||||
rpc_recv_enqueue_pdu(rpc);
|
||||
}
|
||||
|
||||
if (WaitForSingleObject(Queue_Event(rpc->SendQueue), 0) == WAIT_OBJECT_0)
|
||||
@ -315,6 +435,7 @@ int rpc_client_new(rdpRpc* rpc)
|
||||
rpc->client->StopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
|
||||
rpc->client->PduSentEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
|
||||
|
||||
rpc->client->FragmentPool = Queue_New(TRUE, -1, -1);
|
||||
rpc->client->FragmentQueue = Queue_New(TRUE, -1, -1);
|
||||
|
||||
return 0;
|
||||
|
@ -24,6 +24,9 @@
|
||||
|
||||
#include <winpr/interlocked.h>
|
||||
|
||||
wStream* rpc_client_fragment_pool_take(rdpRpc* rpc);
|
||||
int rpc_client_fragment_pool_return(rdpRpc* rpc, wStream* fragment);
|
||||
|
||||
RpcClientCall* rpc_client_call_find_by_id(rdpRpc* rpc, UINT32 CallId);
|
||||
|
||||
RpcClientCall* rpc_client_call_new(UINT32 CallId, UINT32 OpNum);
|
||||
|
@ -1103,7 +1103,6 @@ BOOL tsg_connect(rdpTsg* tsg, const char* hostname, UINT16 port)
|
||||
int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
|
||||
{
|
||||
int CopyLength;
|
||||
rpcconn_hdr_t* header;
|
||||
rdpRpc* rpc = tsg->rpc;
|
||||
|
||||
DEBUG_TSG("tsg_read: %d, pending: %d", length, tsg->PendingPdu);
|
||||
|
Loading…
x
Reference in New Issue
Block a user