winpr-utils: fix StreamPool

This commit is contained in:
Marc-André Moreau 2013-04-12 12:20:20 -04:00
parent f94f90c08b
commit 0fded8c2d0
3 changed files with 204 additions and 37 deletions

View File

@ -346,11 +346,17 @@ static INLINE BOOL stream_skip(wStream* s, int sz) {
struct _wStreamPool
{
int size;
int capacity;
wStream** array;
int aSize;
int aCapacity;
wStream** aArray;
int uSize;
int uCapacity;
wStream** uArray;
HANDLE mutex;
BOOL synchronized;
size_t defaultSize;
};
WINPR_API wStream* StreamPool_Take(wStreamPool* pool, size_t size);
@ -359,9 +365,13 @@ WINPR_API void StreamPool_Return(wStreamPool* pool, wStream* s);
WINPR_API void Stream_AddRef(wStream* s);
WINPR_API void Stream_Release(wStream* s);
WINPR_API wStream* StreamPool_Find(wStreamPool* pool, BYTE* ptr);
WINPR_API void StreamPool_AddRef(wStreamPool* pool, BYTE* ptr);
WINPR_API void StreamPool_Release(wStreamPool* pool, BYTE* ptr);
WINPR_API void StreamPool_Clear(wStreamPool* pool);
WINPR_API wStreamPool* StreamPool_New(BOOL synchronized);
WINPR_API wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize);
WINPR_API void StreamPool_Free(wStreamPool* pool);
#ifdef __cplusplus

View File

@ -29,6 +29,65 @@
* Methods
*/
void StreamPool_ShiftUsed(wStreamPool* pool, int index, int count)
{
if (count > 0)
{
if (pool->uSize + count > pool->uCapacity)
{
pool->uCapacity *= 2;
pool->uArray = (wStream**) realloc(pool->uArray, sizeof(wStream*) * pool->uCapacity);
}
MoveMemory(&pool->uArray[index + count], &pool->uArray[index], (pool->uSize - index) * sizeof(wStream*));
pool->uSize += count;
}
else if (count < 0)
{
MoveMemory(&pool->uArray[index], &pool->uArray[index - count], (pool->uSize + count) * sizeof(wStream*));
pool->uSize += count;
}
}
/**
* Adds a used stream to the pool.
*/
void StreamPool_AddUsed(wStreamPool* pool, wStream* s)
{
int index;
if ((pool->uSize + 1) >= pool->uCapacity)
{
pool->uCapacity *= 2;
pool->uArray = (wStream**) realloc(pool->uArray, sizeof(wStream*) * pool->uCapacity);
}
pool->uArray[(pool->uSize)++] = s;
}
/**
* Removes a used stream from the pool.
*/
void StreamPool_RemoveUsed(wStreamPool* pool, wStream* s)
{
int index;
BOOL found = FALSE;
for (index = 0; index < pool->uSize; index++)
{
if (pool->uArray[index] == s)
{
found = TRUE;
break;
}
}
if (found)
StreamPool_ShiftUsed(pool, index, -1);
}
/**
* Gets a stream from the pool.
*/
@ -40,23 +99,27 @@ wStream* StreamPool_Take(wStreamPool* pool, size_t size)
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
if (pool->size > 0)
s = pool->array[--(pool->size)];
if (pool->aSize > 0)
s = pool->aArray[--(pool->aSize)];
if (size < 0)
size = pool->defaultSize;
if (!s)
{
s = Stream_New(NULL, size);
s->pool = (void*) pool;
s->count = 1;
}
else
{
Stream_EnsureCapacity(s, size);
Stream_Pointer(s) = Stream_Buffer(s);
s->pool = (void*) pool;
s->count = 1;
}
s->pool = pool;
s->count = 1;
StreamPool_AddUsed(pool, s);
if (pool->synchronized)
ReleaseMutex(pool->mutex);
@ -72,13 +135,14 @@ void StreamPool_Return(wStreamPool* pool, wStream* s)
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
if ((pool->size + 1) >= pool->capacity)
if ((pool->aSize + 1) >= pool->aCapacity)
{
pool->capacity *= 2;
pool->array = (wStream**) realloc(pool->array, sizeof(wStream*) * pool->capacity);
pool->aCapacity *= 2;
pool->aArray = (wStream**) realloc(pool->aArray, sizeof(wStream*) * pool->aCapacity);
}
pool->array[(pool->size)++] = s;
pool->aArray[(pool->aSize)++] = s;
StreamPool_RemoveUsed(pool, s);
if (pool->synchronized)
ReleaseMutex(pool->mutex);
@ -135,6 +199,62 @@ void Stream_Release(wStream* s)
}
}
/**
* Find stream in pool using pointer inside buffer
*/
wStream* StreamPool_Find(wStreamPool* pool, BYTE* ptr)
{
int index;
wStream* s = NULL;
BOOL found = FALSE;
WaitForSingleObject(pool->mutex, INFINITE);
for (index = 0; index < pool->uSize; index++)
{
s = pool->uArray[index];
if ((ptr >= s->buffer) && (ptr < (s->buffer + s->capacity)))
{
found = TRUE;
break;
}
}
ReleaseMutex(pool->mutex);
return (found) ? s : NULL;
}
/**
* Find stream in pool and increment reference count
*/
void StreamPool_AddRef(wStreamPool* pool, BYTE* ptr)
{
wStream* s;
s = StreamPool_Find(pool, ptr);
if (s)
Stream_AddRef(s);
}
/**
* Find stream in pool and decrement reference count
*/
void StreamPool_Release(wStreamPool* pool, BYTE* ptr)
{
wStream* s;
s = StreamPool_Find(pool, ptr);
if (s)
Stream_Release(s);
}
/**
* Releases the streams currently cached in the pool.
*/
@ -144,10 +264,10 @@ void StreamPool_Clear(wStreamPool* pool)
if (pool->synchronized)
WaitForSingleObject(pool->mutex, INFINITE);
while (pool->size > 0)
while (pool->aSize > 0)
{
(pool->size)--;
Stream_Free(pool->array[pool->size], TRUE);
(pool->aSize)--;
Stream_Free(pool->aArray[pool->aSize], TRUE);
}
if (pool->synchronized)
@ -158,7 +278,7 @@ void StreamPool_Clear(wStreamPool* pool)
* Construction, Destruction
*/
wStreamPool* StreamPool_New(BOOL synchronized)
wStreamPool* StreamPool_New(BOOL synchronized, size_t defaultSize)
{
wStreamPool* pool = NULL;
@ -169,13 +289,18 @@ wStreamPool* StreamPool_New(BOOL synchronized)
ZeroMemory(pool, sizeof(wStreamPool));
pool->synchronized = synchronized;
pool->defaultSize = defaultSize;
if (pool->synchronized)
pool->mutex = CreateMutex(NULL, FALSE, NULL);
pool->size = 0;
pool->capacity = 32;
pool->array = (wStream**) malloc(sizeof(wStream*) * pool->capacity);
pool->aSize = 0;
pool->aCapacity = 32;
pool->aArray = (wStream**) malloc(sizeof(wStream*) * pool->aCapacity);
pool->uSize = 0;
pool->uCapacity = 32;
pool->uArray = (wStream**) malloc(sizeof(wStream*) * pool->uCapacity);
}
return pool;
@ -190,7 +315,8 @@ void StreamPool_Free(wStreamPool* pool)
if (pool->synchronized)
CloseHandle(pool->mutex);
free(pool->array);
free(pool->aArray);
free(pool->uArray);
free(pool);
}

