* Use condition variables instead of semaphores for blocking readers and

writers.
* Removed the optimization for piping data between two threads of the
  same team. This greatly simplifies the code. It seems to me the case
  is very uncommon; it could be added back later, though.
* Basically rewrote reading from and writing to the pipe:
  - A blockable writer can now write more than what is currently
    available in the ring buffer.
  - Writing respects the PIPE_BUF non-interleaving limit, though our
    headers don't seem to define PIPE_BUF anywhere.
  - Unblock writers, when the last reader is gone and send those that
    haven't written anything yet a SIGPIPE. Fixes bug #1476.
* Correctly implemented select() support. We were only notifying
  writers. We manage two separate select sync pools per pipe now: one
  for the reader end and one for the writer end.
* Reading/writing from the root dir does no longer end in KDL.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@22378 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2007-09-29 23:52:23 +00:00
parent 16da0b3cdd
commit d5cbcd91b7

View File

@ -1,12 +1,19 @@
/*
* Copyright 2007, Ingo Weinhold, bonefish@cs.tu-berlin.de.
* Copyright 2003-2007, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*/
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <KernelExport.h>
#include <NodeMonitor.h>
#include <condition_variable.h>
#include <util/kernel_cpp.h>
#include <util/DoublyLinkedList.h>
#include <util/AutoLock.h>
@ -20,11 +27,6 @@
#include <vm.h>
#include <team.h>
#include <malloc.h>
#include <string.h>
#include <stdio.h>
#include <sys/stat.h>
//#define TRACE_PIPEFS
#ifdef TRACE_PIPEFS
@ -38,6 +40,10 @@
#define PIPEFS_MAX_BUFFER_SIZE 32768
// TODO: PIPE_BUF is supposed to be defined somewhere else.
#define PIPE_BUF _POSIX_PIPE_BUF
namespace pipefs {
class Volume;
@ -63,31 +69,21 @@ class RingBuffer {
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
ReadRequest(void *buffer, size_t bufferSize);
~ReadRequest();
void SetUnnotified() { fNotified = false; }
status_t Wait(bool nonBlocking);
void Notify();
void Abort();
void Notify()
{
if (!fNotified) {
fWaitCondition.NotifyOne();
fNotified = true;
}
}
status_t PutBuffer(RingBuffer &buffer, size_t *_bytesRead = NULL,
bool releasePartiallyFilled = true);
status_t PutData(const void **_data, size_t *_dataSize);
size_t SpaceLeft() const { return fBufferSize - fBytesRead; }
size_t BytesRead() const { return fBytesRead; }
team_id Team() const { return fTeam; }
bool IsQueued() const { return fIsQueued; }
void SetQueued(bool queued);
ConditionVariable<>& WaitCondition() { return fWaitCondition; }
private:
sem_id fLock;
team_id fTeam;
void *fBuffer;
size_t fBufferSize;
size_t fBytesRead;
bool fIsQueued;
ConditionVariable<> fWaitCondition;
bool fNotified;
};
typedef DoublyLinkedList<ReadRequest> RequestList;
@ -168,24 +164,30 @@ class Inode {
benaphore *RequestLock() { return &fRequestLock; }
RequestList &Requests() { return fRequests; }
status_t WriteDataToBuffer(const void **_data, size_t *_bytesLeft,
status_t WriteDataToBuffer(const void *data, size_t *_length,
bool nonBlocking);
status_t ReadDataFromBuffer(void *data, size_t *_length,
bool nonBlocking, ReadRequest &request);
size_t BytesAvailable() const { return fBuffer.Readable(); }
void MayReleaseWriter();
void FillPendingRequests(const void **_buffer,
size_t *_bytesLeft);
void FillPendingRequests();
status_t AddRequest(ReadRequest &request);
status_t RemoveRequest(ReadRequest &request);
void AddRequest(ReadRequest &request);
void RemoveRequest(ReadRequest &request);
status_t WaitForRequest(ReadRequest &request);
void NotifyBytesRead(size_t bytes);
void NotifyReadDone();
void NotifyBytesWritten(size_t bytes);
void NotifyWriteDone();
void NotifyEndClosed(bool writer);
void Open(int openMode);
void Close(int openMode);
int32 ReaderCount() const { return fReaderCount; }
int32 WriterCount() const { return fWriterCount; }
status_t Select(uint8 event, uint32 ref, selectsync *sync);
status_t Deselect(uint8 event, selectsync *sync);
status_t Select(uint8 event, uint32 ref, selectsync *sync,
int openMode);
status_t Deselect(uint8 event, selectsync *sync, int openMode);
static int32 NameHashNextOffset();
static uint32 name_hash_func(void *_node, const void *_key,
@ -213,12 +215,14 @@ class Inode {
RequestList fRequests;
benaphore fRequestLock;
sem_id fWriteLock;
ConditionVariable<> fWriteCondition;
int32 fReaderCount;
int32 fWriterCount;
select_sync_pool *fSelectSyncPool;
select_sync_pool *fReadSelectSyncPool;
select_sync_pool *fWriteSelectSyncPool;
};
@ -561,7 +565,8 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type)
fHashNext(NULL),
fReaderCount(0),
fWriterCount(0),
fSelectSyncPool(NULL)
fReadSelectSyncPool(NULL),
fWriteSelectSyncPool(NULL)
{
fName = strdup(name);
if (fName == NULL)
@ -571,7 +576,7 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type)
fType = type;
if (S_ISFIFO(type)) {
fWriteLock = create_sem(1, "pipe write");
fWriteCondition.Publish(this, "pipe");
benaphore_init(&fRequestLock, "pipe request");
}
@ -587,7 +592,7 @@ Inode::~Inode()
free(const_cast<char *>(fName));
if (S_ISFIFO(fType)) {
delete_sem(fWriteLock);
fWriteCondition.Unpublish();
benaphore_destroy(&fRequestLock);
}
}
@ -596,8 +601,7 @@ Inode::~Inode()
status_t
Inode::InitCheck()
{
if (fName == NULL
|| S_ISFIFO(fType) && (fRequestLock.sem < B_OK || fWriteLock < B_OK))
if (fName == NULL || S_ISFIFO(fType) && fRequestLock.sem < B_OK)
return B_ERROR;
return B_OK;
@ -606,204 +610,242 @@ Inode::InitCheck()
/*!
Writes the specified data bytes to the inode's ring buffer. The
Inode::WriteLock() must be held when calling this method.
Releases the reader sem if necessary, so that blocking
readers will get started.
request lock must be held when calling this method.
Notifies readers if necessary, so that blocking readers will get started.
Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode).
and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode). If
the returned length is > 0, the returned error code can be ignored.
*/
status_t
Inode::WriteDataToBuffer(const void **_data, size_t *_bytesLeft,
bool nonBlocking)
Inode::WriteDataToBuffer(const void *_data, size_t *_length, bool nonBlocking)
{
const void *data = *_data;
size_t dataSize = *_bytesLeft;
const uint8* data = (const uint8*)_data;
size_t dataSize = *_length;
size_t& written = *_length;
written = 0;
TRACE(("Inode::WriteDataToBuffer(data = %p, bytes = %lu)\n",
data, dataSize));
// if this is a blocking call, we need to make sure that data can
// still come in and we don't block the whole inode data transfer
// to prevent deadlocks from happening
// According to the standard, request up to PIPE_BUF bytes shall not be
// interleaved with other writer's data.
size_t minToWrite = 1;
if (dataSize <= PIPE_BUF)
minToWrite = dataSize;
while (fBuffer.Writable() == 0) {
if (nonBlocking)
return B_WOULD_BLOCK;
while (dataSize > 0) {
// Wait until enough space in the buffer is available.
while (fBuffer.Writable() < minToWrite && fReaderCount > 0) {
if (nonBlocking)
return B_WOULD_BLOCK;
benaphore_unlock(&fRequestLock);
ConditionVariableEntry<> entry;
entry.Add(this);
status_t status = acquire_sem_etc(fWriteLock, 1, B_CAN_INTERRUPT, 0);
benaphore_unlock(&fRequestLock);
benaphore_lock(&fRequestLock);
status_t status = entry.Wait(B_CAN_INTERRUPT);
if (status != B_OK)
return status;
benaphore_lock(&fRequestLock);
if (status != B_OK)
return status;
}
// write only as long as there are readers left
if (fReaderCount == 0) {
if (written == 0)
send_signal(find_thread(NULL), SIGPIPE);
return EPIPE;
}
// write as much as we can
size_t toWrite = fBuffer.Writable();
if (toWrite > dataSize)
toWrite = dataSize;
if (fBuffer.UserWrite(data, toWrite) < B_OK)
return B_BAD_ADDRESS;
data += toWrite;
dataSize -= toWrite;
written += toWrite;
NotifyBytesWritten(toWrite);
}
// ensure that we don't write more than PIPEFS_MAX_BUFFER_SIZE
// into a pipe without blocking
size_t spaceLeft = fBuffer.Writable();
if (dataSize > spaceLeft)
dataSize = spaceLeft;
if (fBuffer.UserWrite(data, dataSize) < B_OK) {
release_sem(fWriteLock);
return B_BAD_ADDRESS;
}
*_data = (const void *)((addr_t)data + dataSize);
*_bytesLeft -= dataSize;
MayReleaseWriter();
return B_OK;
}
void
Inode::MayReleaseWriter()
status_t
Inode::ReadDataFromBuffer(void *data, size_t *_length, bool nonBlocking,
ReadRequest &request)
{
if (fBuffer.Writable() > 0) {
if (fSelectSyncPool)
notify_select_event_pool(fSelectSyncPool, B_SELECT_WRITE);
size_t dataSize = *_length;
*_length = 0;
release_sem(fWriteLock);
// wait until our request is first in queue
status_t error;
if (fRequests.Head() != &request) {
if (nonBlocking)
return B_WOULD_BLOCK;
error = WaitForRequest(request);
if (error != B_OK)
return error;
}
// wait until data are available
while (fBuffer.Readable() == 0) {
if (nonBlocking)
return B_WOULD_BLOCK;
if (fWriterCount == 0)
return B_OK;
error = WaitForRequest(request);
if (error != B_OK)
return error;
}
// read as much as we can
size_t toRead = fBuffer.Readable();
if (toRead > dataSize)
toRead = dataSize;
if (fBuffer.UserRead(data, toRead) < B_OK)
return B_BAD_ADDRESS;
NotifyBytesRead(toRead);
*_length = toRead;
return B_OK;
}
void
Inode::AddRequest(ReadRequest &request)
{
fRequests.Add(&request);
}
void
Inode::RemoveRequest(ReadRequest &request)
{
fRequests.Remove(&request);
}
status_t
Inode::WaitForRequest(ReadRequest &request)
{
request.SetUnnotified();
// publish the condition variable
ConditionVariable<>& conditionVariable = request.WaitCondition();
conditionVariable.Publish(&request, "pipe read request");
// add the entry to wait on
ConditionVariableEntry<> entry;
entry.Add(&request);
// wait
benaphore_unlock(&fRequestLock);
status_t status = entry.Wait(B_CAN_INTERRUPT);
benaphore_lock(&fRequestLock);
// unpublish the condition variable
conditionVariable.Unpublish();
return status != B_OK;
}
void
Inode::NotifyBytesRead(size_t bytes)
{
// notify writer, if something can be written now
if (bytes > 0 && fBuffer.Writable() == bytes) {
if (fWriteSelectSyncPool)
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
fWriteCondition.NotifyOne();
}
}
/*!
This functions fills pending requests out of the ring buffer.
It either stops when there are no more requests to satisfy, or
the ring buffer is empty, whatever comes first.
You must hold the request lock when calling this function.
*/
void
Inode::FillPendingRequests()
Inode::NotifyReadDone()
{
size_t bytesLeft = fBuffer.Readable();
TRACE(("Inode::FillPendingRequests(): bytesLeft = %lu\n", bytesLeft));
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
while (bytesLeft != 0 && (request = iterator.Next()) != NULL) {
// notify the request, so that it can be filled
size_t space = request->SpaceLeft();
if (space == 0)
continue;
if (space > bytesLeft)
bytesLeft = 0;
else
bytesLeft -= space;
request->Notify();
MayReleaseWriter();
// notify next reader, if there's still something to be read
if (fBuffer.Readable() > 0) {
if (ReadRequest* request = fRequests.First())
request->Notify();
}
if (fSelectSyncPool)
notify_select_event_pool(fSelectSyncPool, B_SELECT_READ);
}
/*!
This function feeds the pending read requests using the provided
buffer directly. It will also make sure that bytes in the ring
buffer are served first.
It only does something as long as the first request in the queue
shares the same team context as the caller (write context).
You must hold the request lock when calling this function.
*/
void
Inode::FillPendingRequests(const void **_data, size_t *_bytesLeft)
Inode::NotifyBytesWritten(size_t bytes)
{
team_id team = team_get_current_team_id();
// notify reader, if something can be read now
if (bytes > 0 && fBuffer.Readable() == bytes) {
if (fReadSelectSyncPool)
notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
TRACE(("Inode::FillPendingRequests(data = %p, bytes = %lu)\n",
_data, *_bytesLeft));
if (ReadRequest* request = fRequests.First())
request->Notify();
}
}
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
while (*_bytesLeft != 0 && (request = iterator.Next()) != NULL) {
// Note that we leave this loop not to mess up the request queue
if (request->Team() != team)
break;
// try to fill this request from the ring buffer first
size_t bytesRead;
if (request->PutBuffer(fBuffer, &bytesRead, false) != B_OK)
continue;
void
Inode::NotifyWriteDone()
{
// notify next writer, if there's still space for writing
if (fBuffer.Writable() > 0)
fWriteCondition.NotifyOne();
}
MayReleaseWriter();
if (request->SpaceLeft() > 0) {
// ToDo: This is something where we can optimize the buffer
// hand-shaking considerably: we should have a function
// that copies the data to another address space - either
// remapping copy on write or a direct copy.
void
Inode::NotifyEndClosed(bool writer)
{
if (writer) {
// Our last writer has been closed; if the pipe
// contains no data, unlock all waiting readers
if (fBuffer.Readable() == 0) {
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
while ((request = iterator.Next()) != NULL)
request->Notify();
// place our data into that buffer
request->PutData(_data, _bytesLeft);
if (fReadSelectSyncPool)
notify_select_event_pool(fReadSelectSyncPool, B_SELECT_READ);
}
} else {
// Last reader is gone. Wake up all writers.
fWriteCondition.NotifyAll();
if (fWriteSelectSyncPool) {
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_WRITE);
notify_select_event_pool(fWriteSelectSyncPool, B_SELECT_ERROR);
}
}
}
/** This function adds a request into the queue.
* If there is already data waiting in the pipe, the request will
* be fulfilled.
* This function is called from within the readers thread only.
*/
status_t
Inode::AddRequest(ReadRequest &request)
{
BenaphoreLocker locker(RequestLock());
if (!locker.IsLocked())
return B_ERROR;
if (BytesAvailable() > 0 && request.PutBuffer(fBuffer) == B_OK) {
MayReleaseWriter();
} else {
fRequests.Add(&request);
request.SetQueued(true);
}
return B_OK;
}
/** This function removes a request from the queue.
* This function is called from within the readers thread only.
*/
status_t
Inode::RemoveRequest(ReadRequest &request)
{
TRACE(("Inode::RemoveRequest(request = %p)\n", &request));
BenaphoreLocker locker(RequestLock());
if (!locker.IsLocked())
return B_ERROR;
// we might have some data waiting now, if the direct team
// handshake couldn't be done
if (BytesAvailable() > 0 && request.PutBuffer(fBuffer) == B_OK)
release_sem(fWriteLock);
if (request.IsQueued())
fRequests.Remove(&request);
return B_OK;
}
void
Inode::Open(int openMode)
{
if (!S_ISFIFO(fType))
return;
if ((openMode & O_ACCMODE) == O_WRONLY)
atomic_add(&fWriterCount, 1);
@ -817,23 +859,20 @@ Inode::Close(int openMode)
{
TRACE(("Inode::Close(openMode = %d)\n", openMode));
if (!S_ISFIFO(fType))
return;
BenaphoreLocker locker(RequestLock());
if ((openMode & O_ACCMODE) == O_WRONLY
&& atomic_add(&fWriterCount, -1) == 1) {
// Our last writer has been closed; if the pipe
// contains no data, unlock all waiting readers
BenaphoreLocker locker(RequestLock());
if (BytesAvailable() == 0) {
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
while ((request = iterator.Next()) != NULL)
request->Abort();
}
NotifyEndClosed(true);
}
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
atomic_add(&fReaderCount, -1);
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) {
if (atomic_add(&fReaderCount, -1) == 1)
NotifyEndClosed(false);
}
}
@ -906,153 +945,57 @@ Inode::name_compare_func(void *_node, const void *_key)
status_t
Inode::Select(uint8 event, uint32 ref, selectsync *sync)
Inode::Select(uint8 event, uint32 ref, selectsync *sync, int openMode)
{
if (add_select_sync_pool_entry(&fSelectSyncPool, sync, ref, event) != B_OK)
if (!S_ISFIFO(fType))
return B_IS_A_DIRECTORY;
bool writer = true;
select_sync_pool** pool;
if ((openMode & O_RWMASK) == O_RDONLY) {
pool = &fReadSelectSyncPool;
writer = false;
} else if ((openMode & O_RWMASK) == O_WRONLY) {
pool = &fWriteSelectSyncPool;
} else
return B_NOT_ALLOWED;
if (add_select_sync_pool_entry(pool, sync, ref, event) != B_OK)
return B_ERROR;
if ((event == B_SELECT_READ && BytesAvailable() > 0)
|| (event == B_SELECT_WRITE && fBuffer.Writable() > 0))
return notify_select_event(sync, ref, event);
return B_OK;
}
status_t
Inode::Deselect(uint8 event, selectsync *sync)
{
remove_select_sync_pool_entry(&fSelectSyncPool, sync, event);
return B_OK;
}
// #pragma mark -
ReadRequest::ReadRequest(void *buffer, size_t bufferSize)
:
fBuffer(buffer),
fBufferSize(bufferSize),
fBytesRead(0),
fIsQueued(false)
{
fLock = create_sem(0, "request lock");
fTeam = team_get_current_team_id();
}
ReadRequest::~ReadRequest()
{
delete_sem(fLock);
}
status_t
ReadRequest::Wait(bool nonBlocking)
{
TRACE(("pipefs: request@%p waits for data (%sblocking), thread 0x%lx\n",
this, nonBlocking ? "non" : "", find_thread(NULL)));
return acquire_sem_etc(fLock, 1,
(nonBlocking ? B_TIMEOUT : 0) | B_CAN_INTERRUPT, 0);
}
void
ReadRequest::Notify()
{
release_sem(fLock);
}
void
ReadRequest::Abort()
{
fBuffer = NULL;
release_sem(fLock);
}
void
ReadRequest::SetQueued(bool queued)
{
fIsQueued = queued;
}
/*!
Reads the contents of the ring buffer into the requests data buffer
if any (canceled requests don't have a buffer anymore).
This function must only be called in the team context that initiated
the read request.
*/
status_t
ReadRequest::PutBuffer(RingBuffer &buffer, size_t *_bytesRead,
bool releasePartiallyFilled)
{
TRACE(("pipefs: ReadRequest::PutBuffer()\n"));
if (_bytesRead)
*_bytesRead = 0;
if (fBuffer == NULL)
return B_CANCELED;
size_t length = buffer.Readable();
size_t spaceLeft = SpaceLeft();
// we read spaceLeft bytes at maximum - but never
// more than there are in the buffer
if (spaceLeft < length)
length = spaceLeft;
if (length == 0)
return B_OK;
if (buffer.UserRead((char*)fBuffer + fBytesRead, length) < B_OK) {
// if the buffer is just invalid, we release the reader as well
release_sem(fLock);
return B_BAD_ADDRESS;
// signal right away, if the condition holds already
if (writer) {
if (event == B_SELECT_WRITE
&& (fBuffer.Writable() > 0 || fReaderCount == 0)
|| event == B_SELECT_ERROR && fReaderCount == 0) {
return notify_select_event(sync, ref, event);
}
} else {
if (event == B_SELECT_READ
&& (fBuffer.Readable() > 0 || fWriterCount == 0)) {
return notify_select_event(sync, ref, event);
}
}
fBytesRead += length;
if (_bytesRead)
*_bytesRead = length;
if (releasePartiallyFilled || SpaceLeft() == 0)
release_sem(fLock);
return B_OK;
}
/*!
Copies the specified buffer into the request. This function currently
only works for the local address space (both, sender and receiver must
be in the same address space).
*/
status_t
ReadRequest::PutData(const void **_buffer, size_t *_bytesLeft)
Inode::Deselect(uint8 event, selectsync *sync, int openMode)
{
TRACE(("pipefs: ReadRequest::PutData(buffer = %p, size = %lu)\n",
*_buffer, *_bytesLeft));
if (!S_ISFIFO(fType))
return B_IS_A_DIRECTORY;
size_t bytes = *_bytesLeft;
if (bytes > SpaceLeft())
bytes = SpaceLeft();
select_sync_pool** pool;
if ((openMode & O_RWMASK) == O_RDONLY) {
pool = &fReadSelectSyncPool;
} else if ((openMode & O_RWMASK) == O_WRONLY) {
pool = &fWriteSelectSyncPool;
} else
return B_NOT_ALLOWED;
uint8 *source = (uint8 *)*_buffer;
if (user_memcpy((uint8 *)fBuffer + fBytesRead, source, bytes) < B_OK) {
release_sem(fLock);
return B_BAD_ADDRESS;
}
fBytesRead += bytes;
*_buffer = (void *)(source + bytes);
*_bytesLeft -= bytes;
release_sem(fLock);
remove_select_sync_pool_entry(pool, sync, event);
return B_OK;
}
@ -1353,11 +1296,15 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie,
TRACE(("pipefs_read(vnode = %p, cookie = %p, length = %lu, mode = %d)\n",
_node, cookie, *_length, cookie->open_mode));
if (!S_ISFIFO(inode->Type()))
return B_IS_A_DIRECTORY;
if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
return B_NOT_ALLOWED;
BenaphoreLocker locker(inode->RequestLock());
if (inode->WriterCount() == 0) {
BenaphoreLocker locker(inode->RequestLock());
// as long there is no writer, and the pipe is empty,
// we always just return 0 to indicate end of file
@ -1369,22 +1316,20 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie,
// issue read request
ReadRequest request(buffer, *_length);
if (inode->AddRequest(request) != B_OK)
return B_ERROR;
ReadRequest request;
inode->AddRequest(request);
// wait for it to be filled
size_t length = *_length;
status_t status = inode->ReadDataFromBuffer(buffer, &length,
(cookie->open_mode & O_NONBLOCK) != 0, request);
status_t status = request.Wait((cookie->open_mode & O_NONBLOCK) != 0);
inode->RemoveRequest(request);
inode->NotifyReadDone();
if ((status == B_TIMED_OUT || status == B_INTERRUPTED || status == B_WOULD_BLOCK)
&& request.BytesRead() > 0)
if (length > 0)
status = B_OK;
if (status == B_OK)
*_length = request.BytesRead();
*_length = length;
return status;
}
@ -1399,37 +1344,28 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie,
TRACE(("pipefs_write(vnode = %p, cookie = %p, length = %lu)\n",
_node, cookie, *_length));
if (!S_ISFIFO(inode->Type()))
return B_IS_A_DIRECTORY;
if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
return B_NOT_ALLOWED;
if (inode->ReaderCount() == 0) {
// if there is no reader yet, we signal the thread and return
send_signal(find_thread(NULL), SIGPIPE);
return EPIPE;
}
BenaphoreLocker locker(inode->RequestLock());
size_t bytesLeft = *_length;
inode->FillPendingRequests(&buffer, &bytesLeft);
size_t length = *_length;
if (length == 0)
return B_OK;
status_t status;
// copy data into ring buffer
status_t status = inode->WriteDataToBuffer(buffer, &length,
(cookie->open_mode & O_NONBLOCK) != 0);
if (bytesLeft != 0) {
// we could not place all our data in pending requests, so
// have to put them into a temporary buffer
inode->NotifyWriteDone();
status = inode->WriteDataToBuffer(&buffer, &bytesLeft,
(cookie->open_mode & O_NONBLOCK) != 0);
if (status == B_OK) {
inode->FillPendingRequests();
*_length -= bytesLeft;
}
} else {
// could write everything without the need to copy!
if (length > 0)
status = B_OK;
}
*_length = length;
return status;
}
@ -1624,13 +1560,15 @@ static status_t
pipefs_select(fs_volume _fs, fs_vnode _node, fs_cookie _cookie, uint8 event,
uint32 ref, selectsync *sync)
{
file_cookie *cookie = (file_cookie *)_cookie;
TRACE(("pipefs_select(vnode = %p)\n", _node));
Inode *inode = (Inode *)_node;
if (!inode)
return B_ERROR;
BenaphoreLocker locker(inode->RequestLock());
return inode->Select(event, ref, sync);
return inode->Select(event, ref, sync, cookie->open_mode);
}
@ -1638,13 +1576,15 @@ static status_t
pipefs_deselect(fs_volume _fs, fs_vnode _node, fs_cookie _cookie, uint8 event,
selectsync *sync)
{
file_cookie *cookie = (file_cookie *)_cookie;
TRACE(("pipefs_deselect(vnode = %p)\n", _node));
Inode *inode = (Inode *)_node;
if (!inode)
return B_ERROR;
BenaphoreLocker locker(inode->RequestLock());
return inode->Deselect(event, sync);
return inode->Deselect(event, sync, cookie->open_mode);
}