Fix a few locking problems with multithreaded clients.

TODO: make server deal graciously with out-of-resources conditions
This commit is contained in:
pooka 2010-11-26 18:51:03 +00:00
parent f8063a37d7
commit f0d58f7830
2 changed files with 47 additions and 29 deletions

View File

@ -1,4 +1,4 @@
/* $NetBSD: rumpuser_sp.c,v 1.17 2010/11/26 14:37:08 pooka Exp $ */ /* $NetBSD: rumpuser_sp.c,v 1.18 2010/11/26 18:51:03 pooka Exp $ */
/* /*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved. * Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@ -38,7 +38,7 @@
*/ */
#include <sys/cdefs.h> #include <sys/cdefs.h>
__RCSID("$NetBSD: rumpuser_sp.c,v 1.17 2010/11/26 14:37:08 pooka Exp $"); __RCSID("$NetBSD: rumpuser_sp.c,v 1.18 2010/11/26 18:51:03 pooka Exp $");
#include <sys/types.h> #include <sys/types.h>
#include <sys/atomic.h> #include <sys/atomic.h>
@ -65,7 +65,7 @@ __RCSID("$NetBSD: rumpuser_sp.c,v 1.17 2010/11/26 14:37:08 pooka Exp $");
#include "sp_common.c" #include "sp_common.c"
#define MAXCLI 4 #define MAXCLI 256
static struct pollfd pfdlist[MAXCLI]; static struct pollfd pfdlist[MAXCLI];
static struct spclient spclist[MAXCLI]; static struct spclient spclist[MAXCLI];
@ -329,7 +329,6 @@ spcrelease(struct spclient *spc)
spc->spc_dying = 0; spc->spc_dying = 0;
atomic_inc_uint(&disco); atomic_inc_uint(&disco);
} }
static void static void
@ -622,10 +621,9 @@ spserver(void *arg)
DPRINTF(("rump_sp: server mainloop\n")); DPRINTF(("rump_sp: server mainloop\n"));
for (;;) { for (;;) {
/* g/c hangarounds (eventually) */
if (disco) {
int discoed; int discoed;
/* g/c hangarounds (eventually) */
discoed = atomic_swap_uint(&disco, 0); discoed = atomic_swap_uint(&disco, 0);
while (discoed--) { while (discoed--) {
nfds--; nfds--;
@ -639,8 +637,6 @@ spserver(void *arg)
} }
DPRINTF(("rump_sp: set maxidx to [%u]\n", DPRINTF(("rump_sp: set maxidx to [%u]\n",
maxidx)); maxidx));
assert(maxidx+1 >= nfds);
}
} }
DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1)); DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
@ -744,7 +740,7 @@ rumpuser_sp_init(const struct rumpuser_sp_ops *spopsp, const char *url)
fprintf(stderr, "rump_sp: server bind failed\n"); fprintf(stderr, "rump_sp: server bind failed\n");
return errno; return errno;
} }
if (listen(s, 20) == -1) { if (listen(s, MAXCLI) == -1) {
fprintf(stderr, "rump_sp: server listen failed\n"); fprintf(stderr, "rump_sp: server listen failed\n");
return errno; return errno;
} }

View File

@ -1,4 +1,4 @@
/* $NetBSD: sp_common.c,v 1.11 2010/11/26 14:37:08 pooka Exp $ */ /* $NetBSD: sp_common.c,v 1.12 2010/11/26 18:51:03 pooka Exp $ */
/* /*
* Copyright (c) 2010 Antti Kantee. All Rights Reserved. * Copyright (c) 2010 Antti Kantee. All Rights Reserved.
@ -133,7 +133,7 @@ struct spclient {
TAILQ_HEAD(, respwait) spc_respwait; TAILQ_HEAD(, respwait) spc_respwait;
/* rest of the fields are zeroed upon disconnect */ /* rest of the fields are zeroed upon disconnect */
#define SPC_ZEROFF offsetof(struct spclient, spc_pid) #define SPC_ZEROFF offsetof(struct spclient, spc_pfd)
struct pollfd *spc_pfd; struct pollfd *spc_pfd;
struct rsp_hdr spc_hdr; struct rsp_hdr spc_hdr;
@ -154,26 +154,42 @@ static int readframe(struct spclient *);
static void handlereq(struct spclient *); static void handlereq(struct spclient *);
static void static void
sendlock(struct spclient *spc) sendlockl(struct spclient *spc)
{ {
pthread_mutex_lock(&spc->spc_mtx); /* assert(pthread_mutex_owned) */
while (spc->spc_ostatus != SPCSTATUS_FREE) { while (spc->spc_ostatus != SPCSTATUS_FREE) {
spc->spc_ostatus = SPCSTATUS_WANTED; spc->spc_ostatus = SPCSTATUS_WANTED;
pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx); pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
} }
spc->spc_ostatus = SPCSTATUS_BUSY; spc->spc_ostatus = SPCSTATUS_BUSY;
}
static void
sendlock(struct spclient *spc)
{
pthread_mutex_lock(&spc->spc_mtx);
sendlockl(spc);
pthread_mutex_unlock(&spc->spc_mtx); pthread_mutex_unlock(&spc->spc_mtx);
} }
static void
sendunlockl(struct spclient *spc)
{
/* assert(pthread_mutex_owned) */
if (spc->spc_ostatus == SPCSTATUS_WANTED)
pthread_cond_broadcast(&spc->spc_cv);
spc->spc_ostatus = SPCSTATUS_FREE;
}
static void static void
sendunlock(struct spclient *spc) sendunlock(struct spclient *spc)
{ {
pthread_mutex_lock(&spc->spc_mtx); pthread_mutex_lock(&spc->spc_mtx);
if (spc->spc_ostatus == SPCSTATUS_WANTED) sendunlockl(spc);
pthread_cond_broadcast(&spc->spc_cv);
spc->spc_ostatus = SPCSTATUS_FREE;
pthread_mutex_unlock(&spc->spc_mtx); pthread_mutex_unlock(&spc->spc_mtx);
} }
@ -224,12 +240,16 @@ putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
pthread_mutex_lock(&spc->spc_mtx); pthread_mutex_lock(&spc->spc_mtx);
rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++; rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++;
TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries); TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
sendlockl(spc);
} }
static void static void
unputwait(struct spclient *spc, struct respwait *rw) unputwait(struct spclient *spc, struct respwait *rw)
{ {
sendunlockl(spc);
TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
pthread_mutex_unlock(&spc->spc_mtx); pthread_mutex_unlock(&spc->spc_mtx);
pthread_cond_destroy(&rw->rw_cv); pthread_cond_destroy(&rw->rw_cv);
@ -267,7 +287,7 @@ kickall(struct spclient *spc)
/* DIAGASSERT(mutex_owned(spc_lock)) */ /* DIAGASSERT(mutex_owned(spc_lock)) */
TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries)
pthread_cond_signal(&rw->rw_cv); pthread_cond_broadcast(&rw->rw_cv);
} }
static int static int
@ -276,6 +296,8 @@ waitresp(struct spclient *spc, struct respwait *rw)
struct pollfd pfd; struct pollfd pfd;
int rv = 0; int rv = 0;
sendunlockl(spc);
while (rw->rw_data == NULL && spc->spc_dying == 0) { while (rw->rw_data == NULL && spc->spc_dying == 0) {
/* are we free to receive? */ /* are we free to receive? */
if (spc->spc_istatus == SPCSTATUS_FREE) { if (spc->spc_istatus == SPCSTATUS_FREE) {