Moved update thread from client to library.

This commit is contained in:
Armin Novak 2014-07-14 19:27:50 +02:00
parent 29cb8680ce
commit 689902c995
9 changed files with 49 additions and 149 deletions

View File

@ -270,40 +270,6 @@ static void android_process_channel_event(rdpChannels* channels, freerdp* instan
}
}
static void *jni_update_thread(void *arg)
{
int status;
wMessage message;
wMessageQueue* queue;
freerdp* instance = (freerdp*) arg;
assert( NULL != instance);
DEBUG_ANDROID("Start.");
status = 1;
queue = freerdp_get_message_queue(instance, FREERDP_UPDATE_MESSAGE_QUEUE);
while (MessageQueue_Wait(queue))
{
while (MessageQueue_Peek(queue, &message, TRUE))
{
status = freerdp_message_queue_process_message(instance, FREERDP_UPDATE_MESSAGE_QUEUE, &message);
if (!status)
break;
}
if (!status)
break;
}
DEBUG_ANDROID("Quit.");
ExitThread(0);
return NULL;
}
static void* jni_input_thread(void* arg)
{
HANDLE event[3];
@ -394,11 +360,9 @@ static int android_freerdp_run(freerdp* instance)
const rdpSettings* settings = instance->context->settings;
HANDLE update_thread;
HANDLE input_thread;
HANDLE channels_thread;
BOOL async_update = settings->AsyncUpdate;
BOOL async_input = settings->AsyncInput;
BOOL async_channels = settings->AsyncChannels;
BOOL async_transport = settings->AsyncTransport;
@ -417,12 +381,6 @@ static int android_freerdp_run(freerdp* instance)
return 0;
}
if (async_update)
{
update_thread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) jni_update_thread, instance, 0, NULL);
}
if (async_input)
{
input_thread = CreateThread(NULL, 0,
@ -571,15 +529,7 @@ static int android_freerdp_run(freerdp* instance)
WaitForSingleObject(channels_thread, INFINITE);
CloseHandle(channels_thread);
}
if (async_update)
{
wMessageQueue* update_queue = freerdp_get_message_queue(instance, FREERDP_UPDATE_MESSAGE_QUEUE);
MessageQueue_PostQuit(update_queue, 0);
WaitForSingleObject(update_thread, INFINITE);
CloseHandle(update_thread);
}
if (async_input)
{
wMessageQueue* input_queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE);

View File

@ -294,6 +294,7 @@ void* thread_func(void* param)
if (g_thread_count < 1)
ReleaseSemaphore(g_sem, 1, NULL);
ExitThread(0);
return NULL;
}

View File

