libfreerdp-core: add non-blocking read.

This commit is contained in:
Vic Lee 2011-08-01 12:43:53 +08:00
parent 72b48585fb
commit a4ec778f02
7 changed files with 134 additions and 9 deletions

View File

@ -170,7 +170,6 @@ void rdp_send_client_font_list_pdu(rdpRdp* rdp, uint16 flags)
void rdp_recv_server_font_map_pdu(rdpRdp* rdp, STREAM* s, rdpSettings* settings)
{
rdp->activated = True;
rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, False);
}
void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s)
@ -184,6 +183,5 @@ void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s)
stream_seek(s, lengthSourceDescriptor); /* sourceDescriptor (should be 0x00) */
rdp->activated = False;
rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, True);
}

View File

@ -95,6 +95,7 @@ boolean rdp_client_connect(rdpRdp* rdp)
rdp->licensed = True;
rdp_client_activate(rdp);
rdp_set_blocking_mode(rdp, False);
return True;
}

View File

@ -54,10 +54,13 @@ boolean freerdp_get_fds(freerdp* instance, void** rfds, int* rcount, void** wfds
boolean freerdp_check_fds(freerdp* instance)
{
rdpRdp* rdp;
int status;
rdp = (rdpRdp*) instance->rdp;
rdp_recv(rdp);
status = rdp_check_fds(rdp);
if (status < 0)
return False;
return True;
}

View File

@ -344,13 +344,13 @@ void rdp_read_data_pdu(rdpRdp* rdp, STREAM* s)
}
/**
* Receive an RDP packet.\n
* Process an RDP packet.\n
* @param rdp RDP module
* @param s stream
*/
void rdp_recv(rdpRdp* rdp)
void rdp_process_pdu(rdpRdp* rdp, STREAM* s)
{
STREAM* s;
int length;
uint16 pduType;
uint16 pduLength;
@ -359,9 +359,6 @@ void rdp_recv(rdpRdp* rdp)
uint16 sec_flags;
enum DomainMCSPDU MCSPDU;
s = transport_recv_stream_init(rdp->transport, 4096);
transport_read(rdp->transport, s);
MCSPDU = DomainMCSPDU_SendDataIndication;
mcs_read_domain_mcspdu_header(s, &MCSPDU, &length);
@ -421,6 +418,47 @@ void rdp_recv(rdpRdp* rdp)
}
}
/**
* Receive an RDP packet.\n
* @param rdp RDP module
*/
void rdp_recv(rdpRdp* rdp)
{
STREAM* s;
s = transport_recv_stream_init(rdp->transport, 4096);
transport_read(rdp->transport, s);
rdp_process_pdu(rdp, s);
}
static int rdp_recv_callback(rdpTransport* transport, STREAM* s, void* extra)
{
rdpRdp* rdp = (rdpRdp*) extra;
rdp_process_pdu(rdp, s);
return 1;
}
/**
* Set non-blocking mode information.
* @param rdp RDP module
* @param blocking blocking mode
*/
void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking)
{
rdp->transport->recv_callback = rdp_recv_callback;
rdp->transport->recv_extra = rdp;
transport_set_blocking_mode(rdp->transport, blocking);
}
int rdp_check_fds(rdpRdp* rdp)
{
return transport_check_fds(rdp->transport);
}
/**
* Instantiate new RDP module.
* @return new RDP module

View File

@ -236,6 +236,9 @@ void rdp_send_data_pdu(rdpRdp* rdp, STREAM* s, uint16 type, uint16 channel_id);
void rdp_send(rdpRdp* rdp, STREAM* s);
void rdp_recv(rdpRdp* rdp);
void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking);
int rdp_check_fds(rdpRdp* rdp);
rdpRdp* rdp_new();
void rdp_free(rdpRdp* rdp);

View File

@ -135,6 +135,21 @@ int transport_read(rdpTransport* transport, STREAM* s)
return status;
}
static int transport_read_nonblocking(rdpTransport* transport)
{
int status;
stream_check_size(transport->recv_buffer, 4096);
status = transport_read(transport, transport->recv_buffer);
if (status <= 0)
return status;
stream_seek(transport->recv_buffer, status);
return status;
}
int transport_write(rdpTransport* transport, STREAM* s)
{
int status = -1;
@ -149,6 +164,63 @@ int transport_write(rdpTransport* transport, STREAM* s)
int transport_check_fds(rdpTransport* transport)
{
int pos;
int status;
uint8 header;
uint16 length;
STREAM* received;
status = transport_read_nonblocking(transport);
if (status <= 0)
return status;
while ((pos = stream_get_pos(transport->recv_buffer)) > 0)
{
/* Ensure the TPKT or Fast Path header is available. */
if (pos <= 4)
return 0;
stream_set_pos(transport->recv_buffer, 0);
stream_peek_uint8(transport->recv_buffer, header);
if (header == 0x03) /* TPKT */
length = tpkt_read_header(transport->recv_buffer);
else /* TODO: Fast Path */
length = 0;
if (length == 0)
{
printf("transport_check_fds: protocol error, not a TPKT header (%d).\n", header);
return -1;
}
if (pos < length)
{
stream_set_pos(transport->recv_buffer, pos);
return 0; /* Packet is not yet completely received. */
}
/*
* A complete packet has been received. In case there are trailing data
* for the next packet, we copy it to the new receive buffer.
*/
received = transport->recv_buffer;
transport->recv_buffer = stream_new(BUFFER_SIZE);
if (pos > length)
{
stream_set_pos(received, length);
stream_check_size(transport->recv_buffer, pos - length);
stream_copy(transport->recv_buffer, received, pos - length);
}
stream_set_pos(received, 0);
status = transport->recv_callback(transport, received, transport->recv_extra);
stream_free(received);
if (status < 0)
return status;
}
return 0;
}
@ -158,6 +230,12 @@ void transport_init(rdpTransport* transport)
transport->state = TRANSPORT_STATE_NEGO;
}
boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking)
{
transport->blocking = blocking;
return transport->tcp->set_blocking_mode(transport->tcp, blocking);
}
rdpTransport* transport_new(rdpSettings* settings)
{
rdpTransport* transport;
@ -179,6 +257,8 @@ rdpTransport* transport_new(rdpSettings* settings)
/* buffers for blocking read/write */
transport->recv_stream = stream_new(BUFFER_SIZE);
transport->send_stream = stream_new(BUFFER_SIZE);
transport->blocking = True;
}
return transport;

View File

@ -63,6 +63,7 @@ struct rdp_transport
void* recv_extra;
STREAM* recv_buffer;
TransportRecv recv_callback;
boolean blocking;
};
STREAM* transport_recv_stream_init(rdpTransport* transport, int size);
@ -75,6 +76,7 @@ boolean transport_connect_nla(rdpTransport* transport);
int transport_read(rdpTransport* transport, STREAM* s);
int transport_write(rdpTransport* transport, STREAM* s);
int transport_check_fds(rdpTransport* transport);
boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking);
rdpTransport* transport_new(rdpSettings* settings);
void transport_free(rdpTransport* transport);