* Fixed the bug that led to #5119. There was a race condition in the read path
as soon as a second thread got into the game: if a thread was notified that a message is ready, another thread could call read_port() and steal it before the previous thread could claim it. The "Extensions" menu still doesn't seem to work, but I would guess that is unrelated. * The threads of the test app never exited, as read_port() returns the number of bytes it read, not just a status. git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@34681 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
parent
697255873e
commit
7d592ec4d2
@ -57,7 +57,7 @@ struct port_entry {
|
||||
team_id owner;
|
||||
int32 capacity;
|
||||
mutex lock;
|
||||
int32 read_count;
|
||||
uint32 read_count;
|
||||
int32 write_count;
|
||||
ConditionVariable read_condition;
|
||||
ConditionVariable write_condition;
|
||||
@ -529,11 +529,7 @@ fill_port_info(struct port_entry* port, port_info* info, size_t size)
|
||||
info->team = port->owner;
|
||||
info->capacity = port->capacity;
|
||||
|
||||
int32 count = port->read_count;
|
||||
if (count < 0)
|
||||
count = 0;
|
||||
|
||||
info->queue_count = count;
|
||||
info->queue_count = port->read_count;
|
||||
info->total_count = port->total_count;
|
||||
|
||||
strlcpy(info->name, port->lock.name, B_OS_NAME_LENGTH);
|
||||
@ -1057,12 +1053,12 @@ _get_port_message_info_etc(port_id id, port_message_info* info,
|
||||
if (sPorts[slot].id != id
|
||||
|| (is_port_closed(slot) && sPorts[slot].messages.IsEmpty())) {
|
||||
T(Info(sPorts[slot], 0, B_BAD_PORT_ID));
|
||||
TRACE(("port_buffer_size_etc(): %s port %ld\n",
|
||||
TRACE(("_get_port_message_info_etc(): %s port %ld\n",
|
||||
sPorts[slot].id == id ? "closed" : "invalid", id));
|
||||
return B_BAD_PORT_ID;
|
||||
}
|
||||
|
||||
if (sPorts[slot].read_count <= 0) {
|
||||
while (sPorts[slot].read_count == 0) {
|
||||
// We need to wait for a message to appear
|
||||
if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0)
|
||||
return B_WOULD_BLOCK;
|
||||
@ -1126,13 +1122,8 @@ port_count(port_id id)
|
||||
return B_BAD_PORT_ID;
|
||||
}
|
||||
|
||||
int32 count = sPorts[slot].read_count;
|
||||
// do not return negative numbers
|
||||
if (count < 0)
|
||||
count = 0;
|
||||
|
||||
// return count of messages
|
||||
return count;
|
||||
return sPorts[slot].read_count;
|
||||
}
|
||||
|
||||
|
||||
@ -1171,12 +1162,10 @@ read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize,
|
||||
return B_BAD_PORT_ID;
|
||||
}
|
||||
|
||||
if (sPorts[slot].read_count <= 0) {
|
||||
while (sPorts[slot].read_count == 0) {
|
||||
if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0)
|
||||
return B_WOULD_BLOCK;
|
||||
|
||||
sPorts[slot].read_count--;
|
||||
|
||||
// We need to wait for a message to appear
|
||||
ConditionVariableEntry entry;
|
||||
sPorts[slot].read_condition.Add(&entry);
|
||||
@ -1200,8 +1189,7 @@ read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize,
|
||||
sPorts[slot].read_count++;
|
||||
return status != B_OK ? status : entry.WaitStatus();
|
||||
}
|
||||
} else
|
||||
sPorts[slot].read_count--;
|
||||
}
|
||||
|
||||
// determine tail & get the length of the message
|
||||
port_message* message = sPorts[slot].messages.Head();
|
||||
@ -1216,7 +1204,6 @@ read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize,
|
||||
|
||||
T(Read(sPorts[slot], message->code, size));
|
||||
|
||||
sPorts[slot].read_count++;
|
||||
sPorts[slot].read_condition.NotifyOne();
|
||||
// we only peeked, but didn't grab the message
|
||||
return size;
|
||||
@ -1225,6 +1212,7 @@ read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize,
|
||||
sPorts[slot].messages.RemoveHead();
|
||||
sPorts[slot].total_count++;
|
||||
sPorts[slot].write_count++;
|
||||
sPorts[slot].read_count--;
|
||||
|
||||
notify_port_select_events(slot, B_EVENT_WRITE);
|
||||
sPorts[slot].write_condition.NotifyOne();
|
||||
|
@ -21,16 +21,15 @@ read_thread(void* _data)
|
||||
printf("[%ld] read port...\n", find_thread(NULL));
|
||||
|
||||
while (true) {
|
||||
ssize_t bytesWaiting = port_buffer_size(port);
|
||||
printf("[%ld] buffer size %ld waiting\n", find_thread(NULL),
|
||||
bytesWaiting);
|
||||
ssize_t bytes = port_buffer_size(port);
|
||||
printf("[%ld] buffer size %ld waiting\n", find_thread(NULL), bytes);
|
||||
|
||||
char buffer[256];
|
||||
int32 code;
|
||||
status_t status = read_port(port, &code, buffer, sizeof(buffer));
|
||||
bytes = read_port(port, &code, buffer, sizeof(buffer));
|
||||
printf("[%ld] read port result (code %lx): %s\n", find_thread(NULL),
|
||||
code, strerror(status));
|
||||
if (status == B_OK)
|
||||
code, strerror(bytes));
|
||||
if (bytes >= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
@ -51,7 +50,7 @@ main()
|
||||
resume_thread(threads[i]);
|
||||
}
|
||||
|
||||
printf("snooze for a bit, all threads should be waiting now.");
|
||||
printf("snooze for a bit, all threads should be waiting now.\n");
|
||||
snooze(100000);
|
||||
|
||||
for (int32 i = 0; i < THREAD_COUNT; i++) {
|
||||
|
Loading…
Reference in New Issue
Block a user