diff --git a/cunit/test_chanman.c b/cunit/test_chanman.c index 3781f47dc..844a48843 100644 --- a/cunit/test_chanman.c +++ b/cunit/test_chanman.c @@ -71,6 +71,9 @@ void test_chanman(void) freerdp_chanman_post_connect(chan_man, &inst); freerdp_chanman_data(&inst, 0, "testdata", 8, CHANNEL_FLAG_FIRST | CHANNEL_FLAG_LAST, 8); + freerdp_chanman_data(&inst, 0, "testdata1", 9, CHANNEL_FLAG_FIRST | CHANNEL_FLAG_LAST, 9); + freerdp_chanman_data(&inst, 0, "testdata11", 10, CHANNEL_FLAG_FIRST | CHANNEL_FLAG_LAST, 10); + freerdp_chanman_data(&inst, 0, "testdata111", 11, CHANNEL_FLAG_FIRST | CHANNEL_FLAG_LAST, 11); freerdp_chanman_check_fds(chan_man, &inst); @@ -81,6 +84,6 @@ void test_chanman(void) printf("responded event_type %d\n", event->event_type); freerdp_event_free(event); - freerdp_chanman_close(chan_man, NULL); + freerdp_chanman_close(chan_man, &inst); freerdp_chanman_free(chan_man); } diff --git a/include/freerdp/utils/list.h b/include/freerdp/utils/list.h index 37f3b69aa..94bb420f3 100644 --- a/include/freerdp/utils/list.h +++ b/include/freerdp/utils/list.h @@ -20,6 +20,8 @@ #ifndef __LIST_UTILS_H #define __LIST_UTILS_H +#include + #define DEFINE_LIST_TYPE(_list_type, _item_type) \ \ struct _item_type##_full \ @@ -32,7 +34,7 @@ struct _item_type##_full \ static struct _item_type* _item_type##_new(void) \ { \ struct _item_type* item; \ - item = (struct _item_type*)malloc(sizeof(struct _item_type##_full));\ + item = (struct _item_type*)xmalloc(sizeof(struct _item_type##_full));\ memset(item, 0, sizeof(struct _item_type##_full)); \ return item; \ } \ @@ -58,7 +60,7 @@ struct _list_type \ static struct _list_type* _list_type##_new(void) \ { \ struct _list_type* list; \ - list = (struct _list_type*)malloc(sizeof(struct _list_type)); \ + list = (struct _list_type*)xmalloc(sizeof(struct _list_type)); \ memset(list, 0, sizeof(struct _list_type)); \ return list; \ } \ @@ -101,9 +103,9 @@ void _list_type##_free(struct _list_type* list) \ { \ item = _list_type##_dequeue(list); \ _item_type##_free(item); \ - free(item); \ + xfree(item); \ } \ - free(list); \ + xfree(list); \ } #endif diff --git a/include/freerdp/utils/thread.h b/include/freerdp/utils/thread.h new file mode 100644 index 000000000..82f6192c2 --- /dev/null +++ b/include/freerdp/utils/thread.h @@ -0,0 +1,41 @@ +/** + * FreeRDP: A Remote Desktop Protocol client. + * Thread Utils + * + * Copyright 2011 Vic Lee + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __THREAD_UTILS_H +#define __THREAD_UTILS_H + +#ifdef _WIN32 + +#define freerdp_thread_create(_proc, _arg) do { \ + DWORD thread; \ + CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)_proc, _arg, 0, &thread); \ + while (0) + +#else + +#include +#define freerdp_thread_create(_proc, _arg) do { \ + pthread_t thread; \ + pthread_create(&thread, 0, _proc, _arg); \ + pthread_detach(thread); \ + } while (0) + +#endif + +#endif /* __THREAD_UTILS_H */ diff --git a/libfreerdp-utils/svc_plugin.c b/libfreerdp-utils/svc_plugin.c index f03ef46ce..d8ea67104 100644 --- a/libfreerdp-utils/svc_plugin.c +++ b/libfreerdp-utils/svc_plugin.c @@ -22,11 +22,15 @@ #include #include #include +#include #include #include #include #include #include +#include +#include +#include #include /* The list of all plugin instances. */ @@ -42,11 +46,33 @@ static rdpSvcPluginList* g_svc_plugin_list = NULL; /* For locking the global resources */ static freerdp_mutex g_mutex = NULL; +/* Queue for receiving packets */ +struct svc_data_in_item +{ + STREAM* data_in; +}; + +DEFINE_LIST_TYPE(svc_data_in_list, svc_data_in_item); + +void svc_data_in_item_free(struct svc_data_in_item* item) +{ + stream_free(item->data_in); + item->data_in = NULL; +} + struct rdp_svc_plugin_private { void* init_handle; uint32 open_handle; STREAM* data_in; + + struct svc_data_in_list* data_in_list; + freerdp_mutex* data_in_mutex; + + struct wait_obj* signals[5]; + int num_signals; + + int thread_status; }; static rdpSvcPlugin* svc_plugin_find_by_init_handle(void* init_handle) @@ -114,6 +140,7 @@ static void svc_plugin_process_received(rdpSvcPlugin* plugin, void* pData, uint3 uint32 totalLength, uint32 dataFlags) { STREAM* data_in; + struct svc_data_in_item* item; if (dataFlags & CHANNEL_FLAG_FIRST) { @@ -132,10 +159,18 @@ static void svc_plugin_process_received(rdpSvcPlugin* plugin, void* pData, uint3 { printf("svc_plugin_process_received: read error\n"); } - /* the stream ownership is passed to the callback who is responsible for freeing it. */ + plugin->priv->data_in = NULL; stream_set_pos(data_in, 0); - plugin->receive_callback(plugin, data_in); + + item = svc_data_in_item_new(); + item->data_in = data_in; + + freerdp_mutex_lock(plugin->priv->data_in_mutex); + svc_data_in_list_enqueue(plugin->priv->data_in_list, item); + freerdp_mutex_unlock(plugin->priv->data_in_mutex); + + wait_obj_set(plugin->priv->signals[1]); } } @@ -167,6 +202,63 @@ static void svc_plugin_open_event(uint32 openHandle, uint32 event, void* pData, } } +static void svc_plugin_process_data_in(rdpSvcPlugin* plugin) +{ + struct svc_data_in_item* item; + + while (1) + { + /* terminate signal */ + if (wait_obj_is_set(plugin->priv->signals[0])) + break; + + freerdp_mutex_lock(plugin->priv->data_in_mutex); + item = svc_data_in_list_dequeue(plugin->priv->data_in_list); + freerdp_mutex_unlock(plugin->priv->data_in_mutex); + + if (item != NULL) + { + /* the ownership of the data is passed to the callback */ + plugin->receive_callback(plugin, item->data_in); + xfree(item); + } + else + break; + } +} + +static void* svc_plugin_thread_func(void* arg) +{ + rdpSvcPlugin* plugin = (rdpSvcPlugin*)arg; + + DEBUG_SVC("in"); + + plugin->connect_callback(plugin); + + while (1) + { + wait_obj_select(plugin->priv->signals, plugin->priv->num_signals, -1); + + /* terminate signal */ + if (wait_obj_is_set(plugin->priv->signals[0])) + break; + + /* data_in signal */ + if (wait_obj_is_set(plugin->priv->signals[1])) + { + wait_obj_clear(plugin->priv->signals[1]); + /* process data in */ + svc_plugin_process_data_in(plugin); + } + } + + plugin->priv->thread_status = -1; + + DEBUG_SVC("out"); + + return 0; +} + static void svc_plugin_process_connected(rdpSvcPlugin* plugin, void* pData, uint32 dataLength) { uint32 error; @@ -178,15 +270,46 @@ static void svc_plugin_process_connected(rdpSvcPlugin* plugin, void* pData, uint printf("svc_plugin_process_connected: open failed\n"); return; } - plugin->connect_callback(plugin); + + plugin->priv->data_in_list = svc_data_in_list_new(); + plugin->priv->data_in_mutex = freerdp_mutex_new(); + + /* terminate signal */ + plugin->priv->signals[plugin->priv->num_signals++] = wait_obj_new(); + /* data_in signal */ + plugin->priv->signals[plugin->priv->num_signals++] = wait_obj_new(); + + plugin->priv->thread_status = 1; + + freerdp_thread_create(svc_plugin_thread_func, plugin); } static void svc_plugin_process_terminated(rdpSvcPlugin* plugin) { + struct timespec ts; + int i; + + wait_obj_set(plugin->priv->signals[0]); + i = 0; + ts.tv_sec = 0; + ts.tv_nsec = 10000000; + while (plugin->priv->thread_status > 0 && i < 1000) + { + i++; + nanosleep(&ts, NULL); + } + plugin->channel_entry_points.pVirtualChannelClose(plugin->priv->open_handle); svc_plugin_remove(plugin); + for (i = 0; i < plugin->priv->num_signals; i++) + wait_obj_free(plugin->priv->signals[i]); + plugin->priv->num_signals = 0; + + freerdp_mutex_free(plugin->priv->data_in_mutex); + svc_data_in_list_free(plugin->priv->data_in_list); + if (plugin->priv->data_in != NULL) { stream_free(plugin->priv->data_in); @@ -261,7 +384,10 @@ int svc_plugin_send(rdpSvcPlugin* plugin, STREAM* data_out) error = plugin->channel_entry_points.pVirtualChannelWrite(plugin->priv->open_handle, stream_get_data(data_out), stream_get_length(data_out), data_out); if (error != CHANNEL_RC_OK) + { + stream_free(data_out); printf("svc_plugin_send: VirtualChannelWrite failed %d\n", error); + } return error; }