diff --git a/src/system/kernel/posix/xsi_message_queue.cpp b/src/system/kernel/posix/xsi_message_queue.cpp index f819f28d4f..cdd9acd41d 100644 --- a/src/system/kernel/posix/xsi_message_queue.cpp +++ b/src/system/kernel/posix/xsi_message_queue.cpp @@ -24,7 +24,7 @@ #include -//#define TRACE_XSI_MSG_QUEUE +#define TRACE_XSI_MSG_QUEUE #ifdef TRACE_XSI_MSG_QUEUE # define TRACE(x) dprintf x # define TRACE_ERROR(x) dprintf x @@ -76,6 +76,15 @@ struct queued_message : DoublyLinkedListLinkImpl { free(message); } + status_t copy_to_user_buffer(void *_message, ssize_t _length) + { + if (user_memcpy(_message, &type, sizeof(long)) != B_OK + || user_memcpy((void *)((long *)_message + sizeof(long)), message, + _length) != B_OK) + return B_ERROR; + return B_OK; + } + bool initOK; ssize_t length; char *message; @@ -91,7 +100,9 @@ class XsiMessageQueue { public: XsiMessageQueue(int flags) : - fBytesInQueue(0) + fBytesInQueue(0), + fThreadsWaitingToReceive(0), + fThreadsWaitingToSend(0) { mutex_init(&fLock, "XsiMessageQueue private mutex"); SetIpcKey((key_t)-1); @@ -105,8 +116,19 @@ public: ~XsiMessageQueue() { mutex_destroy(&fLock); - UnsetID(); // TODO: free up all messages + // TODO: Wake up any thread still waiting + } + + status_t BlockAndUnlock(struct thread *thread, MutexLocker *queueLocker) + { + thread_prepare_to_block(thread, B_CAN_INTERRUPT, + THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue"); + // Unlock the queue before blocking + queueLocker->Unlock(); + + InterruptsSpinLocker _(gThreadSpinlock); + return thread_block_locked(thread); } void DoIpcSet(struct msqid_ds *result) @@ -118,6 +140,36 @@ public: fMessageQueue.msg_ctime = (time_t)real_time_clock(); } + void Deque(queued_thread *queueEntry, bool waitForMessage) + { + if (queueEntry->queued) { + if (waitForMessage) { + fWaitingToReceive.Remove(queueEntry); + fThreadsWaitingToReceive--; + } else { + fWaitingToSend.Remove(queueEntry); + fThreadsWaitingToSend--; + } + } + } + + void Enqueue(queued_thread *queueEntry, bool waitForMessage) + { + if (waitForMessage) { + fWaitingToReceive.Add(queueEntry); + fThreadsWaitingToReceive++; + } else { + fWaitingToSend.Add(queueEntry); + fThreadsWaitingToSend++; + } + queueEntry->queued = true; + } + + struct msqid_ds &GetMessageQueue() + { + return fMessageQueue; + } + bool HasPermission() const { if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0) @@ -165,9 +217,12 @@ public: return fMessageQueue.msg_qbytes; } - struct msqid_ds &GetMessageQueue() + // Implemented after sXsiMessageCountLock is declared + queued_message *Remove(long typeRequested); + + uint32 SequenceNumber() const { - return fMessageQueue; + return fSequenceNumber; } // Implemented after sMessageQueueHashTable is declared @@ -185,14 +240,6 @@ public: fMessageQueue.msg_perm.mode = (flags & 0x01ff); } - // Implemented after sMessageQueueHashTable is declared - void UnsetID(); - - status_t Wait(queued_message *message) - { - return B_ERROR; - } - HashTableLink* Link() { return &fLink; @@ -204,8 +251,12 @@ private: mutex fLock; MessageQueue fMessage; struct msqid_ds fMessageQueue; - ThreadQueue fThreadWaitingToSend; - ThreadQueue fThreadWaitingToReceive; + uint32 fSequenceNumber; + uint32 fThreadsWaitingToReceive; + uint32 fThreadsWaitingToSend; + + ThreadQueue fWaitingToReceive; + ThreadQueue fWaitingToSend; ::HashTableLink fLink; }; @@ -309,7 +360,7 @@ static mutex sIpcLock; static mutex sXsiMessageQueueLock; static mutex sXsiMessageCountLock; -static vint32 sNextAvailableID = 1; +static uint32 sGlobalSequenceNumber = 1; static vint32 sXsiMessageCount = 0; static vint32 sXsiMessageQueueCount = 0; @@ -334,27 +385,68 @@ XsiMessageQueue::Insert(queued_message *message) fMessageQueue.msg_stime = real_time_clock(); fBytesInQueue += message->length; sXsiMessageCount++; + // TODO: Wake up any thread waiting on receive return false; } +queued_message* +XsiMessageQueue::Remove(long typeRequested) +{ + queued_message *message = NULL; + if (typeRequested < 0) { + // Return first message of the lowest type + // that is less than or equal to the absolute + // value of type requested. + MessageQueue::Iterator iterator = fMessage.GetIterator(); + while (iterator.HasNext()) { + queued_message *current = iterator.Next(); + if (current->type <= -typeRequested) { + message = iterator.Remove(); + break; + } + } + } else if (typeRequested == 0) { + // Return the first message on the queue + message = fMessage.RemoveHead(); + } else { + // Return the first message of type requested + MessageQueue::Iterator iterator = fMessage.GetIterator(); + while (iterator.HasNext()) { + queued_message *current = iterator.Next(); + if (current->type == typeRequested) { + message = iterator.Remove(); + break; + } + } + } + + if (message == NULL) + return NULL; + + fMessageQueue.msg_qnum--; + fMessageQueue.msg_lrpid = getpid(); + fMessageQueue.msg_rtime = real_time_clock(); + fBytesInQueue -= message->length; + MutexLocker _(sXsiMessageCountLock); + sXsiMessageCount--; + // TODO: Wake up any thread waiting on send + return message; +} + + void XsiMessageQueue::SetID() { + fID = real_time_clock(); // The lock is held before calling us while (true) { - if (sMessageQueueHashTable.Lookup(sNextAvailableID) == NULL) + if (sMessageQueueHashTable.Lookup(fID) == NULL) break; - sNextAvailableID++; + fID++; } - fID = sNextAvailableID++; -} - - -void -XsiMessageQueue::UnsetID() -{ - sNextAvailableID = fID; + sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX; + fSequenceNumber = sGlobalSequenceNumber; } @@ -489,7 +581,7 @@ _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) int _user_xsi_msgget(key_t key, int flags) { - TRACE(("xsi_msgget: key = %d, flags = %d\n", (nt)key, flags)); + TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags)); XsiMessageQueue *messageQueue = NULL; Ipc *ipcKey = NULL; // Default assumptions @@ -513,7 +605,7 @@ _user_xsi_msgget(key_t key, int flags) "for key %d\n", (int)key)); return ENOMEM; } - sIpcHashTable.Lookup(key); + sIpcHashTable.Insert(ipcKey); } else { // The IPC key exist and it already has a message queue if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) { @@ -569,7 +661,99 @@ ssize_t _user_xsi_msgrcv(int messageQueueID, void *messagePointer, size_t messageSize, long messageType, int messageFlags) { - // TODO + TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n", + messageQueueID, messageSize)); + MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); + XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); + if (messageQueue == NULL) { + TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n", + messageQueueID)); + return EINVAL; + } + MutexLocker messageQueueLocker(messageQueue->Lock()); + messageQueueHashLocker.Unlock(); + + if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) { + TRACE_ERROR(("xsi_msgrcv: message size is out of range\n")); + return EINVAL; + } + if (!messageQueue->HasPermission()) { + TRACE_ERROR(("xsi_msgrcv: calling process has not permission " + "on message queue id %d, key %d\n", messageQueueID, + (int)messageQueue->IpcKey())); + return EACCES; + } + if (!IS_USER_ADDRESS(messagePointer)) { + TRACE_ERROR(("xsi_msgrcv: message address is not valid\n")); + return B_BAD_ADDRESS; + } + + queued_message *message = NULL; + bool notReceived = true; + while (notReceived) { + message = messageQueue->Remove(messageType); + + if (message == NULL && !(messageFlags & IPC_NOWAIT)) { + // We are going to sleep + struct thread *thread = thread_get_current_thread(); + queued_thread queueEntry(thread, messageSize); + messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true); + + uint32 sequenceNumber = messageQueue->SequenceNumber(); + + TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id)); + status_t result + = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); + TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id)); + + messageQueueHashLocker.Lock(); + messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); + if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL + && sequenceNumber != messageQueue->SequenceNumber())) { + TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = %ld) " + "got destroyed\n", messageQueueID, sequenceNumber)); + notReceived = false; + result = EIDRM; + } else if (result == B_INTERRUPTED) { + TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while " + "waiting on message queue %d\n",(int)thread->id, + messageQueueID)); + messageQueue->Deque(&queueEntry, /* waitForMessage */ true); + notReceived = false; + result = EINTR; + } else { + messageQueueLocker.Lock(); + messageQueueHashLocker.Unlock(); + } + } else if (message == NULL) { + // There is not message of type requested and + // we can't wait + return ENOMSG; + } else { + // Message received correctly (so far) + if ((ssize_t)messageSize > message->length + && !(messageFlags & MSG_NOERROR)) { + TRACE_ERROR(("xsi_msgrcv: message too big!\n")); + // Put the message back inside. Since we hold the + // queue message lock, not one else could have filled + // up the queue meanwhile + messageQueue->Insert(message); + return E2BIG; + } + + status_t result + = message->copy_to_user_buffer(messagePointer, messageSize); + if (result != B_OK) { + messageQueue->Insert(message); + return B_BAD_ADDRESS; + } + + delete message; + TRACE(("xsi_msgrcv: message received correctly\n")); + notReceived = false; + } + } + return B_ERROR; } @@ -578,7 +762,7 @@ int _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, size_t messageSize, int messageFlags) { - TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %Ld\n", + TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n", messageQueueID, messageSize)); MutexLocker messageQueueHashLocker(sXsiMessageQueueLock); XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); @@ -605,19 +789,52 @@ _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, return B_BAD_ADDRESS; } + queued_message *message + = new(std::nothrow) queued_message(messagePointer, messageSize); + if (message == NULL || message->initOK != true) { + TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); + delete message; + return ENOMEM; + } + bool notSent = true; while (notSent) { - queued_message *message - = new(std::nothrow) queued_message(messagePointer, messageSize); - if (message == NULL || message->initOK != true) { - TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n")); - delete message; - return ENOMEM; - } bool goToSleep = messageQueue->Insert(message); + if (goToSleep && !(messageFlags & IPC_NOWAIT)) { // We are going to sleep - messageQueue->Wait(message); + struct thread *thread = thread_get_current_thread(); + queued_thread queueEntry(thread, messageSize); + messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false); + + uint32 sequenceNumber = messageQueue->SequenceNumber(); + + TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id)); + status_t result + = messageQueue->BlockAndUnlock(thread, &messageQueueLocker); + TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id)); + + messageQueueHashLocker.Lock(); + messageQueue = sMessageQueueHashTable.Lookup(messageQueueID); + if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL + && sequenceNumber != messageQueue->SequenceNumber())) { + TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = %ld) " + "got destroyed\n", messageQueueID, sequenceNumber)); + delete message; + notSent = false; + result = EIDRM; + } else if (result == B_INTERRUPTED) { + TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while " + "waiting on message queue %d\n",(int)thread->id, + messageQueueID)); + messageQueue->Deque(&queueEntry, /* waitForMessage */ false); + delete message; + notSent = false; + result = EINTR; + } else { + messageQueueLocker.Lock(); + messageQueueHashLocker.Unlock(); + } } else if (goToSleep) { // We did not send the message and we can't wait delete message;