Merge pull request #871 from awakecoding/master

Transport Layer Improvements
This commit is contained in:
Marc-André Moreau 2012-12-21 15:39:45 -08:00
commit b9c7a423bb
19 changed files with 927 additions and 138 deletions

View File

@ -162,16 +162,12 @@ BOOL rdp_client_connect(rdpRdp* rdp)
return FALSE;
}
rdp->transport->ProcessSinglePdu = TRUE;
while (rdp->state != CONNECTION_STATE_ACTIVE)
{
if (rdp_check_fds(rdp) < 0)
return FALSE;
}
rdp->transport->ProcessSinglePdu = FALSE;
return TRUE;
}

View File

@ -882,8 +882,8 @@ void nego_init(rdpNego* nego)
{
nego->state = NEGO_STATE_INITIAL;
nego->requested_protocols = PROTOCOL_RDP;
nego->transport->recv_callback = nego_recv;
nego->transport->recv_extra = (void*) nego;
nego->transport->ReceiveCallback = nego_recv;
nego->transport->ReceiveExtra = (void*) nego;
nego->cookie_max_length = DEFAULT_COOKIE_MAX_LENGTH;
nego->flags = 0;
}

View File

@ -380,8 +380,8 @@ void freerdp_peer_context_new(freerdp_peer* client)
transport_attach(rdp->transport, client->sockfd);
rdp->transport->recv_callback = peer_recv_callback;
rdp->transport->recv_extra = client;
rdp->transport->ReceiveCallback = peer_recv_callback;
rdp->transport->ReceiveExtra = client;
transport_set_blocking_mode(rdp->transport, FALSE);
IFCALL(client->ContextNew, client, client->context);

View File

@ -908,8 +908,8 @@ int rdp_send_channel_data(rdpRdp* rdp, int channel_id, BYTE* data, int size)
*/
void rdp_set_blocking_mode(rdpRdp* rdp, BOOL blocking)
{
rdp->transport->recv_callback = rdp_recv_callback;
rdp->transport->recv_extra = rdp;
rdp->transport->ReceiveCallback = rdp_recv_callback;
rdp->transport->ReceiveExtra = rdp;
transport_set_blocking_mode(rdp->transport, blocking);
}

View File

