diff --git a/src/kernel/core/fs/pipefs.cpp b/src/kernel/core/fs/pipefs.cpp index b85baa4d30..10560f2af2 100644 --- a/src/kernel/core/fs/pipefs.cpp +++ b/src/kernel/core/fs/pipefs.cpp @@ -50,6 +50,8 @@ class ReadRequest { ~ReadRequest(); status_t Wait(bool nonBlocking); + void Abort(); + status_t PutBufferChain(cbuf *bufferChain, size_t *_bytesRead = NULL, bool releasePartiallyFilled = true); status_t PutBuffer(const void **_buffer, size_t *_bufferSize); @@ -149,6 +151,11 @@ class Inode { status_t AddRequest(ReadRequest &request); status_t RemoveRequest(ReadRequest &request); + void Open(int openMode); + void Close(int openMode); + int32 ReaderCount() const { return fReaderCount; } + int32 WriterCount() const { return fWriterCount; } + static int32 HashNextOffset(); static uint32 hash_func(void *_node, const void *_key, uint32 range); static int compare_func(void *_node, const void *_key); @@ -171,6 +178,9 @@ class Inode { benaphore fRequestLock; sem_id fWriteLock; + + int32 fReaderCount; + int32 fWriterCount; }; @@ -439,7 +449,9 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type) : fNext(NULL), fHashNext(NULL), - fBufferChain(NULL) + fBufferChain(NULL), + fReaderCount(0), + fWriterCount(0) { fName = strdup(name); if (fName == NULL) @@ -660,7 +672,43 @@ Inode::RemoveRequest(ReadRequest &request) } -int32 +void +Inode::Open(int openMode) +{ + if ((openMode & O_ACCMODE) == O_WRONLY) + atomic_add(&fWriterCount, 1); + + if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) + atomic_add(&fReaderCount, 1); +} + + +void +Inode::Close(int openMode) +{ + if ((openMode & O_ACCMODE) == O_WRONLY + && atomic_add(&fWriterCount, -1) == 1) { + // Our last writer has been closed; if the pipe + // contains no data, unlock all waiting readers + + benaphore_lock(&fRequestLock); + + if (cbuf_get_length(fBufferChain) == 0) { + ReadRequest *request; + DoublyLinked::Iterator iterator(fRequests); + while ((request = iterator.Next()) != NULL) + request->Abort(); + } + + benaphore_unlock(&fRequestLock); + } + + if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR) + atomic_add(&fReaderCount, -1); +} + + +int32 Inode::HashNextOffset() { Inode *inode; @@ -722,6 +770,14 @@ ReadRequest::Wait(bool nonBlocking) } +void +ReadRequest::Abort() +{ + fBytesRead = 0; + release_sem(fLock); +} + + /** Reads the contents of the buffer chain to the specified * buffer, if any. */ @@ -961,6 +1017,7 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode, fs_cookie *_cookie, vnode_id *_newVnodeID) { Volume *volume = (Volume *)_volume; + bool wasCreated = true; TRACE(("pipefs_create(): dir = %p, name = '%s', perms = %d, &id = %p\n", _dir, name, mode, _newVnodeID)); @@ -975,15 +1032,23 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode, status_t status = B_OK; Inode *inode = volume->FindNode(name); - if (inode != NULL) { + if (inode != NULL && (openMode & O_EXCL) != 0) { status = B_FILE_EXISTS; goto err; } - inode = volume->CreateNode(directory, name, S_IFIFO | mode); if (inode == NULL) { - status = B_NO_MEMORY; - goto err; + // we need to create a new pipe + inode = volume->CreateNode(directory, name, S_IFIFO | mode); + if (inode == NULL) { + status = B_NO_MEMORY; + goto err; + } + } else { + // we can just open the pipe again + void *dummy; + get_vnode(volume->ID(), inode->ID(), &dummy); + wasCreated = false; } volume->Unlock(); @@ -992,7 +1057,9 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode, *_cookie = (void *)cookie; *_newVnodeID = inode->ID(); - notify_listener(B_ENTRY_CREATED, volume->ID(), directory->ID(), 0, inode->ID(), name); + + if (wasCreated) + notify_listener(B_ENTRY_CREATED, volume->ID(), directory->ID(), 0, inode->ID(), name); return B_OK; @@ -1007,13 +1074,14 @@ err: static status_t pipefs_open(fs_volume _volume, fs_vnode _node, int openMode, fs_cookie *_cookie) { - // allow to open the file, but it can't be done anything with it + Inode *inode = (Inode *)_node; file_cookie *cookie = (file_cookie *)malloc(sizeof(file_cookie)); if (cookie == NULL) return B_NO_MEMORY; cookie->open_mode = openMode; + inode->Open(openMode); *_cookie = (void *)cookie; @@ -1022,11 +1090,15 @@ pipefs_open(fs_volume _volume, fs_vnode _node, int openMode, fs_cookie *_cookie) static status_t -pipefs_close(fs_volume _volume, fs_vnode _vnode, fs_cookie _cookie) +pipefs_close(fs_volume _volume, fs_vnode _node, fs_cookie _cookie) { - TRACE(("pipefs_close: entry vnode %p, cookie %p\n", _vnode, _cookie)); + file_cookie *cookie = (file_cookie *)_cookie; + Inode *inode = (Inode *)_node; - return 0; + TRACE(("pipefs_close: entry vnode %p, cookie %p\n", _node, _cookie)); + + inode->Close(cookie->open_mode); + return B_OK; } @@ -1039,7 +1111,7 @@ pipefs_free_cookie(fs_volume _volume, fs_vnode _node, fs_cookie _cookie) free(cookie); - return 0; + return B_OK; } @@ -1055,18 +1127,33 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/, void *buffer, size_t *_length) { file_cookie *cookie = (file_cookie *)_cookie; + Inode *inode = (Inode *)_node; TRACE(("pipefs_read: vnode %p, cookie %p, len 0x%lx, mode = %d\n", _node, cookie, *_length, cookie->open_mode)); if ((cookie->open_mode & O_RWMASK) != O_RDONLY) return B_NOT_ALLOWED; + if (inode->WriterCount() == 0) { + uint32 available; + + benaphore_lock(inode->RequestLock()); + available = inode->BytesInChain(); + benaphore_unlock(inode->RequestLock()); + + // as long there is no writer, and the pipe is empty, + // we always just return 0 to indicate end of file + if (available == 0) { + *_length = 0; + return B_OK; + } + } + // issue read request ReadRequest request(buffer, *_length); - - Inode *inode = (Inode *)_node; - inode->AddRequest(request); + if (inode->AddRequest(request) != B_OK) + return B_ERROR; // wait for it to be filled @@ -1095,6 +1182,12 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/ if ((cookie->open_mode & O_RWMASK) != O_WRONLY) return B_NOT_ALLOWED; + if (inode->ReaderCount() == 0) { + // if there is no reader yet, we signal the thread and return + send_signal(find_thread(NULL), SIGPIPE); + return EPIPE; + } + benaphore_lock(inode->RequestLock()); size_t bytesLeft = *_length; @@ -1342,7 +1435,11 @@ pipefs_read_stat(fs_volume _volume, fs_vnode _node, struct stat *stat) stat->st_dev = volume->ID(); stat->st_ino = inode->ID(); + + benaphore_lock(inode->RequestLock()); stat->st_size = inode->BytesInChain(); + benaphore_unlock(inode->RequestLock()); + stat->st_mode = inode->Type(); stat->st_nlink = 1;