Add reconnect code to librumpclient. In case the connection to

the kernel server is lost, the client will now automatically attempt
to reconnect.

Among other things, this makes it possible to "reboot" and restart
the TCP/IP stack from under firefox without any perceivable less
of service.  If pages were loading at the time the TCP/IP server
was killed, there may be some broken links, but nothing a ctrl-r
cannot fix.
This commit is contained in:
pooka 2011-01-24 17:47:51 +00:00
parent 0600fed083
commit 19a57922d0
2 changed files with 269 additions and 76 deletions

View File

@ -1,4 +1,4 @@
/* $NetBSD: rumpclient.c,v 1.17 2011/01/21 10:43:33 pooka Exp $ */
/* $NetBSD: rumpclient.c,v 1.18 2011/01/24 17:47:51 pooka Exp $ */
/*
* Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
@ -50,6 +50,7 @@ __RCSID("$NetBSD");
#include <pthread.h>
#include <signal.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@ -78,19 +79,51 @@ static struct spclient clispc = {
.spc_fd = -1,
};
static int kq;
static int kq = -1;
static sigset_t fullset;
static int doconnect(int);
static int handshake_req(struct spclient *, uint32_t *, int, bool);
int didrecon;
static int
waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
send_with_recon(struct spclient *spc, const void *data, size_t dlen)
{
int rv;
do {
rv = dosend(spc, data, dlen);
if (__predict_false(rv == ENOTCONN || rv == EBADF)) {
if ((rv = doconnect(1)) != 0)
continue;
if ((rv = handshake_req(&clispc, NULL, 0, true)) != 0)
continue;
rv = ENOTCONN;
break;
}
} while (__predict_false(rv != 0));
return rv;
}
static int
cliwaitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask,
bool keeplock)
{
uint64_t mygen;
bool imalive = true;
pthread_mutex_lock(&spc->spc_mtx);
sendunlockl(spc);
if (!keeplock)
sendunlockl(spc);
mygen = spc->spc_generation;
rw->rw_error = 0;
while (!rw->rw_done && rw->rw_error == 0
&& spc->spc_state != SPCSTATE_DYING){
while (!rw->rw_done && rw->rw_error == 0) {
if (__predict_false(spc->spc_generation != mygen || !imalive))
break;
/* are we free to receive? */
if (spc->spc_istatus == SPCSTATUS_FREE) {
struct kevent kev[8];
@ -105,7 +138,16 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
case 0:
rv = host_kevent(kq, NULL, 0,
kev, __arraycount(kev), NULL);
assert(rv > 0);
/*
* XXX: don't know how this can
* happen (timeout cannot expire
* since there isn't one), but
* it does happen
*/
if (__predict_false(rv == 0))
continue;
for (i = 0; i < rv; i++) {
if (kev[i].filter
== EVFILT_SIGNAL)
@ -116,7 +158,7 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
continue;
case -1:
spc->spc_state = SPCSTATE_DYING;
imalive = false;
goto cleanup;
default:
break;
@ -160,12 +202,12 @@ waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv);
if (spc->spc_state == SPCSTATE_DYING)
if (spc->spc_generation != mygen || !imalive) {
return ENOTCONN;
}
return rw->rw_error;
}
static int
syscall_req(struct spclient *spc, int sysnum,
const void *data, size_t dlen, void **resp)
@ -182,18 +224,18 @@ syscall_req(struct spclient *spc, int sysnum,
pthread_sigmask(SIG_SETMASK, &fullset, &omask);
do {
putwait(spc, &rw, &rhdr);
rv = dosend(spc, &rhdr, sizeof(rhdr));
rv = dosend(spc, data, dlen);
if (rv) {
if ((rv = send_with_recon(spc, &rhdr, sizeof(rhdr))) != 0) {
unputwait(spc, &rw);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
return rv;
continue;
}
if ((rv = send_with_recon(spc, data, dlen)) != 0) {
unputwait(spc, &rw);
continue;
}
rv = waitresp(spc, &rw, &omask);
} while (rv == EAGAIN);
rv = cliwaitresp(spc, &rw, &omask, false);
} while (rv == ENOTCONN || rv == EAGAIN);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
*resp = rw.rw_data;
@ -201,7 +243,7 @@ syscall_req(struct spclient *spc, int sysnum,
}
static int
handshake_req(struct spclient *spc, uint32_t *auth, int cancel)
handshake_req(struct spclient *spc, uint32_t *auth, int cancel, bool haslock)
{
struct handshake_fork rf;
struct rsp_hdr rhdr;
@ -219,20 +261,28 @@ handshake_req(struct spclient *spc, uint32_t *auth, int cancel)
rhdr.rsp_handshake = HANDSHAKE_GUEST;
pthread_sigmask(SIG_SETMASK, &fullset, &omask);
putwait(spc, &rw, &rhdr);
if (haslock)
putwait_locked(spc, &rw, &rhdr);
else
putwait(spc, &rw, &rhdr);
rv = dosend(spc, &rhdr, sizeof(rhdr));
if (auth) {
memcpy(rf.rf_auth, auth, AUTHLEN*sizeof(*auth));
rf.rf_cancel = cancel;
rv = dosend(spc, &rf, sizeof(rf));
rv = send_with_recon(spc, &rf, sizeof(rf));
}
if (rv != 0 || cancel) {
unputwait(spc, &rw);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
return rv;
if (rv || cancel) {
if (haslock)
unputwait_locked(spc, &rw);
else
unputwait(spc, &rw);
if (cancel) {
pthread_sigmask(SIG_SETMASK, &omask, NULL);
return rv;
}
} else {
rv = cliwaitresp(spc, &rw, &omask, haslock);
}
rv = waitresp(spc, &rw, &omask);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
if (rv)
return rv;
@ -257,26 +307,51 @@ prefork_req(struct spclient *spc, void **resp)
rhdr.rsp_error = 0;
pthread_sigmask(SIG_SETMASK, &fullset, &omask);
putwait(spc, &rw, &rhdr);
rv = dosend(spc, &rhdr, sizeof(rhdr));
if (rv != 0) {
unputwait(spc, &rw);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
return rv;
}
do {
putwait(spc, &rw, &rhdr);
rv = send_with_recon(spc, &rhdr, sizeof(rhdr));
if (rv != 0) {
unputwait(spc, &rw);
continue;
}
rv = waitresp(spc, &rw, &omask);
rv = cliwaitresp(spc, &rw, &omask, false);
} while (rv == ENOTCONN || rv == EAGAIN);
pthread_sigmask(SIG_SETMASK, &omask, NULL);
*resp = rw.rw_data;
return rv;
}
/*
* prevent response code from deadlocking with reconnect code
*/
static int
resp_sendlock(struct spclient *spc)
{
int rv = 0;
pthread_mutex_lock(&spc->spc_mtx);
while (spc->spc_ostatus != SPCSTATUS_FREE) {
if (__predict_false(spc->spc_reconnecting)) {
rv = EBUSY;
goto out;
}
spc->spc_ostatus = SPCSTATUS_WANTED;
pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
}
spc->spc_ostatus = SPCSTATUS_BUSY;
out:
pthread_mutex_unlock(&spc->spc_mtx);
return rv;
}
static void
send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen,
int wantstr)
{
struct rsp_hdr rhdr;
int rv;
if (wantstr)
dlen = MIN(dlen, strlen(data)+1);
@ -287,19 +362,17 @@ send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen,
rhdr.rsp_type = RUMPSP_COPYIN;
rhdr.rsp_sysnum = 0;
sendlock(spc);
rv = dosend(spc, &rhdr, sizeof(rhdr));
rv = dosend(spc, data, dlen);
if (resp_sendlock(spc) != 0)
return;
(void)dosend(spc, &rhdr, sizeof(rhdr));
(void)dosend(spc, data, dlen);
sendunlock(spc);
return rv;
}
static int
static void
send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr)
{
struct rsp_hdr rhdr;
int rv;
rhdr.rsp_len = sizeof(rhdr) + sizeof(addr);
rhdr.rsp_reqno = reqno;
@ -307,12 +380,11 @@ send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr)
rhdr.rsp_type = RUMPSP_ANONMMAP;
rhdr.rsp_sysnum = 0;
sendlock(spc);
rv = dosend(spc, &rhdr, sizeof(rhdr));
rv = dosend(spc, &addr, sizeof(addr));
if (resp_sendlock(spc) != 0)
return;
(void)dosend(spc, &rhdr, sizeof(rhdr));
(void)dosend(spc, &addr, sizeof(addr));
sendunlock(spc);
return rv;
}
int
@ -383,7 +455,7 @@ handlereq(struct spclient *spc)
break;
case RUMPSP_RAISE:
DPRINTF(("rump_sp handlereq: raise sig %d\n", rhdr->rsp_signo));
raise(rhdr->rsp_signo);
raise((int)rhdr->rsp_signo);
/*
* We most likely have signals blocked, but the signal
* will be handled soon enough when we return.
@ -402,22 +474,92 @@ static unsigned ptab_idx;
static struct sockaddr *serv_sa;
static int
doconnect(void)
doconnect(int retry)
{
time_t prevreconmsg;
unsigned reconretries;
struct respwait rw;
struct rsp_hdr rhdr;
struct kevent kev[NSIG+1];
char banner[MAXBANNER];
struct pollfd pfd;
int s, error, flags, i;
ssize_t n;
if (kq != -1)
host_close(kq);
kq = -1;
prevreconmsg = 0;
reconretries = 0;
again:
if (clispc.spc_fd != -1)
host_close(clispc.spc_fd);
clispc.spc_fd = -1;
/*
* for reconnect, gate everyone out of the receiver code
*/
putwait_locked(&clispc, &rw, &rhdr);
pthread_mutex_lock(&clispc.spc_mtx);
clispc.spc_reconnecting = 1;
pthread_cond_broadcast(&clispc.spc_cv);
clispc.spc_generation++;
while (clispc.spc_istatus != SPCSTATUS_FREE) {
clispc.spc_istatus = SPCSTATUS_WANTED;
pthread_cond_wait(&rw.rw_cv, &clispc.spc_mtx);
}
kickall(&clispc);
/*
* we can release it already since we hold the
* send lock during reconnect
* XXX: assert it
*/
clispc.spc_istatus = SPCSTATUS_FREE;
pthread_mutex_unlock(&clispc.spc_mtx);
unputwait_locked(&clispc, &rw);
free(clispc.spc_buf);
clispc.spc_off = 0;
s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0);
if (s == -1)
return -1;
if (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) {
error = errno;
fprintf(stderr, "rump_sp: client connect failed\n");
errno = error;
return -1;
pfd.fd = s;
pfd.events = POLLIN;
while (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) {
if (errno == EINTR)
continue;
if (!retry) {
error = errno;
fprintf(stderr, "rump_sp: client connect failed: %s\n",
strerror(errno));
errno = error;
return -1;
}
if (prevreconmsg == 0) {
fprintf(stderr, "rump_sp: connection to kernel lost, "
"trying to reconnect ...\n");
prevreconmsg = time(NULL);
}
if (time(NULL) - prevreconmsg > 120) {
fprintf(stderr, "rump_sp: still trying to "
"reconnect ...\n");
prevreconmsg = time(NULL);
}
/* adhoc backoff timer */
if (reconretries++ < 10) {
usleep(100000 * reconretries);
} else {
sleep(MIN(10, reconretries-9));
}
goto again;
}
if ((error = parsetab[ptab_idx].connhook(s)) != 0) {
@ -440,15 +582,22 @@ doconnect(void)
return -1;
}
banner[n] = '\0';
/* parse the banner some day */
flags = host_fcntl(s, F_GETFL, 0);
if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n");
fprintf(stderr, "rump_sp: socket fd NONBLOCK: %s\n",
strerror(errno));
errno = EINVAL;
return -1;
}
clispc.spc_fd = s;
clispc.spc_state = SPCSTATE_RUNNING;
clispc.spc_reconnecting = 0;
/* parse the banner some day */
if (prevreconmsg) {
fprintf(stderr, "rump_sp: reconnected!\n");
}
/* setup kqueue, we want all signals and the fd */
if ((kq = host_kqueue()) == -1) {
@ -461,7 +610,8 @@ doconnect(void)
for (i = 0; i < NSIG; i++) {
EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0);
}
EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0);
EV_SET(&kev[NSIG], clispc.spc_fd,
EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0);
if (host_kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) {
error = errno;
fprintf(stderr, "rump_sp: kevent() failed");
@ -469,7 +619,13 @@ doconnect(void)
return -1;
}
clispc.spc_fd = s;
return 0;
}
static int
doinit(void)
{
TAILQ_INIT(&clispc.spc_respwait);
pthread_mutex_init(&clispc.spc_mtx, NULL);
pthread_cond_init(&clispc.spc_cv, NULL);
@ -521,14 +677,17 @@ rumpclient_init()
return -1;
}
if (doconnect() == -1)
if (doinit() == -1)
return -1;
if (doconnect(0) == -1)
return -1;
error = handshake_req(&clispc, NULL, 0);
error = handshake_req(&clispc, NULL, 0, false);
if (error) {
pthread_mutex_destroy(&clispc.spc_mtx);
pthread_cond_destroy(&clispc.spc_cv);
host_close(clispc.spc_fd);
if (clispc.spc_fd != -1)
host_close(clispc.spc_fd);
errno = error;
return -1;
}
@ -569,16 +728,16 @@ rumpclient_fork_init(struct rumpclient_fork *rpf)
{
int error;
host_close(clispc.spc_fd);
host_close(kq);
kq = -1;
memset(&clispc, 0, sizeof(clispc));
clispc.spc_fd = -1;
kq = -1;
if (doconnect() == -1)
if (doinit() == -1)
return -1;
if (doconnect(1) == -1)
return -1;
error = handshake_req(&clispc, rpf->fork_auth, 0);
error = handshake_req(&clispc, rpf->fork_auth, 0, false);
if (error) {
pthread_mutex_destroy(&clispc.spc_mtx);
pthread_cond_destroy(&clispc.spc_cv);

View File

@ -1,4 +1,4 @@
/* $NetBSD: sp_common.c,v 1.25 2011/01/22 13:41:22 pooka Exp $ */
/* $NetBSD: sp_common.c,v 1.26 2011/01/24 17:47:52 pooka Exp $ */
/*
* Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved.
@ -177,7 +177,9 @@ struct spclient {
uint64_t spc_nextreq;
uint64_t spc_syscallreq;
uint64_t spc_generation;
int spc_ostatus, spc_istatus;
int spc_reconnecting;
LIST_HEAD(, prefork) spc_pflist;
};
@ -223,7 +225,7 @@ sendlockl(struct spclient *spc)
spc->spc_ostatus = SPCSTATUS_BUSY;
}
static void
static void __unused
sendlock(struct spclient *spc)
{
@ -273,14 +275,16 @@ dosend(struct spclient *spc, const void *data, size_t dlen)
n = host_sendto(fd, sdata + sent, dlen - sent,
MSG_NOSIGNAL, NULL, 0);
if (n == 0) {
return ENOTCONN;
}
if (n == -1) {
if (errno == EPIPE)
return ENOTCONN;
if (errno != EAGAIN)
return errno;
continue;
}
if (n == 0) {
return ENOTCONN;
}
sent += n;
}
@ -288,7 +292,7 @@ dosend(struct spclient *spc, const void *data, size_t dlen)
}
static void
putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
doputwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
{
rw->rw_data = NULL;
@ -298,11 +302,43 @@ putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
pthread_mutex_lock(&spc->spc_mtx);
rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++;
TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
}
static void __unused
putwait_locked(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
{
doputwait(spc, rw, rhdr);
pthread_mutex_unlock(&spc->spc_mtx);
}
static void
putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
{
doputwait(spc, rw, rhdr);
sendlockl(spc);
pthread_mutex_unlock(&spc->spc_mtx);
}
static void
dounputwait(struct spclient *spc, struct respwait *rw)
{
TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv);
}
static void __unused
unputwait_locked(struct spclient *spc, struct respwait *rw)
{
pthread_mutex_lock(&spc->spc_mtx);
dounputwait(spc, rw);
}
static void
unputwait(struct spclient *spc, struct respwait *rw)
{
@ -310,9 +346,7 @@ unputwait(struct spclient *spc, struct respwait *rw)
pthread_mutex_lock(&spc->spc_mtx);
sendunlockl(spc);
TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv);
dounputwait(spc, rw);
}
static void