@ -50,7 +50,7 @@
STREAM* transport_recv_stream_init(rdpTransport* transport, int size)
{
STREAM* s = transport->recv_stream;
STREAM* s = transport->ReceiveStream;
stream_check_size(s, size);
stream_set_pos(s, 0);
return s;
@ -58,7 +58,7 @@ STREAM* transport_recv_stream_init(rdpTransport* transport, int size)
STREAM* transport_send_stream_init(rdpTransport* transport, int size)
{
STREAM* s = transport->send_stream;
STREAM* s = transport->SendStream;
stream_check_size(s, size);
stream_set_pos(s, 0);
return s;
@ -288,16 +288,72 @@ BOOL transport_accept_nla(rdpTransport* transport)
return TRUE;
}
BOOL nla_verify_header(STREAM* s)
{
if ((s->p[0] == 0x30) && (s->p[1] & 0x80))
return TRUE;
return FALSE;
}
UINT32 nla_read_header(STREAM* s)
{
UINT32 length = 0;
if (s->p[1] & 0x80)
{
if ((s->p[1] & ~(0x80)) == 1)
{
length = s->p[2];
length += 3;
stream_seek(s, 3);
}
else if ((s->p[1] & ~(0x80)) == 2)
{
length = (s->p[2] << 8) | s->p[3];
length += 4;
stream_seek(s, 4);
}
else
{
printf("Error reading TSRequest!\n");
}
}
else
{
length = s->p[1];
length += 2;
stream_seek(s, 2);
}
return length;
}
UINT32 nla_header_length(STREAM* s)
{
UINT32 length = 0;
if (s->p[1] & 0x80)
{
if ((s->p[1] & ~(0x80)) == 1)
length = 3;
else if ((s->p[1] & ~(0x80)) == 2)
length = 4;
else
printf("Error reading TSRequest!\n");
}
else
{
length = 2;
}
return length;
}
int transport_read_layer(rdpTransport* transport, UINT8* data, int bytes)
{
int status = -1;
#if 0
int read = 0;
/**
* FIXME: this breaks NLA, since the NLA packet length is improperly detected
*/
int status = -1;
while (read < bytes)
{
@ -320,33 +376,17 @@ int transport_read_layer(rdpTransport* transport, UINT8* data, int bytes)
if (status == 0)
{
/* instead of sleeping, we should wait timeout on the socket
but this only happens on initial connection */
USleep(transport->usleep_interval);
/*
* instead of sleeping, we should wait timeout on the
* socket but this only happens on initial connection
*/
USleep(transport->SleepInterval);
}
}
return read;
#else
if (transport->layer == TRANSPORT_LAYER_TLS)
status = tls_read(transport->TlsIn, data, bytes);
else if (transport->layer == TRANSPORT_LAYER_TCP)
status = tcp_read(transport->TcpIn, data, bytes);
else if (transport->layer == TRANSPORT_LAYER_TSG)
status = tsg_read(transport->tsg, data, bytes);
return status;
#endif
}
#if 0
/**
* FIXME: this breaks NLA in certain cases only, why?
*/
int transport_read(rdpTransport* transport, STREAM* s)
{
int status;
@ -377,10 +417,41 @@ int transport_read(rdpTransport* transport, STREAM* s)
/* if header is present, read in exactly one PDU */
if (s->data[0] == 0x03)
{
/* TPKT header */
pdu_bytes = (s->data[2] << 8) | s->data[3];
}
else if (s->data[0] == 0x30)
{
/* TSRequest (NLA) */
if (s->data[1] & 0x80)
{
if ((s->data[1] & ~(0x80)) == 1)
{
pdu_bytes = s->data[2];
pdu_bytes += 3;
}
else if ((s->data[1] & ~(0x80)) == 2)
{
pdu_bytes = (s->data[2] << 8) | s->data[3];
pdu_bytes += 4;
}
else
{
printf("Error reading TSRequest!\n");
}
}
else
{
pdu_bytes = s->data[1];
pdu_bytes += 2;
}
}
else
{
/* Fast-Path Header */
if (s->data[1] & 0x80)
pdu_bytes = ((s->data[1] & 0x7f) << 8) | s->data[2];
else
@ -406,59 +477,17 @@ int transport_read(rdpTransport* transport, STREAM* s)
return transport_status;
}
#else
int transport_read(rdpTransport* transport, STREAM* s)
{
int status = -1;
while (TRUE)
{
if (transport->layer == TRANSPORT_LAYER_TLS)
status = tls_read(transport->TlsIn, stream_get_tail(s), stream_get_left(s));
else if (transport->layer == TRANSPORT_LAYER_TCP)
status = tcp_read(transport->TcpIn, stream_get_tail(s), stream_get_left(s));
else if (transport->layer == TRANSPORT_LAYER_TSG)
status = tsg_read(transport->tsg, stream_get_tail(s), stream_get_left(s));
if ((status == 0) && (transport->blocking))
{
if (transport->layer == TRANSPORT_LAYER_TLS)
tls_wait_read(transport->TlsIn);
else if (transport->layer == TRANSPORT_LAYER_TCP)
tcp_wait_read(transport->TcpIn);
else
USleep(transport->usleep_interval);
continue;
}
break;
}
#ifdef WITH_DEBUG_TRANSPORT
if (status > 0)
{
printf("Local < Remote\n");
winpr_HexDump(s->data, status);
}
#endif
return status;
}
#endif
static int transport_read_nonblocking(rdpTransport* transport)
{
int status;
stream_check_size(transport->recv_buffer, 32 * 1024);
status = transport_read(transport, transport->recv_buffer);
stream_check_size(transport->ReceiveBuffer, 32 * 1024);
status = transport_read(transport, transport->ReceiveBuffer);
if (status <= 0)
return status;
stream_seek(transport->recv_buffer, status);
stream_seek(transport->ReceiveBuffer, status);
return status;
}
@ -498,7 +527,7 @@ int transport_write(rdpTransport* transport, STREAM* s)
{
/* and in case we do have buffered some data, we set the event so next loop will get it */
if (transport_read_nonblocking(transport) > 0)
SetEvent(transport->recv_event);
SetEvent(transport->ReceiveEvent);
}
if (transport->layer == TRANSPORT_LAYER_TLS)
@ -506,7 +535,7 @@ int transport_write(rdpTransport* transport, STREAM* s)
else if (transport->layer == TRANSPORT_LAYER_TCP)
tcp_wait_write(transport->TcpOut);
else
USleep(transport->usleep_interval);
USleep(transport->SleepInterval);
}
length -= status;
@ -546,7 +575,7 @@ void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount)
}
#endif
pfd = GetEventWaitObject(transport->recv_event);
pfd = GetEventWaitObject(transport->ReceiveEvent);
if (pfd)
{
@ -566,59 +595,81 @@ int transport_check_fds(rdpTransport** ptransport)
#ifdef _WIN32
WSAResetEvent(transport->TcpIn->wsa_event);
#endif
ResetEvent(transport->recv_event);
ResetEvent(transport->ReceiveEvent);
status = transport_read_nonblocking(transport);
if (status < 0)
return status;
while ((pos = stream_get_pos(transport->recv_buffer)) > 0)
while ((pos = stream_get_pos(transport->ReceiveBuffer)) > 0)
{
stream_set_pos(transport->recv_buffer, 0);
stream_set_pos(transport->ReceiveBuffer, 0);
if (tpkt_verify_header(transport->recv_buffer)) /* TPKT */
if (tpkt_verify_header(transport->ReceiveBuffer)) /* TPKT */
{
/* Ensure the TPKT header is available. */
if (pos <= 4)
{
stream_set_pos(transport->recv_buffer, pos);
stream_set_pos(transport->ReceiveBuffer, pos);
return 0;
}
length = tpkt_read_header(transport->recv_buffer);
length = tpkt_read_header(transport->ReceiveBuffer);
}
else if (nla_verify_header(transport->ReceiveBuffer))
{
/* TSRequest */
/* Ensure the TSRequest header is available. */
if (pos <= 4)
{
stream_set_pos(transport->ReceiveBuffer, pos);
return 0;
}
/* TSRequest header can be 2, 3 or 4 bytes long */
length = nla_header_length(transport->ReceiveBuffer);
if (pos < length)
{
stream_set_pos(transport->ReceiveBuffer, pos);
return 0;
}
length = nla_read_header(transport->ReceiveBuffer);
}
else /* Fast Path */
{
/* Ensure the Fast Path header is available. */
if (pos <= 2)
{
stream_set_pos(transport->recv_buffer, pos);
stream_set_pos(transport->ReceiveBuffer, pos);
return 0;
}
/* Fastpath header can be two or three bytes long. */
length = fastpath_header_length(transport->recv_buffer);
length = fastpath_header_length(transport->ReceiveBuffer);
if (pos < length)
{
stream_set_pos(transport->recv_buffer, pos);
stream_set_pos(transport->ReceiveBuffer, pos);
return 0;
}
length = fastpath_read_header(NULL, transport->recv_buffer);
length = fastpath_read_header(NULL, transport->ReceiveBuffer);
}
if (length == 0)
{
printf("transport_check_fds: protocol error, not a TPKT or Fast Path header.\n");
winpr_HexDump(stream_get_head(transport->recv_buffer), pos);
winpr_HexDump(stream_get_head(transport->ReceiveBuffer), pos);
return -1;
}
if (pos < length)
{
stream_set_pos(transport->recv_buffer, pos);
stream_set_pos(transport->ReceiveBuffer, pos);
return 0; /* Packet is not yet completely received. */
}
@ -626,40 +677,23 @@ int transport_check_fds(rdpTransport** ptransport)
* A complete packet has been received. In case there are trailing data
* for the next packet, we copy it to the new receive buffer.
*/
received = transport->recv_buffer;
transport->recv_buffer = stream_new(BUFFER_SIZE);
if (pos > length)
{
stream_set_pos(received, length);
stream_check_size(transport->recv_buffer, pos - length);
stream_copy(transport->recv_buffer, received, pos - length);
}
received = transport->ReceiveBuffer;
transport->ReceiveBuffer = transport_receive_pool_take(transport);
stream_set_pos(received, length);
stream_seal(received);
stream_set_pos(received, 0);
if (transport->recv_callback(transport, received, transport->recv_extra) == FALSE)
if (transport->ReceiveCallback(transport, received, transport->ReceiveExtra) == FALSE)
status = -1;
stream_free(received);
transport_receive_pool_return(transport, received);
if (status < 0)
return status;
/* transport might now have been freed by rdp_client_redirect and a new rdp->transport created */
transport = *ptransport;
if (transport->ProcessSinglePdu)
{
/* one at a time but set event if data buffered
* so the main loop will call freerdp_check_fds asap */
if (stream_get_pos(transport->recv_buffer) > 0)
SetEvent(transport->recv_event);
break;
}
}
return 0;
@ -690,6 +724,27 @@ BOOL transport_set_blocking_mode(rdpTransport* transport, BOOL blocking)
return status;
}
STREAM* transport_receive_pool_take(rdpTransport* transport)
{
STREAM* pdu = NULL;
if (WaitForSingleObject(Queue_Event(transport->ReceivePool), 0) == WAIT_OBJECT_0)
pdu = Queue_Dequeue(transport->ReceivePool);
if (!pdu)
pdu = stream_new(BUFFER_SIZE);
pdu->p = pdu->data;
return pdu;
}
int transport_receive_pool_return(rdpTransport* transport, STREAM* pdu)
{
Queue_Enqueue(transport->ReceivePool, pdu);
return 0;
}
rdpTransport* transport_new(rdpSettings* settings)
{
rdpTransport* transport;
@ -704,19 +759,24 @@ rdpTransport* transport_new(rdpSettings* settings)
transport->settings = settings;
/* a small 0.1ms delay when transport is blocking. */
transport->usleep_interval = 100;
transport->SleepInterval = 100;
/* receive buffer for non-blocking read. */
transport->recv_buffer = stream_new(BUFFER_SIZE);
transport->recv_event = CreateEvent(NULL, TRUE, FALSE, NULL);
transport->ReceiveBuffer = stream_new(BUFFER_SIZE);
transport->ReceiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
/* buffers for blocking read/write */
transport->recv_stream = stream_new(BUFFER_SIZE);
transport->send_stream = stream_new(BUFFER_SIZE);
transport->ReceiveStream = stream_new(BUFFER_SIZE);
transport->SendStream = stream_new(BUFFER_SIZE);
transport->blocking = TRUE;
transport->layer = TRANSPORT_LAYER_TCP;
transport->ReceivePool = Queue_New(TRUE, -1, -1);
transport->ReceiveQueue = Queue_New(TRUE, -1, -1);
Queue_Object(transport->ReceivePool)->fnObjectFree = (OBJECT_FREE_FN) stream_free;
Queue_Object(transport->ReceiveQueue)->fnObjectFree = (OBJECT_FREE_FN) stream_free;
}
return transport;
@ -726,10 +786,10 @@ void transport_free(rdpTransport* transport)
{
if (transport != NULL)
{
stream_free(transport->recv_buffer);
stream_free(transport->recv_stream);
stream_free(transport->send_stream);
CloseHandle(transport->recv_event);
stream_free(transport->ReceiveBuffer);
stream_free(transport->ReceiveStream);
stream_free(transport->SendStream);
CloseHandle(transport->ReceiveEvent);
if (transport->TlsIn)
tls_free(transport->TlsIn);
@ -744,6 +804,9 @@ void transport_free(rdpTransport* transport)
tsg_free(transport->tsg);
Queue_Free(transport->ReceivePool);
Queue_Free(transport->ReceiveQueue);
free(transport);
}
}

