wg: Use threadpool(9) and workqueue(9) for asynchronous tasks.

- Using threadpool(9) job per interface to receive incoming handshake
  messages gives the same concurrency for active interfaces but
  doesn't waste kthreads for inactive ones.

  => Can't really do this with a global workqueue(9) because there's
     no bound on the amount of time wg_receive_packets() might run
     for; we really need separate threads or threadpool jobs in order
     to avoid having one interface starve all the others.

- Using a global workqueue(9) for asynchronous peer tasks avoids
  creating unnecessary kthreads.

  => Each task does a more or less bounded amount of work, so it's OK
     to share a global workqueue -- there's no advantage to adding
     concurrency for what is almost certainly going to be CPU-bound
     asymmetric crypto.

  => This way we don't need a thread per peer or iteration over a
     list of all peers, so the task mechanism should no longer be a
     bottleneck to scaling to thousands of peers.

XXX This doesn't distribute the load across CPUs -- it keeps it on
the same CPU where the packet came in.  Should consider doing
something to balance the load -- maybe note if the current CPU is
loaded, and if so, sort CPUs by queue length or some other measure of
load and pick the least loaded one or something.
This commit is contained in:
riastradh 2020-09-07 01:15:25 +00:00
parent 95085a12ec
commit e059bdb000
1 changed files with 165 additions and 198 deletions

View File

