block/nbd: nbd reconnect
Implement reconnect. To achieve this: 1. add new modes: connecting-wait: means, that reconnecting is in progress, and there were small number of reconnect attempts, so all requests are waiting for the connection. connecting-nowait: reconnecting is in progress, there were a lot of attempts of reconnect, all requests will return errors. two old modes are used too: connected: normal state quit: exiting after fatal error or on close Possible transitions are: * -> quit connecting-* -> connected connecting-wait -> connecting-nowait (transition is done after reconnect-delay seconds in connecting-wait mode) connected -> connecting-wait 2. Implement reconnect in connection_co. So, in connecting-* mode, connection_co, tries to reconnect unlimited times. 3. Retry nbd queries on channel error, if we are in connecting-wait state. Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> Message-Id: <20191009084158.15614-3-vsementsov@virtuozzo.com> Reviewed-by: Eric Blake <eblake@redhat.com> Signed-off-by: Eric Blake <eblake@redhat.com>
This commit is contained in:
parent
3d692649d1
commit
f7651539d8
245
block/nbd.c
245
block/nbd.c
@ -1,6 +1,7 @@
|
||||
/*
|
||||
* QEMU Block driver for NBD
|
||||
*
|
||||
* Copyright (c) 2019 Virtuozzo International GmbH.
|
||||
* Copyright (C) 2016 Red Hat, Inc.
|
||||
* Copyright (C) 2008 Bull S.A.S.
|
||||
* Author: Laurent Vivier <Laurent.Vivier@bull.net>
|
||||
@ -55,6 +56,8 @@ typedef struct {
|
||||
} NBDClientRequest;
|
||||
|
||||
typedef enum NBDClientState {
|
||||
NBD_CLIENT_CONNECTING_WAIT,
|
||||
NBD_CLIENT_CONNECTING_NOWAIT,
|
||||
NBD_CLIENT_CONNECTED,
|
||||
NBD_CLIENT_QUIT
|
||||
} NBDClientState;
|
||||
@ -67,8 +70,14 @@ typedef struct BDRVNBDState {
|
||||
CoMutex send_mutex;
|
||||
CoQueue free_sema;
|
||||
Coroutine *connection_co;
|
||||
QemuCoSleepState *connection_co_sleep_ns_state;
|
||||
bool drained;
|
||||
bool wait_drained_end;
|
||||
int in_flight;
|
||||
NBDClientState state;
|
||||
int connect_status;
|
||||
Error *connect_err;
|
||||
bool wait_in_flight;
|
||||
|
||||
NBDClientRequest requests[MAX_NBD_REQUESTS];
|
||||
NBDReply reply;
|
||||
@ -83,10 +92,21 @@ typedef struct BDRVNBDState {
|
||||
char *x_dirty_bitmap;
|
||||
} BDRVNBDState;
|
||||
|
||||
/* @ret will be used for reconnect in future */
|
||||
static int nbd_client_connect(BlockDriverState *bs, Error **errp);
|
||||
|
||||
static void nbd_channel_error(BDRVNBDState *s, int ret)
|
||||
{
|
||||
if (ret == -EIO) {
|
||||
if (s->state == NBD_CLIENT_CONNECTED) {
|
||||
s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
|
||||
NBD_CLIENT_CONNECTING_NOWAIT;
|
||||
}
|
||||
} else {
|
||||
if (s->state == NBD_CLIENT_CONNECTED) {
|
||||
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
|
||||
}
|
||||
s->state = NBD_CLIENT_QUIT;
|
||||
}
|
||||
}
|
||||
|
||||
static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
|
||||
@ -129,7 +149,13 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
|
||||
{
|
||||
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
|
||||
|
||||
/*
|
||||
* s->connection_co is either yielded from nbd_receive_reply or from
|
||||
* nbd_co_reconnect_loop()
|
||||
*/
|
||||
if (s->state == NBD_CLIENT_CONNECTED) {
|
||||
qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
|
||||
}
|
||||
|
||||
bdrv_inc_in_flight(bs);
|
||||
|
||||
@ -140,24 +166,150 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
|
||||
aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
|
||||
}
|
||||
|
||||
static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
|
||||
{
|
||||
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
|
||||
|
||||
s->drained = true;
|
||||
if (s->connection_co_sleep_ns_state) {
|
||||
qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
|
||||
}
|
||||
}
|
||||
|
||||
static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
|
||||
{
|
||||
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
|
||||
|
||||
s->drained = false;
|
||||
if (s->wait_drained_end) {
|
||||
s->wait_drained_end = false;
|
||||
aio_co_wake(s->connection_co);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void nbd_teardown_connection(BlockDriverState *bs)
|
||||
{
|
||||
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
|
||||
|
||||
assert(s->ioc);
|
||||
|
||||
if (s->state == NBD_CLIENT_CONNECTED) {
|
||||
/* finish any pending coroutines */
|
||||
qio_channel_shutdown(s->ioc,
|
||||
QIO_CHANNEL_SHUTDOWN_BOTH,
|
||||
NULL);
|
||||
assert(s->ioc);
|
||||
qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
|
||||
}
|
||||
s->state = NBD_CLIENT_QUIT;
|
||||
if (s->connection_co) {
|
||||
if (s->connection_co_sleep_ns_state) {
|
||||
qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
|
||||
}
|
||||
}
|
||||
BDRV_POLL_WHILE(bs, s->connection_co);
|
||||
}
|
||||
|
||||
nbd_client_detach_aio_context(bs);
|
||||
static bool nbd_client_connecting(BDRVNBDState *s)
|
||||
{
|
||||
return s->state == NBD_CLIENT_CONNECTING_WAIT ||
|
||||
s->state == NBD_CLIENT_CONNECTING_NOWAIT;
|
||||
}
|
||||
|
||||
static bool nbd_client_connecting_wait(BDRVNBDState *s)
|
||||
{
|
||||
return s->state == NBD_CLIENT_CONNECTING_WAIT;
|
||||
}
|
||||
|
||||
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
|
||||
{
|
||||
Error *local_err = NULL;
|
||||
|
||||
if (!nbd_client_connecting(s)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* Wait for completion of all in-flight requests */
|
||||
|
||||
qemu_co_mutex_lock(&s->send_mutex);
|
||||
|
||||
while (s->in_flight > 0) {
|
||||
qemu_co_mutex_unlock(&s->send_mutex);
|
||||
nbd_recv_coroutines_wake_all(s);
|
||||
s->wait_in_flight = true;
|
||||
qemu_coroutine_yield();
|
||||
s->wait_in_flight = false;
|
||||
qemu_co_mutex_lock(&s->send_mutex);
|
||||
}
|
||||
|
||||
qemu_co_mutex_unlock(&s->send_mutex);
|
||||
|
||||
if (!nbd_client_connecting(s)) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Now we are sure that nobody is accessing the channel, and no one will
|
||||
* try until we set the state to CONNECTED.
|
||||
*/
|
||||
|
||||
/* Finalize previous connection if any */
|
||||
if (s->ioc) {
|
||||
nbd_client_detach_aio_context(s->bs);
|
||||
object_unref(OBJECT(s->sioc));
|
||||
s->sioc = NULL;
|
||||
object_unref(OBJECT(s->ioc));
|
||||
s->ioc = NULL;
|
||||
}
|
||||
|
||||
s->connect_status = nbd_client_connect(s->bs, &local_err);
|
||||
error_free(s->connect_err);
|
||||
s->connect_err = NULL;
|
||||
error_propagate(&s->connect_err, local_err);
|
||||
|
||||
if (s->connect_status < 0) {
|
||||
/* failed attempt */
|
||||
return;
|
||||
}
|
||||
|
||||
/* successfully connected */
|
||||
s->state = NBD_CLIENT_CONNECTED;
|
||||
qemu_co_queue_restart_all(&s->free_sema);
|
||||
}
|
||||
|
||||
static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
|
||||
{
|
||||
uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
|
||||
uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
|
||||
uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
|
||||
uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
|
||||
|
||||
nbd_reconnect_attempt(s);
|
||||
|
||||
while (nbd_client_connecting(s)) {
|
||||
if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
|
||||
qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
|
||||
{
|
||||
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
|
||||
qemu_co_queue_restart_all(&s->free_sema);
|
||||
}
|
||||
|
||||
qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
|
||||
&s->connection_co_sleep_ns_state);
|
||||
if (s->drained) {
|
||||
bdrv_dec_in_flight(s->bs);
|
||||
s->wait_drained_end = true;
|
||||
while (s->drained) {
|
||||
/*
|
||||
* We may be entered once from nbd_client_attach_aio_context_bh
|
||||
* and then from nbd_client_co_drain_end. So here is a loop.
|
||||
*/
|
||||
qemu_coroutine_yield();
|
||||
}
|
||||
bdrv_inc_in_flight(s->bs);
|
||||
}
|
||||
if (timeout < max_timeout) {
|
||||
timeout *= 2;
|
||||
}
|
||||
|
||||
nbd_reconnect_attempt(s);
|
||||
}
|
||||
}
|
||||
|
||||
static coroutine_fn void nbd_connection_entry(void *opaque)
|
||||
@ -177,16 +329,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
|
||||
* Therefore we keep an additional in_flight reference all the time and
|
||||
* only drop it temporarily here.
|
||||
*/
|
||||
|
||||
if (nbd_client_connecting(s)) {
|
||||
nbd_co_reconnect_loop(s);
|
||||
}
|
||||
|
||||
if (s->state != NBD_CLIENT_CONNECTED) {
|
||||
continue;
|
||||
}
|
||||
|
||||
assert(s->reply.handle == 0);
|
||||
ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
|
||||
|
||||
if (local_err) {
|
||||
trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
|
||||
error_free(local_err);
|
||||
local_err = NULL;
|
||||
}
|
||||
if (ret <= 0) {
|
||||
nbd_channel_error(s, ret ? ret : -EIO);
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -201,7 +363,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
|
||||
(nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
|
||||
{
|
||||
nbd_channel_error(s, -EINVAL);
|
||||
break;
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -220,10 +382,19 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
|
||||
qemu_coroutine_yield();
|
||||
}
|
||||
|
||||
qemu_co_queue_restart_all(&s->free_sema);
|
||||
nbd_recv_coroutines_wake_all(s);
|
||||
bdrv_dec_in_flight(s->bs);
|
||||
|
||||
s->connection_co = NULL;
|
||||
if (s->ioc) {
|
||||
nbd_client_detach_aio_context(s->bs);
|
||||
object_unref(OBJECT(s->sioc));
|
||||
s->sioc = NULL;
|
||||
object_unref(OBJECT(s->ioc));
|
||||
s->ioc = NULL;
|
||||
}
|
||||
|
||||
aio_wait_kick();
|
||||
}
|
||||
|
||||
@ -235,7 +406,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
|
||||
int rc, i = -1;
|
||||
|
||||
qemu_co_mutex_lock(&s->send_mutex);
|
||||
while (s->in_flight == MAX_NBD_REQUESTS) {
|
||||
while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
|
||||
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
|
||||
}
|
||||
|
||||
@ -286,8 +457,12 @@ err:
|
||||
s->requests[i].coroutine = NULL;
|
||||
s->in_flight--;
|
||||
}
|
||||
if (s->in_flight == 0 && s->wait_in_flight) {
|
||||
aio_co_wake(s->connection_co);
|
||||
} else {
|
||||
qemu_co_queue_next(&s->free_sema);
|
||||
}
|
||||
}
|
||||
qemu_co_mutex_unlock(&s->send_mutex);
|
||||
return rc;
|
||||
}
|
||||
@ -666,10 +841,15 @@ static coroutine_fn int nbd_co_receive_one_chunk(
|
||||
} else {
|
||||
/* For assert at loop start in nbd_connection_entry */
|
||||
*reply = s->reply;
|
||||
s->reply.handle = 0;
|
||||
}
|
||||
s->reply.handle = 0;
|
||||
|
||||
if (s->connection_co) {
|
||||
if (s->connection_co && !s->wait_in_flight) {
|
||||
/*
|
||||
* We must check s->wait_in_flight, because we may entered by
|
||||
* nbd_recv_coroutines_wake_all(), in this case we should not
|
||||
* wake connection_co here, it will woken by last request.
|
||||
*/
|
||||
aio_co_wake(s->connection_co);
|
||||
}
|
||||
|
||||
@ -781,7 +961,11 @@ break_loop:
|
||||
|
||||
qemu_co_mutex_lock(&s->send_mutex);
|
||||
s->in_flight--;
|
||||
if (s->in_flight == 0 && s->wait_in_flight) {
|
||||
aio_co_wake(s->connection_co);
|
||||
} else {
|
||||
qemu_co_queue_next(&s->free_sema);
|
||||
}
|
||||
qemu_co_mutex_unlock(&s->send_mutex);
|
||||
|
||||
return false;
|
||||
@ -927,20 +1111,26 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
|
||||
} else {
|
||||
assert(request->type != NBD_CMD_WRITE);
|
||||
}
|
||||
|
||||
do {
|
||||
ret = nbd_co_send_request(bs, request, write_qiov);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = nbd_co_receive_return_code(s, request->handle,
|
||||
&request_ret, &local_err);
|
||||
if (local_err) {
|
||||
trace_nbd_co_request_fail(request->from, request->len, request->handle,
|
||||
request->flags, request->type,
|
||||
trace_nbd_co_request_fail(request->from, request->len,
|
||||
request->handle, request->flags,
|
||||
request->type,
|
||||
nbd_cmd_lookup(request->type),
|
||||
ret, error_get_pretty(local_err));
|
||||
error_free(local_err);
|
||||
local_err = NULL;
|
||||
}
|
||||
} while (ret < 0 && nbd_client_connecting_wait(s));
|
||||
|
||||
return ret ? ret : request_ret;
|
||||
}
|
||||
|
||||
@ -981,9 +1171,10 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
|
||||
request.len -= slop;
|
||||
}
|
||||
|
||||
do {
|
||||
ret = nbd_co_send_request(bs, &request, NULL);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
|
||||
@ -994,7 +1185,10 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
|
||||
nbd_cmd_lookup(request.type),
|
||||
ret, error_get_pretty(local_err));
|
||||
error_free(local_err);
|
||||
local_err = NULL;
|
||||
}
|
||||
} while (ret < 0 && nbd_client_connecting_wait(s));
|
||||
|
||||
return ret ? ret : request_ret;
|
||||
}
|
||||
|
||||
@ -1131,20 +1325,25 @@ static int coroutine_fn nbd_client_co_block_status(
|
||||
if (s->info.min_block) {
|
||||
assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
|
||||
}
|
||||
do {
|
||||
ret = nbd_co_send_request(bs, &request, NULL);
|
||||
if (ret < 0) {
|
||||
return ret;
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
|
||||
&extent, &request_ret, &local_err);
|
||||
&extent, &request_ret,
|
||||
&local_err);
|
||||
if (local_err) {
|
||||
trace_nbd_co_request_fail(request.from, request.len, request.handle,
|
||||
request.flags, request.type,
|
||||
nbd_cmd_lookup(request.type),
|
||||
ret, error_get_pretty(local_err));
|
||||
error_free(local_err);
|
||||
local_err = NULL;
|
||||
}
|
||||
} while (ret < 0 && nbd_client_connecting_wait(s));
|
||||
|
||||
if (ret < 0 || request_ret < 0) {
|
||||
return ret ? ret : request_ret;
|
||||
}
|
||||
@ -1175,9 +1374,9 @@ static void nbd_client_close(BlockDriverState *bs)
|
||||
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
|
||||
NBDRequest request = { .type = NBD_CMD_DISC };
|
||||
|
||||
assert(s->ioc);
|
||||
|
||||
if (s->ioc) {
|
||||
nbd_send_request(s->ioc, &request);
|
||||
}
|
||||
|
||||
nbd_teardown_connection(bs);
|
||||
}
|
||||
@ -1821,6 +2020,8 @@ static BlockDriver bdrv_nbd = {
|
||||
.bdrv_getlength = nbd_getlength,
|
||||
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
|
||||
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
|
||||
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
|
||||
.bdrv_co_drain_end = nbd_client_co_drain_end,
|
||||
.bdrv_refresh_filename = nbd_refresh_filename,
|
||||
.bdrv_co_block_status = nbd_client_co_block_status,
|
||||
.bdrv_dirname = nbd_dirname,
|
||||
@ -1844,6 +2045,8 @@ static BlockDriver bdrv_nbd_tcp = {
|
||||
.bdrv_getlength = nbd_getlength,
|
||||
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
|
||||
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
|
||||
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
|
||||
.bdrv_co_drain_end = nbd_client_co_drain_end,
|
||||
.bdrv_refresh_filename = nbd_refresh_filename,
|
||||
.bdrv_co_block_status = nbd_client_co_block_status,
|
||||
.bdrv_dirname = nbd_dirname,
|
||||
@ -1867,6 +2070,8 @@ static BlockDriver bdrv_nbd_unix = {
|
||||
.bdrv_getlength = nbd_getlength,
|
||||
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
|
||||
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
|
||||
.bdrv_co_drain_begin = nbd_client_co_drain_begin,
|
||||
.bdrv_co_drain_end = nbd_client_co_drain_end,
|
||||
.bdrv_refresh_filename = nbd_refresh_filename,
|
||||
.bdrv_co_block_status = nbd_client_co_block_status,
|
||||
.bdrv_dirname = nbd_dirname,
|
||||
|
Loading…
Reference in New Issue
Block a user