View File

@ -36,6 +36,8 @@ typedef struct rdp_transport rdpTransport;
#include "gateway/tsg.h"
#include <winpr/sspi.h>
#include <winpr/collections.h>
#include <freerdp/crypto/tls.h>
#include <time.h>
@ -47,8 +49,6 @@ typedef BOOL (*TransportRecv) (rdpTransport* transport, STREAM* stream, void* ex
struct rdp_transport
{
STREAM* recv_stream;
STREAM* send_stream;
TRANSPORT_LAYER layer;
rdpTsg* tsg;
rdpTcp* TcpIn;
@ -57,14 +57,18 @@ struct rdp_transport
rdpTls* TlsOut;
rdpCredssp* credssp;
rdpSettings* settings;
UINT32 usleep_interval;
void* recv_extra;
STREAM* recv_buffer;
TransportRecv recv_callback;
HANDLE recv_event;
UINT32 SleepInterval;
STREAM* SendStream;
STREAM* ReceiveStream;
void* ReceiveExtra;
STREAM* ReceiveBuffer;
TransportRecv ReceiveCallback;
HANDLE ReceiveEvent;
BOOL blocking;
BOOL ProcessSinglePdu;
BOOL SplitInputOutput;
wQueue* ReceivePool;
wQueue* ReceiveQueue;
};
STREAM* transport_recv_stream_init(rdpTransport* transport, int size);
@ -84,6 +88,10 @@ int transport_write(rdpTransport* transport, STREAM* s);
void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount);
int transport_check_fds(rdpTransport** ptransport);
BOOL transport_set_blocking_mode(rdpTransport* transport, BOOL blocking);
STREAM* transport_receive_pool_take(rdpTransport* transport);
int transport_receive_pool_return(rdpTransport* transport, STREAM* pdu);
rdpTransport* transport_new(rdpSettings* settings);
void transport_free(rdpTransport* transport);

