codec/rfx: added multithreaded encoder

(cherry picked from commit 0d916527bc)
This commit is contained in:
Norbert Federa 2013-08-04 12:07:53 +02:00 committed by Bernhard Miklautz
parent 41ce9a0969
commit 75f23925cd
2 changed files with 93 additions and 10 deletions

View File

@ -227,6 +227,8 @@ RFX_CONTEXT* rfx_context_new(void)
if (context->priv->MaxThreadCount)
SetThreadpoolThreadMaximum(context->priv->ThreadPool, context->priv->MaxThreadCount);
context->priv->EncoderStreamPool = StreamPool_New(TRUE, 64*64*3+19);
}
/* initialize the default pixel format */
@ -261,6 +263,7 @@ void rfx_context_free(RFX_CONTEXT* context)
{
CloseThreadpool(context->priv->ThreadPool);
DestroyThreadpoolEnvironment(&context->priv->ThreadPoolEnv);
StreamPool_Free(context->priv->EncoderStreamPool);
#ifdef WITH_PROFILER
fprintf(stderr, "\nWARNING: Profiling results probably unusable with multithreaded RemoteFX codec!\n");
#endif
@ -582,17 +585,17 @@ static BOOL rfx_process_message_tile(RFX_CONTEXT* context, RFX_TILE* tile, wStre
tile->data, 64 * 4);
}
struct _RFX_TILE_WORK_PARAM
struct _RFX_TILE_PROCESS_WORK_PARAM
{
wStream s;
RFX_TILE* tile;
RFX_CONTEXT* context;
};
typedef struct _RFX_TILE_WORK_PARAM RFX_TILE_WORK_PARAM;
typedef struct _RFX_TILE_PROCESS_WORK_PARAM RFX_TILE_PROCESS_WORK_PARAM;
void CALLBACK rfx_process_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
{
RFX_TILE_WORK_PARAM* param = (RFX_TILE_WORK_PARAM*) context;
RFX_TILE_PROCESS_WORK_PARAM* param = (RFX_TILE_PROCESS_WORK_PARAM*) context;
rfx_process_message_tile(param->context, param->tile, &(param->s));
}
@ -607,7 +610,7 @@ static BOOL rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa
UINT32 blockType;
UINT32 tilesDataSize;
PTP_WORK* work_objects = NULL;
RFX_TILE_WORK_PARAM* params = NULL;
RFX_TILE_PROCESS_WORK_PARAM* params = NULL;
if (Stream_GetRemainingLength(s) < 14)
{
@ -691,7 +694,7 @@ static BOOL rfx_process_message_tileset(RFX_CONTEXT* context, RFX_MESSAGE* messa
if (context->priv->UseThreads)
{
work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * message->num_tiles);
params = (RFX_TILE_WORK_PARAM*) malloc(sizeof(RFX_TILE_WORK_PARAM) * message->num_tiles);
params = (RFX_TILE_PROCESS_WORK_PARAM*) malloc(sizeof(RFX_TILE_PROCESS_WORK_PARAM) * message->num_tiles);
}
/* tiles */
@ -1043,6 +1046,33 @@ static void rfx_compose_message_tile(RFX_CONTEXT* context, wStream* s,
Stream_SetPosition(s, end_pos);
}
/**
 * Argument bundle for a single tile-encoding work item.
 *
 * One instance is filled in per tile and handed to
 * rfx_compose_message_tile_work_callback() as its opaque context pointer.
 * The callback forwards every member, in declaration order, to
 * rfx_compose_message_tile().
 */
typedef struct _RFX_TILE_COMPOSE_WORK_PARAM
{
	RFX_CONTEXT* context;	/* codec context passed through to the tile encoder */
	wStream* s;		/* output stream the encoded tile is written to */
	BYTE* tile_data;	/* start of this tile's pixel data */
	int tile_width;		/* tile width passed to the tile encoder */
	int tile_height;	/* tile height passed to the tile encoder */
	int rowstride;		/* row stride passed to the tile encoder */
	UINT32* quantVals;	/* quantization values passed to the tile encoder */
	int quantIdxY;		/* forwarded as the quantIdxY argument */
	int quantIdxCb;		/* forwarded as the quantIdxCb argument */
	int quantIdxCr;		/* forwarded as the quantIdxCr argument */
	int xIdx;		/* forwarded as the xIdx argument */
	int yIdx;		/* forwarded as the yIdx argument */
} RFX_TILE_COMPOSE_WORK_PARAM;
/**
 * Thread pool work callback that encodes one tile.
 *
 * @param instance  callback instance supplied by the thread pool (unused here)
 * @param context   opaque pointer; must point to an RFX_TILE_COMPOSE_WORK_PARAM
 * @param work      work object this callback belongs to (unused here)
 *
 * Unpacks the parameter block and forwards its members unchanged to
 * rfx_compose_message_tile().
 */
void CALLBACK rfx_compose_message_tile_work_callback(PTP_CALLBACK_INSTANCE instance, void* context, PTP_WORK work)
{
	RFX_TILE_COMPOSE_WORK_PARAM* job = (RFX_TILE_COMPOSE_WORK_PARAM*) context;

	rfx_compose_message_tile(
		job->context,
		job->s,
		job->tile_data,
		job->tile_width,
		job->tile_height,
		job->rowstride,
		job->quantVals,
		job->quantIdxY,
		job->quantIdxCb,
		job->quantIdxCr,
		job->xIdx,
		job->yIdx);
}
static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s,
BYTE* image_data, int width, int height, int rowstride)
{
@ -1061,6 +1091,11 @@ static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s,
int xIdx;
int yIdx;
int tilesDataSize;
BYTE* tileData;
int tileWidth;
int tileHeight;
PTP_WORK* work_objects = NULL;
RFX_TILE_COMPOSE_WORK_PARAM* params = NULL;
if (context->num_quants == 0)
{
@ -1109,17 +1144,64 @@ static void rfx_compose_message_tileset(RFX_CONTEXT* context, wStream* s,
DEBUG_RFX("width:%d height:%d rowstride:%d", width, height, rowstride);
end_pos = Stream_GetPosition(s);
if (context->priv->UseThreads)
{
work_objects = (PTP_WORK*) malloc(sizeof(PTP_WORK) * numTiles);
params = (RFX_TILE_COMPOSE_WORK_PARAM*) malloc(sizeof(RFX_TILE_COMPOSE_WORK_PARAM) * numTiles);
}
for (yIdx = 0; yIdx < numTilesY; yIdx++)
{
for (xIdx = 0; xIdx < numTilesX; xIdx++)
{
rfx_compose_message_tile(context, s,
image_data + yIdx * 64 * rowstride + xIdx * 8 * context->bits_per_pixel,
(xIdx < numTilesX - 1) ? 64 : width - xIdx * 64,
(yIdx < numTilesY - 1) ? 64 : height - yIdx * 64,
rowstride, quantVals, quantIdxY, quantIdxCb, quantIdxCr, xIdx, yIdx);
tileData = image_data + yIdx * 64 * rowstride + xIdx * 8 * context->bits_per_pixel;
tileWidth = (xIdx < numTilesX - 1) ? 64 : width - xIdx * 64;
tileHeight = (yIdx < numTilesY - 1) ? 64 : height - yIdx * 64;
if (context->priv->UseThreads)
{
i = yIdx * numTilesX + xIdx;
params[i].context = context;
params[i].s = StreamPool_Take(context->priv->EncoderStreamPool, 0);
params[i].tile_data = tileData;
params[i].tile_width = tileWidth;
params[i].tile_height = tileHeight;
params[i].rowstride = rowstride;
params[i].quantVals = (UINT32*)quantVals;
params[i].quantIdxY = quantIdxY;
params[i].quantIdxCb = quantIdxCb;
params[i].quantIdxCr = quantIdxCr;
params[i].xIdx = xIdx;
params[i].yIdx = yIdx;
work_objects[i] = CreateThreadpoolWork((PTP_WORK_CALLBACK) rfx_compose_message_tile_work_callback,
(void*) &params[i], &context->priv->ThreadPoolEnv);
SubmitThreadpoolWork(work_objects[i]);
}
else
{
rfx_compose_message_tile(context, s, tileData, tileWidth, tileHeight,
rowstride, quantVals, quantIdxY, quantIdxCb, quantIdxCr, xIdx, yIdx);
}
}
}
if (context->priv->UseThreads)
{
for (i = 0; i < numTiles; i++)
{
WaitForThreadpoolWorkCallbacks(work_objects[i], FALSE);
CloseThreadpoolWork(work_objects[i]);
Stream_Write(s, Stream_Buffer(params[i].s), Stream_GetPosition(params[i].s));
StreamPool_Return(context->priv->EncoderStreamPool, params[i].s);
}
free(work_objects);
free(params);
}
tilesDataSize = Stream_GetPosition(s) - end_pos;
size += tilesDataSize;
end_pos = Stream_GetPosition(s);

View File

@ -49,6 +49,7 @@ struct _RFX_CONTEXT_PRIV
TP_CALLBACK_ENVIRON ThreadPoolEnv;
wBufferPool* BufferPool;
wStreamPool* EncoderStreamPool;
/* profilers */
PROFILER_DEFINE(prof_rfx_decode_rgb);