Implemented Async TCP connect with abort event

* Implemented unified freerdp_tcp_connect_timeout with connect abort.
* Implemented unified freerdp_tcp_connect_multi with connect abort.
* Added connect abort to freerdp_tcp_connect.
* Added freerdp_abort_connect and abortEvent.
This commit is contained in:
Armin Novak 2015-07-03 11:49:40 +02:00
parent 223e0c7900
commit e6c23cb534
7 changed files with 112 additions and 270 deletions

View File

@ -134,7 +134,8 @@ struct rdp_context
ALIGN64 rdpMetrics* metrics; /* 41 */
ALIGN64 rdpCodecs* codecs; /* 42 */
ALIGN64 rdpAutoDetect* autodetect; /* 43 */
UINT64 paddingC[64 - 44]; /* 44 */
ALIGN64 HANDLE abortEvent; /* 44 */
UINT64 paddingC[64 - 45]; /* 45 */
UINT64 paddingD[96 - 64]; /* 64 */
UINT64 paddingE[128 - 96]; /* 96 */
@ -248,6 +249,7 @@ FREERDP_API BOOL freerdp_context_new(freerdp* instance);
FREERDP_API void freerdp_context_free(freerdp* instance);
FREERDP_API BOOL freerdp_connect(freerdp* instance);
FREERDP_API BOOL freerdp_abort_connect(freerdp* instance);
FREERDP_API BOOL freerdp_shall_disconnect(freerdp* instance);
FREERDP_API BOOL freerdp_disconnect(freerdp* instance);
FREERDP_API BOOL freerdp_reconnect(freerdp* instance);

View File

@ -181,6 +181,14 @@ freerdp_connect_finally:
return status;
}
BOOL freerdp_abort_connect(freerdp* instance)
{
if (!instance || !instance->context)
return FALSE;
return SetEvent(instance->context->abortEvent);
}
BOOL freerdp_get_fds(freerdp* instance, void** rfds, int* rcount, void** wfds, int* wcount)
{
rdpRdp* rdp = instance->context->rdp;
@ -526,6 +534,10 @@ BOOL freerdp_context_new(freerdp* instance)
update_register_client_callbacks(rdp->update);
instance->context->abortEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!instance->context->abortEvent)
goto out_error_abort_event;
IFCALLRET(instance->ContextNew, ret, instance, instance->context);
if (ret)
@ -576,6 +588,9 @@ void freerdp_context_free(freerdp* instance)
CloseHandle(instance->context->channelErrorEvent);
free(instance->context->errorDescription);
CloseHandle(instance->context->abortEvent);
instance->context->abortEvent = NULL;
free(instance->context);
instance->context = NULL;

View File