177
winpr/include/winpr/pool.h Normal file
View File

@ -0,0 +1,177 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef WINPR_POOL_H
#define WINPR_POOL_H
#include <winpr/winpr.h>
#include <winpr/wtypes.h>
#include <winpr/synch.h>
#include <winpr/thread.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
typedef DWORD TP_VERSION, *PTP_VERSION;
typedef struct _TP_CALLBACK_INSTANCE TP_CALLBACK_INSTANCE, *PTP_CALLBACK_INSTANCE;
typedef VOID (*PTP_SIMPLE_CALLBACK)(PTP_CALLBACK_INSTANCE Instance, PVOID Context);
typedef struct _TP_POOL TP_POOL, *PTP_POOL;
typedef enum _TP_CALLBACK_PRIORITY
{
TP_CALLBACK_PRIORITY_HIGH,
TP_CALLBACK_PRIORITY_NORMAL,
TP_CALLBACK_PRIORITY_LOW,
TP_CALLBACK_PRIORITY_INVALID,
TP_CALLBACK_PRIORITY_COUNT = TP_CALLBACK_PRIORITY_INVALID
} TP_CALLBACK_PRIORITY;
typedef struct _TP_POOL_STACK_INFORMATION
{
SIZE_T StackReserve;
SIZE_T StackCommit;
} TP_POOL_STACK_INFORMATION, *PTP_POOL_STACK_INFORMATION;
typedef struct _TP_CLEANUP_GROUP TP_CLEANUP_GROUP, *PTP_CLEANUP_GROUP;
typedef VOID (*PTP_CLEANUP_GROUP_CANCEL_CALLBACK)(PVOID ObjectContext, PVOID CleanupContext);
typedef struct _TP_CALLBACK_ENVIRON_V3
{
TP_VERSION Version;
PTP_POOL Pool;
PTP_CLEANUP_GROUP CleanupGroup;
PTP_CLEANUP_GROUP_CANCEL_CALLBACK CleanupGroupCancelCallback;
PVOID RaceDll;
struct _ACTIVATION_CONTEXT *ActivationContext;
PTP_SIMPLE_CALLBACK FinalizationCallback;
union
{
DWORD Flags;
struct
{
DWORD LongFunction:1;
DWORD Persistent:1;
DWORD Private:30;
} s;
} u;
TP_CALLBACK_PRIORITY CallbackPriority;
DWORD Size;
} TP_CALLBACK_ENVIRON_V3;
typedef TP_CALLBACK_ENVIRON_V3 TP_CALLBACK_ENVIRON, *PTP_CALLBACK_ENVIRON;
typedef struct _TP_WORK TP_WORK, *PTP_WORK;
typedef VOID (*PTP_WORK_CALLBACK)(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work);
typedef struct _TP_TIMER TP_TIMER, *PTP_TIMER;
typedef VOID (*PTP_TIMER_CALLBACK)(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_TIMER Timer);
typedef DWORD TP_WAIT_RESULT;
typedef struct _TP_WAIT TP_WAIT, *PTP_WAIT;
typedef VOID (*PTP_WAIT_CALLBACK)(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult);
typedef struct _TP_IO TP_IO, *PTP_IO;
typedef VOID (*PTP_WIN32_IO_CALLBACK)(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PVOID Overlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO Io);
/* Synch */
WINPR_API PTP_WAIT CreateThreadpoolWait(PTP_WAIT_CALLBACK pfnwa, PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID CloseThreadpoolWait(PTP_WAIT pwa);
WINPR_API VOID SetThreadpoolWait(PTP_WAIT pwa, HANDLE h, PFILETIME pftTimeout);
WINPR_API VOID WaitForThreadpoolWaitCallbacks(PTP_WAIT pwa, BOOL fCancelPendingCallbacks);
/* Work */
WINPR_API PTP_WORK CreateThreadpoolWork(PTP_WORK_CALLBACK pfnwk, PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID CloseThreadpoolWork(PTP_WORK pwk);
WINPR_API VOID SubmitThreadpoolWork(PTP_WORK pwk);
WINPR_API BOOL TrySubmitThreadpoolCallback(PTP_SIMPLE_CALLBACK pfns, PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID WaitForThreadpoolWorkCallbacks(PTP_WORK pwk, BOOL fCancelPendingCallbacks);
/* Timer */
WINPR_API PTP_TIMER CreateThreadpoolTimer(PTP_TIMER_CALLBACK pfnti, PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID CloseThreadpoolTimer(PTP_TIMER pti);
WINPR_API BOOL IsThreadpoolTimerSet(PTP_TIMER pti);
WINPR_API VOID SetThreadpoolTimer(PTP_TIMER pti, PFILETIME pftDueTime, DWORD msPeriod, DWORD msWindowLength);
WINPR_API VOID WaitForThreadpoolTimerCallbacks(PTP_TIMER pti, BOOL fCancelPendingCallbacks);
/* I/O */
WINPR_API PTP_IO CreateThreadpoolIo(HANDLE fl, PTP_WIN32_IO_CALLBACK pfnio, PVOID pv, PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID CloseThreadpoolIo(PTP_IO pio);
WINPR_API VOID StartThreadpoolIo(PTP_IO pio);
WINPR_API VOID CancelThreadpoolIo(PTP_IO pio);
WINPR_API VOID WaitForThreadpoolIoCallbacks(PTP_IO pio, BOOL fCancelPendingCallbacks);
/* Clean-up Group */
WINPR_API PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup();
VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPendingCallbacks, PVOID pvCleanupContext);
VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg);
/* Pool */
WINPR_API PTP_POOL CreateThreadpool(PVOID reserved);
WINPR_API VOID CloseThreadpool(PTP_POOL ptpp);
WINPR_API VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost);
WINPR_API BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic);
/* Callback Environment */
WINPR_API VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp);
WINPR_API VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe,
PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng);
WINPR_API VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe);
WINPR_API VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod);
WINPR_API VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority);
/* Callback */
WINPR_API BOOL CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci);
/* Callback Clean-up */
WINPR_API VOID SetEventWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE evt);
WINPR_API VOID ReleaseSemaphoreWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE sem, DWORD crel);
WINPR_API VOID ReleaseMutexWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE mut);
WINPR_API VOID LeaveCriticalSectionWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, PCRITICAL_SECTION pcs);
WINPR_API VOID FreeLibraryWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HMODULE mod);
WINPR_API VOID DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci);
#endif
WINPR_API void winpr_pool_dummy();
#endif /* WINPR_POOL_H */

