Use mutexes & condvars.

This commit is contained in:
ad 2007-03-12 16:20:53 +00:00
parent 42390b383a
commit 7888016cf9
2 changed files with 98 additions and 95 deletions

View File

@ -1,11 +1,11 @@
/* $NetBSD: sys_pipe.c,v 1.79 2007/03/04 06:03:09 christos Exp $ */
/* $NetBSD: sys_pipe.c,v 1.80 2007/03/12 16:20:53 ad Exp $ */
/*-
* Copyright (c) 2003 The NetBSD Foundation, Inc.
* Copyright (c) 2003, 2007 The NetBSD Foundation, Inc.
* All rights reserved.
*
* This code is derived from software contributed to The NetBSD Foundation
* by Paul Kranenburg.
* by Paul Kranenburg, and by Andrew Doran.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@ -83,7 +83,7 @@
*/
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.79 2007/03/04 06:03:09 christos Exp $");
__KERNEL_RCSID(0, "$NetBSD: sys_pipe.c,v 1.80 2007/03/12 16:20:53 ad Exp $");
#include <sys/param.h>
#include <sys/systm.h>
@ -305,7 +305,9 @@ pipe_create(struct pipe **pipep, int allockva)
getmicrotime(&pipe->pipe_ctime);
pipe->pipe_atime = pipe->pipe_ctime;
pipe->pipe_mtime = pipe->pipe_ctime;
simple_lock_init(&pipe->pipe_slock);
mutex_init(&pipe->pipe_lock, MUTEX_DEFAULT, IPL_NONE);
cv_init(&pipe->pipe_cv, "pipe");
cv_init(&pipe->pipe_lkcv, "pipelk");
if (allockva && (error = pipespace(pipe, PIPE_SIZE)))
return (error);
@ -322,22 +324,23 @@ pipe_create(struct pipe **pipep, int allockva)
static int
pipelock(struct pipe *pipe, int catch)
{
int error;
LOCK_ASSERT(simple_lock_held(&pipe->pipe_slock));
KASSERT(mutex_owned(&pipe->pipe_lock));
while (pipe->pipe_state & PIPE_LOCKFL) {
int error;
const int pcatch = catch ? PCATCH : 0;
pipe->pipe_state |= PIPE_LWANT;
error = ltsleep(pipe, PSOCK | pcatch, "pipelk", 0,
&pipe->pipe_slock);
if (error != 0)
return error;
if (catch) {
error = cv_wait_sig(&pipe->pipe_lkcv,
&pipe->pipe_lock);
if (error != 0)
return error;
} else
cv_wait(&pipe->pipe_lkcv, &pipe->pipe_lock);
}
pipe->pipe_state |= PIPE_LOCKFL;
simple_unlock(&pipe->pipe_slock);
mutex_exit(&pipe->pipe_lock);
return 0;
}
@ -354,7 +357,7 @@ pipeunlock(struct pipe *pipe)
pipe->pipe_state &= ~PIPE_LOCKFL;
if (pipe->pipe_state & PIPE_LWANT) {
pipe->pipe_state &= ~PIPE_LWANT;
wakeup(pipe);
cv_broadcast(&pipe->pipe_lkcv);
}
}
@ -410,7 +413,7 @@ pipe_read(struct file *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
size_t size;
size_t ocnt;
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
++rpipe->pipe_busy;
ocnt = bp->cnt;
@ -471,10 +474,10 @@ again:
rpipe->pipe_map.pos += size;
rpipe->pipe_map.cnt -= size;
if (rpipe->pipe_map.cnt == 0) {
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
rpipe->pipe_state &= ~PIPE_DIRECTR;
wakeup(rpipe);
PIPE_UNLOCK(rpipe);
cv_broadcast(&rpipe->pipe_cv);
mutex_exit(&rpipe->pipe_lock);
}
#endif
} else {
@ -484,14 +487,14 @@ again:
if (nread > 0)
break;
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
/*
* detect EOF condition
* read returns 0 on EOF, no need to set error
*/
if (rpipe->pipe_state & PIPE_EOF) {
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
break;
}
@ -499,7 +502,7 @@ again:
* don't block on non-blocking I/O
*/
if (fp->f_flag & FNONBLOCK) {
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
error = EAGAIN;
break;
}
@ -529,13 +532,13 @@ again:
*/
if (rpipe->pipe_state & PIPE_WANTW) {
rpipe->pipe_state &= ~PIPE_WANTW;
wakeup(rpipe);
cv_broadcast(&rpipe->pipe_cv);
}
/* Now wait until the pipe is filled */
rpipe->pipe_state |= PIPE_WANTR;
error = ltsleep(rpipe, PSOCK | PCATCH,
"piperd", 0, &rpipe->pipe_slock);
error = cv_wait_sig(&rpipe->pipe_cv,
&rpipe->pipe_lock);
if (error != 0)
goto unlocked_error;
goto again;
@ -545,7 +548,7 @@ again:
if (error == 0)
getmicrotime(&rpipe->pipe_atime);
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
pipeunlock(rpipe);
unlocked_error:
@ -556,14 +559,14 @@ unlocked_error:
*/
if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANTCLOSE)) {
rpipe->pipe_state &= ~(PIPE_WANTCLOSE|PIPE_WANTW);
wakeup(rpipe);
cv_broadcast(&rpipe->pipe_cv);
} else if (bp->cnt < MINPIPESIZE) {
/*
* Handle write blocking hysteresis.
*/
if (rpipe->pipe_state & PIPE_WANTW) {
rpipe->pipe_state &= ~PIPE_WANTW;
wakeup(rpipe);
cv_broadcast(&rpipe->pipe_cv);
}
}
@ -578,7 +581,7 @@ unlocked_error:
rpipe->pipe_state &= ~PIPE_SIGNALR;
}
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
return (error);
}
@ -701,18 +704,17 @@ pipe_direct_write(struct file *fp, struct pipe *wpipe, struct uio *uio)
*/
/* Relase the pipe lock while we wait */
PIPE_LOCK(wpipe);
mutex_enter(&wpipe->pipe_lock);
pipeunlock(wpipe);
while (error == 0 && wpipe->pipe_buffer.cnt > 0) {
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
wpipe->pipe_state |= PIPE_WANTW;
error = ltsleep(wpipe, PSOCK | PCATCH, "pipdwc", 0,
&wpipe->pipe_slock);
error = cv_wait_sig(&wpipe->pipe_cv, &wpipe->pipe_lock);
if (error == 0 && wpipe->pipe_state & PIPE_EOF)
error = EPIPE;
}
@ -724,11 +726,10 @@ pipe_direct_write(struct file *fp, struct pipe *wpipe, struct uio *uio)
while (error == 0 && (wpipe->pipe_state & PIPE_DIRECTR)) {
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
pipeselwakeup(wpipe, wpipe, POLL_IN);
error = ltsleep(wpipe, PSOCK | PCATCH, "pipdwt", 0,
&wpipe->pipe_slock);
error = cv_wait_sig(&wpipe->pipe_cv, &wpipe->pipe_lock);
if (error == 0 && wpipe->pipe_state & PIPE_EOF)
error = EPIPE;
}
@ -756,7 +757,7 @@ pipe_direct_write(struct file *fp, struct pipe *wpipe, struct uio *uio)
*/
if (wpipe->pipe_map.cnt == bcnt) {
wpipe->pipe_map.cnt = 0;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
return (error);
}
@ -790,7 +791,7 @@ pipe_write(struct file *fp, off_t *offset, struct uio *uio, kauth_cred_t cred,
retry:
error = 0;
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
wpipe = rpipe->pipe_peer;
/*
@ -798,16 +799,16 @@ retry:
*/
if (wpipe == NULL)
error = EPIPE;
else if (simple_lock_try(&wpipe->pipe_slock) == 0) {
else if (mutex_tryenter(&wpipe->pipe_lock) == 0) {
/* Deal with race for peer */
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
goto retry;
} else if ((wpipe->pipe_state & PIPE_EOF) != 0) {
PIPE_UNLOCK(wpipe);
mutex_exit(&wpipe->pipe_lock);
error = EPIPE;
}
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
if (error != 0)
return (error);
@ -819,9 +820,9 @@ retry:
if (wpipe->pipe_busy == 0
&& (wpipe->pipe_state & PIPE_WANTCLOSE)) {
wpipe->pipe_state &= ~(PIPE_WANTCLOSE | PIPE_WANTR);
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
PIPE_UNLOCK(wpipe);
mutex_exit(&wpipe->pipe_lock);
return (error);
}
@ -854,14 +855,14 @@ retry:
* We break out if a signal occurs or the reader goes away.
*/
while (error == 0 && wpipe->pipe_state & PIPE_DIRECTW) {
PIPE_LOCK(wpipe);
mutex_enter(&wpipe->pipe_lock);
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
pipeunlock(wpipe);
error = ltsleep(wpipe, PSOCK | PCATCH,
"pipbww", 0, &wpipe->pipe_slock);
error = cv_wait_sig(&wpipe->pipe_cv,
&wpipe->pipe_lock);
(void)pipelock(wpipe, 0);
if (wpipe->pipe_state & PIPE_EOF)
@ -967,12 +968,12 @@ retry:
/*
* If the "read-side" has been blocked, wake it up now.
*/
PIPE_LOCK(wpipe);
mutex_enter(&wpipe->pipe_lock);
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
PIPE_UNLOCK(wpipe);
mutex_exit(&wpipe->pipe_lock);
/*
* don't block on non-blocking I/O
@ -989,11 +990,11 @@ retry:
if (bp->cnt)
pipeselwakeup(wpipe, wpipe, POLL_OUT);
PIPE_LOCK(wpipe);
mutex_enter(&wpipe->pipe_lock);
pipeunlock(wpipe);
wpipe->pipe_state |= PIPE_WANTW;
error = ltsleep(wpipe, PSOCK | PCATCH, "pipewr", 0,
&wpipe->pipe_slock);
error = cv_wait_sig(&wpipe->pipe_cv,
&wpipe->pipe_lock);
(void)pipelock(wpipe, 0);
if (error != 0)
break;
@ -1008,11 +1009,11 @@ retry:
}
}
PIPE_LOCK(wpipe);
mutex_enter(&wpipe->pipe_lock);
--wpipe->pipe_busy;
if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANTCLOSE)) {
wpipe->pipe_state &= ~(PIPE_WANTCLOSE | PIPE_WANTR);
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
} else if (bp->cnt > 0) {
/*
* If we have put any characters in the buffer, we wake up
@ -1020,7 +1021,7 @@ retry:
*/
if (wpipe->pipe_state & PIPE_WANTR) {
wpipe->pipe_state &= ~PIPE_WANTR;
wakeup(wpipe);
cv_broadcast(&wpipe->pipe_cv);
}
}
@ -1047,7 +1048,7 @@ retry:
wpipe->pipe_state |= PIPE_SIGNALR;
pipeunlock(wpipe);
PIPE_UNLOCK(wpipe);
mutex_exit(&wpipe->pipe_lock);
return (error);
}
@ -1066,43 +1067,43 @@ pipe_ioctl(struct file *fp, u_long cmd, void *data, struct lwp *l)
return (0);
case FIOASYNC:
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
if (*(int *)data) {
pipe->pipe_state |= PIPE_ASYNC;
} else {
pipe->pipe_state &= ~PIPE_ASYNC;
}
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
return (0);
case FIONREAD:
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
#ifndef PIPE_NODIRECT
if (pipe->pipe_state & PIPE_DIRECTW)
*(int *)data = pipe->pipe_map.cnt;
else
#endif
*(int *)data = pipe->pipe_buffer.cnt;
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
return (0);
case FIONWRITE:
/* Look at other side */
pipe = pipe->pipe_peer;
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
#ifndef PIPE_NODIRECT
if (pipe->pipe_state & PIPE_DIRECTW)
*(int *)data = pipe->pipe_map.cnt;
else
#endif
*(int *)data = pipe->pipe_buffer.cnt;
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
return (0);
case FIONSPACE:
/* Look at other side */
pipe = pipe->pipe_peer;
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
#ifndef PIPE_NODIRECT
/*
* If we're in direct-mode, we don't really have a
@ -1115,7 +1116,7 @@ pipe_ioctl(struct file *fp, u_long cmd, void *data, struct lwp *l)
#endif
*(int *)data = pipe->pipe_buffer.size -
pipe->pipe_buffer.cnt;
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
return (0);
case TIOCSPGRP:
@ -1139,11 +1140,11 @@ pipe_poll(struct file *fp, int events, struct lwp *l)
int revents = 0;
retry:
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
wpipe = rpipe->pipe_peer;
if (wpipe != NULL && simple_lock_try(&wpipe->pipe_slock) == 0) {
if (wpipe != NULL && mutex_tryenter(&wpipe->pipe_lock) == 0) {
/* Deal with race for peer */
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
goto retry;
}
@ -1156,7 +1157,7 @@ retry:
revents |= events & (POLLIN | POLLRDNORM);
eof |= (rpipe->pipe_state & PIPE_EOF);
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
if (wpipe == NULL)
revents |= events & (POLLOUT | POLLWRNORM);
@ -1170,7 +1171,7 @@ retry:
revents |= events & (POLLOUT | POLLWRNORM);
eof |= (wpipe->pipe_state & PIPE_EOF);
PIPE_UNLOCK(wpipe);
mutex_exit(&wpipe->pipe_lock);
}
if (wpipe == NULL || eof)
@ -1258,7 +1259,7 @@ pipeclose(struct file *fp, struct pipe *pipe)
return;
retry:
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
pipeselwakeup(pipe, pipe, POLL_HUP);
@ -1268,9 +1269,9 @@ retry:
*/
pipe->pipe_state |= PIPE_EOF;
while (pipe->pipe_busy) {
wakeup(pipe);
cv_broadcast(&pipe->pipe_cv);
pipe->pipe_state |= PIPE_WANTCLOSE;
ltsleep(pipe, PSOCK, "pipecl", 0, &pipe->pipe_slock);
cv_wait_sig(&pipe->pipe_cv, &pipe->pipe_lock);
}
/*
@ -1278,26 +1279,29 @@ retry:
*/
if ((ppipe = pipe->pipe_peer) != NULL) {
/* Deal with race for peer */
if (simple_lock_try(&ppipe->pipe_slock) == 0) {
PIPE_UNLOCK(pipe);
if (mutex_tryenter(&ppipe->pipe_lock) == 0) {
mutex_exit(&pipe->pipe_lock);
goto retry;
}
pipeselwakeup(ppipe, ppipe, POLL_HUP);
ppipe->pipe_state |= PIPE_EOF;
wakeup(ppipe);
cv_broadcast(&ppipe->pipe_cv);
ppipe->pipe_peer = NULL;
PIPE_UNLOCK(ppipe);
mutex_exit(&ppipe->pipe_lock);
}
KASSERT((pipe->pipe_state & PIPE_LOCKFL) == 0);
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
/*
* free resources
*/
pipe_free_kmem(pipe);
mutex_destroy(&pipe->pipe_lock);
cv_destroy(&pipe->pipe_cv);
cv_destroy(&pipe->pipe_lkcv);
pool_put(&pipe_pool, pipe);
}
@ -1327,9 +1331,9 @@ filt_pipedetach(struct knote *kn)
panic("filt_pipedetach: inconsistent knote");
#endif
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
SLIST_REMOVE(&pipe->pipe_sel.sel_klist, kn, knote, kn_selnext);
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
}
/*ARGSUSED*/
@ -1340,7 +1344,7 @@ filt_piperead(struct knote *kn, long hint)
struct pipe *wpipe = rpipe->pipe_peer;
if ((hint & NOTE_SUBMIT) == 0)
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
kn->kn_data = rpipe->pipe_buffer.cnt;
if ((kn->kn_data == 0) && (rpipe->pipe_state & PIPE_DIRECTW))
kn->kn_data = rpipe->pipe_map.cnt;
@ -1350,11 +1354,11 @@ filt_piperead(struct knote *kn, long hint)
(wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
kn->kn_flags |= EV_EOF;
if ((hint & NOTE_SUBMIT) == 0)
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
return (1);
}
if ((hint & NOTE_SUBMIT) == 0)
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
return (kn->kn_data > 0);
}
@ -1366,13 +1370,13 @@ filt_pipewrite(struct knote *kn, long hint)
struct pipe *wpipe = rpipe->pipe_peer;
if ((hint & NOTE_SUBMIT) == 0)
PIPE_LOCK(rpipe);
mutex_enter(&rpipe->pipe_lock);
/* XXXSMP: race for peer */
if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
kn->kn_data = 0;
kn->kn_flags |= EV_EOF;
if ((hint & NOTE_SUBMIT) == 0)
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
return (1);
}
kn->kn_data = wpipe->pipe_buffer.size - wpipe->pipe_buffer.cnt;
@ -1380,7 +1384,7 @@ filt_pipewrite(struct knote *kn, long hint)
kn->kn_data = 0;
if ((hint & NOTE_SUBMIT) == 0)
PIPE_UNLOCK(rpipe);
mutex_exit(&rpipe->pipe_lock);
return (kn->kn_data >= PIPE_BUF);
}
@ -1414,9 +1418,9 @@ pipe_kqfilter(struct file *fp, struct knote *kn)
}
kn->kn_hook = pipe;
PIPE_LOCK(pipe);
mutex_enter(&pipe->pipe_lock);
SLIST_INSERT_HEAD(&pipe->pipe_sel.sel_klist, kn, kn_selnext);
PIPE_UNLOCK(pipe);
mutex_exit(&pipe->pipe_lock);
return (0);
}

