* Wake up waiting threads when a new message is sent or received

* Removed sXsiMessageCountLock in favor of atomic_* function utility
* free up any remaining messages when a queue gets destroyed


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@27391 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Salvatore Benedetto 2008-09-09 17:57:36 +00:00
parent ca6210d5e9
commit 3b1f1178aa

View File

@ -113,12 +113,8 @@ public:
fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE; fMessageQueue.msg_qbytes = MAX_BYTES_PER_QUEUE;
} }
~XsiMessageQueue() // Implemented after sXsiMessageCount is declared
{ ~XsiMessageQueue();
mutex_destroy(&fLock);
// TODO: free up all messages
// TODO: Wake up any thread still waiting
}
status_t BlockAndUnlock(struct thread *thread, MutexLocker *queueLocker) status_t BlockAndUnlock(struct thread *thread, MutexLocker *queueLocker)
{ {
@ -199,7 +195,7 @@ public:
return fID; return fID;
} }
// Implemented after sXsiMessageCountLock is declared // Implemented after sXsiMessageCount is declared
bool Insert(queued_message *message); bool Insert(queued_message *message);
key_t IpcKey() const key_t IpcKey() const
@ -217,7 +213,7 @@ public:
return fMessageQueue.msg_qbytes; return fMessageQueue.msg_qbytes;
} }
// Implemented after sXsiMessageCountLock is declared // Implemented after sXsiMessageCount is declared
queued_message *Remove(long typeRequested); queued_message *Remove(long typeRequested);
uint32 SequenceNumber() const uint32 SequenceNumber() const
@ -240,6 +236,27 @@ public:
fMessageQueue.msg_perm.mode = (flags & 0x01ff); fMessageQueue.msg_perm.mode = (flags & 0x01ff);
} }
void WakeUpThread(bool waitForMessage)
{
InterruptsSpinLocker _(gThreadSpinlock);
if (waitForMessage) {
// Wake up all waiting thread for a message
// TODO: this can cause starvation for any
// very-unlucky-and-slow thread
while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
entry->queued = false;
fThreadsWaitingToReceive--;
thread_unblock_locked(entry->thread, 0);
}
} else {
while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
entry->queued = false;
fThreadsWaitingToSend--;
thread_unblock_locked(entry->thread, 0);
}
}
}
HashTableLink<XsiMessageQueue>* Link() HashTableLink<XsiMessageQueue>* Link()
{ {
return &fLink; return &fLink;
@ -358,7 +375,6 @@ static OpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
static mutex sIpcLock; static mutex sIpcLock;
static mutex sXsiMessageQueueLock; static mutex sXsiMessageQueueLock;
static mutex sXsiMessageCountLock;
static uint32 sGlobalSequenceNumber = 1; static uint32 sGlobalSequenceNumber = 1;
static vint32 sXsiMessageCount = 0; static vint32 sXsiMessageCount = 0;
@ -368,24 +384,59 @@ static vint32 sXsiMessageQueueCount = 0;
// #pragma mark - // #pragma mark -
XsiMessageQueue::~XsiMessageQueue()
{
mutex_destroy(&fLock);
// Wake up any threads still waiting
if (fThreadsWaitingToSend || fThreadsWaitingToReceive) {
InterruptsSpinLocker _(gThreadSpinlock);
while (queued_thread *entry = fWaitingToReceive.RemoveHead()) {
entry->queued = false;
thread_unblock_locked(entry->thread, EIDRM);
}
while (queued_thread *entry = fWaitingToSend.RemoveHead()) {
entry->queued = false;
thread_unblock_locked(entry->thread, EIDRM);
}
}
// Free up any remaining messages
if (fMessageQueue.msg_qnum) {
while (queued_message *message = fMessage.RemoveHead()) {
atomic_add(&sXsiMessageCount, -1);
delete message;
}
}
}
bool bool
XsiMessageQueue::Insert(queued_message *message) XsiMessageQueue::Insert(queued_message *message)
{ {
// The only situation that would make us (potentially) wait // The only situation that would make us (potentially) wait
// is that we exceed with bytes or with the total number of messages // is that we exceed with bytes or with the total number of messages
MutexLocker _(sXsiMessageCountLock); if (fBytesInQueue + message->length > fMessageQueue.msg_qbytes)
if (fBytesInQueue == fMessageQueue.msg_qbytes
|| fBytesInQueue + message->length > fMessageQueue.msg_qbytes
|| sXsiMessageCount >= MAX_XSI_MESSAGE)
return true; return true;
while (true) {
int32 oldCount = atomic_get(&sXsiMessageCount);
if (oldCount >= MAX_XSI_MESSAGE)
return true;
// If another thread updates the counter we keep
// iterating
if (atomic_test_and_set(&sXsiMessageCount, oldCount + 1, oldCount)
== oldCount)
break;
}
fMessage.Add(message); fMessage.Add(message);
fMessageQueue.msg_qnum++; fMessageQueue.msg_qnum++;
fMessageQueue.msg_lspid = getpid(); fMessageQueue.msg_lspid = getpid();
fMessageQueue.msg_stime = real_time_clock(); fMessageQueue.msg_stime = real_time_clock();
fBytesInQueue += message->length; fBytesInQueue += message->length;
sXsiMessageCount++; if (fThreadsWaitingToReceive)
// TODO: Wake up any thread waiting on receive WakeUpThread(true /* WaitForMessage */);
return false; return false;
} }
@ -428,9 +479,9 @@ XsiMessageQueue::Remove(long typeRequested)
fMessageQueue.msg_lrpid = getpid(); fMessageQueue.msg_lrpid = getpid();
fMessageQueue.msg_rtime = real_time_clock(); fMessageQueue.msg_rtime = real_time_clock();
fBytesInQueue -= message->length; fBytesInQueue -= message->length;
MutexLocker _(sXsiMessageCountLock); atomic_add(&sXsiMessageCount, -1);
sXsiMessageCount--; if (fThreadsWaitingToSend)
// TODO: Wake up any thread waiting on send WakeUpThread(false /* WaitForMessage */);
return message; return message;
} }
@ -466,7 +517,6 @@ xsi_msg_init()
mutex_init(&sIpcLock, "global POSIX message queue IPC table"); mutex_init(&sIpcLock, "global POSIX message queue IPC table");
mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
mutex_init(&sXsiMessageCountLock, "global POSIX xsi message count");
} }
@ -754,7 +804,7 @@ _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
} }
} }
return B_ERROR; return B_OK;
} }