View File

@ -0,0 +1,60 @@
# WinPR: Windows Portable Runtime
# libwinpr-thread cmake build script
#
# Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set(MODULE_NAME "winpr-pool")
set(MODULE_PREFIX "WINPR_POOL")
set(${MODULE_PREFIX}_SRCS
synch.c
work.c
timer.c
io.c
cleanup_group.c
pool.c
callback_environment.c
callback.c
callback_cleanup.c)
if(MSVC AND (NOT MONOLITHIC_BUILD))
set(${MODULE_PREFIX}_SRCS ${${MODULE_PREFIX}_SRCS} module.def)
endif()
add_complex_library(MODULE ${MODULE_NAME} TYPE "OBJECT"
MONOLITHIC ${MONOLITHIC_BUILD}
SOURCES ${${MODULE_PREFIX}_SRCS})
set_target_properties(${MODULE_NAME} PROPERTIES VERSION ${WINPR_VERSION_FULL} SOVERSION ${WINPR_VERSION} PREFIX "lib")
set(${MODULE_PREFIX}_LIBS
${CMAKE_THREAD_LIBS_INIT}
${CMAKE_DL_LIBS})
if(${CMAKE_SYSTEM_NAME} MATCHES SunOS)
set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} rt)
endif()
if(MONOLITHIC_BUILD)
set(WINPR_LIBS ${WINPR_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE)
else()
set(${MODULE_PREFIX}_LIBS ${${MODULE_PREFIX}_LIBS} winpr-thread winpr-synch)
target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS})
install(TARGETS ${MODULE_NAME} DESTINATION ${CMAKE_INSTALL_LIBDIR})
endif()
set_property(TARGET ${MODULE_NAME} PROPERTY FOLDER "WinPR")

