threadpool(9): Tidy up thread naming.

- `dispatcher', not `overseer' -- much more appropriate metaphor.
- Just omit `/-1' from unbound thread names.
- Just omit `@-1' from dynamic-priority (PRI_NONE) thread names.
This commit is contained in:
riastradh 2021-01-13 02:20:15 +00:00
parent 8798207fb7
commit a1c3883806
1 changed files with 92 additions and 78 deletions

View File

@ -1,4 +1,4 @@
/* $NetBSD: kern_threadpool.c,v 1.20 2021/01/13 02:19:08 riastradh Exp $ */
/* $NetBSD: kern_threadpool.c,v 1.21 2021/01/13 02:20:15 riastradh Exp $ */
/*-
* Copyright (c) 2014, 2018 The NetBSD Foundation, Inc.
@ -33,7 +33,7 @@
* Thread pools.
*
* A thread pool is a collection of worker threads idle or running
* jobs, together with an overseer thread that does not run jobs but
* jobs, together with an dispatcher thread that does not run jobs but
* can be given jobs to assign to a worker thread. Scheduling a job in
* a thread pool does not allocate or even sleep at all, except perhaps
* on an adaptive lock, unlike kthread_create. Jobs reuse threads, so
@ -56,32 +56,32 @@
* CPU. When you're done, call threadpool_percpu_put(pool_percpu,
* pri).
*
* +--MACHINE-----------------------------------------------+
* | +--CPU 0-------+ +--CPU 1-------+ +--CPU n-------+ |
* | | <overseer 0> | | <overseer 1> | ... | <overseer n> | |
* | | <idle 0a> | | <running 1a> | ... | <idle na> | |
* | | <running 0b> | | <running 1b> | ... | <idle nb> | |
* | | . | | . | ... | . | |
* | | . | | . | ... | . | |
* | | . | | . | ... | . | |
* | +--------------+ +--------------+ +--------------+ |
* | +--unbound---------+ |
* | | <overseer n+1> | |
* | | <idle (n+1)a> | |
* | | <running (n+1)b> | |
* | +------------------+ |
* +--------------------------------------------------------+
* +--MACHINE-----------------------------------------------------+
* | +--CPU 0---------+ +--CPU 1---------+ +--CPU n---------+ |
* | | <dispatcher 0> | | <dispatcher 1> | ... | <dispatcher n> | |
* | | <idle 0a> | | <running 1a> | ... | <idle na> | |
* | | <running 0b> | | <running 1b> | ... | <idle nb> | |
* | | . | | . | ... | . | |
* | | . | | . | ... | . | |
* | | . | | . | ... | . | |
* | +----------------+ +----------------+ +----------------+ |
* | +--unbound-----------+ |
* | | <dispatcher n+1> | |
* | | <idle (n+1)a> | |
* | | <running (n+1)b> | |
* | +--------------------+ |
* +--------------------------------------------------------------+
*
* XXX Why one overseer per CPU? I did that originally to avoid
* XXX Why one dispatcher per CPU? I did that originally to avoid
* touching remote CPUs' memory when scheduling a job, but that still
* requires interprocessor synchronization. Perhaps we could get by
* with a single overseer thread, at the expense of another pointer in
* struct threadpool_job to identify the CPU on which it must run
* in order for the overseer to schedule it correctly.
* with a single dispatcher thread, at the expense of another pointer
* in struct threadpool_job to identify the CPU on which it must run in
* order for the dispatcher to schedule it correctly.
*/
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.20 2021/01/13 02:19:08 riastradh Exp $");
__KERNEL_RCSID(0, "$NetBSD: kern_threadpool.c,v 1.21 2021/01/13 02:20:15 riastradh Exp $");
#include <sys/types.h>
#include <sys/param.h>
@ -141,27 +141,27 @@ SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job,
"struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__running,
"struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__overseer,
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, schedule__job__dispatcher,
"struct threadpool *"/*pool*/, "struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, schedule__job__thread,
"struct threadpool *"/*pool*/,
"struct threadpool_job *"/*job*/,
"struct lwp *"/*thread*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__start,
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__start,
"struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__dying,
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__dying,
"struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__spawn,
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__spawn,
"struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, overseer__race,
SDT_PROBE_DEFINE2(sdt, kernel, threadpool, dispatcher__race,
"struct threadpool *"/*pool*/,
"struct threadpool_job *"/*job*/);
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, overseer__assign,
SDT_PROBE_DEFINE3(sdt, kernel, threadpool, dispatcher__assign,
"struct threadpool *"/*pool*/,
"struct threadpool_job *"/*job*/,
"struct lwp *"/*thread*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, overseer__exit,
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, dispatcher__exit,
"struct threadpool *"/*pool*/);
SDT_PROBE_DEFINE1(sdt, kernel, threadpool, thread__start,
@ -189,7 +189,7 @@ struct threadpool_thread {
struct threadpool {
kmutex_t tp_lock;
struct threadpool_thread tp_overseer;
struct threadpool_thread tp_dispatcher;
struct job_head tp_jobs;
struct thread_head tp_idle_threads;
uint64_t tp_refcnt;
@ -213,7 +213,7 @@ static threadpool_job_fn_t threadpool_job_dead;
static void threadpool_job_hold(struct threadpool_job *);
static void threadpool_job_rele(struct threadpool_job *);
static void threadpool_overseer_thread(void *) __dead;
static void threadpool_dispatcher_thread(void *) __dead;
static void threadpool_thread(void *) __dead;
static pool_cache_t threadpool_thread_pc __read_mostly;
@ -356,6 +356,18 @@ threadpools_init(void)
mutex_init(&threadpools_lock, MUTEX_DEFAULT, IPL_NONE);
}
static void
threadnamesuffix(char *buf, size_t buflen, struct cpu_info *ci, int pri)
{
buf[0] = '\0';
if (ci)
snprintf(buf + strlen(buf), buflen - strlen(buf), "/%d",
cpu_index(ci));
if (pri != PRI_NONE)
snprintf(buf + strlen(buf), buflen - strlen(buf), "@%d", pri);
}
/* Thread pool creation */
static bool
@ -369,6 +381,7 @@ threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
pri_t pri)
{
struct lwp *lwp;
char suffix[16];
int ktflags;
int error;
@ -377,46 +390,46 @@ threadpool_create(struct threadpool *const pool, struct cpu_info *ci,
SDT_PROBE2(sdt, kernel, threadpool, create, ci, pri);
mutex_init(&pool->tp_lock, MUTEX_DEFAULT, IPL_VM);
/* XXX overseer */
/* XXX dispatcher */
TAILQ_INIT(&pool->tp_jobs);
TAILQ_INIT(&pool->tp_idle_threads);
pool->tp_refcnt = 1; /* overseer's reference */
pool->tp_refcnt = 1; /* dispatcher's reference */
pool->tp_flags = 0;
pool->tp_cpu = ci;
pool->tp_pri = pri;
pool->tp_overseer.tpt_lwp = NULL;
pool->tp_overseer.tpt_pool = pool;
pool->tp_overseer.tpt_job = NULL;
cv_init(&pool->tp_overseer.tpt_cv, "poolover");
pool->tp_dispatcher.tpt_lwp = NULL;
pool->tp_dispatcher.tpt_pool = pool;
pool->tp_dispatcher.tpt_job = NULL;
cv_init(&pool->tp_dispatcher.tpt_cv, "pooldisp");
ktflags = 0;
ktflags |= KTHREAD_MPSAFE;
if (pri < PRI_KERNEL)
ktflags |= KTHREAD_TS;
error = kthread_create(pri, ktflags, ci, &threadpool_overseer_thread,
&pool->tp_overseer, &lwp,
"pooloverseer/%d@%d", (ci ? cpu_index(ci) : -1), (int)pri);
threadnamesuffix(suffix, sizeof(suffix), ci, pri);
error = kthread_create(pri, ktflags, ci, &threadpool_dispatcher_thread,
&pool->tp_dispatcher, &lwp, "pooldisp%s", suffix);
if (error)
goto fail0;
mutex_spin_enter(&pool->tp_lock);
pool->tp_overseer.tpt_lwp = lwp;
cv_broadcast(&pool->tp_overseer.tpt_cv);
pool->tp_dispatcher.tpt_lwp = lwp;
cv_broadcast(&pool->tp_dispatcher.tpt_cv);
mutex_spin_exit(&pool->tp_lock);
SDT_PROBE3(sdt, kernel, threadpool, create__success, ci, pri, pool);
return 0;
fail0: KASSERT(error);
KASSERT(pool->tp_overseer.tpt_job == NULL);
KASSERT(pool->tp_overseer.tpt_pool == pool);
KASSERT(pool->tp_dispatcher.tpt_job == NULL);
KASSERT(pool->tp_dispatcher.tpt_pool == pool);
KASSERT(pool->tp_flags == 0);
KASSERT(pool->tp_refcnt == 0);
KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
cv_destroy(&pool->tp_overseer.tpt_cv);
KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
cv_destroy(&pool->tp_dispatcher.tpt_cv);
mutex_destroy(&pool->tp_lock);
SDT_PROBE3(sdt, kernel, threadpool, create__failure, ci, pri, error);
return error;
@ -435,24 +448,24 @@ threadpool_destroy(struct threadpool *pool)
mutex_spin_enter(&pool->tp_lock);
KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
pool->tp_flags |= THREADPOOL_DYING;
cv_broadcast(&pool->tp_overseer.tpt_cv);
cv_broadcast(&pool->tp_dispatcher.tpt_cv);
TAILQ_FOREACH(thread, &pool->tp_idle_threads, tpt_entry)
cv_broadcast(&thread->tpt_cv);
while (0 < pool->tp_refcnt) {
SDT_PROBE2(sdt, kernel, threadpool, destroy__wait,
pool, pool->tp_refcnt);
cv_wait(&pool->tp_overseer.tpt_cv, &pool->tp_lock);
cv_wait(&pool->tp_dispatcher.tpt_cv, &pool->tp_lock);
}
mutex_spin_exit(&pool->tp_lock);
KASSERT(pool->tp_overseer.tpt_job == NULL);
KASSERT(pool->tp_overseer.tpt_pool == pool);
KASSERT(pool->tp_dispatcher.tpt_job == NULL);
KASSERT(pool->tp_dispatcher.tpt_pool == pool);
KASSERT(pool->tp_flags == THREADPOOL_DYING);
KASSERT(pool->tp_refcnt == 0);
KASSERT(TAILQ_EMPTY(&pool->tp_idle_threads));
KASSERT(TAILQ_EMPTY(&pool->tp_jobs));
KASSERT(!cv_has_waiters(&pool->tp_overseer.tpt_cv));
cv_destroy(&pool->tp_overseer.tpt_cv);
KASSERT(!cv_has_waiters(&pool->tp_dispatcher.tpt_cv));
cv_destroy(&pool->tp_dispatcher.tpt_cv);
mutex_destroy(&pool->tp_lock);
}
@ -472,7 +485,7 @@ threadpool_rele(struct threadpool *pool)
KASSERT(mutex_owned(&pool->tp_lock));
KASSERT(0 < pool->tp_refcnt);
if (--pool->tp_refcnt == 0)
cv_broadcast(&pool->tp_overseer.tpt_cv);
cv_broadcast(&pool->tp_dispatcher.tpt_cv);
}
/* Unbound thread pools */
@ -859,10 +872,10 @@ threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
/* Otherwise, try to assign a thread to the job. */
mutex_spin_enter(&pool->tp_lock);
if (__predict_false(TAILQ_EMPTY(&pool->tp_idle_threads))) {
/* Nobody's idle. Give it to the overseer. */
SDT_PROBE2(sdt, kernel, threadpool, schedule__job__overseer,
/* Nobody's idle. Give it to the dispatcher. */
SDT_PROBE2(sdt, kernel, threadpool, schedule__job__dispatcher,
pool, job);
job->job_thread = &pool->tp_overseer;
job->job_thread = &pool->tp_dispatcher;
TAILQ_INSERT_TAIL(&pool->tp_jobs, job, job_entry);
} else {
/* Assign it to the first idle thread. */
@ -874,7 +887,7 @@ threadpool_schedule_job(struct threadpool *pool, struct threadpool_job *job)
job->job_thread->tpt_job = job;
}
/* Notify whomever we gave it to, overseer or idle thread. */
/* Notify whomever we gave it to, dispatcher or idle thread. */
KASSERT(job->job_thread != NULL);
cv_broadcast(&job->job_thread->tpt_cv);
mutex_spin_exit(&pool->tp_lock);
@ -898,19 +911,19 @@ threadpool_cancel_job_async(struct threadpool *pool, struct threadpool_job *job)
* "luck of the draw").
*
* => "job" is not yet running, but is assigned to the
* overseer.
* dispatcher.
*
* When this happens, this code makes the determination that
* the job is already running. The failure mode is that the
* caller is told the job is running, and thus has to wait.
* The overseer will eventually get to it and the job will
* The dispatcher will eventually get to it and the job will
* proceed as if it had been already running.
*/
if (job->job_thread == NULL) {
/* Nothing to do. Guaranteed not running. */
return true;
} else if (job->job_thread == &pool->tp_overseer) {
} else if (job->job_thread == &pool->tp_dispatcher) {
/* Take it off the list to guarantee it won't run. */
job->job_thread = NULL;
mutex_spin_enter(&pool->tp_lock);
@ -945,15 +958,16 @@ threadpool_cancel_job(struct threadpool *pool, struct threadpool_job *job)
cv_wait(&job->job_cv, job->job_lock);
}
/* Thread pool overseer thread */
/* Thread pool dispatcher thread */
static void __dead
threadpool_overseer_thread(void *arg)
threadpool_dispatcher_thread(void *arg)
{
struct threadpool_thread *const overseer = arg;
struct threadpool *const pool = overseer->tpt_pool;
struct threadpool_thread *const dispatcher = arg;
struct threadpool *const pool = dispatcher->tpt_pool;
struct lwp *lwp = NULL;
int ktflags;
char suffix[16];
int error;
KASSERT((pool->tp_cpu == NULL) || (pool->tp_cpu == curcpu()));
@ -961,27 +975,27 @@ threadpool_overseer_thread(void *arg)
/* Wait until we're initialized. */
mutex_spin_enter(&pool->tp_lock);
while (overseer->tpt_lwp == NULL)
cv_wait(&overseer->tpt_cv, &pool->tp_lock);
while (dispatcher->tpt_lwp == NULL)
cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
SDT_PROBE1(sdt, kernel, threadpool, overseer__start, pool);
SDT_PROBE1(sdt, kernel, threadpool, dispatcher__start, pool);
for (;;) {
/* Wait until there's a job. */
while (TAILQ_EMPTY(&pool->tp_jobs)) {
if (ISSET(pool->tp_flags, THREADPOOL_DYING)) {
SDT_PROBE1(sdt, kernel, threadpool,
overseer__dying, pool);
dispatcher__dying, pool);
break;
}
cv_wait(&overseer->tpt_cv, &pool->tp_lock);
cv_wait(&dispatcher->tpt_cv, &pool->tp_lock);
}
if (__predict_false(TAILQ_EMPTY(&pool->tp_jobs)))
break;
/* If there are no threads, we'll have to try to start one. */
if (TAILQ_EMPTY(&pool->tp_idle_threads)) {
SDT_PROBE1(sdt, kernel, threadpool, overseer__spawn,
SDT_PROBE1(sdt, kernel, threadpool, dispatcher__spawn,
pool);
threadpool_hold(pool);
mutex_spin_exit(&pool->tp_lock);
@ -991,17 +1005,17 @@ threadpool_overseer_thread(void *arg)
thread->tpt_lwp = NULL;
thread->tpt_pool = pool;
thread->tpt_job = NULL;
cv_init(&thread->tpt_cv, "poolthrd");
cv_init(&thread->tpt_cv, "pooljob");
ktflags = 0;
ktflags |= KTHREAD_MPSAFE;
if (pool->tp_pri < PRI_KERNEL)
ktflags |= KTHREAD_TS;
threadnamesuffix(suffix, sizeof(suffix), pool->tp_cpu,
pool->tp_pri);
error = kthread_create(pool->tp_pri, ktflags,
pool->tp_cpu, &threadpool_thread, thread, &lwp,
"poolthread/%d@%d",
(pool->tp_cpu ? cpu_index(pool->tp_cpu) : -1),
(int)pool->tp_pri);
"poolthread%s", suffix);
mutex_spin_enter(&pool->tp_lock);
if (error) {
@ -1037,7 +1051,7 @@ threadpool_overseer_thread(void *arg)
mutex_enter(job->job_lock);
/* If the job was cancelled, we'll no longer be its thread. */
if (__predict_true(job->job_thread == overseer)) {
if (__predict_true(job->job_thread == dispatcher)) {
mutex_spin_enter(&pool->tp_lock);
if (__predict_false(
TAILQ_EMPTY(&pool->tp_idle_threads))) {
@ -1046,7 +1060,7 @@ threadpool_overseer_thread(void *arg)
* first. We'll have to try again.
*/
SDT_PROBE2(sdt, kernel, threadpool,
overseer__race, pool, job);
dispatcher__race, pool, job);
TAILQ_INSERT_HEAD(&pool->tp_jobs, job,
job_entry);
} else {
@ -1058,7 +1072,7 @@ threadpool_overseer_thread(void *arg)
TAILQ_FIRST(&pool->tp_idle_threads);
SDT_PROBE2(sdt, kernel, threadpool,
overseer__assign, job, thread->tpt_lwp);
dispatcher__assign, job, thread->tpt_lwp);
KASSERT(thread->tpt_job == NULL);
TAILQ_REMOVE(&pool->tp_idle_threads, thread,
tpt_entry);
@ -1076,7 +1090,7 @@ threadpool_overseer_thread(void *arg)
threadpool_rele(pool);
mutex_spin_exit(&pool->tp_lock);
SDT_PROBE1(sdt, kernel, threadpool, overseer__exit, pool);
SDT_PROBE1(sdt, kernel, threadpool, dispatcher__exit, pool);
kthread_exit(0);
}