libfreerdp-core: added ObjectPool

This commit is contained in:
Marc-André Moreau 2013-02-14 20:39:56 -05:00
parent cfa706cace
commit e42b1272ef
8 changed files with 206 additions and 36 deletions

View File

@ -439,6 +439,7 @@ BOOL rdp_client_connect_mcs_channel_join_confirm(rdpRdp* rdp, STREAM* s)
{
if (channel_id != rdp->mcs->user_id)
return FALSE;
rdp->mcs->user_channel_joined = TRUE;
if (!mcs_send_channel_join_request(rdp->mcs, MCS_GLOBAL_CHANNEL_ID))
@ -448,6 +449,7 @@ BOOL rdp_client_connect_mcs_channel_join_confirm(rdpRdp* rdp, STREAM* s)
{
if (channel_id != MCS_GLOBAL_CHANNEL_ID)
return FALSE;
rdp->mcs->global_channel_joined = TRUE;
if (rdp->settings->ChannelCount > 0)
@ -471,6 +473,7 @@ BOOL rdp_client_connect_mcs_channel_join_confirm(rdpRdp* rdp, STREAM* s)
rdp->settings->ChannelDefArray[i].joined = TRUE;
break;
}
if (i + 1 < rdp->settings->ChannelCount)
{
if (!mcs_send_channel_join_request(rdp->mcs, rdp->settings->ChannelDefArray[i + 1].ChannelId))
@ -484,8 +487,10 @@ BOOL rdp_client_connect_mcs_channel_join_confirm(rdpRdp* rdp, STREAM* s)
{
if (!rdp_client_establish_keys(rdp))
return FALSE;
if (!rdp_send_client_info(rdp))
return FALSE;
rdp->state = CONNECTION_STATE_LICENSE;
}

View File

@ -193,9 +193,9 @@ static const char* const mcs_result_enumerated[] =
BOOL mcs_read_domain_mcspdu_header(STREAM* s, enum DomainMCSPDU* domainMCSPDU, UINT16* length)
{
UINT16 li;
BYTE choice;
enum DomainMCSPDU MCSPDU;
UINT16 li;
*length = tpkt_read_header(s);
@ -203,8 +203,10 @@ BOOL mcs_read_domain_mcspdu_header(STREAM* s, enum DomainMCSPDU* domainMCSPDU, U
return FALSE;
MCSPDU = *domainMCSPDU;
if(!per_read_choice(s, &choice))
if (!per_read_choice(s, &choice))
return FALSE;
*domainMCSPDU = (choice >> 2);
if (*domainMCSPDU != MCSPDU)
@ -736,19 +738,23 @@ BOOL mcs_send_channel_join_request(rdpMcs* mcs, UINT16 channel_id)
BOOL mcs_recv_channel_join_confirm(rdpMcs* mcs, STREAM* s, UINT16* channel_id)
{
BOOL status;
UINT16 length;
BYTE result;
UINT16 initiator;
UINT16 requested;
enum DomainMCSPDU MCSPDU;
status = TRUE;
MCSPDU = DomainMCSPDU_ChannelJoinConfirm;
return
mcs_read_domain_mcspdu_header(s, &MCSPDU, &length) &&
per_read_enumerated(s, &result, MCS_Result_enum_length) && /* result */
per_read_integer16(s, &initiator, MCS_BASE_CHANNEL_ID) && /* initiator (UserId) */
per_read_integer16(s, &requested, 0) && /* requested (ChannelId) */
per_read_integer16(s, channel_id, 0); /* channelId */
status &= mcs_read_domain_mcspdu_header(s, &MCSPDU, &length);
status &= per_read_enumerated(s, &result, MCS_Result_enum_length); /* result */
status &= per_read_integer16(s, &initiator, MCS_BASE_CHANNEL_ID); /* initiator (UserId) */
status &= per_read_integer16(s, &requested, 0); /* requested (ChannelId) */
status &= per_read_integer16(s, channel_id, 0); /* channelId */
return status;
}
/**

View File

@ -915,10 +915,7 @@ static int rdp_recv_callback(rdpTransport* transport, STREAM* s, void* extra)
case CONNECTION_STATE_CAPABILITY:
if (!rdp_client_connect_demand_active(rdp, s))
{
printf("rdp_client_connect_demand_active failed\n");
status = -1;
}
break;
case CONNECTION_STATE_FINALIZATION:

View File

@ -679,7 +679,8 @@ int transport_check_fds(rdpTransport** ptransport)
}
received = transport->ReceiveBuffer;
transport->ReceiveBuffer = transport_receive_pool_take(transport);
transport->ReceiveBuffer = ObjectPool_Take(transport->ReceivePool);
transport->ReceiveBuffer->p = transport->ReceiveBuffer->data;
stream_set_pos(received, length);
stream_seal(received);
@ -695,7 +696,7 @@ int transport_check_fds(rdpTransport** ptransport)
recv_status = transport->ReceiveCallback(transport, received, transport->ReceiveExtra);
transport_receive_pool_return(transport, received);
ObjectPool_Return(transport->ReceivePool, received);
if (recv_status < 0)
status = -1;
@ -735,27 +736,16 @@ BOOL transport_set_blocking_mode(rdpTransport* transport, BOOL blocking)
return status;
}
STREAM* transport_receive_pool_take(rdpTransport* transport)
STREAM* transport_receive_buffer_pool_new()
{
STREAM* pdu = NULL;
if (WaitForSingleObject(Queue_Event(transport->ReceivePool), 0) == WAIT_OBJECT_0)
pdu = Queue_Dequeue(transport->ReceivePool);
if (!pdu)
pdu = stream_new(BUFFER_SIZE);
pdu = stream_new(BUFFER_SIZE);
pdu->p = pdu->data;
return pdu;
}
int transport_receive_pool_return(rdpTransport* transport, STREAM* pdu)
{
Queue_Enqueue(transport->ReceivePool, pdu);
return 0;
}
rdpTransport* transport_new(rdpSettings* settings)
{
rdpTransport* transport;
@ -772,11 +762,12 @@ rdpTransport* transport_new(rdpSettings* settings)
/* a small 0.1ms delay when transport is blocking. */
transport->SleepInterval = 100;
transport->ReceivePool = Queue_New(TRUE, -1, -1);
//Queue_Object(transport->ReceivePool)->fnObjectFree = (OBJECT_FREE_FN) stream_free;
transport->ReceivePool = ObjectPool_New(TRUE);
ObjectPool_Object(transport->ReceivePool)->fnObjectFree = (OBJECT_FREE_FN) stream_free;
ObjectPool_Object(transport->ReceivePool)->fnObjectNew = (OBJECT_NEW_FN) transport_receive_buffer_pool_new;
/* receive buffer for non-blocking read. */
transport->ReceiveBuffer = transport_receive_pool_take(transport);
transport->ReceiveBuffer = ObjectPool_Take(transport->ReceivePool);
transport->ReceiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
/* buffers for blocking read/write */
@ -795,6 +786,11 @@ void transport_free(rdpTransport* transport)
{
if (transport != NULL)
{
if (transport->ReceiveBuffer)
ObjectPool_Return(transport->ReceivePool, transport->ReceiveBuffer);
ObjectPool_Free(transport->ReceivePool);
stream_free(transport->ReceiveStream);
stream_free(transport->SendStream);
CloseHandle(transport->ReceiveEvent);
@ -812,8 +808,6 @@ void transport_free(rdpTransport* transport)
tsg_free(transport->tsg);
Queue_Free(transport->ReceivePool);
free(transport);
}
}

View File

@ -66,9 +66,7 @@ struct rdp_transport
HANDLE ReceiveEvent;
BOOL blocking;
BOOL SplitInputOutput;
wQueue* ReceivePool;
wQueue* ReceiveQueue;
wObjectPool* ReceivePool;
};
STREAM* transport_recv_stream_init(rdpTransport* transport, int size);

View File

@ -29,13 +29,15 @@
#include <winpr/synch.h>
typedef void (*OBJECT_EQUALS_FN)(void* objA, void* objB);
typedef void* (*OBJECT_NEW_FN)();
typedef void (*OBJECT_FREE_FN)(void* obj);
typedef void (*OBJECT_EQUALS_FN)(void* objA, void* objB);
struct _wObject
{
OBJECT_EQUALS_FN fnObjectEquals;
OBJECT_NEW_FN fnObjectNew;
OBJECT_FREE_FN fnObjectFree;
OBJECT_EQUALS_FN fnObjectEquals;
};
typedef struct _wObject wObject;
@ -250,6 +252,28 @@ WINPR_API void BufferPool_Clear(wBufferPool* pool);
WINPR_API wBufferPool* BufferPool_New(BOOL synchronized, int fixedSize, DWORD alignment);
WINPR_API void BufferPool_Free(wBufferPool* pool);
/* ObjectPool */
struct _wObjectPool
{
int size;
int capacity;
void** array;
HANDLE mutex;
wObject object;
BOOL synchronized;
};
typedef struct _wObjectPool wObjectPool;
WINPR_API void* ObjectPool_Take(wObjectPool* pool);
WINPR_API void ObjectPool_Return(wObjectPool* pool, void* obj);
WINPR_API void ObjectPool_Clear(wObjectPool* pool);
#define ObjectPool_Object(_pool) (&_pool->object)
WINPR_API wObjectPool* ObjectPool_New(BOOL synchronized);
WINPR_API void ObjectPool_Free(wObjectPool* pool);
/* Message Queue */
struct _wMessage

View File

@ -28,6 +28,7 @@ set(${MODULE_PREFIX}_COLLECTIONS_SRCS
collections/KeyValuePair.c
collections/CountdownEvent.c
collections/BufferPool.c
collections/ObjectPool.c
collections/MessageQueue.c)
set(${MODULE_PREFIX}_SRCS

View File

@ -0,0 +1,145 @@
/**
* WinPR: Windows Portable Runtime
* Object Pool
*
* Copyright 2012 Marc-Andre Moreau <marcandre.moreau@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <winpr/crt.h>
#include <winpr/collections.h>
/**
* C Object Pool similar to C# BufferManager Class:
* http://msdn.microsoft.com/en-us/library/ms405814.aspx
*/
/**
* Methods
*/
/**
* Gets an object from the pool.
*/
void* ObjectPool_Take(wObjectPool* pool)
{
void* obj = NULL;
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
if (pool->size > 0)
obj = pool->array[--(pool->size)];
if (!obj)
{
if (pool->object.fnObjectNew)
obj = pool->object.fnObjectNew();
}
if (pool->synchronized)
ReleaseMutex(pool->mutex);
return obj;
}
/**
* Returns an object to the pool.
*/
void ObjectPool_Return(wObjectPool* pool, void* obj)
{
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
if ((pool->size + 1) >= pool->capacity)
{
pool->capacity *= 2;
pool->array = (void**) realloc(pool->array, sizeof(void*) * pool->capacity);
}
pool->array[(pool->size)++] = obj;
if (pool->synchronized)
ReleaseMutex(pool->mutex);
}
/**
* Releases the buffers currently cached in the pool.
*/
void ObjectPool_Clear(wObjectPool* pool)
{
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
while (pool->size > 0)
{
(pool->size)--;
if (pool->object.fnObjectFree)
pool->object.fnObjectFree(pool->array[pool->size]);
}
if (pool->synchronized)
ReleaseMutex(pool->mutex);
}
/**
* Construction, Destruction
*/
wObjectPool* ObjectPool_New(BOOL synchronized)
{
wObjectPool* pool = NULL;
pool = (wObjectPool*) malloc(sizeof(wObjectPool));
if (pool)
{
ZeroMemory(pool, sizeof(wObjectPool));
pool->synchronized = synchronized;
if (pool->synchronized)
pool->mutex = CreateMutex(NULL, FALSE, NULL);
pool->size = 0;
pool->capacity = 32;
pool->array = (void**) malloc(sizeof(void*) * pool->capacity);
}
return pool;
}
void ObjectPool_Free(wObjectPool* pool)
{
if (pool)
{
ObjectPool_Clear(pool);
if (pool->synchronized)
CloseHandle(pool->mutex);
free(pool->array);
free(pool);
}
}