* Start implementing _kern_xsi_msgsnd().

git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@27295 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Salvatore Benedetto 2008-09-03 09:00:27 +00:00
parent 84d0e984f3
commit 25d466ba72
1 changed files with 121 additions and 19 deletions

View File

@ -52,25 +52,31 @@ typedef DoublyLinkedList<queued_thread> ThreadQueue;
struct queued_message : DoublyLinkedListLinkImpl<queued_message> { struct queued_message : DoublyLinkedListLinkImpl<queued_message> {
queued_message(long type, char *_message, ssize_t length) queued_message(const void *_message, ssize_t _length)
: :
init(false), initOK(false),
length(length), length(_length)
type(type)
{ {
message = (char *)malloc(sizeof(char) * length); message = (char *)malloc(sizeof(char) * _length);
if (message) if (message == NULL)
init = true; return;
memcpy(_message, message, length);
if (user_memcpy(&type, _message, sizeof(long)) != B_OK
|| user_memcpy(message, (void *)((long *)_message + sizeof(long)),
_length) != B_OK) {
free(message);
return;
}
initOK = true;
} }
~queued_message() ~queued_message()
{ {
if (init) if (initOK)
free(message); free(message);
} }
bool init; bool initOK;
ssize_t length; ssize_t length;
char *message; char *message;
long type; long type;
@ -84,6 +90,8 @@ typedef DoublyLinkedList<queued_message> MessageQueue;
class XsiMessageQueue { class XsiMessageQueue {
public: public:
XsiMessageQueue(int flags) XsiMessageQueue(int flags)
:
fBytesInQueue(0)
{ {
mutex_init(&fLock, "XsiMessageQueue private mutex"); mutex_init(&fLock, "XsiMessageQueue private mutex");
SetIpcKey((key_t)-1); SetIpcKey((key_t)-1);
@ -139,6 +147,9 @@ public:
return fID; return fID;
} }
// Implemented after sXsiMessageCountLock is declared
bool Insert(queued_message *message);
key_t IpcKey() const key_t IpcKey() const
{ {
return fMessageQueue.msg_perm.key; return fMessageQueue.msg_perm.key;
@ -177,12 +188,18 @@ public:
// Implemented after sMessageQueueHashTable is declared // Implemented after sMessageQueueHashTable is declared
void UnsetID(); void UnsetID();
status_t Wait(queued_message *message)
{
return B_ERROR;
}
HashTableLink<XsiMessageQueue>* Link() HashTableLink<XsiMessageQueue>* Link()
{ {
return &fLink; return &fLink;
} }
private: private:
msglen_t fBytesInQueue;
int fID; int fID;
mutex fLock; mutex fLock;
MessageQueue fMessage; MessageQueue fMessage;
@ -282,20 +299,45 @@ struct IpcHashTableDefinition {
} }
}; };
// Arbitrary limit // Arbitrary limits
#define MAX_XSI_MESSAGE_QUEUE 2048 #define MAX_XSI_MESSAGE 4096
#define MAX_XSI_MESSAGE_QUEUE 1024
static OpenHashTable<IpcHashTableDefinition> sIpcHashTable; static OpenHashTable<IpcHashTableDefinition> sIpcHashTable;
static OpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable; static OpenHashTable<MessageQueueHashTableDefinition> sMessageQueueHashTable;
static mutex sIpcLock; static mutex sIpcLock;
static mutex sXsiMessageQueueLock; static mutex sXsiMessageQueueLock;
static mutex sXsiMessageCountLock;
static vint32 sNextAvailableID = 1; static vint32 sNextAvailableID = 1;
static vint32 sXsiMessageCount = 0;
static vint32 sXsiMessageQueueCount = 0; static vint32 sXsiMessageQueueCount = 0;
// #pragma mark - // #pragma mark -
bool
XsiMessageQueue::Insert(queued_message *message)
{
// The only situation that would make us (potentially) wait
// is that we exceed with bytes or with the total number of messages
MutexLocker _(sXsiMessageCountLock);
if (fBytesInQueue == fMessageQueue.msg_qbytes
|| fBytesInQueue + message->length > fMessageQueue.msg_qbytes
|| sXsiMessageCount <= MAX_XSI_MESSAGE)
return true;
fMessage.Add(message);
fMessageQueue.msg_qnum++;
fMessageQueue.msg_lspid = getpid();
fMessageQueue.msg_stime = real_time_clock();
fBytesInQueue += message->length;
sXsiMessageCount++;
return false;
}
void void
XsiMessageQueue::SetID() XsiMessageQueue::SetID()
{ {
@ -308,6 +350,7 @@ XsiMessageQueue::SetID()
fID = sNextAvailableID++; fID = sNextAvailableID++;
} }
void void
XsiMessageQueue::UnsetID() XsiMessageQueue::UnsetID()
{ {
@ -331,13 +374,15 @@ xsi_msg_init()
mutex_init(&sIpcLock, "global POSIX message queue IPC table"); mutex_init(&sIpcLock, "global POSIX message queue IPC table");
mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table"); mutex_init(&sXsiMessageQueueLock, "global POSIX xsi message queue table");
mutex_init(&sXsiMessageCountLock, "global POSIX xsi message count");
} }
// #pragma mark - Syscalls // #pragma mark - Syscalls
int _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer) int
_user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
{ {
TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command)); TRACE(("xsi_msgctl: messageQueueID = %d, command = %d\n", messageQueueID, command));
MutexLocker ipcHashLocker(sIpcLock); MutexLocker ipcHashLocker(sIpcLock);
@ -361,7 +406,11 @@ int _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
messageQueueLocker.SetTo(&messageQueue->Lock(), false); messageQueueLocker.SetTo(&messageQueue->Lock(), false);
messageQueueHashLocker.Unlock(); messageQueueHashLocker.Unlock();
ipcHashLocker.Unlock(); ipcHashLocker.Unlock();
} } else
// Since we are going to delete the message queue object
// along with its mutex, we can't use a MutexLocker object,
// as the mutex itself won't exist on function exit
mutex_lock(&messageQueue->Lock());
switch (command) { switch (command) {
case IPC_STAT: { case IPC_STAT: {
@ -437,7 +486,8 @@ int _user_xsi_msgctl(int messageQueueID, int command, struct msqid_ds *buffer)
} }
int _user_xsi_msgget(key_t key, int flags) int
_user_xsi_msgget(key_t key, int flags)
{ {
TRACE(("xsi_msgget: key = %d, flags = %d\n", (nt)key, flags)); TRACE(("xsi_msgget: key = %d, flags = %d\n", (nt)key, flags));
XsiMessageQueue *messageQueue = NULL; XsiMessageQueue *messageQueue = NULL;
@ -515,7 +565,8 @@ int _user_xsi_msgget(key_t key, int flags)
} }
ssize_t _user_xsi_msgrcv(int messageQueueID, void *messagePointer, ssize_t
_user_xsi_msgrcv(int messageQueueID, void *messagePointer,
size_t messageSize, long messageType, int messageFlags) size_t messageSize, long messageType, int messageFlags)
{ {
// TODO // TODO
@ -523,9 +574,60 @@ ssize_t _user_xsi_msgrcv(int messageQueueID, void *messagePointer,
} }
int _user_xsi_msgsnd(int messageQueueID, const void *messagePointer, int
_user_xsi_msgsnd(int messageQueueID, const void *messagePointer,
size_t messageSize, int messageFlags) size_t messageSize, int messageFlags)
{ {
// TODO TRACE(("xsi_msgsnd: messageQueueID = %d, messageSize = %Ld\n",
return B_ERROR; messageQueueID, messageSize));
MutexLocker messageQueueHashLocker(sXsiMessageQueueLock);
XsiMessageQueue *messageQueue = sMessageQueueHashTable.Lookup(messageQueueID);
if (messageQueue == NULL) {
TRACE_ERROR(("xsi_msgsnd: message queue id %d not valid\n",
messageQueueID));
return EINVAL;
}
MutexLocker messageQueueLocker(messageQueue->Lock());
messageQueueHashLocker.Unlock();
if (messageSize < 0 || messageSize > MAX_BYTES_PER_QUEUE) {
TRACE_ERROR(("xsi_msgsnd: message size is out of range\n"));
return EINVAL;
}
if (!messageQueue->HasPermission()) {
TRACE_ERROR(("xsi_msgsnd: calling process has not permission "
"on message queue id %d, key %d\n", messageQueueID,
(int)messageQueue->IpcKey()));
return EACCES;
}
if (!IS_USER_ADDRESS(messagePointer)) {
TRACE_ERROR(("xsi_msgsnd: message address is not valid\n"));
return B_BAD_ADDRESS;
}
bool notSent = true;
while (notSent) {
queued_message *message
= new(std::nothrow) queued_message(messagePointer, messageSize);
if (message == NULL || message->initOK != true) {
TRACE_ERROR(("xsi_msgsnd: failed to create new message to queue\n"));
delete message;
return ENOMEM;
}
bool goToSleep = messageQueue->Insert(message);
if (goToSleep && !(messageFlags & IPC_NOWAIT)) {
// We are going to sleep
messageQueue->Wait(message);
} else if (goToSleep) {
// We did not send the message and we can't wait
delete message;
return EAGAIN;
} else {
// Message delivered correctly
TRACE(("xsi_msgsnd: message sent correctly\n"));
notSent = false;
}
}
return B_OK;
} }