diff --git a/libfreerdp/codec/CMakeLists.txt b/libfreerdp/codec/CMakeLists.txt index 917c43cc7..2102535fc 100644 --- a/libfreerdp/codec/CMakeLists.txt +++ b/libfreerdp/codec/CMakeLists.txt @@ -100,7 +100,7 @@ set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS MONOLITHIC ${MONOLITHIC_BUILD} MODULE winpr - MODULES winpr-crt winpr-utils) + MODULES winpr-crt winpr-pool winpr-utils) if(MONOLITHIC_BUILD) set(FREERDP_LIBS ${FREERDP_LIBS} ${${MODULE_PREFIX}_LIBS} PARENT_SCOPE) diff --git a/libfreerdp/codec/rfx.c b/libfreerdp/codec/rfx.c index 2448a787c..33eb9a43d 100644 --- a/libfreerdp/codec/rfx.c +++ b/libfreerdp/codec/rfx.c @@ -150,6 +150,15 @@ RFX_CONTEXT* rfx_context_new(void) context->priv->TilePool = Queue_New(TRUE, -1, -1); context->priv->TileQueue = Queue_New(TRUE, -1, -1); + context->priv->parallel = FALSE; + + if (context->priv->parallel) + { + context->priv->ThreadPool = CreateThreadpool(NULL); + InitializeThreadpoolEnvironment(&context->priv->ThreadPoolEnv); + SetThreadpoolCallbackPool(&context->priv->ThreadPoolEnv, context->priv->ThreadPool); + } + /* initialize the default pixel format */ rfx_context_set_pixel_format(context, RDP_PIXEL_FORMAT_B8G8R8A8); @@ -189,6 +198,12 @@ void rfx_context_free(RFX_CONTEXT* context) rfx_profiler_print(context); rfx_profiler_free(context); + if (context->priv->parallel) + { + CloseThreadpool(context->priv->ThreadPool); + DestroyThreadpoolEnvironment(&context->priv->ThreadPoolEnv); + } + free(context->priv); free(context); } @@ -441,16 +456,32 @@ static void rfx_process_message_tile(RFX_CONTEXT* context, RFX_TILE* tile, STREA tile->data, 64 * sizeof(UINT32)); } +struct _RFX_TILE_WORK_PARAM +{ + STREAM s; + RFX_TILE* tile; + RFX_CONTEXT* context; +}; +typedef struct _RFX_TILE_WORK_PARAM RFX_TILE_WORK_PARAM; + +void CALLBACK rfx_process_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work) +{ + RFX_TILE_WORK_PARAM* param = 
(RFX_TILE_WORK_PARAM*) context; + rfx_process_message_tile(param->context, param->tile, &(param->s)); +} + static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* message, STREAM* s) { int i; + int pos; + BYTE quant; + UINT32* quants; UINT16 subtype; UINT32 blockLen; UINT32 blockType; UINT32 tilesDataSize; - UINT32* quants; - BYTE quant; - int pos; + PTP_WORK* work_objects = NULL; + RFX_TILE_WORK_PARAM* params = NULL; stream_read_UINT16(s, subtype); /* subtype (2 bytes) must be set to CBT_TILESET (0xCAC2) */ @@ -519,6 +550,12 @@ static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa message->tiles = (RFX_TILE**) malloc(sizeof(RFX_TILE*) * message->num_tiles); ZeroMemory(message->tiles, sizeof(RFX_TILE*) * message->num_tiles); + if (context->priv->parallel) + { + work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * message->num_tiles); + params = (RFX_TILE_WORK_PARAM*) malloc(sizeof(RFX_TILE_WORK_PARAM) * message->num_tiles); + } + /* tiles */ for (i = 0; i < message->num_tiles; i++) { @@ -535,10 +572,34 @@ static void rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa } message->tiles[i] = rfx_tile_pool_take(context); - rfx_process_message_tile(context, message->tiles[i], s); + + if (context->priv->parallel) + { + params[i].context = context; + params[i].tile = message->tiles[i]; + CopyMemory(&(params[i].s), s, sizeof(STREAM)); + + work_objects[i] = CreateThreadpoolWork((PTP_WORK_CALLBACK) rfx_process_message_tile_work_callback, + (void*) &params[i], &context->priv->ThreadPoolEnv); + + SubmitThreadpoolWork(work_objects[i]); + } + else + { + rfx_process_message_tile(context, message->tiles[i], s); + } stream_set_pos(s, pos); } + + if (context->priv->parallel) + { + for (i = 0; i < message->num_tiles; i++) + WaitForThreadpoolWorkCallbacks(work_objects[i], FALSE); + + free(work_objects); + free(params); + } } RFX_MESSAGE* rfx_process_message(RFX_CONTEXT* context, BYTE* data, UINT32 length) diff --git 
a/libfreerdp/codec/rfx_decode.c b/libfreerdp/codec/rfx_decode.c index 91e0e949b..7eaba5fcf 100644 --- a/libfreerdp/codec/rfx_decode.c +++ b/libfreerdp/codec/rfx_decode.c @@ -121,13 +121,13 @@ static void rfx_decode_component(RFX_CONTEXT* context, const UINT32* quantizatio /* stride is bytes between rows in the output buffer. */ void rfx_decode_rgb(RFX_CONTEXT* context, STREAM* data_in, - int y_size, const UINT32 * y_quants, - int cb_size, const UINT32 * cb_quants, - int cr_size, const UINT32 * cr_quants, BYTE* rgb_buffer, int stride) + int y_size, const UINT32* y_quants, + int cb_size, const UINT32* cb_quants, + int cr_size, const UINT32* cr_quants, BYTE* rgb_buffer, int stride) { + INT16* pSrcDst[3]; static const prim_size_t roi_64x64 = { 64, 64 }; const primitives_t *prims = primitives_get(); - INT16 *pSrcDst[3]; PROFILER_ENTER(context->priv->prof_rfx_decode_rgb); @@ -141,8 +141,8 @@ void rfx_decode_rgb(RFX_CONTEXT* context, STREAM* data_in, pSrcDst[0] = context->priv->y_r_buffer; pSrcDst[1] = context->priv->cb_g_buffer; pSrcDst[2] = context->priv->cr_b_buffer; - prims->yCbCrToRGB_16s16s_P3P3((const INT16 **) pSrcDst, 64*sizeof(INT16), - pSrcDst, 64*sizeof(INT16), &roi_64x64); + prims->yCbCrToRGB_16s16s_P3P3((const INT16**) pSrcDst, 64 * sizeof(INT16), + pSrcDst, 64 * sizeof(INT16), &roi_64x64); PROFILER_ENTER(context->priv->prof_rfx_decode_format_rgb); rfx_decode_format_rgb(context->priv->y_r_buffer, context->priv->cb_g_buffer, context->priv->cr_b_buffer, diff --git a/libfreerdp/codec/rfx_types.h b/libfreerdp/codec/rfx_types.h index 30b29802d..987e669b2 100644 --- a/libfreerdp/codec/rfx_types.h +++ b/libfreerdp/codec/rfx_types.h @@ -25,6 +25,7 @@ #endif #include +#include #include #include @@ -43,6 +44,10 @@ struct _RFX_CONTEXT_PRIV wQueue* TilePool; wQueue* TileQueue; + BOOL parallel; + PTP_POOL ThreadPool; + TP_CALLBACK_ENVIRON ThreadPoolEnv; + INT16 y_r_mem[4096 + 8]; /* 4096 = 64x64 (+ 8x2 = 16 for mem align) */ INT16 cb_g_mem[4096 + 8]; /* 4096 = 64x64 (+ 
8x2 = 16 for mem align) */ INT16 cr_b_mem[4096 + 8]; /* 4096 = 64x64 (+ 8x2 = 16 for mem align) */ diff --git a/winpr/libwinpr/pool/test/CMakeLists.txt b/winpr/libwinpr/pool/test/CMakeLists.txt index 9b81c6dd7..eb03c0b6f 100644 --- a/winpr/libwinpr/pool/test/CMakeLists.txt +++ b/winpr/libwinpr/pool/test/CMakeLists.txt @@ -20,7 +20,7 @@ add_executable(${MODULE_NAME} ${${MODULE_PREFIX}_SRCS}) set_complex_link_libraries(VARIABLE ${MODULE_PREFIX}_LIBS MONOLITHIC ${MONOLITHIC_BUILD} MODULE winpr - MODULES winpr-pool) + MODULES winpr-pool winpr-interlocked) target_link_libraries(${MODULE_NAME} ${${MODULE_PREFIX}_LIBS}) diff --git a/winpr/libwinpr/pool/test/TestPoolWork.c b/winpr/libwinpr/pool/test/TestPoolWork.c index 72bc228ff..5c191f44d 100644 --- a/winpr/libwinpr/pool/test/TestPoolWork.c +++ b/winpr/libwinpr/pool/test/TestPoolWork.c @@ -66,6 +66,7 @@ int TestPoolWork(int argc, char* argv[]) InitializeThreadpoolEnvironment(&environment); SetThreadpoolCallbackPool(&environment, pool); +#if 0 cleanupGroup = CreateThreadpoolCleanupGroup(); if (!cleanupGroup) @@ -75,6 +76,7 @@ int TestPoolWork(int argc, char* argv[]) } SetThreadpoolCallbackCleanupGroup(&environment, cleanupGroup, NULL); +#endif work = CreateThreadpoolWork((PTP_WORK_CALLBACK) test_WorkCallback, "world", &environment); @@ -89,9 +91,9 @@ int TestPoolWork(int argc, char* argv[]) WaitForThreadpoolWorkCallbacks(work, FALSE); - CloseThreadpoolCleanupGroupMembers(cleanupGroup, TRUE, NULL); + //CloseThreadpoolCleanupGroupMembers(cleanupGroup, TRUE, NULL); - CloseThreadpoolCleanupGroup(cleanupGroup); + //CloseThreadpoolCleanupGroup(cleanupGroup); DestroyThreadpoolEnvironment(&environment);