serial: attempt to use two additional threads for read and write requests

winpr-comm: fixed CommWriteFile
This commit is contained in:
Emmanuel Ledoux 2014-05-13 14:55:30 +02:00 committed by Emmanuel Ledoux
parent 881370a338
commit 7e36374a89
2 changed files with 142 additions and 23 deletions

View File

@ -64,8 +64,14 @@ struct _SERIAL_DEVICE
// TMP: use of log
wLog* log;
HANDLE thread;
wMessageQueue* IrpQueue;
HANDLE MainThread;
wMessageQueue* MainIrpQueue;
HANDLE ReadThread;
wMessageQueue* ReadIrpQueue;
HANDLE WriteThread;
wMessageQueue* WriteIrpQueue;
};
static void serial_process_irp_create(SERIAL_DEVICE* serial, IRP* irp)
@ -262,7 +268,6 @@ static void serial_process_irp_read(SERIAL_DEVICE* serial, IRP* irp)
Stream_Write(irp->output, buffer, nbRead); /* ReadData */
}
if (buffer)
free(buffer);
@ -325,6 +330,7 @@ static void serial_process_irp_write(SERIAL_DEVICE* serial, IRP* irp)
}
}
DEBUG_SVC("%lu bytes written to %s", nbWritten, serial->device.name);
Stream_Write_UINT32(irp->output, nbWritten); /* Length (4 bytes) */
Stream_Write_UINT8(irp->output, 0); /* Padding (1 byte) */
@ -463,18 +469,18 @@ static void serial_process_irp(SERIAL_DEVICE* serial, IRP* irp)
}
}
static void* serial_thread_func(void* arg)
static void* serial_read_thread_func(void* arg)
{
IRP* irp;
wMessage message;
SERIAL_DEVICE* drive = (SERIAL_DEVICE*) arg;
SERIAL_DEVICE* serial = (SERIAL_DEVICE*) arg;
while (1)
{
if (!MessageQueue_Wait(drive->IrpQueue))
if (!MessageQueue_Wait(serial->ReadIrpQueue))
break;
if (!MessageQueue_Peek(drive->IrpQueue, &message, TRUE))
if (!MessageQueue_Peek(serial->ReadIrpQueue, &message, TRUE))
break;
if (message.id == WMQ_QUIT)
@ -483,7 +489,68 @@ static void* serial_thread_func(void* arg)
irp = (IRP*) message.wParam;
if (irp)
serial_process_irp(drive, irp);
{
assert(irp->MajorFunction == IRP_MJ_READ);
serial_process_irp(serial, irp);
}
}
ExitThread(0);
return NULL;
}
static void* serial_write_thread_func(void* arg)
{
IRP* irp;
wMessage message;
SERIAL_DEVICE* serial = (SERIAL_DEVICE*) arg;
while (1)
{
if (!MessageQueue_Wait(serial->WriteIrpQueue))
break;
if (!MessageQueue_Peek(serial->WriteIrpQueue, &message, TRUE))
break;
if (message.id == WMQ_QUIT)
break;
irp = (IRP*) message.wParam;
if (irp)
{
assert(irp->MajorFunction == IRP_MJ_WRITE);
serial_process_irp(serial, irp);
}
}
ExitThread(0);
return NULL;
}
static void* serial_thread_func(void* arg)
{
IRP* irp;
wMessage message;
SERIAL_DEVICE* serial = (SERIAL_DEVICE*) arg;
while (1)
{
if (!MessageQueue_Wait(serial->MainIrpQueue))
break;
if (!MessageQueue_Peek(serial->MainIrpQueue, &message, TRUE))
break;
if (message.id == WMQ_QUIT)
break;
irp = (IRP*) message.wParam;
if (irp)
serial_process_irp(serial, irp);
}
ExitThread(0);
@ -493,7 +560,30 @@ static void* serial_thread_func(void* arg)
static void serial_irp_request(DEVICE* device, IRP* irp)
{
SERIAL_DEVICE* serial = (SERIAL_DEVICE*) device;
MessageQueue_Post(serial->IrpQueue, NULL, 0, (void*) irp, NULL);
assert(irp != NULL);
if (irp == NULL)
return;
/* NB: ENABLE_ASYNCIO is set, (MS-RDPEFS 2.2.2.7.2) this
* allows the server to send multiple simultaneous read or
* write requests.
*/
switch(irp->MajorFunction)
{
case IRP_MJ_READ:
MessageQueue_Post(serial->ReadIrpQueue, NULL, 0, (void*) irp, NULL);
break;
case IRP_MJ_WRITE:
MessageQueue_Post(serial->WriteIrpQueue, NULL, 0, (void*) irp, NULL);
break;
default:
MessageQueue_Post(serial->MainIrpQueue, NULL, 0, (void*) irp, NULL);
}
}
static void serial_free(DEVICE* device)
@ -502,16 +592,28 @@ static void serial_free(DEVICE* device)
WLog_Print(serial->log, WLOG_DEBUG, "freeing");
MessageQueue_PostQuit(serial->IrpQueue, 0);
WaitForSingleObject(serial->thread, INFINITE);
CloseHandle(serial->thread);
/* TMP: FIXME: also send a signal to interrupt I/O */
MessageQueue_PostQuit(serial->ReadIrpQueue, 0);
WaitForSingleObject(serial->ReadThread, INFINITE);
CloseHandle(serial->ReadThread);
MessageQueue_PostQuit(serial->WriteIrpQueue, 0);
WaitForSingleObject(serial->WriteThread, INFINITE);
CloseHandle(serial->WriteThread);
MessageQueue_PostQuit(serial->MainIrpQueue, 0);
WaitForSingleObject(serial->MainThread, INFINITE);
CloseHandle(serial->MainThread);
if (serial->hComm)
CloseHandle(serial->hComm);
/* Clean up resources */
Stream_Free(serial->device.data, TRUE);
MessageQueue_Free(serial->IrpQueue);
MessageQueue_Free(serial->ReadIrpQueue);
MessageQueue_Free(serial->WriteIrpQueue);
MessageQueue_Free(serial->MainIrpQueue);
free(serial);
}
@ -563,7 +665,9 @@ int DeviceServiceEntry(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints)
for (i = 0; i <= len; i++)
Stream_Write_UINT8(serial->device.data, name[i] < 0 ? '_' : name[i]);
serial->IrpQueue = MessageQueue_New(NULL);
serial->ReadIrpQueue = MessageQueue_New(NULL);
serial->WriteIrpQueue = MessageQueue_New(NULL);
serial->MainIrpQueue = MessageQueue_New(NULL);
WLog_Init();
serial->log = WLog_Get("com.freerdp.channel.serial.client");
@ -571,8 +675,26 @@ int DeviceServiceEntry(PDEVICE_SERVICE_ENTRY_POINTS pEntryPoints)
pEntryPoints->RegisterDevice(pEntryPoints->devman, (DEVICE*) serial);
serial->thread = CreateThread(NULL, 0,
(LPTHREAD_START_ROUTINE) serial_thread_func, (void*) serial, 0, NULL);
serial->ReadThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE) serial_read_thread_func,
(void*) serial,
0,
NULL);
serial->WriteThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE) serial_write_thread_func,
(void*) serial,
0,
NULL);
serial->MainThread = CreateThread(NULL,
0,
(LPTHREAD_START_ROUTINE) serial_thread_func,
(void*) serial,
0,
NULL);
}
return 0;

