* Since writers also might have to wait when there is still some room

in the buffer, they need to be notified after a read when their
  minimal write size requirement has just become satisfied. We were
  notifying only when there was no space in the buffer before, which
  caused bug #1755.
* Removed Inode::NotifyWriteDone(). It's not needed anymore, since we
  don't queue writers. They are always all notified, so that one doesn't
  have to notify the next one, when it's done.
* Renamed *Request* to *ReadRequest, since we do have WriteRequests now
  as well.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@24701 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-03-30 22:35:41 +00:00
parent fd223db5ad
commit 32122dc5c4
1 changed files with 71 additions and 39 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2007, Ingo Weinhold, bonefish@cs.tu-berlin.de.
* Copyright 2007-2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Copyright 2003-2007, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*/
@ -66,6 +66,7 @@ class RingBuffer {
struct ring_buffer *fBuffer;
};
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
void SetUnnotified() { fNotified = false; }
@ -85,7 +86,28 @@ class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
bool fNotified;
};
typedef DoublyLinkedList<ReadRequest> RequestList;
class WriteRequest : public DoublyLinkedListLinkImpl<WriteRequest> {
public:
WriteRequest(size_t minimalWriteCount)
:
fMinimalWriteCount(minimalWriteCount)
{
}
size_t MinimalWriteCount() const
{
return fMinimalWriteCount;
}
private:
size_t fMinimalWriteCount;
};
typedef DoublyLinkedList<ReadRequest> ReadRequestList;
typedef DoublyLinkedList<WriteRequest> WriteRequestList;
class Volume {
public:
@ -161,22 +183,21 @@ class Inode {
void SetNext(Inode *inode) { fNext = inode; }
benaphore *RequestLock() { return &fRequestLock; }
RequestList &Requests() { return fRequests; }
status_t WriteDataToBuffer(const void *data, size_t *_length,
bool nonBlocking);
status_t ReadDataFromBuffer(void *data, size_t *_length,
bool nonBlocking, ReadRequest &request);
size_t BytesAvailable() const { return fBuffer.Readable(); }
size_t BytesWritable() const { return fBuffer.Writable(); }
void AddRequest(ReadRequest &request);
void RemoveRequest(ReadRequest &request);
status_t WaitForRequest(ReadRequest &request);
void AddReadRequest(ReadRequest &request);
void RemoveReadRequest(ReadRequest &request);
status_t WaitForReadRequest(ReadRequest &request);
void NotifyBytesRead(size_t bytes);
void NotifyReadDone();
void NotifyBytesWritten(size_t bytes);
void NotifyWriteDone();
void NotifyEndClosed(bool writer);
void Open(int openMode);
@ -211,7 +232,8 @@ class Inode {
RingBuffer fBuffer;
RequestList fRequests;
ReadRequestList fReadRequests;
WriteRequestList fWriteRequests;
benaphore fRequestLock;
@ -562,6 +584,8 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type)
:
fNext(NULL),
fHashNext(NULL),
fReadRequests(),
fWriteRequests(),
fReaderCount(0),
fWriterCount(0),
fReadSelectSyncPool(NULL),
@ -641,12 +665,15 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
ConditionVariableEntry<> entry;
entry.Add(this);
WriteRequest request(minToWrite);
fWriteRequests.Add(&request);
benaphore_unlock(&fRequestLock);
status_t status = entry.Wait(B_CAN_INTERRUPT);
benaphore_lock(&fRequestLock);
fWriteRequests.Remove(&request);
if (status != B_OK)
return status;
}
@ -687,11 +714,11 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
// wait until our request is first in queue
status_t error;
if (fRequests.Head() != &request) {
if (fReadRequests.Head() != &request) {
if (nonBlocking)
return B_WOULD_BLOCK;
error = WaitForRequest(request);
error = WaitForReadRequest(request);
if (error != B_OK)
return error;
}
@ -704,7 +731,7 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
if (fWriterCount == 0)
return B_OK;
error = WaitForRequest(request);
error = WaitForReadRequest(request);
if (error != B_OK)
return error;
}
@ -726,27 +753,27 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
void
Inode::AddRequest(ReadRequest &request)
Inode::AddReadRequest(ReadRequest &request)
{
fRequests.Add(&request);
fReadRequests.Add(&request);
}
void
Inode::RemoveRequest(ReadRequest &request)
Inode::RemoveReadRequest(ReadRequest &request)
{
fRequests.Remove(&request);
fReadRequests.Remove(&request);
}
status_t
Inode::WaitForRequest(ReadRequest &request)
Inode::WaitForReadRequest(ReadRequest &request)
{
request.SetUnnotified();
// publish the condition variable
ConditionVariable<>& conditionVariable = request.WaitCondition();
conditionVariable.Publish(&request, "pipe read request");
conditionVariable.Publish(&request, "pipe request");
// add the entry to wait on
ConditionVariableEntry<> entry;
@ -768,11 +795,27 @@ void
Inode::NotifyBytesRead(size_t bytes)
{
// notify writer, if something can be written now
if (bytes > 0 && fBuffer.Writable() == bytes) {
if (fWriteSelectSyncPool)
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
size_t writable = fBuffer.Writable();
if (bytes > 0) {
// notify select()ors only, if nothing was writable before
if (writable == bytes) {
if (fWriteSelectSyncPool)
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
}
fWriteCondition.NotifyOne();
// If any of the waiting writers has a minimal write count that has
// now become satisfied, we notify all of them (condition variables
// don't support doing that selectively).
WriteRequest *request;
WriteRequestList::Iterator iterator = fWriteRequests.GetIterator();
while ((request = iterator.Next()) != NULL) {
size_t minWriteCount = request->MinimalWriteCount();
if (minWriteCount > 0 && minWriteCount <= writable
&& minWriteCount > writable - bytes) {
fWriteCondition.NotifyAll();
break;
}
}
}
}
@ -782,7 +825,7 @@ Inode::NotifyReadDone()
{
// notify next reader, if there's still something to be read
if (fBuffer.Readable() > 0) {
if (ReadRequest* request = fRequests.First())
if (ReadRequest* request = fReadRequests.First())
request->Notify();
}
}
@ -796,21 +839,12 @@ Inode::NotifyBytesWritten(size_t bytes)
if (fReadSelectSyncPool)
notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
if (ReadRequest* request = fRequests.First())
if (ReadRequest* request = fReadRequests.First())
request->Notify();
}
}
void
Inode::NotifyWriteDone()
{
// notify next writer, if there's still space for writing
if (fBuffer.Writable() > 0)
fWriteCondition.NotifyOne();
}
void
Inode::NotifyEndClosed(bool writer)
{
@ -819,7 +853,7 @@ Inode::NotifyEndClosed(bool writer)
// contains no data, unlock all waiting readers
if (fBuffer.Readable() == 0) {
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
ReadRequestList::Iterator iterator = fReadRequests.GetIterator();
while ((request = iterator.Next()) != NULL)
request->Notify();
@ -1316,13 +1350,13 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie,
// issue read request
ReadRequest request;
inode->AddRequest(request);
inode->AddReadRequest(request);
size_t length = *_length;
status_t status = inode->ReadDataFromBuffer(buffer, &length,
(cookie->open_mode & O_NONBLOCK) != 0, request);
inode->RemoveRequest(request);
inode->RemoveReadRequest(request);
inode->NotifyReadDone();
if (length > 0)
@ -1359,8 +1393,6 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie,
status_t status = inode->WriteDataToBuffer(buffer, &length,
(cookie->open_mode & O_NONBLOCK) != 0);
inode->NotifyWriteDone();
if (length > 0)
status = B_OK;