Replaced the reader/writer blocking semaphores by condition variables.

This fixes race conditions. The OpenSSH tests don't hang anymore --
instead they run the system out of memory, apparently due to a net
buffer/data node leak.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@25117 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-04-23 17:10:30 +00:00
parent 74a7b3ff69
commit 5b29b956f3
2 changed files with 29 additions and 42 deletions

View File

@ -286,23 +286,17 @@ UnixFifo::UnixFifo(size_t capacity)
fWriters(), fWriters(),
fReadRequested(0), fReadRequested(0),
fWriteRequested(0), fWriteRequested(0),
fReaderSem(-1),
fWriterSem(-1),
fShutdown(0) fShutdown(0)
{ {
fReadCondition.Init(this, "unix fifo read");
fWriteCondition.Init(this, "unix fifo write");
fLock.sem = -1; fLock.sem = -1;
} }
UnixFifo::~UnixFifo() UnixFifo::~UnixFifo()
{ {
if (fReaderSem >= 0)
delete_sem(fReaderSem);
if (fWriterSem >= 0)
delete_sem(fWriterSem);
if (fLock.sem >= 0) if (fLock.sem >= 0)
benaphore_destroy(&fLock); benaphore_destroy(&fLock);
} }
@ -311,15 +305,7 @@ UnixFifo::~UnixFifo()
status_t status_t
UnixFifo::Init() UnixFifo::Init()
{ {
status_t error = benaphore_init(&fLock, "unix fifo"); return benaphore_init(&fLock, "unix fifo");
fReaderSem = create_sem(0, "unix fifo readers");
fWriterSem = create_sem(0, "unix fifo writers");
if (error != B_OK || fReaderSem < 0 || fWriterSem < 0)
return ENOBUFS;
return B_OK;
} }
@ -330,8 +316,8 @@ UnixFifo::Shutdown(uint32 shutdown)
if (shutdown != 0) { if (shutdown != 0) {
// Shutting down either end also effects the other, so notify both. // Shutting down either end also effects the other, so notify both.
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); fReadCondition.NotifyAll();
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); fWriteCondition.NotifyAll();
} }
} }
@ -359,13 +345,13 @@ UnixFifo::Read(size_t numBytes, bigtime_t timeout, net_buffer** _buffer)
&& !IsReadShutdown()) { && !IsReadShutdown()) {
// There's more to read, other readers, and we were first in the queue. // There's more to read, other readers, and we were first in the queue.
// So we need to notify the others. // So we need to notify the others.
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); fReadCondition.NotifyAll();
} }
if (error == B_OK && *_buffer != NULL && (*_buffer)->size > 0 if (error == B_OK && *_buffer != NULL && (*_buffer)->size > 0
&& !fWriters.IsEmpty() && !IsWriteShutdown()) { && !fWriters.IsEmpty() && !IsWriteShutdown()) {
// We read something and there are writers. Notify them // We read something and there are writers. Notify them
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); fWriteCondition.NotifyAll();
} }
RETURN_ERROR(error); RETURN_ERROR(error);
@ -392,13 +378,13 @@ UnixFifo::Write(net_buffer* buffer, bigtime_t timeout)
&& !IsWriteShutdown()) { && !IsWriteShutdown()) {
// There's more space for writing, other writers, and we were first in // There's more space for writing, other writers, and we were first in
// the queue. So we need to notify the others. // the queue. So we need to notify the others.
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); fWriteCondition.NotifyAll();
} }
if (error == B_OK && request.size > 0 && !fReaders.IsEmpty() if (error == B_OK && request.size > 0 && !fReaders.IsEmpty()
&& !IsReadShutdown()) { && !IsReadShutdown()) {
// We've written something and there are readers. Notify them // We've written something and there are readers. Notify them
release_sem_etc(fReaderSem, 1, B_RELEASE_ALL); fReadCondition.NotifyAll();
} }
RETURN_ERROR(error); RETURN_ERROR(error);
@ -439,7 +425,7 @@ UnixFifo::SetBufferCapacity(size_t capacity)
// wake up waiting writers, if the capacity increased // wake up waiting writers, if the capacity increased
if (!fWriters.IsEmpty() && !IsWriteShutdown()) if (!fWriters.IsEmpty() && !IsWriteShutdown())
release_sem_etc(fWriterSem, 1, B_RELEASE_ALL); fWriteCondition.NotifyAll();
} }
@ -452,11 +438,11 @@ UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
RETURN_ERROR(B_WOULD_BLOCK); RETURN_ERROR(B_WOULD_BLOCK);
while (fReaders.Head() != &request && !IsReadShutdown()) { while (fReaders.Head() != &request && !IsReadShutdown()) {
ConditionVariableEntry entry;
fReadCondition.Add(&entry, B_CAN_INTERRUPT);
benaphore_unlock(&fLock); benaphore_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
status_t error = acquire_sem_etc(fReaderSem, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
benaphore_lock(&fLock); benaphore_lock(&fLock);
if (error != B_OK) if (error != B_OK)
@ -480,11 +466,11 @@ UnixFifo::_Read(Request& request, size_t numBytes, bigtime_t timeout,
// TODO: Support low water marks! // TODO: Support low water marks!
while (fBuffer.Readable() == 0 while (fBuffer.Readable() == 0
&& !IsReadShutdown() && !IsWriteShutdown()) { && !IsReadShutdown() && !IsWriteShutdown()) {
ConditionVariableEntry entry;
fReadCondition.Add(&entry, B_CAN_INTERRUPT);
benaphore_unlock(&fLock); benaphore_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
status_t error = acquire_sem_etc(fReaderSem, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
benaphore_lock(&fLock); benaphore_lock(&fLock);
if (error != B_OK) if (error != B_OK)
@ -511,11 +497,11 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
RETURN_ERROR(B_WOULD_BLOCK); RETURN_ERROR(B_WOULD_BLOCK);
while (fWriters.Head() != &request && !IsWriteShutdown()) { while (fWriters.Head() != &request && !IsWriteShutdown()) {
ConditionVariableEntry entry;
fWriteCondition.Add(&entry, B_CAN_INTERRUPT);
benaphore_unlock(&fLock); benaphore_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
status_t error = acquire_sem_etc(fWriterSem, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
benaphore_lock(&fLock); benaphore_lock(&fLock);
if (error != B_OK) if (error != B_OK)
@ -534,11 +520,11 @@ UnixFifo::_Write(Request& request, net_buffer* buffer, bigtime_t timeout)
while (fBuffer.Writable() < request.size && !IsWriteShutdown() while (fBuffer.Writable() < request.size && !IsWriteShutdown()
&& !IsReadShutdown()) { && !IsReadShutdown()) {
ConditionVariableEntry entry;
fWriteCondition.Add(&entry, B_CAN_INTERRUPT);
benaphore_unlock(&fLock); benaphore_unlock(&fLock);
status_t error = entry.Wait(B_ABSOLUTE_TIMEOUT, timeout);
status_t error = acquire_sem_etc(fWriterSem, 1,
B_ABSOLUTE_TIMEOUT | B_CAN_INTERRUPT, timeout);
benaphore_lock(&fLock); benaphore_lock(&fLock);
if (error != B_OK) if (error != B_OK)

View File

@ -7,6 +7,7 @@
#include <Referenceable.h> #include <Referenceable.h>
#include <condition_variable.h>
#include <lock.h> #include <lock.h>
#include <util/AutoLock.h> #include <util/AutoLock.h>
#include <util/DoublyLinkedList.h> #include <util/DoublyLinkedList.h>
@ -126,8 +127,8 @@ private:
RequestList fWriters; RequestList fWriters;
size_t fReadRequested; size_t fReadRequested;
size_t fWriteRequested; size_t fWriteRequested;
sem_id fReaderSem; ConditionVariable fReadCondition;
sem_id fWriterSem; ConditionVariable fWriteCondition;
uint32 fShutdown; uint32 fShutdown;
}; };