View File

@ -0,0 +1,9 @@
set(MINWIN_LAYER "1")
set(MINWIN_GROUP "core")
set(MINWIN_MAJOR_VERSION "2")
set(MINWIN_MINOR_VERSION "1")
set(MINWIN_SHORT_NAME "threadpool")
set(MINWIN_LONG_NAME "Thread Pool API")
set(MODULE_LIBRARY_NAME "api-ms-win-${MINWIN_GROUP}-${MINWIN_SHORT_NAME}-l${MINWIN_LAYER}-${MINWIN_MAJOR_VERSION}-${MINWIN_MINOR_VERSION}")

View File

@ -0,0 +1,34 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Callback)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
BOOL CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci)
{
return FALSE;
}
#endif

View File

@ -0,0 +1,60 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Callback Clean-up)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
VOID SetEventWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE evt)
{
}
VOID ReleaseSemaphoreWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE sem, DWORD crel)
{
}
VOID ReleaseMutexWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HANDLE mut)
{
}
VOID LeaveCriticalSectionWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, PCRITICAL_SECTION pcs)
{
}
VOID FreeLibraryWhenCallbackReturns(PTP_CALLBACK_INSTANCE pci, HMODULE mod)
{
}
VOID DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci)
{
}
#endif

View File

@ -0,0 +1,65 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Callback Environment)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
VOID InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
{
}
VOID DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
{
}
VOID SetThreadpoolCallbackPool(PTP_CALLBACK_ENVIRON pcbe, PTP_POOL ptpp)
{
}
VOID SetThreadpoolCallbackCleanupGroup(PTP_CALLBACK_ENVIRON pcbe, PTP_CLEANUP_GROUP ptpcg, PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng)
{
}
VOID SetThreadpoolCallbackRunsLong(PTP_CALLBACK_ENVIRON pcbe)
{
}
VOID SetThreadpoolCallbackLibrary(PTP_CALLBACK_ENVIRON pcbe, PVOID mod)
{
}
VOID SetThreadpoolCallbackPriority(PTP_CALLBACK_ENVIRON pcbe, TP_CALLBACK_PRIORITY Priority)
{
}
#endif