@ -1,4 +1,4 @@
/* $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $ */
/* $NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $ */
/*
* Copyright (C) Ryota Ozaki <ozaki.ryota@gmail.com>
@ -41,7 +41,7 @@
*/
#include <sys/cdefs.h>
__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $");
#ifdef _KERNEL_OPT
#include "opt_inet.h"
@ -61,7 +61,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $")
#include <sys/ioctl.h>
#include <sys/kernel.h>
#include <sys/kmem.h>
#include <sys/kthread.h>
#include <sys/mbuf.h>
#include <sys/module.h>
#include <sys/mutex.h>
@ -77,8 +76,10 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $")
#include <sys/syslog.h>
#include <sys/systm.h>
#include <sys/thmap.h>
#include <sys/threadpool.h>
#include <sys/time.h>
#include <sys/timespec.h>
#include <sys/workqueue.h>
#include <net/bpf.h>
#include <net/if.h>
@ -120,10 +121,11 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $")
* Data structures
* - struct wg_softc is an instance of wg interfaces
* - It has a list of peers (struct wg_peer)
* - It has a kthread that sends/receives handshake messages and
* - It has a threadpool job that sends/receives handshake messages and
* runs event handlers
* - It has its own two routing tables: one is for IPv4 and the other IPv6
* - struct wg_peer is a representative of a peer
* - It has a struct work to handle handshakes and timer tasks
* - It has a pair of session instances (struct wg_session)
* - It has a pair of endpoint instances (struct wg_sockaddr)
* - Normally one endpoint is used and the second one is used only on
@ -446,18 +448,6 @@ sliwin_update(struct sliwin *W, uint64_t S)
return 0;
}
struct wg_worker {
kmutex_t wgw_lock;
kcondvar_t wgw_cv;
bool wgw_todie;
struct socket *wgw_so4;
struct socket *wgw_so6;
int wgw_wakeup_reasons;
#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 __BIT(0)
#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6 __BIT(1)
#define WG_WAKEUP_REASON_PEER __BIT(2)
};
struct wg_session {
struct wg_peer *wgs_peer;
struct psref_target
@ -550,6 +540,7 @@ struct wg_peer {
pserialize_t wgp_psz;
struct psref_target wgp_psref;
kmutex_t *wgp_lock;
kmutex_t *wgp_intr_lock;
uint8_t wgp_pubkey[WG_STATIC_KEY_LEN];
struct wg_sockaddr *wgp_endpoint;
@ -594,7 +585,8 @@ struct wg_peer {
struct wg_ppsratecheck wgp_ppsratecheck;
volatile unsigned int wgp_tasks;
struct work wgp_work;
unsigned int wgp_tasks;
#define WGP_TASK_SEND_INIT_MESSAGE __BIT(0)
#define WGP_TASK_RETRY_HANDSHAKE __BIT(1)
#define WGP_TASK_ESTABLISH_SESSION __BIT(2)
@ -609,6 +601,7 @@ struct wg_softc {
struct ifnet wg_if;
LIST_ENTRY(wg_softc) wg_list;
kmutex_t *wg_lock;
kmutex_t *wg_intr_lock;
krwlock_t *wg_rwlock;
uint8_t wg_privkey[WG_STATIC_KEY_LEN];
@ -621,11 +614,21 @@ struct wg_softc {
struct thmap *wg_sessions_byindex;
uint16_t wg_listen_port;
struct wg_worker *wg_worker;
lwp_t *wg_worker_lwp;
struct threadpool *wg_threadpool;
struct threadpool_job wg_job;
int wg_upcalls;
#define WG_UPCALL_INET __BIT(0)
#define WG_UPCALL_INET6 __BIT(1)
#ifdef INET
struct socket *wg_so4;
struct radix_node_head *wg_rtable_ipv4;
#endif
#ifdef INET6
struct socket *wg_so6;
struct radix_node_head *wg_rtable_ipv6;
#endif
struct wg_ppsratecheck wg_ppsratecheck;
@ -659,8 +662,6 @@ static unsigned wg_keepalive_timeout = WG_KEEPALIVE_TIMEOUT;
static struct mbuf *
wg_get_mbuf(size_t, size_t);
static void wg_wakeup_worker(struct wg_worker *, int);
static int wg_send_data_msg(struct wg_peer *, struct wg_session *,
struct mbuf *);
static int wg_send_cookie_msg(struct wg_softc *, struct wg_peer *,
@ -704,6 +705,8 @@ static int wg_bind_port(struct wg_softc *, const uint16_t);
static int wg_init(struct ifnet *);
static void wg_stop(struct ifnet *, int);
static void wg_peer_work(struct work *, void *);
static void wg_job(struct threadpool_job *);
static void wgintr(void *);
static void wg_purge_pending_packets(struct wg_peer *);
@ -788,6 +791,7 @@ static struct if_clone wg_cloner =
IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
static struct pktqueue *wg_pktq __read_mostly;
static struct workqueue *wg_wq __read_mostly;
void wgattach(int);
/* ARGSUSED */
@ -803,6 +807,7 @@ wgattach(int count)
static void
wginit(void)
{
int error __diagused;
wg_psref_class = psref_class_create("wg", IPL_SOFTNET);
@ -812,6 +817,10 @@ wginit(void)
wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
KASSERT(wg_pktq != NULL);
error = workqueue_create(&wg_wq, "wgpeer", wg_peer_work, NULL,
PRI_NONE, IPL_SOFTNET, WQ_MPSAFE|WQ_PERCPU);
KASSERT(error == 0);
if_clone_attach(&wg_cloner);
}
@ -1555,17 +1564,17 @@ out:
}
static struct socket *
wg_get_so_by_af(struct wg_worker *wgw, const int af)
wg_get_so_by_af(struct wg_softc *wg, const int af)
{
return (af == AF_INET) ? wgw->wgw_so4 : wgw->wgw_so6;
return (af == AF_INET) ? wg->wg_so4 : wg->wg_so6;
}
static struct socket *
wg_get_so_by_peer(struct wg_peer *wgp, struct wg_sockaddr *wgsa)
{
return wg_get_so_by_af(wgp->wgp_sc->wg_worker, wgsa_family(wgsa));
return wg_get_so_by_af(wgp->wgp_sc, wgsa_family(wgsa));
}
static struct wg_sockaddr *
@ -2246,9 +2255,18 @@ static void
wg_schedule_peer_task(struct wg_peer *wgp, int task)
{
atomic_or_uint(&wgp->wgp_tasks, task);
mutex_enter(wgp->wgp_intr_lock);
WG_DLOG("tasks=%d, task=%d\n", wgp->wgp_tasks, task);
wg_wakeup_worker(wgp->wgp_sc->wg_worker, WG_WAKEUP_REASON_PEER);
if (wgp->wgp_tasks == 0)
/*
* XXX If the current CPU is already loaded -- e.g., if
* there's already a bunch of handshakes queued up --
* consider tossing this over to another CPU to
* distribute the load.
*/
workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL);
wgp->wgp_tasks |= task;
mutex_exit(wgp->wgp_intr_lock);
}
static void
@ -2783,7 +2801,7 @@ wg_receive_packets(struct wg_softc *wg, const int af)
struct mbuf *paddr = NULL;
struct sockaddr *src;
so = wg_get_so_by_af(wg->wg_worker, af);
so = wg_get_so_by_af(wg, af);
flags = MSG_DONTWAIT;
dummy_uio.uio_resid = 1000000000;
@ -2987,28 +3005,16 @@ wg_task_destroy_prev_session(struct wg_softc *wg, struct wg_peer *wgp)
}
static void
wg_process_peer_tasks(struct wg_softc *wg)
wg_peer_work(struct work *wk, void *cookie)
{
struct wg_peer *wgp;
int s;
struct wg_peer *wgp = container_of(wk, struct wg_peer, wgp_work);
struct wg_softc *wg = wgp->wgp_sc;
int tasks;
/* XXX should avoid checking all peers */
s = pserialize_read_enter();
WG_PEER_READER_FOREACH(wgp, wg) {
struct psref psref;
unsigned int tasks;
if (wgp->wgp_tasks == 0)
continue;
wg_get_peer(wgp, &psref);
pserialize_read_exit(s);
restart:
tasks = atomic_swap_uint(&wgp->wgp_tasks, 0);
KASSERT(tasks != 0);
WG_DLOG("tasks=%x\n", tasks);
mutex_enter(wgp->wgp_intr_lock);
while ((tasks = wgp->wgp_tasks) != 0) {
wgp->wgp_tasks = 0;
mutex_exit(wgp->wgp_intr_lock);
mutex_enter(wgp->wgp_lock);
if (ISSET(tasks, WGP_TASK_SEND_INIT_MESSAGE))
@ -3025,66 +3031,37 @@ wg_process_peer_tasks(struct wg_softc *wg)
wg_task_destroy_prev_session(wg, wgp);
mutex_exit(wgp->wgp_lock);
/* New tasks may be scheduled during processing tasks */
WG_DLOG("wgp_tasks=%d\n", wgp->wgp_tasks);
if (wgp->wgp_tasks != 0)
goto restart;
s = pserialize_read_enter();
wg_put_peer(wgp, &psref);
mutex_enter(wgp->wgp_intr_lock);
}
pserialize_read_exit(s);
mutex_exit(wgp->wgp_intr_lock);
}
static void
wg_worker(void *arg)
wg_job(struct threadpool_job *job)
{
struct wg_softc *wg = arg;
struct wg_worker *wgw = wg->wg_worker;
bool todie = false;
KASSERT(wg != NULL);
KASSERT(wgw != NULL);
while (!todie) {
int reasons;
int bound;
mutex_enter(&wgw->wgw_lock);
/* New tasks may come during task handling */
while ((reasons = wgw->wgw_wakeup_reasons) == 0 &&
!(todie = wgw->wgw_todie))
cv_wait(&wgw->wgw_cv, &wgw->wgw_lock);
wgw->wgw_wakeup_reasons = 0;
mutex_exit(&wgw->wgw_lock);
struct wg_softc *wg = container_of(job, struct wg_softc, wg_job);
int bound, upcalls;
mutex_enter(wg->wg_intr_lock);
while ((upcalls = wg->wg_upcalls) != 0) {
wg->wg_upcalls = 0;
mutex_exit(wg->wg_intr_lock);
bound = curlwp_bind();
if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4))
if (ISSET(upcalls, WG_UPCALL_INET))
wg_receive_packets(wg, AF_INET);
if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6))
if (ISSET(upcalls, WG_UPCALL_INET6))
wg_receive_packets(wg, AF_INET6);
if (ISSET(reasons, WG_WAKEUP_REASON_PEER))
wg_process_peer_tasks(wg);
curlwp_bindx(bound);
mutex_enter(wg->wg_intr_lock);
}
kthread_exit(0);
}
static void
wg_wakeup_worker(struct wg_worker *wgw, const int reason)
{
mutex_enter(&wgw->wgw_lock);
wgw->wgw_wakeup_reasons |= reason;
cv_broadcast(&wgw->wgw_cv);
mutex_exit(&wgw->wgw_lock);
threadpool_job_done(job);
mutex_exit(wg->wg_intr_lock);
}
static int
wg_bind_port(struct wg_softc *wg, const uint16_t port)
{
int error;
struct wg_worker *wgw = wg->wg_worker;
uint16_t old_port = wg->wg_listen_port;
if (port != 0 && old_port == port)
@ -3096,7 +3073,7 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port)
sin->sin_addr.s_addr = INADDR_ANY;
sin->sin_port = htons(port);
error = sobind(wgw->wgw_so4, sintosa(sin), curlwp);
error = sobind(wg->wg_so4, sintosa(sin), curlwp);
if (error != 0)
return error;
@ -3107,7 +3084,7 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port)
sin6->sin6_addr = in6addr_any;
sin6->sin6_port = htons(port);
error = sobind(wgw->wgw_so6, sin6tosa(sin6), curlwp);
error = sobind(wg->wg_so6, sin6tosa(sin6), curlwp);
if (error != 0)
return error;
#endif
@ -3118,15 +3095,19 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port)
}
static void
wg_so_upcall(struct socket *so, void *arg, int events, int waitflag)
wg_so_upcall(struct socket *so, void *cookie, int events, int waitflag)
{
struct wg_worker *wgw = arg;
struct wg_softc *wg = cookie;
int reason;
reason = (so->so_proto->pr_domain->dom_family == AF_INET) ?
WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 :
WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6;
wg_wakeup_worker(wgw, reason);
WG_UPCALL_INET :
WG_UPCALL_INET6;
mutex_enter(wg->wg_intr_lock);
wg->wg_upcalls |= reason;
threadpool_schedule_job(wg->wg_threadpool, &wg->wg_job);
mutex_exit(wg->wg_intr_lock);
}
static int
@ -3184,8 +3165,7 @@ wg_overudp_cb(struct mbuf **mp, int offset, struct socket *so,
}
static int
wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af,
struct socket **sop)
wg_socreate(struct wg_softc *wg, int af, struct socket **sop)
{
int error;
struct socket *so;
@ -3195,7 +3175,7 @@ wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af,
return error;
solock(so);
so->so_upcallarg = wgw;
so->so_upcallarg = wg;
so->so_upcall = wg_so_upcall;
so->so_rcv.sb_flags |= SB_UPCALL;
if (af == AF_INET)
@ -3211,79 +3191,6 @@ wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af,
return 0;
}
static int
wg_worker_init(struct wg_softc *wg)
{
int error;
struct wg_worker *wgw;
const char *ifname = wg->wg_if.if_xname;
struct socket *so;
wgw = kmem_zalloc(sizeof(*wgw), KM_SLEEP);
mutex_init(&wgw->wgw_lock, MUTEX_DEFAULT, IPL_SOFTNET);
cv_init(&wgw->wgw_cv, ifname);
wgw->wgw_todie = false;
wgw->wgw_wakeup_reasons = 0;
error = wg_worker_socreate(wg, wgw, AF_INET, &so);
if (error != 0)
goto error;
wgw->wgw_so4 = so;
#ifdef INET6
error = wg_worker_socreate(wg, wgw, AF_INET6, &so);
if (error != 0)
goto error;
wgw->wgw_so6 = so;
#endif
wg->wg_worker = wgw;
error = kthread_create(PRI_NONE, KTHREAD_MPSAFE | KTHREAD_MUSTJOIN,
NULL, wg_worker, wg, &wg->wg_worker_lwp, "%s", ifname);
if (error != 0)
goto error;
return 0;
error:
#ifdef INET6
if (wgw->wgw_so6 != NULL)
soclose(wgw->wgw_so6);
#endif
if (wgw->wgw_so4 != NULL)
soclose(wgw->wgw_so4);
cv_destroy(&wgw->wgw_cv);
mutex_destroy(&wgw->wgw_lock);
kmem_free(wgw, sizeof(*wgw));
return error;
}
static void
wg_worker_destroy(struct wg_softc *wg)
{
struct wg_worker *wgw = wg->wg_worker;
mutex_enter(&wgw->wgw_lock);
wgw->wgw_todie = true;
wgw->wgw_wakeup_reasons = 0;
cv_broadcast(&wgw->wgw_cv);
mutex_exit(&wgw->wgw_lock);
kthread_join(wg->wg_worker_lwp);
#ifdef INET6
soclose(wgw->wgw_so6);
#endif
soclose(wgw->wgw_so4);
cv_destroy(&wgw->wgw_cv);
mutex_destroy(&wgw->wgw_lock);
kmem_free(wg->wg_worker, sizeof(struct wg_worker));
wg->wg_worker = NULL;
}
static bool
wg_session_hit_limits(struct wg_session *wgs)
{
@ -3385,6 +3292,7 @@ wg_alloc_peer(struct wg_softc *wg)
wgp->wgp_endpoint_changing = false;
wgp->wgp_endpoint_available = false;
wgp->wgp_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
wgp->wgp_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET);
wgp->wgp_psz = pserialize_create();
psref_target_init(&wgp->wgp_psref, wg_psref_class);
@ -3454,6 +3362,9 @@ wg_destroy_peer(struct wg_peer *wgp)
callout_halt(&wgp->wgp_handshake_timeout_timer, NULL);
callout_halt(&wgp->wgp_session_dtor_timer, NULL);
/* Wait for any queued work to complete. */
workqueue_wait(wg_wq, &wgp->wgp_work);
wgs = wgp->wgp_session_unstable;
if (wgs->wgs_state != WGS_STATE_UNKNOWN) {
mutex_enter(wgp->wgp_lock);
@ -3486,6 +3397,7 @@ wg_destroy_peer(struct wg_peer *wgp)
kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0));
pserialize_destroy(wgp->wgp_psz);
mutex_obj_free(wgp->wgp_intr_lock);
mutex_obj_free(wgp->wgp_lock);
kmem_free(wgp, sizeof(*wgp));
@ -3618,28 +3530,39 @@ wg_clone_create(struct if_clone *ifc, int unit)
if_initname(&wg->wg_if, ifc->ifc_name, unit);
error = wg_worker_init(wg);
if (error)
goto fail0;
rn_inithead((void **)&wg->wg_rtable_ipv4,
offsetof(struct sockaddr_in, sin_addr) * NBBY);
#ifdef INET6
rn_inithead((void **)&wg->wg_rtable_ipv6,
offsetof(struct sockaddr_in6, sin6_addr) * NBBY);
#endif
PSLIST_INIT(&wg->wg_peers);
wg->wg_peers_bypubkey = thmap_create(0, NULL, THMAP_NOCOPY);
wg->wg_peers_byname = thmap_create(0, NULL, THMAP_NOCOPY);
wg->wg_sessions_byindex = thmap_create(0, NULL, THMAP_NOCOPY);
wg->wg_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
wg->wg_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET);
wg->wg_rwlock = rw_obj_alloc();
threadpool_job_init(&wg->wg_job, wg_job, wg->wg_intr_lock,
"%s", if_name(&wg->wg_if));
wg->wg_ops = &wg_ops_rumpkernel;
error = threadpool_get(&wg->wg_threadpool, PRI_NONE);
if (error)
goto fail0;
#ifdef INET
error = wg_socreate(wg, AF_INET, &wg->wg_so4);
if (error)
goto fail1;
rn_inithead((void **)&wg->wg_rtable_ipv4,
offsetof(struct sockaddr_in, sin_addr) * NBBY);
#endif
#ifdef INET6
error = wg_socreate(wg, AF_INET6, &wg->wg_so6);
if (error)
goto fail2;
rn_inithead((void **)&wg->wg_rtable_ipv6,
offsetof(struct sockaddr_in6, sin6_addr) * NBBY);
#endif
error = wg_if_attach(wg);
if (error)
goto fail1;
goto fail3;
mutex_enter(&wg_softcs.lock);
LIST_INSERT_HEAD(&wg_softcs.list, wg, wg_list);
@ -3647,24 +3570,47 @@ wg_clone_create(struct if_clone *ifc, int unit)
return 0;
fail2: __unused
fail4: __unused
mutex_enter(&wg_softcs.lock);
LIST_REMOVE(wg, wg_list);
mutex_exit(&wg_softcs.lock);
wg_if_detach(wg);
wg_destroy_all_peers(wg);
fail3: wg_destroy_all_peers(wg);
#ifdef INET6
solock(wg->wg_so6);
wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL;
sounlock(wg->wg_so6);
#endif
#ifdef INET
solock(wg->wg_so4);
wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL;
sounlock(wg->wg_so4);
#endif
mutex_enter(wg->wg_intr_lock);
threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job);
mutex_exit(wg->wg_intr_lock);
#ifdef INET6
if (wg->wg_rtable_ipv6 != NULL)
free(wg->wg_rtable_ipv6, M_RTABLE);
soclose(wg->wg_so6);
fail2:
#endif
#ifdef INET
if (wg->wg_rtable_ipv4 != NULL)
free(wg->wg_rtable_ipv4, M_RTABLE);
soclose(wg->wg_so4);
fail1:
#endif
threadpool_put(wg->wg_threadpool, PRI_NONE);
fail0: threadpool_job_destroy(&wg->wg_job);
rw_obj_free(wg->wg_rwlock);
mutex_obj_free(wg->wg_intr_lock);
mutex_obj_free(wg->wg_lock);
thmap_destroy(wg->wg_sessions_byindex);
thmap_destroy(wg->wg_peers_byname);
thmap_destroy(wg->wg_peers_bypubkey);
PSLIST_DESTROY(&wg->wg_peers);
if (wg->wg_rtable_ipv6 != NULL)
free(wg->wg_rtable_ipv6, M_RTABLE);
if (wg->wg_rtable_ipv4 != NULL)
free(wg->wg_rtable_ipv4, M_RTABLE);
fail1: wg_worker_destroy(wg);
fail0: kmem_free(wg, sizeof(*wg));
kmem_free(wg, sizeof(*wg));
return error;
}
@ -3685,17 +3631,38 @@ wg_clone_destroy(struct ifnet *ifp)
mutex_exit(&wg_softcs.lock);
wg_if_detach(wg);
wg_destroy_all_peers(wg);
#ifdef INET6
solock(wg->wg_so6);
wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL;
sounlock(wg->wg_so6);
#endif
#ifdef INET
solock(wg->wg_so4);
wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL;
sounlock(wg->wg_so4);
#endif
mutex_enter(wg->wg_intr_lock);
threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job);
mutex_exit(wg->wg_intr_lock);
#ifdef INET6
if (wg->wg_rtable_ipv6 != NULL)
free(wg->wg_rtable_ipv6, M_RTABLE);
soclose(wg->wg_so6);
#endif
#ifdef INET
if (wg->wg_rtable_ipv4 != NULL)
free(wg->wg_rtable_ipv4, M_RTABLE);
soclose(wg->wg_so4);
#endif
threadpool_put(wg->wg_threadpool, PRI_NONE);
threadpool_job_destroy(&wg->wg_job);
rw_obj_free(wg->wg_rwlock);
mutex_obj_free(wg->wg_intr_lock);
mutex_obj_free(wg->wg_lock);
thmap_destroy(wg->wg_sessions_byindex);
thmap_destroy(wg->wg_peers_byname);
thmap_destroy(wg->wg_peers_bypubkey);
PSLIST_DESTROY(&wg->wg_peers);
if (wg->wg_rtable_ipv4 != NULL)
free(wg->wg_rtable_ipv4, M_RTABLE);
if (wg->wg_rtable_ipv6 != NULL)
free(wg->wg_rtable_ipv6, M_RTABLE);
wg_worker_destroy(wg);
kmem_free(wg, sizeof(*wg));
return 0;