libwinpr-synch: implement pipe-based semaphore

This commit is contained in:
Marc-André Moreau 2012-11-28 12:47:04 -05:00
parent 335add832d
commit d0792ea4d1
9 changed files with 297 additions and 57 deletions

View File

@ -35,6 +35,8 @@ set(${MODULE_PREFIX}_GATEWAY_SRCS
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc.h
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_bind.c
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_bind.h
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_client.c
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_client.h
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_fault.c
${${MODULE_PREFIX}_GATEWAY_DIR}/rpc_fault.h
${${MODULE_PREFIX}_GATEWAY_DIR}/rts.c

View File

@ -38,6 +38,7 @@
#include "ncacn_http.h"
#include "rpc_bind.h"
#include "rpc_fault.h"
#include "rpc_client.h"
#include "rpc.h"
@ -242,42 +243,6 @@ BOOL rpc_get_stub_data_info(rdpRpc* rpc, BYTE* buffer, UINT32* offset, UINT32* l
return TRUE;
}
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
{
RPC_PDU_ENTRY* PduEntry;
PduEntry = (RPC_PDU_ENTRY*) _aligned_malloc(sizeof(RPC_PDU_ENTRY), MEMORY_ALLOCATION_ALIGNMENT);
PduEntry->Buffer = buffer;
PduEntry->Length = length;
InterlockedPushEntrySList(rpc->SendQueue, &(PduEntry->ItemEntry));
return 0;
}
int rpc_send_dequeue_pdu(rdpRpc* rpc)
{
int status;
RPC_PDU_ENTRY* PduEntry;
PduEntry = (RPC_PDU_ENTRY*) InterlockedPopEntrySList(rpc->SendQueue);
status = rpc_in_write(rpc, PduEntry->Buffer, PduEntry->Length);
/*
* This protocol specifies that only RPC PDUs are subject to the flow control abstract
* data model. RTS PDUs and the HTTP request and response headers are not subject to flow control.
* Implementations of this protocol MUST NOT include them when computing any of the variables
* specified by this abstract data model.
*/
rpc->VirtualConnection->DefaultInChannel->BytesSent += status;
rpc->VirtualConnection->DefaultInChannel->SenderAvailableWindow -= status;
_aligned_free(PduEntry);
return status;
}
int rpc_out_read(rdpRpc* rpc, BYTE* data, int length)
{
int status;

View File

@ -700,6 +700,19 @@ typedef struct _RPC_PDU_ENTRY
UINT32 Length;
} RPC_PDU_ENTRY, *PRPC_PDU_ENTRY;
struct rpc_client
{
HANDLE Thread;
HANDLE StopEvent;
HANDLE SendEvent;
HANDLE SendSemaphore;
HANDLE ReceiveEvent;
HANDLE ReceiveSemaphore;
};
typedef struct rpc_client RpcClient;
struct rdp_rpc
{
RPC_CLIENT_STATE State;
@ -710,6 +723,8 @@ struct rdp_rpc
rdpNtlm* ntlm;
int send_seq_num;
RpcClient* client;
rdpNtlmHttp* NtlmHttpIn;
rdpNtlmHttp* NtlmHttpOut;

View File

@ -0,0 +1,118 @@
/**
* FreeRDP: A Remote Desktop Protocol Implementation
* RPC Client
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <winpr/crt.h>
#include <winpr/synch.h>
#include <winpr/thread.h>
#include "rpc_client.h"
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
{
RPC_PDU_ENTRY* PduEntry;
PduEntry = (RPC_PDU_ENTRY*) _aligned_malloc(sizeof(RPC_PDU_ENTRY), MEMORY_ALLOCATION_ALIGNMENT);
PduEntry->Buffer = buffer;
PduEntry->Length = length;
InterlockedPushEntrySList(rpc->SendQueue, &(PduEntry->ItemEntry));
return 0;
}
int rpc_send_dequeue_pdu(rdpRpc* rpc)
{
int status;
RPC_PDU_ENTRY* PduEntry;
PduEntry = (RPC_PDU_ENTRY*) InterlockedPopEntrySList(rpc->SendQueue);
status = rpc_in_write(rpc, PduEntry->Buffer, PduEntry->Length);
/*
* This protocol specifies that only RPC PDUs are subject to the flow control abstract
* data model. RTS PDUs and the HTTP request and response headers are not subject to flow control.
* Implementations of this protocol MUST NOT include them when computing any of the variables
* specified by this abstract data model.
*/
rpc->VirtualConnection->DefaultInChannel->BytesSent += status;
rpc->VirtualConnection->DefaultInChannel->SenderAvailableWindow -= status;
_aligned_free(PduEntry);
return status;
}
static void* rpc_client_thread(void* arg)
{
rdpRpc* rpc;
DWORD status;
HANDLE events[3];
rpc = (rdpRpc*) arg;
events[0] = rpc->client->StopEvent;
events[1] = rpc->client->SendEvent;
events[2] = rpc->client->ReceiveEvent;
while (1)
{
status = WaitForMultipleObjects(3, events, FALSE, INFINITE);
if (WaitForSingleObject(rpc->client->StopEvent, 0) == WAIT_OBJECT_0)
break;
if (WaitForSingleObject(rpc->client->SendEvent, 0) == WAIT_OBJECT_0)
{
rpc_send_dequeue_pdu(rpc);
}
if (WaitForSingleObject(rpc->client->ReceiveEvent, 0) == WAIT_OBJECT_0)
{
}
}
return NULL;
}
int rpc_client_start(rdpRpc* rpc)
{
rpc->client = (RpcClient*) malloc(sizeof(RpcClient));
rpc->client->Thread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) rpc_client_thread,
rpc, CREATE_SUSPENDED, NULL);
rpc->client->SendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
rpc->client->ReceiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
rpc->client->StopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
ResumeThread(rpc->client->Thread);
return 0;
}

