diff --git a/src/kernel/core/port.c b/src/kernel/core/port.c index 5b13977b6c..c22265ee36 100644 --- a/src/kernel/core/port.c +++ b/src/kernel/core/port.c @@ -13,13 +13,10 @@ #include #include #include -#include +#include #include -#include -#include #include -#include -#include + #include #include @@ -32,25 +29,24 @@ #endif -struct port_msg { - int msg_code; - cbuf *data_cbuf; - size_t data_len; -}; +typedef struct port_msg { + list_link link; + int32 code; + cbuf *buffer_chain; + size_t size; +} port_msg; struct port_entry { - port_id id; - team_id owner; - int32 capacity; - spinlock lock; - char *name; - sem_id read_sem; - sem_id write_sem; - int head; - int tail; - int total_count; - bool closed; - struct port_msg* msg_queue; + port_id id; + team_id owner; + int32 capacity; + spinlock lock; + const char *name; + sem_id read_sem; + sem_id write_sem; + int32 total_count; + struct list msg_queue; + bool closed; }; // internal API @@ -91,6 +87,10 @@ port_init(kernel_args *ka) panic("unable to allocate kernel port table!\n"); } + // ToDo: investigate preallocating a list of port_msgs to + // speed up actual message sending/receiving, a slab allocator + // might do it as well, though :-) + memset(gPorts, 0, size); for (i = 0; i < MAX_PORTS; i++) gPorts[i].id = -1; @@ -228,9 +228,7 @@ _dump_port_info(struct port_entry *port) dprintf("PORT: %p\n", port); dprintf("name: '%s'\n", port->name); dprintf("owner: 0x%lx\n", port->owner); - dprintf("cap: %ld\n", port->capacity); - dprintf("head: %d\n", port->head); - dprintf("tail: %d\n", port->tail); + dprintf("capacity: %ld\n", port->capacity); get_sem_count(port->read_sem, &cnt); dprintf("read_sem: %ld\n", cnt); get_sem_count(port->write_sem, &cnt); @@ -321,71 +319,89 @@ delete_owned_ports(team_id owner) } +static void +put_port_msg(port_msg *msg) +{ + cbuf_free_chain(msg->buffer_chain); + free(msg); +} + + +static port_msg * +get_port_msg(int32 code, size_t bufferSize) +{ + // ToDo: investigate preallocation of port_msgs (or use a slab allocator) + cbuf *bufferChain = NULL; + + port_msg *msg = (port_msg *)malloc(sizeof(port_msg)); + if (msg == NULL) + return NULL; + + if (bufferSize > 0) { + bufferChain = cbuf_get_chain(bufferSize); + if (bufferChain == NULL) { + free(msg); + return NULL; + } + } + + msg->code = code; + msg->buffer_chain = bufferChain; + msg->size = bufferSize; + return msg; +} + + // #pragma mark - // public kernel API port_id -create_port(int32 queue_length, const char *name) +create_port(int32 queueLength, const char *name) { - int i; - int state; - sem_id sem_r, sem_w; - port_id retval; - int name_len; - char *temp_name; - struct port_msg *q; + cpu_status state; + char nameBuffer[B_OS_NAME_LENGTH]; + sem_id readSem, writeSem; + port_id returnValue; team_id owner; - + int i; + if (ports_active == false) return B_BAD_PORT_ID; // check queue length - if (queue_length < 1) - return EINVAL; - if (queue_length > MAX_QUEUE_LENGTH) - return EINVAL; + if (queueLength < 1 + || queueLength > MAX_QUEUE_LENGTH) + return B_BAD_VALUE; // check & dup name if (name == NULL) name = "unnamed port"; - name_len = strlen(name) + 1; - name_len = min(name_len, B_OS_NAME_LENGTH); - temp_name = (char *)malloc(name_len); - if (temp_name == NULL) - return ENOMEM; - strlcpy(temp_name, name, name_len); + // ToDo: we could save the memory and use the semaphore name only instead + strlcpy(nameBuffer, name, B_OS_NAME_LENGTH); + name = strdup(nameBuffer); + if (name == NULL) + return B_NO_MEMORY; - // alloc queue - q = (struct port_msg *)malloc(queue_length * sizeof(struct port_msg)); - if (q == NULL) { - free(temp_name); // dealloc name, too - return ENOMEM; - } - - // init cbuf list of the queue - for (i = 0; i < queue_length; i++) - q[i].data_cbuf = 0; - - // create sem_r with owner set to -1 - sem_r = create_sem_etc(0, temp_name, -1); - if (sem_r < 0) { + // create read sem with owner set to -1 + // ToDo: should be B_SYSTEM_TEAM + readSem = create_sem_etc(0, name, -1); + if (readSem < B_OK) { // cleanup - free(temp_name); - free(q); - return sem_r; + free((char *)name); + return readSem; } - // create sem_w - sem_w = create_sem_etc(queue_length, temp_name, -1); - if (sem_w < 0) { + // create write sem + writeSem = create_sem_etc(queueLength, name, -1); + if (writeSem < 0) { // cleanup - delete_sem(sem_r); - free(temp_name); - free(q); - return sem_w; + delete_sem(readSem); + free((char *)name); + return writeSem; } + owner = team_get_current_team_id(); state = disable_interrupts(); @@ -404,37 +420,36 @@ create_port(int32 queue_length, const char *name) gPorts[i].id = gNextPort++; RELEASE_PORT_LIST_LOCK(); - gPorts[i].capacity = queue_length; - gPorts[i].name = temp_name; + gPorts[i].capacity = queueLength; + gPorts[i].owner = owner; + gPorts[i].name = name; + + gPorts[i].read_sem = readSem; + gPorts[i].write_sem = writeSem; + + list_init(&gPorts[i].msg_queue); + gPorts[i].total_count = 0; + returnValue = gPorts[i].id; - // assign sem - gPorts[i].read_sem = sem_r; - gPorts[i].write_sem = sem_w; - gPorts[i].msg_queue = q; - gPorts[i].head = 0; - gPorts[i].tail = 0; - gPorts[i].total_count= 0; - gPorts[i].owner = owner; - retval = gPorts[i].id; RELEASE_PORT_LOCK(gPorts[i]); goto out; } } + // not enough gPorts... RELEASE_PORT_LIST_LOCK(); - retval = B_NO_MORE_PORTS; + returnValue = B_NO_MORE_PORTS; dprintf("create_port(): B_NO_MORE_PORTS\n"); // cleanup - delete_sem(sem_w); - delete_sem(sem_r); - free(temp_name); - free(q); + delete_sem(writeSem); + delete_sem(readSem); + free((char *)name); out: restore_interrupts(state); - return retval; + return returnValue; } @@ -474,18 +489,14 @@ close_port(port_id id) status_t delete_port(port_id id) { + cpu_status state; + sem_id readSem, writeSem; + const char *name; + struct list list; + port_msg *msg; int slot; - int state; - sem_id r_sem, w_sem; - int capacity; - int i; - char *old_name; - struct port_msg *q; - - if (ports_active == false) - return B_BAD_PORT_ID; - if (id < 0) + if (ports_active == false || id < 0) return B_BAD_PORT_ID; slot = id % MAX_PORTS; @@ -502,31 +513,28 @@ delete_port(port_id id) /* mark port as invalid */ gPorts[slot].id = -1; - old_name = gPorts[slot].name; - q = gPorts[slot].msg_queue; - r_sem = gPorts[slot].read_sem; - w_sem = gPorts[slot].write_sem; - capacity = gPorts[slot].capacity; + name = gPorts[slot].name; + list = gPorts[slot].msg_queue; + readSem = gPorts[slot].read_sem; + writeSem = gPorts[slot].write_sem; gPorts[slot].name = NULL; RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); - // delete the cbuf's that are left in the queue (if any) - for (i = 0; i < capacity; i++) { - if (q[i].data_cbuf != NULL) - cbuf_free_chain(q[i].data_cbuf); + // free the queue + while ((msg = (port_msg *)list_remove_head_item(&list)) != NULL) { + put_port_msg(msg); } - free(q); - free(old_name); + free((char *)name); // release the threads that were blocking on this port by deleting the sem // read_port() will see the B_BAD_SEM_ID acq_sem() return value, and act accordingly - delete_sem(r_sem); - delete_sem(w_sem); + delete_sem(readSem); + delete_sem(writeSem); - return B_NO_ERROR; + return B_OK; } @@ -682,9 +690,9 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout) cpu_status state; sem_id cachedSem; status_t status; + port_msg *msg; ssize_t size; int slot; - int tail; if (!ports_active || id < 0) return B_BAD_PORT_ID; @@ -706,12 +714,8 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout) RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); - // block if no message, - // if TIMEOUT flag set, block with timeout + // block if no message, or, if B_TIMEOUT flag set, block with timeout - // XXX - is it a race condition to acquire a sem just after we - // unlocked the port ? - // XXX: call an acquire_sem which does the release lock, restore int & block the right way status = acquire_sem_etc(cachedSem, 1, flags, timeout); if (status == B_BAD_SEM_ID) { @@ -732,12 +736,11 @@ port_buffer_size_etc(port_id id, uint32 flags, bigtime_t timeout) } // determine tail & get the length of the message - tail = gPorts[slot].tail; - if (tail < 0) - panic("port %ld: tail < 0", gPorts[slot].id); - if (tail > gPorts[slot].capacity) - panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity); - size = gPorts[slot].msg_queue[tail].data_len; + msg = list_get_first_item(&gPorts[slot].msg_queue); + if (msg == NULL) + panic("port %ld: no messages found", gPorts[slot].id); + + size = msg->size; RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); @@ -757,8 +760,7 @@ port_count(port_id id) int state; int32 count; - if (ports_active == false - || id < 0) + if (ports_active == false || id < 0) return B_BAD_PORT_ID; slot = id % MAX_PORTS; @@ -774,7 +776,7 @@ port_count(port_id id) } get_sem_count(gPorts[slot].read_sem, &count); - // do not return negative numbers + // do not return negative numbers if (count < 0) count = 0; @@ -794,24 +796,21 @@ read_port(port_id port, int32 *msgCode, void *msgBuffer, size_t bufferSize) status_t -read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize, +read_port_etc(port_id id, int32 *_msgCode, void *msgBuffer, size_t bufferSize, uint32 flags, bigtime_t timeout) { cpu_status state; sem_id cachedSem; - size_t size; status_t status; - int tail; - cbuf *msgStore; - int32 code; bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0; + port_msg *msg; + size_t size; int slot; - if (ports_active == false - || id < 0) + if (!ports_active || id < 0) return B_BAD_PORT_ID; - if (msgCode == NULL + if (_msgCode == NULL || (msgBuffer == NULL && bufferSize > 0) || timeout < 0) return B_BAD_VALUE; @@ -835,16 +834,9 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize, RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); - // XXX -> possible race condition if port gets deleted (->sem deleted too), therefore - // sem_id is cached in local variable up here - status = acquire_sem_etc(cachedSem, 1, flags, timeout); // get 1 entry from the queue, block if needed - // XXX: possible race condition if port read by two threads... - // both threads will read in 2 different slots allocated above, simultaneously - // slot is a thread-local variable - if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) { /* somebody deleted the port or the sem went away */ return B_BAD_PORT_ID; @@ -861,47 +853,49 @@ read_port_etc(port_id id, int32 *msgCode, void *msgBuffer, size_t bufferSize, state = disable_interrupts(); GRAB_PORT_LOCK(gPorts[slot]); - tail = gPorts[slot].tail; - if (tail < 0) - panic("port %ld: tail < 0", gPorts[slot].id); - if (tail > gPorts[slot].capacity) - panic("port %ld: tail > cap %ld", gPorts[slot].id, gPorts[slot].capacity); + // first, let's check if the port is still alive + if (gPorts[slot].id == -1) { + // the port has been deleted in the meantime + RELEASE_PORT_LOCK(gPorts[slot]); + restore_interrupts(state); + return B_BAD_PORT_ID; + } + + msg = list_get_first_item(&gPorts[slot].msg_queue); + if (msg == NULL) + panic("port %ld: no messages found", gPorts[slot].id); + + list_remove_link(msg); - gPorts[slot].tail = (gPorts[slot].tail + 1) % gPorts[slot].capacity; gPorts[slot].total_count++; - msgStore = gPorts[slot].msg_queue[tail].data_cbuf; - code = gPorts[slot].msg_queue[tail].msg_code; - - // mark queue entry unused - gPorts[slot].msg_queue[tail].data_cbuf = NULL; - - // check output buffer size - size = min(bufferSize, gPorts[slot].msg_queue[tail].data_len); - cachedSem = gPorts[slot].write_sem; RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); + // check output buffer size + size = min(bufferSize, msg->size); + // copy message - *msgCode = code; + *_msgCode = msg->code; if (size > 0) { if (userCopy) { - if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msgStore, 0, size) < B_OK)) { + if ((status = cbuf_user_memcpy_from_chain(msgBuffer, msg->buffer_chain, 0, size) < B_OK)) { // leave the port intact, for other threads that might not crash - cbuf_free_chain(msgStore); + put_port_msg(msg); release_sem(cachedSem); return status; } } else - cbuf_memcpy_from_chain(msgBuffer, msgStore, 0, size); + cbuf_memcpy_from_chain(msgBuffer, msg->buffer_chain, 0, size); } - // free the cbuf - cbuf_free_chain(msgStore); + put_port_msg(msg); // make one spot in queue available again for write release_sem(cachedSem); + // ToDo: we might think about setting B_NO_RESCHEDULE here + // from time to time (always?) return size; } @@ -918,16 +912,14 @@ status_t write_port_etc(port_id id, int32 msgCode, const void *msgBuffer, size_t bufferSize, uint32 flags, bigtime_t timeout) { - int slot; cpu_status state; - status_t status; sem_id cachedSem; - int head; - cbuf *msgStore; + status_t status; + port_msg *msg; bool userCopy = (flags & PORT_FLAG_USE_USER_MEMCPY) > 0; + int slot; - if (ports_active == false - || id < 0) + if (!ports_active || id < 0) return B_BAD_PORT_ID; // mask irrelevant flags (for acquire_sem() usage) @@ -960,16 +952,9 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer, RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); - // XXX -> possible race condition if port gets deleted (->sem deleted too), - // and queue is full therefore sem_id is cached in local variable up here - status = acquire_sem_etc(cachedSem, 1, flags, timeout); // get 1 entry from the queue, block if needed - // XXX: possible race condition if port written by two threads... - // both threads will write in 2 different slots allocated above, simultaneously - // slot is a thread-local variable - if (status == B_BAD_SEM_ID || status == B_INTERRUPTED) { /* somebody deleted the port or the sem while we were waiting */ return B_BAD_PORT_ID; @@ -983,37 +968,37 @@ write_port_etc(port_id id, int32 msgCode, const void *msgBuffer, return status; } - if (bufferSize > 0) { - msgStore = cbuf_get_chain(bufferSize); - if (msgStore == NULL) - return B_NO_MEMORY; + msg = get_port_msg(msgCode, bufferSize); + if (msg == NULL) + return B_NO_MEMORY; + if (bufferSize > 0) { if (userCopy) { // copy from user memory - if ((status = cbuf_user_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < B_OK) + if ((status = cbuf_user_memcpy_to_chain(msg->buffer_chain, 0, msgBuffer, bufferSize)) < B_OK) return status; } else { // copy from kernel memory - if ((status = cbuf_memcpy_to_chain(msgStore, 0, msgBuffer, bufferSize)) < 0) + if ((status = cbuf_memcpy_to_chain(msg->buffer_chain, 0, msgBuffer, bufferSize)) < 0) return status; } - } else - msgStore = NULL; + } - // attach copied message to queue + // attach message to queue state = disable_interrupts(); GRAB_PORT_LOCK(gPorts[slot]); - head = gPorts[slot].head; - if (head < 0) - panic("port %ld: head < 0", gPorts[slot].id); - if (head >= gPorts[slot].capacity) - panic("port %ld: head > cap %ld", gPorts[slot].id, gPorts[slot].capacity); + // first, let's check if the port is still alive + if (gPorts[slot].id == -1) { + // the port has been deleted in the meantime + RELEASE_PORT_LOCK(gPorts[slot]); + restore_interrupts(state); - gPorts[slot].msg_queue[head].msg_code = msgCode; - gPorts[slot].msg_queue[head].data_cbuf = msgStore; - gPorts[slot].msg_queue[head].data_len = bufferSize; - gPorts[slot].head = (gPorts[slot].head + 1) % gPorts[slot].capacity; + put_port_msg(msg); + return B_BAD_PORT_ID; + } + + list_add_item(&gPorts[slot].msg_queue, msg); // store sem_id in local variable cachedSem = gPorts[slot].read_sem; @@ -1034,8 +1019,7 @@ set_port_owner(port_id id, team_id team) int slot; int state; - if (ports_active == false - || id < 0) + if (!ports_active || id < 0) return B_BAD_PORT_ID; slot = id % MAX_PORTS; @@ -1046,7 +1030,7 @@ set_port_owner(port_id id, team_id team) if (gPorts[slot].id != id) { RELEASE_PORT_LOCK(gPorts[slot]); restore_interrupts(state); - dprintf("set_port_owner: invalid port_id %ld\n", id); + TRACE(("set_port_owner: invalid port_id %ld\n", id)); return B_BAD_PORT_ID; }