read_port_etc() could pass PORT_FLAG_USE_USER_MEMCPY to acquire_sem_etc().

Small cleanup of {read|write}_port_etc().


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@3716 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2003-06-28 16:46:07 +00:00
parent 42b47c7c30
commit 9163395ef6

View File

@ -615,39 +615,36 @@ port_count(port_id id)
status_t status_t
read_port(port_id port, int32 *msg_code, void *msg_buffer, size_t buffer_size) read_port(port_id port, int32 *msgCode, void *msgBuffer, size_t bufferSize)
{ {
return read_port_etc(port, msg_code, msg_buffer, buffer_size, 0, 0); return read_port_etc(port, msgCode, msgBuffer, bufferSize, 0, 0);
} }
status_t status_t
read_port_etc(port_id id, int32 *msg_code, void *msg_buffer, size_t buffer_size, read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize,
uint32 flags, bigtime_t timeout) uint32 flags, bigtime_t timeout)
{ {
int slot; int slot;
int state; int state;
sem_id cached_semid; sem_id cached_semid;
size_t siz; size_t size;
int res; status_t status;
int t; int t;
cbuf* msg_store; cbuf *msgStore;
int32 code; int32 code;
int err; bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
if (ports_active == false) if (ports_active == false
|| id < 0)
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
if (id < 0)
return B_BAD_PORT_ID;
if (msg_code == NULL)
return EINVAL;
if ((msg_buffer == NULL) && (buffer_size > 0))
return EINVAL;
if (timeout < 0)
return EINVAL;
flags = flags & (PORT_FLAG_USE_USER_MEMCPY | B_CAN_INTERRUPT | B_TIMEOUT); if (msgCode == NULL
|| (msgBuffer == NULL && bufferSize > 0)
|| timeout < 0)
return B_BAD_VALUE;
flags = flags & (B_CAN_INTERRUPT | B_TIMEOUT);
slot = id % MAX_PORTS; slot = id % MAX_PORTS;
state = disable_interrupts(); state = disable_interrupts();
@ -669,26 +666,26 @@ read_port_etc(port_id id, int32 *msg_code, void *msg_buffer, size_t buffer_size,
// XXX -> possible race condition if port gets deleted (->sem deleted too), therefore // XXX -> possible race condition if port gets deleted (->sem deleted too), therefore
// sem_id is cached in local variable up here // sem_id is cached in local variable up here
status = acquire_sem_etc(cached_semid, 1, flags, timeout);
// get 1 entry from the queue, block if needed // get 1 entry from the queue, block if needed
res = acquire_sem_etc(cached_semid, 1, flags, timeout);
// XXX: possible race condition if port read by two threads... // XXX: possible race condition if port read by two threads...
// both threads will read in 2 different slots allocated above, simultaneously // both threads will read in 2 different slots allocated above, simultaneously
// slot is a thread-local variable // slot is a thread-local variable
if (res == B_BAD_SEM_ID || res == EINTR) { if (status == B_BAD_SEM_ID || status == EINTR) {
/* somebody deleted the port or the sem went away */ /* somebody deleted the port or the sem went away */
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
} }
if (res == B_TIMED_OUT) { if (status == B_TIMED_OUT) {
// timed out, or, if timeout=0, 'would block' // timed out, or, if timeout=0, 'would block'
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
} }
if (res != B_NO_ERROR) { if (status != B_NO_ERROR) {
dprintf("write_port_etc: res unknown error %d\n", res); dprintf("write_port_etc: unknown error %ld\n", status);
return res; return status;
} }
state = disable_interrupts(); state = disable_interrupts();
@ -702,14 +699,14 @@ read_port_etc(port_id id, int32 *msg_code, void *msg_buffer, size_t buffer_size,
ports[slot].tail = (ports[slot].tail + 1) % ports[slot].capacity; ports[slot].tail = (ports[slot].tail + 1) % ports[slot].capacity;
msg_store = ports[slot].msg_queue[t].data_cbuf; msgStore = ports[slot].msg_queue[t].data_cbuf;
code = ports[slot].msg_queue[t].msg_code; code = ports[slot].msg_queue[t].msg_code;
// mark queue entry unused // mark queue entry unused
ports[slot].msg_queue[t].data_cbuf = NULL; ports[slot].msg_queue[t].data_cbuf = NULL;
// check output buffer size // check output buffer size
siz = min(buffer_size, ports[slot].msg_queue[t].data_len); size = min(bufferSize, ports[slot].msg_queue[t].data_len);
cached_semid = ports[slot].write_sem; cached_semid = ports[slot].write_sem;
@ -717,25 +714,25 @@ read_port_etc(port_id id, int32 *msg_code, void *msg_buffer, size_t buffer_size,
restore_interrupts(state); restore_interrupts(state);
// copy message // copy message
*msg_code = code; *msgCode = code;
if (siz > 0) { if (size > 0) {
if (flags & PORT_FLAG_USE_USER_MEMCPY) { if (userCopy) {
if ((err = cbuf_user_memcpy_from_chain(msg_buffer, msg_store, 0, siz) < 0)) { if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msgStore, 0, size) < B_OK)) {
// leave the port intact, for other threads that might not crash // leave the port intact, for other threads that might not crash
cbuf_free_chain(msg_store); cbuf_free_chain(msgStore);
release_sem(cached_semid); release_sem(cached_semid);
return err; return status;
} }
} else } else
cbuf_memcpy_from_chain(msg_buffer, msg_store, 0, siz); cbuf_memcpy_from_chain(msgBuffer, msgStore, 0, size);
} }
// free the cbuf // free the cbuf
cbuf_free_chain(msg_store); cbuf_free_chain(msgStore);
// make one spot in queue available again for write // make one spot in queue available again for write
release_sem(cached_semid); release_sem(cached_semid);
return siz; return size;
} }
@ -745,9 +742,8 @@ set_port_owner(port_id id, team_id team)
int slot; int slot;
int state; int state;
if (ports_active == false) if (ports_active == false
return B_BAD_PORT_ID; || id < 0)
if (id < 0)
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
slot = id % MAX_PORTS; slot = id % MAX_PORTS;
@ -774,37 +770,34 @@ set_port_owner(port_id id, team_id team)
status_t status_t
write_port(port_id id, int32 msg_code, const void *msg_buffer, size_t buffer_size) write_port(port_id id, int32 msgCode, const void *msgBuffer, size_t bufferSize)
{ {
return write_port_etc(id, msg_code, msg_buffer, buffer_size, 0, 0); return write_port_etc(id, msgCode, msgBuffer, bufferSize, 0, 0);
} }
status_t status_t
write_port_etc(port_id id, int32 msg_code, const void *msg_buffer, write_port_etc(port_id id, int32 msgCode, const void *msgBuffer,
size_t buffer_size, uint32 flags, bigtime_t timeout) size_t bufferSize, uint32 flags, bigtime_t timeout)
{ {
int slot; int slot;
int state; int state;
int res; status_t status;
sem_id cached_semid; sem_id cached_semid;
int h; int h;
cbuf* msg_store; cbuf *msgStore;
int32 c1, c2; int32 c1, c2;
int err; bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0;
if (ports_active == false) if (ports_active == false
return B_BAD_PORT_ID; || id < 0)
if (id < 0)
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
// mask irrelevant flags // mask irrelevant flags (for acquire_sem() usage)
flags = flags & (PORT_FLAG_USE_USER_MEMCPY | B_CAN_INTERRUPT | B_TIMEOUT); flags = flags & (B_CAN_INTERRUPT | B_TIMEOUT);
slot = id % MAX_PORTS; slot = id % MAX_PORTS;
// check buffer_size if (bufferSize > PORT_MAX_MESSAGE_SIZE)
if (buffer_size > PORT_MAX_MESSAGE_SIZE)
return EINVAL; return EINVAL;
state = disable_interrupts(); state = disable_interrupts();
@ -833,45 +826,44 @@ write_port_etc(port_id id, int32 msg_code, const void *msg_buffer,
// XXX -> possible race condition if port gets deleted (->sem deleted too), // XXX -> possible race condition if port gets deleted (->sem deleted too),
// and queue is full therefore sem_id is cached in local variable up here // and queue is full therefore sem_id is cached in local variable up here
status = acquire_sem_etc(cached_semid, 1, flags, timeout);
// get 1 entry from the queue, block if needed // get 1 entry from the queue, block if needed
// assumes flags
res = acquire_sem_etc(cached_semid, 1,
flags & (B_TIMEOUT | B_CAN_INTERRUPT), timeout);
// XXX: possible race condition if port written by two threads... // XXX: possible race condition if port written by two threads...
// both threads will write in 2 different slots allocated above, simultaneously // both threads will write in 2 different slots allocated above, simultaneously
// slot is a thread-local variable // slot is a thread-local variable
if (res == B_BAD_PORT_ID || res == EINTR) { if (status == B_BAD_PORT_ID || status == B_INTERRUPTED) {
/* somebody deleted the port or the sem while we were waiting */ /* somebody deleted the port or the sem while we were waiting */
return B_BAD_PORT_ID; return B_BAD_PORT_ID;
} }
if (res == B_TIMED_OUT) { if (status == B_TIMED_OUT) {
// timed out, or, if timeout=0, 'would block' // timed out, or, if timeout = 0, 'would block'
return B_TIMED_OUT; return B_TIMED_OUT;
} }
if (res != B_NO_ERROR) { if (status != B_NO_ERROR) {
dprintf("write_port_etc: res unknown error %d\n", res); dprintf("write_port_etc: unknown error %ld\n", status);
return res; return status;
} }
if (buffer_size > 0) { if (bufferSize > 0) {
msg_store = cbuf_get_chain(buffer_size); msgStore = cbuf_get_chain(bufferSize);
if (msg_store == NULL) if (msgStore == NULL)
return ENOMEM; return B_NO_MEMORY;
if (flags & PORT_FLAG_USE_USER_MEMCPY) {
if (userCopy) {
// copy from user memory // copy from user memory
if ((err = cbuf_user_memcpy_to_chain(msg_store, 0, msg_buffer, buffer_size)) < 0) if ((status = cbuf_user_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < B_OK)
return err; // memory exception return status;
} else
// copy from kernel memory
if ((err = cbuf_memcpy_to_chain(msg_store, 0, msg_buffer, buffer_size)) < 0)
return err; // memory exception
} else { } else {
msg_store = NULL; // copy from kernel memory
if ((status = cbuf_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < 0)
return status;
} }
} else
msgStore = NULL;
// attach copied message to queue // attach copied message to queue
state = disable_interrupts(); state = disable_interrupts();
@ -882,9 +874,10 @@ write_port_etc(port_id id, int32 msg_code, const void *msg_buffer,
panic("port %ld: head < 0", ports[slot].id); panic("port %ld: head < 0", ports[slot].id);
if (h >= ports[slot].capacity) if (h >= ports[slot].capacity)
panic("port %ld: head > cap %ld", ports[slot].id, ports[slot].capacity); panic("port %ld: head > cap %ld", ports[slot].id, ports[slot].capacity);
ports[slot].msg_queue[h].msg_code = msg_code;
ports[slot].msg_queue[h].data_cbuf = msg_store; ports[slot].msg_queue[h].msg_code = msgCode;
ports[slot].msg_queue[h].data_len = buffer_size; ports[slot].msg_queue[h].data_cbuf = msgStore;
ports[slot].msg_queue[h].data_len = bufferSize;
ports[slot].head = (ports[slot].head + 1) % ports[slot].capacity; ports[slot].head = (ports[slot].head + 1) % ports[slot].capacity;
ports[slot].total_count++; ports[slot].total_count++;
@ -894,6 +887,7 @@ write_port_etc(port_id id, int32 msg_code, const void *msg_buffer,
RELEASE_PORT_LOCK(ports[slot]); RELEASE_PORT_LOCK(ports[slot]);
restore_interrupts(state); restore_interrupts(state);
// ToDo: what is this needed for?
get_sem_count(ports[slot].read_sem, &c1); get_sem_count(ports[slot].read_sem, &c1);
get_sem_count(ports[slot].write_sem, &c2); get_sem_count(ports[slot].write_sem, &c2);