channels: get rid of semaphore locking, replace queues

This commit is contained in:
Marc-André Moreau 2013-02-21 02:56:57 -05:00
parent 7a289423c0
commit 385d0daced
6 changed files with 83 additions and 85 deletions

View File

@ -44,7 +44,7 @@
#include <winpr/file.h>
#include <winpr/synch.h>
#include <winpr/library.h>
#include <winpr/interlocked.h>
#include <winpr/collections.h>
#ifdef WITH_DEBUG_CHANNELS
#define DEBUG_CHANNELS(fmt, ...) DEBUG_CLASS(CHANNELS, fmt, ## __VA_ARGS__)
@ -383,15 +383,14 @@ struct channel_data
PCHANNEL_OPEN_EVENT_FN open_event_proc;
};
struct _SYNC_DATA
struct _CHANNEL_OPEN_EVENT
{
SLIST_ENTRY ItemEntry;
void* Data;
UINT32 DataLength;
void* UserData;
int Index;
};
typedef struct _SYNC_DATA SYNC_DATA;
typedef struct _CHANNEL_OPEN_EVENT CHANNEL_OPEN_EVENT;
typedef struct rdp_init_handle rdpInitHandle;
@ -430,15 +429,7 @@ struct rdp_channels
/* used for locating the channels for a given instance */
freerdp* instance;
/* signal for incoming data or event */
HANDLE signal;
/* used for sync write */
PSLIST_HEADER pSyncDataList;
/* used for sync event */
HANDLE event_sem;
RDP_EVENT* event;
wMessagePipe* MsgPipe;
};
/**
@ -606,7 +597,7 @@ static UINT32 FREERDP_CC MyVirtualChannelInit(void** ppInitHandle, PCHANNEL_DEF
PCHANNEL_DEF lchannel_def;
struct channel_data* lchannel_data;
if (ppInitHandle == NULL)
if (!ppInitHandle)
{
DEBUG_CHANNELS("error bad init handle");
return CHANNEL_RC_BAD_INIT_HANDLE;
@ -631,7 +622,7 @@ static UINT32 FREERDP_CC MyVirtualChannelInit(void** ppInitHandle, PCHANNEL_DEF
return CHANNEL_RC_TOO_MANY_CHANNELS;
}
if (pChannel == 0)
if (!pChannel)
{
DEBUG_CHANNELS("error bad channel");
return CHANNEL_RC_BAD_CHANNEL;
@ -786,13 +777,13 @@ static UINT32 FREERDP_CC MyVirtualChannelClose(UINT32 openHandle)
static UINT32 FREERDP_CC MyVirtualChannelWrite(UINT32 openHandle, void* pData, UINT32 dataLength, void* pUserData)
{
int index;
SYNC_DATA* item;
CHANNEL_OPEN_EVENT* item;
rdpChannels* channels;
struct channel_data* lchannel_data;
channels = freerdp_channels_find_by_open_handle(openHandle, &index);
if ((channels == NULL) || (index < 0) || (index >= CHANNEL_MAX_COUNT))
if ((!channels) || (index < 0) || (index >= CHANNEL_MAX_COUNT))
{
DEBUG_CHANNELS("error bad channel handle");
return CHANNEL_RC_BAD_CHANNEL_HANDLE;
@ -830,16 +821,13 @@ static UINT32 FREERDP_CC MyVirtualChannelWrite(UINT32 openHandle, void* pData, U
return CHANNEL_RC_NOT_CONNECTED;
}
item = (SYNC_DATA*) _aligned_malloc(sizeof(SYNC_DATA), MEMORY_ALLOCATION_ALIGNMENT);
item = (CHANNEL_OPEN_EVENT*) malloc(sizeof(CHANNEL_OPEN_EVENT));
item->Data = pData;
item->DataLength = dataLength;
item->UserData = pUserData;
item->Index = index;
InterlockedPushEntrySList(channels->pSyncDataList, &(item->ItemEntry));
/* set the event */
SetEvent(channels->signal);
MessageQueue_Post(channels->MsgPipe->Out, (void*) channels, 0, (void*) item, NULL);
return CHANNEL_RC_OK;
}
@ -878,20 +866,20 @@ static UINT32 FREERDP_CC MyVirtualChannelEventPush(UINT32 openHandle, RDP_EVENT*
return CHANNEL_RC_NOT_OPEN;
}
/* lock channels->event */
WaitForSingleObject(channels->event_sem, INFINITE);
if (!channels->is_connected)
{
ReleaseSemaphore(channels->event_sem, 1, NULL);
DEBUG_CHANNELS("error not connected");
return CHANNEL_RC_NOT_CONNECTED;
}
channels->event = event;
/**
* We really intend to use the In queue for events, but we're pushing on both
* to wake up threads waiting on the out queue. Doing this cleanly would require
* breaking freerdp_pop_event() a bit too early in this refactoring.
*/
/* set the event */
SetEvent(channels->signal);
MessageQueue_Post(channels->MsgPipe->In, (void*) channels, 1, (void*) event, NULL);
MessageQueue_Post(channels->MsgPipe->Out, (void*) channels, 1, (void*) event, NULL);
return CHANNEL_RC_OK;
}
@ -931,11 +919,7 @@ rdpChannels* freerdp_channels_new(void)
channels = (rdpChannels*) malloc(sizeof(rdpChannels));
ZeroMemory(channels, sizeof(rdpChannels));
channels->pSyncDataList = (PSLIST_HEADER) _aligned_malloc(sizeof(SLIST_HEADER), MEMORY_ALLOCATION_ALIGNMENT);
InitializeSListHead(channels->pSyncDataList);
channels->event_sem = CreateSemaphore(NULL, 1, 16, NULL);
channels->signal = CreateEvent(NULL, TRUE, FALSE, NULL);
channels->MsgPipe = MessagePipe_New();
/* Add it to the global list */
channels_list = (rdpChannelsList*) malloc(sizeof(rdpChannelsList));
@ -955,11 +939,7 @@ void freerdp_channels_free(rdpChannels* channels)
rdpChannelsList* list;
rdpChannelsList* prev;
InterlockedFlushSList(channels->pSyncDataList);
_aligned_free(channels->pSyncDataList);
CloseHandle(channels->event_sem);
CloseHandle(channels->signal);
MessagePipe_Free(channels->MsgPipe);
/* Remove from global list */
@ -1200,7 +1180,7 @@ FREERDP_API int freerdp_channels_send_event(rdpChannels* channels, RDP_EVENT* ev
name = event_class_to_name_table[event->event_class];
if (name == NULL)
if (!name)
{
DEBUG_CHANNELS("unknown event_class %d", event->event_class);
freerdp_event_free(event);
@ -1209,17 +1189,16 @@ FREERDP_API int freerdp_channels_send_event(rdpChannels* channels, RDP_EVENT* ev
lchannel_data = freerdp_channels_find_channel_data_by_name(channels, name, &index);
if (lchannel_data == NULL)
if (!lchannel_data)
{
DEBUG_CHANNELS("could not find channel name %s", name);
freerdp_event_free(event);
return 1;
}
if (lchannel_data->open_event_proc != NULL)
if (lchannel_data->open_event_proc)
{
lchannel_data->open_event_proc(lchannel_data->open_handle,
CHANNEL_EVENT_USER,
lchannel_data->open_event_proc(lchannel_data->open_handle, CHANNEL_EVENT_USER,
event, sizeof(RDP_EVENT), sizeof(RDP_EVENT), 0);
}
@ -1231,13 +1210,20 @@ FREERDP_API int freerdp_channels_send_event(rdpChannels* channels, RDP_EVENT* ev
*/
static void freerdp_channels_process_sync(rdpChannels* channels, freerdp* instance)
{
SYNC_DATA* item;
wMessage message;
RDP_EVENT* event;
CHANNEL_OPEN_EVENT* item;
rdpChannel* lrdp_channel;
struct channel_data* lchannel_data;
while (QueryDepthSList(channels->pSyncDataList) > 0)
while (MessageQueue_Peek(channels->MsgPipe->Out, &message, TRUE))
{
item = (SYNC_DATA*) InterlockedPopEntrySList(channels->pSyncDataList);
if (message.id == WMQ_QUIT)
break;
if (message.id == 0)
{
item = (CHANNEL_OPEN_EVENT*) message.wParam;
if (!item)
break;
@ -1253,10 +1239,20 @@ static void freerdp_channels_process_sync(rdpChannels* channels, freerdp* instan
if (lchannel_data->open_event_proc)
{
lchannel_data->open_event_proc(lchannel_data->open_handle,
CHANNEL_EVENT_WRITE_COMPLETE, item->UserData, sizeof(void*), sizeof(void*), 0);
CHANNEL_EVENT_WRITE_COMPLETE, item->UserData, item->DataLength, item->DataLength, 0);
}
_aligned_free(item);
free(item);
}
else if (message.id == 1)
{
event = (RDP_EVENT*) message.wParam;
/**
* Ignore for now, the same event is being pushed on the In queue,
* and we're pushing it on the Out queue just to wake other threads
*/
}
}
}
@ -1268,7 +1264,7 @@ BOOL freerdp_channels_get_fds(rdpChannels* channels, freerdp* instance, void** r
{
void* pfd;
pfd = GetEventWaitObject(channels->signal);
pfd = GetEventWaitObject(MessageQueue_Event(channels->MsgPipe->Out));
if (pfd)
{
@ -1285,7 +1281,7 @@ HANDLE freerdp_channels_get_event_handle(freerdp* instance)
rdpChannels* channels;
channels = instance->context->channels;
event = channels->signal;
event = MessageQueue_Event(channels->MsgPipe->Out);
return event;
}
@ -1296,9 +1292,8 @@ int freerdp_channels_process_pending_messages(freerdp* instance)
channels = instance->context->channels;
if (WaitForSingleObject(channels->signal, 0) == WAIT_OBJECT_0)
if (WaitForSingleObject(MessageQueue_Event(channels->MsgPipe->Out), 0) == WAIT_OBJECT_0)
{
ResetEvent(channels->signal);
freerdp_channels_process_sync(channels, instance);
}
@ -1310,9 +1305,8 @@ int freerdp_channels_process_pending_messages(freerdp* instance)
*/
BOOL freerdp_channels_check_fds(rdpChannels* channels, freerdp* instance)
{
if (WaitForSingleObject(channels->signal, 0) == WAIT_OBJECT_0)
if (WaitForSingleObject(MessageQueue_Event(channels->MsgPipe->Out), 0) == WAIT_OBJECT_0)
{
ResetEvent(channels->signal);
freerdp_channels_process_sync(channels, instance);
}
@ -1321,16 +1315,16 @@ BOOL freerdp_channels_check_fds(rdpChannels* channels, freerdp* instance)
RDP_EVENT* freerdp_channels_pop_event(rdpChannels* channels)
{
RDP_EVENT* event;
wMessage message;
RDP_EVENT* event = NULL;
if (channels->event == NULL)
return NULL;
event = channels->event;
channels->event = NULL;
/* release channels->event */
ReleaseSemaphore(channels->event_sem, 1, NULL);
if (MessageQueue_Peek(channels->MsgPipe->In, &message, TRUE))
{
if (message.id == 1)
{
event = (RDP_EVENT*) message.wParam;
}
}
return event;
}

View File

@ -92,6 +92,7 @@ static void rdpsnd_alsa_set_params(rdpsndAlsaPlugin* alsa)
snd_pcm_hw_params_set_rate_near(alsa->pcm_handle, hw_params, &alsa->actual_rate, NULL);
snd_pcm_hw_params_set_channels_near(alsa->pcm_handle, hw_params, &alsa->actual_channels);
snd_pcm_hw_params_get_period_size(hw_params, &alsa->period_size, 0);
alsa->audio_data_left = 0;
if (alsa->latency < 0)
@ -103,7 +104,7 @@ static void rdpsnd_alsa_set_params(rdpsndAlsaPlugin* alsa)
buffer_size = alsa->actual_rate / 2; /* Minimum 0.5-second buffer */
snd_pcm_hw_params_set_buffer_size_near(alsa->pcm_handle, hw_params, &buffer_size);
//snd_pcm_hw_params_set_period_size_near(alsa->out_handle, hw_params, &alsa->period_size, NULL);
//snd_pcm_hw_params_set_period_size_near(alsa->pcm_handle, hw_params, &alsa->period_size, NULL);
snd_pcm_hw_params(alsa->pcm_handle, hw_params);
snd_pcm_hw_params_free(hw_params);

View File

@ -352,7 +352,7 @@ static void rdpsnd_process_message_wave(rdpsndPlugin* rdpsnd, STREAM* data_in)
STREAM* data;
UINT16 wTimeStamp;
rdpsnd->expectingWave = 0;
rdpsnd->expectingWave = FALSE;
CopyMemory(stream_get_head(data_in), rdpsnd->waveData, 4);

View File

@ -34,6 +34,7 @@
#include "xf_cliprdr.h"
typedef struct clipboard_format_mapping clipboardFormatMapping;
struct clipboard_format_mapping
{
Atom target_format;
@ -41,6 +42,7 @@ struct clipboard_format_mapping
};
typedef struct clipboard_context clipboardContext;
struct clipboard_context
{
rdpChannels* channels;

View File

@ -1017,21 +1017,21 @@ int xf_receive_channel_data(freerdp* instance, int channelId, BYTE* data, int si
return freerdp_channels_data(instance, channelId, data, size, flags, total_size);
}
void xf_process_channel_event(rdpChannels* chanman, freerdp* instance)
void xf_process_channel_event(rdpChannels* channels, freerdp* instance)
{
xfInfo* xfi;
RDP_EVENT* event;
xfi = ((xfContext*) instance->context)->xfi;
event = freerdp_channels_pop_event(chanman);
event = freerdp_channels_pop_event(channels);
if (event)
{
switch (event->event_class)
{
case RDP_EVENT_CLASS_RAIL:
xf_process_rail_event(xfi, chanman, event);
xf_process_rail_event(xfi, channels, event);
break;
case RDP_EVENT_CLASS_TSMF:

View File

@ -117,7 +117,7 @@ RDP_EVENT* freerdp_event_new(UINT16 event_class, UINT16 event_type,
break;
}
if (event != NULL)
if (event)
{
event->event_class = event_class;
event->event_type = event_type;
@ -168,9 +168,9 @@ static void freerdp_rail_event_free(RDP_EVENT* event)
void freerdp_event_free(RDP_EVENT* event)
{
if (event != NULL)
if (event)
{
if (event->on_event_free_callback != NULL)
if (event->on_event_free_callback)
event->on_event_free_callback(event);
switch (event->event_class)
@ -185,6 +185,7 @@ void freerdp_event_free(RDP_EVENT* event)
freerdp_rail_event_free(event);
break;
}
free(event);
}
}