diff --git a/src/kernel/core/fs/pipefs.cpp b/src/kernel/core/fs/pipefs.cpp index 88443aa462..5f8b41d91c 100644 --- a/src/kernel/core/fs/pipefs.cpp +++ b/src/kernel/core/fs/pipefs.cpp @@ -1,5 +1,5 @@ /* -** Copyright 2003, Axel Dörfler, axeld@pinc-software.de. All rights reserved. +** Copyright 2003-2004, Axel Dörfler, axeld@pinc-software.de. All rights reserved. ** Distributed under the terms of the OpenBeOS License. */ @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -35,44 +36,41 @@ #endif +#define PIPEFS_HASH_SIZE 16 +#define PIPEFS_MAX_BUFFER_SIZE 65536 + + namespace pipefs { class Volume; class Inode; struct dir_cookie; -struct read_request { - read_request *prev; - read_request *next; - void *buffer; - size_t buffer_size; - size_t bytes_read; - team_id team; - - size_t SpaceLeft() const { return buffer_size - bytes_read; } - void Fill(Inode *inode); - status_t PutBuffer(const void **_buffer, size_t *_bufferSize); -}; - -class ReadRequests { +class ReadRequest { public: - ReadRequests(); - ~ReadRequests(); + ReadRequest(void *buffer, size_t bufferSize); + ~ReadRequest(); - void Lock(); - void Unlock(); + status_t Wait(bool nonBlocking); + status_t PutBufferChain(cbuf *bufferChain, size_t *_bytesRead = NULL, + bool releasePartiallyFilled = true); + status_t PutBuffer(const void **_buffer, size_t *_bufferSize); - status_t Add(read_request &request); - status_t Remove(read_request &request); + size_t SpaceLeft() const { return fBufferSize - fBytesRead; } + size_t BytesRead() const { return fBytesRead; } + team_id Team() const { return fTeam; } - read_request *GetCurrent() const { return fCurrent; } - void SkipCurrent() { if (fCurrent) fCurrent = fCurrent->next; } + DoublyLinked::Link fLink; private: - mutex fLock; - read_request *fFirst, *fCurrent, *fLast; + sem_id fLock; + team_id fTeam; + void *fBuffer; + size_t fBufferSize; + size_t fBytesRead; }; + class Volume { public: Volume(mount_id id); @@ -92,7 +90,6 @@ class Volume { status_t RemoveNode(Inode *directory, const char *name); vnode_id GetNextNodeID() { return fNextNodeID++; } - ReadRequests &GetReadRequests() { return fReadRequests; } Inode *FirstEntry() const { return fFirstEntry; } @@ -106,7 +103,6 @@ class Volume { status_t RemoveNode(Inode *inode); status_t InsertNode(Inode *inode); - ReadRequests fReadRequests; mutex fLock; mount_id fID; Inode *fRootNode; @@ -118,11 +114,12 @@ class Volume { dir_cookie *fFirstDirCookie; }; + class Inode { public: Inode(Volume *volume, const char *name, int32 type); ~Inode(); - + status_t InitCheck(); vnode_id ID() const { return fID; } const char *Name() const { return fName; } @@ -131,12 +128,18 @@ class Inode { Inode *Next() const { return fNext; } void SetNext(Inode *inode) { fNext = inode; } - sem_id ReadLock() { return fReadLock; } - mutex *WriteMutex() { return &fWriteMutex; } + benaphore *RequestLock() { return &fRequestLock; } + DoublyLinked::List &Requests() { return fRequests; } - status_t WriteBufferToChain(const void *buffer, size_t bufferSize); - status_t ReadBufferFromChain(void *buffer, size_t *_bufferSize); - off_t BytesInChain() const { return cbuf_get_length(fBufferChain); } + status_t WriteBufferToChain(const void *buffer, size_t *_bufferSize, bool nonBlocking); +// status_t ReadBufferFromChain(void *buffer, size_t *_bufferSize); + size_t BytesInChain() const { return cbuf_get_length(fBufferChain); } + + void MayReleaseWriter(); + void FillPendingRequests(const void *buffer, size_t *_bytesLeft); + void FillPendingRequests(); + status_t AddRequest(ReadRequest &request); + status_t RemoveRequest(ReadRequest &request); static int32 HashNextOffset(); static uint32 hash_func(void *_node, const void *_key, uint32 range); @@ -151,8 +154,12 @@ class Inode { cbuf *fBufferChain; + DoublyLinked::List fRequests; + DoublyLinked::List fDoneRequests; + + benaphore fRequestLock; sem_id fReadLock; - mutex fWriteMutex; + sem_id fWriteLock; }; @@ -167,7 +174,7 @@ struct file_cookie { }; -#define PIPEFS_HASH_SIZE 16 +//--------------------- Volume::Volume(mount_id id) @@ -417,7 +424,8 @@ Inode::Inode(Volume *volume, const char *name, int32 type) if (type == S_IFIFO) { fReadLock = create_sem(0, "pipe read"); - mutex_init(&fWriteMutex, "pipe write"); + fWriteLock = create_sem(1, "pipe write"); + benaphore_init(&fRequestLock, "pipe request"); } } @@ -428,7 +436,8 @@ Inode::~Inode() if (fType == S_IFIFO) { delete_sem(fReadLock); - mutex_destroy(&fWriteMutex); + delete_sem(fWriteLock); + benaphore_destroy(&fRequestLock); } } @@ -437,7 +446,7 @@ status_t Inode::InitCheck() { if (fName == NULL - || fType == S_IFIFO && (fReadLock < B_OK || fWriteMutex.sem < B_OK)) + || fType == S_IFIFO && (fRequestLock.sem < B_OK || fWriteLock < B_OK)) return B_ERROR; return B_OK; @@ -454,64 +463,150 @@ Inode::InitCheck() */ status_t -Inode::WriteBufferToChain(const void *buffer, size_t bufferSize) +Inode::WriteBufferToChain(const void *buffer, size_t *_bufferSize, bool nonBlocking) { + size_t bufferSize = *_bufferSize; + + status_t status = acquire_sem_etc(fWriteLock, 1, + ( nonBlocking ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0); + if (status != B_OK) + return status; + + // ensure that we don't write more than PIPEFS_MAX_BUFFER_SIZE + // into a pipe without blocking + + size_t inChain = 0; + if (fBufferChain != NULL) + inChain = cbuf_get_length(fBufferChain); + + if (bufferSize + inChain > PIPEFS_MAX_BUFFER_SIZE) + bufferSize = PIPEFS_MAX_BUFFER_SIZE - inChain; + cbuf *chain = cbuf_get_chain(bufferSize); - if (chain == NULL) + if (chain == NULL) { + release_sem(fWriteLock); return B_NO_MEMORY; - - if (cbuf_user_memcpy_to_chain(chain, 0, buffer, bufferSize) < B_OK) + } + + if (cbuf_user_memcpy_to_chain(chain, 0, buffer, bufferSize) < B_OK) { + release_sem(fWriteLock); return B_BAD_ADDRESS; + } // join this chain with our already existing chain (if any) chain = cbuf_merge_chains(fBufferChain, chain); // let waiting readers go on - if (fBufferChain == NULL) - release_sem(ReadLock()); +// if (fBufferChain == NULL) +// release_sem(ReadLock()); fBufferChain = chain; + *_bufferSize = bufferSize; + + MayReleaseWriter(); + return B_OK; } -/** Reads the contents of the buffer chain to the specified - * buffer, if any. - * Unblocks other waiting readers if there would be still - * some bytes left in the stream. +void +Inode::MayReleaseWriter() +{ + if (BytesInChain() < PIPEFS_MAX_BUFFER_SIZE) + release_sem(fWriteLock); +} + + +void +Inode::FillPendingRequests() +{ + size_t bytesLeft = cbuf_get_length(fBufferChain); + + ReadRequest *request; + DoublyLinked::Iterator iterator(fRequests); + while (bytesLeft != 0 && (request = iterator.Next()) != NULL) { + // try to fill this request + size_t bytesRead; + if (request->PutBufferChain(fBufferChain, &bytesRead, true) == B_OK) { + bytesLeft -= bytesRead; + MayReleaseWriter(); + } + } +} + + +void +Inode::FillPendingRequests(const void *buffer, size_t *_bytesLeft) +{ + team_id team = team_get_current_team_id(); + + ReadRequest *request; + DoublyLinked::Iterator iterator(fRequests); + while (*_bytesLeft != 0 && (request = iterator.Next()) != NULL) { + // try to fill this request + size_t bytesRead; + if (request->PutBufferChain(fBufferChain, &bytesRead, false) != B_OK) + continue; + + MayReleaseWriter(); + + if (request->SpaceLeft() > 0 + && (team == B_SYSTEM_TEAM || request->Team() == team)) { + // ToDo: This is something where we can optimize the buffer + // hand-shaking considerably: we should have a function + // that copies the data to another address space - either + // remapping copy on write or a direct copy. + + // place our data into that buffer + request->PutBuffer(&buffer, _bytesLeft); + } + } +} + + +/** This function adds a request into the queue. + * If there is already data waiting in the pipe, the request will + * be fulfilled. + * This function is called from within the readers thread only. */ -status_t -Inode::ReadBufferFromChain(void *buffer, size_t *_bufferSize) +status_t +Inode::AddRequest(ReadRequest &request) { - if (fBufferChain == NULL) { - *_bufferSize = 0; - return B_OK; - } + if (benaphore_lock(&fRequestLock) != B_OK) + return B_ERROR; - size_t length = BytesInChain(); + if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK) { + fDoneRequests.Add(&request); + MayReleaseWriter(); + } else + fRequests.Add(&request); - // we read *_bufferSize bytes at maximum - but never - // more than there are in the chain - if (*_bufferSize > length) - *_bufferSize = length; - else { - length = *_bufferSize; + benaphore_unlock(&fRequestLock); + return B_OK; +} - // we unlock another reader here, so that it can read - // the rest of the pending bytes - release_sem(ReadLock()); - } - if (cbuf_user_memcpy_from_chain(buffer, fBufferChain, 0, length) < B_OK) - return B_BAD_ADDRESS; +/** This function removes a request from the queue. + * This function is called from within the readers thread only. + */ - if (cbuf_truncate_head(fBufferChain, length) < B_OK) { - // if that call fails, the next read will duplicate the input - dprintf("pipefs: cbuf_truncate_head() failed for inode %Ld\n", ID()); - } +status_t +Inode::RemoveRequest(ReadRequest &request) +{ + if (benaphore_lock(&fRequestLock) != B_OK) + return B_ERROR; + // we might have some data waiting now, if the direct team + // handshake couldn't be done + + if (BytesInChain() > 0 && request.PutBufferChain(fBufferChain) == B_OK) + release_sem(fWriteLock); + + DoublyLinked::List::Remove(&request); + + benaphore_unlock(&fRequestLock); return B_OK; } @@ -553,16 +648,75 @@ Inode::compare_func(void *_node, const void *_key) // #pragma mark - -void -read_request::Fill(Inode *inode) +ReadRequest::ReadRequest(void *buffer, size_t bufferSize) + : + fBuffer(buffer), + fBufferSize(bufferSize), + fBytesRead(0) { - size_t length = SpaceLeft(); + fLock = create_sem(0, "request lock"); + fTeam = team_get_current_team_id(); +} - if (inode->ReadBufferFromChain(buffer, &length) < B_OK) { - // ToDo: should add status to read_request - dprintf("reading from chain failed!"); - } else - bytes_read += length; + +ReadRequest::~ReadRequest() +{ + delete_sem(fLock); +} + + +status_t +ReadRequest::Wait(bool nonBlocking) +{ + TRACE(("pipefs: request@%p waits for data (%sblocking), thread 0x%lx\n", this, nonBlocking ? "non" : "", find_thread(NULL))); + return acquire_sem_etc(fLock, 1, ( nonBlocking ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0); +} + + +/** Reads the contents of the buffer chain to the specified + * buffer, if any. + */ + +status_t +ReadRequest::PutBufferChain(cbuf *bufferChain, size_t *_bytesRead, bool releasePartiallyFilled) +{ + TRACE(("pipefs: ReadRequest::PutBufferChain()\n")); + + if (_bytesRead) + *_bytesRead = 0; + + if (bufferChain == NULL) + return B_OK; + + size_t length = cbuf_get_length(bufferChain); + size_t spaceLeft = SpaceLeft(); + + // we read spaceLeft bytes at maximum - but never + // more than there are in the chain + if (spaceLeft < length) + length = spaceLeft; + + if (length == 0) + return B_OK; + + if (cbuf_user_memcpy_from_chain(fBuffer, bufferChain, 0, length) < B_OK) { + // if the buffer is just invalid, we release the reader as well + release_sem(fLock); + return B_BAD_ADDRESS; + } + + if (cbuf_truncate_head(bufferChain, length) < B_OK) { + // if that call fails, the next read will duplicate the input + dprintf("pipefs: cbuf_truncate_head() failed for cbuf %p\n", bufferChain); + } + fBytesRead += length; + if (_bytesRead) + *_bytesRead = length; + + if (releasePartiallyFilled || SpaceLeft() == 0) + release_sem(fLock); + + return B_OK; } @@ -571,9 +725,9 @@ read_request::Fill(Inode *inode) */ status_t -read_request::PutBuffer(const void **_buffer, size_t *_bufferSize) +ReadRequest::PutBuffer(const void **_buffer, size_t *_bufferSize) { - TRACE(("pipefs: read_request::PutUserBuffer(buffer = %p, size = %lu)\n", *_buffer, *_bufferSize)); + TRACE(("pipefs: ReadRequest::PutBuffer(buffer = %p, size = %lu)\n", *_buffer, *_bufferSize)); size_t bytes = *_bufferSize; if (bytes > SpaceLeft()) @@ -581,89 +735,15 @@ read_request::PutBuffer(const void **_buffer, size_t *_bufferSize) uint8 *source = (uint8 *)*_buffer; - if (user_memcpy((uint8 *)buffer + bytes_read, source, bytes) < B_OK) + if (user_memcpy((uint8 *)fBuffer + fBytesRead, source, bytes) < B_OK) { + release_sem(fLock); return B_BAD_ADDRESS; + } - bytes_read += bytes; + fBytesRead += bytes; *_buffer = (void *)(source + bytes); *_bufferSize -= bytes; - - return B_OK; -} - - -// #pragma mark - - - -ReadRequests::ReadRequests() - : - fFirst(NULL), - fCurrent(NULL), - fLast(NULL) -{ - mutex_init(&fLock, "pipefs read requests"); -} - - -ReadRequests::~ReadRequests() -{ - mutex_destroy(&fLock); -} - - -void -ReadRequests::Lock() -{ - mutex_lock(&fLock); -} - - -void -ReadRequests::Unlock() -{ - mutex_unlock(&fLock); -} - - -status_t -ReadRequests::Add(read_request &request) -{ - if (fLast != NULL) - fLast->next = &request; - - if (fFirst == NULL) - fFirst = &request; - - // ToDo: could directly skip full requests - if (fCurrent == NULL) - fCurrent = &request; - - request.prev = fLast; - request.next = NULL; - fLast = &request; - - return B_OK; -} - - -status_t -ReadRequests::Remove(read_request &request) -{ - if (request.next != NULL) - request.next->prev = request.prev; - if (request.prev != NULL) - request.prev->next = request.next; - - // update pointers - - if (fCurrent == &request) - fCurrent = fCurrent->next; - - if (fLast == &request) - fLast = request.prev; - - if (fFirst == &request) - fFirst = request.next; + release_sem(fLock); return B_OK; } @@ -922,124 +1002,72 @@ pipefs_fsync(fs_volume _volume, fs_vnode _v) static ssize_t -pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t pos, +pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/, void *buffer, size_t *_length) { file_cookie *cookie = (file_cookie *)_cookie; - Volume *volume = (Volume *)_volume; - (void)pos; - TRACE(("pipefs_read: vnode %p, cookie %p, pos 0x%Lx , len 0x%lx\n", _node, cookie, pos, *_length)); + TRACE(("pipefs_read: vnode %p, cookie %p, len 0x%lx, mode = %d\n", _node, cookie, *_length, cookie->open_mode)); if ((cookie->open_mode & O_RWMASK) != O_RDONLY) return B_NOT_ALLOWED; // issue read request - read_request request; - request.buffer = buffer; - request.buffer_size = *_length; - request.bytes_read = 0; - request.team = team_get_current_team_id(); + ReadRequest request(buffer, *_length); - ReadRequests &requests = volume->GetReadRequests(); - - requests.Lock(); - requests.Add(request); - requests.Unlock(); - - // ToDo: here is the race condition that another reader issues - // its read request after this one, but locks earlier; this - // will currently lead to a dead-lock or failure - that could - // be solved by attaching a thread to the request + Inode *inode = (Inode *)_node; + inode->AddRequest(request); // wait for it to be filled - Inode *inode = (Inode *)_node; - status_t status = acquire_sem_etc(inode->ReadLock(), 1, - (cookie->open_mode & O_NONBLOCK ? B_TIMEOUT : 0 ) | B_CAN_INTERRUPT, 0); + status_t status = request.Wait((cookie->open_mode & O_NONBLOCK) != 0); + inode->RemoveRequest(request); - requests.Lock(); - - if (status == B_OK) - request.Fill(inode); - - requests.Remove(request); - requests.Unlock(); - - if (status == B_TIMED_OUT || B_INTERRUPTED && request.bytes_read > 0) + if (status == B_TIMED_OUT || B_INTERRUPTED && request.BytesRead() > 0) status = B_OK; if (status == B_OK) - *_length = request.bytes_read; + *_length = request.BytesRead(); return status; } static ssize_t -pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t pos, +pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/, const void *buffer, size_t *_length) { file_cookie *cookie = (file_cookie *)_cookie; - Volume *volume = (Volume *)_volume; Inode *inode = (Inode *)_node; - team_id team = team_get_current_team_id(); - TRACE(("pipefs_write: vnode %p, cookie %p, pos 0x%Lx , len 0x%lx\n", _node, cookie, pos, *_length)); + TRACE(("pipefs_write: vnode %p, cookie %p, len 0x%lx\n", _node, cookie, *_length)); if ((cookie->open_mode & O_RWMASK) != O_WRONLY) return B_NOT_ALLOWED; - mutex_lock(inode->WriteMutex()); - - ReadRequests &requests = volume->GetReadRequests(); - requests.Lock(); + benaphore_lock(inode->RequestLock()); size_t bytesLeft = *_length; - read_request *request = NULL; - - do { - request = requests.GetCurrent(); - if (request != NULL) { - // fill this request - - request->Fill(inode); - if (request->SpaceLeft() > 0 - && (team == B_SYSTEM_TEAM || request->team == team)) { - // ToDo: This is something where we can optimize the buffer - // hand-shaking considerably: we should have a function - // that copies the data to another address space - either - // remapping copy on write or a direct copy. - - // place our data into that buffer - request->PutBuffer(&buffer, &bytesLeft); - if (bytesLeft == 0) { - release_sem(inode->ReadLock()); - break; - } - } - - requests.SkipCurrent(); - } - } while (request != NULL); + inode->FillPendingRequests(buffer, &bytesLeft); status_t status; - if (request == NULL) { - // there is no read matching request pending, so we have to put - // our data in a temporary buffer + if (bytesLeft != 0) { + // we could not place all our data in pending requests, so + // have to put them into a temporary buffer - status = inode->WriteBufferToChain(buffer, bytesLeft); - if (status != B_OK) + status = inode->WriteBufferToChain(buffer, &bytesLeft, (cookie->open_mode & O_NONBLOCK) != 0); + if (status == B_OK) { + inode->FillPendingRequests(); *_length -= bytesLeft; + } } else { // could write everything without the need to copy! status = B_OK; } - requests.Unlock(); - mutex_unlock(inode->WriteMutex()); + benaphore_unlock(inode->RequestLock()); return status; }