diff --git a/src/system/kernel/fs/pipefs.cpp b/src/system/kernel/fs/pipefs.cpp index 47606a228a..0828ed0367 100644 --- a/src/system/kernel/fs/pipefs.cpp +++ b/src/system/kernel/fs/pipefs.cpp @@ -58,6 +58,9 @@ class RingBuffer { RingBuffer(); ~RingBuffer(); + status_t CreateBuffer(); + void DeleteBuffer(); + size_t Write(const void *buffer, size_t length); size_t Read(void *buffer, size_t length); ssize_t UserWrite(const void *buffer, ssize_t length); @@ -170,6 +173,7 @@ class Inode { ino_t ID() const { return fID; } const char *Name() const { return fName; } + bool IsActive() const { return fActive; } int32 Type() const { return fType; } void SetMode(mode_t mode) { fType = (fType & ~S_IUMSK) | (mode & S_IUMSK); } @@ -246,6 +250,7 @@ class Inode { int32 fReaderCount; int32 fWriterCount; + bool fActive; select_sync_pool *fReadSelectSyncPool; select_sync_pool *fWriteSelectSyncPool; @@ -280,20 +285,44 @@ extern fs_vnode_ops kVnodeOps; RingBuffer::RingBuffer() + : fBuffer(NULL) { - fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE); } RingBuffer::~RingBuffer() { - delete_ring_buffer(fBuffer); + DeleteBuffer(); +} + + +status_t +RingBuffer::CreateBuffer() +{ + if (fBuffer != NULL) + return B_OK; + + fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE); + return (fBuffer != NULL ? B_OK : B_NO_MEMORY); +} + + +void +RingBuffer::DeleteBuffer() +{ + if (fBuffer != NULL) { + delete_ring_buffer(fBuffer); + fBuffer = NULL; + } } inline size_t RingBuffer::Write(const void *buffer, size_t length) { + if (fBuffer == NULL) + return B_NO_MEMORY; + return ring_buffer_write(fBuffer, (const uint8 *)buffer, length); } @@ -301,6 +330,9 @@ RingBuffer::Write(const void *buffer, size_t length) inline size_t RingBuffer::Read(void *buffer, size_t length) { + if (fBuffer == NULL) + return B_NO_MEMORY; + return ring_buffer_read(fBuffer, (uint8 *)buffer, length); } @@ -308,6 +340,9 @@ RingBuffer::Read(void *buffer, size_t length) inline ssize_t RingBuffer::UserWrite(const void *buffer, ssize_t length) { + if (fBuffer == NULL) + return B_NO_MEMORY; + return ring_buffer_user_write(fBuffer, (const uint8 *)buffer, length); } @@ -315,6 +350,9 @@ RingBuffer::UserWrite(const void *buffer, ssize_t length) inline ssize_t RingBuffer::UserRead(void *buffer, ssize_t length) { + if (fBuffer == NULL) + return B_NO_MEMORY; + return ring_buffer_user_read(fBuffer, (uint8 *)buffer, length); } @@ -322,14 +360,14 @@ RingBuffer::UserRead(void *buffer, ssize_t length) inline size_t RingBuffer::Readable() const { - return ring_buffer_readable(fBuffer); + return (fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0); } inline size_t RingBuffer::Writable() const { - return ring_buffer_writable(fBuffer); + return (fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0); } @@ -600,6 +638,7 @@ Inode::Inode(ino_t id, Inode *parent, const char *name, int32 type) fWriteRequests(), fReaderCount(0), fWriterCount(0), + fActive(false), fReadSelectSyncPool(NULL), fWriteSelectSyncPool(NULL) { @@ -667,7 +706,8 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking) while (dataSize > 0) { // Wait until enough space in the buffer is available. - while (fBuffer.Writable() < minToWrite && fReaderCount > 0) { + while (!fActive + || fBuffer.Writable() < minToWrite && fReaderCount > 0) { if (nonBlocking) return B_WOULD_BLOCK; @@ -688,7 +728,7 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking) } // write only as long as there are readers left - if (fReaderCount == 0) { + if (fReaderCount == 0 && fActive) { if (written == 0) send_signal(find_thread(NULL), SIGPIPE); return EPIPE; @@ -696,11 +736,11 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking) // write as much as we can - size_t toWrite = fBuffer.Writable(); + size_t toWrite = (fActive ? fBuffer.Writable() : 0); if (toWrite > dataSize) toWrite = dataSize; - if (fBuffer.UserWrite(data, toWrite) < B_OK) + if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK) return B_BAD_ADDRESS; data += toWrite; @@ -737,7 +777,7 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking, if (nonBlocking) return B_WOULD_BLOCK; - if (fWriterCount == 0) + if (fActive && fWriterCount == 0) return B_OK; error = WaitForReadRequest(request); @@ -881,18 +921,29 @@ Inode::NotifyEndClosed(bool writer) } - void Inode::Open(int openMode) { if (!S_ISFIFO(fType)) return; + BenaphoreLocker locker(RequestLock()); + if ((openMode & O_ACCMODE) == O_WRONLY) - atomic_add(&fWriterCount, 1); + fWriterCount++; if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) - atomic_add(&fReaderCount, 1); + fReaderCount++; + + if (fReaderCount > 0 && fWriterCount > 0) { + fBuffer.CreateBuffer(); + fActive = true; + + // notify all waiting writers that they can start + if (fWriteSelectSyncPool) + notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE); + fWriteCondition.NotifyAll(); + } } @@ -906,15 +957,18 @@ Inode::Close(int openMode) BenaphoreLocker locker(RequestLock()); - if ((openMode & O_ACCMODE) == O_WRONLY - && atomic_add(&fWriterCount, -1) == 1) { + if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0) NotifyEndClosed(true); - } if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) { - if (atomic_add(&fReaderCount, -1) == 1) + if (--fReaderCount == 0) NotifyEndClosed(false); } + + if (fReaderCount == 0 && fWriterCount == 0) { + fActive = false; + fBuffer.DeleteBuffer(); + } } @@ -1351,8 +1405,7 @@ pipefs_read(fs_volume *_volume, fs_vnode *_node, void *_cookie, BenaphoreLocker locker(inode->RequestLock()); - if (inode->WriterCount() == 0) { - + if (inode->IsActive() && inode->WriterCount() == 0) { // as long there is no writer, and the pipe is empty, // we always just return 0 to indicate end of file if (inode->BytesAvailable() == 0) {