View File

@ -10,35 +10,35 @@ int TestStreamPool(int argc, char* argv[])
wStream* s[5];
wStreamPool* pool;
pool = StreamPool_New(TRUE);
pool = StreamPool_New(TRUE, BUFFER_SIZE);
s[0] = StreamPool_Take(pool, BUFFER_SIZE);
s[1] = StreamPool_Take(pool, BUFFER_SIZE);
s[2] = StreamPool_Take(pool, BUFFER_SIZE);
s[0] = StreamPool_Take(pool, -1);
s[1] = StreamPool_Take(pool, -1);
s[2] = StreamPool_Take(pool, -1);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
Stream_Release(s[0]);
Stream_Release(s[1]);
Stream_Release(s[2]);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
s[3] = StreamPool_Take(pool, BUFFER_SIZE);
s[4] = StreamPool_Take(pool, BUFFER_SIZE);
s[3] = StreamPool_Take(pool, -1);
s[4] = StreamPool_Take(pool, -1);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
Stream_Release(s[3]);
Stream_Release(s[4]);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
s[2] = StreamPool_Take(pool, BUFFER_SIZE);
s[3] = StreamPool_Take(pool, BUFFER_SIZE);
s[4] = StreamPool_Take(pool, BUFFER_SIZE);
s[2] = StreamPool_Take(pool, -1);
s[3] = StreamPool_Take(pool, -1);
s[4] = StreamPool_Take(pool, -1);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
Stream_AddRef(s[2]);
@ -61,7 +61,38 @@ int TestStreamPool(int argc, char* argv[])
Stream_Release(s[4]);
Stream_Release(s[4]);
printf("StreamPool: size: %d\n", pool->size);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
s[2] = StreamPool_Take(pool, -1);
s[3] = StreamPool_Take(pool, -1);
s[4] = StreamPool_Take(pool, -1);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
StreamPool_AddRef(pool, s[2]->buffer + 1024);
StreamPool_AddRef(pool, s[3]->buffer + 1024);
StreamPool_AddRef(pool, s[3]->buffer + 1024 * 2);
StreamPool_AddRef(pool, s[4]->buffer + 1024);
StreamPool_AddRef(pool, s[4]->buffer + 1024 * 2);
StreamPool_AddRef(pool, s[4]->buffer + 1024 * 3);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
StreamPool_Release(pool, s[2]->buffer + 2048);
StreamPool_Release(pool, s[2]->buffer + 2048 * 2);
StreamPool_Release(pool, s[3]->buffer + 2048);
StreamPool_Release(pool, s[3]->buffer + 2048 * 2);
StreamPool_Release(pool, s[3]->buffer + 2048 * 3);
StreamPool_Release(pool, s[4]->buffer + 2048);
StreamPool_Release(pool, s[4]->buffer + 2048 * 2);
StreamPool_Release(pool, s[4]->buffer + 2048 * 3);
StreamPool_Release(pool, s[4]->buffer + 2048 * 4);
printf("StreamPool: aSize: %d uSize: %d\n", pool->aSize, pool->uSize);
StreamPool_Free(pool);