libfreerdp-utils: svc_plugin: make use of MessagePipe

This commit is contained in:
Marc-André Moreau 2013-02-20 18:44:52 -05:00
parent 9164d2245e
commit 814177ea54
2 changed files with 60 additions and 72 deletions

View File

@ -27,11 +27,17 @@
#include <freerdp/svc.h> #include <freerdp/svc.h>
#include <freerdp/addin.h> #include <freerdp/addin.h>
#include <winpr/crt.h>
#include <winpr/synch.h>
#include <winpr/thread.h>
#include <winpr/collections.h>
#include <freerdp/utils/stream.h> #include <freerdp/utils/stream.h>
#include <freerdp/utils/event.h> #include <freerdp/utils/event.h>
#include <freerdp/utils/debug.h> #include <freerdp/utils/debug.h>
#include <freerdp/utils/list.h>
#include <freerdp/utils/thread.h>
typedef struct rdp_svc_plugin_private rdpSvcPluginPrivate;
typedef struct rdp_svc_plugin rdpSvcPlugin; typedef struct rdp_svc_plugin rdpSvcPlugin;
struct rdp_svc_plugin struct rdp_svc_plugin
@ -47,7 +53,12 @@ struct rdp_svc_plugin
void (*interval_callback)(rdpSvcPlugin* plugin); void (*interval_callback)(rdpSvcPlugin* plugin);
void (*terminate_callback)(rdpSvcPlugin* plugin); void (*terminate_callback)(rdpSvcPlugin* plugin);
rdpSvcPluginPrivate* priv; void* init_handle;
UINT32 open_handle;
STREAM* data_in;
freerdp_thread* thread;
wMessagePipe* MsgPipe;
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -33,8 +33,6 @@
#include <freerdp/constants.h> #include <freerdp/constants.h>
#include <freerdp/utils/debug.h> #include <freerdp/utils/debug.h>
#include <freerdp/utils/stream.h> #include <freerdp/utils/stream.h>
#include <freerdp/utils/list.h>
#include <freerdp/utils/thread.h>
#include <freerdp/utils/event.h> #include <freerdp/utils/event.h>
#include <freerdp/utils/svc_plugin.h> #include <freerdp/utils/svc_plugin.h>
@ -48,7 +46,7 @@ struct _svc_data_in_item
}; };
typedef struct _svc_data_in_item svc_data_in_item; typedef struct _svc_data_in_item svc_data_in_item;
static void svc_data_in_item_free(svc_data_in_item* item) void svc_data_in_item_free(svc_data_in_item* item)
{ {
if (item->data_in) if (item->data_in)
{ {
@ -65,16 +63,6 @@ static void svc_data_in_item_free(svc_data_in_item* item)
free(item); free(item);
} }
struct rdp_svc_plugin_private
{
void* init_handle;
UINT32 open_handle;
STREAM* data_in;
LIST* data_in_list;
freerdp_thread* thread;
};
static rdpSvcPlugin* svc_plugin_find_by_init_handle(void* init_handle) static rdpSvcPlugin* svc_plugin_find_by_init_handle(void* init_handle)
{ {
int index; int index;
@ -88,7 +76,7 @@ static rdpSvcPlugin* svc_plugin_find_by_init_handle(void* init_handle)
while (plugin) while (plugin)
{ {
if (plugin->priv->init_handle == init_handle) if (plugin->init_handle == init_handle)
{ {
found = TRUE; found = TRUE;
break; break;
@ -115,7 +103,7 @@ static rdpSvcPlugin* svc_plugin_find_by_open_handle(UINT32 open_handle)
while (plugin) while (plugin)
{ {
if (plugin->priv->open_handle == open_handle) if (plugin->open_handle == open_handle)
{ {
found = TRUE; found = TRUE;
break; break;
@ -153,12 +141,12 @@ static void svc_plugin_process_received(rdpSvcPlugin* plugin, void* pData, UINT3
if (dataFlags & CHANNEL_FLAG_FIRST) if (dataFlags & CHANNEL_FLAG_FIRST)
{ {
if (plugin->priv->data_in != NULL) if (plugin->data_in != NULL)
stream_free(plugin->priv->data_in); stream_free(plugin->data_in);
plugin->priv->data_in = stream_new(totalLength); plugin->data_in = stream_new(totalLength);
} }
data_in = plugin->priv->data_in; data_in = plugin->data_in;
stream_check_size(data_in, (int) dataLength); stream_check_size(data_in, (int) dataLength);
stream_write(data_in, pData, dataLength); stream_write(data_in, pData, dataLength);
@ -169,7 +157,7 @@ static void svc_plugin_process_received(rdpSvcPlugin* plugin, void* pData, UINT3
printf("svc_plugin_process_received: read error\n"); printf("svc_plugin_process_received: read error\n");
} }
plugin->priv->data_in = NULL; plugin->data_in = NULL;
stream_set_pos(data_in, 0); stream_set_pos(data_in, 0);
item = (svc_data_in_item*) malloc(sizeof(svc_data_in_item)); item = (svc_data_in_item*) malloc(sizeof(svc_data_in_item));
@ -177,11 +165,9 @@ static void svc_plugin_process_received(rdpSvcPlugin* plugin, void* pData, UINT3
item->data_in = data_in; item->data_in = data_in;
freerdp_thread_lock(plugin->priv->thread); MessageQueue_Post(plugin->MsgPipe->In, NULL, 0, (void*) item, NULL);
list_enqueue(plugin->priv->data_in_list, item);
freerdp_thread_unlock(plugin->priv->thread);
freerdp_thread_signal(plugin->priv->thread); freerdp_thread_signal(plugin->thread);
} }
} }
@ -193,11 +179,9 @@ static void svc_plugin_process_event(rdpSvcPlugin* plugin, RDP_EVENT* event_in)
ZeroMemory(item, sizeof(svc_data_in_item)); ZeroMemory(item, sizeof(svc_data_in_item));
item->event_in = event_in; item->event_in = event_in;
freerdp_thread_lock(plugin->priv->thread); MessageQueue_Post(plugin->MsgPipe->In, NULL, 0, (void*) item, NULL);
list_enqueue(plugin->priv->data_in_list, item);
freerdp_thread_unlock(plugin->priv->thread);
freerdp_thread_signal(plugin->priv->thread); freerdp_thread_signal(plugin->thread);
} }
static void svc_plugin_open_event(UINT32 openHandle, UINT32 event, void* pData, UINT32 dataLength, static void svc_plugin_open_event(UINT32 openHandle, UINT32 event, void* pData, UINT32 dataLength,
@ -234,29 +218,33 @@ static void svc_plugin_open_event(UINT32 openHandle, UINT32 event, void* pData,
static void svc_plugin_process_data_in(rdpSvcPlugin* plugin) static void svc_plugin_process_data_in(rdpSvcPlugin* plugin)
{ {
wMessage message;
svc_data_in_item* item; svc_data_in_item* item;
while (1) while (1)
{ {
/* terminate signal */ /* terminate signal */
if (freerdp_thread_is_stopped(plugin->priv->thread)) if (freerdp_thread_is_stopped(plugin->thread))
break; break;
freerdp_thread_lock(plugin->priv->thread); if (!MessageQueue_Wait(plugin->MsgPipe->In))
item = list_dequeue(plugin->priv->data_in_list); break;
freerdp_thread_unlock(plugin->priv->thread);
if (item != NULL) if (MessageQueue_Peek(plugin->MsgPipe->In, &message, TRUE))
{ {
/* the ownership of the data is passed to the callback */ item = (svc_data_in_item*) message.wParam;
if (!item)
break;
if (item->data_in) if (item->data_in)
IFCALL(plugin->receive_callback, plugin, item->data_in); IFCALL(plugin->receive_callback, plugin, item->data_in);
if (item->event_in) if (item->event_in)
IFCALL(plugin->event_callback, plugin, item->event_in); IFCALL(plugin->event_callback, plugin, item->event_in);
free(item); free(item);
} }
else
break;
} }
} }
@ -271,21 +259,21 @@ static void* svc_plugin_thread_func(void* arg)
while (1) while (1)
{ {
if (plugin->interval_ms > 0) if (plugin->interval_ms > 0)
freerdp_thread_wait_timeout(plugin->priv->thread, plugin->interval_ms); freerdp_thread_wait_timeout(plugin->thread, plugin->interval_ms);
else else
freerdp_thread_wait(plugin->priv->thread); freerdp_thread_wait(plugin->thread);
if (freerdp_thread_is_stopped(plugin->priv->thread)) if (freerdp_thread_is_stopped(plugin->thread))
break; break;
freerdp_thread_reset(plugin->priv->thread); freerdp_thread_reset(plugin->thread);
svc_plugin_process_data_in(plugin); svc_plugin_process_data_in(plugin);
if (plugin->interval_ms > 0) if (plugin->interval_ms > 0)
IFCALL(plugin->interval_callback, plugin); IFCALL(plugin->interval_callback, plugin);
} }
freerdp_thread_quit(plugin->priv->thread); freerdp_thread_quit(plugin->thread);
DEBUG_SVC("out"); DEBUG_SVC("out");
@ -296,8 +284,8 @@ static void svc_plugin_process_connected(rdpSvcPlugin* plugin, void* pData, UINT
{ {
UINT32 error; UINT32 error;
error = plugin->channel_entry_points.pVirtualChannelOpen(plugin->priv->init_handle, error = plugin->channel_entry_points.pVirtualChannelOpen(plugin->init_handle,
&plugin->priv->open_handle, plugin->channel_def.name, svc_plugin_open_event); &plugin->open_handle, plugin->channel_def.name, svc_plugin_open_event);
if (error != CHANNEL_RC_OK) if (error != CHANNEL_RC_OK)
{ {
@ -305,40 +293,32 @@ static void svc_plugin_process_connected(rdpSvcPlugin* plugin, void* pData, UINT
return; return;
} }
plugin->priv->data_in_list = list_new(); plugin->MsgPipe = MessagePipe_New();
plugin->priv->thread = freerdp_thread_new();
freerdp_thread_start(plugin->priv->thread, svc_plugin_thread_func, plugin); plugin->thread = freerdp_thread_new();
freerdp_thread_start(plugin->thread, svc_plugin_thread_func, plugin);
} }
static void svc_plugin_process_terminated(rdpSvcPlugin* plugin) static void svc_plugin_process_terminated(rdpSvcPlugin* plugin)
{ {
svc_data_in_item* item; if (plugin->thread)
if (plugin->priv->thread)
{ {
freerdp_thread_stop(plugin->priv->thread); freerdp_thread_stop(plugin->thread);
freerdp_thread_free(plugin->priv->thread); freerdp_thread_free(plugin->thread);
} }
plugin->channel_entry_points.pVirtualChannelClose(plugin->priv->open_handle); plugin->channel_entry_points.pVirtualChannelClose(plugin->open_handle);
svc_plugin_remove(plugin); svc_plugin_remove(plugin);
if (plugin->priv->data_in_list) MessagePipe_Free(plugin->MsgPipe);
{
while ((item = list_dequeue(plugin->priv->data_in_list)) != NULL)
svc_data_in_item_free(item);
list_free(plugin->priv->data_in_list);
}
if (plugin->priv->data_in != NULL) if (plugin->data_in != NULL)
{ {
stream_free(plugin->priv->data_in); stream_free(plugin->data_in);
plugin->priv->data_in = NULL; plugin->data_in = NULL;
} }
free(plugin->priv);
plugin->priv = NULL;
IFCALL(plugin->terminate_callback, plugin); IFCALL(plugin->terminate_callback, plugin);
} }
@ -381,15 +361,12 @@ void svc_plugin_init(rdpSvcPlugin* plugin, CHANNEL_ENTRY_POINTS* pEntryPoints)
CopyMemory(&plugin->channel_entry_points, pEntryPoints, pEntryPoints->cbSize); CopyMemory(&plugin->channel_entry_points, pEntryPoints, pEntryPoints->cbSize);
plugin->priv = (rdpSvcPluginPrivate*) malloc(sizeof(rdpSvcPluginPrivate));
ZeroMemory(plugin->priv, sizeof(rdpSvcPluginPrivate));
if (!g_AddinList) if (!g_AddinList)
g_AddinList = ArrayList_New(TRUE); g_AddinList = ArrayList_New(TRUE);
ArrayList_Add(g_AddinList, (void*) plugin); ArrayList_Add(g_AddinList, (void*) plugin);
plugin->channel_entry_points.pVirtualChannelInit(&plugin->priv->init_handle, plugin->channel_entry_points.pVirtualChannelInit(&plugin->init_handle,
&plugin->channel_def, 1, VIRTUAL_CHANNEL_VERSION_WIN2000, svc_plugin_init_event); &plugin->channel_def, 1, VIRTUAL_CHANNEL_VERSION_WIN2000, svc_plugin_init_event);
} }
@ -399,10 +376,10 @@ int svc_plugin_send(rdpSvcPlugin* plugin, STREAM* data_out)
DEBUG_SVC("length %d", (int) stream_get_length(data_out)); DEBUG_SVC("length %d", (int) stream_get_length(data_out));
if (!plugin || !plugin->priv) if (!plugin)
error = CHANNEL_RC_BAD_INIT_HANDLE; error = CHANNEL_RC_BAD_INIT_HANDLE;
else else
error = plugin->channel_entry_points.pVirtualChannelWrite(plugin->priv->open_handle, error = plugin->channel_entry_points.pVirtualChannelWrite(plugin->open_handle,
stream_get_data(data_out), stream_get_length(data_out), data_out); stream_get_data(data_out), stream_get_length(data_out), data_out);
if (error != CHANNEL_RC_OK) if (error != CHANNEL_RC_OK)
@ -420,7 +397,7 @@ int svc_plugin_send_event(rdpSvcPlugin* plugin, RDP_EVENT* event)
DEBUG_SVC("event_type %d", event->event_type); DEBUG_SVC("event_type %d", event->event_type);
error = plugin->channel_entry_points.pVirtualChannelEventPush(plugin->priv->open_handle, event); error = plugin->channel_entry_points.pVirtualChannelEventPush(plugin->open_handle, event);
if (error != CHANNEL_RC_OK) if (error != CHANNEL_RC_OK)
printf("svc_plugin_send_event: VirtualChannelEventPush failed %d\n", error); printf("svc_plugin_send_event: VirtualChannelEventPush failed %d\n", error);