port_buffer_size_etc() was horribly broken in a lot of ways, it even read

the info from the wrong end of the queue.
find_port() did not return the correct error codes.
write_port_etc() checked incorrectly for a deleted semaphore.
B_WOULD_BLOCK would let various functions print out a warning for no reason
(the comments in the code actually didn't fit to the implementation).
"total_count" was counting the number of messages written, not those that
have been read (as the BeBook says).
Thanks to Bill Hayden who reported a lot of these.
Added TRACE macro and moved some of the dprintf()s to that.
Added/fixed some comments.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@6901 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2004-03-05 01:23:43 +00:00
parent 75a52c6151
commit 7fb4c29b80
1 changed files with 106 additions and 85 deletions

View File

@ -24,6 +24,14 @@
#include <stdlib.h>
//#define TRACE_PORTS
#ifdef TRACE_PORTS
# define TRACE(x) dprintf x
#else
# define TRACE(x)
#endif
struct port_msg {
int msg_code;
cbuf *data_cbuf;
@ -377,14 +385,14 @@ delete_port(port_id id)
port_id
find_port(const char *name)
{
port_id portFound = B_BAD_PORT_ID;
port_id portFound = B_NAME_NOT_FOUND;
cpu_status state;
int i;
if (ports_active == false)
return B_BAD_PORT_ID;
return B_NAME_NOT_FOUND;
if (name == NULL)
return B_BAD_PORT_ID;
return B_BAD_VALUE;
// Since we have to check every single port, and we don't
// care if it goes away at any point, we're only grabbing
@ -415,12 +423,20 @@ find_port(const char *name)
static void
fill_port_info(struct port_entry *port, port_info *info, size_t size)
{
int32 count;
info->port = port->id;
info->team = port->owner;
info->capacity = port->capacity;
info->total_count = gPorts->total_count;
strlcpy(info->name, gPorts->name, B_OS_NAME_LENGTH);
get_sem_count(port->read_sem, &info->queue_count);
get_sem_count(port->read_sem, &count);
if (count < 0)
count = 0;
info->queue_count = count;
info->total_count = port->total_count;
strlcpy(info->name, port->name, B_OS_NAME_LENGTH);
}
@ -515,12 +531,13 @@ port_buffer_size(port_id id)
ssize_t
port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
{
cpu_status state;
sem_id cachedSem;
status_t status;
ssize_t size;
int slot;
int res;
int t;
int len;
int state;
int tail;
if (!ports_active || id < 0)
return B_BAD_PORT_ID;
@ -532,9 +549,12 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
if (gPorts[slot].id != id) {
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
dprintf("get_port_info: invalid port_id %ld\n", id);
TRACE(("get_buffer_size_etc: invalid port_id %ld\n", id));
return B_BAD_PORT_ID;
}
cachedSem = gPorts[slot].read_sem;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
@ -544,37 +564,41 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout)
// XXX - is it a race condition to acquire a sem just after we
// unlocked the port ?
// XXX: call an acquire_sem which does the release lock, restore int & block the right way
res = acquire_sem_etc(gPorts[slot].read_sem, 1, flags & (B_TIMEOUT | B_CAN_INTERRUPT), timeout);
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
GRAB_PORT_LOCK(gPorts[slot]);
if (res == B_BAD_SEM_ID) {
if (status == B_BAD_SEM_ID) {
// somebody deleted the port
RELEASE_PORT_LOCK(gPorts[slot]);
return B_BAD_PORT_ID;
}
if (res == B_TIMED_OUT) {
if (status == B_TIMED_OUT || B_WOULD_BLOCK)
return status;
state = disable_interrupts();
GRAB_PORT_LOCK(gPorts[slot]);
if (gPorts[slot].id != id) {
// the port is no longer there
RELEASE_PORT_LOCK(gPorts[slot]);
return B_TIMED_OUT;
restore_interrupts(state);
return B_BAD_PORT_ID;
}
// once message arrived, read data's length
// determine tail
// read data's head length
t = gPorts[slot].head;
if (t < 0)
// determine tail & get the length of the message
tail = gPorts[slot].tail;
if (tail < 0)
panic("port %ld: tail < 0", gPorts[slot].id);
if (t > gPorts[slot].capacity)
if (tail > gPorts[slot].capacity)
panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
len = gPorts[slot].msg_queue[t].data_len;
// restore readsem
release_sem(gPorts[slot].read_sem);
size = gPorts[slot].msg_queue[tail].data_len;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// restore read_sem, as we haven't read from the port
release_sem(cachedSem);
// return length of item at end of queue
return len;
return size;
}
@ -585,9 +609,8 @@ port_count(port_id id)
int state;
int32 count;
if (ports_active == false)
return B_BAD_PORT_ID;
if (id < 0)
if (ports_active == false
|| id < 0)
return B_BAD_PORT_ID;
slot = id % MAX_PORTS;
@ -598,10 +621,10 @@ port_count(port_id id)
if (gPorts[slot].id != id) {
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
dprintf("port_count: invalid port_id %ld\n", id);
TRACE(("port_count: invalid port_id %ld\n", id));
return B_BAD_PORT_ID;
}
get_sem_count(gPorts[slot].read_sem, &count);
// do not return negative numbers
if (count < 0)
@ -626,15 +649,15 @@ status_t
read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
uint32 flags, bigtime_t timeout)
{
int slot;
int state;
sem_id cached_semid;
cpu_status state;
sem_id cachedSem;
size_t size;
status_t status;
int t;
int tail;
cbuf *msgStore;
int32 code;
bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
int slot;
if (ports_active == false
|| id < 0)
@ -658,7 +681,7 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
return B_BAD_PORT_ID;
}
// store sem_id in local variable
cached_semid = gPorts[slot].read_sem;
cachedSem = gPorts[slot].read_sem;
// unlock port && enable ints/
RELEASE_PORT_LOCK(gPorts[slot]);
@ -666,23 +689,21 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
// XXX -> possible race condition if port gets deleted (->sem deleted too), therefore
// sem_id is cached in local variable up here
status = acquire_sem_etc(cached_semid, 1, flags, timeout);
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
// get 1 entry from the queue, block if needed
// XXX: possible race condition if port read by two threads...
// both threads will read in 2 different slots allocated above, simultaneously
// slot is a thread-local variable
if (status == B_BAD_SEM_ID || status == EINTR) {
if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) {
/* somebody deleted the port or the sem went away */
return B_BAD_PORT_ID;
}
if (status == B_TIMED_OUT) {
// timed out, or, if timeout=0, 'would block'
return B_BAD_PORT_ID;
}
if (status == B_TIMED_OUT || status == B_WOULD_BLOCK)
return status;
if (status != B_NO_ERROR) {
dprintf("write_port_etc: unknown error %ld\n", status);
@ -692,24 +713,25 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
state = disable_interrupts();
GRAB_PORT_LOCK(gPorts[slot]);
t = gPorts[slot].tail;
if (t < 0)
tail = gPorts[slot].tail;
if (tail < 0)
panic("port %ld: tail < 0", gPorts[slot].id);
if (t > gPorts[slot].capacity)
if (tail > gPorts[slot].capacity)
panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
gPorts[slot].tail = (gPorts[slot].tail + 1) % gPorts[slot].capacity;
gPorts[slot].total_count++;
msgStore = gPorts[slot].msg_queue[t].data_cbuf;
code = gPorts[slot].msg_queue[t].msg_code;
msgStore = gPorts[slot].msg_queue[tail].data_cbuf;
code = gPorts[slot].msg_queue[tail].msg_code;
// mark queue entry unused
gPorts[slot].msg_queue[t].data_cbuf = NULL;
gPorts[slot].msg_queue[tail].data_cbuf = NULL;
// check output buffer size
size = min(bufferSize, gPorts[slot].msg_queue[t].data_len);
size = min(bufferSize, gPorts[slot].msg_queue[tail].data_len);
cached_semid = gPorts[slot].write_sem;
cachedSem = gPorts[slot].write_sem;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
@ -721,7 +743,7 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msgStore, 0, size) < B_OK)) {
// leave the port intact, for other threads that might not crash
cbuf_free_chain(msgStore);
release_sem(cached_semid);
release_sem(cachedSem);
return status;
}
} else
@ -731,7 +753,7 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
cbuf_free_chain(msgStore);
// make one spot in queue available again for write
release_sem(cached_semid);
release_sem(cachedSem);
return size;
}
@ -749,10 +771,10 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
size_t bufferSize, uint32 flags, bigtime_t timeout)
{
int slot;
int state;
cpu_status state;
status_t status;
sem_id cached_semid;
int h;
sem_id cachedSem;
int head;
cbuf *msgStore;
bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
@ -773,42 +795,40 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
if (gPorts[slot].id != id) {
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
dprintf("write_port_etc: invalid port_id %ld\n", id);
TRACE(("write_port_etc: invalid port_id %ld\n", id));
return B_BAD_PORT_ID;
}
if (gPorts[slot].closed) {
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
dprintf("write_port_etc: port %ld closed\n", id);
TRACE(("write_port_etc: port %ld closed\n", id));
return B_BAD_PORT_ID;
}
// store sem_id in local variable
cached_semid = gPorts[slot].write_sem;
cachedSem = gPorts[slot].write_sem;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// XXX -> possible race condition if port gets deleted (->sem deleted too),
// and queue is full therefore sem_id is cached in local variable up here
status = acquire_sem_etc(cached_semid, 1, flags, timeout);
status = acquire_sem_etc(cachedSem, 1, flags, timeout);
// get 1 entry from the queue, block if needed
// XXX: possible race condition if port written by two threads...
// both threads will write in 2 different slots allocated above, simultaneously
// slot is a thread-local variable
if (status == B_BAD_PORT_ID || status == B_INTERRUPTED) {
if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) {
/* somebody deleted the port or the sem while we were waiting */
return B_BAD_PORT_ID;
}
if (status == B_TIMED_OUT) {
// timed out, or, if timeout = 0, 'would block'
return B_TIMED_OUT;
}
if (status == B_TIMED_OUT || status == B_WOULD_BLOCK)
return status;
if (status != B_NO_ERROR) {
dprintf("write_port_etc: unknown error %ld\n", status);
@ -836,26 +856,25 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
state = disable_interrupts();
GRAB_PORT_LOCK(gPorts[slot]);
h = gPorts[slot].head;
if (h < 0)
head = gPorts[slot].head;
if (head < 0)
panic("port %ld: head < 0", gPorts[slot].id);
if (h >= gPorts[slot].capacity)
if (head >= gPorts[slot].capacity)
panic("port %ld: head > cap %ld", gPorts[slot].id, gPorts[slot].capacity);
gPorts[slot].msg_queue[h].msg_code = msgCode;
gPorts[slot].msg_queue[h].data_cbuf = msgStore;
gPorts[slot].msg_queue[h].data_len = bufferSize;
gPorts[slot].msg_queue[head].msg_code = msgCode;
gPorts[slot].msg_queue[head].data_cbuf = msgStore;
gPorts[slot].msg_queue[head].data_len = bufferSize;
gPorts[slot].head = (gPorts[slot].head + 1) % gPorts[slot].capacity;
gPorts[slot].total_count++;
// store sem_id in local variable
cached_semid = gPorts[slot].read_sem;
cachedSem = gPorts[slot].read_sem;
RELEASE_PORT_LOCK(gPorts[slot]);
restore_interrupts(state);
// release sem, allowing read (might reschedule)
release_sem(cached_semid);
release_sem(cachedSem);
return B_NO_ERROR;
}
@ -894,13 +913,15 @@ set_port_owner(port_id id, team_id team)
}
/** this function cycles through the gPorts table, deleting all
* the gPorts that are owned by the passed team_id
/** this function cycles through the ports table, deleting all
* the ports that are owned by the passed team_id
*/
int
delete_owned_ports(team_id owner)
{
// ToDo: investigate maintaining a list of ports in the team
// to make this simpler and more efficient.
int state;
int i;
int count = 0;
@ -912,7 +933,7 @@ delete_owned_ports(team_id owner)
GRAB_PORT_LIST_LOCK();
for (i = 0; i < MAX_PORTS; i++) {
if(gPorts[i].id != -1 && gPorts[i].owner == owner) {
if (gPorts[i].id != -1 && gPorts[i].owner == owner) {
port_id id = gPorts[i].id;
RELEASE_PORT_LIST_LOCK();