Rewrote the message queue code to use a doubly linked list of port_msgs.

Right now, it allocates/frees the msgs from the kernel heap for every
message sent - that might be too slow for real world usage.
Also removed all known race conditions from the code.
Not tested at all yet, though.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@6953 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2004-03-12 04:57:38 +00:00
parent 405ecca529
commit b3d07d6550

View File

@ -13,13 +13,10 @@
#include <OS.h>
#include <port.h>
#include <sem.h>
#include <kernel.h>
#include <util/list.h>
#include <arch/int.h>
#include <debug.h>
#include <malloc.h>
#include <cbuf.h>
#include <Errors.h>
#include <int.h>
#include <string.h>
#include <stdlib.h>
@ -32,25 +29,24 @@
#endif
struct port_msg {
int msg_code;
cbuf *data_cbuf;
size_t data_len;
};
// A single message queued on a port. Entries live in the owning
// port_entry's doubly linked msg_queue list (see <util/list.h>).
typedef struct port_msg {
list_link link;        // links this message into port_entry::msg_queue
int32 code;            // user-supplied message code
cbuf *buffer_chain;    // message payload; NULL when size == 0
size_t size;           // payload length in bytes
} port_msg;
struct port_entry {
port_id id;
team_id owner;
int32 capacity;
spinlock lock;
char *name;
sem_id read_sem;
sem_id write_sem;
int head;
int tail;
int total_count;
bool closed;
struct port_msg* msg_queue;
port_id id;
team_id owner;
int32 capacity;
spinlock lock;
const char *name;
sem_id read_sem;
sem_id write_sem;
int32 total_count;
struct list msg_queue;
bool closed;
};
// internal API
@ -91,6 +87,10 @@ port_init(kernel_args *ka)
panic("unable to allocate kernel port table!\n");
}
// ToDo: investigate preallocating a list of port_msgs to
// speed up actual message sending/receiving, a slab allocator
// might do it as well, though :-)
memset(gPorts, 0, size);
for (i = 0; i < MAX_PORTS; i++)
gPorts[i].id = -1;
@ -228,9 +228,7 @@ _dump_port_info(struct port_entry *port)
dprintf("PORT: %p\n", port);
dprintf("name: '%s'\n", port->name);
dprintf("owner: 0x%lx\n", port->owner);
dprintf("cap: %ld\n", port->capacity);
dprintf("head: %d\n", port->head);
dprintf("tail: %d\n", port->tail);
dprintf("capacity: %ld\n", port->capacity);
get_sem_count(port->read_sem, &cnt);
dprintf("read_sem: %ld\n", cnt);
get_sem_count(port->write_sem, &cnt);
@ -321,71 +319,89 @@ delete_owned_ports(team_id owner)
}
// Releases a port_msg and its attached buffer chain.
// Counterpart to get_port_msg(); call once the message has been taken
// off the port's queue (or on an error path before it was ever queued).
static void
put_port_msg(port_msg *msg)
{
// free the payload first, then the message header that references it
cbuf_free_chain(msg->buffer_chain);
free(msg);
}
/** Allocates a new port_msg carrying \a code and, when \a bufferSize > 0,
 *	an attached cbuf chain of that many bytes.
 *	Returns NULL if either allocation fails (nothing is leaked in that
 *	case). On success the caller owns the message and must eventually
 *	release it with put_port_msg().
 */
