Revert "Revert "allow to use in single threaded mode" (#6864)"

This reverts commit f7465af44f.
This commit is contained in:
sss 2021-03-12 18:54:45 +03:00 committed by akallabeth
parent fab649a1b9
commit 6b76ac9545
27 changed files with 337 additions and 353 deletions

View File

@ -411,3 +411,223 @@ PVIRTUALCHANNELENTRY freerdp_channels_load_static_addin_entry(LPCSTR pszName, LP
return NULL;
}
typedef struct
{
wMessageQueue* queue;
wStream* data_in;
HANDLE thread;
char* channel_name;
rdpContext* ctx;
LPVOID userdata;
MsgHandler msg_handler;
} msg_proc_internals;
static DWORD WINAPI channel_client_thread_proc(LPVOID userdata)
{
UINT error = CHANNEL_RC_OK;
wStream* data;
wMessage message;
msg_proc_internals* internals = userdata;
if (!internals)
{
/* TODO: return some error */
}
while (1)
{
if (!MessageQueue_Wait(internals->queue))
{
WLog_ERR(TAG, "MessageQueue_Wait failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (!MessageQueue_Peek(internals->queue, &message, TRUE))
{
WLog_ERR(TAG, "MessageQueue_Peek failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (message.id == WMQ_QUIT)
break;
if (message.id == 0)
{
data = (wStream*)message.wParam;
if ((error = internals->msg_handler(internals->userdata, data)))
{
WLog_ERR(TAG, "msg_handler failed with error %" PRIu32 "!", error);
break;
}
}
}
if (error && internals->ctx)
{
char msg[128];
_snprintf(msg, 127,
"%s_virtual_channel_client_thread reported an"
" error",
internals->channel_name);
setChannelError(internals->ctx, error, msg);
}
ExitThread(error);
return error;
}
static void free_msg(void* obj)
{
wMessage* msg = (wMessage*)obj;
if (msg)
{
wStream* s = (wStream*)msg->wParam;
Stream_Free(s, TRUE);
}
}
/* Create message queue and thread or not, depending on settings */
void* channel_client_create_handler(rdpContext* ctx, LPVOID userdata, MsgHandler msg_handler,
const char* channel_name)
{
msg_proc_internals* internals = calloc(1, sizeof(msg_proc_internals));
if (!internals)
{
WLog_ERR(TAG, "calloc failed!");
return 0;
}
internals->msg_handler = msg_handler;
internals->userdata = userdata;
internals->channel_name = strdup(channel_name);
internals->ctx = ctx;
if (!(ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
wObject obj = { 0 };
obj.fnObjectFree = free_msg;
internals->queue = MessageQueue_New(&obj);
if (!internals->queue)
{
WLog_ERR(TAG, "MessageQueue_New failed!");
return 0;
}
if (!(internals->thread =
CreateThread(NULL, 0, channel_client_thread_proc, (void*)internals, 0, NULL)))
{
WLog_ERR(TAG, "CreateThread failed!");
MessageQueue_Free(internals->queue);
internals->queue = NULL;
}
}
return internals;
}
/* post a message in the queue or directly call the processing handler */
UINT channel_client_post_message(void* MsgsHandle, LPVOID pData, UINT32 dataLength,
UINT32 totalLength, UINT32 dataFlags)
{
msg_proc_internals* internals = MsgsHandle;
wStream* data_in;
if (!internals)
{
/* TODO: return some error here */
return CHANNEL_RC_OK;
}
if ((dataFlags & CHANNEL_FLAG_SUSPEND) || (dataFlags & CHANNEL_FLAG_RESUME))
{
return CHANNEL_RC_OK;
}
if (dataFlags & CHANNEL_FLAG_FIRST)
{
if (internals->data_in)
Stream_Free(internals->data_in, TRUE);
internals->data_in = Stream_New(NULL, totalLength);
}
if (!(data_in = internals->data_in))
{
WLog_ERR(TAG, "Stream_New failed!");
return CHANNEL_RC_NO_MEMORY;
}
if (!Stream_EnsureRemainingCapacity(data_in, dataLength))
{
Stream_Free(internals->data_in, TRUE);
internals->data_in = NULL;
return CHANNEL_RC_NO_MEMORY;
}
Stream_Write(data_in, pData, dataLength);
if (dataFlags & CHANNEL_FLAG_LAST)
{
if (Stream_Capacity(data_in) != Stream_GetPosition(data_in))
{
char msg[128];
_snprintf(msg, 127, "%s_plugin_process_received: read error", internals->channel_name);
WLog_ERR(TAG, msg);
return ERROR_INTERNAL_ERROR;
}
internals->data_in = NULL;
Stream_SealLength(data_in);
Stream_SetPosition(data_in, 0);
if (internals->ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS)
{
UINT error = CHANNEL_RC_OK;
if ((error = internals->msg_handler(internals->userdata, data_in)))
{
WLog_ERR(TAG,
"msg_handler failed with error"
" %" PRIu32 "!",
error);
return ERROR_INTERNAL_ERROR;
}
}
else if (!MessageQueue_Post(internals->queue, NULL, 0, (void*)data_in, NULL))
{
WLog_ERR(TAG, "MessageQueue_Post failed!");
return ERROR_INTERNAL_ERROR;
}
}
return CHANNEL_RC_OK;
}
/* Tear down queue and thread */
UINT channel_client_quit_handler(void* MsgsHandle)
{
msg_proc_internals* internals = MsgsHandle;
UINT rc;
if (!internals)
{
/* TODO: return some error here */
return CHANNEL_RC_OK;
}
if (!(internals->ctx->settings->ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
if (MessageQueue_PostQuit(internals->queue, 0) &&
(WaitForSingleObject(internals->thread, INFINITE) == WAIT_FAILED))
{
rc = GetLastError();
WLog_ERR(TAG, "WaitForSingleObject failed with error %" PRIu32 "", rc);
return rc;
}
MessageQueue_Free(internals->queue);
CloseHandle(internals->thread);
}
if (internals->data_in)
{
Stream_Free(internals->data_in, TRUE);
internals->data_in = NULL;
}
if (internals->channel_name)
{
free(internals->channel_name);
}
free(internals);
return CHANNEL_RC_OK;
}

View File

@ -16,3 +16,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
typedef UINT (*MsgHandler)(LPVOID userdata, wStream* data);
FREERDP_API void* channel_client_create_handler(rdpContext* ctx, LPVOID userdata, MsgHandler,
const char* channel_name);
UINT channel_client_post_message(void* MsgsHandle, LPVOID pData, UINT32 dataLength,
UINT32 totalLength, UINT32 dataFlags);
UINT channel_client_quit_handler(void* MsgsHandle);

View File

@ -32,6 +32,8 @@
#include <freerdp/constants.h>
#include <freerdp/client/cliprdr.h>
#include "../../channels/client/addin.h"
#include "cliprdr_main.h"
#include "cliprdr_format.h"
#include "../cliprdr_common.h"
@ -445,8 +447,9 @@ static UINT cliprdr_process_unlock_clipdata(cliprdrPlugin* cliprdr, wStream* s,
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT cliprdr_order_recv(cliprdrPlugin* cliprdr, wStream* s)
static UINT cliprdr_order_recv(LPVOID userdata, wStream* s)
{
cliprdrPlugin* cliprdr = userdata;
UINT16 msgType;
UINT16 msgFlags;
UINT32 dataLen;
@ -849,67 +852,6 @@ cliprdr_client_file_contents_response(CliprdrClientContext* context,
return cliprdr_packet_send(cliprdr, s);
}
/**
* Function description
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT cliprdr_virtual_channel_event_data_received(cliprdrPlugin* cliprdr, void* pData,
UINT32 dataLength, UINT32 totalLength,
UINT32 dataFlags)
{
wStream* data_in;
if ((dataFlags & CHANNEL_FLAG_SUSPEND) || (dataFlags & CHANNEL_FLAG_RESUME))
{
return CHANNEL_RC_OK;
}
if (dataFlags & CHANNEL_FLAG_FIRST)
{
if (cliprdr->data_in)
Stream_Free(cliprdr->data_in, TRUE);
cliprdr->data_in = Stream_New(NULL, totalLength);
}
if (!(data_in = cliprdr->data_in))
{
WLog_ERR(TAG, "Stream_New failed!");
return CHANNEL_RC_NO_MEMORY;
}
if (!Stream_EnsureRemainingCapacity(data_in, dataLength))
{
Stream_Free(cliprdr->data_in, TRUE);
cliprdr->data_in = NULL;
return CHANNEL_RC_NO_MEMORY;
}
Stream_Write(data_in, pData, dataLength);
if (dataFlags & CHANNEL_FLAG_LAST)
{
if (Stream_Capacity(data_in) != Stream_GetPosition(data_in))
{
WLog_ERR(TAG, "cliprdr_plugin_process_received: read error");
return ERROR_INTERNAL_ERROR;
}
cliprdr->data_in = NULL;
Stream_SealLength(data_in);
Stream_SetPosition(data_in, 0);
if (!MessageQueue_Post(cliprdr->queue, NULL, 0, (void*)data_in, NULL))
{
WLog_ERR(TAG, "MessageQueue_Post failed!");
return ERROR_INTERNAL_ERROR;
}
}
return CHANNEL_RC_OK;
}
static VOID VCAPITYPE cliprdr_virtual_channel_open_event_ex(LPVOID lpUserParam, DWORD openHandle,
UINT event, LPVOID pData,
UINT32 dataLength, UINT32 totalLength,
@ -926,8 +868,8 @@ static VOID VCAPITYPE cliprdr_virtual_channel_open_event_ex(LPVOID lpUserParam,
WLog_ERR(TAG, "error no match");
return;
}
if ((error = cliprdr_virtual_channel_event_data_received(cliprdr, pData, dataLength,
totalLength, dataFlags)))
if ((error = channel_client_post_message(cliprdr->MsgsHandle, pData, dataLength,
totalLength, dataFlags)))
WLog_ERR(TAG, "failed with error %" PRIu32 "", error);
break;
@ -949,62 +891,6 @@ static VOID VCAPITYPE cliprdr_virtual_channel_open_event_ex(LPVOID lpUserParam,
"cliprdr_virtual_channel_open_event_ex reported an error");
}
static DWORD WINAPI cliprdr_virtual_channel_client_thread(LPVOID arg)
{
wStream* data;
wMessage message;
cliprdrPlugin* cliprdr = (cliprdrPlugin*)arg;
UINT error = CHANNEL_RC_OK;
while (1)
{
if (!MessageQueue_Wait(cliprdr->queue))
{
WLog_ERR(TAG, "MessageQueue_Wait failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (!MessageQueue_Peek(cliprdr->queue, &message, TRUE))
{
WLog_ERR(TAG, "MessageQueue_Peek failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (message.id == WMQ_QUIT)
break;
if (message.id == 0)
{
data = (wStream*)message.wParam;
if ((error = cliprdr_order_recv(cliprdr, data)))
{
WLog_ERR(TAG, "cliprdr_order_recv failed with error %" PRIu32 "!", error);
break;
}
}
}
if (error && cliprdr->context->rdpcontext)
setChannelError(cliprdr->context->rdpcontext, error,
"cliprdr_virtual_channel_client_thread reported an error");
ExitThread(error);
return error;
}
static void cliprdr_free_msg(void* obj)
{
wMessage* msg = (wMessage*)obj;
if (msg)
{
wStream* s = (wStream*)msg->wParam;
Stream_Free(s, TRUE);
}
}
/**
* Function description
@ -1015,7 +901,6 @@ static UINT cliprdr_virtual_channel_event_connected(cliprdrPlugin* cliprdr, LPVO
UINT32 dataLength)
{
UINT32 status;
wObject obj = { 0 };
status = cliprdr->channelEntryPoints.pVirtualChannelOpenEx(
cliprdr->InitHandle, &cliprdr->OpenHandle, cliprdr->channelDef.name,
cliprdr_virtual_channel_open_event_ex);
@ -1027,23 +912,8 @@ static UINT cliprdr_virtual_channel_event_connected(cliprdrPlugin* cliprdr, LPVO
return status;
}
obj.fnObjectFree = cliprdr_free_msg;
cliprdr->queue = MessageQueue_New(&obj);
if (!cliprdr->queue)
{
WLog_ERR(TAG, "MessageQueue_New failed!");
return ERROR_NOT_ENOUGH_MEMORY;
}
if (!(cliprdr->thread = CreateThread(NULL, 0, cliprdr_virtual_channel_client_thread,
(void*)cliprdr, 0, NULL)))
{
WLog_ERR(TAG, "CreateThread failed!");
MessageQueue_Free(cliprdr->queue);
cliprdr->queue = NULL;
return ERROR_INTERNAL_ERROR;
}
cliprdr->MsgsHandle = channel_client_create_handler(cliprdr->context->rdpcontext, cliprdr,
cliprdr_order_recv, "cliprdr");
return CHANNEL_RC_OK;
}
@ -1060,16 +930,8 @@ static UINT cliprdr_virtual_channel_event_disconnected(cliprdrPlugin* cliprdr)
if (cliprdr->OpenHandle == 0)
return CHANNEL_RC_OK;
if (MessageQueue_PostQuit(cliprdr->queue, 0) &&
(WaitForSingleObject(cliprdr->thread, INFINITE) == WAIT_FAILED))
{
rc = GetLastError();
WLog_ERR(TAG, "WaitForSingleObject failed with error %" PRIu32 "", rc);
return rc;
}
channel_client_quit_handler(cliprdr->MsgsHandle);
MessageQueue_Free(cliprdr->queue);
CloseHandle(cliprdr->thread);
rc = cliprdr->channelEntryPoints.pVirtualChannelCloseEx(cliprdr->InitHandle,
cliprdr->OpenHandle);
@ -1082,12 +944,6 @@ static UINT cliprdr_virtual_channel_event_disconnected(cliprdrPlugin* cliprdr)
cliprdr->OpenHandle = 0;
if (cliprdr->data_in)
{
Stream_Free(cliprdr->data_in, TRUE);
cliprdr->data_in = NULL;
}
return CHANNEL_RC_OK;
}

View File

@ -38,11 +38,9 @@ struct cliprdr_plugin
CliprdrClientContext* context;
wLog* log;
HANDLE thread;
wStream* data_in;
void* InitHandle;
DWORD OpenHandle;
wMessageQueue* queue;
void* MsgsHandle;
BOOL capabilitiesReceived;
BOOL useLongFormatNames;

View File

@ -691,7 +691,7 @@ static UINT dvcman_receive_channel_data_first(drdynvcPlugin* drdynvc,
*/
static UINT dvcman_receive_channel_data(drdynvcPlugin* drdynvc,
IWTSVirtualChannelManager* pChannelMgr, UINT32 ChannelId,
wStream* data)
wStream* data, UINT32 ThreadingFlags)
{
UINT status = CHANNEL_RC_OK;
DVCMAN_CHANNEL* channel;
@ -1123,7 +1123,8 @@ static UINT drdynvc_process_create_request(drdynvcPlugin* drdynvc, int Sp, int c
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT drdynvc_process_data_first(drdynvcPlugin* drdynvc, int Sp, int cbChId, wStream* s)
static UINT drdynvc_process_data_first(drdynvcPlugin* drdynvc, int Sp, int cbChId, wStream* s,
UINT32 ThreadingFlags)
{
UINT status;
UINT32 Length;
@ -1140,7 +1141,8 @@ static UINT drdynvc_process_data_first(drdynvcPlugin* drdynvc, int Sp, int cbChI
status = dvcman_receive_channel_data_first(drdynvc, drdynvc->channel_mgr, ChannelId, Length);
if (status == CHANNEL_RC_OK)
status = dvcman_receive_channel_data(drdynvc, drdynvc->channel_mgr, ChannelId, s);
status = dvcman_receive_channel_data(drdynvc, drdynvc->channel_mgr, ChannelId, s,
ThreadingFlags);
if (status != CHANNEL_RC_OK)
status = dvcman_close_channel(drdynvc->channel_mgr, ChannelId, TRUE);
@ -1153,7 +1155,8 @@ static UINT drdynvc_process_data_first(drdynvcPlugin* drdynvc, int Sp, int cbChI
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT drdynvc_process_data(drdynvcPlugin* drdynvc, int Sp, int cbChId, wStream* s)
static UINT drdynvc_process_data(drdynvcPlugin* drdynvc, int Sp, int cbChId, wStream* s,
UINT32 ThreadingFlags)
{
UINT32 ChannelId;
UINT status;
@ -1164,7 +1167,8 @@ static UINT drdynvc_process_data(drdynvcPlugin* drdynvc, int Sp, int cbChId, wSt
ChannelId = drdynvc_read_variable_uint(s, cbChId);
WLog_Print(drdynvc->log, WLOG_TRACE, "process_data: Sp=%d cbChId=%d, ChannelId=%" PRIu32 "", Sp,
cbChId, ChannelId);
status = dvcman_receive_channel_data(drdynvc, drdynvc->channel_mgr, ChannelId, s);
status =
dvcman_receive_channel_data(drdynvc, drdynvc->channel_mgr, ChannelId, s, ThreadingFlags);
if (status != CHANNEL_RC_OK)
status = dvcman_close_channel(drdynvc->channel_mgr, ChannelId, TRUE);
@ -1202,7 +1206,7 @@ static UINT drdynvc_process_close_request(drdynvcPlugin* drdynvc, int Sp, int cb
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT drdynvc_order_recv(drdynvcPlugin* drdynvc, wStream* s)
static UINT drdynvc_order_recv(drdynvcPlugin* drdynvc, wStream* s, UINT32 ThreadingFlags)
{
int value;
int Cmd;
@ -1227,10 +1231,10 @@ static UINT drdynvc_order_recv(drdynvcPlugin* drdynvc, wStream* s)
return drdynvc_process_create_request(drdynvc, Sp, cbChId, s);
case DATA_FIRST_PDU:
return drdynvc_process_data_first(drdynvc, Sp, cbChId, s);
return drdynvc_process_data_first(drdynvc, Sp, cbChId, s, ThreadingFlags);
case DATA_PDU:
return drdynvc_process_data(drdynvc, Sp, cbChId, s);
return drdynvc_process_data(drdynvc, Sp, cbChId, s, ThreadingFlags);
case CLOSE_REQUEST_PDU:
return drdynvc_process_close_request(drdynvc, Sp, cbChId, s);
@ -1350,6 +1354,7 @@ static void VCAPITYPE drdynvc_virtual_channel_open_event_ex(LPVOID lpUserParam,
static DWORD WINAPI drdynvc_virtual_channel_client_thread(LPVOID arg)
{
/* TODO: rewrite this */
wStream* data;
wMessage message;
UINT error = CHANNEL_RC_OK;
@ -1382,9 +1387,10 @@ static DWORD WINAPI drdynvc_virtual_channel_client_thread(LPVOID arg)
if (message.id == 0)
{
UINT32 ThreadingFlags = TRUE;
data = (wStream*)message.wParam;
if ((error = drdynvc_order_recv(drdynvc, data)))
if ((error = drdynvc_order_recv(drdynvc, data, ThreadingFlags)))
{
WLog_Print(drdynvc->log, WLOG_WARN,
"drdynvc_order_recv failed with error %" PRIu32 "!", error);

View File

@ -35,6 +35,8 @@
#include "rail_orders.h"
#include "rail_main.h"
#include "../../channels/client/addin.h"
RailClientContext* rail_get_client_interface(railPlugin* rail)
{
RailClientContext* pInterface;
@ -513,68 +515,6 @@ static UINT rail_client_snap_arrange(RailClientContext* context, const RAIL_SNAP
return rail_send_client_snap_arrange_order(rail, snap);
}
/**
* Function description
*
* @return 0 on success, otherwise a Win32 error code
*/
static UINT rail_virtual_channel_event_data_received(railPlugin* rail, void* pData,
UINT32 dataLength, UINT32 totalLength,
UINT32 dataFlags)
{
wStream* data_in;
if ((dataFlags & CHANNEL_FLAG_SUSPEND) || (dataFlags & CHANNEL_FLAG_RESUME))
{
return CHANNEL_RC_OK;
}
if (dataFlags & CHANNEL_FLAG_FIRST)
{
if (rail->data_in)
Stream_Free(rail->data_in, TRUE);
rail->data_in = Stream_New(NULL, totalLength);
if (!rail->data_in)
{
WLog_ERR(TAG, "Stream_New failed!");
return CHANNEL_RC_NO_MEMORY;
}
}
data_in = rail->data_in;
if (!Stream_EnsureRemainingCapacity(data_in, dataLength))
{
WLog_ERR(TAG, "Stream_EnsureRemainingCapacity failed!");
return CHANNEL_RC_NO_MEMORY;
}
Stream_Write(data_in, pData, dataLength);
if (dataFlags & CHANNEL_FLAG_LAST)
{
if (Stream_Capacity(data_in) != Stream_GetPosition(data_in))
{
WLog_ERR(TAG, "rail_plugin_process_received: read error");
return ERROR_INTERNAL_ERROR;
}
rail->data_in = NULL;
Stream_SealLength(data_in);
Stream_SetPosition(data_in, 0);
if (!MessageQueue_Post(rail->queue, NULL, 0, (void*)data_in, NULL))
{
WLog_ERR(TAG, "MessageQueue_Post failed!");
return ERROR_INTERNAL_ERROR;
}
}
return CHANNEL_RC_OK;
}
static VOID VCAPITYPE rail_virtual_channel_open_event_ex(LPVOID lpUserParam, DWORD openHandle,
UINT event, LPVOID pData,
UINT32 dataLength, UINT32 totalLength,
@ -591,11 +531,15 @@ static VOID VCAPITYPE rail_virtual_channel_open_event_ex(LPVOID lpUserParam, DWO
WLog_ERR(TAG, "error no match");
return;
}
if ((error = rail_virtual_channel_event_data_received(rail, pData, dataLength,
totalLength, dataFlags)))
if ((error = channel_client_post_message(rail->MsgsHandle, pData, dataLength,
totalLength, dataFlags)))
{
WLog_ERR(TAG,
"rail_virtual_channel_event_data_received failed with error %" PRIu32 "!",
"rail_virtual_channel_event_data_received"
" failed with error %" PRIu32 "!",
error);
}
break;
@ -618,54 +562,6 @@ static VOID VCAPITYPE rail_virtual_channel_open_event_ex(LPVOID lpUserParam, DWO
return;
}
static DWORD WINAPI rail_virtual_channel_client_thread(LPVOID arg)
{
wStream* data;
wMessage message;
railPlugin* rail = (railPlugin*)arg;
UINT error = CHANNEL_RC_OK;
while (1)
{
if (!MessageQueue_Wait(rail->queue))
{
WLog_ERR(TAG, "MessageQueue_Wait failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (!MessageQueue_Peek(rail->queue, &message, TRUE))
{
WLog_ERR(TAG, "MessageQueue_Peek failed!");
error = ERROR_INTERNAL_ERROR;
break;
}
if (message.id == WMQ_QUIT)
break;
if (message.id == 0)
{
data = (wStream*)message.wParam;
error = rail_order_recv(rail, data);
Stream_Free(data, TRUE);
if (error)
{
WLog_ERR(TAG, "rail_order_recv failed with error %" PRIu32 "!", error);
break;
}
}
}
if (error && rail->rdpcontext)
setChannelError(rail->rdpcontext, error,
"rail_virtual_channel_client_thread reported an error");
ExitThread(error);
return error;
}
/**
* Function description
*
@ -694,23 +590,8 @@ static UINT rail_virtual_channel_event_connected(railPlugin* rail, LPVOID pData,
WLog_ERR(TAG, "context->OnOpen failed with %s [%08" PRIX32 "]",
WTSErrorToString(status), status);
}
rail->queue = MessageQueue_New(NULL);
if (!rail->queue)
{
WLog_ERR(TAG, "MessageQueue_New failed!");
return CHANNEL_RC_NO_MEMORY;
}
if (!(rail->thread =
CreateThread(NULL, 0, rail_virtual_channel_client_thread, (void*)rail, 0, NULL)))
{
WLog_ERR(TAG, "CreateThread failed!");
MessageQueue_Free(rail->queue);
rail->queue = NULL;
return ERROR_INTERNAL_ERROR;
}
rail->MsgsHandle =
channel_client_create_handler(rail->rdpcontext, rail, rail_order_recv, "rail");
return CHANNEL_RC_OK;
}
@ -727,18 +608,8 @@ static UINT rail_virtual_channel_event_disconnected(railPlugin* rail)
if (rail->OpenHandle == 0)
return CHANNEL_RC_OK;
if (MessageQueue_PostQuit(rail->queue, 0) &&
(WaitForSingleObject(rail->thread, INFINITE) == WAIT_FAILED))
{
rc = GetLastError();
WLog_ERR(TAG, "WaitForSingleObject failed with error %" PRIu32 "", rc);
return rc;
}
channel_client_quit_handler(rail->MsgsHandle);
MessageQueue_Free(rail->queue);
CloseHandle(rail->thread);
rail->queue = NULL;
rail->thread = NULL;
rc = rail->channelEntryPoints.pVirtualChannelCloseEx(rail->InitHandle, rail->OpenHandle);
if (CHANNEL_RC_OK != rc)
@ -750,11 +621,6 @@ static UINT rail_virtual_channel_event_disconnected(railPlugin* rail)
rail->OpenHandle = 0;
if (rail->data_in)
{
Stream_Free(rail->data_in, TRUE);
rail->data_in = NULL;
}
return CHANNEL_RC_OK;
}

View File

@ -44,11 +44,9 @@ struct rail_plugin
RailClientContext* context;
wLog* log;
HANDLE thread;
wStream* data_in;
void* InitHandle;
DWORD OpenHandle;
wMessageQueue* queue;
void* MsgsHandle;
rdpContext* rdpcontext;
DWORD channelBuildNumber;
DWORD channelFlags;

View File

@ -909,8 +909,9 @@ static UINT rail_recv_get_application_id_extended_response_order(railPlugin* rai
*
* @return 0 on success, otherwise a Win32 error code
*/
UINT rail_order_recv(railPlugin* rail, wStream* s)
UINT rail_order_recv(LPVOID userdata, wStream* s)
{
railPlugin* rail = userdata;
UINT16 orderType;
UINT16 orderLength;
UINT error;

View File

@ -29,7 +29,7 @@
#define TAG CHANNELS_TAG("rail.client")
UINT rail_order_recv(railPlugin* rail, wStream* s);
UINT rail_order_recv(LPVOID userdata, wStream* s);
UINT rail_send_pdu(railPlugin* rail, wStream* s, UINT16 orderType);
UINT rail_send_handshake_order(railPlugin* rail, const RAIL_HANDSHAKE_ORDER* handshake);

View File

@ -199,7 +199,7 @@ extern "C"
FREERDP_API BOOL rfx_context_reset(RFX_CONTEXT* context, UINT32 width, UINT32 height);
FREERDP_API RFX_CONTEXT* rfx_context_new(BOOL encoder);
FREERDP_API RFX_CONTEXT* rfx_context_new(BOOL encoder, UINT32 ThreadingFlags);
FREERDP_API void rfx_context_free(RFX_CONTEXT* context);
#ifdef __cplusplus

View File

@ -54,7 +54,7 @@ extern "C"
FREERDP_API void yuv_context_reset(YUV_CONTEXT* context, UINT32 width, UINT32 height);
FREERDP_API YUV_CONTEXT* yuv_context_new(BOOL encoder);
FREERDP_API YUV_CONTEXT* yuv_context_new(BOOL encoder, UINT32 ThreadingFlags);
FREERDP_API void yuv_context_free(YUV_CONTEXT* context);
#ifdef __cplusplus

View File

@ -502,6 +502,8 @@ typedef struct _RDPDR_PARALLEL RDPDR_PARALLEL;
#define PROXY_TYPE_SOCKS 2
#define PROXY_TYPE_IGNORE 0xFFFF
/* ThreadingFlags */
#define THREADING_FLAGS_DISABLE_THREADS 0x00000001
/* Settings */
#ifdef __GNUC__
@ -533,6 +535,7 @@ typedef struct _RDPDR_PARALLEL RDPDR_PARALLEL;
#define FreeRDP_MaxTimeInCheckLoop (26)
#define FreeRDP_AcceptedCert (27)
#define FreeRDP_AcceptedCertLength (28)
#define FreeRDP_ThreadingFlags (64)
#define FreeRDP_RdpVersion (128)
#define FreeRDP_DesktopWidth (129)
#define FreeRDP_DesktopHeight (130)
@ -928,10 +931,11 @@ struct rdp_settings
ALIGN64 UINT32 MaxTimeInCheckLoop; /* 26 */
ALIGN64 char* AcceptedCert; /* 27 */
ALIGN64 UINT32 AcceptedCertLength; /* 28 */
UINT64 padding0064[64 - 29]; /* 29 */
/* resource management related options */
ALIGN64 UINT32 ThreadingFlags; /* 64 */
UINT64 padding0064[64 - 29]; /* 29 */
UINT64 padding0128[128 - 64]; /* 64 */
UINT64 padding0128[128 - 65]; /* 65 */
/**
* GCC User Data Blocks
@ -1719,6 +1723,7 @@ extern "C"
FREERDP_API SSIZE_T freerdp_settings_get_type_for_name(const char* value);
FREERDP_API SSIZE_T freerdp_settings_get_type_for_key(size_t key);
FREERDP_API const char* freerdp_settings_get_name_for_key(size_t key);
FREERDP_API UINT32 freerdp_settings_get_codecs_flags(const rdpSettings* settings);
#ifdef __cplusplus
}

View File

@ -658,7 +658,7 @@ H264_CONTEXT* h264_context_new(BOOL Compressor)
if (!h264_context_init(h264))
goto fail;
h264->yuv = yuv_context_new(Compressor);
h264->yuv = yuv_context_new(Compressor, 0);
if (!h264->yuv)
goto fail;

View File

@ -2603,7 +2603,7 @@ PROGRESSIVE_CONTEXT* progressive_context_new(BOOL Compressor)
progressive->log = WLog_Get(TAG);
if (!progressive->log)
goto fail;
progressive->rfx_context = rfx_context_new(Compressor);
progressive->rfx_context = rfx_context_new(Compressor, 0);
if (!progressive->rfx_context)
goto fail;
progressive->buffer = Stream_New(NULL, 1024);

View File

@ -195,7 +195,7 @@ static void rfx_encoder_tile_free(void* obj)
free(obj);
}
RFX_CONTEXT* rfx_context_new(BOOL encoder)
RFX_CONTEXT* rfx_context_new(BOOL encoder, UINT32 ThreadingFlags)
{
HKEY hKey;
LONG status;
@ -257,6 +257,8 @@ RFX_CONTEXT* rfx_context_new(BOOL encoder)
if (!priv->BufferPool)
goto error_BufferPool;
if (!(ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
#ifdef _WIN32
{
BOOL isVistaOrLater;
@ -265,7 +267,7 @@ RFX_CONTEXT* rfx_context_new(BOOL encoder)
verinfo.dwOSVersionInfoSize = sizeof(OSVERSIONINFOA);
GetVersionExA(&verinfo);
isVistaOrLater =
((verinfo.dwMajorVersion >= 6) && (verinfo.dwMinorVersion >= 0)) ? TRUE : FALSE;
((verinfo.dwMajorVersion >= 6) && (verinfo.dwMinorVersion >= 0)) ? TRUE : FALSE;
priv->UseThreads = isVistaOrLater;
}
#else
@ -281,19 +283,24 @@ RFX_CONTEXT* rfx_context_new(BOOL encoder)
dwSize = sizeof(dwValue);
if (RegQueryValueEx(hKey, _T("UseThreads"), NULL, &dwType, (BYTE*)&dwValue, &dwSize) ==
ERROR_SUCCESS)
ERROR_SUCCESS)
priv->UseThreads = dwValue ? 1 : 0;
if (RegQueryValueEx(hKey, _T("MinThreadCount"), NULL, &dwType, (BYTE*)&dwValue, &dwSize) ==
ERROR_SUCCESS)
ERROR_SUCCESS)
priv->MinThreadCount = dwValue;
if (RegQueryValueEx(hKey, _T("MaxThreadCount"), NULL, &dwType, (BYTE*)&dwValue, &dwSize) ==
ERROR_SUCCESS)
ERROR_SUCCESS)
priv->MaxThreadCount = dwValue;
RegCloseKey(hKey);
}
}
else
{
priv->UseThreads = FALSE;
}
if (priv->UseThreads)
{

View File

@ -851,7 +851,9 @@ int TestFreeRDPCodecRemoteFX(int argc, char* argv[])
BYTE* dest = NULL;
size_t stride = FORMAT_SIZE * IMG_WIDTH;
context = rfx_context_new(FALSE);
/* use default threading options here, pass zero as
* ThreadingFlags */
context = rfx_context_new(FALSE, 0);
if (!context)
goto fail;

View File

@ -111,7 +111,7 @@ void yuv_context_reset(YUV_CONTEXT* context, UINT32 width, UINT32 height)
context->heightStep = (height / context->nthreads);
}
YUV_CONTEXT* yuv_context_new(BOOL encoder)
YUV_CONTEXT* yuv_context_new(BOOL encoder, UINT32 ThreadingFlags)
{
SYSTEM_INFO sysInfos;
YUV_CONTEXT* ret = calloc(1, sizeof(*ret));
@ -121,23 +121,23 @@ YUV_CONTEXT* yuv_context_new(BOOL encoder)
/** do it here to avoid a race condition between threads */
primitives_get();
GetNativeSystemInfo(&sysInfos);
ret->useThreads = (sysInfos.dwNumberOfProcessors > 1);
if (ret->useThreads)
ret->nthreads = 1;
if (!(ThreadingFlags & THREADING_FLAGS_DISABLE_THREADS))
{
ret->nthreads = sysInfos.dwNumberOfProcessors;
ret->threadPool = CreateThreadpool(NULL);
if (!ret->threadPool)
GetNativeSystemInfo(&sysInfos);
ret->useThreads = (sysInfos.dwNumberOfProcessors > 1);
if (ret->useThreads)
{
goto error_threadpool;
}
ret->nthreads = sysInfos.dwNumberOfProcessors;
ret->threadPool = CreateThreadpool(NULL);
if (!ret->threadPool)
{
goto error_threadpool;
}
InitializeThreadpoolEnvironment(&ret->ThreadPoolEnv);
SetThreadpoolCallbackPool(&ret->ThreadPoolEnv, ret->threadPool);
}
else
{
ret->nthreads = 1;
InitializeThreadpoolEnvironment(&ret->ThreadPoolEnv);
SetThreadpoolCallbackPool(&ret->ThreadPoolEnv, ret->threadPool);
}
}
return ret;
@ -308,6 +308,7 @@ static INLINE BOOL check_rect(const YUV_CONTEXT* yuv, const RECTANGLE_16* rect,
/* Check, if the output rectangle is valid in decoded h264 frame. */
if ((rect->right > yuv->width) || (rect->left > yuv->width))
return FALSE;
if ((rect->top > yuv->height) || (rect->bottom > yuv->height))
return FALSE;
@ -422,6 +423,7 @@ BOOL yuv444_context_decode(YUV_CONTEXT* context, BYTE type, const BYTE* pYUVData
if (!pool_decode_rect(context, type, pYUVData, iStride, pYUVDstData, iDstStride, regionRects,
numRegionRects))
return FALSE;
pYUVCDstData[0] = pYUVDstData[0];
pYUVCDstData[1] = pYUVDstData[1];
pYUVCDstData[2] = pYUVDstData[2];

View File

@ -1591,6 +1591,9 @@ UINT32 freerdp_settings_get_uint32(const rdpSettings* settings, size_t id)
case FreeRDP_TcpKeepAliveRetries:
return settings->TcpKeepAliveRetries;
case FreeRDP_ThreadingFlags:
return settings->ThreadingFlags;
case FreeRDP_TlsSecLevel:
return settings->TlsSecLevel;
@ -2049,6 +2052,10 @@ BOOL freerdp_settings_set_uint32(rdpSettings* settings, size_t id, UINT32 val)
settings->TcpKeepAliveRetries = val;
break;
case FreeRDP_ThreadingFlags:
settings->ThreadingFlags = val;
break;
case FreeRDP_TlsSecLevel:
settings->TlsSecLevel = val;
break;

View File

@ -290,6 +290,7 @@ static const struct settings_str_entry settings_map[] = {
{ FreeRDP_TcpKeepAliveDelay, 3, "FreeRDP_TcpKeepAliveDelay" },
{ FreeRDP_TcpKeepAliveInterval, 3, "FreeRDP_TcpKeepAliveInterval" },
{ FreeRDP_TcpKeepAliveRetries, 3, "FreeRDP_TcpKeepAliveRetries" },
{ FreeRDP_ThreadingFlags, 3, "FreeRDP_ThreadingFlags" },
{ FreeRDP_TlsSecLevel, 3, "FreeRDP_TlsSecLevel" },
{ FreeRDP_VirtualChannelChunkSize, 3, "FreeRDP_VirtualChannelChunkSize" },
{ FreeRDP_VirtualChannelCompressionFlags, 3, "FreeRDP_VirtualChannelCompressionFlags" },

View File

@ -65,8 +65,7 @@ BOOL freerdp_client_codecs_prepare(rdpCodecs* codecs, UINT32 flags, UINT32 width
if ((flags & FREERDP_CODEC_REMOTEFX))
{
rfx_context_free(codecs->rfx);
if (!(codecs->rfx = rfx_context_new(FALSE)))
if (!(codecs->rfx = rfx_context_new(FALSE, codecs->context->settings->ThreadingFlags)))
{
WLog_ERR(TAG, "Failed to create rfx codec context");
return FALSE;

View File

@ -200,8 +200,8 @@ static BOOL rdp_client_reset_codecs(rdpContext* context)
if (!context->codecs)
return FALSE;
if (!freerdp_client_codecs_prepare(context->codecs, FREERDP_CODEC_ALL, settings->DesktopWidth,
settings->DesktopHeight))
if (!freerdp_client_codecs_prepare(context->codecs, freerdp_settings_get_codecs_flags(settings),
settings->DesktopWidth, settings->DesktopHeight))
return FALSE;
/* Runtime H264 detection. (only available if dynamic backend loading is defined)

View File

@ -287,6 +287,7 @@ static const size_t uint32_list_indices[] = {
FreeRDP_TcpKeepAliveDelay,
FreeRDP_TcpKeepAliveInterval,
FreeRDP_TcpKeepAliveRetries,
FreeRDP_ThreadingFlags,
FreeRDP_TlsSecLevel,
FreeRDP_VirtualChannelChunkSize,
FreeRDP_VirtualChannelCompressionFlags,

View File

@ -117,8 +117,12 @@ static UINT gdi_ResetGraphics(RdpgfxClientContext* context,
free(pSurfaceIds);
if (!freerdp_client_codecs_reset(context->codecs, FREERDP_CODEC_ALL, gdi->width, gdi->height))
if (!freerdp_client_codecs_reset(gdi->context->codecs,
freerdp_settings_get_codecs_flags(settings), gdi->width,
gdi->height))
{
goto fail;
}
rc = CHANNEL_RC_OK;
fail:

View File

@ -158,7 +158,7 @@ static BOOL mf_peer_context_new(freerdp_peer* client, mfPeerContext* context)
if (!(context->info = mf_info_get_instance()))
return FALSE;
if (!(context->rfx_context = rfx_context_new(TRUE)))
if (!(context->rfx_context = rfx_context_new(TRUE, client->settings->ThreadingFlags)))
goto fail_rfx_context;
context->rfx_context->mode = RLGR3;

View File

@ -58,7 +58,7 @@ static BOOL test_dump_rfx_realtime = TRUE;
static BOOL test_peer_context_new(freerdp_peer* client, rdpContext* ctx)
{
testPeerContext* context = (testPeerContext*)ctx;
if (!(context->rfx_context = rfx_context_new(TRUE)))
if (!(context->rfx_context = rfx_context_new(TRUE, client->settings->ThreadingFlags)))
goto fail_rfx_context;
if (!rfx_context_reset(context->rfx_context, SAMPLE_SERVER_DEFAULT_WIDTH,

View File

@ -187,7 +187,8 @@ void wf_update_encoder_reset(wfInfo* wfi)
}
else
{
wfi->rfx_context = rfx_context_new(TRUE);
/* TODO: pass ThreadingFlags somehow */
wfi->rfx_context = rfx_context_new(TRUE, 0);
wfi->rfx_context->mode = RLGR3;
wfi->rfx_context->width = wfi->servscreen_width;
wfi->rfx_context->height = wfi->servscreen_height;

View File

@ -132,7 +132,7 @@ static int shadow_encoder_uninit_grid(rdpShadowEncoder* encoder)
static int shadow_encoder_init_rfx(rdpShadowEncoder* encoder)
{
if (!encoder->rfx)
encoder->rfx = rfx_context_new(TRUE);
encoder->rfx = rfx_context_new(TRUE, encoder->server->settings->ThreadingFlags);
if (!encoder->rfx)
goto fail;