diff --git a/sys/net/if_wg.c b/sys/net/if_wg.c index b57402fb9e75..992186715d37 100644 --- a/sys/net/if_wg.c +++ b/sys/net/if_wg.c @@ -1,4 +1,4 @@ -/* $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $ */ +/* $NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $ */ /* * Copyright (C) Ryota Ozaki @@ -41,7 +41,7 @@ */ #include -__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $"); +__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $"); #ifdef _KERNEL_OPT #include "opt_inet.h" @@ -61,7 +61,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $") #include #include #include -#include #include #include #include @@ -77,8 +76,10 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $") #include #include #include +#include #include #include +#include #include #include @@ -120,10 +121,11 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $") * Data structures * - struct wg_softc is an instance of wg interfaces * - It has a list of peers (struct wg_peer) - * - It has a kthread that sends/receives handshake messages and + * - It has a threadpool job that sends/receives handshake messages and * runs event handlers * - It has its own two routing tables: one is for IPv4 and the other IPv6 * - struct wg_peer is a representative of a peer + * - It has a struct work to handle handshakes and timer tasks * - It has a pair of session instances (struct wg_session) * - It has a pair of endpoint instances (struct wg_sockaddr) * - Normally one endpoint is used and the second one is used only on @@ -446,18 +448,6 @@ sliwin_update(struct sliwin *W, uint64_t S) return 0; } -struct wg_worker { - kmutex_t wgw_lock; - kcondvar_t wgw_cv; - bool wgw_todie; - struct socket *wgw_so4; - struct socket *wgw_so6; - int wgw_wakeup_reasons; -#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 __BIT(0) -#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6 __BIT(1) -#define WG_WAKEUP_REASON_PEER __BIT(2) -}; - struct wg_session { struct wg_peer *wgs_peer; struct psref_target @@ -550,6 +540,7 @@ struct wg_peer { pserialize_t wgp_psz; struct psref_target wgp_psref; kmutex_t *wgp_lock; + kmutex_t *wgp_intr_lock; uint8_t wgp_pubkey[WG_STATIC_KEY_LEN]; struct wg_sockaddr *wgp_endpoint; @@ -594,7 +585,8 @@ struct wg_peer { struct wg_ppsratecheck wgp_ppsratecheck; - volatile unsigned int wgp_tasks; + struct work wgp_work; + unsigned int wgp_tasks; #define WGP_TASK_SEND_INIT_MESSAGE __BIT(0) #define WGP_TASK_RETRY_HANDSHAKE __BIT(1) #define WGP_TASK_ESTABLISH_SESSION __BIT(2) @@ -609,6 +601,7 @@ struct wg_softc { struct ifnet wg_if; LIST_ENTRY(wg_softc) wg_list; kmutex_t *wg_lock; + kmutex_t *wg_intr_lock; krwlock_t *wg_rwlock; uint8_t wg_privkey[WG_STATIC_KEY_LEN]; @@ -621,11 +614,21 @@ struct wg_softc { struct thmap *wg_sessions_byindex; uint16_t wg_listen_port; - struct wg_worker *wg_worker; - lwp_t *wg_worker_lwp; + struct threadpool *wg_threadpool; + struct threadpool_job wg_job; + int wg_upcalls; +#define WG_UPCALL_INET __BIT(0) +#define WG_UPCALL_INET6 __BIT(1) + +#ifdef INET + struct socket *wg_so4; struct radix_node_head *wg_rtable_ipv4; +#endif +#ifdef INET6 + struct socket *wg_so6; struct radix_node_head *wg_rtable_ipv6; +#endif struct wg_ppsratecheck wg_ppsratecheck; @@ -659,8 +662,6 @@ static unsigned wg_keepalive_timeout = WG_KEEPALIVE_TIMEOUT; static struct mbuf * wg_get_mbuf(size_t, size_t); -static void wg_wakeup_worker(struct wg_worker *, int); - static int wg_send_data_msg(struct wg_peer *, struct wg_session *, struct mbuf *); static int wg_send_cookie_msg(struct wg_softc *, struct wg_peer *, @@ -704,6 +705,8 @@ static int wg_bind_port(struct wg_softc *, const uint16_t); static int wg_init(struct ifnet *); static void wg_stop(struct ifnet *, int); +static void wg_peer_work(struct work *, void *); +static void wg_job(struct threadpool_job *); static void wgintr(void *); static void wg_purge_pending_packets(struct wg_peer *); @@ -788,6 +791,7 @@ static struct if_clone wg_cloner = IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy); static struct pktqueue *wg_pktq __read_mostly; +static struct workqueue *wg_wq __read_mostly; void wgattach(int); /* ARGSUSED */ @@ -803,6 +807,7 @@ wgattach(int count) static void wginit(void) { + int error __diagused; wg_psref_class = psref_class_create("wg", IPL_SOFTNET); @@ -812,6 +817,10 @@ wginit(void) wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL); KASSERT(wg_pktq != NULL); + error = workqueue_create(&wg_wq, "wgpeer", wg_peer_work, NULL, + PRI_NONE, IPL_SOFTNET, WQ_MPSAFE|WQ_PERCPU); + KASSERT(error == 0); + if_clone_attach(&wg_cloner); } @@ -1555,17 +1564,17 @@ out: } static struct socket * -wg_get_so_by_af(struct wg_worker *wgw, const int af) +wg_get_so_by_af(struct wg_softc *wg, const int af) { - return (af == AF_INET) ? wgw->wgw_so4 : wgw->wgw_so6; + return (af == AF_INET) ? wg->wg_so4 : wg->wg_so6; } static struct socket * wg_get_so_by_peer(struct wg_peer *wgp, struct wg_sockaddr *wgsa) { - return wg_get_so_by_af(wgp->wgp_sc->wg_worker, wgsa_family(wgsa)); + return wg_get_so_by_af(wgp->wgp_sc, wgsa_family(wgsa)); } static struct wg_sockaddr * @@ -2246,9 +2255,18 @@ static void wg_schedule_peer_task(struct wg_peer *wgp, int task) { - atomic_or_uint(&wgp->wgp_tasks, task); + mutex_enter(wgp->wgp_intr_lock); WG_DLOG("tasks=%d, task=%d\n", wgp->wgp_tasks, task); - wg_wakeup_worker(wgp->wgp_sc->wg_worker, WG_WAKEUP_REASON_PEER); + if (wgp->wgp_tasks == 0) + /* + * XXX If the current CPU is already loaded -- e.g., if + * there's already a bunch of handshakes queued up -- + * consider tossing this over to another CPU to + * distribute the load. + */ + workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL); + wgp->wgp_tasks |= task; + mutex_exit(wgp->wgp_intr_lock); } static void @@ -2783,7 +2801,7 @@ wg_receive_packets(struct wg_softc *wg, const int af) struct mbuf *paddr = NULL; struct sockaddr *src; - so = wg_get_so_by_af(wg->wg_worker, af); + so = wg_get_so_by_af(wg, af); flags = MSG_DONTWAIT; dummy_uio.uio_resid = 1000000000; @@ -2987,28 +3005,16 @@ wg_task_destroy_prev_session(struct wg_softc *wg, struct wg_peer *wgp) } static void -wg_process_peer_tasks(struct wg_softc *wg) +wg_peer_work(struct work *wk, void *cookie) { - struct wg_peer *wgp; - int s; + struct wg_peer *wgp = container_of(wk, struct wg_peer, wgp_work); + struct wg_softc *wg = wgp->wgp_sc; + int tasks; - /* XXX should avoid checking all peers */ - s = pserialize_read_enter(); - WG_PEER_READER_FOREACH(wgp, wg) { - struct psref psref; - unsigned int tasks; - - if (wgp->wgp_tasks == 0) - continue; - - wg_get_peer(wgp, &psref); - pserialize_read_exit(s); - - restart: - tasks = atomic_swap_uint(&wgp->wgp_tasks, 0); - KASSERT(tasks != 0); - - WG_DLOG("tasks=%x\n", tasks); + mutex_enter(wgp->wgp_intr_lock); + while ((tasks = wgp->wgp_tasks) != 0) { + wgp->wgp_tasks = 0; + mutex_exit(wgp->wgp_intr_lock); mutex_enter(wgp->wgp_lock); if (ISSET(tasks, WGP_TASK_SEND_INIT_MESSAGE)) @@ -3025,66 +3031,37 @@ wg_process_peer_tasks(struct wg_softc *wg) wg_task_destroy_prev_session(wg, wgp); mutex_exit(wgp->wgp_lock); - /* New tasks may be scheduled during processing tasks */ - WG_DLOG("wgp_tasks=%d\n", wgp->wgp_tasks); - if (wgp->wgp_tasks != 0) - goto restart; - - s = pserialize_read_enter(); - wg_put_peer(wgp, &psref); + mutex_enter(wgp->wgp_intr_lock); } - pserialize_read_exit(s); + mutex_exit(wgp->wgp_intr_lock); } static void -wg_worker(void *arg) +wg_job(struct threadpool_job *job) { - struct wg_softc *wg = arg; - struct wg_worker *wgw = wg->wg_worker; - bool todie = false; - - KASSERT(wg != NULL); - KASSERT(wgw != NULL); - - while (!todie) { - int reasons; - int bound; - - mutex_enter(&wgw->wgw_lock); - /* New tasks may come during task handling */ - while ((reasons = wgw->wgw_wakeup_reasons) == 0 && - !(todie = wgw->wgw_todie)) - cv_wait(&wgw->wgw_cv, &wgw->wgw_lock); - wgw->wgw_wakeup_reasons = 0; - mutex_exit(&wgw->wgw_lock); + struct wg_softc *wg = container_of(job, struct wg_softc, wg_job); + int bound, upcalls; + mutex_enter(wg->wg_intr_lock); + while ((upcalls = wg->wg_upcalls) != 0) { + wg->wg_upcalls = 0; + mutex_exit(wg->wg_intr_lock); bound = curlwp_bind(); - if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4)) + if (ISSET(upcalls, WG_UPCALL_INET)) wg_receive_packets(wg, AF_INET); - if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6)) + if (ISSET(upcalls, WG_UPCALL_INET6)) wg_receive_packets(wg, AF_INET6); - if (ISSET(reasons, WG_WAKEUP_REASON_PEER)) - wg_process_peer_tasks(wg); curlwp_bindx(bound); + mutex_enter(wg->wg_intr_lock); } - kthread_exit(0); -} - -static void -wg_wakeup_worker(struct wg_worker *wgw, const int reason) -{ - - mutex_enter(&wgw->wgw_lock); - wgw->wgw_wakeup_reasons |= reason; - cv_broadcast(&wgw->wgw_cv); - mutex_exit(&wgw->wgw_lock); + threadpool_job_done(job); + mutex_exit(wg->wg_intr_lock); } static int wg_bind_port(struct wg_softc *wg, const uint16_t port) { int error; - struct wg_worker *wgw = wg->wg_worker; uint16_t old_port = wg->wg_listen_port; if (port != 0 && old_port == port) @@ -3096,7 +3073,7 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port) sin->sin_addr.s_addr = INADDR_ANY; sin->sin_port = htons(port); - error = sobind(wgw->wgw_so4, sintosa(sin), curlwp); + error = sobind(wg->wg_so4, sintosa(sin), curlwp); if (error != 0) return error; @@ -3107,7 +3084,7 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port) sin6->sin6_addr = in6addr_any; sin6->sin6_port = htons(port); - error = sobind(wgw->wgw_so6, sin6tosa(sin6), curlwp); + error = sobind(wg->wg_so6, sin6tosa(sin6), curlwp); if (error != 0) return error; #endif @@ -3118,15 +3095,19 @@ wg_bind_port(struct wg_softc *wg, const uint16_t port) } static void -wg_so_upcall(struct socket *so, void *arg, int events, int waitflag) +wg_so_upcall(struct socket *so, void *cookie, int events, int waitflag) { - struct wg_worker *wgw = arg; + struct wg_softc *wg = cookie; int reason; reason = (so->so_proto->pr_domain->dom_family == AF_INET) ? - WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 : - WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6; - wg_wakeup_worker(wgw, reason); + WG_UPCALL_INET : + WG_UPCALL_INET6; + + mutex_enter(wg->wg_intr_lock); + wg->wg_upcalls |= reason; + threadpool_schedule_job(wg->wg_threadpool, &wg->wg_job); + mutex_exit(wg->wg_intr_lock); } static int @@ -3184,8 +3165,7 @@ wg_overudp_cb(struct mbuf **mp, int offset, struct socket *so, } static int -wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af, - struct socket **sop) +wg_socreate(struct wg_softc *wg, int af, struct socket **sop) { int error; struct socket *so; @@ -3195,7 +3175,7 @@ wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af, return error; solock(so); - so->so_upcallarg = wgw; + so->so_upcallarg = wg; so->so_upcall = wg_so_upcall; so->so_rcv.sb_flags |= SB_UPCALL; if (af == AF_INET) @@ -3211,79 +3191,6 @@ wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af, return 0; } -static int -wg_worker_init(struct wg_softc *wg) -{ - int error; - struct wg_worker *wgw; - const char *ifname = wg->wg_if.if_xname; - struct socket *so; - - wgw = kmem_zalloc(sizeof(*wgw), KM_SLEEP); - - mutex_init(&wgw->wgw_lock, MUTEX_DEFAULT, IPL_SOFTNET); - cv_init(&wgw->wgw_cv, ifname); - wgw->wgw_todie = false; - wgw->wgw_wakeup_reasons = 0; - - error = wg_worker_socreate(wg, wgw, AF_INET, &so); - if (error != 0) - goto error; - wgw->wgw_so4 = so; -#ifdef INET6 - error = wg_worker_socreate(wg, wgw, AF_INET6, &so); - if (error != 0) - goto error; - wgw->wgw_so6 = so; -#endif - - wg->wg_worker = wgw; - - error = kthread_create(PRI_NONE, KTHREAD_MPSAFE | KTHREAD_MUSTJOIN, - NULL, wg_worker, wg, &wg->wg_worker_lwp, "%s", ifname); - if (error != 0) - goto error; - - return 0; - -error: -#ifdef INET6 - if (wgw->wgw_so6 != NULL) - soclose(wgw->wgw_so6); -#endif - if (wgw->wgw_so4 != NULL) - soclose(wgw->wgw_so4); - cv_destroy(&wgw->wgw_cv); - mutex_destroy(&wgw->wgw_lock); - - kmem_free(wgw, sizeof(*wgw)); - - return error; -} - -static void -wg_worker_destroy(struct wg_softc *wg) -{ - struct wg_worker *wgw = wg->wg_worker; - - mutex_enter(&wgw->wgw_lock); - wgw->wgw_todie = true; - wgw->wgw_wakeup_reasons = 0; - cv_broadcast(&wgw->wgw_cv); - mutex_exit(&wgw->wgw_lock); - - kthread_join(wg->wg_worker_lwp); - -#ifdef INET6 - soclose(wgw->wgw_so6); -#endif - soclose(wgw->wgw_so4); - cv_destroy(&wgw->wgw_cv); - mutex_destroy(&wgw->wgw_lock); - kmem_free(wg->wg_worker, sizeof(struct wg_worker)); - wg->wg_worker = NULL; -} - static bool wg_session_hit_limits(struct wg_session *wgs) { @@ -3385,6 +3292,7 @@ wg_alloc_peer(struct wg_softc *wg) wgp->wgp_endpoint_changing = false; wgp->wgp_endpoint_available = false; wgp->wgp_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE); + wgp->wgp_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET); wgp->wgp_psz = pserialize_create(); psref_target_init(&wgp->wgp_psref, wg_psref_class); @@ -3454,6 +3362,9 @@ wg_destroy_peer(struct wg_peer *wgp) callout_halt(&wgp->wgp_handshake_timeout_timer, NULL); callout_halt(&wgp->wgp_session_dtor_timer, NULL); + /* Wait for any queued work to complete. */ + workqueue_wait(wg_wq, &wgp->wgp_work); + wgs = wgp->wgp_session_unstable; if (wgs->wgs_state != WGS_STATE_UNKNOWN) { mutex_enter(wgp->wgp_lock); @@ -3486,6 +3397,7 @@ wg_destroy_peer(struct wg_peer *wgp) kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0)); pserialize_destroy(wgp->wgp_psz); + mutex_obj_free(wgp->wgp_intr_lock); mutex_obj_free(wgp->wgp_lock); kmem_free(wgp, sizeof(*wgp)); @@ -3618,28 +3530,39 @@ wg_clone_create(struct if_clone *ifc, int unit) if_initname(&wg->wg_if, ifc->ifc_name, unit); - error = wg_worker_init(wg); - if (error) - goto fail0; - - rn_inithead((void **)&wg->wg_rtable_ipv4, - offsetof(struct sockaddr_in, sin_addr) * NBBY); -#ifdef INET6 - rn_inithead((void **)&wg->wg_rtable_ipv6, - offsetof(struct sockaddr_in6, sin6_addr) * NBBY); -#endif - PSLIST_INIT(&wg->wg_peers); wg->wg_peers_bypubkey = thmap_create(0, NULL, THMAP_NOCOPY); wg->wg_peers_byname = thmap_create(0, NULL, THMAP_NOCOPY); wg->wg_sessions_byindex = thmap_create(0, NULL, THMAP_NOCOPY); wg->wg_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE); + wg->wg_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET); wg->wg_rwlock = rw_obj_alloc(); + threadpool_job_init(&wg->wg_job, wg_job, wg->wg_intr_lock, + "%s", if_name(&wg->wg_if)); wg->wg_ops = &wg_ops_rumpkernel; + error = threadpool_get(&wg->wg_threadpool, PRI_NONE); + if (error) + goto fail0; + +#ifdef INET + error = wg_socreate(wg, AF_INET, &wg->wg_so4); + if (error) + goto fail1; + rn_inithead((void **)&wg->wg_rtable_ipv4, + offsetof(struct sockaddr_in, sin_addr) * NBBY); +#endif +#ifdef INET6 + error = wg_socreate(wg, AF_INET6, &wg->wg_so6); + if (error) + goto fail2; + rn_inithead((void **)&wg->wg_rtable_ipv6, + offsetof(struct sockaddr_in6, sin6_addr) * NBBY); +#endif + error = wg_if_attach(wg); if (error) - goto fail1; + goto fail3; mutex_enter(&wg_softcs.lock); LIST_INSERT_HEAD(&wg_softcs.list, wg, wg_list); @@ -3647,24 +3570,47 @@ wg_clone_create(struct if_clone *ifc, int unit) return 0; -fail2: __unused +fail4: __unused mutex_enter(&wg_softcs.lock); LIST_REMOVE(wg, wg_list); mutex_exit(&wg_softcs.lock); wg_if_detach(wg); - wg_destroy_all_peers(wg); +fail3: wg_destroy_all_peers(wg); +#ifdef INET6 + solock(wg->wg_so6); + wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL; + sounlock(wg->wg_so6); +#endif +#ifdef INET + solock(wg->wg_so4); + wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL; + sounlock(wg->wg_so4); +#endif + mutex_enter(wg->wg_intr_lock); + threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job); + mutex_exit(wg->wg_intr_lock); +#ifdef INET6 + if (wg->wg_rtable_ipv6 != NULL) + free(wg->wg_rtable_ipv6, M_RTABLE); + soclose(wg->wg_so6); +fail2: +#endif +#ifdef INET + if (wg->wg_rtable_ipv4 != NULL) + free(wg->wg_rtable_ipv4, M_RTABLE); + soclose(wg->wg_so4); +fail1: +#endif + threadpool_put(wg->wg_threadpool, PRI_NONE); +fail0: threadpool_job_destroy(&wg->wg_job); rw_obj_free(wg->wg_rwlock); + mutex_obj_free(wg->wg_intr_lock); mutex_obj_free(wg->wg_lock); thmap_destroy(wg->wg_sessions_byindex); thmap_destroy(wg->wg_peers_byname); thmap_destroy(wg->wg_peers_bypubkey); PSLIST_DESTROY(&wg->wg_peers); - if (wg->wg_rtable_ipv6 != NULL) - free(wg->wg_rtable_ipv6, M_RTABLE); - if (wg->wg_rtable_ipv4 != NULL) - free(wg->wg_rtable_ipv4, M_RTABLE); -fail1: wg_worker_destroy(wg); -fail0: kmem_free(wg, sizeof(*wg)); + kmem_free(wg, sizeof(*wg)); return error; } @@ -3685,17 +3631,38 @@ wg_clone_destroy(struct ifnet *ifp) mutex_exit(&wg_softcs.lock); wg_if_detach(wg); wg_destroy_all_peers(wg); +#ifdef INET6 + solock(wg->wg_so6); + wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL; + sounlock(wg->wg_so6); +#endif +#ifdef INET + solock(wg->wg_so4); + wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL; + sounlock(wg->wg_so4); +#endif + mutex_enter(wg->wg_intr_lock); + threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job); + mutex_exit(wg->wg_intr_lock); +#ifdef INET6 + if (wg->wg_rtable_ipv6 != NULL) + free(wg->wg_rtable_ipv6, M_RTABLE); + soclose(wg->wg_so6); +#endif +#ifdef INET + if (wg->wg_rtable_ipv4 != NULL) + free(wg->wg_rtable_ipv4, M_RTABLE); + soclose(wg->wg_so4); +#endif + threadpool_put(wg->wg_threadpool, PRI_NONE); + threadpool_job_destroy(&wg->wg_job); rw_obj_free(wg->wg_rwlock); + mutex_obj_free(wg->wg_intr_lock); mutex_obj_free(wg->wg_lock); thmap_destroy(wg->wg_sessions_byindex); thmap_destroy(wg->wg_peers_byname); thmap_destroy(wg->wg_peers_bypubkey); PSLIST_DESTROY(&wg->wg_peers); - if (wg->wg_rtable_ipv4 != NULL) - free(wg->wg_rtable_ipv4, M_RTABLE); - if (wg->wg_rtable_ipv6 != NULL) - free(wg->wg_rtable_ipv6, M_RTABLE); - wg_worker_destroy(wg); kmem_free(wg, sizeof(*wg)); return 0;