View File

@ -196,9 +196,6 @@ BOOL CommReadFile(HANDLE hDevice, LPVOID lpBuffer, DWORD nNumberOfBytesToRead,
}
}
// TMP:
DEBUG_MSG("Reading N=%u, VMIN=%u, VTIME=%u", nNumberOfBytesToRead, vmin, vtime);
nbRead = read(pComm->fd, lpBuffer, nNumberOfBytesToRead);
if (nbRead < 0)
@ -225,6 +222,7 @@ BOOL CommReadFile(HANDLE hDevice, LPVOID lpBuffer, DWORD nNumberOfBytesToRead,
}
// TODO:
// SetLastError(ERROR_TIMEOUT)
*lpNumberOfBytesRead = nbRead;
@ -283,9 +281,9 @@ BOOL CommWriteFile(HANDLE hDevice, LPCVOID lpBuffer, DWORD nNumberOfBytesToWrite
{
ssize_t nbWritten;
nbWritten += write(pComm->fd,
lpBuffer + (*lpNumberOfBytesWritten),
nNumberOfBytesToWrite - (*lpNumberOfBytesWritten));
nbWritten = write(pComm->fd,
lpBuffer + (*lpNumberOfBytesWritten),
nNumberOfBytesToWrite - (*lpNumberOfBytesWritten));
if (nbWritten < 0)
{
@ -308,7 +306,6 @@ BOOL CommWriteFile(HANDLE hDevice, LPCVOID lpBuffer, DWORD nNumberOfBytesToWrite
*lpNumberOfBytesWritten += nbWritten;
}
return TRUE;
}