From 9639da0067889c8080d0ccbf1b308f19d24c4d0c Mon Sep 17 00:00:00 2001 From: Emmanuel Ledoux Date: Wed, 21 May 2014 10:36:55 +0200 Subject: [PATCH] serial: first steps to get a thread per IRP as a proof of concept. A bit of synchronization is still required. --- channels/serial/client/serial_main.c | 180 ++++++++++++++++++++------- winpr/libwinpr/comm/comm.h | 1 + 2 files changed, 133 insertions(+), 48 deletions(-) diff --git a/channels/serial/client/serial_main.c b/channels/serial/client/serial_main.c index b0fa5d6f5..d4a9bc2d0 100644 --- a/channels/serial/client/serial_main.c +++ b/channels/serial/client/serial_main.c @@ -55,6 +55,8 @@ #include #include +#define MAX_IRP_THREADS 5 + typedef struct _SERIAL_DEVICE SERIAL_DEVICE; struct _SERIAL_DEVICE @@ -67,10 +69,19 @@ struct _SERIAL_DEVICE HANDLE MainThread; wMessageQueue* MainIrpQueue; - HANDLE ReadThread; - wMessageQueue* ReadIrpQueue; + /* one thread per pending IRP and indexed according their CompletionId */ + wListDictionary *IrpThreads; }; +typedef struct _IRP_THREAD_DATA IRP_THREAD_DATA; + +struct _IRP_THREAD_DATA +{ + SERIAL_DEVICE *serial; + IRP *irp; +}; + + static void serial_process_irp_create(SERIAL_DEVICE* serial, IRP* irp) { DWORD DesiredAccess; @@ -138,7 +149,8 @@ static void serial_process_irp_create(SERIAL_DEVICE* serial, IRP* irp) /* _comm_set_permissive(serial->hComm, TRUE); */ /* FIXME: this stinks, see also IOCTL_SERIAL_PURGE */ - _comm_set_ReadIrpQueue(serial->hComm, serial->ReadIrpQueue); + // TMP: to be removed + //_comm_set_ReadIrpQueue(serial->hComm, serial->ReadIrpQueue); /* NOTE: binary mode/raw mode required for the redirection. On * Linux, CommCreateFileA forces this setting. @@ -498,36 +510,126 @@ static void serial_process_irp(SERIAL_DEVICE* serial, IRP* irp) } } -static void* serial_read_thread_func(void* arg) +static void* irp_thread_func(void* arg) { - IRP* irp; - wMessage message; - SERIAL_DEVICE* serial = (SERIAL_DEVICE*) arg; + IRP_THREAD_DATA *data = (IRP_THREAD_DATA*)arg; - while (1) - { - if (!MessageQueue_Wait(serial->ReadIrpQueue)) - break; + /* blocks until the end of the request */ + serial_process_irp(data->serial, data->irp); - if (!MessageQueue_Peek(serial->ReadIrpQueue, &message, TRUE)) - break; - - if (message.id == WMQ_QUIT) - break; - - irp = (IRP*) message.wParam; - - if (irp) - { - assert(irp->MajorFunction == IRP_MJ_READ); - serial_process_irp(serial, irp); - } - } + free(data); ExitThread(0); return NULL; } + +static void create_irp_thread(SERIAL_DEVICE *serial, IRP *irp) +{ + IRP_THREAD_DATA *data = NULL; + HANDLE irpThread = INVALID_HANDLE_VALUE; + HANDLE previousIrpThread; + + /* Checks whether a previous IRP with the same CompletionId + * was completed. NB: this can be the a recall of the same + * request let as blocking. Behavior at least observed with + * IOCTL_SERIAL_WAIT_ON_MASK. FIXME: to be confirmed. + */ + + // TMP: there is a slight chance that the server sends a new request with the same CompletionId whereas the previous thread is not yet terminated. + + previousIrpThread = ListDictionary_GetItemValue(serial->IrpThreads, (void*)irp->CompletionId); + + if (previousIrpThread) + { + DWORD waitResult; + + /* FIXME: not quite sure a zero timeout is a good thing to check whether a thread is stil alived or not */ + waitResult = WaitForSingleObject(previousIrpThread, 0); + + if (waitResult == WAIT_TIMEOUT) + { + /* Thread still alived */ + /* FIXME: how to send a kind of wake up signal to accelerate the pending request */ + + DEBUG_WARN("IRP with the CompletionId=%d not yet completed!", irp->CompletionId); + + // TMP: + assert(FALSE); /* assert() to be removed if it does realy happen */ + + /* FIXME: asserts that the previous thread's IRP is well the same request */ + irp->Discard(irp); + return; + } + else if(waitResult == WAIT_OBJECT_0) + { + DEBUG_SVC("previous IRP thread with CompletionId=%d naturally died", irp->CompletionId); + + /* the previous thread naturally died */ + CloseHandle(previousIrpThread); + ListDictionary_Remove(serial->IrpThreads, (void*)irp->CompletionId); + } + else + { + /* FIXME: handle more error cases */ + DEBUG_WARN("IRP CompletionId=%d : unexpected waitResult=%X"); + irp->Discard(irp); + + assert(FALSE); /* should not happen */ + + return; + } + } + + if (ListDictionary_Count(serial->IrpThreads) >= MAX_IRP_THREADS) + { + DEBUG_WARN("Maximal number of IRP threads reached: %d", ListDictionary_Count(serial->IrpThreads)); + + assert(FALSE); + /* TODO: FIXME: WaitForMultipleObjects() not yet implemented for threads */ + } + + /* error_handle to be used ... */ + + data = (IRP_THREAD_DATA*)calloc(1, sizeof(IRP_THREAD_DATA)); + if (data == NULL) + { + DEBUG_WARN("Could not allocate a new IRP_THREAD_DATA."); + goto error_handle; + } + + data->serial = serial; + data->irp = irp; + + irpThread = CreateThread(NULL, + 0, + (LPTHREAD_START_ROUTINE)irp_thread_func, + (void*)data, + 0, + NULL); + + if (irpThread == INVALID_HANDLE_VALUE) + { + DEBUG_WARN("Could not allocate a new IRP thread."); + goto error_handle; + } + + + + ListDictionary_Add(serial->IrpThreads, (void*)irp->CompletionId, irpThread); + + return; /* data freed by irp_thread_func */ + + error_handle: + + irp->IoStatus = STATUS_NO_MEMORY; + irp->Complete(irp); + + if (data) + free(data); +} + + static void* serial_thread_func(void* arg) { IRP* irp; @@ -548,7 +650,7 @@ static void* serial_thread_func(void* arg) irp = (IRP*) message.wParam; if (irp) - serial_process_irp(serial, irp); + create_irp_thread(serial, irp); } ExitThread(0); @@ -570,15 +672,7 @@ static void serial_irp_request(DEVICE* device, IRP* irp) * write requests. */ - switch(irp->MajorFunction) - { - case IRP_MJ_READ: - MessageQueue_Post(serial->ReadIrpQueue, NULL, 0, (void*) irp, NULL); - break; - - default: - MessageQueue_Post(serial->MainIrpQueue, NULL, 0, (void*) irp, NULL); - } + MessageQueue_Post(serial->MainIrpQueue, NULL, 0, (void*) irp, NULL); } static void serial_free(DEVICE* device) @@ -587,10 +681,6 @@ static void serial_free(DEVICE* device) WLog_Print(serial->log, WLOG_DEBUG, "freeing"); - MessageQueue_PostQuit(serial->ReadIrpQueue, 0); - WaitForSingleObject(serial->ReadThread, 100 /* ms */); /* INFINITE can block the process on a Read, FIXME: is a better signal possible? */ - CloseHandle(serial->ReadThread); - MessageQueue_PostQuit(serial->MainIrpQueue, 0); WaitForSingleObject(serial->MainThread, INFINITE); /* FIXME: might likely block on a pending Write or ioctl */ CloseHandle(serial->MainThread); @@ -600,8 +690,8 @@ static void serial_free(DEVICE* device) /* Clean up resources */ Stream_Free(serial->device.data, TRUE); - MessageQueue_Free(serial->ReadIrpQueue); MessageQueue_Free(serial->MainIrpQueue); + ListDictionary_Free(serial->IrpThreads); free(serial); } @@ -653,22 +743,16 @@ int DeviceServiceEntry(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints) for (i = 0; i <= len; i++) Stream_Write_UINT8(serial->device.data, name[i] < 0 ? '_' : name[i]); - serial->ReadIrpQueue = MessageQueue_New(NULL); serial->MainIrpQueue = MessageQueue_New(NULL); + serial->IrpThreads = ListDictionary_New(FALSE); /* only handled in create_irp_thread() */ + WLog_Init(); serial->log = WLog_Get("com.freerdp.channel.serial.client"); WLog_Print(serial->log, WLOG_DEBUG, "initializing"); pEntryPoints->RegisterDevice(pEntryPoints->devman, (DEVICE*) serial); - serial->ReadThread = CreateThread(NULL, - 0, - (LPTHREAD_START_ROUTINE) serial_read_thread_func, - (void*) serial, - 0, - NULL); - serial->MainThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE) serial_thread_func, diff --git a/winpr/libwinpr/comm/comm.h b/winpr/libwinpr/comm/comm.h index 3af153af9..45999e332 100644 --- a/winpr/libwinpr/comm/comm.h +++ b/winpr/libwinpr/comm/comm.h @@ -57,6 +57,7 @@ struct winpr_comm BOOL permissive; + // TMP: to be renamed serverSerialDriverId REMOTE_SERIAL_DRIVER_ID remoteSerialDriverId; wMessageQueue* ReadIrpQueue; /* considered as optional since it is