static port_msg *
get_port_msg(int32 code, size_t bufferSize)
{
	// ToDo: investigate preallocation of port_msgs (or use a slab allocator)
	cbuf *bufferChain = NULL;

	// no cast needed for malloc() in C; sizeof *msg stays correct
	// even if the pointer's type is ever changed
	port_msg *msg = malloc(sizeof(*msg));
	if (msg == NULL)
		return NULL;

	if (bufferSize > 0) {
		bufferChain = cbuf_get_chain(bufferSize);
		if (bufferChain == NULL) {
			// don't leak the header when the payload allocation fails
			free(msg);
			return NULL;
		}
	}

	msg->code = code;
	msg->buffer_chain = bufferChain;
	msg->size = bufferSize;

	return msg;
}
// #pragma mark -
// public kernel API
port_id
create_port(int32 queue_length, const char *name)
create_port(int32 queueLength, const char *name)
{
int i;
int state;
sem_id sem_r, sem_w;
port_id retval;
int name_len;
char *temp_name;
struct port_msg *q;
cpu_status state;
char nameBuffer[B_OS_NAME_LENGTH];
sem_id readSem, writeSem;
port_id returnValue;
team_id owner;
int i;
if (ports_active == false)
return B_BAD_PORT_ID;
// check queue length
if (queue_length < 1)
return EINVAL;
if (queue_length > MAX_QUEUE_LENGTH)
return EINVAL;
if (queueLength < 1
|| queueLength > MAX_QUEUE_LENGTH)
return B_BAD_VALUE;
// check & dup name
if (name == NULL)
name = "unnamed port";
name_len = strlen(name) + 1;
name_len = min(name_len, B_OS_NAME_LENGTH);
temp_name = (char *)malloc(name_len);
if (temp_name == NULL)
return ENOMEM;
strlcpy(temp_name, name, name_len);
// ToDo: we could save the memory and use the semaphore name only instead
strlcpy(nameBuffer, name, B_OS_NAME_LENGTH);
name = strdup(nameBuffer);
if (name == NULL)
return B_NO_MEMORY;
// alloc queue
q = (struct port_msg *)malloc(queue_length * sizeof(struct port_msg));
if (q == NULL) {
free(temp_name); // dealloc name, too
return ENOMEM;
}
// init cbuf list of the queue
for (i = 0; i < queue_length; i++)
q[i].data_cbuf = 0;
// create sem_r with owner set to -1
sem_r = create_sem_etc(0, temp_name, -1);
if (sem_r < 0) {
// create read sem with owner set to -1
// ToDo: should be B_SYSTEM_TEAM
readSem = create_sem_etc(0, name, -1);
if (readSem < B_OK) {
// cleanup
free(temp_name);
free(q);
return sem_r;
free((char *)name);
return readSem;
}
// create sem_w
sem_w = create_sem_etc(queue_length, temp_name, -1);
if (sem_w < 0) {
// create write sem
writeSem = create_sem_etc(queueLength, name, -1);
if (writeSem < 0) {
// cleanup
delete_sem(sem_r);
free(temp_name);
free(q);
return sem_w;
delete_sem(readSem);
free((char *)name);
return writeSem;
}
owner = team_get_current_team_id();
state = disable_interrupts();
@ -404,37 +420,36 @@ create_port(int32 queue_length, const char *name)
gPorts[i].id = gNextPort++;
RELEASE_PORT_LIST_LOCK();
gPorts[i].capacity = queue_length;
gPorts[i].name = temp_name;
gPorts[i].capacity = queueLength;
gPorts[i].owner = owner;
gPorts[i].name = name;
gPorts[i].read_sem = readSem;
gPorts[i].write_sem = writeSem;
list_init(&gPorts[i].msg_queue);
gPorts[i].total_count = 0;
returnValue = gPorts[i].id;
// assign sem
gPorts[i].read_sem = sem_r;
gPorts[i].write_sem = sem_w;
gPorts[i].msg_queue = q;
gPorts[i].head = 0;
gPorts[i].tail = 0;
gPorts[i].total_count= 0;
gPorts[i].owner = owner;
retval = gPorts[i].id;
RELEASE_PORT_LOCK(gPorts[i]);
goto out;
}
}
// not enough gPorts...
RELEASE_PORT_LIST_LOCK();
retval = B_NO_MORE_PORTS;
returnValue = B_NO_MORE_PORTS;
dprintf("create_port(): B_NO_MORE_PORTS\n");
// cleanup
delete_sem(sem_w);
delete_sem(sem_r);
free(temp_name);
free(q);
delete_sem(writeSem);
delete_sem(readSem);
free((char *)name);
out:
restore_interrupts(state);
return retval;
return returnValue;
}
@ -474,18 +489,14 @@ close_port(port_id id)
status_t
delete_port(port_id id)
{
cpu_status state;
sem_id readSem, writeSem;
const char *name;
struct list list;
port_msg *msg;
int slot;
int state;
sem_id r_sem, w_sem;
int capacity;
int i;
char *old_name;
struct port_msg *q;
if (ports_active == false)
return B_BAD_PORT_ID;
if (id < 0)
if (ports_active == false || id < 0)
return B_BAD_PORT_ID;
slot = id % MAX_PORTS;
@ -502,31 +513,28 @@ delete_port(port_id id)
/* mark port as invalid */
gPorts[slot].id = -1;
old_name = gPorts[slot].name;
q = gPorts[slot].msg_queue;
r_sem = gPorts[slot].read_sem;
w_sem = gPorts[slot].write_sem;
capacity = gPorts[slot].capacity;
name = gPorts[slot].name;
list = gPorts[slot].msg_queue;
readSem = gPorts[slot].read_sem;
writeSem = gPorts[slot].write_sem;
gPorts[slot].name = NULL;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// delete the cbuf's that are left in the queue (if any)
for (i = 0; i < capacity; i++) {
if (q[i].data_cbuf != NULL)
cbuf_free_chain(q[i].data_cbuf);
// free the queue
while ((msg = (port_msg *)list_remove_head_item(&list)) != NULL) {
put_port_msg(msg);
}
free(q);
free(old_name);
free((char *)name);
// release the threads that were blocking on this port by deleting the sem
// read_port() will see the B_BAD_SEM_ID acq_sem() return value, and act accordingly
delete_sem(r_sem);
delete_sem(w_sem);
delete_sem(readSem);
delete_sem(writeSem);
return B_NO_ERROR;
return B_OK;
}
@ -682,9 +690,9 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
cpu_status state;
sem_id cachedSem;
status_t status;
port_msg *msg;
ssize_t size;
int slot;
int tail;
if (!ports_active || id < 0)
return B_BAD_PORT_ID;
@ -706,12 +714,8 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// block if no message,
// if TIMEOUT flag set, block with timeout
// block if no message, or, if B_TIMEOUT flag set, block with timeout
// XXX - is it a race condition to acquire a sem just after we
// unlocked the port ?
// XXX: call an acquire_sem which does the release lock, restore int & block the right way
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
if (status == B_BAD_SEM_ID) {
@ -732,12 +736,11 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
}
// determine tail & get the length of the message
tail = gPorts[slot].tail;
if (tail < 0)
panic("port %ld: tail < 0", gPorts[slot].id);
if (tail > gPorts[slot].capacity)
panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
size = gPorts[slot].msg_queue[tail].data_len;
msg = list_get_first_item(&gPorts[slot].msg_queue);
if (msg == NULL)
panic("port %ld: no messages found", gPorts[slot].id);
size = msg->size;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
@ -757,8 +760,7 @@ port_count(port_id id)
int state;
int32 count;
if (ports_active == false
|| id < 0)
if (ports_active == false || id < 0)
return B_BAD_PORT_ID;
slot = id % MAX_PORTS;
@ -774,7 +776,7 @@ port_count(port_id id)
}
get_sem_count(gPorts[slot].read_sem, &count);
// do not return negative numbers
// do not return negative numbers
if (count < 0)
count = 0;
@ -794,24 +796,21 @@ read_port(port_id port, int32 *msgCode, void *msgBuffer, size_t bufferSize)
status_t
read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
read_port_etc(port_id id, int32 *_msgCode, void *msgBuffer, size_t bufferSize,
uint32 flags, bigtime_t timeout)
{
cpu_status state;
sem_id cachedSem;
size_t size;
status_t status;
int tail;
cbuf *msgStore;
int32 code;
bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
port_msg *msg;
size_t size;
int slot;
if (ports_active == false
|| id < 0)
if (!ports_active || id < 0)
return B_BAD_PORT_ID;
if (msgCode == NULL
if (_msgCode == NULL
|| (msgBuffer == NULL && bufferSize > 0)
|| timeout < 0)
return B_BAD_VALUE;
@ -835,16 +834,9 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// XXX -> possible race condition if port gets deleted (->sem deleted too), therefore
// sem_id is cached in local variable up here
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
// get 1 entry from the queue, block if needed
// XXX: possible race condition if port read by two threads...
// both threads will read in 2 different slots allocated above, simultaneously
// slot is a thread-local variable
if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) {
/* somebody deleted the port or the sem went away */
return B_BAD_PORT_ID;
@ -861,47 +853,49 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
state = disable_interrupts();
GRAB_PORT_LOCK(gPorts[slot]);
tail = gPorts[slot].tail;
if (tail < 0)
panic("port %ld: tail < 0", gPorts[slot].id);
if (tail > gPorts[slot].capacity)
panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
// first, let's check if the port is still alive
if (gPorts[slot].id == -1) {
// the port has been deleted in the meantime
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
return B_BAD_PORT_ID;
}
msg = list_get_first_item(&gPorts[slot].msg_queue);
if (msg == NULL)
panic("port %ld: no messages found", gPorts[slot].id);
list_remove_link(msg);
gPorts[slot].tail = (gPorts[slot].tail + 1) % gPorts[slot].capacity;
gPorts[slot].total_count++;
msgStore = gPorts[slot].msg_queue[tail].data_cbuf;
code = gPorts[slot].msg_queue[tail].msg_code;
// mark queue entry unused
gPorts[slot].msg_queue[tail].data_cbuf = NULL;
// check output buffer size
size = min(bufferSize, gPorts[slot].msg_queue[tail].data_len);
cachedSem = gPorts[slot].write_sem;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// check output buffer size
size = min(bufferSize, msg->size);
// copy message
*msgCode = code;
*_msgCode = msg->code;
if (size > 0) {
if (userCopy) {
if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msgStore, 0, size) < B_OK)) {
if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msg->buffer_chain, 0, size) < B_OK)) {
// leave the port intact, for other threads that might not crash
cbuf_free_chain(msgStore);
put_port_msg(msg);
release_sem(cachedSem);
return status;
}
} else
cbuf_memcpy_from_chain(msgBuffer, msgStore, 0, size);
cbuf_memcpy_from_chain(msgBuffer, msg->buffer_chain, 0, size);
}
// free the cbuf
cbuf_free_chain(msgStore);
put_port_msg(msg);
// make one spot in queue available again for write
release_sem(cachedSem);
// ToDo: we might think about setting B_NO_RESCHEDULE here
// from time to time (always?)
return size;
}
@ -918,16 +912,14 @@ status_t
write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
size_t bufferSize, uint32 flags, bigtime_t timeout)
{
int slot;
cpu_status state;
status_t status;
sem_id cachedSem;
int head;
cbuf *msgStore;
status_t status;
port_msg *msg;
bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
int slot;
if (ports_active == false
|| id < 0)
if (!ports_active || id < 0)
return B_BAD_PORT_ID;
// mask irrelevant flags (for acquire_sem() usage)
@ -960,16 +952,9 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// XXX -> possible race condition if port gets deleted (->sem deleted too),
// and queue is full therefore sem_id is cached in local variable up here
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
// get 1 entry from the queue, block if needed
// XXX: possible race condition if port written by two threads...
// both threads will write in 2 different slots allocated above, simultaneously
// slot is a thread-local variable
if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) {
/* somebody deleted the port or the sem while we were waiting */
return B_BAD_PORT_ID;
@ -983,37 +968,37 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
return status;
}
if (bufferSize > 0) {
msgStore = cbuf_get_chain(bufferSize);
if (msgStore == NULL)
return B_NO_MEMORY;
msg = get_port_msg(msgCode, bufferSize);
if (msg == NULL)
return B_NO_MEMORY;
if (bufferSize > 0) {
if (userCopy) {
// copy from user memory
if ((status = cbuf_user_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < B_OK)
if ((status = cbuf_user_memcpy_to_chain(msg->buffer_chain, 0, msgBuffer, bufferSize)) < B_OK)
return status;
} else {
// copy from kernel memory
if ((status = cbuf_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < 0)
if ((status = cbuf_memcpy_to_chain(msg->buffer_chain, 0, msgBuffer, bufferSize)) < 0)
return status;
}
} else
msgStore = NULL;
}
// attach copied message to queue
// attach message to queue
state = disable_interrupts();
GRAB_PORT_LOCK(gPorts[slot]);
head = gPorts[slot].head;
if (head < 0)
panic("port %ld: head < 0", gPorts[slot].id);
if (head >= gPorts[slot].capacity)
panic("port %ld: head > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
// first, let's check if the port is still alive
if (gPorts[slot].id == -1) {
// the port has been deleted in the meantime
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
gPorts[slot].msg_queue[head].msg_code = msgCode;
gPorts[slot].msg_queue[head].data_cbuf = msgStore;
gPorts[slot].msg_queue[head].data_len = bufferSize;
gPorts[slot].head = (gPorts[slot].head + 1) % gPorts[slot].capacity;
put_port_msg(msg);
return B_BAD_PORT_ID;
}
list_add_item(&gPorts[slot].msg_queue, msg);
// store sem_id in local variable
cachedSem = gPorts[slot].read_sem;
@ -1034,8 +1019,7 @@ set_port_owner(port_id id, team_id team)
int slot;
int state;
if (ports_active == false
|| id < 0)
if (!ports_active || id < 0)
return B_BAD_PORT_ID;
slot = id % MAX_PORTS;
@ -1046,7 +1030,7 @@ set_port_owner(port_id id, team_id team)
if (gPorts[slot].id != id) {
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
dprintf("set_port_owner: invalid port_id %ld\n", id);
TRACE(("set_port_owner: invalid port_id %ld\n", id));
return B_BAD_PORT_ID;
}