diff --git a/share/man/man9/workqueue.9 b/share/man/man9/workqueue.9 index e567b35d8c80..c1b8327cba1c 100644 --- a/share/man/man9/workqueue.9 +++ b/share/man/man9/workqueue.9 @@ -1,4 +1,4 @@ -.\" $NetBSD: workqueue.9,v 1.12 2017/12/28 07:00:52 ozaki-r Exp $ +.\" $NetBSD: workqueue.9,v 1.12.6.1 2024/04/18 15:51:36 martin Exp $ .\" .\" Copyright (c)2005 YAMAMOTO Takashi, .\" All rights reserved. @@ -128,11 +128,11 @@ waits for a specified work on the workqueue .Fa wq to finish. -The caller must ensure that no new work will be enqueued to the workqueue -beforehand. -Note that if the workqueue is -.Dv WQ_PERCPU , -the caller can enqueue a new work to another queue other than the waiting queue. +The caller must ensure that +.Fa wk +will not be enqueued to the workqueue again until after +.Fn workqueue_wait +returns. .Pp .\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - .Fn workqueue_destroy diff --git a/sys/kern/subr_workqueue.c b/sys/kern/subr_workqueue.c index 869ccf488d2c..d58578a0c35f 100644 --- a/sys/kern/subr_workqueue.c +++ b/sys/kern/subr_workqueue.c @@ -1,4 +1,4 @@ -/* $NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $ */ +/* $NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi, @@ -27,18 +27,20 @@ */ #include -__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37 2018/06/13 05:26:12 ozaki-r Exp $"); +__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.37.6.1 2024/04/18 15:51:35 martin Exp $"); #include -#include -#include -#include -#include -#include -#include -#include + #include +#include +#include +#include +#include +#include #include +#include +#include +#include typedef struct work_impl { SIMPLEQ_ENTRY(work_impl) wk_entry; @@ -50,9 +52,8 @@ struct workqueue_queue { kmutex_t q_mutex; kcondvar_t q_cv; struct workqhead q_queue_pending; - struct workqhead q_queue_running; + uint64_t q_gen; lwp_t *q_worker; - work_impl_t *q_waiter; }; struct workqueue { @@ -70,6 +71,49 @@ struct workqueue { #define POISON 0xaabbccdd +SDT_PROBE_DEFINE7(sdt, kernel, workqueue, create, + "struct workqueue *"/*wq*/, + "const char *"/*name*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/, + "pri_t"/*prio*/, + "int"/*ipl*/, + "int"/*flags*/); +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, destroy, + "struct workqueue *"/*wq*/); + +SDT_PROBE_DEFINE3(sdt, kernel, workqueue, enqueue, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "struct cpu_info *"/*ci*/); +SDT_PROBE_DEFINE4(sdt, kernel, workqueue, entry, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/); +SDT_PROBE_DEFINE4(sdt, kernel, workqueue, return, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/, + "void (*)(struct work *, void *)"/*func*/, + "void *"/*arg*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__start, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__self, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__hit, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); +SDT_PROBE_DEFINE2(sdt, kernel, workqueue, wait__done, + "struct workqueue *"/*wq*/, + "struct work *"/*wk*/); + +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__start, + "struct workqueue *"/*wq*/); +SDT_PROBE_DEFINE1(sdt, kernel, workqueue, exit__done, + "struct workqueue *"/*wq*/); + static size_t workqueue_size(int flags) { @@ -97,13 +141,13 @@ workqueue_runlist(struct workqueue *wq, struct workqhead *list) work_impl_t *wk; work_impl_t *next; - /* - * note that "list" is not a complete SIMPLEQ. - */ - for (wk = SIMPLEQ_FIRST(list); wk != NULL; wk = next) { next = SIMPLEQ_NEXT(wk, wk_entry); + SDT_PROBE4(sdt, kernel, workqueue, entry, + wq, wk, wq->wq_func, wq->wq_arg); (*wq->wq_func)((void *)wk, wq->wq_arg); + SDT_PROBE4(sdt, kernel, workqueue, return, + wq, wk, wq->wq_func, wq->wq_arg); } } @@ -116,31 +160,36 @@ workqueue_worker(void *cookie) /* find the workqueue of this kthread */ q = workqueue_queue_lookup(wq, curlwp->l_cpu); + mutex_enter(&q->q_mutex); for (;;) { - /* - * we violate abstraction of SIMPLEQ. - */ + struct workqhead tmp; + + SIMPLEQ_INIT(&tmp); - mutex_enter(&q->q_mutex); while (SIMPLEQ_EMPTY(&q->q_queue_pending)) cv_wait(&q->q_cv, &q->q_mutex); - KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running)); - q->q_queue_running.sqh_first = - q->q_queue_pending.sqh_first; /* XXX */ + SIMPLEQ_CONCAT(&tmp, &q->q_queue_pending); SIMPLEQ_INIT(&q->q_queue_pending); + + /* + * Mark the queue as actively running a batch of work + * by setting the generation number odd. + */ + q->q_gen |= 1; mutex_exit(&q->q_mutex); - workqueue_runlist(wq, &q->q_queue_running); + workqueue_runlist(wq, &tmp); + /* + * Notify workqueue_wait that we have completed a batch + * of work by incrementing the generation number. + */ mutex_enter(&q->q_mutex); - KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running)); - SIMPLEQ_INIT(&q->q_queue_running); - if (__predict_false(q->q_waiter != NULL)) { - /* Wake up workqueue_wait */ - cv_signal(&q->q_cv); - } - mutex_exit(&q->q_mutex); + KASSERTMSG(q->q_gen & 1, "q=%p gen=%"PRIu64, q, q->q_gen); + q->q_gen++; + cv_broadcast(&q->q_cv); } + mutex_exit(&q->q_mutex); } static void @@ -168,7 +217,7 @@ workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q, mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl); cv_init(&q->q_cv, wq->wq_name); SIMPLEQ_INIT(&q->q_queue_pending); - SIMPLEQ_INIT(&q->q_queue_running); + q->q_gen = 0; ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0); if (wq->wq_prio < PRI_KERNEL) ktf |= KTHREAD_TS; @@ -206,7 +255,7 @@ workqueue_exit(struct work *wk, void *arg) KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending)); mutex_enter(&q->q_mutex); q->q_worker = NULL; - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); mutex_exit(&q->q_mutex); kthread_exit(0); } @@ -223,7 +272,7 @@ workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q) KASSERT(q->q_worker != NULL); mutex_enter(&q->q_mutex); SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry); - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); while (q->q_worker != NULL) { cv_wait(&q->q_cv, &q->q_mutex); } @@ -281,33 +330,56 @@ workqueue_create(struct workqueue **wqp, const char *name, } static bool -workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target) +workqueue_q_wait(struct workqueue *wq, struct workqueue_queue *q, + work_impl_t *wk_target) { work_impl_t *wk; bool found = false; + uint64_t gen; mutex_enter(&q->q_mutex); - if (q->q_worker == curlwp) + + /* + * Avoid a deadlock scenario. We can't guarantee that + * wk_target has completed at this point, but we can't wait for + * it either, so do nothing. + * + * XXX Are there use-cases that require this semantics? + */ + if (q->q_worker == curlwp) { + SDT_PROBE2(sdt, kernel, workqueue, wait__self, wq, wk_target); goto out; + } + + /* + * Wait until the target is no longer pending. If we find it + * on this queue, the caller can stop looking in other queues. + * If we don't find it in this queue, however, we can't skip + * waiting -- it may be hidden in the running queue which we + * have no access to. + */ again: SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) { - if (wk == wk_target) - goto found; + if (wk == wk_target) { + SDT_PROBE2(sdt, kernel, workqueue, wait__hit, wq, wk); + found = true; + cv_wait(&q->q_cv, &q->q_mutex); + goto again; + } } - SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) { - if (wk == wk_target) - goto found; + + /* + * The target may be in the batch of work currently running, + * but we can't touch that queue. So if there's anything + * running, wait until the generation changes. + */ + gen = q->q_gen; + if (gen & 1) { + do + cv_wait(&q->q_cv, &q->q_mutex); + while (gen == q->q_gen); } - found: - if (wk != NULL) { - found = true; - KASSERT(q->q_waiter == NULL); - q->q_waiter = wk; - cv_wait(&q->q_cv, &q->q_mutex); - goto again; - } - if (q->q_waiter != NULL) - q->q_waiter = NULL; + out: mutex_exit(&q->q_mutex); @@ -326,19 +398,23 @@ workqueue_wait(struct workqueue *wq, struct work *wk) struct workqueue_queue *q; bool found; + ASSERT_SLEEPABLE(); + + SDT_PROBE2(sdt, kernel, workqueue, wait__start, wq, wk); if (ISSET(wq->wq_flags, WQ_PERCPU)) { struct cpu_info *ci; CPU_INFO_ITERATOR cii; for (CPU_INFO_FOREACH(cii, ci)) { q = workqueue_queue_lookup(wq, ci); - found = workqueue_q_wait(q, (work_impl_t *)wk); + found = workqueue_q_wait(wq, q, (work_impl_t *)wk); if (found) break; } } else { q = workqueue_queue_lookup(wq, NULL); - (void) workqueue_q_wait(q, (work_impl_t *)wk); + (void)workqueue_q_wait(wq, q, (work_impl_t *)wk); } + SDT_PROBE2(sdt, kernel, workqueue, wait__done, wq, wk); } void @@ -348,6 +424,9 @@ workqueue_destroy(struct workqueue *wq) struct cpu_info *ci; CPU_INFO_ITERATOR cii; + ASSERT_SLEEPABLE(); + + SDT_PROBE1(sdt, kernel, workqueue, exit__start, wq); wq->wq_func = workqueue_exit; for (CPU_INFO_FOREACH(cii, ci)) { q = workqueue_queue_lookup(wq, ci); @@ -355,6 +434,7 @@ workqueue_destroy(struct workqueue *wq) workqueue_finiqueue(wq, q); } } + SDT_PROBE1(sdt, kernel, workqueue, exit__done, wq); kmem_free(wq->wq_ptr, workqueue_size(wq->wq_flags)); } @@ -377,15 +457,16 @@ workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci) struct workqueue_queue *q; work_impl_t *wk = (void *)wk0; + SDT_PROBE3(sdt, kernel, workqueue, enqueue, wq, wk0, ci); + KASSERT(wq->wq_flags & WQ_PERCPU || ci == NULL); q = workqueue_queue_lookup(wq, ci); mutex_enter(&q->q_mutex); - KASSERT(q->q_waiter == NULL); #ifdef DEBUG workqueue_check_duplication(q, wk); #endif SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry); - cv_signal(&q->q_cv); + cv_broadcast(&q->q_cv); mutex_exit(&q->q_mutex); } diff --git a/tests/rump/kernspace/kernspace.h b/tests/rump/kernspace/kernspace.h index 4126ba4478d2..1f098e1961e1 100644 --- a/tests/rump/kernspace/kernspace.h +++ b/tests/rump/kernspace/kernspace.h @@ -1,4 +1,4 @@ -/* $NetBSD: kernspace.h,v 1.8 2018/12/28 19:54:36 thorpej Exp $ */ +/* $NetBSD: kernspace.h,v 1.8.2.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2010, 2018 The NetBSD Foundation, Inc. @@ -42,6 +42,7 @@ void rumptest_alloc(size_t); void rumptest_lockme(enum locktest); void rumptest_workqueue1(void); void rumptest_workqueue_wait(void); +void rumptest_workqueue_wait_pause(void); void rumptest_sendsig(char *); void rumptest_localsig(int); diff --git a/tests/rump/kernspace/workqueue.c b/tests/rump/kernspace/workqueue.c index 696c8ef7403e..da0dbd975126 100644 --- a/tests/rump/kernspace/workqueue.c +++ b/tests/rump/kernspace/workqueue.c @@ -1,4 +1,4 @@ -/* $NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $ */ +/* $NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2017 The NetBSD Foundation, Inc. @@ -29,7 +29,7 @@ #include #if !defined(lint) -__RCSID("$NetBSD: workqueue.c,v 1.6 2017/12/28 07:46:34 ozaki-r Exp $"); +__RCSID("$NetBSD: workqueue.c,v 1.6.8.1 2024/04/18 15:51:35 martin Exp $"); #endif /* !lint */ #include @@ -48,13 +48,19 @@ struct test_softc { struct workqueue *wq; struct work wk; int counter; -}; - + bool pause; +}; + static void rump_work1(struct work *wk, void *arg) { struct test_softc *sc = arg; + memset(wk, 0x5a, sizeof(*wk)); + + if (sc->pause) + kpause("tstwk1", /*intr*/false, /*timo*/2, /*lock*/NULL); + mutex_enter(&sc->mtx); ++sc->counter; cv_broadcast(&sc->cv); @@ -137,3 +143,34 @@ rumptest_workqueue_wait(void) destroy_sc(sc); #undef ITERATIONS } + +void +rumptest_workqueue_wait_pause(void) +{ + struct test_softc *sc; + struct work dummy; + + sc = create_sc(); + sc->pause = true; + +#define ITERATIONS 1 + for (size_t i = 0; i < ITERATIONS; ++i) { + struct work wk; + + KASSERT(sc->counter == i); + workqueue_enqueue(sc->wq, &wk, NULL); + workqueue_enqueue(sc->wq, &sc->wk, NULL); + kpause("tstwk2", /*intr*/false, /*timo*/1, /*lock*/NULL); + workqueue_wait(sc->wq, &sc->wk); + workqueue_wait(sc->wq, &wk); + KASSERT(sc->counter == (i + 2)); + } + + KASSERT(sc->counter == 2*ITERATIONS); + + /* Wait for a work that is not enqueued. Just return immediately. */ + workqueue_wait(sc->wq, &dummy); + + destroy_sc(sc); +#undef ITERATIONS +} diff --git a/tests/rump/rumpkern/Makefile b/tests/rump/rumpkern/Makefile index 46baa9540f35..2f897a9e0636 100644 --- a/tests/rump/rumpkern/Makefile +++ b/tests/rump/rumpkern/Makefile @@ -1,4 +1,4 @@ -# $NetBSD: Makefile,v 1.18 2018/12/26 14:27:23 thorpej Exp $ +# $NetBSD: Makefile,v 1.18.2.1 2024/04/18 15:51:35 martin Exp $ .include @@ -25,8 +25,8 @@ LDADD.t_modlinkset+= -lukfs -lrumpdev_disk -lrumpdev -lrumpfs_msdos LDADD.t_modlinkset+= -lrumpfs_cd9660 ${ADD_TO_LD} LDADD+= ${ADD_TO_LD} -KERNSPACE != cd ${.CURDIR}/../kernspace && ${PRINTOBJDIR} -LDADD+= -L${KERNSPACE} -lkernspace -lrump +PROGDPLIBS+= kernspace ${.CURDIR}/../kernspace +LDADD+= -lrump WARNS= 4 diff --git a/tests/rump/rumpkern/t_workqueue.c b/tests/rump/rumpkern/t_workqueue.c index 688ce4fa9d54..5ca8d694f109 100644 --- a/tests/rump/rumpkern/t_workqueue.c +++ b/tests/rump/rumpkern/t_workqueue.c @@ -1,4 +1,4 @@ -/* $NetBSD: t_workqueue.c,v 1.2 2017/12/28 07:10:26 ozaki-r Exp $ */ +/* $NetBSD: t_workqueue.c,v 1.2.8.1 2024/04/18 15:51:35 martin Exp $ */ /*- * Copyright (c) 2017 The NetBSD Foundation, Inc. @@ -72,10 +72,36 @@ ATF_TC_BODY(workqueue_wait, tc) rump_unschedule(); } +static void +sigsegv(int signo) +{ + atf_tc_fail("SIGSEGV"); +} + +ATF_TC(workqueue_wait_pause); +ATF_TC_HEAD(workqueue_wait_pause, tc) +{ + + atf_tc_set_md_var(tc, "descr", "Checks workqueue_wait with pause"); +} + +ATF_TC_BODY(workqueue_wait_pause, tc) +{ + + REQUIRE_LIBC(signal(SIGSEGV, &sigsegv), SIG_ERR); + + rump_init(); + + rump_schedule(); + rumptest_workqueue_wait_pause(); /* panics or SIGSEGVs if fails */ + rump_unschedule(); +} + ATF_TP_ADD_TCS(tp) { ATF_TP_ADD_TC(tp, workqueue1); ATF_TP_ADD_TC(tp, workqueue_wait); + ATF_TP_ADD_TC(tp, workqueue_wait_pause); return atf_no_error(); }