Added timeout on blocking send, receive operations

This commit is contained in:
Benoît LeBlanc 2013-12-20 18:22:29 -05:00
parent 44e7d2f36c
commit ad4d5c1ce7
3 changed files with 51 additions and 10 deletions

View File

@ -25,6 +25,8 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <freerdp/utils/tcp.h>
#include <winpr/crt.h> #include <winpr/crt.h>
#include <winpr/print.h> #include <winpr/print.h>
#include <winpr/synch.h> #include <winpr/synch.h>
@ -37,6 +39,8 @@
#include "../rdp.h" #include "../rdp.h"
#define SYNCHRONOUS_TIMEOUT 5000
wStream* rpc_client_fragment_pool_take(rdpRpc* rpc) wStream* rpc_client_fragment_pool_take(rdpRpc* rpc)
{ {
wStream* fragment = NULL; wStream* fragment = NULL;
@ -360,6 +364,7 @@ void rpc_client_call_free(RpcClientCall* clientCall)
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length) int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
{ {
RPC_PDU* pdu; RPC_PDU* pdu;
int status;
pdu = (RPC_PDU*) malloc(sizeof(RPC_PDU)); pdu = (RPC_PDU*) malloc(sizeof(RPC_PDU));
pdu->s = Stream_New(buffer, length); pdu->s = Stream_New(buffer, length);
@ -368,7 +373,13 @@ int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
if (rpc->client->SynchronousSend) if (rpc->client->SynchronousSend)
{ {
WaitForSingleObject(rpc->client->PduSentEvent, INFINITE); status = WaitForSingleObject(rpc->client->PduSentEvent, SYNCHRONOUS_TIMEOUT);
if (status == WAIT_TIMEOUT)
{
fprintf(stderr, "rpc_send_enqueue_pdu: timed out waiting for pdu sent event\n");
return -1;
}
ResetEvent(rpc->client->PduSentEvent); ResetEvent(rpc->client->PduSentEvent);
} }
@ -425,9 +436,16 @@ RPC_PDU* rpc_recv_dequeue_pdu(rdpRpc* rpc)
DWORD dwMilliseconds; DWORD dwMilliseconds;
pdu = NULL; pdu = NULL;
dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0; dwMilliseconds = rpc->client->SynchronousReceive ? SYNCHRONOUS_TIMEOUT : 0;
if (WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds) == WAIT_OBJECT_0) DWORD result = WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds);
if (result == WAIT_TIMEOUT)
{
fprintf(stderr, "rpc_recv_dequeue_pdu: timed out waiting for receive event\n");
return NULL;
}
if (result == WAIT_OBJECT_0)
{ {
pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->ReceiveQueue); pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->ReceiveQueue);
@ -450,11 +468,18 @@ RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc)
{ {
RPC_PDU* pdu; RPC_PDU* pdu;
DWORD dwMilliseconds; DWORD dwMilliseconds;
DWORD result;
pdu = NULL; pdu = NULL;
dwMilliseconds = rpc->client->SynchronousReceive ? INFINITE : 0; dwMilliseconds = rpc->client->SynchronousReceive ? SYNCHRONOUS_TIMEOUT : 0;
if (WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds) == WAIT_OBJECT_0) result = WaitForSingleObject(Queue_Event(rpc->client->ReceiveQueue), dwMilliseconds);
if (result == WAIT_TIMEOUT)
{
return NULL;
}
if (result == WAIT_OBJECT_0)
{ {
pdu = (RPC_PDU*) Queue_Peek(rpc->client->ReceiveQueue); pdu = (RPC_PDU*) Queue_Peek(rpc->client->ReceiveQueue);
return pdu; return pdu;

View File

@ -1486,6 +1486,12 @@ int tsg_read(rdpTsg* tsg, BYTE* data, UINT32 length)
rpc = tsg->rpc; rpc = tsg->rpc;
if (rpc->transport->layer == TRANSPORT_LAYER_CLOSED)
{
fprintf(stderr, "tsg_read error: connection lost\n");
return -1;
}
if (tsg->PendingPdu) if (tsg->PendingPdu)
{ {
CopyLength = (length < tsg->BytesAvailable) ? length : tsg->BytesAvailable; CopyLength = (length < tsg->BytesAvailable) ? length : tsg->BytesAvailable;

View File

@ -553,7 +553,11 @@ int transport_read_layer(rdpTransport* transport, BYTE* data, int bytes)
return status; return status;
if (status < 0) if (status < 0)
{
/* A read error indicates that the peer has dropped the connection */
transport->layer = TRANSPORT_LAYER_CLOSED;
return status; return status;
}
read += status; read += status;
@ -1032,14 +1036,20 @@ static void* transport_client_thread(void* arg)
transport_get_read_handles(transport, (HANDLE*) &handles, &nCount); transport_get_read_handles(transport, (HANDLE*) &handles, &nCount);
status = WaitForMultipleObjects(nCount, handles, FALSE, INFINITE); status = WaitForMultipleObjects(nCount, handles, FALSE, 100);
if (transport->layer == TRANSPORT_LAYER_CLOSED)
{
break;
}
else if (status != WAIT_TIMEOUT)
{
if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0) if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0)
break; break;
if (!freerdp_check_fds(instance)) if (!freerdp_check_fds(instance))
break; break;
} }
}
WLog_Print(transport->log, WLOG_DEBUG, "Terminating transport thread"); WLog_Print(transport->log, WLOG_DEBUG, "Terminating transport thread");