libfreerdp-codec: start parallel decoding of RemoteFX tiles

This commit is contained in:
Marc-André Moreau 2013-01-22 18:14:50 -05:00
parent fb189989af
commit 438a727c6b
6 changed files with 82 additions and 14 deletions

View File

@ -100,7 +100,7 @@ set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
MONOLITHIC ${MONOLITHIC_BUILD}
MODULE winpr
MODULES winpr-crt winpr-utils)
MODULES winpr-crt winpr-pool winpr-utils)
if(MONOLITHIC_BUILD)
set(FREERDP_LIBS ${FREERDP_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE)

View File

@ -150,6 +150,15 @@ RFX_CONTEXT* rfx_context_new(void)
context->priv->TilePool = Queue_New(TRUE, -1, -1);
context->priv->TileQueue = Queue_New(TRUE, -1, -1);
context->priv->parallel = FALSE;
if (context->priv->parallel)
{
context->priv->ThreadPool = CreateThreadpool(NULL);
InitializeThreadpoolEnvironment(&context->priv->ThreadPoolEnv);
SetThreadpoolCallbackPool(&context->priv->ThreadPoolEnv, context->priv->ThreadPool);
}
/* initialize the default pixel format */
rfx_context_set_pixel_format(context, RDP_PIXEL_FORMAT_B8G8R8A8);
@ -189,6 +198,12 @@ void rfx_context_free(RFX_CONTEXT* context)
rfx_profiler_print(context);
rfx_profiler_free(context);
if (context->priv->parallel)
{
CloseThreadpool(context->priv->ThreadPool);
DestroyThreadpoolEnvironment(&context->priv->ThreadPoolEnv);
}
free(context->priv);
free(context);
}
@ -441,16 +456,32 @@ static void rfx_process_message_tile(RFX_CONTEXT* context, RFX_TILE* tile, STREA
tile->data, 64 * sizeof(UINT32));
}
struct _RFX_TILE_WORK_PARAM
{
STREAM s;
RFX_TILE* tile;
RFX_CONTEXT* context;
};
typedef struct _RFX_TILE_WORK_PARAM RFX_TILE_WORK_PARAM;
void CALLBACK rfx_process_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
{
RFX_TILE_WORK_PARAM* param = (RFX_TILE_WORK_PARAM*) context;
rfx_process_message_tile(param->context, param->tile, &(param->s));
}
static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* message, STREAM* s)
{
int i;
int pos;
BYTE quant;
UINT32* quants;
UINT16 subtype;
UINT32 blockLen;
UINT32 blockType;
UINT32 tilesDataSize;
UINT32* quants;
BYTE quant;
int pos;
PTP_WORK* work_objects = NULL;
RFX_TILE_WORK_PARAM* params = NULL;
stream_read_UINT16(s, subtype); /* subtype (2 bytes) must be set to CBT_TILESET (0xCAC2) */
@ -519,6 +550,12 @@ static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa
message->tiles = (RFX_TILE**) malloc(sizeof(RFX_TILE*) * message->num_tiles);
ZeroMemory(message->tiles, sizeof(RFX_TILE*) * message->num_tiles);
if (context->priv->parallel)
{
work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * message->num_tiles);
params = (RFX_TILE_WORK_PARAM*) malloc(sizeof(RFX_TILE_WORK_PARAM) * message->num_tiles);
}
/* tiles */
for (i = 0; i < message->num_tiles; i++)
{
@ -535,10 +572,34 @@ static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa
}
message->tiles[i] = rfx_tile_pool_take(context);
rfx_process_message_tile(context, message->tiles[i], s);
if (context->priv->parallel)
{
params[i].context = context;
params[i].tile = message->tiles[i];
CopyMemory(&(params[i].s), s, sizeof(STREAM));
work_objects[i] = CreateThreadpoolWork((PTP_WORK_CALLBACK) rfx_process_message_tile_work_callback,
(void*) &params[i], &context->priv->ThreadPoolEnv);
SubmitThreadpoolWork(work_objects[i]);
}
else
{
rfx_process_message_tile(context, message->tiles[i], s);
}
stream_set_pos(s, pos);
}
if (context->priv->parallel)
{
for (i = 0; i < message->num_tiles; i++)
WaitForThreadpoolWorkCallbacks(work_objects[i], FALSE);
free(work_objects);
free(params);
}
}
RFX_MESSAGE* rfx_process_message(RFX_CONTEXT* context, BYTE* data, UINT32 length)

