serial: first steps to get a thread per IRP as a proof of concept. A bit of synchronization is still required.

This commit is contained in:
Emmanuel Ledoux 2014-05-21 10:36:55 +02:00 committed by Emmanuel Ledoux
parent baf4896a38
commit 9639da0067
2 changed files with 133 additions and 48 deletions

View File

@ -55,6 +55,8 @@
#include <freerdp/channels/rdpdr.h>
#include <freerdp/utils/svc_plugin.h>
#define MAX_IRP_THREADS 5
typedef struct _SERIAL_DEVICE SERIAL_DEVICE;
struct _SERIAL_DEVICE
@ -67,10 +69,19 @@ struct _SERIAL_DEVICE
HANDLE MainThread;
wMessageQueue* MainIrpQueue;
HANDLE ReadThread;
wMessageQueue* ReadIrpQueue;
/* one thread per pending IRP and indexed according their CompletionId */
wListDictionary *IrpThreads;
};
typedef struct _IRP_THREAD_DATA IRP_THREAD_DATA;
struct _IRP_THREAD_DATA
{
SERIAL_DEVICE *serial;
IRP *irp;
};
static void serial_process_irp_create(SERIAL_DEVICE* serial, IRP* irp)
{
DWORD DesiredAccess;
@ -138,7 +149,8 @@ static void serial_process_irp_create(SERIAL_DEVICE* serial, IRP* irp)
/* _comm_set_permissive(serial->hComm, TRUE); */
/* FIXME: this stinks, see also IOCTL_SERIAL_PURGE */
_comm_set_ReadIrpQueue(serial->hComm, serial->ReadIrpQueue);
// TMP: to be removed
//_comm_set_ReadIrpQueue(serial->hComm, serial->ReadIrpQueue);
/* NOTE: binary mode/raw mode required for the redirection. On
* Linux, CommCreateFileA forces this setting.
@ -498,36 +510,126 @@ static void serial_process_irp(SERIAL_DEVICE* serial, IRP* irp)
}
}
static void* serial_read_thread_func(void* arg)
static void* irp_thread_func(void* arg)
{
IRP* irp;
wMessage message;
SERIAL_DEVICE* serial = (SERIAL_DEVICE*) arg;
IRP_THREAD_DATA *data = (IRP_THREAD_DATA*)arg;
while (1)
{
if (!MessageQueue_Wait(serial->ReadIrpQueue))
break;
/* blocks until the end of the request */
serial_process_irp(data->serial, data->irp);
if (!MessageQueue_Peek(serial->ReadIrpQueue, &message, TRUE))
break;
if (message.id == WMQ_QUIT)
break;
irp = (IRP*) message.wParam;
if (irp)
{
assert(irp->MajorFunction == IRP_MJ_READ);
serial_process_irp(serial, irp);
}
}
free(data);
ExitThread(0);
return NULL;
}
static void create_irp_thread(SERIAL_DEVICE *serial, IRP *irp)
{
IRP_THREAD_DATA *data = NULL;
HANDLE irpThread = INVALID_HANDLE_VALUE;
HANDLE previousIrpThread;
/* Checks whether a previous IRP with the same CompletionId
* was completed. NB: this can be the a recall of the same
* request let as blocking. Behavior at least observed with
* IOCTL_SERIAL_WAIT_ON_MASK. FIXME: to be confirmed.
*/
// TMP: there is a slight chance that the server sends a new request with the same CompletionId whereas the previous thread is not yet terminated.
previousIrpThread = ListDictionary_GetItemValue(serial->IrpThreads, (void*)irp->CompletionId);
if (previousIrpThread)
{
DWORD waitResult;
/* FIXME: not quite sure a zero timeout is a good thing to check whether a thread is stil alived or not */
waitResult = WaitForSingleObject(previousIrpThread, 0);
if (waitResult == WAIT_TIMEOUT)
{
/* Thread still alived */
/* FIXME: how to send a kind of wake up signal to accelerate the pending request */
DEBUG_WARN("IRP with the CompletionId=%d not yet completed!", irp->CompletionId);
// TMP:
assert(FALSE); /* assert() to be removed if it does realy happen */
/* FIXME: asserts that the previous thread's IRP is well the same request */
irp->Discard(irp);
return;
}
else if(waitResult == WAIT_OBJECT_0)
{
DEBUG_SVC("previous IRP thread with CompletionId=%d naturally died", irp->CompletionId);
/* the previous thread naturally died */
CloseHandle(previousIrpThread);
ListDictionary_Remove(serial->IrpThreads, (void*)irp->CompletionId);
}
else
{
/* FIXME: handle more error cases */
DEBUG_WARN("IRP CompletionId=%d : unexpected waitResult=%X");
irp->Discard(irp);
assert(FALSE); /* should not happen */
return;
}
}
if (ListDictionary_Count(serial->IrpThreads) >= MAX_IRP_THREADS)
{
DEBUG_WARN("Maximal number of IRP threads reached: %d", ListDictionary_Count(serial->IrpThreads));
assert(FALSE);
/* TODO: FIXME: WaitForMultipleObjects() not yet implemented for threads */
}
/* error_handle to be used ... */
data = (IRP_THREAD_DATA*)calloc(1, sizeof(IRP_THREAD_DATA));
if (data == NULL)
{
DEBUG_WARN("Could not allocate a new IRP_THREAD_DATA.");
goto error_handle;
}
data->serial = serial;
data->irp = irp;
irpThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE)irp_thread_func,
(void*)data,
0,
NULL);
if (irpThread == INVALID_HANDLE_VALUE)
{
DEBUG_WARN("Could not allocate a new IRP thread.");
goto error_handle;
}
ListDictionary_Add(serial->IrpThreads, (void*)irp->CompletionId, irpThread);
return; /* data freed by irp_thread_func */
error_handle:
irp->IoStatus = STATUS_NO_MEMORY;
irp->Complete(irp);
if (data)
free(data);
}
static void* serial_thread_func(void* arg)
{
IRP* irp;
@ -548,7 +650,7 @@ static void* serial_thread_func(void* arg)
irp = (IRP*) message.wParam;
if (irp)
serial_process_irp(serial, irp);
create_irp_thread(serial, irp);
}
ExitThread(0);
@ -570,15 +672,7 @@ static void serial_irp_request(DEVICE* device, IRP* irp)
* write requests.
*/
switch(irp->MajorFunction)
{
case IRP_MJ_READ:
MessageQueue_Post(serial->ReadIrpQueue, NULL, 0, (void*) irp, NULL);
break;
default:
MessageQueue_Post(serial->MainIrpQueue, NULL, 0, (void*) irp, NULL);
}
MessageQueue_Post(serial->MainIrpQueue, NULL, 0, (void*) irp, NULL);
}
static void serial_free(DEVICE* device)
@ -587,10 +681,6 @@ static void serial_free(DEVICE* device)
WLog_Print(serial->log, WLOG_DEBUG, "freeing");
MessageQueue_PostQuit(serial->ReadIrpQueue, 0);
WaitForSingleObject(serial->ReadThread, 100 /* ms */); /* INFINITE can block the process on a Read, FIXME: is a better signal possible? */
CloseHandle(serial->ReadThread);
MessageQueue_PostQuit(serial->MainIrpQueue, 0);
WaitForSingleObject(serial->MainThread, INFINITE); /* FIXME: might likely block on a pending Write or ioctl */
CloseHandle(serial->MainThread);
@ -600,8 +690,8 @@ static void serial_free(DEVICE* device)
/* Clean up resources */
Stream_Free(serial->device.data, TRUE);
MessageQueue_Free(serial->ReadIrpQueue);
MessageQueue_Free(serial->MainIrpQueue);
ListDictionary_Free(serial->IrpThreads);
free(serial);
}
@ -653,22 +743,16 @@ int DeviceServiceEntry(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints)
for (i = 0; i <= len; i++)
Stream_Write_UINT8(serial->device.data, name[i] < 0 ? '_' : name[i]);
serial->ReadIrpQueue = MessageQueue_New(NULL);
serial->MainIrpQueue = MessageQueue_New(NULL);
serial->IrpThreads = ListDictionary_New(FALSE); /* only handled in create_irp_thread() */
WLog_Init();
serial->log = WLog_Get("com.freerdp.channel.serial.client");
WLog_Print(serial->log, WLOG_DEBUG, "initializing");
pEntryPoints->RegisterDevice(pEntryPoints->devman, (DEVICE*) serial);
serial->ReadThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE) serial_read_thread_func,
(void*) serial,
0,
NULL);
serial->MainThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE) serial_thread_func,

View File

@ -57,6 +57,7 @@ struct winpr_comm
BOOL permissive;
// TMP: to be renamed serverSerialDriverId
REMOTE_SERIAL_DRIVER_ID remoteSerialDriverId;
wMessageQueue* ReadIrpQueue; /* considered as optional since it is