libfreerdp-core: fix TSG thread shutdown and input freeze problem

This commit is contained in:
Marc-André Moreau 2014-12-15 09:42:04 -05:00
parent 544c2f3e45
commit e2f377ae11
7 changed files with 66 additions and 53 deletions

View File

@ -1275,27 +1275,37 @@ void xf_window_free(xfContext* xfc)
void* xf_input_thread(void *arg) void* xf_input_thread(void *arg)
{ {
xfContext* xfc;
DWORD status; DWORD status;
HANDLE event[2]; DWORD nCount;
HANDLE events[2];
XEvent xevent; XEvent xevent;
wMessageQueue *queue;
wMessage msg; wMessage msg;
wMessageQueue *queue;
int pending_status = 1; int pending_status = 1;
int process_status = 1; int process_status = 1;
freerdp *instance = (freerdp*) arg; freerdp* instance = (freerdp*) arg;
assert(NULL != instance); xfContext* xfc = (xfContext*) instance->context;
xfc = (xfContext *) instance->context;
assert(NULL != xfc);
queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE); queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE);
event[0] = MessageQueue_Event(queue);
event[1] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds); nCount = 0;
events[nCount++] = MessageQueue_Event(queue);
events[nCount++] = CreateFileDescriptorEvent(NULL, FALSE, FALSE, xfc->xfds);
while(1) while(1)
{ {
status = WaitForMultipleObjects(2, event, FALSE, INFINITE); status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE);
if(status == WAIT_OBJECT_0 + 1) if (WaitForSingleObject(events[0], 0) == WAIT_OBJECT_0)
{
if (MessageQueue_Peek(queue, &msg, FALSE))
{
if (msg.id == WMQ_QUIT)
break;
}
}
if (WaitForSingleObject(events[1], 0) == WAIT_OBJECT_0)
{ {
do do
{ {
@ -1324,18 +1334,10 @@ void* xf_input_thread(void *arg)
break; break;
} }
else if(status == WAIT_OBJECT_0)
{
if(MessageQueue_Peek(queue, &msg, FALSE))
{
if(msg.id == WMQ_QUIT)
break;
}
}
else
break;
} }
CloseHandle(events[1]);
MessageQueue_PostQuit(queue, 0); MessageQueue_PostQuit(queue, 0);
ExitThread(0); ExitThread(0);
return NULL; return NULL;

View File

@ -320,6 +320,7 @@ BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* buffer, UINT32* offset, UINT32* l
int rpc_out_read(rdpRpc* rpc, BYTE* data, int length) int rpc_out_read(rdpRpc* rpc, BYTE* data, int length)
{ {
int status; int status;
status = BIO_read(rpc->TlsOut->bio, data, length); status = BIO_read(rpc->TlsOut->bio, data, length);
if (status > 0) if (status > 0)
@ -558,8 +559,11 @@ rdpRpc* rpc_new(rdpTransport* transport)
return NULL; return NULL;
rpc->State = RPC_CLIENT_STATE_INITIAL; rpc->State = RPC_CLIENT_STATE_INITIAL;
rpc->transport = transport; rpc->transport = transport;
rpc->settings = transport->settings; rpc->settings = transport->settings;
rpc->context = transport->context;
rpc->SendSeqNum = 0; rpc->SendSeqNum = 0;
rpc->ntlm = ntlm_new(); rpc->ntlm = ntlm_new();
@ -637,7 +641,7 @@ void rpc_free(rdpRpc* rpc)
{ {
if (rpc) if (rpc)
{ {
rpc_client_stop(rpc); rpc_client_free(rpc);
if (rpc->ntlm) if (rpc->ntlm)
{ {

View File

@ -733,6 +733,7 @@ struct rdp_rpc
rdpNtlmHttp* NtlmHttpIn; rdpNtlmHttp* NtlmHttpIn;
rdpNtlmHttp* NtlmHttpOut; rdpNtlmHttp* NtlmHttpOut;
rdpContext* context;
rdpSettings* settings; rdpSettings* settings;
rdpTransport* transport; rdpTransport* transport;

View File

@ -97,8 +97,6 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc)
UINT32 StubLength; UINT32 StubLength;
wStream* fragment; wStream* fragment;
rpcconn_hdr_t* header; rpcconn_hdr_t* header;
freerdp* instance;
instance = (freerdp*)rpc->transport->settings->instance;
if (!rpc->client->pdu) if (!rpc->client->pdu)
rpc->client->pdu = rpc_client_receive_pool_take(rpc); rpc->client->pdu = rpc_client_receive_pool_take(rpc);
@ -166,11 +164,11 @@ int rpc_client_on_fragment_received_event(rdpRpc* rpc)
if ((header->common.call_id == rpc->PipeCallId) && (header->common.pfc_flags & PFC_LAST_FRAG)) if ((header->common.call_id == rpc->PipeCallId) && (header->common.pfc_flags & PFC_LAST_FRAG))
{ {
TerminateEventArgs e; TerminateEventArgs e;
instance->context->rdp->disconnect = TRUE; rpc->context->rdp->disconnect = TRUE;
rpc->transport->tsg->state = TSG_STATE_TUNNEL_CLOSE_PENDING; rpc->transport->tsg->state = TSG_STATE_TUNNEL_CLOSE_PENDING;
EventArgsInit(&e, "freerdp"); EventArgsInit(&e, "freerdp");
e.code = 0; e.code = 0;
PubSub_OnTerminate(instance->context->pubSub, instance->context, &e); PubSub_OnTerminate(rpc->context->pubSub, rpc->context, &e);
} }
rpc_client_fragment_pool_return(rpc, fragment); rpc_client_fragment_pool_return(rpc, fragment);
@ -237,7 +235,7 @@ int rpc_client_on_read_event(rdpRpc* rpc)
while (Stream_GetPosition(rpc->client->RecvFrag) < RPC_COMMON_FIELDS_LENGTH) while (Stream_GetPosition(rpc->client->RecvFrag) < RPC_COMMON_FIELDS_LENGTH)
{ {
status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag), status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag),
RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag)); RPC_COMMON_FIELDS_LENGTH - Stream_GetPosition(rpc->client->RecvFrag));
if (status < 0) if (status < 0)
{ {
@ -267,7 +265,7 @@ int rpc_client_on_read_event(rdpRpc* rpc)
while (Stream_GetPosition(rpc->client->RecvFrag) < header->frag_length) while (Stream_GetPosition(rpc->client->RecvFrag) < header->frag_length)
{ {
status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag), status = rpc_out_read(rpc, Stream_Pointer(rpc->client->RecvFrag),
header->frag_length - Stream_GetPosition(rpc->client->RecvFrag)); header->frag_length - Stream_GetPosition(rpc->client->RecvFrag));
if (status < 0) if (status < 0)
{ {
@ -297,6 +295,9 @@ int rpc_client_on_read_event(rdpRpc* rpc)
if (rpc_client_on_fragment_received_event(rpc) < 0) if (rpc_client_on_fragment_received_event(rpc) < 0)
return -1; return -1;
} }
if (WaitForSingleObject(Queue_Event(rpc->client->SendQueue), 0) == WAIT_OBJECT_0)
break;
} }
return 0; return 0;
@ -395,6 +396,7 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
RpcClientCall* clientCall; RpcClientCall* clientCall;
rpcconn_common_hdr_t* header; rpcconn_common_hdr_t* header;
RpcInChannel* inChannel; RpcInChannel* inChannel;
pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->SendQueue); pdu = (RPC_PDU*) Queue_Dequeue(rpc->client->SendQueue);
if (!pdu) if (!pdu)
@ -479,15 +481,16 @@ RPC_PDU* rpc_recv_peek_pdu(rdpRpc* rpc)
static void* rpc_client_thread(void* arg) static void* rpc_client_thread(void* arg)
{ {
rdpRpc* rpc; int fd;
DWORD status; DWORD status;
DWORD nCount; DWORD nCount;
HANDLE events[3]; HANDLE events[3];
HANDLE ReadEvent; HANDLE ReadEvent;
int fd; rdpRpc* rpc = (rdpRpc*) arg;
rpc = (rdpRpc*) arg;
fd = BIO_get_fd(rpc->TlsOut->bio, NULL); fd = BIO_get_fd(rpc->TlsOut->bio, NULL);
ReadEvent = CreateFileDescriptorEvent(NULL, TRUE, FALSE, fd); ReadEvent = CreateFileDescriptorEvent(NULL, TRUE, FALSE, fd);
nCount = 0; nCount = 0;
events[nCount++] = rpc->client->StopEvent; events[nCount++] = rpc->client->StopEvent;
events[nCount++] = Queue_Event(rpc->client->SendQueue); events[nCount++] = Queue_Event(rpc->client->SendQueue);
@ -500,7 +503,7 @@ static void* rpc_client_thread(void* arg)
*/ */
if (rpc_client_on_read_event(rpc) < 0) if (rpc_client_on_read_event(rpc) < 0)
{ {
WLog_ERR(TAG, "an error occured when treating first packet"); WLog_ERR(TAG, "an error occurred when treating first packet");
goto out; goto out;
} }
@ -624,17 +627,18 @@ int rpc_client_stop(rdpRpc* rpc)
rpc->client->Thread = NULL; rpc->client->Thread = NULL;
} }
return rpc_client_free(rpc); return 0;
} }
int rpc_client_free(rdpRpc* rpc) int rpc_client_free(rdpRpc* rpc)
{ {
RpcClient* client; RpcClient* client = rpc->client;
client = rpc->client;
if (!client) if (!client)
return 0; return 0;
rpc_client_stop(rpc);
if (client->SendQueue) if (client->SendQueue)
Queue_Free(client->SendQueue); Queue_Free(client->SendQueue);
@ -669,5 +673,7 @@ int rpc_client_free(rdpRpc* rpc)
CloseHandle(client->Thread); CloseHandle(client->Thread);
free(client); free(client);
rpc->client = NULL;
return 0; return 0;
} }

View File

@ -1549,9 +1549,9 @@ void tsg_free(rdpTsg* tsg)
{ {
if (tsg) if (tsg)
{ {
rpc_free(tsg->rpc);
free(tsg->Hostname); free(tsg->Hostname);
free(tsg->MachineName); free(tsg->MachineName);
rpc_free(tsg->rpc);
free(tsg); free(tsg);
} }
} }

View File

@ -1108,20 +1108,16 @@ static void* transport_client_thread(void* arg)
DWORD status; DWORD status;
DWORD nCount; DWORD nCount;
HANDLE handles[8]; HANDLE handles[8];
freerdp* instance; rdpTransport* transport = (rdpTransport*) arg;
rdpContext* context; rdpContext* context = transport->context;
rdpTransport* transport; freerdp* instance = context->instance;
transport = (rdpTransport*) arg;
assert(NULL != transport);
assert(NULL != transport->settings);
instance = (freerdp*) transport->settings->instance;
assert(NULL != instance);
context = instance->context;
assert(NULL != instance->context);
WLog_Print(transport->log, WLOG_DEBUG, "Starting transport thread"); WLog_Print(transport->log, WLOG_DEBUG, "Starting transport thread");
nCount = 0; nCount = 0;
handles[nCount++] = transport->stopEvent; handles[nCount++] = transport->stopEvent;
handles[nCount++] = transport->connectedEvent; handles[nCount++] = transport->connectedEvent;
status = WaitForMultipleObjects(nCount, handles, FALSE, INFINITE); status = WaitForMultipleObjects(nCount, handles, FALSE, INFINITE);
if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0) if (WaitForSingleObject(transport->stopEvent, 0) == WAIT_OBJECT_0)
@ -1165,7 +1161,8 @@ static void* transport_client_thread(void* arg)
rdpTransport* transport_new(rdpSettings* settings) rdpTransport* transport_new(rdpSettings* settings)
{ {
rdpTransport* transport; rdpTransport* transport;
transport = (rdpTransport*)calloc(1, sizeof(rdpTransport));
transport = (rdpTransport*) calloc(1, sizeof(rdpTransport));
if (!transport) if (!transport)
return NULL; return NULL;
@ -1182,6 +1179,8 @@ rdpTransport* transport_new(rdpSettings* settings)
goto out_free; goto out_free;
transport->settings = settings; transport->settings = settings;
transport->context = ((freerdp*) settings->instance)->context;
/* a small 0.1ms delay when transport is blocking. */ /* a small 0.1ms delay when transport is blocking. */
transport->SleepInterval = 100; transport->SleepInterval = 100;
transport->ReceivePool = StreamPool_New(TRUE, BUFFER_SIZE); transport->ReceivePool = StreamPool_New(TRUE, BUFFER_SIZE);
@ -1240,6 +1239,12 @@ void transport_free(rdpTransport* transport)
transport_stop(transport); transport_stop(transport);
if (transport->tsg)
{
tsg_free(transport->tsg);
transport->tsg = NULL;
}
if (transport->ReceiveBuffer) if (transport->ReceiveBuffer)
Stream_Release(transport->ReceiveBuffer); Stream_Release(transport->ReceiveBuffer);
@ -1265,12 +1270,6 @@ void transport_free(rdpTransport* transport)
transport->TcpIn = NULL; transport->TcpIn = NULL;
transport->TcpOut = NULL; transport->TcpOut = NULL;
if (transport->tsg)
{
tsg_free(transport->tsg);
transport->tsg = NULL;
}
if (transport->TsgTls) if (transport->TsgTls)
{ {
tls_free(transport->TsgTls); tls_free(transport->TsgTls);

View File

@ -62,6 +62,7 @@ struct rdp_transport
rdpTls* TlsIn; rdpTls* TlsIn;
rdpTls* TlsOut; rdpTls* TlsOut;
rdpTls* TsgTls; rdpTls* TsgTls;
rdpContext* context;
rdpCredssp* credssp; rdpCredssp* credssp;
rdpSettings* settings; rdpSettings* settings;
UINT32 SleepInterval; UINT32 SleepInterval;