@ -652,39 +652,6 @@ void* wf_input_thread(void* arg)
return NULL;
}
void* wf_update_thread(void* arg)
{
int status;
wMessage message;
wMessageQueue* queue;
freerdp* instance = (freerdp*) arg;
assert( NULL != instance);
status = 1;
queue = freerdp_get_message_queue(instance,
FREERDP_UPDATE_MESSAGE_QUEUE);
while (MessageQueue_Wait(queue))
{
while (MessageQueue_Peek(queue, &message, TRUE))
{
status = freerdp_message_queue_process_message(instance,
FREERDP_UPDATE_MESSAGE_QUEUE, &message);
if (!status)
break;
}
if (!status)
break;
}
ExitThread(0);
return NULL;
}
void* wf_channels_thread(void* arg)
{
int status;
@ -728,11 +695,9 @@ DWORD WINAPI wf_client_thread(LPVOID lpParam)
rdpChannels* channels;
rdpSettings* settings;
BOOL async_update;
BOOL async_input;
BOOL async_channels;
BOOL async_transport;
HANDLE update_thread;
HANDLE input_thread;
HANDLE channels_thread;
@ -751,18 +716,10 @@ DWORD WINAPI wf_client_thread(LPVOID lpParam)
channels = instance->context->channels;
settings = instance->context->settings;
async_update = settings->AsyncUpdate;
async_input = settings->AsyncInput;
async_channels = settings->AsyncChannels;
async_transport = settings->AsyncTransport;
if (async_update)
{
update_thread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) wf_update_thread,
instance, 0, NULL);
}
if (async_input)
{
input_thread = CreateThread(NULL, 0,
@ -905,17 +862,6 @@ DWORD WINAPI wf_client_thread(LPVOID lpParam)
/* cleanup */
freerdp_channels_close(channels, instance);
if (async_update)
{
wMessageQueue* update_queue;
update_queue = freerdp_get_message_queue(instance,
FREERDP_UPDATE_MESSAGE_QUEUE);
MessageQueue_PostQuit(update_queue, 0);
WaitForSingleObject(update_thread, INFINITE);
CloseHandle(update_thread);
}
if (async_input)
{
wMessageQueue* input_queue;

View File

@ -1128,30 +1128,6 @@ void xf_window_free(xfContext *xfc)
}
}
void* xf_update_thread(void *arg)
{
int status;
wMessage message;
wMessageQueue *queue;
freerdp *instance = (freerdp *) arg;
assert(NULL != instance);
status = 1;
queue = freerdp_get_message_queue(instance, FREERDP_UPDATE_MESSAGE_QUEUE);
while(MessageQueue_Wait(queue))
{
while(MessageQueue_Peek(queue, &message, TRUE))
{
status = freerdp_message_queue_process_message(instance, FREERDP_UPDATE_MESSAGE_QUEUE, &message);
if(!status)
break;
}
if(!status)
break;
}
ExitThread(0);
return NULL;
}
void *xf_input_thread(void *arg)
{
xfContext *xfc;
@ -1277,11 +1253,9 @@ void *xf_thread(void *param)
int fd_input_event;
HANDLE input_event;
int select_status;
BOOL async_update;
BOOL async_input;
BOOL async_channels;
BOOL async_transport;
HANDLE update_thread;
HANDLE input_thread;
HANDLE channels_thread;
rdpChannels *channels;
@ -1318,14 +1292,10 @@ void *xf_thread(void *param)
}
channels = instance->context->channels;
settings = instance->context->settings;
async_update = settings->AsyncUpdate;
async_input = settings->AsyncInput;
async_channels = settings->AsyncChannels;
async_transport = settings->AsyncTransport;
if(async_update)
{
update_thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) xf_update_thread, instance, 0, NULL);
}
if(async_input)
{
input_thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) xf_input_thread, instance, 0, NULL);
@ -1454,13 +1424,7 @@ void *xf_thread(void *param)
/* Close the channels first. This will signal the internal message pipes
* that the threads should quit. */
freerdp_channels_close(channels, instance);
if(async_update)
{
wMessageQueue *update_queue = freerdp_get_message_queue(instance, FREERDP_UPDATE_MESSAGE_QUEUE);
MessageQueue_PostQuit(update_queue, 0);
WaitForSingleObject(update_thread, INFINITE);
CloseHandle(update_thread);
}
if(async_input)
{
wMessageQueue *input_queue = freerdp_get_message_queue(instance, FREERDP_INPUT_MESSAGE_QUEUE);

View File

@ -317,6 +317,7 @@ BOOL freerdp_disconnect(freerdp* instance)
rdp = instance->context->rdp;
transport_disconnect(rdp->transport);
update_post_disconnect(instance->update);
IFCALL(instance->PostDisconnect, instance);
if (instance->update->pcap_rfx)

View File

@ -2159,11 +2159,37 @@ void update_message_register_interface(rdpUpdateProxy* message, rdpUpdate* updat
pointer->PointerCached = update_message_PointerCached;
}
rdpUpdateProxy* update_message_proxy_new(rdpUpdate* update)
static void *update_message_proxy_thread(void *arg)
{
rdpUpdateProxy* message;
rdpUpdate *update = (rdpUpdate *)arg;
wMessage message;
message = (rdpUpdateProxy*) malloc(sizeof(rdpUpdateProxy));
if (!update || !update->queue)
{
DEBUG_WARN("update=%p, update->queue=%p", update, update ? update->queue : NULL);
ExitThread(-1);
return NULL;
}
while (MessageQueue_Wait(update->queue))
{
int status = 0;
if (MessageQueue_Peek(update->queue, &message, TRUE))
status = update_message_queue_process_message(update, &message);
if (!status)
break;
}
ExitThread(0);
return NULL;
}
rdpUpdateProxy *update_message_proxy_new(rdpUpdate *update)
{
rdpUpdateProxy *message;
message = (rdpUpdateProxy *) malloc(sizeof(rdpUpdateProxy));
if (message)
{
@ -2171,6 +2197,7 @@ rdpUpdateProxy* update_message_proxy_new(rdpUpdate* update)
message->update = update;
update_message_register_interface(message, update);
message->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) update_message_proxy_thread, update, 0, NULL);
}
return message;
@ -2180,6 +2207,9 @@ void update_message_proxy_free(rdpUpdateProxy* message)
{
if (message)
{
MessageQueue_PostQuit(message->update->queue, 0);
WaitForSingleObject(message->thread, INFINITE);
CloseHandle(message->thread);
free(message);
}
}

View File

@ -120,6 +120,8 @@ struct rdp_update_proxy
pPointerColor PointerColor;
pPointerNew PointerNew;
pPointerCached PointerCached;
HANDLE thread;
};
int update_message_queue_process_message(rdpUpdate* update, wMessage* message);

View File

@ -568,6 +568,14 @@ void update_post_connect(rdpUpdate* update)
update->initialState = FALSE;
}
void update_post_disconnect(rdpUpdate* update)
{
update->asynchronous = update->context->settings->AsyncUpdate;
if (update->asynchronous)
update_message_proxy_free(update->proxy);
}
static void update_begin_paint(rdpContext* context)
{
wStream* s;
@ -1688,9 +1696,6 @@ void update_free(rdpUpdate* update)
free(update->altsec);
free(update->window);
if (update->asynchronous)
update_message_proxy_free(update->proxy);
MessageQueue_Free(update->queue);
free(update);

View File

@ -44,6 +44,7 @@ void update_free_bitmap(BITMAP_UPDATE* bitmap_update);
void update_reset_state(rdpUpdate* update);
void update_post_connect(rdpUpdate* update);
void update_post_disconnect(rdpUpdate* update);
BOOL update_read_bitmap_update(rdpUpdate* update, wStream* s, BITMAP_UPDATE* bitmapUpdate);
BOOL update_read_palette(rdpUpdate* update, wStream* s, PALETTE_UPDATE* palette_update);