diff --git a/src/system/kernel/fs/pipefs.cpp b/src/system/kernel/fs/pipefs.cpp index ab81b29845..1207f5e021 100644 --- a/src/system/kernel/fs/pipefs.cpp +++ b/src/system/kernel/fs/pipefs.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2007, Ingo Weinhold, bonefish@cs.tu-berlin.de. + * Copyright 2007-2008, Ingo Weinhold, ingo_weinhold@gmx.de. * Copyright 2003-2007, Axel Dörfler, axeld@pinc-software.de. * Distributed under the terms of the MIT License. */ @@ -66,6 +66,7 @@ class RingBuffer { struct ring_buffer *fBuffer; }; + class ReadRequest : public DoublyLinkedListLinkImpl { public: void SetUnnotified() { fNotified = false; } @@ -85,7 +86,28 @@ class ReadRequest : public DoublyLinkedListLinkImpl { bool fNotified; }; -typedef DoublyLinkedList RequestList; + +class WriteRequest : public DoublyLinkedListLinkImpl { + public: + WriteRequest(size_t minimalWriteCount) + : + fMinimalWriteCount(minimalWriteCount) + { + } + + size_t MinimalWriteCount() const + { + return fMinimalWriteCount; + } + + private: + size_t fMinimalWriteCount; +}; + + +typedef DoublyLinkedList ReadRequestList; +typedef DoublyLinkedList WriteRequestList; + class Volume { public: @@ -161,22 +183,21 @@ class Inode { void SetNext(Inode *inode) { fNext = inode; } benaphore *RequestLock() { return &fRequestLock; } - RequestList &Requests() { return fRequests; } status_t WriteDataToBuffer(const void *data, size_t *_length, bool nonBlocking); status_t ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking, ReadRequest &request); size_t BytesAvailable() const { return fBuffer.Readable(); } + size_t BytesWritable() const { return fBuffer.Writable(); } - void AddRequest(ReadRequest &request); - void RemoveRequest(ReadRequest &request); - status_t WaitForRequest(ReadRequest &request); + void AddReadRequest(ReadRequest &request); + void RemoveReadRequest(ReadRequest &request); + status_t WaitForReadRequest(ReadRequest &request); void NotifyBytesRead(size_t bytes); void NotifyReadDone(); void NotifyBytesWritten(size_t bytes); - void NotifyWriteDone(); void NotifyEndClosed(bool writer); void Open(int openMode); @@ -211,7 +232,8 @@ class Inode { RingBuffer fBuffer; - RequestList fRequests; + ReadRequestList fReadRequests; + WriteRequestList fWriteRequests; benaphore fRequestLock; @@ -562,6 +584,8 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type) : fNext(NULL), fHashNext(NULL), + fReadRequests(), + fWriteRequests(), fReaderCount(0), fWriterCount(0), fReadSelectSyncPool(NULL), @@ -641,12 +665,15 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking) ConditionVariableEntry<> entry; entry.Add(this); + WriteRequest request(minToWrite); + fWriteRequests.Add(&request); + benaphore_unlock(&fRequestLock); - status_t status = entry.Wait(B_CAN_INTERRUPT); - benaphore_lock(&fRequestLock); + fWriteRequests.Remove(&request); + if (status != B_OK) return status; } @@ -687,11 +714,11 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking, // wait until our request is first in queue status_t error; - if (fRequests.Head() != &request) { + if (fReadRequests.Head() != &request) { if (nonBlocking) return B_WOULD_BLOCK; - error = WaitForRequest(request); + error = WaitForReadRequest(request); if (error != B_OK) return error; } @@ -704,7 +731,7 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking, if (fWriterCount == 0) return B_OK; - error = WaitForRequest(request); + error = WaitForReadRequest(request); if (error != B_OK) return error; } @@ -726,27 +753,27 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking, void -Inode::AddRequest(ReadRequest &request) +Inode::AddReadRequest(ReadRequest &request) { - fRequests.Add(&request); + fReadRequests.Add(&request); } void -Inode::RemoveRequest(ReadRequest &request) +Inode::RemoveReadRequest(ReadRequest &request) { - fRequests.Remove(&request); + fReadRequests.Remove(&request); } status_t -Inode::WaitForRequest(ReadRequest &request) +Inode::WaitForReadRequest(ReadRequest &request) { request.SetUnnotified(); // publish the condition variable ConditionVariable<>& conditionVariable = request.WaitCondition(); - conditionVariable.Publish(&request, "pipe read request"); + conditionVariable.Publish(&request, "pipe request"); // add the entry to wait on ConditionVariableEntry<> entry; @@ -768,11 +795,27 @@ void Inode::NotifyBytesRead(size_t bytes) { // notify writer, if something can be written now - if (bytes > 0 && fBuffer.Writable() == bytes) { - if (fWriteSelectSyncPool) - notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE); + size_t writable = fBuffer.Writable(); + if (bytes > 0) { + // notify select()ors only, if nothing was writable before + if (writable == bytes) { + if (fWriteSelectSyncPool) + notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE); + } - fWriteCondition.NotifyOne(); + // If any of the waiting writers has a minimal write count that has + // now become satisfied, we notify all of them (condition variables + // don't support doing that selectively). + WriteRequest *request; + WriteRequestList::Iterator iterator = fWriteRequests.GetIterator(); + while ((request = iterator.Next()) != NULL) { + size_t minWriteCount = request->MinimalWriteCount(); + if (minWriteCount > 0 && minWriteCount <= writable + && minWriteCount > writable - bytes) { + fWriteCondition.NotifyAll(); + break; + } + } } } @@ -782,7 +825,7 @@ Inode::NotifyReadDone() { // notify next reader, if there's still something to be read if (fBuffer.Readable() > 0) { - if (ReadRequest* request = fRequests.First()) + if (ReadRequest* request = fReadRequests.First()) request->Notify(); } } @@ -796,21 +839,12 @@ Inode::NotifyBytesWritten(size_t bytes) if (fReadSelectSyncPool) notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ); - if (ReadRequest* request = fRequests.First()) + if (ReadRequest* request = fReadRequests.First()) request->Notify(); } } -void -Inode::NotifyWriteDone() -{ - // notify next writer, if there's still space for writing - if (fBuffer.Writable() > 0) - fWriteCondition.NotifyOne(); -} - - void Inode::NotifyEndClosed(bool writer) { @@ -819,7 +853,7 @@ Inode::NotifyEndClosed(bool writer) // contains no data, unlock all waiting readers if (fBuffer.Readable() == 0) { ReadRequest *request; - RequestList::Iterator iterator = fRequests.GetIterator(); + ReadRequestList::Iterator iterator = fReadRequests.GetIterator(); while ((request = iterator.Next()) != NULL) request->Notify(); @@ -1316,13 +1350,13 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, // issue read request ReadRequest request; - inode->AddRequest(request); + inode->AddReadRequest(request); size_t length = *_length; status_t status = inode->ReadDataFromBuffer(buffer, &length, (cookie->open_mode & O_NONBLOCK) != 0, request); - inode->RemoveRequest(request); + inode->RemoveReadRequest(request); inode->NotifyReadDone(); if (length > 0) @@ -1359,8 +1393,6 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, status_t status = inode->WriteDataToBuffer(buffer, &length, (cookie->open_mode & O_NONBLOCK) != 0); - inode->NotifyWriteDone(); - if (length > 0) status = B_OK;