Now uses the ring_buffer implementation instead of cbuf.

Decreased the pipe buffer to 32768 bytes (it's 4096 on BeOS).
pipefs_select()/deselect() did no locking; they now acquire the request lock.
Switched to using BenaphoreLocker where possible.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@12381 a95241bf-73f2-0310-859d-f6bbb57e9c96
Axel Dörfler 2005-04-13 22:43:35 +00:00
parent 3e15f83d9e
commit b51d2e941a

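For context on the BenaphoreLocker change: BenaphoreLocker comes from util/AutoLock.h (newly included by this commit) and is an RAII guard around a kernel benaphore. The sketch below is a hypothetical, reduced stand-in showing only the pattern the new code relies on (a constructor taking a benaphore pointer, plus IsLocked()); it is not Haiku's actual implementation, and benaphore_unlock()'s return type is an assumption.

#include <SupportDefs.h>	// status_t, B_OK

// Kernel benaphore primitives as used in this file; prototypes repeated here
// only so the sketch is self-contained (benaphore_lock()'s status_t return is
// visible in the diff below, benaphore_unlock()'s is assumed).
struct benaphore;
status_t benaphore_lock(benaphore *lock);
status_t benaphore_unlock(benaphore *lock);

// Hypothetical, minimal stand-in for util/AutoLock.h's BenaphoreLocker.
class BenaphoreLocker {
	public:
		BenaphoreLocker(benaphore *lock)
			:
			fLock(lock),
			fLocked(benaphore_lock(lock) == B_OK)
		{
		}

		~BenaphoreLocker()
		{
			// the benaphore is released on every return path, which is
			// what lets the diff drop the explicit benaphore_unlock()
			// calls before each return statement
			if (fLocked)
				benaphore_unlock(fLock);
		}

		bool IsLocked() const { return fLocked; }

	private:
		benaphore	*fLock;
		bool		fLocked;
};

With such a guard, the pattern in the diff becomes "BenaphoreLocker locker(RequestLock()); if (!locker.IsLocked()) return B_ERROR;" and no manual unlock is needed on any exit path.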

@@ -9,7 +9,8 @@
#include <util/kernel_cpp.h>
#include <util/DoublyLinkedList.h>
#include <cbuf.h>
#include <util/AutoLock.h>
#include <util/ring_buffer.h>
#include <vfs.h>
#include <vfs_select.h>
#include <select_sync_pool.h>
@@ -37,7 +38,7 @@
#define PIPEFS_HASH_SIZE 16
#define PIPEFS_MAX_BUFFER_SIZE 65536
#define PIPEFS_MAX_BUFFER_SIZE 32768
namespace pipefs {
@@ -46,6 +47,23 @@ class Volume;
class Inode;
struct dir_cookie;
class RingBuffer {
public:
RingBuffer();
~RingBuffer();
size_t Write(const void *buffer, size_t length);
size_t Read(void *buffer, size_t length);
ssize_t UserWrite(const void *buffer, ssize_t length);
ssize_t UserRead(void *buffer, ssize_t length);
size_t Readable() const;
size_t Writable() const;
private:
struct ring_buffer *fBuffer;
};
class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
public:
ReadRequest(void *buffer, size_t bufferSize);
@@ -55,9 +73,9 @@ class ReadRequest : public DoublyLinkedListLinkImpl<ReadRequest> {
void Notify();
void Abort();
status_t PutBufferChain(cbuf *bufferChain, size_t *_bytesRead = NULL,
status_t PutBuffer(RingBuffer &buffer, size_t *_bytesRead = NULL,
bool releasePartiallyFilled = true);
status_t PutBuffer(const void **_buffer, size_t *_bufferSize);
status_t PutData(const void **_data, size_t *_dataSize);
size_t SpaceLeft() const { return fBufferSize - fBytesRead; }
size_t BytesRead() const { return fBytesRead; }
@@ -147,9 +165,8 @@ class Inode {
benaphore *RequestLock() { return &fRequestLock; }
RequestList &Requests() { return fRequests; }
status_t WriteBufferToChain(const void **_buffer, size_t *_bytesLeft, bool nonBlocking);
// status_t ReadBufferFromChain(void *buffer, size_t *_bufferSize);
size_t BytesInChain() const { return cbuf_get_length(fBufferChain); }
status_t WriteDataToBuffer(const void **_data, size_t *_bytesLeft, bool nonBlocking);
size_t BytesAvailable() const { return fBuffer.Readable(); }
void MayReleaseWriter();
void FillPendingRequests(const void **_buffer, size_t *_bytesLeft);
@@ -180,7 +197,7 @@ class Inode {
time_t fCreationTime;
time_t fModificationTime;
cbuf *fBufferChain;
RingBuffer fBuffer;
RequestList fRequests;
@@ -217,6 +234,63 @@ struct file_cookie {
//---------------------
RingBuffer::RingBuffer()
{
fBuffer = create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE);
}
RingBuffer::~RingBuffer()
{
delete_ring_buffer(fBuffer);
}
inline size_t
RingBuffer::Write(const void *buffer, size_t length)
{
return ring_buffer_write(fBuffer, (const uint8 *)buffer, length);
}
inline size_t
RingBuffer::Read(void *buffer, size_t length)
{
return ring_buffer_read(fBuffer, (uint8 *)buffer, length);
}
inline ssize_t
RingBuffer::UserWrite(const void *buffer, ssize_t length)
{
return ring_buffer_user_write(fBuffer, (const uint8 *)buffer, length);
}
inline ssize_t
RingBuffer::UserRead(void *buffer, ssize_t length)
{
return ring_buffer_user_read(fBuffer, (uint8 *)buffer, length);
}
inline size_t
RingBuffer::Readable() const
{
return ring_buffer_readable(fBuffer);
}
inline size_t
RingBuffer::Writable() const
{
return ring_buffer_writable(fBuffer);
}
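A short usage sketch of the wrapper above. The function and the byte counts are invented for illustration; the methods are the ones just defined, and the clamping behaviour described in the comments is the usual ring-buffer semantic, assumed rather than shown by this hunk.

// Illustration only -- exercises the RingBuffer wrapper defined above.
static void
ring_buffer_usage_sketch()
{
	RingBuffer buffer;
		// wraps create_ring_buffer(PIPEFS_MAX_BUFFER_SIZE), i.e. 32768 bytes

	const char data[] = "hello";
	size_t written = buffer.Write(data, sizeof(data));
		// returns the number of bytes actually stored -- at most the
		// writable space; the buffer is empty here, so all 6 bytes fit

	char out[sizeof(data)];
	size_t read = buffer.Read(out, buffer.Readable());
		// consumes the stored bytes, so Readable() drops back to 0

	(void)written;
	(void)read;
}

The UserWrite()/UserRead() variants copy to and from userland addresses and can fail (return a value below B_OK), which is why the callers below translate such a failure into B_BAD_ADDRESS.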
// #pragma mark -
Volume::Volume(mount_id id)
:
fID(id),
@@ -459,7 +533,6 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type)
:
fNext(NULL),
fHashNext(NULL),
fBufferChain(NULL),
fReaderCount(0),
fWriterCount(0),
fSelectSyncPool(NULL)
@@ -505,22 +578,21 @@ Inode::InitCheck()
}
/** Adds the specified buffer to the inode's buffer chain by copying
* its contents. The Inode::WriteLock() must be held when calling
* this method.
/** Writes the specified data bytes to the inode's ring buffer. The
* Inode::WriteLock() must be held when calling this method.
* Releases the reader sem if necessary, so that blocking
* readers will get started.
* Returns B_NO_MEMORY if the chain couldn't be allocated, B_BAD_ADDRESS
* if copying from the buffer failed, and B_OK on success.
* Returns B_OK for success, B_BAD_ADDRESS if copying from the buffer failed,
* and various semaphore errors (like B_WOULD_BLOCK in non-blocking mode).
*/
status_t
Inode::WriteBufferToChain(const void **_buffer, size_t *_bytesLeft, bool nonBlocking)
Inode::WriteDataToBuffer(const void **_data, size_t *_bytesLeft, bool nonBlocking)
{
const void *buffer = *_buffer;
size_t bufferSize = *_bytesLeft;
const void *data = *_data;
size_t dataSize = *_bytesLeft;
TRACE(("Inode::WriteBufferToChain(buffer = %p, bytes = %lu)\n", buffer, bufferSize));
TRACE(("Inode::WriteDataToBuffer(data = %p, bytes = %lu)\n", data, dataSize));
// if this is a blocking call, we need to make sure that data can
// still come in and we don't block the whole inode data transfer
@@ -541,29 +613,17 @@ Inode::WriteBufferToChain(const void **_buffer, size_t *_bytesLeft, bool nonBloc
// ensure that we don't write more than PIPEFS_MAX_BUFFER_SIZE
// into a pipe without blocking
size_t inChain = 0;
if (fBufferChain != NULL)
inChain = cbuf_get_length(fBufferChain);
size_t spaceLeft = fBuffer.Writable();
if (dataSize > spaceLeft)
dataSize = spaceLeft;
if (bufferSize + inChain > PIPEFS_MAX_BUFFER_SIZE)
bufferSize = PIPEFS_MAX_BUFFER_SIZE - inChain;
cbuf *chain = cbuf_get_chain(bufferSize);
if (chain == NULL) {
release_sem(fWriteLock);
return B_NO_MEMORY;
}
if (cbuf_user_memcpy_to_chain(chain, 0, buffer, bufferSize) < B_OK) {
if (fBuffer.UserWrite(data, dataSize) < B_OK) {
release_sem(fWriteLock);
return B_BAD_ADDRESS;
}
// join this chain with our already existing chain (if any)
fBufferChain = cbuf_merge_chains(fBufferChain, chain);
*_buffer = (const void *)((addr_t)buffer + bufferSize);
*_bytesLeft -= bufferSize;
*_data = (const void *)((addr_t)data + dataSize);
*_bytesLeft -= dataSize;
MayReleaseWriter();
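To make the clamping concrete, a hypothetical walk-through with invented numbers: an empty pipe (32768 writable bytes) receives a 40000-byte write.

#include <stddef.h>	// size_t

// Hypothetical walk-through of the clamping above; the numbers are invented.
static void
write_clamping_example()
{
	size_t spaceLeft = 32768;	// fBuffer.Writable() on an empty pipe
	size_t dataSize = 40000;	// bytes the caller still wants to write

	if (dataSize > spaceLeft)
		dataSize = spaceLeft;	// clamped to 32768

	// After a successful UserWrite(), *_data advances by 32768 bytes and
	// *_bytesLeft drops from 40000 to 7232; whether the remainder is
	// written on a later pass or reported as a short write depends on the
	// caller and the blocking mode (see the B_WOULD_BLOCK note above).
	(void)dataSize;
}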
@@ -574,25 +634,25 @@ Inode::WriteBufferToChain(const void **_buffer, size_t *_bytesLeft, bool nonBloc
void
Inode::MayReleaseWriter()
{
if (BytesInChain() < PIPEFS_MAX_BUFFER_SIZE) {
if (fBuffer.Writable() > 0) {
if (fSelectSyncPool)
notify_select_event_pool(fSelectSyncPool, B_SELECT_WRITE);
release_sem(fWriteLock);
}
}
/** This functions fills pending requests out of the buffer chain.
/** This function fills pending requests out of the ring buffer.
* It either stops when there are no more requests to satisfy, or
* the buffer chain is empty, whatever comes first.
* the ring buffer is empty, whatever comes first.
* You must hold the request lock when calling this function.
*/
void
Inode::FillPendingRequests()
{
size_t bytesLeft = cbuf_get_length(fBufferChain);
size_t bytesLeft = fBuffer.Readable();
TRACE(("Inode::FillPendingRequests(): bytesLeft = %lu\n", bytesLeft));
@@ -619,19 +679,19 @@ Inode::FillPendingRequests()
/** This function feeds the pending read requests using the provided
* buffer directly. It will also make sure that bytes in the buffer
* chain are served first.
* buffer directly. It will also make sure that bytes in the ring
* buffer are served first.
* It only does something as long as the first request in the queue
* shares the same team context as the caller (write context).
* You must hold the request lock when calling this function.
*/
void
Inode::FillPendingRequests(const void **_buffer, size_t *_bytesLeft)
Inode::FillPendingRequests(const void **_data, size_t *_bytesLeft)
{
team_id team = team_get_current_team_id();
TRACE(("Inode::FillPendingRequests(buffer = %p, bytes = %lu)\n", *_buffer, *_bytesLeft));
TRACE(("Inode::FillPendingRequests(data = %p, bytes = %lu)\n", *_data, *_bytesLeft));
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
@@ -640,9 +700,9 @@ Inode::FillPendingRequests(const void **_buffer, size_t *_bytesLeft)
if (request->Team() != team)
break;
// try to fill this request from the buffer chain first
// try to fill this request from the ring buffer first
size_t bytesRead;
if (request->PutBufferChain(fBufferChain, &bytesRead, false) != B_OK)
if (request->PutBuffer(fBuffer, &bytesRead, false) != B_OK)
continue;
MayReleaseWriter();
@@ -654,7 +714,7 @@ Inode::FillPendingRequests(const void **_buffer, size_t *_bytesLeft)
// remapping copy on write or a direct copy.
// place our data into that buffer
request->PutBuffer(_buffer, _bytesLeft);
request->PutData(_data, _bytesLeft);
}
}
}
@@ -669,17 +729,17 @@ Inode::FillPendingRequests(const void **_buffer, size_t *_bytesLeft)
status_t
Inode::AddRequest(ReadRequest &request)
{
if (benaphore_lock(&fRequestLock) != B_OK)
BenaphoreLocker locker(RequestLock());
if (!locker.IsLocked())
return B_ERROR;
if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK) {
if (BytesAvailable() > 0 && request.PutBuffer(fBuffer) == B_OK) {
MayReleaseWriter();
} else {
fRequests.Add(&request);
request.SetQueued(true);
}
benaphore_unlock(&fRequestLock);
return B_OK;
}
@@ -693,19 +753,19 @@ Inode::RemoveRequest(ReadRequest &request)
{
TRACE(("Inode::RemoveRequest(request = %p)\n", &request));
if (benaphore_lock(&fRequestLock) != B_OK)
BenaphoreLocker locker(RequestLock());
if (!locker.IsLocked())
return B_ERROR;
// we might have some data waiting now, if the direct team
// handshake couldn't be done
if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK)
if (BytesAvailable() > 0 && request.PutBuffer(fBuffer) == B_OK)
release_sem(fWriteLock);
if (request.IsQueued())
fRequests.Remove(&request);
benaphore_unlock(&fRequestLock);
return B_OK;
}
@@ -731,16 +791,14 @@ Inode::Close(int openMode)
// Our last writer has been closed; if the pipe
// contains no data, unlock all waiting readers
benaphore_lock(&fRequestLock);
BenaphoreLocker locker(RequestLock());
if (cbuf_get_length(fBufferChain) == 0) {
if (BytesAvailable() == 0) {
ReadRequest *request;
RequestList::Iterator iterator = fRequests.GetIterator();
while ((request = iterator.Next()) != NULL)
request->Abort();
}
benaphore_unlock(&fRequestLock);
}
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
@@ -787,11 +845,11 @@ Inode::Select(uint8 event, uint32 ref, selectsync *sync)
{
if (add_select_sync_pool_entry(&fSelectSyncPool, sync, ref, event) != B_OK)
return B_ERROR;
if ((event == B_SELECT_READ && BytesInChain() > 0)
|| (event == B_SELECT_WRITE && BytesInChain() < PIPEFS_MAX_BUFFER_SIZE))
if ((event == B_SELECT_READ && BytesAvailable() > 0)
|| (event == B_SELECT_WRITE && fBuffer.Writable() > 0))
return notify_select_event(sync, ref, event);
return B_OK;
}
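As a hedged illustration of the readiness conditions used above: the helper below is hypothetical, but its two tests mirror the new code, and for a fixed-size buffer BytesAvailable() < PIPEFS_MAX_BUFFER_SIZE is equivalent to fBuffer.Writable() > 0.

// Hypothetical helper restating the readiness tests from Inode::Select().
static bool
pipe_event_ready(Inode &inode, uint8 event)
{
	if (event == B_SELECT_READ)
		return inode.BytesAvailable() > 0;
			// a reader would not block: there is buffered data

	if (event == B_SELECT_WRITE)
		return inode.BytesAvailable() < PIPEFS_MAX_BUFFER_SIZE;
			// a writer would not block: the ring buffer has room left

	return false;
}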
@@ -856,16 +914,16 @@ ReadRequest::SetQueued(bool queued)
}
/** Reads the contents of the buffer chain to the specified
* buffer, if any.
/** Reads the contents of the ring buffer into the request's data buffer
* if any (canceled requests don't have a buffer anymore).
* This function must only be called in the team context that initiated
* the read request.
*/
status_t
ReadRequest::PutBufferChain(cbuf *bufferChain, size_t *_bytesRead, bool releasePartiallyFilled)
ReadRequest::PutBuffer(RingBuffer &buffer, size_t *_bytesRead, bool releasePartiallyFilled)
{
TRACE(("pipefs: ReadRequest::PutBufferChain()\n"));
TRACE(("pipefs: ReadRequest::PutBuffer()\n"));
if (_bytesRead)
*_bytesRead = 0;
@@ -873,31 +931,23 @@ ReadRequest::PutBufferChain(cbuf *bufferChain, size_t *_bytesRead, bool releaseP
if (fBuffer == NULL)
return B_CANCELED;
if (bufferChain == NULL)
return B_OK;
size_t length = cbuf_get_length(bufferChain);
size_t length = buffer.Readable();
size_t spaceLeft = SpaceLeft();
// we read spaceLeft bytes at maximum - but never
// more than there are in the chain
// more than there are in the buffer
if (spaceLeft < length)
length = spaceLeft;
if (length == 0)
return B_OK;
if (cbuf_user_memcpy_from_chain(fBuffer, bufferChain, 0, length) < B_OK) {
if (buffer.UserRead(fBuffer, length) < B_OK) {
// if the buffer is just invalid, we release the reader as well
release_sem(fLock);
return B_BAD_ADDRESS;
}
if (cbuf_truncate_head(bufferChain, length) < B_OK) {
// if that call fails, the next read will duplicate the input
dprintf("pipefs: cbuf_truncate_head() failed for cbuf %p\n", bufferChain);
}
fBytesRead += length;
if (_bytesRead)
*_bytesRead = length;
@@ -915,9 +965,9 @@ ReadRequest::PutBufferChain(cbuf *bufferChain, size_t *_bytesRead, bool releaseP
*/
status_t
ReadRequest::PutBuffer(const void **_buffer, size_t *_bytesLeft)
ReadRequest::PutData(const void **_buffer, size_t *_bytesLeft)
{
TRACE(("pipefs: ReadRequest::PutBuffer(buffer = %p, size = %lu)\n", *_buffer, *_bytesLeft));
TRACE(("pipefs: ReadRequest::PutData(buffer = %p, size = %lu)\n", *_buffer, *_bytesLeft));
size_t bytes = *_bytesLeft;
if (bytes > SpaceLeft())
@@ -1227,15 +1277,11 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/,
return B_NOT_ALLOWED;
if (inode->WriterCount() == 0) {
uint32 available;
benaphore_lock(inode->RequestLock());
available = inode->BytesInChain();
benaphore_unlock(inode->RequestLock());
BenaphoreLocker locker(inode->RequestLock());
// as long there is no writer, and the pipe is empty,
// we always just return 0 to indicate end of file
if (available == 0) {
if (inode->BytesAvailable() == 0) {
*_length = 0;
return B_OK;
}
@@ -1281,7 +1327,7 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/
return EPIPE;
}
benaphore_lock(inode->RequestLock());
BenaphoreLocker locker(inode->RequestLock());
size_t bytesLeft = *_length;
inode->FillPendingRequests(&buffer, &bytesLeft);
@@ -1292,7 +1338,7 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/
// we could not place all our data in pending requests, so
// have to put them into a temporary buffer
status = inode->WriteBufferToChain(&buffer, &bytesLeft, (cookie->open_mode & O_NONBLOCK) != 0);
status = inode->WriteDataToBuffer(&buffer, &bytesLeft, (cookie->open_mode & O_NONBLOCK) != 0);
if (status == B_OK) {
inode->FillPendingRequests();
*_length -= bytesLeft;
@@ -1302,8 +1348,6 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/
status = B_OK;
}
benaphore_unlock(inode->RequestLock());
return status;
}
@@ -1495,7 +1539,8 @@ pipefs_select(fs_volume _fs, fs_vnode _node, fs_cookie _cookie, uint8 event,
Inode *inode = (Inode *)_node;
if (!inode)
return B_ERROR;
BenaphoreLocker locker(inode->RequestLock());
return inode->Select(event, ref, sync);
}
@@ -1509,6 +1554,7 @@ pipefs_deselect(fs_volume _fs, fs_vnode _node, fs_cookie _cookie, uint8 event,
if (!inode)
return B_ERROR;
BenaphoreLocker locker(inode->RequestLock());
return inode->Deselect(event, sync);
}
@@ -1557,10 +1603,9 @@ pipefs_read_stat(fs_volume _volume, fs_vnode _node, struct stat *stat)
stat->st_dev = volume->ID();
stat->st_ino = inode->ID();
benaphore_lock(inode->RequestLock());
stat->st_size = inode->BytesInChain();
benaphore_unlock(inode->RequestLock());
BenaphoreLocker locker(inode->RequestLock());
stat->st_size = inode->BytesAvailable();
stat->st_mode = inode->Type();
stat->st_nlink = 1;