* A pipe can now be inactive and active. After creation it remains
inactive until at least one reader and one writer have opened it. As long as it is inactive, reads from and writes to it just block. When active, they behave as before (if there's no counterpart writes fail: SIGPIPE + EPIPE, reads return 0). When both reader and writer count drop to zero, the pipe becomes inactive again. * Allocate the ring buffer lazily when the pipe becomes active and delete it when it becomes inactive. This makes the pipe implementation FIFO compatible. IOW, FIFOs work as expected as far as I've tested them. git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@24819 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
parent
a8cba5904f
commit
d4016ffdbb
|
@ -58,6 +58,9 @@ class RingBuffer {
|
|||
RingBuffer();
|
||||
~RingBuffer();
|
||||
|
||||
status_t CreateBuffer();
|
||||
void DeleteBuffer();
|
||||
|
||||
size_t Write(const void *buffer, size_t length);
|
||||
size_t Read(void *buffer, size_t length);
|
||||
ssize_t UserWrite(const void *buffer, ssize_t length);
|
||||
|
@ -170,6 +173,7 @@ class Inode {
|
|||
ino_t ID() const { return fID; }
|
||||
const char *Name() const { return fName; }
|
||||
|
||||
bool IsActive() const { return fActive; }
|
||||
int32 Type() const { return fType; }
|
||||
void SetMode(mode_t mode)
|
||||
{ fType = (fType & ~S_IUMSK) | (mode & S_IUMSK); }
|
||||
|
@ -246,6 +250,7 @@ class Inode {
|
|||
|
||||
int32 fReaderCount;
|
||||
int32 fWriterCount;
|
||||
bool fActive;
|
||||
|
||||
select_sync_pool *fReadSelectSyncPool;
|
||||
select_sync_pool *fWriteSelectSyncPool;
|
||||
|
@ -280,20 +285,44 @@ extern fs_vnode_ops kVnodeOps;
|
|||
|
||||
|
||||
RingBuffer::RingBuffer()
|
||||
: fBuffer(NULL)
|
||||
{
|
||||
fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
|
||||
}
|
||||
|
||||
|
||||
RingBuffer::~RingBuffer()
|
||||
{
|
||||
delete_ring_buffer(fBuffer);
|
||||
DeleteBuffer();
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
RingBuffer::CreateBuffer()
|
||||
{
|
||||
if (fBuffer != NULL)
|
||||
return B_OK;
|
||||
|
||||
fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
|
||||
return (fBuffer != NULL ? B_OK : B_NO_MEMORY);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
RingBuffer::DeleteBuffer()
|
||||
{
|
||||
if (fBuffer != NULL) {
|
||||
delete_ring_buffer(fBuffer);
|
||||
fBuffer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
inline size_t
|
||||
RingBuffer::Write(const void *buffer, size_t length)
|
||||
{
|
||||
if (fBuffer == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
return ring_buffer_write(fBuffer, (const uint8 *)buffer, length);
|
||||
}
|
||||
|
||||
|
@ -301,6 +330,9 @@ RingBuffer::Write(const void *buffer, size_t length)
|
|||
inline size_t
|
||||
RingBuffer::Read(void *buffer, size_t length)
|
||||
{
|
||||
if (fBuffer == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
return ring_buffer_read(fBuffer, (uint8 *)buffer, length);
|
||||
}
|
||||
|
||||
|
@ -308,6 +340,9 @@ RingBuffer::Read(void *buffer, size_t length)
|
|||
inline ssize_t
|
||||
RingBuffer::UserWrite(const void *buffer, ssize_t length)
|
||||
{
|
||||
if (fBuffer == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
return ring_buffer_user_write(fBuffer, (const uint8 *)buffer, length);
|
||||
}
|
||||
|
||||
|
@ -315,6 +350,9 @@ RingBuffer::UserWrite(const void *buffer, ssize_t length)
|
|||
inline ssize_t
|
||||
RingBuffer::UserRead(void *buffer, ssize_t length)
|
||||
{
|
||||
if (fBuffer == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
return ring_buffer_user_read(fBuffer, (uint8 *)buffer, length);
|
||||
}
|
||||
|
||||
|
@ -322,14 +360,14 @@ RingBuffer::UserRead(void *buffer, ssize_t length)
|
|||
inline size_t
|
||||
RingBuffer::Readable() const
|
||||
{
|
||||
return ring_buffer_readable(fBuffer);
|
||||
return (fBuffer != NULL ? ring_buffer_readable(fBuffer) : 0);
|
||||
}
|
||||
|
||||
|
||||
inline size_t
|
||||
RingBuffer::Writable() const
|
||||
{
|
||||
return ring_buffer_writable(fBuffer);
|
||||
return (fBuffer != NULL ? ring_buffer_writable(fBuffer) : 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -600,6 +638,7 @@ Inode::Inode(ino_t id, Inode *parent, const char *name, int32 type)
|
|||
fWriteRequests(),
|
||||
fReaderCount(0),
|
||||
fWriterCount(0),
|
||||
fActive(false),
|
||||
fReadSelectSyncPool(NULL),
|
||||
fWriteSelectSyncPool(NULL)
|
||||
{
|
||||
|
@ -667,7 +706,8 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
|
|||
|
||||
while (dataSize > 0) {
|
||||
// Wait until enough space in the buffer is available.
|
||||
while (fBuffer.Writable() < minToWrite && fReaderCount > 0) {
|
||||
while (!fActive
|
||||
|| fBuffer.Writable() < minToWrite && fReaderCount > 0) {
|
||||
if (nonBlocking)
|
||||
return B_WOULD_BLOCK;
|
||||
|
||||
|
@ -688,7 +728,7 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
|
|||
}
|
||||
|
||||
// write only as long as there are readers left
|
||||
if (fReaderCount == 0) {
|
||||
if (fReaderCount == 0 && fActive) {
|
||||
if (written == 0)
|
||||
send_signal(find_thread(NULL), SIGPIPE);
|
||||
return EPIPE;
|
||||
|
@ -696,11 +736,11 @@ Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
|
|||
|
||||
// write as much as we can
|
||||
|
||||
size_t toWrite = fBuffer.Writable();
|
||||
size_t toWrite = (fActive ? fBuffer.Writable() : 0);
|
||||
if (toWrite > dataSize)
|
||||
toWrite = dataSize;
|
||||
|
||||
if (fBuffer.UserWrite(data, toWrite) < B_OK)
|
||||
if (toWrite > 0 && fBuffer.UserWrite(data, toWrite) < B_OK)
|
||||
return B_BAD_ADDRESS;
|
||||
|
||||
data += toWrite;
|
||||
|
@ -737,7 +777,7 @@ Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
|
|||
if (nonBlocking)
|
||||
return B_WOULD_BLOCK;
|
||||
|
||||
if (fWriterCount == 0)
|
||||
if (fActive && fWriterCount == 0)
|
||||
return B_OK;
|
||||
|
||||
error = WaitForReadRequest(request);
|
||||
|
@ -881,18 +921,29 @@ Inode::NotifyEndClosed(bool writer)
|
|||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
Inode::Open(int openMode)
|
||||
{
|
||||
if (!S_ISFIFO(fType))
|
||||
return;
|
||||
|
||||
BenaphoreLocker locker(RequestLock());
|
||||
|
||||
if ((openMode & O_ACCMODE) == O_WRONLY)
|
||||
atomic_add(&fWriterCount, 1);
|
||||
fWriterCount++;
|
||||
|
||||
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
|
||||
atomic_add(&fReaderCount, 1);
|
||||
fReaderCount++;
|
||||
|
||||
if (fReaderCount > 0 && fWriterCount > 0) {
|
||||
fBuffer.CreateBuffer();
|
||||
fActive = true;
|
||||
|
||||
// notify all waiting writers that they can start
|
||||
if (fWriteSelectSyncPool)
|
||||
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
|
||||
fWriteCondition.NotifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -906,15 +957,18 @@ Inode::Close(int openMode)
|
|||
|
||||
BenaphoreLocker locker(RequestLock());
|
||||
|
||||
if ((openMode & O_ACCMODE) == O_WRONLY
|
||||
&& atomic_add(&fWriterCount, -1) == 1) {
|
||||
if ((openMode & O_ACCMODE) == O_WRONLY && --fWriterCount == 0)
|
||||
NotifyEndClosed(true);
|
||||
}
|
||||
|
||||
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
|
||||
if (atomic_add(&fReaderCount, -1) == 1)
|
||||
if (--fReaderCount == 0)
|
||||
NotifyEndClosed(false);
|
||||
}
|
||||
|
||||
if (fReaderCount == 0 && fWriterCount == 0) {
|
||||
fActive = false;
|
||||
fBuffer.DeleteBuffer();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1351,8 +1405,7 @@ pipefs_read(fs_volume *_volume, fs_vnode *_node, void *_cookie,
|
|||
|
||||
BenaphoreLocker locker(inode->RequestLock());
|
||||
|
||||
if (inode->WriterCount() == 0) {
|
||||
|
||||
if (inode->IsActive() && inode->WriterCount() == 0) {
|
||||
// as long there is no writer, and the pipe is empty,
|
||||
// we always just return 0 to indicate end of file
|
||||
if (inode->BytesAvailable() == 0) {
|
||||
|
|
Loading…
Reference in New Issue