View File

@ -121,13 +121,13 @@ static void rfx_decode_component(RFX_CONTEXT* context, const UINT32* quantizatio
/* stride is bytes between rows in the output buffer. */
void rfx_decode_rgb(RFX_CONTEXT* context, STREAM* data_in,
int y_size, const UINT32 * y_quants,
int cb_size, const UINT32 * cb_quants,
int cr_size, const UINT32 * cr_quants, BYTE* rgb_buffer, int stride)
int y_size, const UINT32* y_quants,
int cb_size, const UINT32* cb_quants,
int cr_size, const UINT32* cr_quants, BYTE* rgb_buffer, int stride)
{
INT16* pSrcDst[3];
static const prim_size_t roi_64x64 = { 64, 64 };
const primitives_t *prims = primitives_get();
INT16 *pSrcDst[3];
PROFILER_ENTER(context->priv->prof_rfx_decode_rgb);
@ -141,8 +141,8 @@ void rfx_decode_rgb(RFX_CONTEXT* context, STREAM* data_in,
pSrcDst[0] = context->priv->y_r_buffer;
pSrcDst[1] = context->priv->cb_g_buffer;
pSrcDst[2] = context->priv->cr_b_buffer;
prims->yCbCrToRGB_16s16s_P3P3((const INT16 **) pSrcDst, 64*sizeof(INT16),
pSrcDst, 64*sizeof(INT16), &roi_64x64);
prims->yCbCrToRGB_16s16s_P3P3((const INT16**) pSrcDst, 64 * sizeof(INT16),
pSrcDst, 64 * sizeof(INT16), &roi_64x64);
PROFILER_ENTER(context->priv->prof_rfx_decode_format_rgb);
rfx_decode_format_rgb(context->priv->y_r_buffer, context->priv->cb_g_buffer, context->priv->cr_b_buffer,

View File

@ -25,6 +25,7 @@
#endif
#include <winpr/crt.h>
#include <winpr/pool.h>
#include <winpr/collections.h>
#include <freerdp/utils/debug.h>
@ -43,6 +44,10 @@ struct _RFX_CONTEXT_PRIV
wQueue* TilePool;
wQueue* TileQueue;
BOOL parallel;
PTP_POOL ThreadPool;
TP_CALLBACK_ENVIRON ThreadPoolEnv;
INT16 y_r_mem[4096 + 8]; /* 4096 = 64x64 (+ 8x2 = 16 for mem align) */
INT16 cb_g_mem[4096 + 8]; /* 4096 = 64x64 (+ 8x2 = 16 for mem align) */
INT16 cr_b_mem[4096 + 8]; /* 4096 = 64x64 (+ 8x2 = 16 for mem align) */

View File

@ -20,7 +20,7 @@ add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS})
set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS
MONOLITHIC ${MONOLITHIC_BUILD}
MODULE winpr
MODULES winpr-pool)
MODULES winpr-pool winpr-interlocked)
target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS})

View File

@ -66,6 +66,7 @@ int TestPoolWork(int argc, char* argv[])
InitializeThreadpoolEnvironment(&environment);
SetThreadpoolCallbackPool(&environment, pool);
#if 0
cleanupGroup = CreateThreadpoolCleanupGroup();
if (!cleanupGroup)
@ -75,6 +76,7 @@ int TestPoolWork(int argc, char* argv[])
}
SetThreadpoolCallbackCleanupGroup(&environment, cleanupGroup, NULL);
#endif
work = CreateThreadpoolWork((PTP_WORK_CALLBACK) test_WorkCallback, "world", &environment);
@ -89,9 +91,9 @@ int TestPoolWork(int argc, char* argv[])
WaitForThreadpoolWorkCallbacks(work, FALSE);
CloseThreadpoolCleanupGroupMembers(cleanupGroup, TRUE, NULL);
//CloseThreadpoolCleanupGroupMembers(cleanupGroup, TRUE, NULL);
CloseThreadpoolCleanupGroup(cleanupGroup);
//CloseThreadpoolCleanupGroup(cleanupGroup);
DestroyThreadpoolEnvironment(&environment);