View File

@ -0,0 +1,30 @@
/**
* FreeRDP: A Remote Desktop Protocol Implementation
* RPC Client
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FREERDP_CORE_RPC_CLIENT_H
#define FREERDP_CORE_RPC_CLIENT_H
#include "rpc.h"
#include <winpr/interlocked.h>
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length);
int rpc_send_dequeue_pdu(rdpRpc* rpc);
#endif /* FREERDP_CORE_RPC_CLIENT_H */

View File

@ -78,10 +78,32 @@ BOOL CloseHandle(HANDLE hObject)
}
else if (Type == HANDLE_TYPE_SEMAPHORE)
{
#if defined __APPLE__
semaphore_destroy(mach_task_self(), *((winpr_sem_t*) Object));
WINPR_SEMAPHORE* semaphore;
semaphore = (WINPR_SEMAPHORE*) Object;
#ifdef WINPR_PIPE_SEMAPHORE
if (semaphore->pipe_fd[0] != -1)
{
close(semaphore->pipe_fd[0]);
semaphore->pipe_fd[0] = -1;
if (semaphore->pipe_fd[1] != -1)
{
close(semaphore->pipe_fd[1]);
semaphore->pipe_fd[1] = -1;
}
}
#else
sem_destroy((winpr_sem_t*) Object);
#if defined __APPLE__
semaphore_destroy(mach_task_self(), *((winpr_sem_t*) semaphore->sem));
#else
sem_destroy((winpr_sem_t*) semaphore->sem);
#endif
#endif
winpr_Handle_Remove(Object);
free(Object);

View File

@ -25,29 +25,49 @@
#include "synch.h"
/**
* CreateSemaphoreExA
* CreateSemaphoreExW
* OpenSemaphoreA
* OpenSemaphoreW
* ReleaseSemaphore
*/
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
#ifndef _WIN32
HANDLE CreateSemaphoreW(LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, LONG lInitialCount, LONG lMaximumCount, LPCWSTR lpName)
{
HANDLE handle;
winpr_sem_t* semaphore;
WINPR_SEMAPHORE* semaphore;
semaphore = (winpr_sem_t*) malloc(sizeof(winpr_sem_t));
semaphore = (WINPR_SEMAPHORE*) malloc(sizeof(WINPR_SEMAPHORE));
semaphore->pipe_fd[0] = -1;
semaphore->pipe_fd[0] = -1;
semaphore->sem = (winpr_sem_t*) NULL;
if (semaphore)
{
#if defined __APPLE__
semaphore_create(mach_task_self(), semaphore, SYNC_POLICY_FIFO, lMaximumCount);
#ifdef WINPR_PIPE_SEMAPHORE
if (pipe(semaphore->pipe_fd) < 0)
{
printf("CreateSemaphoreW: failed to create semaphore\n");
return NULL;
}
while (lInitialCount > 0)
{
if (write(semaphore->pipe_fd[1], "-", 1) != 1)
return FALSE;
lInitialCount--;
}
#else
sem_init(semaphore, 0, lMaximumCount);
semaphore->sem = (winpr_sem_t*) malloc(sizeof(winpr_sem_t));
#if defined __APPLE__
semaphore_create(mach_task_self(), semaphore->sem, SYNC_POLICY_FIFO, lMaximumCount);
#else
sem_init(semaphore->sem, 0, lMaximumCount);
#endif
#endif
}
@ -75,16 +95,36 @@ BOOL ReleaseSemaphore(HANDLE hSemaphore, LONG lReleaseCount, LPLONG lpPreviousCo
{
ULONG Type;
PVOID Object;
WINPR_SEMAPHORE* semaphore;
if (!winpr_Handle_GetInfo(hSemaphore, &Type, &Object))
return FALSE;
if (Type == HANDLE_TYPE_SEMAPHORE)
{
#if defined __APPLE__
semaphore_signal(*((winpr_sem_t*) Object));
semaphore = (WINPR_SEMAPHORE*) Object;
#ifdef WINPR_PIPE_SEMAPHORE
if (semaphore->pipe_fd[0] != -1)
{
while (lReleaseCount > 0)
{
if (write(semaphore->pipe_fd[1], "-", 1) != 1)
return FALSE;
lReleaseCount--;
}
}
#else
sem_post((winpr_sem_t*) Object);
#if defined __APPLE__
semaphore_signal(*((winpr_sem_t*) semaphore->sem));
#else
sem_post((winpr_sem_t*) semaphore->sem);
#endif
#endif
return TRUE;
}

View File

@ -24,6 +24,8 @@
#ifndef _WIN32
#define WINPR_PIPE_SEMAPHORE 1
#if defined __APPLE__
#include <pthread.h>
#include <semaphore.h>
@ -37,6 +39,13 @@
#define winpr_sem_t sem_t
#endif
struct winpr_semaphore
{
int pipe_fd[2];
winpr_sem_t* sem;
};
typedef struct winpr_semaphore WINPR_SEMAPHORE;
struct winpr_event
{
int pipe_fd[2];

View File

@ -90,10 +90,49 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds)
}
else if (Type == HANDLE_TYPE_SEMAPHORE)
{
#if defined __APPLE__
semaphore_wait(*((winpr_sem_t*) Object));
WINPR_SEMAPHORE* semaphore;
semaphore = (WINPR_SEMAPHORE*) Object;
#ifdef WINPR_PIPE_SEMAPHORE
if (semaphore->pipe_fd[0] != -1)
{
int status;
int length;
fd_set rfds;
struct timeval timeout;
FD_ZERO(&rfds);
FD_SET(semaphore->pipe_fd[0], &rfds);
ZeroMemory(&timeout, sizeof(timeout));
if ((dwMilliseconds != INFINITE) && (dwMilliseconds != 0))
{
timeout.tv_usec = dwMilliseconds * 1000;
}
status = select(semaphore->pipe_fd[0] + 1, &rfds, 0, 0,
(dwMilliseconds == INFINITE) ? NULL : &timeout);
if (status < 0)
return WAIT_FAILED;
if (status != 1)
return WAIT_TIMEOUT;
length = read(semaphore->pipe_fd[0], &length, 1);
if (length != 1)
return FALSE;
}
#else
sem_wait((winpr_sem_t*) Object);
#if defined __APPLE__
semaphore_wait(*((winpr_sem_t*) semaphore->sem));
#else
sem_wait((winpr_sem_t*) semaphore->sem);
#endif
#endif
}