diff --git a/lib/librumpclient/rumpclient.c b/lib/librumpclient/rumpclient.c index e418955a2898..5e9bad68d9bb 100644 --- a/lib/librumpclient/rumpclient.c +++ b/lib/librumpclient/rumpclient.c @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.c,v 1.17 2011/01/21 10:43:33 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.18 2011/01/24 17:47:51 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -50,6 +50,7 @@ __RCSID("$NetBSD"); #include #include #include +#include #include #include #include @@ -78,19 +79,51 @@ static struct spclient clispc = { .spc_fd = -1, }; -static int kq; +static int kq = -1; static sigset_t fullset; +static int doconnect(int); +static int handshake_req(struct spclient *, uint32_t *, int, bool); + +int didrecon; + static int -waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) +send_with_recon(struct spclient *spc, const void *data, size_t dlen) { + int rv; + + do { + rv = dosend(spc, data, dlen); + if (__predict_false(rv == ENOTCONN || rv == EBADF)) { + if ((rv = doconnect(1)) != 0) + continue; + if ((rv = handshake_req(&clispc, NULL, 0, true)) != 0) + continue; + rv = ENOTCONN; + break; + } + } while (__predict_false(rv != 0)); + + return rv; +} + +static int +cliwaitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask, + bool keeplock) +{ + uint64_t mygen; + bool imalive = true; pthread_mutex_lock(&spc->spc_mtx); - sendunlockl(spc); + if (!keeplock) + sendunlockl(spc); + mygen = spc->spc_generation; rw->rw_error = 0; - while (!rw->rw_done && rw->rw_error == 0 - && spc->spc_state != SPCSTATE_DYING){ + while (!rw->rw_done && rw->rw_error == 0) { + if (__predict_false(spc->spc_generation != mygen || !imalive)) + break; + /* are we free to receive? */ if (spc->spc_istatus == SPCSTATUS_FREE) { struct kevent kev[8]; @@ -105,7 +138,16 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) case 0: rv = host_kevent(kq, NULL, 0, kev, __arraycount(kev), NULL); - assert(rv > 0); + + /* + * XXX: don't know how this can + * happen (timeout cannot expire + * since there isn't one), but + * it does happen + */ + if (__predict_false(rv == 0)) + continue; + for (i = 0; i < rv; i++) { if (kev[i].filter == EVFILT_SIGNAL) @@ -116,7 +158,7 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) continue; case -1: - spc->spc_state = SPCSTATE_DYING; + imalive = false; goto cleanup; default: break; @@ -160,12 +202,12 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) pthread_mutex_unlock(&spc->spc_mtx); pthread_cond_destroy(&rw->rw_cv); - if (spc->spc_state == SPCSTATE_DYING) + if (spc->spc_generation != mygen || !imalive) { return ENOTCONN; + } return rw->rw_error; } - static int syscall_req(struct spclient *spc, int sysnum, const void *data, size_t dlen, void **resp) @@ -182,18 +224,18 @@ syscall_req(struct spclient *spc, int sysnum, pthread_sigmask(SIG_SETMASK, &fullset, &omask); do { - putwait(spc, &rw, &rhdr); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, data, dlen); - if (rv) { + if ((rv = send_with_recon(spc, &rhdr, sizeof(rhdr))) != 0) { unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; + continue; + } + if ((rv = send_with_recon(spc, data, dlen)) != 0) { + unputwait(spc, &rw); + continue; } - rv = waitresp(spc, &rw, &omask); - } while (rv == EAGAIN); + rv = cliwaitresp(spc, &rw, &omask, false); + } while (rv == ENOTCONN || rv == EAGAIN); pthread_sigmask(SIG_SETMASK, &omask, NULL); *resp = rw.rw_data; @@ -201,7 +243,7 @@ syscall_req(struct spclient *spc, int sysnum, } static int -handshake_req(struct spclient *spc, uint32_t *auth, int cancel) +handshake_req(struct spclient *spc, uint32_t *auth, int cancel, bool haslock) { struct handshake_fork rf; struct rsp_hdr rhdr; @@ -219,20 +261,28 @@ handshake_req(struct spclient *spc, uint32_t *auth, int cancel) rhdr.rsp_handshake = HANDSHAKE_GUEST; pthread_sigmask(SIG_SETMASK, &fullset, &omask); - putwait(spc, &rw, &rhdr); + if (haslock) + putwait_locked(spc, &rw, &rhdr); + else + putwait(spc, &rw, &rhdr); rv = dosend(spc, &rhdr, sizeof(rhdr)); if (auth) { memcpy(rf.rf_auth, auth, AUTHLEN*sizeof(*auth)); rf.rf_cancel = cancel; - rv = dosend(spc, &rf, sizeof(rf)); + rv = send_with_recon(spc, &rf, sizeof(rf)); } - if (rv != 0 || cancel) { - unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; + if (rv || cancel) { + if (haslock) + unputwait_locked(spc, &rw); + else + unputwait(spc, &rw); + if (cancel) { + pthread_sigmask(SIG_SETMASK, &omask, NULL); + return rv; + } + } else { + rv = cliwaitresp(spc, &rw, &omask, haslock); } - - rv = waitresp(spc, &rw, &omask); pthread_sigmask(SIG_SETMASK, &omask, NULL); if (rv) return rv; @@ -257,26 +307,51 @@ prefork_req(struct spclient *spc, void **resp) rhdr.rsp_error = 0; pthread_sigmask(SIG_SETMASK, &fullset, &omask); - putwait(spc, &rw, &rhdr); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - if (rv != 0) { - unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; - } + do { + putwait(spc, &rw, &rhdr); + rv = send_with_recon(spc, &rhdr, sizeof(rhdr)); + if (rv != 0) { + unputwait(spc, &rw); + continue; + } - rv = waitresp(spc, &rw, &omask); + rv = cliwaitresp(spc, &rw, &omask, false); + } while (rv == ENOTCONN || rv == EAGAIN); pthread_sigmask(SIG_SETMASK, &omask, NULL); + *resp = rw.rw_data; return rv; } +/* + * prevent response code from deadlocking with reconnect code + */ static int +resp_sendlock(struct spclient *spc) +{ + int rv = 0; + + pthread_mutex_lock(&spc->spc_mtx); + while (spc->spc_ostatus != SPCSTATUS_FREE) { + if (__predict_false(spc->spc_reconnecting)) { + rv = EBUSY; + goto out; + } + spc->spc_ostatus = SPCSTATUS_WANTED; + pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx); + } + spc->spc_ostatus = SPCSTATUS_BUSY; + + out: + pthread_mutex_unlock(&spc->spc_mtx); + return rv; +} + +static void send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen, int wantstr) { struct rsp_hdr rhdr; - int rv; if (wantstr) dlen = MIN(dlen, strlen(data)+1); @@ -287,19 +362,17 @@ send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen, rhdr.rsp_type = RUMPSP_COPYIN; rhdr.rsp_sysnum = 0; - sendlock(spc); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, data, dlen); + if (resp_sendlock(spc) != 0) + return; + (void)dosend(spc, &rhdr, sizeof(rhdr)); + (void)dosend(spc, data, dlen); sendunlock(spc); - - return rv; } -static int +static void send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr) { struct rsp_hdr rhdr; - int rv; rhdr.rsp_len = sizeof(rhdr) + sizeof(addr); rhdr.rsp_reqno = reqno; @@ -307,12 +380,11 @@ send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr) rhdr.rsp_type = RUMPSP_ANONMMAP; rhdr.rsp_sysnum = 0; - sendlock(spc); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, &addr, sizeof(addr)); + if (resp_sendlock(spc) != 0) + return; + (void)dosend(spc, &rhdr, sizeof(rhdr)); + (void)dosend(spc, &addr, sizeof(addr)); sendunlock(spc); - - return rv; } int @@ -383,7 +455,7 @@ handlereq(struct spclient *spc) break; case RUMPSP_RAISE: DPRINTF(("rump_sp handlereq: raise sig %d\n", rhdr->rsp_signo)); - raise(rhdr->rsp_signo); + raise((int)rhdr->rsp_signo); /* * We most likely have signals blocked, but the signal * will be handled soon enough when we return. @@ -402,22 +474,92 @@ static unsigned ptab_idx; static struct sockaddr *serv_sa; static int -doconnect(void) +doconnect(int retry) { + time_t prevreconmsg; + unsigned reconretries; + struct respwait rw; + struct rsp_hdr rhdr; struct kevent kev[NSIG+1]; char banner[MAXBANNER]; + struct pollfd pfd; int s, error, flags, i; ssize_t n; + if (kq != -1) + host_close(kq); + kq = -1; + + prevreconmsg = 0; + reconretries = 0; + + again: + if (clispc.spc_fd != -1) + host_close(clispc.spc_fd); + clispc.spc_fd = -1; + + /* + * for reconnect, gate everyone out of the receiver code + */ + putwait_locked(&clispc, &rw, &rhdr); + + pthread_mutex_lock(&clispc.spc_mtx); + clispc.spc_reconnecting = 1; + pthread_cond_broadcast(&clispc.spc_cv); + clispc.spc_generation++; + while (clispc.spc_istatus != SPCSTATUS_FREE) { + clispc.spc_istatus = SPCSTATUS_WANTED; + pthread_cond_wait(&rw.rw_cv, &clispc.spc_mtx); + } + kickall(&clispc); + + /* + * we can release it already since we hold the + * send lock during reconnect + * XXX: assert it + */ + clispc.spc_istatus = SPCSTATUS_FREE; + pthread_mutex_unlock(&clispc.spc_mtx); + unputwait_locked(&clispc, &rw); + + free(clispc.spc_buf); + clispc.spc_off = 0; + s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0); if (s == -1) return -1; - if (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) { - error = errno; - fprintf(stderr, "rump_sp: client connect failed\n"); - errno = error; - return -1; + pfd.fd = s; + pfd.events = POLLIN; + while (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) { + if (errno == EINTR) + continue; + if (!retry) { + error = errno; + fprintf(stderr, "rump_sp: client connect failed: %s\n", + strerror(errno)); + errno = error; + return -1; + } + + if (prevreconmsg == 0) { + fprintf(stderr, "rump_sp: connection to kernel lost, " + "trying to reconnect ...\n"); + prevreconmsg = time(NULL); + } + if (time(NULL) - prevreconmsg > 120) { + fprintf(stderr, "rump_sp: still trying to " + "reconnect ...\n"); + prevreconmsg = time(NULL); + } + + /* adhoc backoff timer */ + if (reconretries++ < 10) { + usleep(100000 * reconretries); + } else { + sleep(MIN(10, reconretries-9)); + } + goto again; } if ((error = parsetab[ptab_idx].connhook(s)) != 0) { @@ -440,15 +582,22 @@ doconnect(void) return -1; } banner[n] = '\0'; + /* parse the banner some day */ flags = host_fcntl(s, F_GETFL, 0); if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { - fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n"); + fprintf(stderr, "rump_sp: socket fd NONBLOCK: %s\n", + strerror(errno)); errno = EINVAL; return -1; } + clispc.spc_fd = s; + clispc.spc_state = SPCSTATE_RUNNING; + clispc.spc_reconnecting = 0; - /* parse the banner some day */ + if (prevreconmsg) { + fprintf(stderr, "rump_sp: reconnected!\n"); + } /* setup kqueue, we want all signals and the fd */ if ((kq = host_kqueue()) == -1) { @@ -461,7 +610,8 @@ doconnect(void) for (i = 0; i < NSIG; i++) { EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0); } - EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); + EV_SET(&kev[NSIG], clispc.spc_fd, + EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); if (host_kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) { error = errno; fprintf(stderr, "rump_sp: kevent() failed"); @@ -469,7 +619,13 @@ doconnect(void) return -1; } - clispc.spc_fd = s; + return 0; +} + +static int +doinit(void) +{ + TAILQ_INIT(&clispc.spc_respwait); pthread_mutex_init(&clispc.spc_mtx, NULL); pthread_cond_init(&clispc.spc_cv, NULL); @@ -521,14 +677,17 @@ rumpclient_init() return -1; } - if (doconnect() == -1) + if (doinit() == -1) + return -1; + if (doconnect(0) == -1) return -1; - error = handshake_req(&clispc, NULL, 0); + error = handshake_req(&clispc, NULL, 0, false); if (error) { pthread_mutex_destroy(&clispc.spc_mtx); pthread_cond_destroy(&clispc.spc_cv); - host_close(clispc.spc_fd); + if (clispc.spc_fd != -1) + host_close(clispc.spc_fd); errno = error; return -1; } @@ -569,16 +728,16 @@ rumpclient_fork_init(struct rumpclient_fork *rpf) { int error; - host_close(clispc.spc_fd); - host_close(kq); - kq = -1; memset(&clispc, 0, sizeof(clispc)); clispc.spc_fd = -1; + kq = -1; - if (doconnect() == -1) + if (doinit() == -1) + return -1; + if (doconnect(1) == -1) return -1; - error = handshake_req(&clispc, rpf->fork_auth, 0); + error = handshake_req(&clispc, rpf->fork_auth, 0, false); if (error) { pthread_mutex_destroy(&clispc.spc_mtx); pthread_cond_destroy(&clispc.spc_cv); diff --git a/lib/librumpuser/sp_common.c b/lib/librumpuser/sp_common.c index 9dfd85984b28..03ab5412d7dd 100644 --- a/lib/librumpuser/sp_common.c +++ b/lib/librumpuser/sp_common.c @@ -1,4 +1,4 @@ -/* $NetBSD: sp_common.c,v 1.25 2011/01/22 13:41:22 pooka Exp $ */ +/* $NetBSD: sp_common.c,v 1.26 2011/01/24 17:47:52 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -177,7 +177,9 @@ struct spclient { uint64_t spc_nextreq; uint64_t spc_syscallreq; + uint64_t spc_generation; int spc_ostatus, spc_istatus; + int spc_reconnecting; LIST_HEAD(, prefork) spc_pflist; }; @@ -223,7 +225,7 @@ sendlockl(struct spclient *spc) spc->spc_ostatus = SPCSTATUS_BUSY; } -static void +static void __unused sendlock(struct spclient *spc) { @@ -273,14 +275,16 @@ dosend(struct spclient *spc, const void *data, size_t dlen) n = host_sendto(fd, sdata + sent, dlen - sent, MSG_NOSIGNAL, NULL, 0); - if (n == 0) { - return ENOTCONN; - } if (n == -1) { + if (errno == EPIPE) + return ENOTCONN; if (errno != EAGAIN) return errno; continue; } + if (n == 0) { + return ENOTCONN; + } sent += n; } @@ -288,7 +292,7 @@ dosend(struct spclient *spc, const void *data, size_t dlen) } static void -putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +doputwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) { rw->rw_data = NULL; @@ -298,11 +302,43 @@ putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) pthread_mutex_lock(&spc->spc_mtx); rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++; TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries); +} +static void __unused +putwait_locked(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +{ + + doputwait(spc, rw, rhdr); + pthread_mutex_unlock(&spc->spc_mtx); +} + +static void +putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +{ + + doputwait(spc, rw, rhdr); sendlockl(spc); pthread_mutex_unlock(&spc->spc_mtx); } +static void +dounputwait(struct spclient *spc, struct respwait *rw) +{ + + TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); + pthread_mutex_unlock(&spc->spc_mtx); + pthread_cond_destroy(&rw->rw_cv); + +} + +static void __unused +unputwait_locked(struct spclient *spc, struct respwait *rw) +{ + + pthread_mutex_lock(&spc->spc_mtx); + dounputwait(spc, rw); +} + static void unputwait(struct spclient *spc, struct respwait *rw) { @@ -310,9 +346,7 @@ unputwait(struct spclient *spc, struct respwait *rw) pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); - TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); - pthread_mutex_unlock(&spc->spc_mtx); - pthread_cond_destroy(&rw->rw_cv); + dounputwait(spc, rw); } static void