channels/rdpsnd: use MessageQueue

This commit is contained in:
Marc-André Moreau 2013-02-19 22:36:04 -05:00
parent 43fd37de6e
commit 5e8d35c4ac
4 changed files with 44 additions and 52 deletions

View File

@ -31,8 +31,10 @@
#include <string.h> #include <string.h>
#include <winpr/crt.h> #include <winpr/crt.h>
#include <winpr/synch.h>
#include <winpr/cmdline.h> #include <winpr/cmdline.h>
#include <winpr/sysinfo.h> #include <winpr/sysinfo.h>
#include <winpr/collections.h>
#include <freerdp/types.h> #include <freerdp/types.h>
#include <freerdp/addin.h> #include <freerdp/addin.h>
@ -47,7 +49,7 @@ struct rdpsnd_plugin
{ {
rdpSvcPlugin plugin; rdpSvcPlugin plugin;
LIST* data_out_list; wMessageQueue* OutQueue;
BYTE cBlockNo; BYTE cBlockNo;
rdpsndFormat* supported_formats; rdpsndFormat* supported_formats;
@ -75,40 +77,39 @@ struct rdpsnd_plugin
rdpsndDevicePlugin* device; rdpsndDevicePlugin* device;
}; };
struct data_out_item
{
STREAM* data_out;
UINT32 out_timestamp;
};
/* process the linked list of data that has queued to be sent */ /* process the linked list of data that has queued to be sent */
static void rdpsnd_process_interval(rdpSvcPlugin* plugin) static void rdpsnd_process_interval(rdpSvcPlugin* plugin)
{ {
STREAM* data;
wMessage message;
UINT16 wTimeDiff;
UINT16 wTimeStamp;
UINT16 wCurrentTime;
rdpsndPlugin* rdpsnd = (rdpsndPlugin*) plugin; rdpsndPlugin* rdpsnd = (rdpsndPlugin*) plugin;
struct data_out_item* item;
UINT32 current_time;
while (list_size(rdpsnd->data_out_list) > 0) while (MessageQueue_Peek(rdpsnd->OutQueue, &message, TRUE))
{ {
item = (struct data_out_item*) list_peek(rdpsnd->data_out_list); if (message.id == WMQ_QUIT)
current_time = GetTickCount();
if (!item || (current_time <= item->out_timestamp))
break; break;
item = (struct data_out_item*) list_dequeue(rdpsnd->data_out_list); wTimeStamp = (UINT16) (size_t) message.lParam;
svc_plugin_send(plugin, item->data_out); wCurrentTime = (UINT16) GetTickCount();
free(item);
DEBUG_SVC("processed data_out"); if (wTimeStamp - wCurrentTime > 0)
{
wTimeDiff = wTimeStamp - wCurrentTime;
//Sleep(wTimeDiff / 16);
}
data = (STREAM*) message.wParam;
svc_plugin_send(plugin, data);
DEBUG_SVC("processed output data");
} }
if (rdpsnd->is_open && (rdpsnd->close_timestamp > 0)) if (rdpsnd->is_open && (rdpsnd->close_timestamp > 0))
{ {
current_time = GetTickCount(); if (GetTickCount() > rdpsnd->close_timestamp)
if (current_time > rdpsnd->close_timestamp)
{ {
if (rdpsnd->device) if (rdpsnd->device)
IFCALL(rdpsnd->device->Close, rdpsnd->device); IFCALL(rdpsnd->device->Close, rdpsnd->device);
@ -120,10 +121,8 @@ static void rdpsnd_process_interval(rdpSvcPlugin* plugin)
} }
} }
if (list_size(rdpsnd->data_out_list) == 0 && !rdpsnd->is_open) if (!rdpsnd->is_open)
{
rdpsnd->plugin.interval_ms = 0; rdpsnd->plugin.interval_ms = 0;
}
} }
static void rdpsnd_free_supported_formats(rdpsndPlugin* rdpsnd) static void rdpsnd_free_supported_formats(rdpsndPlugin* rdpsnd)
@ -340,10 +339,9 @@ static void rdpsnd_process_message_wave_info(rdpsndPlugin* rdpsnd, STREAM* data_
/* header is not removed from data in this function */ /* header is not removed from data in this function */
static void rdpsnd_process_message_wave(rdpsndPlugin* rdpsnd, STREAM* data_in) static void rdpsnd_process_message_wave(rdpsndPlugin* rdpsnd, STREAM* data_in)
{ {
UINT16 wTimeStamp; STREAM* data;
UINT32 delay_ms; UINT32 delay_ms;
UINT32 process_ms; UINT16 wTimeStamp;
struct data_out_item* item;
rdpsnd->expectingWave = 0; rdpsnd->expectingWave = 0;
@ -360,26 +358,23 @@ static void rdpsnd_process_message_wave(rdpsndPlugin* rdpsnd, STREAM* data_in)
IFCALL(rdpsnd->device->Play, rdpsnd->device, stream_get_head(data_in), stream_get_size(data_in)); IFCALL(rdpsnd->device->Play, rdpsnd->device, stream_get_head(data_in), stream_get_size(data_in));
} }
process_ms = GetTickCount() - rdpsnd->wave_timestamp;
delay_ms = 250; delay_ms = 250;
wTimeStamp = rdpsnd->wTimeStamp + delay_ms; wTimeStamp = rdpsnd->wTimeStamp + delay_ms;
DEBUG_SVC("data_size %d delay_ms %u process_ms %u", DEBUG_SVC("data_size %d delay_ms %u process_ms %u",
stream_get_size(data_in), delay_ms, process_ms); stream_get_size(data_in), delay_ms, process_ms);
item = (struct data_out_item*) malloc(sizeof(struct data_out_item)); data = stream_new(8);
ZeroMemory(item, sizeof(struct data_out_item)); stream_write_BYTE(data, SNDC_WAVECONFIRM);
stream_write_BYTE(data, 0);
stream_write_UINT16(data, 4);
stream_write_UINT16(data, wTimeStamp);
stream_write_BYTE(data, rdpsnd->cBlockNo); /* cConfirmedBlockNo */
stream_write_BYTE(data, 0); /* bPad */
item->data_out = stream_new(8); wTimeStamp = rdpsnd->wave_timestamp + delay_ms;
stream_write_BYTE(item->data_out, SNDC_WAVECONFIRM); MessageQueue_Post(rdpsnd->OutQueue, NULL, 0, (void*) data, (void*) (size_t) wTimeStamp);
stream_write_BYTE(item->data_out, 0);
stream_write_UINT16(item->data_out, 4);
stream_write_UINT16(item->data_out, wTimeStamp);
stream_write_BYTE(item->data_out, rdpsnd->cBlockNo); /* cConfirmedBlockNo */
stream_write_BYTE(item->data_out, 0); /* bPad */
item->out_timestamp = rdpsnd->wave_timestamp + delay_ms;
list_enqueue(rdpsnd->data_out_list, item);
rdpsnd->plugin.interval_ms = 10; rdpsnd->plugin.interval_ms = 10;
} }
@ -581,9 +576,10 @@ static void rdpsnd_process_connect(rdpSvcPlugin* plugin)
plugin->interval_callback = rdpsnd_process_interval; plugin->interval_callback = rdpsnd_process_interval;
rdpsnd->data_out_list = list_new();
rdpsnd->latency = -1; rdpsnd->latency = -1;
rdpsnd->OutQueue = MessageQueue_New();
args = (ADDIN_ARGV*) plugin->channel_entry_points.pExtendedData; args = (ADDIN_ARGV*) plugin->channel_entry_points.pExtendedData;
if (args) if (args)
@ -638,21 +634,12 @@ static void rdpsnd_process_event(rdpSvcPlugin* plugin, RDP_EVENT* event)
static void rdpsnd_process_terminate(rdpSvcPlugin* plugin) static void rdpsnd_process_terminate(rdpSvcPlugin* plugin)
{ {
struct data_out_item* item;
rdpsndPlugin* rdpsnd = (rdpsndPlugin*) plugin; rdpsndPlugin* rdpsnd = (rdpsndPlugin*) plugin;
if (rdpsnd->device) if (rdpsnd->device)
IFCALL(rdpsnd->device->Free, rdpsnd->device); IFCALL(rdpsnd->device->Free, rdpsnd->device);
if (rdpsnd->data_out_list) MessageQueue_Free(rdpsnd->OutQueue);
{
while ((item = list_dequeue(rdpsnd->data_out_list)) != NULL)
{
stream_free(item->data_out);
free(item);
}
list_free(rdpsnd->data_out_list);
}
if (rdpsnd->subsystem) if (rdpsnd->subsystem)
free(rdpsnd->subsystem); free(rdpsnd->subsystem);

View File

@ -282,6 +282,7 @@ struct _wMessage
void* context; void* context;
void* wParam; void* wParam;
void* lParam; void* lParam;
UINT64 time;
}; };
typedef struct _wMessage wMessage; typedef struct _wMessage wMessage;

View File

@ -57,7 +57,7 @@ set(${MODULE_PREFIX}_LIBS
set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL MONOLITHIC ${MONOLITHIC_BUILD} INTERNAL
MODULE winpr MODULE winpr
MODULES winpr-crt winpr-synch) MODULES winpr-crt winpr-synch winpr-sysinfo)
if(MONOLITHIC_BUILD) if(MONOLITHIC_BUILD)
set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE) set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE)

View File

@ -22,6 +22,7 @@
#endif #endif
#include <winpr/crt.h> #include <winpr/crt.h>
#include <winpr/sysinfo.h>
#include <winpr/collections.h> #include <winpr/collections.h>
@ -93,6 +94,9 @@ void MessageQueue_Dispatch(wMessageQueue* queue, wMessage* message)
queue->tail = (queue->tail + 1) % queue->capacity; queue->tail = (queue->tail + 1) % queue->capacity;
queue->size++; queue->size++;
message = &(queue->array[queue->tail]);
message->time = (UINT64) GetTickCount();
if (queue->size > 0) if (queue->size > 0)
SetEvent(queue->event); SetEvent(queue->event);