View File

@ -1,4 +1,4 @@
/* $NetBSD: pipe.h,v 1.19 2007/03/04 06:03:41 christos Exp $ */
/* $NetBSD: pipe.h,v 1.20 2007/03/12 16:20:53 ad Exp $ */
/*
* Copyright (c) 1996 John S. Dyson
@ -104,7 +104,9 @@ struct pipemapping {
* Two of these are linked together to produce bi-directional pipes.
*/
struct pipe {
struct simplelock pipe_slock; /* pipe mutex */
kmutex_t pipe_lock; /* pipe mutex */
kcondvar_t pipe_cv; /* general synchronization */
kcondvar_t pipe_lkcv; /* locking */
struct pipebuf pipe_buffer; /* data storage */
struct pipemapping pipe_map; /* pipe mapping for direct I/O */
struct selinfo pipe_sel; /* for compat with select */
@ -138,9 +140,6 @@ struct pipe {
#ifdef _KERNEL
int sysctl_dopipe(int *, u_int, void *, size_t *, void *, size_t);
#define PIPE_LOCK(pipe) simple_lock(&(pipe)->pipe_slock);
#define PIPE_UNLOCK(pipe) simple_unlock(&(pipe)->pipe_slock);
#endif /* _KERNEL */
#endif /* !_SYS_PIPE_H_ */