pipefs_create() now also allows to open an existing pipe.

The inode now keeps track of the number of readers/writers.
Inode::BytesInChain() is now only used protected by the request lock.
Implemented some missing POSIX demands:
- when the last writer is closed, all waiting read requests are aborted
- pipefs_write() now returns EPIPE and signals the current thread with
  SIGPIPE in case there are no readers left
- pipefs_read() will now return 0 in case there are no writers and no
  unread buffers left in the pipe.


git-svn-id: file:///srv/svn/repos/haiku/trunk/current@9717 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Axel Dörfler 2004-11-01 20:36:09 +00:00
parent fa00c58619
commit 0759a7e5da

View File

@ -50,6 +50,8 @@ class ReadRequest {
~ReadRequest();
status_t Wait(bool nonBlocking);
void Abort();
status_t PutBufferChain(cbuf *bufferChain, size_t *_bytesRead = NULL,
bool releasePartiallyFilled = true);
status_t PutBuffer(const void **_buffer, size_t *_bufferSize);
@ -149,6 +151,11 @@ class Inode {
status_t AddRequest(ReadRequest &request);
status_t RemoveRequest(ReadRequest &request);
void Open(int openMode);
void Close(int openMode);
int32 ReaderCount() const { return fReaderCount; }
int32 WriterCount() const { return fWriterCount; }
static int32 HashNextOffset();
static uint32 hash_func(void *_node, const void *_key, uint32 range);
static int compare_func(void *_node, const void *_key);
@ -171,6 +178,9 @@ class Inode {
benaphore fRequestLock;
sem_id fWriteLock;
int32 fReaderCount;
int32 fWriterCount;
};
@ -439,7 +449,9 @@ Inode::Inode(Volume *volume, Inode *parent, const char *name, int32 type)
:
fNext(NULL),
fHashNext(NULL),
fBufferChain(NULL)
fBufferChain(NULL),
fReaderCount(0),
fWriterCount(0)
{
fName = strdup(name);
if (fName == NULL)
@ -660,7 +672,43 @@ Inode::RemoveRequest(ReadRequest &request)
}
int32
void
Inode::Open(int openMode)
{
if ((openMode & O_ACCMODE) == O_WRONLY)
atomic_add(&fWriterCount, 1);
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
atomic_add(&fReaderCount, 1);
}
void
Inode::Close(int openMode)
{
if ((openMode & O_ACCMODE) == O_WRONLY
&& atomic_add(&fWriterCount, -1) == 1) {
// Our last writer has been closed; if the pipe
// contains no data, unlock all waiting readers
benaphore_lock(&fRequestLock);
if (cbuf_get_length(fBufferChain) == 0) {
ReadRequest *request;
DoublyLinked::Iterator<ReadRequest> iterator(fRequests);
while ((request = iterator.Next()) != NULL)
request->Abort();
}
benaphore_unlock(&fRequestLock);
}
if ((openMode & O_ACCMODE) == O_RDONLY || (openMode & O_ACCMODE) == O_RDWR)
atomic_add(&fReaderCount, -1);
}
int32
Inode::HashNextOffset()
{
Inode *inode;
@ -722,6 +770,14 @@ ReadRequest::Wait(bool nonBlocking)
}
void
ReadRequest::Abort()
{
fBytesRead = 0;
release_sem(fLock);
}
/** Reads the contents of the buffer chain to the specified
* buffer, if any.
*/
@ -961,6 +1017,7 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode,
fs_cookie *_cookie, vnode_id *_newVnodeID)
{
Volume *volume = (Volume *)_volume;
bool wasCreated = true;
TRACE(("pipefs_create(): dir = %p, name = '%s', perms = %d, &id = %p\n",
_dir, name, mode, _newVnodeID));
@ -975,15 +1032,23 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode,
status_t status = B_OK;
Inode *inode = volume->FindNode(name);
if (inode != NULL) {
if (inode != NULL && (openMode & O_EXCL) != 0) {
status = B_FILE_EXISTS;
goto err;
}
inode = volume->CreateNode(directory, name, S_IFIFO | mode);
if (inode == NULL) {
status = B_NO_MEMORY;
goto err;
// we need to create a new pipe
inode = volume->CreateNode(directory, name, S_IFIFO | mode);
if (inode == NULL) {
status = B_NO_MEMORY;
goto err;
}
} else {
// we can just open the pipe again
void *dummy;
get_vnode(volume->ID(), inode->ID(), &dummy);
wasCreated = false;
}
volume->Unlock();
@ -992,7 +1057,9 @@ pipefs_create(fs_volume _volume, fs_vnode _dir, const char *name, int openMode,
*_cookie = (void *)cookie;
*_newVnodeID = inode->ID();
notify_listener(B_ENTRY_CREATED, volume->ID(), directory->ID(), 0, inode->ID(), name);
if (wasCreated)
notify_listener(B_ENTRY_CREATED, volume->ID(), directory->ID(), 0, inode->ID(), name);
return B_OK;
@ -1007,13 +1074,14 @@ err:
static status_t
pipefs_open(fs_volume _volume, fs_vnode _node, int openMode, fs_cookie *_cookie)
{
// allow to open the file, but it can't be done anything with it
Inode *inode = (Inode *)_node;
file_cookie *cookie = (file_cookie *)malloc(sizeof(file_cookie));
if (cookie == NULL)
return B_NO_MEMORY;
cookie->open_mode = openMode;
inode->Open(openMode);
*_cookie = (void *)cookie;
@ -1022,11 +1090,15 @@ pipefs_open(fs_volume _volume, fs_vnode _node, int openMode, fs_cookie *_cookie)
static status_t
pipefs_close(fs_volume _volume, fs_vnode _vnode, fs_cookie _cookie)
pipefs_close(fs_volume _volume, fs_vnode _node, fs_cookie _cookie)
{
TRACE(("pipefs_close: entry vnode %p, cookie %p\n", _vnode, _cookie));
file_cookie *cookie = (file_cookie *)_cookie;
Inode *inode = (Inode *)_node;
return 0;
TRACE(("pipefs_close: entry vnode %p, cookie %p\n", _node, _cookie));
inode->Close(cookie->open_mode);
return B_OK;
}
@ -1039,7 +1111,7 @@ pipefs_free_cookie(fs_volume _volume, fs_vnode _node, fs_cookie _cookie)
free(cookie);
return 0;
return B_OK;
}
@ -1055,18 +1127,33 @@ pipefs_read(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/,
void *buffer, size_t *_length)
{
file_cookie *cookie = (file_cookie *)_cookie;
Inode *inode = (Inode *)_node;
TRACE(("pipefs_read: vnode %p, cookie %p, len 0x%lx, mode = %d\n", _node, cookie, *_length, cookie->open_mode));
if ((cookie->open_mode & O_RWMASK) != O_RDONLY)
return B_NOT_ALLOWED;
if (inode->WriterCount() == 0) {
uint32 available;
benaphore_lock(inode->RequestLock());
available = inode->BytesInChain();
benaphore_unlock(inode->RequestLock());
// as long there is no writer, and the pipe is empty,
// we always just return 0 to indicate end of file
if (available == 0) {
*_length = 0;
return B_OK;
}
}
// issue read request
ReadRequest request(buffer, *_length);
Inode *inode = (Inode *)_node;
inode->AddRequest(request);
if (inode->AddRequest(request) != B_OK)
return B_ERROR;
// wait for it to be filled
@ -1095,6 +1182,12 @@ pipefs_write(fs_volume _volume, fs_vnode _node, fs_cookie _cookie, off_t /*pos*/
if ((cookie->open_mode & O_RWMASK) != O_WRONLY)
return B_NOT_ALLOWED;
if (inode->ReaderCount() == 0) {
// if there is no reader yet, we signal the thread and return
send_signal(find_thread(NULL), SIGPIPE);
return EPIPE;
}
benaphore_lock(inode->RequestLock());
size_t bytesLeft = *_length;
@ -1342,7 +1435,11 @@ pipefs_read_stat(fs_volume _volume, fs_vnode _node, struct stat *stat)
stat->st_dev = volume->ID();
stat->st_ino = inode->ID();
benaphore_lock(inode->RequestLock());
stat->st_size = inode->BytesInChain();
benaphore_unlock(inode->RequestLock());
stat->st_mode = inode->Type();
stat->st_nlink = 1;