oskit/oskit-20020317/com/pipe_io.c

706 lines
17 KiB
C
Executable File

/*
* Copyright (c) 1997-1999 University of Utah and the Flux Group.
* All rights reserved.
*
* This file is part of the Flux OSKit. The OSKit is free software, also known
* as "open source;" you can redistribute it and/or modify it under the terms
* of the GNU General Public License (GPL), version 2, as published by the Free
* Software Foundation (FSF). To explore alternate licensing terms, contact
* the University of Utah at csl-dist@cs.utah.edu or +1-801-585-3271.
*
* The OSKit is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GPL for more details. You should have
* received a copy of the GPL along with the OSKit; see the file COPYING. If
* not, write to the FSF, 59 Temple Place #330, Boston, MA 02111-1307, USA.
*/
/*
* Pipes. A pipe is two stream-like objects connected together via a
* couple of shared buffers. The implementation is truely UNIDIRECTIONAL!
* If you try to use them bidirectionally, it will almost certainly
* deadlock. If you need something so fancy as true bidirectional pipes,
* I suggest using socketpair() in the C/Posix library instead (although
* that requires linking with the network stack).
*/
#include <oskit/com/pipe.h>
#include <oskit/com/stream.h>
#include <oskit/io/asyncio.h>
#include <oskit/com/services.h>
#include <oskit/com/lock_mgr.h>
#include <oskit/com/lock.h>
#include <oskit/com/condvar.h>
#include <oskit/com/listener_mgr.h>
#include <oskit/queue.h>
#include <oskit/error.h>
#include <malloc.h>
#include <stddef.h>
#include <assert.h>
#include <string.h>
#include <stdlib.h>
/*
* A pipe has an input buffer and a connection to the other side of
* the pipe. A write to a pipe writes to the input buffer on the other
* side, and a read comes from the local side. In either case, there
* must be resource available or the caller will sleep until there is
* sufficient resource.
*
* Pipe semantics dictate that the pipe stays valid even after one
* side has closed so that the other side can read the remaining
* buffered data. Both sides of the pipe will be deallocated only
* when both counts go to zero.
*
* Note that the lock/condvar pair is *shared* between both sides of
* the pipe.
*/
struct pipe {
oskit_pipe_t pipei; /* Pipe COM interface */
oskit_asyncio_t pipea; /* Asyncio COM interface */
oskit_u32_t count; /* Reference count */
struct pipe *sister; /* Other side of the pipe */
oskit_u32_t flags; /* Flags */
queue_head_t pipeq; /* Queue of data buffers */
int dataready; /* Amount of data ready */
oskit_lock_t *lock; /* Thread lock (shared) */
oskit_condvar_t *condvar; /* Thread condvar */
struct listener_mgr *readers; /* listeners for asyncio */
struct listener_mgr *writers; /* listeners for asyncio */
};
/*
* Pipe buffers are queues of mbuf like thingies, each with an associated
* buffer, offset and size.
*/
struct pbuf {
queue_chain_t chain; /* Queueing element */
int size; /* Original size for dealloc */
int count; /* Amount of data ready */
char *bufp; /* Current pointer into data */
char data[0]; /* Buffer space for data */
};
/*
* The maximum size that a pipe buffer can grow to.
*/
/*#define MAXPIPEBUF (1024 * 64) */
#define MAXPIPEBUF 512
/*
* Flags.
*/
#define PIPE_CLOSED 0x01
#define PIPE_SLEEPREAD 0x02
#define PIPE_SLEEPWRITE 0x04
#define PIPE_WIDOWED 0x08
#define PIPE_READSEL 0x10
#define PIPE_WRITESEL 0x20
/*
* Thread safe locking. Not sure it makes any sense to use pipes in
* single threaded application, but whatever ...
*/
#define PLOCK(pipe) if (pipe->lock) oskit_lock_lock(pipe->lock)
#define PUNLOCK(pipe) if (pipe->lock) oskit_lock_unlock(pipe->lock)
#define PWAIT(pipe) \
if (pipe->condvar) oskit_condvar_wait(pipe->condvar, pipe->lock)
#define PSIGNAL(pipe) \
if (pipe->condvar) oskit_condvar_signal(pipe->condvar)
/*
* Stream operators
*/
static oskit_error_t
pipe_query(oskit_pipe_t *f, const oskit_iid_t *iid, void **out_ihandle)
{
struct pipe *pipe = (struct pipe *) f;
assert(pipe->count);
if (memcmp(iid, &oskit_iunknown_iid, sizeof(*iid)) == 0 ||
memcmp(iid, &oskit_stream_iid, sizeof(*iid)) == 0 ||
memcmp(iid, &oskit_pipe_iid, sizeof(*iid)) == 0) {
*out_ihandle = pipe;
++pipe->count;
return 0;
}
if (memcmp(iid, &oskit_asyncio_iid, sizeof(*iid)) == 0) {
*out_ihandle = &pipe->pipea;
++pipe->count;
return 0;
}
*out_ihandle = NULL;
return OSKIT_E_NOINTERFACE;
}
static OSKIT_COMDECL_U
pipe_addref(oskit_pipe_t *f)
{
struct pipe *pipe = (struct pipe *) f;
assert(pipe->count);
return ++pipe->count;
}
static void
pipe_free(struct pipe *pipe)
{
struct pbuf *pbuf;
/*
* Free all the buffers and then free the pipe object.
*/
while (!queue_empty(&pipe->pipeq)) {
queue_remove_first(&pipe->pipeq, pbuf, struct pbuf *, chain);
sfree(pbuf, sizeof(*pbuf) + pbuf->size);
}
if (pipe->lock)
oskit_lock_release(pipe->lock);
if (pipe->condvar)
oskit_condvar_release(pipe->condvar);
oskit_destroy_listener_mgr(pipe->readers);
oskit_destroy_listener_mgr(pipe->writers);
free(pipe);
}
static oskit_u32_t
pipe_release(oskit_pipe_t *f)
{
struct pipe *pipe = (struct pipe *) f;
struct pipe *psis = pipe->sister;
int newcount;
PLOCK(pipe);
assert(pipe->count);
if ((newcount = --pipe->count) == 0) {
/*
* Both sides of the pipe must be closed before it
* can be deallocated.
*/
if (psis->flags & PIPE_CLOSED) {
pipe_free(pipe->sister);
pipe_free(pipe);
return 0;
}
pipe->flags |= PIPE_CLOSED;
psis->flags |= PIPE_WIDOWED;
if (psis->flags & (PIPE_SLEEPREAD|PIPE_SLEEPWRITE))
PSIGNAL(psis);
}
PUNLOCK(pipe);
return newcount;
}
/*** Operations inherited from oskit_stream interface ***/
static OSKIT_COMDECL
pipe_read(oskit_pipe_t *f, void *buf, oskit_u32_t len,
oskit_u32_t *out_actual)
{
struct pipe *pipe = (struct pipe *) f;
struct pipe *psis = pipe->sister;
char *bp = buf;
struct pbuf *pbuf;
int count;
PLOCK(pipe);
/*
* Look for read on closed pipe.
*/
if (pipe->flags & PIPE_CLOSED) {
PUNLOCK(pipe);
return OSKIT_EBADF;
}
*out_actual = 0;
/*
* If no data available, sleep until the sister puts some data in.
* Look for EOF condition; Nothing to read and pipe is widowed.
*/
while (!pipe->dataready) {
if (pipe->flags & PIPE_WIDOWED) {
PUNLOCK(pipe);
return 0;
}
pipe->flags |= PIPE_SLEEPREAD;
PWAIT(pipe);
pipe->flags &= ~PIPE_SLEEPREAD;
}
/*
* Iterate through the buffers, copying out as much data as
* possible, up to the maximum specified by the caller.
*/
while (!queue_empty(&pipe->pipeq) && len > 0) {
pbuf = (struct pbuf *) queue_first(&pipe->pipeq);
/*
* For a buffer, copyout only as much data as can fit.
*/
count = (pbuf->count > len) ? len : pbuf->count;
memcpy(bp, pbuf->bufp, count);
pbuf->bufp += count;
pbuf->count -= count;
*out_actual += count;
len -= count;
bp += count;
/*
* If the buffer is empty, dealloc.
*/
if (pbuf->count == 0) {
queue_remove(&pipe->pipeq, pbuf, struct pbuf *, chain);
sfree(pbuf, sizeof(*pbuf) + pbuf->size);
}
pipe->dataready -= count;
/*
* Look for a select sleeper on the write side.
*/
if (psis->flags & PIPE_WRITESEL) {
PUNLOCK(pipe);
oskit_listener_mgr_notify(psis->writers);
PLOCK(pipe);
}
/*
* If the sister was waiting to put more data in, then
* wake it up.
*/
if (psis->flags & PIPE_SLEEPWRITE) {
psis->flags &= ~PIPE_SLEEPWRITE;
PSIGNAL(psis);
}
}
PUNLOCK(pipe);
return 0;
}
static OSKIT_COMDECL
pipe_write(oskit_pipe_t *f, const void *buf,
oskit_u32_t len, oskit_u32_t *out_actual)
{
struct pipe *pipe = (struct pipe *) f;
struct pipe *psis = pipe->sister;
char *bp = (char *) buf;
struct pbuf *pbuf;
int count;
PLOCK(pipe);
/*
* Look for write on closed pipe.
*/
if (pipe->flags & PIPE_CLOSED) {
PUNLOCK(pipe);
return OSKIT_EBADF;
}
/*
* Look for a broken pipe.
*/
if (pipe->flags & PIPE_WIDOWED) {
PUNLOCK(pipe);
return OSKIT_EPIPE;
}
*out_actual = 0;
/*
* Loop, stuffing stuff into the other side until we run out of
* stuff, or reach the maximum allowed. If the other side fills,
* up, must sleep until it wakes this side up again. Each time
* data is stuffed into the other side, be sure to wake it up.
*/
while (len > 0) {
/*
* Is there room for the data? If not, must wait.
*/
while (psis->dataready >= MAXPIPEBUF) {
pipe->flags |= PIPE_SLEEPWRITE;
PWAIT(pipe);
pipe->flags &= ~PIPE_SLEEPWRITE;
}
if (MAXPIPEBUF - psis->dataready > len)
count = len;
else
count = MAXPIPEBUF - psis->dataready;
if ((pbuf = smalloc(sizeof(*pbuf) + count)) == 0)
panic("pipe_write: Out of memory");
memcpy(pbuf->data, bp, count);
pbuf->bufp = pbuf->data;
pbuf->count = count;
pbuf->size = count;
*out_actual += count;
len -= count;
bp += count;
queue_enter(&psis->pipeq, pbuf, struct pbuf *, chain);
psis->dataready += count;
/*
* Look for a select sleeper on the read side.
*/
if (psis->flags & PIPE_READSEL) {
PUNLOCK(pipe);
oskit_listener_mgr_notify(psis->readers);
PLOCK(pipe);
}
/*
* Wakeup the sister side if it was waiting for data.
*/
if (psis->flags & PIPE_SLEEPREAD) {
psis->flags &= ~PIPE_SLEEPREAD;
PSIGNAL(psis);
}
}
PUNLOCK(pipe);
return 0;
}
static OSKIT_COMDECL
pipe_seek(oskit_pipe_t *f, oskit_s64_t offset,
oskit_seek_t whence, oskit_u64_t *out_newpos)
{
return OSKIT_ESPIPE;
}
static OSKIT_COMDECL
pipe_setsize(oskit_pipe_t *f, oskit_u64_t new_size)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_copy_to(oskit_pipe_t *f, oskit_stream_t *dst, oskit_u64_t size,
oskit_u64_t *out_read, oskit_u64_t *out_written)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_commit(oskit_pipe_t *f, oskit_u32_t commit_flags)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_revert(oskit_pipe_t *f)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_lockregion(oskit_pipe_t *f, oskit_u64_t offset,
oskit_u64_t size, oskit_u32_t lock_type)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_unlockregion(oskit_pipe_t *f, oskit_u64_t offset,
oskit_u64_t size, oskit_u32_t lock_type)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_stat(oskit_pipe_t *f, oskit_stream_stat_t *out_stat,
oskit_u32_t stat_flags)
{
return OSKIT_ENOTSUP;
}
static OSKIT_COMDECL
pipe_clone(oskit_pipe_t *f, oskit_pipe_t **out_stream)
{
return OSKIT_ENOTSUP;
}
static struct oskit_pipe_ops pipe_ops = {
pipe_query,
pipe_addref,
pipe_release,
pipe_read,
pipe_write,
pipe_seek,
pipe_setsize,
pipe_copy_to,
pipe_commit,
pipe_revert,
pipe_lockregion,
pipe_unlockregion,
pipe_stat,
pipe_clone,
};
/*
**********************************************************************
* Async IO interface,
*/
static OSKIT_COMDECL
pipe_asyncio_query(oskit_asyncio_t *f,
const struct oskit_guid *iid, void **out_ihandle)
{
struct pipe *pipe = (struct pipe *) (f-1);
return pipe_query(&pipe->pipei, iid, out_ihandle);
}
static OSKIT_COMDECL_U
pipe_asyncio_addref(oskit_asyncio_t *f)
{
struct pipe *pipe = (struct pipe *) (f-1);
return pipe_addref(&pipe->pipei);
}
static OSKIT_COMDECL_U
pipe_asyncio_release(oskit_asyncio_t *f)
{
struct pipe *pipe = (struct pipe *) (f-1);
return pipe_release(&pipe->pipei);
}
/*
* return a mask with all conditions that currently apply to that socket
* must be called with splnet()!
*/
static oskit_u32_t
get_pipe_conditions(struct pipe *pipe)
{
struct pipe *psis = pipe->sister;
oskit_u32_t res = 0;
if (pipe->dataready)
res |= OSKIT_ASYNCIO_READABLE;
if (pipe->flags & (PIPE_CLOSED|PIPE_WIDOWED))
res |= OSKIT_ASYNCIO_EXCEPTION;
if (psis->dataready < MAXPIPEBUF)
res |= OSKIT_ASYNCIO_WRITABLE;
return res;
}
/*
* Poll for currently pending asynchronous I/O conditions.
* If successful, returns a mask of the OSKIT_ASYNC_IO_* flags above,
* indicating which conditions are currently present.
*/
static OSKIT_COMDECL
pipe_asyncio_poll(oskit_asyncio_t *f)
{
struct pipe *pipe = (struct pipe *) (f-1);
oskit_u32_t res = 0;
PLOCK(pipe);
res = get_pipe_conditions(pipe);
PUNLOCK(pipe);
return res;
}
/*
* Add a callback object (a "listener" for async I/O events).
* When an event of interest occurs on this I/O object
* (i.e., when one of the three I/O conditions becomes true),
* all registered listeners will be called.
* Also, if successful, this method returns a mask
* describing which of the OSKIT_ASYNC_IO_* conditions are already true,
* which the caller must check in order to avoid missing events
* that occur just before the listener is registered.
*/
static OSKIT_COMDECL
pipe_asyncio_add_listener(oskit_asyncio_t *f,
struct oskit_listener *l, oskit_s32_t mask)
{
struct pipe *pipe = (struct pipe *) (f-1);
oskit_s32_t cond;
PLOCK(pipe);
cond = get_pipe_conditions(pipe);
/* for read and exceptional conditions */
if (mask & (OSKIT_ASYNCIO_READABLE|OSKIT_ASYNCIO_EXCEPTION)) {
oskit_listener_mgr_add(pipe->readers, l);
pipe->flags |= PIPE_READSEL;
}
/* for write */
if (mask & OSKIT_ASYNCIO_WRITABLE) {
oskit_listener_mgr_add(pipe->writers, l);
pipe->flags |= PIPE_WRITESEL;
}
PUNLOCK(pipe);
return cond;
}
/*
* Remove a previously registered listener callback object.
* Returns an error if the specified callback has not been registered.
*/
static OSKIT_COMDECL
pipe_asyncio_remove_listener(oskit_asyncio_t *f, struct oskit_listener *l0)
{
struct pipe *pipe = (struct pipe *) (f-1);
oskit_error_t rc1, rc2;
PLOCK(pipe);
/*
* we don't know where was added - if at all - so let's check
* both lists
*
* turn off notifications if no listeners left
*/
rc1 = oskit_listener_mgr_remove(pipe->readers, l0);
if (oskit_listener_mgr_count(pipe->readers) == 0) {
pipe->flags &= ~PIPE_READSEL;
}
rc2 = oskit_listener_mgr_remove(pipe->writers, l0);
if (oskit_listener_mgr_count(pipe->writers) == 0) {
pipe->flags &= ~PIPE_WRITESEL;
}
PUNLOCK(pipe);
/* flag error if both removes failed */
return (rc1 && rc2) ? OSKIT_E_INVALIDARG : 0; /* is that right ? */
}
/*
* return the number of bytes that can be read, basically ioctl(FIONREAD)
*/
static OSKIT_COMDECL
pipe_asyncio_readable(oskit_asyncio_t *f)
{
struct pipe *pipe = (struct pipe *) (f-1);
oskit_u32_t count;
PLOCK(pipe);
count = pipe->dataready;
PUNLOCK(pipe);
return count;
}
static struct oskit_asyncio_ops pipe_asyncioops =
{
pipe_asyncio_query,
pipe_asyncio_addref,
pipe_asyncio_release,
pipe_asyncio_poll,
pipe_asyncio_add_listener,
pipe_asyncio_remove_listener,
pipe_asyncio_readable
};
#ifdef KNIT
extern oskit_lock_mgr_t *oskit_lock_mgr;
#define lock_mgr oskit_lock_mgr
#endif
/*
* Sole user entry point for this module. Create an `oskit_pipe' object.
*/
OSKIT_COMDECL
oskit_create_pipe(oskit_pipe_t **out_pipe0, oskit_pipe_t **out_pipe1)
{
struct pipe *pipe0, *pipe1;
#ifndef KNIT
oskit_lock_mgr_t *lock_mgr;
#endif
oskit_lock_t *lock;
oskit_condvar_t *condvar;
if ((pipe0 = malloc(sizeof(struct pipe))) == 0)
return OSKIT_ENOMEM;
if ((pipe1 = malloc(sizeof(struct pipe))) == 0) {
free(pipe0);
return OSKIT_ENOMEM;
}
memset(pipe0, 0, sizeof(*pipe0));
memset(pipe1, 0, sizeof(*pipe1));
pipe0->pipei.ops = &pipe_ops;
pipe1->pipei.ops = &pipe_ops;
pipe0->pipea.ops = &pipe_asyncioops;
pipe1->pipea.ops = &pipe_asyncioops;
pipe0->count = 1;
pipe1->count = 1;
pipe0->sister = pipe1;
pipe1->sister = pipe0;
queue_init(&pipe0->pipeq);
queue_init(&pipe1->pipeq);
pipe0->readers = oskit_create_listener_mgr((oskit_iunknown_t *)
&pipe0->pipea);
pipe0->writers = oskit_create_listener_mgr((oskit_iunknown_t *)
&pipe0->pipea);
pipe1->readers = oskit_create_listener_mgr((oskit_iunknown_t *)
&pipe1->pipea);
pipe1->writers = oskit_create_listener_mgr((oskit_iunknown_t *)
&pipe1->pipea);
/*
* See if thread-safe locks are required. Note that I don't think it
* makes any sense to use pipes in a single threaded kernel, but
* at least the program will link and run. It will probably just
* deadlock though.
*/
#ifndef KNIT
if (oskit_lookup_first(&oskit_lock_mgr_iid, (void *) &lock_mgr))
panic("oskit_create_pipe: oskit_lookup_first");
#endif
if (lock_mgr) {
if (oskit_lock_mgr_allocate_lock(lock_mgr, &lock))
panic("oskit_create_pipe: lock_mgr_allocate_lock");
oskit_lock_addref(lock);
pipe0->lock = lock;
pipe1->lock = lock;
if (oskit_lock_mgr_allocate_condvar(lock_mgr, &condvar))
panic("oskit_create_pipe: lock_mgr_allocate_condvar");
pipe0->condvar = condvar;
if (oskit_lock_mgr_allocate_condvar(lock_mgr, &condvar))
panic("oskit_create_pipe: lock_mgr_allocate_condvar");
pipe1->condvar = condvar;
}
*out_pipe0 = &pipe0->pipei;
*out_pipe1 = &pipe1->pipei;
return 0;
}