Pull up following revision(s) (requested by riastradh in ticket #1830):

	sys/kern/subr_workqueue.c: revision 1.40
	sys/kern/subr_workqueue.c: revision 1.41
	sys/kern/subr_workqueue.c: revision 1.42
	sys/kern/subr_workqueue.c: revision 1.43
	sys/kern/subr_workqueue.c: revision 1.44
	sys/kern/subr_workqueue.c: revision 1.45
	sys/kern/subr_workqueue.c: revision 1.46
	tests/rump/kernspace/workqueue.c: revision 1.7
	sys/kern/subr_workqueue.c: revision 1.47
	tests/rump/kernspace/workqueue.c: revision 1.8
	tests/rump/kernspace/workqueue.c: revision 1.9
	tests/rump/rumpkern/t_workqueue.c: revision 1.3
	tests/rump/rumpkern/t_workqueue.c: revision 1.4
	tests/rump/kernspace/kernspace.h: revision 1.9
	tests/rump/rumpkern/Makefile: revision 1.20
	sys/kern/subr_workqueue.c: revision 1.39
	share/man/man9/workqueue.9: revision 1.15
	(all via patch)

workqueue: Lift unnecessary restriction on workqueue_wait.

Allow multiple concurrent waits at a time, and allow enqueueing work
at the same time (as long as it's not the work we're waiting for).

This way multiple users can use a shared global workqueue and safely
wait for individual work items concurrently, while the workqueue is
still in use for other items (e.g., wg(4) peers).

This has the side effect of taking away a diagnostic measure, but I
think allowing the diagnostic's false positives instead of rejecting
them is worth it.  We could cheaply add it back with some false
negatives if it's important.
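
To illustrate the pattern this enables, a minimal sketch follows; the shared workqueue and the peer structure are hypothetical stand-ins, not code from wg(4):

/* Hypothetical example: many peers share one global workqueue. */
#include <sys/kmem.h>
#include <sys/workqueue.h>

extern struct workqueue *shared_wq;	/* assumed shared queue */

struct peer {
	struct work p_work;		/* embedded work item */
	int p_id;
};

/*
 * Tear down one peer.  Other peers' work may still be pending or
 * running on shared_wq; we wait only for this peer's own item.
 */
static void
peer_destroy(struct peer *p)
{
	/* Caller guarantees p_work is not re-enqueued from now on. */
	workqueue_wait(shared_wq, &p->p_work);	/* may sleep */
	kmem_free(p, sizeof(*p));
}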

workqueue(9): workqueue_wait and workqueue_destroy may sleep.

But might not, so assert sleepable up front.
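
Sketched from the workqueue_wait hunk below, the shape of that assertion:

void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
	/*
	 * Assert unconditionally: even when wk is idle and we return
	 * without blocking, a caller in non-sleepable context is a
	 * bug, and should trip the assertion deterministically, not
	 * only when the work happens to be in flight.
	 */
	ASSERT_SLEEPABLE();

	/* ... search the per-CPU queues and wait as needed ... */
}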
workqueue(9): Sprinkle dtrace probes.

tests/rump/rumpkern: Use PROGDPLIBS, not explicit -L/-l.

This way we relink the t_* test programs whenever changes under
tests/rump/kernspace change libkernspace.a.

workqueue(9) tests: Nix trailing whitespace.

workqueue(9) tests: Destroy struct work immediately on entry.

workqueue(9) tests: Add test for PR kern/57574.

workqueue(9): Avoid touching running work items in workqueue_wait.

As soon as the workqueue function has been called, it is forbidden to
touch the struct work passed to it -- the function might free or
reuse the data structure it is embedded in.

So workqueue_wait is forbidden to search the queue for the batch of
running work items.  Instead, use a generation number which is odd
while the thread is processing a batch of work and even when not.
There's still a small optimization available with the struct work
pointer to wait for: if we find the work item in one of the per-CPU
_pending_ queues, then after we wait for a batch of work to complete
on that CPU, we don't need to wait for work on any other CPUs.

PR kern/57574
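
Condensed from the diff below into two illustrative helpers (worker_batch and wait_for_batch are hypothetical names; the real code is inline in workqueue_worker and workqueue_q_wait):

/* Worker side: q_gen is odd exactly while a batch is running. */
static void
worker_batch(struct workqueue *wq, struct workqueue_queue *q,
    struct workqhead *batch)
{
	KASSERT(mutex_owned(&q->q_mutex));
	q->q_gen |= 1;			/* begin batch: generation odd */
	mutex_exit(&q->q_mutex);
	workqueue_runlist(wq, batch);	/* items may free themselves */
	mutex_enter(&q->q_mutex);
	q->q_gen++;			/* end batch: generation even */
	cv_broadcast(&q->q_cv);
}

/* Waiter side: never walk the running batch, only watch q_gen. */
static void
wait_for_batch(struct workqueue_queue *q)
{
	uint64_t gen = q->q_gen;

	KASSERT(mutex_owned(&q->q_mutex));
	if (gen & 1) {			/* a batch is in flight */
		do
			cv_wait(&q->q_cv, &q->q_mutex);
		while (gen == q->q_gen);	/* bumped at batch end */
	}
}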

workqueue(9): Sprinkle dtrace probes for workqueue_wait edge cases.

Let's make it easy to find out whether these are hit.

workqueue(9): Stop violating queue(3) internals.

workqueue(9): Avoid unnecessary mutex_exit/enter cycle each loop.

workqueue(9): Sort includes.

No functional change intended.

workqueue(9): Factor out wq->wq_flags & WQ_FPU in workqueue_worker.

No functional change intended.  Makes it clearer that s is
initialized when used.

martin committed 2024-04-18 15:51:35 +00:00
parent 2fda30a8a1, commit c390936c21
6 changed files with 214 additions and 69 deletions

share/man/man9/workqueue.9

@@ -1,4 +1,4 @@
-.\" $NetBSD: workqueue.9,v 1.12 2017/12/28 07:00:52 ozaki-r Exp $
+.\" $NetBSD: workqueue.9,v 1.12.6.1 2024/04/18 15:51:36 martin Exp $
.\"
.\" Copyright (c)2005 YAMAMOTO Takashi,
.\" All rights reserved.
@@ -128,11 +128,11 @@ waits for a specified work
on the workqueue
.Fa wq
to finish.
-The caller must ensure that no new work will be enqueued to the workqueue
-beforehand.
-Note that if the workqueue is
-.Dv WQ_PERCPU ,
-the caller can enqueue a new work to another queue other than the waiting queue.
+The caller must ensure that
+.Fa wk
+will not be enqueued to the workqueue again until after
+.Fn workqueue_wait
+returns.
.Pp
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Fn workqueue_destroy

sys/kern/subr_workqueue.c

@@ -1,4 +1,4 @@
-/* $NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $ */
+/* $NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $ */
/*-
* Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
@@ -27,18 +27,20 @@
*/
#include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $");
+__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $");
#include <sys/param.h>
-#include <sys/cpu.h>
-#include <sys/systm.h>
-#include <sys/kthread.h>
-#include <sys/kmem.h>
-#include <sys/proc.h>
-#include <sys/workqueue.h>
-#include <sys/mutex.h>
#include <sys/condvar.h>
+#include <sys/cpu.h>
+#include <sys/kmem.h>
+#include <sys/kthread.h>
+#include <sys/mutex.h>
+#include <sys/proc.h>
+#include <sys/queue.h>
+#include <sys/sdt.h>
+#include <sys/systm.h>
+#include <sys/workqueue.h>
typedef struct work_impl {
SIMPLEQ_ENTRY(work_impl) wk_entry;
@@ -50,9 +52,8 @@ struct workqueue_queue {
kmutex_t q_mutex;
kcondvar_t q_cv;
struct workqhead q_queue_pending;
-struct workqhead q_queue_running;
+uint64_t q_gen;
lwp_t *q_worker;
-work_impl_t *q_waiter;
};
struct workqueue {
@@ -70,6 +71,49 @@ struct workqueue {
#define POISON 0xaabbccdd
+SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create,
+"struct workqueue *"/*wq*/,
+"const char *"/*name*/,
+"void (*)(struct work *, void *)"/*func*/,
+"void *"/*arg*/,
+"pri_t"/*prio*/,
+"int"/*ipl*/,
+"int"/*flags*/);
+SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy,
+"struct workqueue *"/*wq*/);
+SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/,
+"struct cpu_info *"/*ci*/);
+SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/,
+"void (*)(struct work *, void *)"/*func*/,
+"void *"/*arg*/);
+SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/,
+"void (*)(struct work *, void *)"/*func*/,
+"void *"/*arg*/);
+SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/);
+SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/);
+SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/);
+SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done,
+"struct workqueue *"/*wq*/,
+"struct work *"/*wk*/);
+SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start,
+"struct workqueue *"/*wq*/);
+SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done,
+"struct workqueue *"/*wq*/);
static size_t
workqueue_size(int flags)
{
@@ -97,13 +141,13 @@ workqueue_runlist(struct workqueue *wq, struct workqhead *list)
work_impl_t *wk;
work_impl_t *next;
-/*
-* note that "list" is not a complete SIMPLEQ.
-*/
for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) {
next = SIMPLEQ_NEXT(wk, wk_entry);
+SDT_PROBE4(sdt, kernel, workqueue, entry,
+wq, wk, wq->wq_func, wq->wq_arg);
(*wq->wq_func)((void *)wk, wq->wq_arg);
+SDT_PROBE4(sdt, kernel, workqueue, return,
+wq, wk, wq->wq_func, wq->wq_arg);
}
}
@@ -116,31 +160,36 @@ workqueue_worker(void *cookie)
/* find the workqueue of this kthread */
q = workqueue_queue_lookup(wq, curlwp->l_cpu);
+mutex_enter(&q->q_mutex);
for (;;) {
-/*
-* we violate abstraction of SIMPLEQ.
-*/
+struct workqhead tmp;
+SIMPLEQ_INIT(&tmp);
-mutex_enter(&q->q_mutex);
while (SIMPLEQ_EMPTY(&q->q_queue_pending))
cv_wait(&q->q_cv, &q->q_mutex);
-KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
-q->q_queue_running.sqh_first =
-q->q_queue_pending.sqh_first; /* XXX */
+SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending);
SIMPLEQ_INIT(&q->q_queue_pending);
+/*
+* Mark the queue as actively running a batch of work
+* by setting the generation number odd.
+*/
+q->q_gen |= 1;
mutex_exit(&q->q_mutex);
-workqueue_runlist(wq, &q->q_queue_running);
+workqueue_runlist(wq, &tmp);
+/*
+* Notify workqueue_wait that we have completed a batch
+* of work by incrementing the generation number.
+*/
mutex_enter(&q->q_mutex);
-KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
-SIMPLEQ_INIT(&q->q_queue_running);
-if (__predict_false(q->q_waiter != NULL)) {
-/* Wake up workqueue_wait */
-cv_signal(&q->q_cv);
-}
-mutex_exit(&q->q_mutex);
+KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen);
+q->q_gen++;
+cv_broadcast(&q->q_cv);
}
+mutex_exit(&q->q_mutex);
}
static void
@@ -168,7 +217,7 @@ workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
cv_init(&q->q_cv, wq->wq_name);
SIMPLEQ_INIT(&q->q_queue_pending);
-SIMPLEQ_INIT(&q->q_queue_running);
+q->q_gen = 0;
ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
if (wq->wq_prio < PRI_KERNEL)
ktf |= KTHREAD_TS;
@@ -206,7 +255,7 @@ workqueue_exit(struct work *wk, void *arg)
KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
mutex_enter(&q->q_mutex);
q->q_worker = NULL;
-cv_signal(&q->q_cv);
+cv_broadcast(&q->q_cv);
mutex_exit(&q->q_mutex);
kthread_exit(0);
}
@@ -223,7 +272,7 @@ workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
KASSERT(q->q_worker != NULL);
mutex_enter(&q->q_mutex);
SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
-cv_signal(&q->q_cv);
+cv_broadcast(&q->q_cv);
while (q->q_worker != NULL) {
cv_wait(&q->q_cv, &q->q_mutex);
}
@@ -281,33 +330,56 @@ workqueue_create(struct workqueue **wqp, const char *name,
}
static bool
-workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
+workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q,
+work_impl_t *wk_target)
{
work_impl_t *wk;
bool found = false;
+uint64_t gen;
mutex_enter(&q->q_mutex);
-if (q->q_worker == curlwp)
+/*
+* Avoid a deadlock scenario. We can't guarantee that
+* wk_target has completed at this point, but we can't wait for
+* it either, so do nothing.
+*
+* XXX Are there use-cases that require this semantics?
+*/
+if (q->q_worker == curlwp) {
+SDT_PROBE2(sdt, kernel, workqueue, wait__self, wq, wk_target);
goto out;
+}
+/*
+* Wait until the target is no longer pending. If we find it
+* on this queue, the caller can stop looking in other queues.
+* If we don't find it in this queue, however, we can't skip
+* waiting -- it may be hidden in the running queue which we
+* have no access to.
+*/
again:
SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
-if (wk == wk_target)
-goto found;
+if (wk == wk_target) {
+SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk);
+found = true;
+cv_wait(&q->q_cv, &q->q_mutex);
+goto again;
+}
}
-SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
-if (wk == wk_target)
-goto found;
+/*
+* The target may be in the batch of work currently running,
+* but we can't touch that queue. So if there's anything
+* running, wait until the generation changes.
+*/
+gen = q->q_gen;
+if (gen & 1) {
+do
+cv_wait(&q->q_cv, &q->q_mutex);
+while (gen == q->q_gen);
}
-found:
-if (wk != NULL) {
-found = true;
-KASSERT(q->q_waiter == NULL);
-q->q_waiter = wk;
-cv_wait(&q->q_cv, &q->q_mutex);
-goto again;
-}
-if (q->q_waiter != NULL)
-q->q_waiter = NULL;
out:
mutex_exit(&q->q_mutex);
@@ -326,19 +398,23 @@ workqueue_wait(struct workqueue *wq, struct work *wk)
struct workqueue_queue *q;
bool found;
+ASSERT_SLEEPABLE();
+SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk);
if (ISSET(wq->wq_flags, WQ_PERCPU)) {
struct cpu_info *ci;
CPU_INFO_ITERATOR cii;
for (CPU_INFO_FOREACH(cii, ci)) {
q = workqueue_queue_lookup(wq, ci);
-found = workqueue_q_wait(q, (work_impl_t *)wk);
+found = workqueue_q_wait(wq, q, (work_impl_t *)wk);
if (found)
break;
}
} else {
q = workqueue_queue_lookup(wq, NULL);
-(void) workqueue_q_wait(q, (work_impl_t *)wk);
+(void)workqueue_q_wait(wq, q, (work_impl_t *)wk);
}
+SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk);
}
void
@@ -348,6 +424,9 @@ workqueue_destroy(struct workqueue *wq)
struct cpu_info *ci;
CPU_INFO_ITERATOR cii;
+ASSERT_SLEEPABLE();
+SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq);
wq->wq_func = workqueue_exit;
for (CPU_INFO_FOREACH(cii, ci)) {
q = workqueue_queue_lookup(wq, ci);
@@ -355,6 +434,7 @@ workqueue_destroy(struct workqueue *wq)
workqueue_finiqueue(wq, q);
}
}
+SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq);
kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags));
}
@@ -377,15 +457,16 @@ workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
struct workqueue_queue *q;
work_impl_t *wk = (void *)wk0;
+SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci);
KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL);
q = workqueue_queue_lookup(wq, ci);
mutex_enter(&q->q_mutex);
-KASSERT(q->q_waiter == NULL);
#ifdef DEBUG
workqueue_check_duplication(q, wk);
#endif
SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
-cv_signal(&q->q_cv);
+cv_broadcast(&q->q_cv);
mutex_exit(&q->q_mutex);
}

tests/rump/kernspace/kernspace.h

@@ -1,4 +1,4 @@
-/* $NetBSD: kernspace.h,v 1.8 2018/12/28 19:54:36 thorpej Exp $ */
+/* $NetBSD: kernspace.h,v 1.8.2.1 2024/04/18 15:51:35 martin Exp $ */
/*-
* Copyright (c) 2010, 2018 The NetBSD Foundation, Inc.
@@ -42,6 +42,7 @@ void rumptest_alloc(size_t);
void rumptest_lockme(enum locktest);
void rumptest_workqueue1(void);
void rumptest_workqueue_wait(void);
+void rumptest_workqueue_wait_pause(void);
void rumptest_sendsig(char *);
void rumptest_localsig(int);

tests/rump/kernspace/workqueue.c

@@ -1,4 +1,4 @@
-/* $NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $ */
+/* $NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $ */
/*-
* Copyright (c) 2017 The NetBSD Foundation, Inc.
@@ -29,7 +29,7 @@
#include <sys/cdefs.h>
#if !defined(lint)
-__RCSID("$NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $");
+__RCSID("$NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $");
#endif /* !lint */
#include <sys/param.h>
@@ -48,13 +48,19 @@ struct test_softc {
struct workqueue *wq;
struct work wk;
int counter;
-};
+bool pause;
+};
static void
rump_work1(struct work *wk, void *arg)
{
struct test_softc *sc = arg;
+memset(wk, 0x5a, sizeof(*wk));
+if (sc->pause)
+kpause("tstwk1", /*intr*/false, /*timo*/2, /*lock*/NULL);
mutex_enter(&sc->mtx);
++sc->counter;
cv_broadcast(&sc->cv);
@@ -137,3 +143,34 @@ rumptest_workqueue_wait(void)
destroy_sc(sc);
#undef ITERATIONS
}
+void
+rumptest_workqueue_wait_pause(void)
+{
+struct test_softc *sc;
+struct work dummy;
+sc = create_sc();
+sc->pause = true;
+#define ITERATIONS 1
+for (size_t i = 0; i < ITERATIONS; ++i) {
+struct work wk;
+KASSERT(sc->counter == i);
+workqueue_enqueue(sc->wq, &wk, NULL);
+workqueue_enqueue(sc->wq, &sc->wk, NULL);
+kpause("tstwk2", /*intr*/false, /*timo*/1, /*lock*/NULL);
+workqueue_wait(sc->wq, &sc->wk);
+workqueue_wait(sc->wq, &wk);
+KASSERT(sc->counter == (i + 2));
+}
+KASSERT(sc->counter == 2*ITERATIONS);
+/* Wait for a work that is not enqueued. Just return immediately. */
+workqueue_wait(sc->wq, &dummy);
+destroy_sc(sc);
+#undef ITERATIONS
+}

tests/rump/rumpkern/Makefile

@@ -1,4 +1,4 @@
-# $NetBSD: Makefile,v 1.18 2018/12/26 14:27:23 thorpej Exp $
+# $NetBSD: Makefile,v 1.18.2.1 2024/04/18 15:51:35 martin Exp $
.include <bsd.own.mk>
@@ -25,8 +25,8 @@ LDADD.t_modlinkset+= -lukfs -lrumpdev_disk -lrumpdev -lrumpfs_msdos
LDADD.t_modlinkset+= -lrumpfs_cd9660 ${ADD_TO_LD}
LDADD+= ${ADD_TO_LD}
-KERNSPACE != cd ${.CURDIR}/../kernspace && ${PRINTOBJDIR}
-LDADD+= -L${KERNSPACE} -lkernspace -lrump
+PROGDPLIBS+= kernspace ${.CURDIR}/../kernspace
+LDADD+= -lrump
WARNS= 4

tests/rump/rumpkern/t_workqueue.c

@@ -1,4 +1,4 @@
-/* $NetBSD: t_workqueue.c,v 1.2 2017/12/28 07:10:26 ozaki-r Exp $ */
+/* $NetBSD: t_workqueue.c,v 1.2.8.1 2024/04/18 15:51:35 martin Exp $ */
/*-
* Copyright (c) 2017 The NetBSD Foundation, Inc.
@@ -72,10 +72,36 @@ ATF_TC_BODY(workqueue_wait, tc)
rump_unschedule();
}
+static void
+sigsegv(int signo)
+{
+atf_tc_fail("SIGSEGV");
+}
+ATF_TC(workqueue_wait_pause);
+ATF_TC_HEAD(workqueue_wait_pause, tc)
+{
+atf_tc_set_md_var(tc, "descr", "Checks workqueue_wait with pause");
+}
+ATF_TC_BODY(workqueue_wait_pause, tc)
+{
+REQUIRE_LIBC(signal(SIGSEGV, &sigsegv), SIG_ERR);
+rump_init();
+rump_schedule();
+rumptest_workqueue_wait_pause(); /* panics or SIGSEGVs if fails */
+rump_unschedule();
+}
ATF_TP_ADD_TCS(tp)
{
ATF_TP_ADD_TC(tp, workqueue1);
ATF_TP_ADD_TC(tp, workqueue_wait);
+ATF_TP_ADD_TC(tp, workqueue_wait_pause);
return atf_no_error();
}