@ -899,7 +899,8 @@ BOOL rdg_tls_out_connect(rdpRdg* rdg, const char* hostname, UINT16 port, int tim
assert(hostname != NULL);
sockfd = freerdp_tcp_connect(settings, settings->GatewayHostname, settings->GatewayPort, timeout);
sockfd = freerdp_tcp_connect(rdg->context, settings, settings->GatewayHostname,
settings->GatewayPort, timeout);
if (sockfd < 1)
{
@ -955,7 +956,8 @@ BOOL rdg_tls_in_connect(rdpRdg* rdg, const char* hostname, UINT16 port, int time
assert(hostname != NULL);
sockfd = freerdp_tcp_connect(settings, settings->GatewayHostname, settings->GatewayPort, timeout);
sockfd = freerdp_tcp_connect(rdg->context, settings, settings->GatewayHostname,
settings->GatewayPort, timeout);
if (sockfd < 1)
return FALSE;

View File

@ -760,7 +760,8 @@ int rpc_channel_tls_connect(RpcChannel* channel, int timeout)
rdpContext* context = rpc->context;
rdpSettings* settings = context->settings;
sockfd = freerdp_tcp_connect(settings, settings->GatewayHostname, settings->GatewayPort, timeout);
sockfd = freerdp_tcp_connect(context, settings, settings->GatewayHostname,
settings->GatewayPort, timeout);
if (sockfd < 1)
return -1;

View File

@ -359,7 +359,6 @@ static int transport_bio_simple_init(BIO* bio, SOCKET socket, int shutdown)
bio->flags = BIO_FLAGS_SHOULD_RETRY;
bio->init = 1;
#ifdef _WIN32
ptr->hEvent = WSACreateEvent();
if (!ptr->hEvent)
@ -367,12 +366,6 @@ static int transport_bio_simple_init(BIO* bio, SOCKET socket, int shutdown)
/* WSAEventSelect automatically sets the socket in non-blocking mode */
WSAEventSelect(ptr->socket, ptr->hEvent, FD_READ | FD_WRITE | FD_CLOSE);
#else
ptr->hEvent = CreateFileDescriptorEvent(NULL, FALSE, FALSE, (int) ptr->socket, WINPR_FD_READ);
if (!ptr->hEvent)
return 0;
#endif
return 1;
}
@ -693,7 +686,7 @@ char* freerdp_tcp_get_ip_address(int sockfd)
return _strdup(ipAddress);
}
int freerdp_uds_connect(const char* path)
static int freerdp_uds_connect(const char* path)
{
#ifndef _WIN32
int status;
@ -744,265 +737,78 @@ BOOL freerdp_tcp_resolve_hostname(const char* hostname)
return TRUE;
}
BOOL freerdp_tcp_connect_timeout(int sockfd, struct sockaddr* addr, socklen_t addrlen, int timeout)
static BOOL freerdp_tcp_connect_timeout(rdpContext* context, int sockfd,
struct sockaddr* addr,
socklen_t addrlen, int timeout)
{
int status;
HANDLE handles[2];
int status = 0;
int count = 0;
DWORD tout = (timeout) ? timeout * 1000 : INFINITE;
#ifndef _WIN32
int flags;
fd_set cfds;
socklen_t optlen;
struct timeval tv;
/* set socket in non-blocking mode */
flags = fcntl(sockfd, F_GETFL);
if (flags < 0)
handles[count] = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!handles[count])
return FALSE;
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
/* non-blocking tcp connect */
status = connect(sockfd, addr, addrlen);
if (status >= 0)
return TRUE; /* connection success */
if (errno != EINPROGRESS)
status = WSAEventSelect(sockfd, handles[count++], FD_READ | FD_WRITE | FD_CONNECT | FD_CLOSE);
if (status < 0)
{
WLog_ERR(TAG, "WSAEventSelect failed with %lX", WSAGetLastError());
return FALSE;
}
FD_ZERO(&cfds);
FD_SET(sockfd, &cfds);
handles[count++] = context->abortEvent;
tv.tv_sec = timeout;
tv.tv_usec = 0;
status = _select(sockfd + 1, NULL, &cfds, NULL, &tv);
if (status != 1)
return FALSE; /* connection timeout or error */
status = 0;
optlen = sizeof(status);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (void*) &status, &optlen) < 0)
status = _connect(sockfd, addr, addrlen);
if (status < 0)
{
status = WSAGetLastError();
switch(status)
{
case WSAEINPROGRESS:
case WSAEWOULDBLOCK:
break;
default:
return FALSE;
}
}
if (status != 0)
return FALSE;
/* set socket in blocking mode */
flags = fcntl(sockfd, F_GETFL);
if (flags < 0)
return FALSE;
fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);
#else
status = connect(sockfd, addr, addrlen);
if (status >= 0)
return TRUE;
status = WaitForMultipleObjects(count, handles, FALSE, tout);
if (WAIT_OBJECT_0 != status)
{
if (status == WAIT_OBJECT_0 + 1)
freerdp_set_last_error(context, FREERDP_ERROR_CONNECT_CANCELLED);
return FALSE;
#endif
}
status = recv(sockfd, NULL, 0, 0);
if (status == SOCKET_ERROR)
{
if (WSAGetLastError() == WSAECONNRESET)
return FALSE;
}
status = WSAEventSelect(sockfd, handles[0], 0);
CloseHandle(handles[0]);
if (status < 0)
{
WLog_ERR(TAG, "WSAEventSelect failed with %lX", WSAGetLastError());
return FALSE;
}
return TRUE;
}
#ifndef _WIN32
int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int port, int timeout)
static int freerdp_tcp_connect_multi(rdpContext* context, char** hostnames,
UINT32* ports, int count, int port,
int timeout)
{
int index;
int sindex;
int status;
int flags;
int maxfds;
fd_set cfds;
int sockfd = -1;
int* sockfds;
char port_str[16];
socklen_t optlen;
struct timeval tv;
struct addrinfo hints;
struct addrinfo* addr;
struct addrinfo* result;
struct addrinfo** addrs;
struct addrinfo** results;
sindex = -1;
sprintf_s(port_str, sizeof(port_str) - 1, "%u", port);
sockfds = (int*) calloc(count, sizeof(int));
addrs = (struct addrinfo**) calloc(count, sizeof(struct addrinfo*));
results = (struct addrinfo**) calloc(count, sizeof(struct addrinfo*));
for (index = 0; index < count; index++)
{
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
if (ports)
sprintf_s(port_str, sizeof(port_str) - 1, "%u", ports[index]);
status = getaddrinfo(hostnames[index], port_str, &hints, &result);
if (status)
{
continue;
}
addr = result;
if ((addr->ai_family == AF_INET6) && (addr->ai_next != 0))
{
while ((addr = addr->ai_next))
{
if (addr->ai_family == AF_INET)
break;
}
if (!addr)
addr = result;
}
sockfds[index] = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
if (sockfds[index] < 0)
{
freeaddrinfo(result);
sockfds[index] = 0;
continue;
}
addrs[index] = addr;
results[index] = result;
}
maxfds = 0;
FD_ZERO(&cfds);
for (index = 0; index < count; index++)
{
if (!sockfds[index])
continue;
sockfd = sockfds[index];
addr = addrs[index];
/* set socket in non-blocking mode */
flags = fcntl(sockfd, F_GETFL);
if (flags < 0)
{
sockfds[index] = 0;
continue;
}
fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);
/* non-blocking tcp connect */
status = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
if (status >= 0)
{
/* connection success */
break;
}
if (errno != EINPROGRESS)
{
sockfds[index] = 0;
continue;
}
FD_SET(sockfd, &cfds);
if (sockfd > maxfds)
maxfds = sockfd;
}
tv.tv_sec = timeout;
tv.tv_usec = 0;
status = _select(maxfds + 1, NULL, &cfds, NULL, &tv);
for (index = 0; index < count; index++)
{
if (!sockfds[index])
continue;
sockfd = sockfds[index];
if (FD_ISSET(sockfd, &cfds))
{
status = 0;
optlen = sizeof(status);
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (void*) &status, &optlen) < 0)
{
sockfds[index] = 0;
continue;
}
if (status != 0)
{
sockfds[index] = 0;
continue;
}
/* set socket in blocking mode */
flags = fcntl(sockfd, F_GETFL);
if (flags < 0)
{
sockfds[index] = 0;
continue;
}
fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);
sindex = index;
break;
}
}
if (sindex >= 0)
{
sockfd = sockfds[sindex];
}
for (index = 0; index < count; index++)
{
if (results[index])
freeaddrinfo(results[index]);
}
free(addrs);
free(results);
free(sockfds);
return sockfd;
}
#else
int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int port, int timeout)
{
int index;
int sindex;
int status;
SOCKET sockfd;
SOCKET sockfd = -1;
SOCKET* sockfds;
HANDLE* events;
DWORD waitStatus;
@ -1013,12 +819,10 @@ int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int po
struct addrinfo** addrs;
struct addrinfo** results;
sindex = -1;
sprintf_s(port_str, sizeof(port_str) - 1, "%u", port);
sockfds = (SOCKET*) calloc(count, sizeof(SOCKET));
events = (HANDLE*) calloc(count, sizeof(HANDLE));
events = (HANDLE*) calloc(count + 1, sizeof(HANDLE));
addrs = (struct addrinfo**) calloc(count, sizeof(struct addrinfo*));
results = (struct addrinfo**) calloc(count, sizeof(struct addrinfo*));
@ -1097,7 +901,9 @@ int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int po
}
}
waitStatus = WaitForMultipleObjects(count, events, FALSE, timeout * 1000);
events[count] = context->abortEvent;
waitStatus = WaitForMultipleObjects(count + 1, events, FALSE, timeout * 1000);
sindex = waitStatus - WAIT_OBJECT_0;
@ -1113,11 +919,14 @@ int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int po
WSAEventSelect(sockfd, NULL, 0);
}
if (sindex >= 0)
if ((sindex >= 0) && (sindex < count))
{
sockfd = sockfds[sindex];
}
if (sindex == count)
freerdp_set_last_error(context, FREERDP_ERROR_CONNECT_CANCELLED);
for (index = 0; index < count; index++)
{
if (results[index])
@ -1132,8 +941,6 @@ int freerdp_tcp_connect_multi(char** hostnames, UINT32* ports, int count, int po
return sockfd;
}
#endif
BOOL freerdp_tcp_set_keep_alive_mode(int sockfd)
{
#ifndef _WIN32
@ -1201,7 +1008,8 @@ BOOL freerdp_tcp_set_keep_alive_mode(int sockfd)
return TRUE;
}
int freerdp_tcp_connect(rdpSettings* settings, const char* hostname, int port, int timeout)
int freerdp_tcp_connect(rdpContext* context, rdpSettings* settings,
const char* hostname, int port, int timeout)
{
int status;
int sockfd;
@ -1232,8 +1040,12 @@ int freerdp_tcp_connect(rdpSettings* settings, const char* hostname, int port, i
{
if (settings->TargetNetAddressCount > 0)
{
sockfd = freerdp_tcp_connect_multi(settings->TargetNetAddresses,
settings->TargetNetPorts, settings->TargetNetAddressCount, port, timeout);
sockfd = freerdp_tcp_connect_multi(
context,
settings->TargetNetAddresses,
settings->TargetNetPorts,
settings->TargetNetAddressCount,
port, timeout);
}
}
}
@ -1281,7 +1093,8 @@ int freerdp_tcp_connect(rdpSettings* settings, const char* hostname, int port, i
return -1;
}
if (!freerdp_tcp_connect_timeout(sockfd, addr->ai_addr, addr->ai_addrlen, timeout))
if (!freerdp_tcp_connect_timeout(context, sockfd, addr->ai_addr,
addr->ai_addrlen, timeout))
{
freeaddrinfo(result);
close(sockfd);
@ -1340,5 +1153,11 @@ int freerdp_tcp_connect(rdpSettings* settings, const char* hostname, int port, i
}
}
if (WaitForSingleObject(context->abortEvent, 0) == WAIT_OBJECT_0)
{
close(sockfd);
return -1;
}
return sockfd;
}

View File

@ -25,6 +25,7 @@
#include <freerdp/types.h>
#include <freerdp/settings.h>
#include <freerdp/freerdp.h>
#include <winpr/crt.h>
#include <winpr/synch.h>
@ -60,6 +61,7 @@
BIO_METHOD* BIO_s_simple_socket(void);
BIO_METHOD* BIO_s_buffered_socket(void);
int freerdp_tcp_connect(rdpSettings* settings, const char* hostname, int port, int timeout);
int freerdp_tcp_connect(rdpContext* context, rdpSettings* settings,
const char* hostname, int port, int timeout);
#endif /* __TCP_H */

View File

@ -204,6 +204,7 @@ BOOL transport_connect(rdpTransport* transport, const char* hostname, UINT16 por
int sockfd;
BOOL status = FALSE;
rdpSettings* settings = transport->settings;
rdpContext* context = transport->context;
transport->async = settings->AsyncTransport;
@ -256,7 +257,7 @@ BOOL transport_connect(rdpTransport* transport, const char* hostname, UINT16 por
}
else
{
sockfd = freerdp_tcp_connect(settings, hostname, port, timeout);
sockfd = freerdp_tcp_connect(context, settings, hostname, port, timeout);
if (sockfd < 1)
return FALSE;