libwinpr-utils: add PubSub multiplexing and synchronized access

This commit is contained in:
Marc-André Moreau 2013-06-15 18:18:02 -04:00
parent 59b7c53c5f
commit e77d4657e4
4 changed files with 76 additions and 14 deletions

View File

@ -349,11 +349,14 @@ typedef struct _wEventArgs wEventArgs;
typedef void (*pEventHandler)(void* context, wEventArgs* e);
#define MAX_EVENT_HANDLERS 32
struct _wEvent
{
const char* EventName;
pEventHandler EventHandler;
wEventArgs EventArgs;
int EventHandlerCount;
pEventHandler EventHandlers[MAX_EVENT_HANDLERS];
};
typedef struct _wEvent wEvent;
@ -369,7 +372,7 @@ typedef struct _wEvent wEvent;
DEFINE_EVENT_HANDLER(_name);
#define DEFINE_EVENT_ENTRY(_name) \
{ #_name, NULL, { sizeof( _name ## EventArgs) } },
{ #_name, { sizeof( _name ## EventArgs) }, 0, { } },
struct _wPubSub
{
@ -382,7 +385,11 @@ struct _wPubSub
};
typedef struct _wPubSub wPubSub;
WINPR_API BOOL PubSub_Lock(wPubSub* pubSub);
WINPR_API BOOL PubSub_Unlock(wPubSub* pubSub);
WINPR_API wEvent* PubSub_Events(wPubSub* pubSub, int* count);
WINPR_API wEvent* PubSub_FindEvent(wPubSub* pubSub, const char* EventName);
WINPR_API void PubSub_Publish(wPubSub* pubSub, wEvent* events, int count);

View File

@ -32,7 +32,7 @@
#define HANDLE_TYPE_ANONYMOUS_PIPE 7
#define WINPR_HANDLE_DEF() \
ULONG Type;
ULONG Type
struct winpr_handle
{

View File

@ -38,8 +38,6 @@
BOOL CreatePipe(PHANDLE hReadPipe, PHANDLE hWritePipe, LPSECURITY_ATTRIBUTES lpPipeAttributes, DWORD nSize)
{
void* ptr;
HANDLE handle;
int pipe_fd[2];
WINPR_PIPE* pReadPipe;
WINPR_PIPE* pWritePipe;

View File

@ -41,6 +41,16 @@ wEvent* PubSub_Events(wPubSub* pubSub, int* count)
* Methods
*/
BOOL PubSub_Lock(wPubSub* pubSub)
{
return (WaitForSingleObject(pubSub->mutex, INFINITE) == WAIT_OBJECT_0) ? TRUE : FALSE;
}
BOOL PubSub_Unlock(wPubSub* pubSub)
{
return ReleaseMutex(pubSub->mutex);
}
wEvent* PubSub_FindEvent(wPubSub* pubSub, const char* EventName)
{
int index;
@ -60,6 +70,9 @@ wEvent* PubSub_FindEvent(wPubSub* pubSub, const char* EventName)
void PubSub_Publish(wPubSub* pubSub, wEvent* events, int count)
{
if (pubSub->synchronized)
PubSub_Lock(pubSub);
while (pubSub->count + count >= pubSub->size)
{
pubSub->size *= 2;
@ -68,6 +81,9 @@ void PubSub_Publish(wPubSub* pubSub, wEvent* events, int count)
CopyMemory(&pubSub->events[pubSub->count], events, count * sizeof(wEvent));
pubSub->count += count;
if (pubSub->synchronized)
PubSub_Unlock(pubSub);
}
int PubSub_Subscribe(wPubSub* pubSub, const char* EventName, pEventHandler EventHandler)
@ -75,53 +91,94 @@ int PubSub_Subscribe(wPubSub* pubSub, const char* EventName, pEventHandler Event
wEvent* event;
int status = -1;
if (pubSub->synchronized)
PubSub_Lock(pubSub);
event = PubSub_FindEvent(pubSub, EventName);
if (event)
{
event->EventHandler = EventHandler;
status = 0;
if (event->EventHandlerCount <= MAX_EVENT_HANDLERS)
{
event->EventHandlers[event->EventHandlerCount] = EventHandler;
event->EventHandlerCount++;
}
else
{
status = -1;
}
}
if (pubSub->synchronized)
PubSub_Unlock(pubSub);
return status;
}
int PubSub_Unsubscribe(wPubSub* pubSub, const char* EventName, pEventHandler EventHandler)
{
int index;
wEvent* event;
int status = -1;
if (pubSub->synchronized)
PubSub_Lock(pubSub);
event = PubSub_FindEvent(pubSub, EventName);
if (event)
{
event->EventHandler = NULL;
status = 0;
for (index = 0; index < event->EventHandlerCount; index++)
{
if (event->EventHandlers[index] == EventHandler)
{
event->EventHandlers[index] = NULL;
event->EventHandlerCount--;
MoveMemory(&event->EventHandlers[index], &event->EventHandlers[index + 1],
(MAX_EVENT_HANDLERS - index - 1) * sizeof(wEvent));
status = 1;
}
}
}
if (pubSub->synchronized)
PubSub_Unlock(pubSub);
return status;
}
int PubSub_OnEvent(wPubSub* pubSub, const char* EventName, void* context, wEventArgs* e)
{
int index;
wEvent* event;
int status = -1;
if (pubSub->synchronized)
PubSub_Lock(pubSub);
event = PubSub_FindEvent(pubSub, EventName);
if (event)
{
if (event->EventHandler)
status = 0;
for (index = 0; index < event->EventHandlerCount; index++)
{
event->EventHandler(context, e);
if (event->EventHandlers[index])
{
event->EventHandlers[index](context, e);
status = 1;
}
else
{
status = 0;
}
}
if (pubSub->synchronized)
PubSub_Unlock(pubSub);
return status;
}