Add workqueue_wait that waits for a specific work to finish

The caller must ensure that no new work is enqueued before calling
workqueue_wait. Note that Note that if the workqueue is WQ_PERCPU, the caller
can enqueue a new work to another queue other than the waiting queue.

Discussed on tech-kern@
This commit is contained in:
ozaki-r 2017-12-28 07:00:52 +00:00
parent 9c3646449e
commit 3e34af79cf
3 changed files with 106 additions and 21 deletions

View File

@ -1,4 +1,4 @@
.\" $NetBSD: workqueue.9,v 1.11 2015/10/13 04:22:24 riastradh Exp $
.\" $NetBSD: workqueue.9,v 1.12 2017/12/28 07:00:52 ozaki-r Exp $
.\"
.\" Copyright (c)2005 YAMAMOTO Takashi,
.\" All rights reserved.
@ -25,7 +25,7 @@
.\" SUCH DAMAGE.
.\"
.\" ------------------------------------------------------------
.Dd October 24, 2011
.Dd December 28, 2017
.Dt WORKQUEUE 9
.Os
.\" ------------------------------------------------------------
@ -47,6 +47,10 @@
"struct workqueue *wq" "struct work *wk" "struct cpu_info *ci"
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Ft void
.Fn workqueue_wait \
"struct workqueue *wq" "struct work *wk"
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Ft void
.Fn workqueue_destroy \
"struct workqueue *wq"
.\" ------------------------------------------------------------
@ -118,6 +122,19 @@ the
framework.
.Pp
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Fn workqueue_wait
waits for a specified work
.Fa wk
on the workqueue
.Fa wq
to finish.
The caller must ensure that no new work will be enqueued to the workqueue
beforehand.
Note that if the workqueue is
.Dv WQ_PERCPU ,
the caller can enqueue a new work to another queue other than the waiting queue.
.Pp
.\" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
.Fn workqueue_destroy
destroys a workqueue and frees associated resources.
The caller should ensure that the workqueue has no work enqueued beforehand.

View File

