Fixed the pipefs. It now should work without any problems.

I don't know which part of me has written the previous version, but
it seems not have been supervised by a brain while doing it.
The read requests are now maintained per inode (as it has to be),
and there is one read lock per request.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@6891 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2004-03-03 18:57:52 +00:00
parent 3637d4b543
commit 76946b95da

View File

@ -1,5 +1,5 @@
/*
** Copyright 2003, Axel Dörfler, axeld@pinc-software.de. All rights reserved.
** Copyright 2003-2004, Axel Dörfler, axeld@pinc-software.de. All rights reserved.
** Distributed under the terms of the OpenBeOS License.
*/
@ -8,6 +8,7 @@
#include <NodeMonitor.h>
#include <util/kernel_cpp.h>
#include <util/DoublyLinkedList.h>
#include <cbuf.h>
#include <vfs.h>
#include <debug.h>
@ -35,44 +36,41 @@
#endif
#define PIPEFS_HASH_SIZE 16
#define PIPEFS_MAX_BUFFER_SIZE 65536
namespace pipefs {
class Volume;
class Inode;
struct dir_cookie;
struct read_request {
read_request *prev;
read_request *next;
void *buffer;
size_t buffer_size;
size_t bytes_read;
team_id team;
size_t SpaceLeft() const { return buffer_size - bytes_read; }
void Fill(Inode *inode);
status_t PutBuffer(const void **_buffer, size_t *_bufferSize);
};
class ReadRequests {
class ReadRequest {
public:
ReadRequests();
~ReadRequests();
ReadRequest(void *buffer, size_t bufferSize);
~ReadRequest();
void Lock();
void Unlock();
status_t Wait(bool nonBlocking);
status_t PutBufferChain(cbuf *bufferChain, size_t *_bytesRead = NULL,
bool releasePartiallyFilled = true);
status_t PutBuffer(const void **_buffer, size_t *_bufferSize);
status_t Add(read_request &request);
status_t Remove(read_request &request);
size_t SpaceLeft() const { return fBufferSize - fBytesRead; }
size_t BytesRead() const { return fBytesRead; }
team_id Team() const { return fTeam; }
read_request *GetCurrent() const { return fCurrent; }
void SkipCurrent() { if (fCurrent) fCurrent = fCurrent->next; }
DoublyLinked::Link fLink;
private:
mutex fLock;
read_request *fFirst, *fCurrent, *fLast;
sem_id fLock;
team_id fTeam;
void *fBuffer;
size_t fBufferSize;
size_t fBytesRead;
};
class Volume {
public:
Volume(mount_id id);
@ -92,7 +90,6 @@ class Volume {
status_t RemoveNode(Inode *directory, const char *name);
vnode_id GetNextNodeID() { return fNextNodeID++; }
ReadRequests &GetReadRequests() { return fReadRequests; }
Inode *FirstEntry() const { return fFirstEntry; }
@ -106,7 +103,6 @@ class Volume {
status_t RemoveNode(Inode *inode);
status_t InsertNode(Inode *inode);
ReadRequests fReadRequests;
mutex fLock;
mount_id fID;
Inode *fRootNode;
@ -118,11 +114,12 @@ class Volume {
dir_cookie *fFirstDirCookie;
};
class Inode {
public:
Inode(Volume *volume, const char *name, int32 type);
~Inode();
status_t InitCheck();
vnode_id ID() const { return fID; }
const char *Name() const { return fName; }
@ -131,12 +128,18 @@ class Inode {
Inode *Next() const { return fNext; }
void SetNext(Inode *inode) { fNext = inode; }
sem_id ReadLock() { return fReadLock; }
mutex *WriteMutex() { return &fWriteMutex; }
benaphore *RequestLock() { return &fRequestLock; }
DoublyLinked::List<ReadRequest> &Requests() { return fRequests; }
status_t WriteBufferToChain(const void *buffer, size_t bufferSize);
status_t ReadBufferFromChain(void *buffer, size_t *_bufferSize);
off_t BytesInChain() const { return cbuf_get_length(fBufferChain); }
status_t WriteBufferToChain(const void *buffer, size_t *_bufferSize, bool nonBlocking);
// status_t ReadBufferFromChain(void *buffer, size_t *_bufferSize);
size_t BytesInChain() const { return cbuf_get_length(fBufferChain); }
void MayReleaseWriter();
void FillPendingRequests(const void *buffer, size_t *_bytesLeft);
void FillPendingRequests();
status_t AddRequest(ReadRequest &request);
status_t RemoveRequest(ReadRequest &request);
static int32 HashNextOffset();
static uint32 hash_func(void *_node, const void *_key, uint32 range);
@ -151,8 +154,12 @@ class Inode {
cbuf *fBufferChain;
DoublyLinked::List<ReadRequest> fRequests;
DoublyLinked::List<ReadRequest> fDoneRequests;
benaphore fRequestLock;
sem_id fReadLock;
mutex fWriteMutex;
sem_id fWriteLock;
};
@ -167,7 +174,7 @@ struct file_cookie {
};
#define PIPEFS_HASH_SIZE 16
//---------------------
Volume::Volume(mount_id id)
@ -417,7 +424,8 @@ Inode::Inode(Volume *volume, const char *name, int32 type)
if (type == S_IFIFO) {
fReadLock = create_sem(0, "pipe read");
mutex_init(&fWriteMutex, "pipe write");
fWriteLock = create_sem(1, "pipe write");
benaphore_init(&fRequestLock, "pipe request");
}
}
@ -428,7 +436,8 @@ Inode::~Inode()
if (fType == S_IFIFO) {
delete_sem(fReadLock);
mutex_destroy(&fWriteMutex);
delete_sem(fWriteLock);
benaphore_destroy(&fRequestLock);
}
}
@ -437,7 +446,7 @@ status_t
Inode::InitCheck()
{
if (fName == NULL
|| fType == S_IFIFO && (fReadLock < B_OK || fWriteMutex.sem < B_OK))
|| fType == S_IFIFO && (fRequestLock.sem < B_OK || fWriteLock < B_OK))
return B_ERROR;
return B_OK;
@ -454,64 +463,150 @@ Inode::InitCheck()
*/
status_t
Inode::WriteBufferToChain(const void *buffer, size_t bufferSize)
Inode::WriteBufferToChain(const void *buffer, size_t *_bufferSize, bool nonBlocking)
{
size_t bufferSize = *_bufferSize;
status_t status = acquire_sem_etc(fWriteLock, 1,
( nonBlocking ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0);
if (status != B_OK)
return status;
// ensure that we don't write more than PIPEFS_MAX_BUFFER_SIZE
// into a pipe without blocking
size_t inChain = 0;
if (fBufferChain != NULL)
inChain = cbuf_get_length(fBufferChain);
if (bufferSize + inChain > PIPEFS_MAX_BUFFER_SIZE)
bufferSize = PIPEFS_MAX_BUFFER_SIZE - inChain;
cbuf *chain = cbuf_get_chain(bufferSize);
if (chain == NULL)
if (chain == NULL) {
release_sem(fWriteLock);
return B_NO_MEMORY;
if (cbuf_user_memcpy_to_chain(chain, 0, buffer, bufferSize) < B_OK)
}
if (cbuf_user_memcpy_to_chain(chain, 0, buffer, bufferSize) < B_OK) {
release_sem(fWriteLock);
return B_BAD_ADDRESS;
}
// join this chain with our already existing chain (if any)
chain = cbuf_merge_chains(fBufferChain, chain);
// let waiting readers go on
if (fBufferChain == NULL)
release_sem(ReadLock());
// if (fBufferChain == NULL)
// release_sem(ReadLock());
fBufferChain = chain;
*_bufferSize = bufferSize;
MayReleaseWriter();
return B_OK;
}
/** Reads the contents of the buffer chain to the specified
* buffer, if any.
* Unblocks other waiting readers if there would be still
* some bytes left in the stream.
void
Inode::MayReleaseWriter()
{
if (BytesInChain() < PIPEFS_MAX_BUFFER_SIZE)
release_sem(fWriteLock);
}
void
Inode::FillPendingRequests()
{
size_t bytesLeft = cbuf_get_length(fBufferChain);
ReadRequest *request;
DoublyLinked::Iterator<ReadRequest> iterator(fRequests);
while (bytesLeft != 0 && (request = iterator.Next()) != NULL) {
// try to fill this request
size_t bytesRead;
if (request->PutBufferChain(fBufferChain, &bytesRead, true) == B_OK) {
bytesLeft -= bytesRead;
MayReleaseWriter();
}
}
}
void
Inode::FillPendingRequests(const void *buffer, size_t *_bytesLeft)
{
team_id team = team_get_current_team_id();
ReadRequest *request;
DoublyLinked::Iterator<ReadRequest> iterator(fRequests);
while (*_bytesLeft != 0 && (request = iterator.Next()) != NULL) {
// try to fill this request
size_t bytesRead;
if (request->PutBufferChain(fBufferChain, &bytesRead, false) != B_OK)
continue;
MayReleaseWriter();
if (request->SpaceLeft() > 0
&& (team == B_SYSTEM_TEAM || request->Team() == team)) {
// ToDo: This is something where we can optimize the buffer
// hand-shaking considerably: we should have a function
// that copies the data to another address space - either
// remapping copy on write or a direct copy.
// place our data into that buffer
request->PutBuffer(&buffer, _bytesLeft);
}
}
}
/** This function adds a request into the queue.
* If there is already data waiting in the pipe, the request will
* be fulfilled.
* This function is called from within the readers thread only.
*/
status_t
Inode::ReadBufferFromChain(void *buffer, size_t *_bufferSize)
status_t
Inode::AddRequest(ReadRequest &request)
{
if (fBufferChain == NULL) {
*_bufferSize = 0;
return B_OK;
}
if (benaphore_lock(&fRequestLock) != B_OK)
return B_ERROR;
size_t length = BytesInChain();
if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK) {
fDoneRequests.Add(&request);
MayReleaseWriter();
} else
fRequests.Add(&request);
// we read *_bufferSize bytes at maximum - but never
// more than there are in the chain
if (*_bufferSize > length)
*_bufferSize = length;
else {
length = *_bufferSize;
benaphore_unlock(&fRequestLock);
return B_OK;
}
// we unlock another reader here, so that it can read
// the rest of the pending bytes
release_sem(ReadLock());
}
if (cbuf_user_memcpy_from_chain(buffer, fBufferChain, 0, length) < B_OK)
return B_BAD_ADDRESS;
/** This function removes a request from the queue.
* This function is called from within the readers thread only.
*/
if (cbuf_truncate_head(fBufferChain, length) < B_OK) {
// if that call fails, the next read will duplicate the input
dprintf("pipefs: cbuf_truncate_head() failed for inode %Ld\n", ID());
}
status_t
Inode::RemoveRequest(ReadRequest &request)
{
if (benaphore_lock(&fRequestLock) != B_OK)
return B_ERROR;
// we might have some data waiting now, if the direct team
// handshake couldn't be done
if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK)
release_sem(fWriteLock);
DoublyLinked::List<ReadRequest>::Remove(&request);
benaphore_unlock(&fRequestLock);
return B_OK;
}
@ -553,16 +648,75 @@ Inode::compare_func(void *_node, const void *_key)
// #pragma mark -
void
read_request::Fill(Inode *inode)
ReadRequest::ReadRequest(void *buffer, size_t bufferSize)
:
fBuffer(buffer),
fBufferSize(bufferSize),
fBytesRead(0)
{
size_t length = SpaceLeft();
fLock = create_sem(0, "request lock");
fTeam = team_get_current_team_id();
}
if (inode->ReadBufferFromChain(buffer, &length) < B_OK) {
// ToDo: should add status to read_request
dprintf("reading from chain failed!");
} else
bytes_read += length;
ReadRequest::~ReadRequest()
{
delete_sem(fLock);
}
status_t
ReadRequest::Wait(bool nonBlocking)
{
TRACE(("pipefs: request@%p waits for data (%sblocking), thread 0x%lx\n", this, nonBlocking ? "non" : "", find_thread(NULL)));
return acquire_sem_etc(fLock, 1, ( nonBlocking ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0);
}
/** Reads the contents of the buffer chain to the specified
* buffer, if any.
*/
status_t
ReadRequest::PutBufferChain(cbuf *bufferChain, size_t *_bytesRead, bool releasePartiallyFilled)
{
TRACE(("pipefs: ReadRequest::PutBufferChain()\n"));
if (_bytesRead)
*_bytesRead = 0;
if (bufferChain == NULL)
return B_OK;
size_t length = cbuf_get_length(bufferChain);
size_t spaceLeft = SpaceLeft();
// we read spaceLeft bytes at maximum - but never
// more than there are in the chain
if (spaceLeft < length)
length = spaceLeft;
if (length == 0)
return B_OK;
if (cbuf_user_memcpy_from_chain(fBuffer, bufferChain, 0, length) < B_OK) {
// if the buffer is just invalid, we release the reader as well
release_sem(fLock);
return B_BAD_ADDRESS;
}
if (cbuf_truncate_head(bufferChain, length) < B_OK) {
// if that call fails, the next read will duplicate the input
dprintf("pipefs: cbuf_truncate_head() failed for cbuf %p\n", bufferChain);
}
fBytesRead += length;
if (_bytesRead)
*_bytesRead = length;
if (releasePartiallyFilled || SpaceLeft() == 0)
release_sem(fLock);
return B_OK;
}
@ -571,9 +725,9 @@ read_request::Fill(Inode *inode)
*/
status_t
read_request::PutBuffer(const void **_buffer, size_t *_bufferSize)
ReadRequest::PutBuffer(const void **_buffer, size_t *_bufferSize)
{
TRACE(("pipefs: read_request::PutUserBuffer(buffer = %p, size = %lu)\n", *_buffer, *_bufferSize));
TRACE(("pipefs: ReadRequest::PutBuffer(buffer = %p, size = %lu)\n", *_buffer, *_bufferSize));
size_t bytes = *_bufferSize;
if (bytes > SpaceLeft())
@ -581,89 +735,15 @@ read_request::PutBuffer(const void **_buffer, size_t *_bufferSize)
uint8 *source = (uint8 *)*_buffer;
if (user_memcpy((uint8 *)buffer + bytes_read, source, bytes) < B_OK)
if (user_memcpy((uint8 *)fBuffer + fBytesRead, source, bytes) < B_OK) {
release_sem(fLock);
return B_BAD_ADDRESS;
}
bytes_read += bytes;
fBytesRead += bytes;
*_buffer = (void *)(source + bytes);
*_bufferSize -= bytes;
return B_OK;
}
// #pragma mark -
ReadRequests::ReadRequests()
:
fFirst(NULL),
fCurrent(NULL),
fLast(NULL)
{
mutex_init(&fLock, "pipefs read requests");
}
ReadRequests::~ReadRequests()
{
mutex_destroy(&fLock);
}
void
ReadRequests::Lock()
{
mutex_lock(&fLock);
}
void
ReadRequests::Unlock()
{
mutex_unlock(&fLock);
}
status_t
ReadRequests::Add(read_request &request)
{
if (fLast != NULL)
fLast->next = &request;
if (fFirst == NULL)
fFirst = &request;
// ToDo: could directly skip full requests
if (fCurrent == NULL)
fCurrent = &request;
request.prev = fLast;
request.next = NULL;
fLast = &request;
return B_OK;
}
status_t
ReadRequests::Remove(read_request &request)
{
if (request.next != NULL)
request.next->prev = request.prev;
if (request.prev != NULL)
request.prev->next = request.next;
// update pointers
if (fCurrent == &request)
fCurrent = fCurrent->next;
if (fLast == &request)
fLast = request.prev;
if (fFirst == &request)
fFirst = request.next;
release_sem(fLock);
return B_OK;
}
@ -922,124 +1002,72 @@ pipefs_fsync(fs_volume _volume, fs_vnode _v)
static ssize_t
pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t pos,
pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/,
void *buffer, size_t *_length)
{
file_cookie *cookie = (file_cookie *)_cookie;
Volume *volume = (Volume *)_volume;
(void)pos;
TRACE(("pipefs_read: vnode %p, cookie %p, pos 0x%Lx , len 0x%lx\n", _node, cookie, pos, *_length));
TRACE(("pipefs_read: vnode %p, cookie %p, len 0x%lx, mode = %d\n", _node, cookie, *_length, cookie->open_mode));
if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
return B_NOT_ALLOWED;
// issue read request
read_request request;
request.buffer = buffer;
request.buffer_size = *_length;
request.bytes_read = 0;
request.team = team_get_current_team_id();
ReadRequest request(buffer, *_length);
ReadRequests &requests = volume->GetReadRequests();
requests.Lock();
requests.Add(request);
requests.Unlock();
// ToDo: here is the race condition that another reader issues
// its read request after this one, but locks earlier; this
// will currently lead to a dead-lock or failure - that could
// be solved by attaching a thread to the request
Inode *inode = (Inode *)_node;
inode->AddRequest(request);
// wait for it to be filled
Inode *inode = (Inode *)_node;
status_t status = acquire_sem_etc(inode->ReadLock(), 1,
(cookie->open_mode & O_NONBLOCK ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0);
status_t status = request.Wait((cookie->open_mode & O_NONBLOCK) != 0);
inode->RemoveRequest(request);
requests.Lock();
if (status == B_OK)
request.Fill(inode);
requests.Remove(request);
requests.Unlock();
if (status == B_TIMED_OUT || B_INTERRUPTED && request.bytes_read > 0)
if (status == B_TIMED_OUT || B_INTERRUPTED && request.BytesRead() > 0)
status = B_OK;
if (status == B_OK)
*_length = request.bytes_read;
*_length = request.BytesRead();
return status;
}
static ssize_t
pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t pos,
pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/,
const void *buffer, size_t *_length)
{
file_cookie *cookie = (file_cookie *)_cookie;
Volume *volume = (Volume *)_volume;
Inode *inode = (Inode *)_node;
team_id team = team_get_current_team_id();
TRACE(("pipefs_write: vnode %p, cookie %p, pos 0x%Lx , len 0x%lx\n", _node, cookie, pos, *_length));
TRACE(("pipefs_write: vnode %p, cookie %p, len 0x%lx\n", _node, cookie, *_length));
if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
return B_NOT_ALLOWED;
mutex_lock(inode->WriteMutex());
ReadRequests &requests = volume->GetReadRequests();
requests.Lock();
benaphore_lock(inode->RequestLock());
size_t bytesLeft = *_length;
read_request *request = NULL;
do {
request = requests.GetCurrent();
if (request != NULL) {
// fill this request
request->Fill(inode);
if (request->SpaceLeft() > 0
&& (team == B_SYSTEM_TEAM || request->team == team)) {
// ToDo: This is something where we can optimize the buffer
// hand-shaking considerably: we should have a function
// that copies the data to another address space - either
// remapping copy on write or a direct copy.
// place our data into that buffer
request->PutBuffer(&buffer, &bytesLeft);
if (bytesLeft == 0) {
release_sem(inode->ReadLock());
break;
}
}
requests.SkipCurrent();
}
} while (request != NULL);
inode->FillPendingRequests(buffer, &bytesLeft);
status_t status;
if (request == NULL) {
// there is no read matching request pending, so we have to put
// our data in a temporary buffer
if (bytesLeft != 0) {
// we could not place all our data in pending requests, so
// have to put them into a temporary buffer
status = inode->WriteBufferToChain(buffer, bytesLeft);
if (status != B_OK)
status = inode->WriteBufferToChain(buffer, &bytesLeft, (cookie->open_mode & O_NONBLOCK) != 0);
if (status == B_OK) {
inode->FillPendingRequests();
*_length -= bytesLeft;
}
} else {
// could write everything without the need to copy!
status = B_OK;
}
requests.Unlock();
mutex_unlock(inode->WriteMutex());
benaphore_unlock(inode->RequestLock());
return status;
}