sheepdog: serialize requests to overwrapping area

Current sheepdog driver only serializes create requests in oid
unit. This mechanism isn't enough for handling requests to
overwrapping area spanning multiple oids, so it can result bugs like
below:
https://bugs.launchpad.net/sheepdog-project/+bug/1456421

This patch adds a new serialization mechanism for the problem. The
difference from the old one is:
1. serialize entire aiocb if their targetting areas overwrap
2. serialize all requests (read, write, and discard), not only creates

This patch also removes the old mechanism because the new one can be
an alternative.

Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Stefan Hajnoczi <stefanha@redhat.com>
Cc: Teruaki Ishizaki <ishizaki.teruaki@lab.ntt.co.jp>
Cc: Vasiliy Tolstov <v.tolstov@selfip.ru>
Signed-off-by: Hitoshi Mitake <mitake.hitoshi@lab.ntt.co.jp>
Tested-by: Vasiliy Tolstov <v.tolstov@selfip.ru>
Signed-off-by: Jeff Cody <jcody@redhat.com>
This commit is contained in:
Hitoshi Mitake 2015-07-18 01:44:24 +09:00 committed by Jeff Cody
parent f8787f8723
commit 6a55c82cec

View File

@ -318,6 +318,10 @@ enum AIOCBState {
AIOCB_DISCARD_OBJ, AIOCB_DISCARD_OBJ,
}; };
#define AIOCBOverwrapping(x, y) \
(!(x->max_affect_data_idx < y->min_affect_data_idx \
|| y->max_affect_data_idx < x->min_affect_data_idx))
struct SheepdogAIOCB { struct SheepdogAIOCB {
BlockAIOCB common; BlockAIOCB common;
@ -334,6 +338,11 @@ struct SheepdogAIOCB {
bool cancelable; bool cancelable;
int nr_pending; int nr_pending;
uint32_t min_affect_data_idx;
uint32_t max_affect_data_idx;
QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
}; };
typedef struct BDRVSheepdogState { typedef struct BDRVSheepdogState {
@ -362,8 +371,10 @@ typedef struct BDRVSheepdogState {
/* Every aio request must be linked to either of these queues. */ /* Every aio request must be linked to either of these queues. */
QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head; QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head; QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
CoQueue overwrapping_queue;
QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
} BDRVSheepdogState; } BDRVSheepdogState;
static const char * sd_strerror(int err) static const char * sd_strerror(int err)
@ -498,13 +509,7 @@ static void sd_aio_cancel(BlockAIOCB *blockacb)
AIOReq *aioreq, *next; AIOReq *aioreq, *next;
if (sd_acb_cancelable(acb)) { if (sd_acb_cancelable(acb)) {
/* Remove outstanding requests from pending and failed queues. */ /* Remove outstanding requests from failed queue. */
QLIST_FOREACH_SAFE(aioreq, &s->pending_aio_head, aio_siblings,
next) {
if (aioreq->aiocb == acb) {
free_aio_req(s, aioreq);
}
}
QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings, QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
next) { next) {
if (aioreq->aiocb == acb) { if (aioreq->aiocb == acb) {
@ -529,6 +534,10 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
int64_t sector_num, int nb_sectors) int64_t sector_num, int nb_sectors)
{ {
SheepdogAIOCB *acb; SheepdogAIOCB *acb;
uint32_t object_size;
BDRVSheepdogState *s = bs->opaque;
object_size = (UINT32_C(1) << s->inode.block_size_shift);
acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL); acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
@ -542,6 +551,11 @@ static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
acb->coroutine = qemu_coroutine_self(); acb->coroutine = qemu_coroutine_self();
acb->ret = 0; acb->ret = 0;
acb->nr_pending = 0; acb->nr_pending = 0;
acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
return acb; return acb;
} }
@ -703,38 +717,6 @@ static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
static int get_sheep_fd(BDRVSheepdogState *s, Error **errp); static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
static void co_write_request(void *opaque); static void co_write_request(void *opaque);
static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
AIOReq *aio_req;
QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
if (aio_req->oid == oid) {
return aio_req;
}
}
return NULL;
}
/*
* This function searchs pending requests to the object `oid', and
* sends them.
*/
static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
AIOReq *aio_req;
SheepdogAIOCB *acb;
while ((aio_req = find_pending_req(s, oid)) != NULL) {
acb = aio_req->aiocb;
/* move aio_req from pending list to inflight one */
QLIST_REMOVE(aio_req, aio_siblings);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
acb->aiocb_type);
}
}
static coroutine_fn void reconnect_to_sdog(void *opaque) static coroutine_fn void reconnect_to_sdog(void *opaque)
{ {
BDRVSheepdogState *s = opaque; BDRVSheepdogState *s = opaque;
@ -840,12 +822,6 @@ static void coroutine_fn aio_read_response(void *opaque)
s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx); s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx); s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
} }
/*
* Some requests may be blocked because simultaneous
* create requests are not allowed, so we search the
* pending requests here.
*/
send_pending_req(s, aio_req->oid);
} }
break; break;
case AIOCB_READ_UDATA: case AIOCB_READ_UDATA:
@ -1341,30 +1317,6 @@ out:
return ret; return ret;
} }
/* Return true if the specified request is linked to the pending list. */
static bool check_simultaneous_create(BDRVSheepdogState *s, AIOReq *aio_req)
{
AIOReq *areq;
QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
if (areq != aio_req && areq->oid == aio_req->oid) {
/*
* Sheepdog cannot handle simultaneous create requests to the same
* object, so we cannot send the request until the previous request
* finishes.
*/
DPRINTF("simultaneous create to %" PRIx64 "\n", aio_req->oid);
aio_req->flags = 0;
aio_req->base_oid = 0;
aio_req->create = false;
QLIST_REMOVE(aio_req, aio_siblings);
QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
return true;
}
}
return false;
}
static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req) static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
{ {
SheepdogAIOCB *acb = aio_req->aiocb; SheepdogAIOCB *acb = aio_req->aiocb;
@ -1379,10 +1331,6 @@ static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
goto out; goto out;
} }
if (check_simultaneous_create(s, aio_req)) {
return;
}
if (s->inode.data_vdi_id[idx]) { if (s->inode.data_vdi_id[idx]) {
aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx); aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
aio_req->flags |= SD_FLAG_CMD_COW; aio_req->flags |= SD_FLAG_CMD_COW;
@ -1458,8 +1406,8 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
filename = qemu_opt_get(opts, "filename"); filename = qemu_opt_get(opts, "filename");
QLIST_INIT(&s->inflight_aio_head); QLIST_INIT(&s->inflight_aio_head);
QLIST_INIT(&s->pending_aio_head);
QLIST_INIT(&s->failed_aio_head); QLIST_INIT(&s->failed_aio_head);
QLIST_INIT(&s->inflight_aiocb_head);
s->fd = -1; s->fd = -1;
memset(vdi, 0, sizeof(vdi)); memset(vdi, 0, sizeof(vdi));
@ -1524,6 +1472,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE; bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
pstrcpy(s->name, sizeof(s->name), vdi); pstrcpy(s->name, sizeof(s->name), vdi);
qemu_co_mutex_init(&s->lock); qemu_co_mutex_init(&s->lock);
qemu_co_queue_init(&s->overwrapping_queue);
qemu_opts_del(opts); qemu_opts_del(opts);
g_free(buf); g_free(buf);
return 0; return 0;
@ -2195,12 +2144,6 @@ static int coroutine_fn sd_co_rw_vector(void *p)
old_oid, done); old_oid, done);
QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings); QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
if (create) {
if (check_simultaneous_create(s, aio_req)) {
goto done;
}
}
add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
acb->aiocb_type); acb->aiocb_type);
done: done:
@ -2215,6 +2158,20 @@ out:
return 1; return 1;
} }
static bool check_overwrapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *aiocb)
{
SheepdogAIOCB *cb;
QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
if (AIOCBOverwrapping(aiocb, cb)) {
return true;
}
}
QLIST_INSERT_HEAD(&s->inflight_aiocb_head, aiocb, aiocb_siblings);
return false;
}
static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num, static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
int nb_sectors, QEMUIOVector *qiov) int nb_sectors, QEMUIOVector *qiov)
{ {
@ -2234,14 +2191,25 @@ static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
acb->aio_done_func = sd_write_done; acb->aio_done_func = sd_write_done;
acb->aiocb_type = AIOCB_WRITE_UDATA; acb->aiocb_type = AIOCB_WRITE_UDATA;
retry:
if (check_overwrapping_aiocb(s, acb)) {
qemu_co_queue_wait(&s->overwrapping_queue);
goto retry;
}
ret = sd_co_rw_vector(acb); ret = sd_co_rw_vector(acb);
if (ret <= 0) { if (ret <= 0) {
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb); qemu_aio_unref(acb);
return ret; return ret;
} }
qemu_coroutine_yield(); qemu_coroutine_yield();
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
return acb->ret; return acb->ret;
} }
@ -2250,19 +2218,30 @@ static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
{ {
SheepdogAIOCB *acb; SheepdogAIOCB *acb;
int ret; int ret;
BDRVSheepdogState *s = bs->opaque;
acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors); acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
acb->aiocb_type = AIOCB_READ_UDATA; acb->aiocb_type = AIOCB_READ_UDATA;
acb->aio_done_func = sd_finish_aiocb; acb->aio_done_func = sd_finish_aiocb;
retry:
if (check_overwrapping_aiocb(s, acb)) {
qemu_co_queue_wait(&s->overwrapping_queue);
goto retry;
}
ret = sd_co_rw_vector(acb); ret = sd_co_rw_vector(acb);
if (ret <= 0) { if (ret <= 0) {
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb); qemu_aio_unref(acb);
return ret; return ret;
} }
qemu_coroutine_yield(); qemu_coroutine_yield();
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
return acb->ret; return acb->ret;
} }
@ -2610,14 +2589,25 @@ static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
acb->aiocb_type = AIOCB_DISCARD_OBJ; acb->aiocb_type = AIOCB_DISCARD_OBJ;
acb->aio_done_func = sd_finish_aiocb; acb->aio_done_func = sd_finish_aiocb;
retry:
if (check_overwrapping_aiocb(s, acb)) {
qemu_co_queue_wait(&s->overwrapping_queue);
goto retry;
}
ret = sd_co_rw_vector(acb); ret = sd_co_rw_vector(acb);
if (ret <= 0) { if (ret <= 0) {
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
qemu_aio_unref(acb); qemu_aio_unref(acb);
return ret; return ret;
} }
qemu_coroutine_yield(); qemu_coroutine_yield();
QLIST_REMOVE(acb, aiocb_siblings);
qemu_co_queue_restart_all(&s->overwrapping_queue);
return acb->ret; return acb->ret;
} }