@ -1,4 +1,4 @@
/* $NetBSD: subr_workqueue.c,v 1.33 2012/10/07 22:16:21 matt Exp $ */
/* $NetBSD: subr_workqueue.c,v 1.34 2017/12/28 07:00:52 ozaki-r Exp $ */
/*-
* Copyright (c)2002, 2005, 2006, 2007 YAMAMOTO Takashi,
@ -27,7 +27,7 @@
*/
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.33 2012/10/07 22:16:21 matt Exp $");
__KERNEL_RCSID(0, "$NetBSD: subr_workqueue.c,v 1.34 2017/12/28 07:00:52 ozaki-r Exp $");
#include <sys/param.h>
#include <sys/cpu.h>
@ -49,8 +49,10 @@ SIMPLEQ_HEAD(workqhead, work_impl);
struct workqueue_queue {
kmutex_t q_mutex;
kcondvar_t q_cv;
struct workqhead q_queue;
struct workqhead q_queue_pending;
struct workqhead q_queue_running;
lwp_t *q_worker;
work_impl_t *q_waiter;
};
struct workqueue {
@ -115,24 +117,29 @@ workqueue_worker(void *cookie)
q = workqueue_queue_lookup(wq, curlwp->l_cpu);
for (;;) {
struct workqhead tmp;
/*
* we violate abstraction of SIMPLEQ.
*/
#if defined(DIAGNOSTIC)
tmp.sqh_last = (void *)POISON;
#endif /* defined(DIAGNOSTIC) */
mutex_enter(&q->q_mutex);
while (SIMPLEQ_EMPTY(&q->q_queue))
while (SIMPLEQ_EMPTY(&q->q_queue_pending))
cv_wait(&q->q_cv, &q->q_mutex);
tmp.sqh_first = q->q_queue.sqh_first; /* XXX */
SIMPLEQ_INIT(&q->q_queue);
KASSERT(SIMPLEQ_EMPTY(&q->q_queue_running));
q->q_queue_running.sqh_first =
q->q_queue_pending.sqh_first; /* XXX */
SIMPLEQ_INIT(&q->q_queue_pending);
mutex_exit(&q->q_mutex);
workqueue_runlist(wq, &tmp);
workqueue_runlist(wq, &q->q_queue_running);
mutex_enter(&q->q_mutex);
KASSERT(!SIMPLEQ_EMPTY(&q->q_queue_running));
SIMPLEQ_INIT(&q->q_queue_running);
if (__predict_false(q->q_waiter != NULL)) {
/* Wake up workqueue_wait */
cv_signal(&q->q_cv);
}
mutex_exit(&q->q_mutex);
}
}
@ -159,7 +166,8 @@ workqueue_initqueue(struct workqueue *wq, struct workqueue_queue *q,
mutex_init(&q->q_mutex, MUTEX_DEFAULT, ipl);
cv_init(&q->q_cv, wq->wq_name);
SIMPLEQ_INIT(&q->q_queue);
SIMPLEQ_INIT(&q->q_queue_pending);
SIMPLEQ_INIT(&q->q_queue_running);
ktf = ((wq->wq_flags & WQ_MPSAFE) != 0 ? KTHREAD_MPSAFE : 0);
if (wq->wq_prio < PRI_KERNEL)
ktf |= KTHREAD_TS;
@ -194,7 +202,7 @@ workqueue_exit(struct work *wk, void *arg)
*/
KASSERT(q->q_worker == curlwp);
KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
mutex_enter(&q->q_mutex);
q->q_worker = NULL;
cv_signal(&q->q_cv);
@ -210,10 +218,10 @@ workqueue_finiqueue(struct workqueue *wq, struct workqueue_queue *q)
KASSERT(wq->wq_func == workqueue_exit);
wqe.wqe_q = q;
KASSERT(SIMPLEQ_EMPTY(&q->q_queue));
KASSERT(SIMPLEQ_EMPTY(&q->q_queue_pending));
KASSERT(q->q_worker != NULL);
mutex_enter(&q->q_mutex);
SIMPLEQ_INSERT_TAIL(&q->q_queue, &wqe.wqe_wk, wk_entry);
SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, &wqe.wqe_wk, wk_entry);
cv_signal(&q->q_cv);
while (q->q_worker != NULL) {
cv_wait(&q->q_cv, &q->q_mutex);
@ -271,6 +279,64 @@ workqueue_create(struct workqueue **wqp, const char *name,
return error;
}
static bool
workqueue_q_wait(struct workqueue_queue *q, work_impl_t *wk_target)
{
work_impl_t *wk;
bool found = false;
mutex_enter(&q->q_mutex);
again:
SIMPLEQ_FOREACH(wk, &q->q_queue_pending, wk_entry) {
if (wk == wk_target)
goto found;
}
SIMPLEQ_FOREACH(wk, &q->q_queue_running, wk_entry) {
if (wk == wk_target)
goto found;
}
found:
if (wk != NULL) {
found = true;
KASSERT(q->q_waiter == NULL);
q->q_waiter = wk;
cv_wait(&q->q_cv, &q->q_mutex);
goto again;
}
if (q->q_waiter != NULL)
q->q_waiter = NULL;
mutex_exit(&q->q_mutex);
return found;
}
/*
* Wait for a specified work to finish. The caller must ensure that no new
* work will be enqueued before calling workqueue_wait. Note that if the
* workqueue is WQ_PERCPU, the caller can enqueue a new work to another queue
* other than the waiting queue.
*/
void
workqueue_wait(struct workqueue *wq, struct work *wk)
{
struct workqueue_queue *q;
bool found;
if (ISSET(wq->wq_flags, WQ_PERCPU)) {
struct cpu_info *ci;
CPU_INFO_ITERATOR cii;
for (CPU_INFO_FOREACH(cii, ci)) {
q = workqueue_queue_lookup(wq, ci);
found = workqueue_q_wait(q, (work_impl_t *)wk);
if (found)
break;
}
} else {
q = workqueue_queue_lookup(wq, NULL);
(void) workqueue_q_wait(q, (work_impl_t *)wk);
}
}
void
workqueue_destroy(struct workqueue *wq)
{
@ -298,7 +364,8 @@ workqueue_enqueue(struct workqueue *wq, struct work *wk0, struct cpu_info *ci)
q = workqueue_queue_lookup(wq, ci);
mutex_enter(&q->q_mutex);
SIMPLEQ_INSERT_TAIL(&q->q_queue, wk, wk_entry);
KASSERT(q->q_waiter == NULL);
SIMPLEQ_INSERT_TAIL(&q->q_queue_pending, wk, wk_entry);
cv_signal(&q->q_cv);
mutex_exit(&q->q_mutex);
}

View File

@ -1,4 +1,4 @@
/* $NetBSD: workqueue.h,v 1.9 2007/10/19 12:16:48 ad Exp $ */
/* $NetBSD: workqueue.h,v 1.10 2017/12/28 07:00:52 ozaki-r Exp $ */
/*-
* Copyright (c)2002, 2005 YAMAMOTO Takashi,
@ -51,6 +51,7 @@ struct workqueue;
int workqueue_create(struct workqueue **, const char *,
void (*)(struct work *, void *), void *, pri_t, int, int);
void workqueue_destroy(struct workqueue *);
void workqueue_wait(struct workqueue *, struct work *);
void workqueue_enqueue(struct workqueue *, struct work *, struct cpu_info *);