- Make insertion to message queue O(1) by using bitmap and array. However,

mq_prio_max is dynamic, and sorted list is used for custom setup, when
  user manually sets higher priority range.
- Cache mq->mq_attrib in some places.  Change msg_ptr type to uint8_t.
- Update copyright, misc.
This commit is contained in:
rmind 2009-07-13 02:37:12 +00:00
parent b83b94a98e
commit 7e069f82fb
2 changed files with 115 additions and 59 deletions

View File

@ -1,7 +1,7 @@
/* $NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $ */
/* $NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $ */
/*
* Copyright (c) 2007, 2008 Mindaugas Rasiukevicius <rmind at NetBSD org>
* Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -37,12 +37,12 @@
* its members are protected by mqueue::mq_mtx.
*
* Lock order:
* mqlist_mtx
* -> mqueue::mq_mtx
* mqlist_mtx ->
* mqueue::mq_mtx
*/
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $");
__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.22 2009/07/13 02:37:12 rmind Exp $");
#include <sys/param.h>
#include <sys/types.h>
@ -69,7 +69,6 @@ __KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.21 2009/07/13 00:41:08 rmind Exp $"
#include <sys/syscallargs.h>
#include <sys/systm.h>
#include <sys/unistd.h>
#include <sys/vnode.h>
#include <miscfs/genfs/genfs.h>
@ -82,8 +81,7 @@ static u_int mq_def_maxmsg = 32;
static kmutex_t mqlist_mtx;
static pool_cache_t mqmsg_cache;
static LIST_HEAD(, mqueue) mqueue_head =
LIST_HEAD_INITIALIZER(mqueue_head);
static LIST_HEAD(, mqueue) mqueue_head;
static int mq_poll_fop(file_t *, int);
static int mq_stat_fop(file_t *, struct stat *);
@ -111,6 +109,7 @@ mqueue_sysinit(void)
mqmsg_cache = pool_cache_init(MQ_DEF_MSGSIZE, coherency_unit,
0, 0, "mqmsgpl", NULL, IPL_NONE, NULL, NULL, NULL);
mutex_init(&mqlist_mtx, MUTEX_DEFAULT, IPL_NONE);
LIST_INIT(&mqueue_head);
}
/*
@ -120,10 +119,11 @@ static void
mqueue_freemsg(struct mq_msg *msg, const size_t size)
{
if (size > MQ_DEF_MSGSIZE)
if (size > MQ_DEF_MSGSIZE) {
kmem_free(msg, size);
else
} else {
pool_cache_put(mqmsg_cache, msg);
}
}
/*
@ -133,10 +133,15 @@ static void
mqueue_destroy(struct mqueue *mq)
{
struct mq_msg *msg;
size_t msz;
u_int i;
while ((msg = TAILQ_FIRST(&mq->mq_head)) != NULL) {
TAILQ_REMOVE(&mq->mq_head, msg, msg_queue);
mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len);
for (i = 0; i < (MQ_PQSIZE + 1); i++) {
while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) {
TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue);
msz = sizeof(struct mq_msg) + msg->msg_len;
mqueue_freemsg(msg, msz);
}
}
seldestroy(&mq->mq_rsel);
seldestroy(&mq->mq_wsel);
@ -192,6 +197,27 @@ mqueue_get(mqd_t mqd, file_t **fpr)
return 0;
}
/*
* mqueue_linear_insert: perform linear insert according to the message
* priority into the reserved queue (note MQ_PQSIZE + 1). Reserved queue
* is a sorted list used only when mq_prio_max is increased via sysctl.
*/
static inline void
mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg)
{
struct mq_msg *mit;
TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQSIZE], msg_queue) {
if (msg->msg_prio > mit->msg_prio)
break;
}
if (mit == NULL) {
TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQSIZE], msg, msg_queue);
} else {
TAILQ_INSERT_BEFORE(mit, msg, msg_queue);
}
}
/*
* Converter from struct timespec to the ticks.
* Used by mq_timedreceive(), mq_timedsend().
@ -216,7 +242,7 @@ mq_stat_fop(file_t *fp, struct stat *st)
{
struct mqueue *mq = fp->f_data;
(void)memset(st, 0, sizeof(*st));
memset(st, 0, sizeof(*st));
mutex_enter(&mq->mq_mtx);
st->st_mode = mq->mq_mode;
@ -236,19 +262,21 @@ static int
mq_poll_fop(file_t *fp, int events)
{
struct mqueue *mq = fp->f_data;
struct mq_attr *mqattr;
int revents = 0;
mutex_enter(&mq->mq_mtx);
mqattr = &mq->mq_attrib;
if (events & (POLLIN | POLLRDNORM)) {
/* Ready for receiving, if there are messages in the queue */
if (mq->mq_attrib.mq_curmsgs)
if (mqattr->mq_curmsgs)
revents |= (POLLIN | POLLRDNORM);
else
selrecord(curlwp, &mq->mq_rsel);
}
if (events & (POLLOUT | POLLWRNORM)) {
/* Ready for sending, if the message queue is not full */
if (mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg)
if (mqattr->mq_curmsgs < mqattr->mq_maxmsg)
revents |= (POLLOUT | POLLWRNORM);
else
selrecord(curlwp, &mq->mq_wsel);
@ -298,6 +326,7 @@ mq_close_fop(file_t *fp)
static int
mqueue_access(struct mqueue *mq, mode_t mode, kauth_cred_t cred)
{
if (genfs_can_access(VNON, mq->mq_mode, mq->mq_euid,
mq->mq_egid, mode, cred)) {
return EACCES;
@ -339,6 +368,7 @@ sys_mq_open(struct lwp *l, const struct sys_mq_open_args *uap,
if (oflag & O_CREAT) {
struct cwdinfo *cwdi = p->p_cwdi;
struct mq_attr attr;
u_int i;
/* Check the limit */
if (p->p_mqueue_cnt == mq_open_max) {
@ -355,7 +385,7 @@ sys_mq_open(struct lwp *l, const struct sys_mq_open_args *uap,
/* Check for mqueue attributes */
if (SCARG(uap, attr)) {
error = copyin(SCARG(uap, attr), &attr,
sizeof(struct mq_attr));
sizeof(struct mq_attr));
if (error) {
kmem_free(name, MQ_NAMELEN);
return error;
@ -382,7 +412,9 @@ sys_mq_open(struct lwp *l, const struct sys_mq_open_args *uap,
mutex_init(&mq_new->mq_mtx, MUTEX_DEFAULT, IPL_NONE);
cv_init(&mq_new->mq_send_cv, "mqsendcv");
cv_init(&mq_new->mq_recv_cv, "mqrecvcv");
TAILQ_INIT(&mq_new->mq_head);
for (i = 0; i < (MQ_PQSIZE + 1); i++) {
TAILQ_INIT(&mq_new->mq_head[i]);
}
selinit(&mq_new->mq_rsel);
selinit(&mq_new->mq_wsel);
@ -424,6 +456,7 @@ sys_mq_open(struct lwp *l, const struct sys_mq_open_args *uap,
error = EACCES;
goto exit;
}
/* Fail if O_EXCL is set, and mqueue already exists */
if ((oflag & O_CREAT) && (oflag & O_EXCL)) {
error = EEXIST;
@ -503,12 +536,14 @@ sys_mq_close(struct lwp *l, const struct sys_mq_close_args *uap,
* Primary mq_receive1() function.
*/
int
mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
mq_receive1(lwp_t *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
unsigned *msg_prio, int t, ssize_t *mlen)
{
file_t *fp = NULL;
struct mqueue *mq;
struct mq_msg *msg = NULL;
struct mq_attr *mqattr;
u_int prio;
int error;
/* Get the message queue */
@ -522,16 +557,17 @@ mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
goto error;
}
getnanotime(&mq->mq_atime);
mqattr = &mq->mq_attrib;
/* Check the message size limits */
if (msg_len < mq->mq_attrib.mq_msgsize) {
if (msg_len < mqattr->mq_msgsize) {
error = EMSGSIZE;
goto error;
}
/* Check if queue is empty */
while (TAILQ_EMPTY(&mq->mq_head)) {
if (mq->mq_attrib.mq_flags & O_NONBLOCK) {
while (mqattr->mq_curmsgs == 0) {
if (mqattr->mq_flags & O_NONBLOCK) {
error = EAGAIN;
goto error;
}
@ -543,22 +579,35 @@ mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
* Block until someone sends the message.
* While doing this, notification should not be sent.
*/
mq->mq_attrib.mq_flags |= MQ_RECEIVE;
mqattr->mq_flags |= MQ_RECEIVE;
error = cv_timedwait_sig(&mq->mq_send_cv, &mq->mq_mtx, t);
mq->mq_attrib.mq_flags &= ~MQ_RECEIVE;
if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) {
mqattr->mq_flags &= ~MQ_RECEIVE;
if (error || (mqattr->mq_flags & MQ_UNLINK)) {
error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR;
goto error;
}
}
/* Remove the message from the queue */
msg = TAILQ_FIRST(&mq->mq_head);
/* Find the highest priority message */
prio = ffs(mq->mq_bitmap);
if (__predict_false(prio == 0)) {
/* Must be in reserved queue then */
prio = MQ_PQSIZE;
}
/* Remove it from the queue */
msg = TAILQ_FIRST(&mq->mq_head[prio]);
KASSERT(msg != NULL);
TAILQ_REMOVE(&mq->mq_head, msg, msg_queue);
TAILQ_REMOVE(&mq->mq_head[prio], msg, msg_queue);
/* Unmark the bit, if last message */
if (__predict_true(prio != MQ_PQSIZE) &&
TAILQ_EMPTY(&mq->mq_head[prio])) {
mq->mq_bitmap &= ~(MQ_PQMSB >> prio);
}
/* Decrement the counter and signal waiter, if any */
mq->mq_attrib.mq_curmsgs--;
mqattr->mq_curmsgs--;
cv_signal(&mq->mq_recv_cv);
/* Ready for sending now */
@ -643,12 +692,13 @@ sys___mq_timedreceive50(struct lwp *l,
* Primary mq_send1() function.
*/
int
mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
mq_send1(lwp_t *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
unsigned msg_prio, int t)
{
file_t *fp = NULL;
struct mqueue *mq;
struct mq_msg *msg, *pos_msg;
struct mq_msg *msg;
struct mq_attr *mqattr;
struct proc *notify = NULL;
ksiginfo_t ksi;
size_t size;
@ -663,10 +713,11 @@ mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
if (size > mq_max_msgsize)
return EMSGSIZE;
if (size > MQ_DEF_MSGSIZE)
if (size > MQ_DEF_MSGSIZE) {
msg = kmem_alloc(size, KM_SLEEP);
else
} else {
msg = pool_cache_get(mqmsg_cache, PR_WAITOK);
}
/* Get the data from user-space */
error = copyin(msg_ptr, msg->msg_ptr, msg_len);
@ -689,16 +740,17 @@ mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
goto error;
}
getnanotime(&mq->mq_mtime);
mqattr = &mq->mq_attrib;
/* Check the message size limit */
if (msg_len <= 0 || msg_len > mq->mq_attrib.mq_msgsize) {
if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) {
error = EMSGSIZE;
goto error;
}
/* Check if queue is full */
while (mq->mq_attrib.mq_curmsgs >= mq->mq_attrib.mq_maxmsg) {
if (mq->mq_attrib.mq_flags & O_NONBLOCK) {
while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) {
if (mqattr->mq_flags & O_NONBLOCK) {
error = EAGAIN;
goto error;
}
@ -708,25 +760,24 @@ mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
}
/* Block until queue becomes available */
error = cv_timedwait_sig(&mq->mq_recv_cv, &mq->mq_mtx, t);
if (error || (mq->mq_attrib.mq_flags & MQ_UNLINK)) {
if (error || (mqattr->mq_flags & MQ_UNLINK)) {
error = (error == EWOULDBLOCK) ? ETIMEDOUT : error;
goto error;
}
}
KASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg);
KASSERT(mqattr->mq_curmsgs < mqattr->mq_maxmsg);
/* Insert message into the queue, according to the priority */
TAILQ_FOREACH(pos_msg, &mq->mq_head, msg_queue)
if (msg->msg_prio > pos_msg->msg_prio)
break;
if (pos_msg == NULL)
TAILQ_INSERT_TAIL(&mq->mq_head, msg, msg_queue);
else
TAILQ_INSERT_BEFORE(pos_msg, msg, msg_queue);
if (__predict_true(msg_prio < MQ_PQSIZE)) {
TAILQ_INSERT_TAIL(&mq->mq_head[msg_prio], msg, msg_queue);
mq->mq_bitmap |= (MQ_PQMSB >> msg_prio);
} else {
mqueue_linear_insert(mq, msg);
}
/* Check for the notify */
if (mq->mq_attrib.mq_curmsgs == 0 && mq->mq_notify_proc &&
(mq->mq_attrib.mq_flags & MQ_RECEIVE) == 0) {
if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc &&
(mqattr->mq_flags & MQ_RECEIVE) == 0) {
/* Initialize the signal */
KSI_INIT(&ksi);
ksi.ksi_signo = mq->mq_sig_notify.sigev_signo;
@ -738,7 +789,7 @@ mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
}
/* Increment the counter and signal waiter, if any */
mq->mq_attrib.mq_curmsgs++;
mqattr->mq_curmsgs++;
cv_signal(&mq->mq_send_cv);
/* Ready for receiving now */
@ -755,7 +806,6 @@ error:
kpsignal(notify, &ksi, NULL);
mutex_exit(proc_lock);
}
return error;
}
@ -901,8 +951,9 @@ sys_mq_setattr(struct lwp *l, const struct sys_mq_setattr_args *uap,
mq = fp->f_data;
/* Copy the old attributes, if needed */
if (SCARG(uap, omqstat))
if (SCARG(uap, omqstat)) {
memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
}
/* Ignore everything, except O_NONBLOCK */
if (nonblock)
@ -990,7 +1041,7 @@ error:
}
/*
* SysCtl.
* System control nodes.
*/
SYSCTL_SETUP(sysctl_mqueue_setup, "sysctl mqueue setup")

View File

@ -1,7 +1,7 @@
/* $NetBSD: mqueue.h,v 1.7 2009/04/11 15:47:34 christos Exp $ */
/* $NetBSD: mqueue.h,v 1.8 2009/07/13 02:37:13 rmind Exp $ */
/*
* Copyright (c) 2007, Mindaugas Rasiukevicius <rmind at NetBSD org>
* Copyright (c) 2007-2009 Mindaugas Rasiukevicius <rmind at NetBSD org>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -67,6 +67,10 @@ struct mq_attr {
/* Default size of the message */
#define MQ_DEF_MSGSIZE 1024
/* Size/bits and MSB for the queue array */
#define MQ_PQSIZE 32
#define MQ_PQMSB 0x80000000U
/* Structure of the message queue */
struct mqueue {
char mq_name[MQ_NAMELEN];
@ -83,11 +87,13 @@ struct mqueue {
mode_t mq_mode;
uid_t mq_euid;
gid_t mq_egid;
/* Reference counter, head of the message queue */
/* Reference counter, queue array and bitmap */
u_int mq_refcnt;
TAILQ_HEAD(, mq_msg) mq_head;
TAILQ_HEAD(, mq_msg) mq_head[MQ_PQSIZE + 1];
uint32_t mq_bitmap;
/* Entry of the global list */
LIST_ENTRY(mqueue) mq_list;
/* Time stamps */
struct timespec mq_atime;
struct timespec mq_mtime;
struct timespec mq_btime;
@ -98,16 +104,15 @@ struct mq_msg {
TAILQ_ENTRY(mq_msg) msg_queue;
size_t msg_len;
u_int msg_prio;
int8_t msg_ptr[1];
uint8_t msg_ptr[1];
};
/* Prototypes */
void mqueue_sysinit(void);
void mqueue_print_list(void (*pr)(const char *, ...));
int abstimeout2timo(struct timespec *, int *);
int mq_send1(struct lwp *, mqd_t, const char *, size_t, unsigned, int);
int mq_receive1(struct lwp *, mqd_t, void *, size_t, unsigned *, int,
ssize_t *);
int mq_send1(lwp_t *, mqd_t, const char *, size_t, unsigned, int);
int mq_receive1(lwp_t *, mqd_t, void *, size_t, unsigned *, int, ssize_t *);
#endif /* _KERNEL */