View File

@ -0,0 +1,45 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Clean-up Group)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup()
{
return NULL;
}
VOID CloseThreadpoolCleanupGroupMembers(PTP_CLEANUP_GROUP ptpcg, BOOL fCancelPendingCallbacks, PVOID pvCleanupContext)
{
}
VOID CloseThreadpoolCleanupGroup(PTP_CLEANUP_GROUP ptpcg)
{
}
#endif

55
winpr/libwinpr/pool/io.c Normal file
View File

@ -0,0 +1,55 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (I/O)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_IO CreateThreadpoolIo(HANDLE fl, PTP_WIN32_IO_CALLBACK pfnio, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
return NULL;
}
VOID CloseThreadpoolIo(PTP_IO pio)
{
}
VOID StartThreadpoolIo(PTP_IO pio)
{
}
VOID CancelThreadpoolIo(PTP_IO pio)
{
}
VOID WaitForThreadpoolIoCallbacks(PTP_IO pio, BOOL fCancelPendingCallbacks)
{
}
#endif

View File

@ -0,0 +1,3 @@
LIBRARY "libwinpr-pool"
EXPORTS

View File

@ -0,0 +1,54 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Pool)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_POOL CreateThreadpool(PVOID reserved)
{
return NULL;
}
VOID CloseThreadpool(PTP_POOL ptpp)
{
}
VOID SetThreadpoolThreadMaximum(PTP_POOL ptpp, DWORD cthrdMost)
{
}
BOOL SetThreadpoolThreadMinimum(PTP_POOL ptpp, DWORD cthrdMic)
{
return FALSE;
}
#endif
void winpr_pool_dummy()
{
}

