* Implemented msgrcv and msgsnd. Not complete yet.

* Reworked the way IDs are geneterad in the same way they are in xsi semaphores


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@27327 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Salvatore Benedetto 2008-09-04 19:45:02 +00:00
parent dc08d5dc1e
commit d9bf715495

View File

@ -24,7 +24,7 @@
#include <util/OpenHashTable.h>
//#define TRACE_XSI_MSG_QUEUE
#define TRACE_XSI_MSG_QUEUE
#ifdef TRACE_XSI_MSG_QUEUE
# define TRACE(x) dprintf x
# define TRACE_ERROR(x) dprintf x
@ -76,6 +76,15 @@ struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
free(message);
}
status_t copy_to_user_buffer(void *_message, ssize_t _length)
{
if (user_memcpy(_message, &type, sizeof(long)) != B_OK
|| user_memcpy((void *)((long *)_message + sizeof(long)), message,
_length) != B_OK)
return B_ERROR;
return B_OK;
}
bool initOK;
ssize_t length;
char *message;
@ -91,7 +100,9 @@ class XsiMessageQueue {
public:
XsiMessageQueue(int flags)
:
fBytesInQueue(0)
fBytesInQueue(0),
fThreadsWaitingToReceive(0),
fThreadsWaitingToSend(0)
{
mutex_init(&fLock, "XsiMessageQueue private mutex");
SetIpcKey((key_t)-1);
@ -105,8 +116,19 @@ public:
~XsiMessageQueue()
{
mutex_destroy(&fLock);
UnsetID();
// TODO: free up all messages
// TODO: Wake up any thread still waiting
}
status_t BlockAndUnlock(struct thread *thread, MutexLocker *queueLocker)
{
thread_prepare_to_block(thread, B_CAN_INTERRUPT,
THREAD_BLOCK_TYPE_OTHER, (void*)"xsi message queue");
// Unlock the queue before blocking
queueLocker->Unlock();
InterruptsSpinLocker _(gThreadSpinlock);
return thread_block_locked(thread);
}
void DoIpcSet(struct msqid_ds *result)
@ -118,6 +140,36 @@ public:
fMessageQueue.msg_ctime = (time_t)real_time_clock();
}
void Deque(queued_thread *queueEntry, bool waitForMessage)
{
if (queueEntry->queued) {
if (waitForMessage) {
fWaitingToReceive.Remove(queueEntry);
fThreadsWaitingToReceive--;
} else {
fWaitingToSend.Remove(queueEntry);
fThreadsWaitingToSend--;
}
}
}
void Enqueue(queued_thread *queueEntry, bool waitForMessage)
{
if (waitForMessage) {
fWaitingToReceive.Add(queueEntry);
fThreadsWaitingToReceive++;
} else {
fWaitingToSend.Add(queueEntry);
fThreadsWaitingToSend++;
}
queueEntry->queued = true;
}
struct msqid_ds &GetMessageQueue()
{
return fMessageQueue;
}
bool HasPermission() const
{
if ((fMessageQueue.msg_perm.mode & S_IWOTH) != 0)
@ -165,9 +217,12 @@ public:
return fMessageQueue.msg_qbytes;
}
struct msqid_ds &GetMessageQueue()
// Implemented after sXsiMessageCountLock is declared
queued_message *Remove(long typeRequested);
uint32 SequenceNumber() const
{
return fMessageQueue;
return fSequenceNumber;
}
// Implemented after sMessageQueueHashTable is declared
@ -185,14 +240,6 @@ public:
fMessageQueue.msg_perm.mode = (flags & 0x01ff);
}
// Implemented after sMessageQueueHashTable is declared
void UnsetID();
status_t Wait(queued_message *message)
{
return B_ERROR;
}
HashTableLink<XsiMessageQueue>* Link()
{
return &fLink;
@ -204,8 +251,12 @@ private:
mutex fLock;
MessageQueue fMessage;
struct msqid_ds fMessageQueue;
ThreadQueue fThreadWaitingToSend;
ThreadQueue fThreadWaitingToReceive;
uint32 fSequenceNumber;
uint32 fThreadsWaitingToReceive;
uint32 fThreadsWaitingToSend;
ThreadQueue fWaitingToReceive;
ThreadQueue fWaitingToSend;
::HashTableLink<XsiMessageQueue> fLink;
};
@ -309,7 +360,7 @@ static mutex sIpcLock;
static mutex sXsiMessageQueueLock;
static mutex sXsiMessageCountLock;
static vint32 sNextAvailableID = 1;
static uint32 sGlobalSequenceNumber = 1;
static vint32 sXsiMessageCount = 0;
static vint32 sXsiMessageQueueCount = 0;
@ -334,27 +385,68 @@ XsiMessageQueue::Insert(queued_message *message)
fMessageQueue.msg_stime = real_time_clock();
fBytesInQueue += message->length;
sXsiMessageCount++;
// TODO: Wake up any thread waiting on receive
return false;
}
queued_message*
XsiMessageQueue::Remove(long typeRequested)
{
queued_message *message = NULL;
if (typeRequested < 0) {
// Return first message of the lowest type
// that is less than or equal to the absolute
// value of type requested.
MessageQueue::Iterator iterator = fMessage.GetIterator();
while (iterator.HasNext()) {
queued_message *current = iterator.Next();
if (current->type <= -typeRequested) {
message = iterator.Remove();
break;
}
}
} else if (typeRequested == 0) {
// Return the first message on the queue
message = fMessage.RemoveHead();
} else {
// Return the first message of type requested
MessageQueue::Iterator iterator = fMessage.GetIterator();
while (iterator.HasNext()) {
queued_message *current = iterator.Next();
if (current->type == typeRequested) {
message = iterator.Remove();
break;
}
}
}
if (message == NULL)
return NULL;
fMessageQueue.msg_qnum--;
fMessageQueue.msg_lrpid = getpid();
fMessageQueue.msg_rtime = real_time_clock();
fBytesInQueue -= message->length;
MutexLocker _(sXsiMessageCountLock);
sXsiMessageCount--;
// TODO: Wake up any thread waiting on send
return message;
}
void
XsiMessageQueue::SetID()
{
fID = real_time_clock();
// The lock is held before calling us
while (true) {
if (sMessageQueueHashTable.Lookup(sNextAvailableID) == NULL)
if (sMessageQueueHashTable.Lookup(fID) == NULL)
break;
sNextAvailableID++;
fID++;
}
fID = sNextAvailableID++;
}
void
XsiMessageQueue::UnsetID()
{
sNextAvailableID = fID;
sGlobalSequenceNumber = (sGlobalSequenceNumber + 1) % UINT_MAX;
fSequenceNumber = sGlobalSequenceNumber;
}
@ -489,7 +581,7 @@ _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
int
_user_xsi_msgget(key_t key, int flags)
{
TRACE(("xsi_msgget: key = %d, flags = %d\n", (nt)key, flags));
TRACE(("xsi_msgget: key = %d, flags = %d\n", (int)key, flags));
XsiMessageQueue *messageQueue = NULL;
Ipc *ipcKey = NULL;
// Default assumptions
@ -513,7 +605,7 @@ _user_xsi_msgget(key_t key, int flags)
"for key %d\n", (int)key));
return ENOMEM;
}
sIpcHashTable.Lookup(key);
sIpcHashTable.Insert(ipcKey);
} else {
// The IPC key exist and it already has a message queue
if ((flags & IPC_CREAT) && (flags & IPC_EXCL)) {
@ -569,7 +661,99 @@ ssize_t
_user_xsi_msgrcv(int messageQueueID, void *messagePointer,
size_t messageSize, long messageType, int messageFlags)
{
// TODO
TRACE(("xsi_msgrcv: messageQueueID = %d, messageSize = %ld\n",
messageQueueID, messageSize));
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (messageQueue == NULL) {
TRACE_ERROR(("xsi_msgrcv: message queue id %d not valid\n",
messageQueueID));
return EINVAL;
}
MutexLocker messageQueueLocker(messageQueue->Lock());
messageQueueHashLocker.Unlock();
if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) {
TRACE_ERROR(("xsi_msgrcv: message size is out of range\n"));
return EINVAL;
}
if (!messageQueue->HasPermission()) {
TRACE_ERROR(("xsi_msgrcv: calling process has not permission "
"on message queue id %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EACCES;
}
if (!IS_USER_ADDRESS(messagePointer)) {
TRACE_ERROR(("xsi_msgrcv: message address is not valid\n"));
return B_BAD_ADDRESS;
}
queued_message *message = NULL;
bool notReceived = true;
while (notReceived) {
message = messageQueue->Remove(messageType);
if (message == NULL && !(messageFlags & IPC_NOWAIT)) {
// We are going to sleep
struct thread *thread = thread_get_current_thread();
queued_thread queueEntry(thread, messageSize);
messageQueue->Enqueue(&queueEntry, /* waitForMessage */ true);
uint32 sequenceNumber = messageQueue->SequenceNumber();
TRACE(("xsi_msgrcv: thread %d going to sleep\n", (int)thread->id));
status_t result
= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
TRACE(("xsi_msgrcv: thread %d back to life\n", (int)thread->id));
messageQueueHashLocker.Lock();
messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
&& sequenceNumber != messageQueue->SequenceNumber())) {
TRACE_ERROR(("xsi_msgrcv: message queue id %d (sequence = %ld) "
"got destroyed\n", messageQueueID, sequenceNumber));
notReceived = false;
result = EIDRM;
} else if (result == B_INTERRUPTED) {
TRACE_ERROR(("xsi_msgrcv: thread %d got interrupted while "
"waiting on message queue %d\n",(int)thread->id,
messageQueueID));
messageQueue->Deque(&queueEntry, /* waitForMessage */ true);
notReceived = false;
result = EINTR;
} else {
messageQueueLocker.Lock();
messageQueueHashLocker.Unlock();
}
} else if (message == NULL) {
// There is not message of type requested and
// we can't wait
return ENOMSG;
} else {
// Message received correctly (so far)
if ((ssize_t)messageSize > message->length
&& !(messageFlags & MSG_NOERROR)) {
TRACE_ERROR(("xsi_msgrcv: message too big!\n"));
// Put the message back inside. Since we hold the
// queue message lock, not one else could have filled
// up the queue meanwhile
messageQueue->Insert(message);
return E2BIG;
}
status_t result
= message->copy_to_user_buffer(messagePointer, messageSize);
if (result != B_OK) {
messageQueue->Insert(message);
return B_BAD_ADDRESS;
}
delete message;
TRACE(("xsi_msgrcv: message received correctly\n"));
notReceived = false;
}
}
return B_ERROR;
}
@ -578,7 +762,7 @@ int
_user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
size_t messageSize, int messageFlags)
{
TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %Ld\n",
TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %ld\n",
messageQueueID, messageSize));
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
@ -605,19 +789,52 @@ _user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
return B_BAD_ADDRESS;
}
queued_message *message
= new(std::nothrow) queued_message(messagePointer, messageSize);
if (message == NULL || message->initOK != true) {
TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
delete message;
return ENOMEM;
}
bool notSent = true;
while (notSent) {
queued_message *message
= new(std::nothrow) queued_message(messagePointer, messageSize);
if (message == NULL || message->initOK != true) {
TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
delete message;
return ENOMEM;
}
bool goToSleep = messageQueue->Insert(message);
if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
// We are going to sleep
messageQueue->Wait(message);
struct thread *thread = thread_get_current_thread();
queued_thread queueEntry(thread, messageSize);
messageQueue->Enqueue(&queueEntry, /* waitForMessage */ false);
uint32 sequenceNumber = messageQueue->SequenceNumber();
TRACE(("xsi_msgsnd: thread %d going to sleep\n", (int)thread->id));
status_t result
= messageQueue->BlockAndUnlock(thread, &messageQueueLocker);
TRACE(("xsi_msgsnd: thread %d back to life\n", (int)thread->id));
messageQueueHashLocker.Lock();
messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (result == EIDRM || messageQueue == NULL || (messageQueue != NULL
&& sequenceNumber != messageQueue->SequenceNumber())) {
TRACE_ERROR(("xsi_msgsnd: message queue id %d (sequence = %ld) "
"got destroyed\n", messageQueueID, sequenceNumber));
delete message;
notSent = false;
result = EIDRM;
} else if (result == B_INTERRUPTED) {
TRACE_ERROR(("xsi_msgsnd: thread %d got interrupted while "
"waiting on message queue %d\n",(int)thread->id,
messageQueueID));
messageQueue->Deque(&queueEntry, /* waitForMessage */ false);
delete message;
notSent = false;
result = EINTR;
} else {
messageQueueLocker.Lock();
messageQueueHashLocker.Unlock();
}
} else if (goToSleep) {
// We did not send the message and we can't wait
delete message;