View File

@ -0,0 +1,50 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Synch)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_WAIT CreateThreadpoolWait(PTP_WAIT_CALLBACK pfnwa, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
return NULL;
}
VOID CloseThreadpoolWait(PTP_WAIT pwa)
{
}
VOID SetThreadpoolWait(PTP_WAIT pwa, HANDLE h, PFILETIME pftTimeout)
{
}
VOID WaitForThreadpoolWaitCallbacks(PTP_WAIT pwa, BOOL fCancelPendingCallbacks)
{
}
#endif

View File

@ -0,0 +1,55 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Timer)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_TIMER CreateThreadpoolTimer(PTP_TIMER_CALLBACK pfnti, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
return NULL;
}
VOID CloseThreadpoolTimer(PTP_TIMER pti)
{
}
BOOL IsThreadpoolTimerSet(PTP_TIMER pti)
{
return FALSE;
}
VOID SetThreadpoolTimer(PTP_TIMER pti, PFILETIME pftDueTime, DWORD msPeriod, DWORD msWindowLength)
{
}
VOID WaitForThreadpoolTimerCallbacks(PTP_TIMER pti, BOOL fCancelPendingCallbacks)
{
}
#endif

View File

@ -0,0 +1,55 @@
/**
* WinPR: Windows Portable Runtime
* Thread Pool API (Work)
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#if (!(defined _WIN32 && (_WIN32_WINNT < 0x0600)))
PTP_WORK CreateThreadpoolWork(PTP_WORK_CALLBACK pfnwk, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
return NULL;
}
VOID CloseThreadpoolWork(PTP_WORK pwk)
{
}
VOID SubmitThreadpoolWork(PTP_WORK pwk)
{
}
BOOL TrySubmitThreadpoolCallback(PTP_SIMPLE_CALLBACK pfns, PVOID pv, PTP_CALLBACK_ENVIRON pcbe)
{
return FALSE;
}
VOID WaitForThreadpoolWorkCallbacks(PTP_WORK pwk, BOOL fCancelPendingCallbacks)
{
}
#endif