block-copy: add CoMutex lock
Group various structures fields, to better understand what we need to protect with a lock and what doesn't need it. Then, add a CoMutex to protect concurrent access of block-copy data structures. This mutex also protects .copy_bitmap, because its thread-safe API does not prevent it from assigning two tasks to the same bitmap region. Exceptions to the lock: - .sleep_state is handled in the series "coroutine: new sleep/wake API" and thus here left as TODO. - .finished, .cancelled and reads to .ret and .error_is_read will be protected in the following patch, because are used also outside coroutines. - .skip_unallocated is atomic. Including it under the mutex would increase the critical sections and make them also much more complex. We can have it as atomic since it is only written from outside and read by block-copy coroutines. Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com> Message-Id: <20210624072043.180494-5-eesposit@redhat.com> [vsementsov: fix typo in comment] Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
This commit is contained in:
parent
e3dd339fee
commit
d0c389d2ce
@ -39,7 +39,7 @@ typedef enum {
|
||||
static coroutine_fn int block_copy_task_entry(AioTask *task);
|
||||
|
||||
typedef struct BlockCopyCallState {
|
||||
/* IN parameters. Initialized in block_copy_async() and never changed. */
|
||||
/* Fields initialized in block_copy_async() and never changed. */
|
||||
BlockCopyState *s;
|
||||
int64_t offset;
|
||||
int64_t bytes;
|
||||
@ -48,33 +48,60 @@ typedef struct BlockCopyCallState {
|
||||
bool ignore_ratelimit;
|
||||
BlockCopyAsyncCallbackFunc cb;
|
||||
void *cb_opaque;
|
||||
|
||||
/* Coroutine where async block-copy is running */
|
||||
Coroutine *co;
|
||||
|
||||
/* Fields whose state changes throughout the execution */
|
||||
bool finished;
|
||||
QemuCoSleep sleep; /* TODO: protect API with a lock */
|
||||
bool cancelled;
|
||||
/* To reference all call states from BlockCopyState */
|
||||
QLIST_ENTRY(BlockCopyCallState) list;
|
||||
|
||||
/* State */
|
||||
int ret;
|
||||
bool finished;
|
||||
QemuCoSleep sleep;
|
||||
bool cancelled;
|
||||
|
||||
/* OUT parameters */
|
||||
/*
|
||||
* Fields that report information about return values and erros.
|
||||
* Protected by lock in BlockCopyState.
|
||||
*/
|
||||
bool error_is_read;
|
||||
/*
|
||||
* @ret is set concurrently by tasks under mutex. Only set once by first
|
||||
* failed task (and untouched if no task failed).
|
||||
* After finishing (call_state->finished is true), it is not modified
|
||||
* anymore and may be safely read without mutex.
|
||||
*/
|
||||
int ret;
|
||||
} BlockCopyCallState;
|
||||
|
||||
typedef struct BlockCopyTask {
|
||||
AioTask task;
|
||||
|
||||
/*
|
||||
* Fields initialized in block_copy_task_create()
|
||||
* and never changed.
|
||||
*/
|
||||
BlockCopyState *s;
|
||||
BlockCopyCallState *call_state;
|
||||
int64_t offset;
|
||||
int64_t bytes;
|
||||
/*
|
||||
* @method can also be set again in the while loop of
|
||||
* block_copy_dirty_clusters(), but it is never accessed concurrently
|
||||
* because the only other function that reads it is
|
||||
* block_copy_task_entry() and it is invoked afterwards in the same
|
||||
* iteration.
|
||||
*/
|
||||
BlockCopyMethod method;
|
||||
QLIST_ENTRY(BlockCopyTask) list;
|
||||
|
||||
/*
|
||||
* Fields whose state changes throughout the execution
|
||||
* Protected by lock in BlockCopyState.
|
||||
*/
|
||||
CoQueue wait_queue; /* coroutines blocked on this task */
|
||||
/*
|
||||
* Only protect the case of parallel read while updating @bytes
|
||||
* value in block_copy_task_shrink().
|
||||
*/
|
||||
int64_t bytes;
|
||||
QLIST_ENTRY(BlockCopyTask) list;
|
||||
} BlockCopyTask;
|
||||
|
||||
static int64_t task_end(BlockCopyTask *task)
|
||||
@ -90,17 +117,25 @@ typedef struct BlockCopyState {
|
||||
*/
|
||||
BdrvChild *source;
|
||||
BdrvChild *target;
|
||||
BdrvDirtyBitmap *copy_bitmap;
|
||||
int64_t in_flight_bytes;
|
||||
|
||||
/*
|
||||
* Fields initialized in block_copy_state_new()
|
||||
* and never changed.
|
||||
*/
|
||||
int64_t cluster_size;
|
||||
BlockCopyMethod method;
|
||||
int64_t max_transfer;
|
||||
uint64_t len;
|
||||
QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
|
||||
QLIST_HEAD(, BlockCopyCallState) calls;
|
||||
|
||||
BdrvRequestFlags write_flags;
|
||||
|
||||
/*
|
||||
* Fields whose state changes throughout the execution
|
||||
* Protected by lock.
|
||||
*/
|
||||
CoMutex lock;
|
||||
int64_t in_flight_bytes;
|
||||
BlockCopyMethod method;
|
||||
QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
|
||||
QLIST_HEAD(, BlockCopyCallState) calls;
|
||||
/*
|
||||
* skip_unallocated:
|
||||
*
|
||||
@ -115,15 +150,15 @@ typedef struct BlockCopyState {
|
||||
* skip unallocated regions, clear them in the copy_bitmap, and invoke
|
||||
* block_copy_reset_unallocated() every time it does.
|
||||
*/
|
||||
bool skip_unallocated;
|
||||
|
||||
bool skip_unallocated; /* atomic */
|
||||
/* State fields that use a thread-safe API */
|
||||
BdrvDirtyBitmap *copy_bitmap;
|
||||
ProgressMeter *progress;
|
||||
|
||||
SharedResource *mem;
|
||||
|
||||
RateLimit rate_limit;
|
||||
} BlockCopyState;
|
||||
|
||||
/* Called with lock held */
|
||||
static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
|
||||
int64_t offset, int64_t bytes)
|
||||
{
|
||||
@ -141,6 +176,9 @@ static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
|
||||
/*
|
||||
* If there are no intersecting tasks return false. Otherwise, wait for the
|
||||
* first found intersecting tasks to finish and return true.
|
||||
*
|
||||
* Called with lock held. May temporary release the lock.
|
||||
* Return value of 0 proves that lock was NOT released.
|
||||
*/
|
||||
static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
|
||||
int64_t bytes)
|
||||
@ -151,11 +189,12 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
|
||||
return false;
|
||||
}
|
||||
|
||||
qemu_co_queue_wait(&task->wait_queue, NULL);
|
||||
qemu_co_queue_wait(&task->wait_queue, &s->lock);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Called with lock held */
|
||||
static int64_t block_copy_chunk_size(BlockCopyState *s)
|
||||
{
|
||||
switch (s->method) {
|
||||
@ -178,13 +217,14 @@ static int64_t block_copy_chunk_size(BlockCopyState *s)
|
||||
* Search for the first dirty area in offset/bytes range and create task at
|
||||
* the beginning of it.
|
||||
*/
|
||||
static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
|
||||
BlockCopyCallState *call_state,
|
||||
static coroutine_fn BlockCopyTask *
|
||||
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
|
||||
int64_t offset, int64_t bytes)
|
||||
{
|
||||
BlockCopyTask *task;
|
||||
int64_t max_chunk;
|
||||
|
||||
QEMU_LOCK_GUARD(&s->lock);
|
||||
max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
|
||||
if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
|
||||
offset, offset + bytes,
|
||||
@ -227,6 +267,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
|
||||
static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
|
||||
int64_t new_bytes)
|
||||
{
|
||||
QEMU_LOCK_GUARD(&task->s->lock);
|
||||
if (new_bytes == task->bytes) {
|
||||
return;
|
||||
}
|
||||
@ -243,6 +284,7 @@ static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
|
||||
|
||||
static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
|
||||
{
|
||||
QEMU_LOCK_GUARD(&task->s->lock);
|
||||
task->s->in_flight_bytes -= task->bytes;
|
||||
if (ret < 0) {
|
||||
bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
|
||||
@ -321,12 +363,14 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
|
||||
}
|
||||
|
||||
ratelimit_init(&s->rate_limit);
|
||||
qemu_co_mutex_init(&s->lock);
|
||||
QLIST_INIT(&s->tasks);
|
||||
QLIST_INIT(&s->calls);
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
/* Only set before running the job, no need for locking. */
|
||||
void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
|
||||
{
|
||||
s->progress = pm;
|
||||
@ -467,9 +511,12 @@ static coroutine_fn int block_copy_task_entry(AioTask *task)
|
||||
int ret;
|
||||
|
||||
ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);
|
||||
|
||||
WITH_QEMU_LOCK_GUARD(&s->lock) {
|
||||
if (s->method == t->method) {
|
||||
s->method = method;
|
||||
}
|
||||
|
||||
if (ret < 0) {
|
||||
if (!t->call_state->ret) {
|
||||
t->call_state->ret = ret;
|
||||
@ -478,6 +525,7 @@ static coroutine_fn int block_copy_task_entry(AioTask *task)
|
||||
} else {
|
||||
progress_work_done(s->progress, t->bytes);
|
||||
}
|
||||
}
|
||||
co_put_to_shres(s->mem, t->bytes);
|
||||
block_copy_task_end(t, ret);
|
||||
|
||||
@ -491,7 +539,7 @@ static int block_copy_block_status(BlockCopyState *s, int64_t offset,
|
||||
BlockDriverState *base;
|
||||
int ret;
|
||||
|
||||
if (s->skip_unallocated) {
|
||||
if (qatomic_read(&s->skip_unallocated)) {
|
||||
base = bdrv_backing_chain_next(s->source->bs);
|
||||
} else {
|
||||
base = NULL;
|
||||
@ -578,10 +626,12 @@ int64_t block_copy_reset_unallocated(BlockCopyState *s,
|
||||
bytes = clusters * s->cluster_size;
|
||||
|
||||
if (!ret) {
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
|
||||
progress_set_remaining(s->progress,
|
||||
bdrv_get_dirty_count(s->copy_bitmap) +
|
||||
s->in_flight_bytes);
|
||||
qemu_co_mutex_unlock(&s->lock);
|
||||
}
|
||||
|
||||
*count = bytes;
|
||||
@ -639,7 +689,8 @@ block_copy_dirty_clusters(BlockCopyCallState *call_state)
|
||||
if (status_bytes < task->bytes) {
|
||||
block_copy_task_shrink(task, status_bytes);
|
||||
}
|
||||
if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) {
|
||||
if (qatomic_read(&s->skip_unallocated) &&
|
||||
!(ret & BDRV_BLOCK_ALLOCATED)) {
|
||||
block_copy_task_end(task, 0);
|
||||
trace_block_copy_skip_range(s, task->offset, task->bytes);
|
||||
offset = task_end(task);
|
||||
@ -721,14 +772,38 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
|
||||
int ret;
|
||||
BlockCopyState *s = call_state->s;
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
QLIST_INSERT_HEAD(&s->calls, call_state, list);
|
||||
qemu_co_mutex_unlock(&s->lock);
|
||||
|
||||
do {
|
||||
ret = block_copy_dirty_clusters(call_state);
|
||||
|
||||
if (ret == 0 && !call_state->cancelled) {
|
||||
WITH_QEMU_LOCK_GUARD(&s->lock) {
|
||||
/*
|
||||
* Check that there is no task we still need to
|
||||
* wait to complete
|
||||
*/
|
||||
ret = block_copy_wait_one(s, call_state->offset,
|
||||
call_state->bytes);
|
||||
if (ret == 0) {
|
||||
/*
|
||||
* No pending tasks, but check again the bitmap in this
|
||||
* same critical section, since a task might have failed
|
||||
* between this and the critical section in
|
||||
* block_copy_dirty_clusters().
|
||||
*
|
||||
* block_copy_wait_one return value 0 also means that it
|
||||
* didn't release the lock. So, we are still in the same
|
||||
* critical section, not interrupted by any concurrent
|
||||
* access to state.
|
||||
*/
|
||||
ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
|
||||
call_state->offset,
|
||||
call_state->bytes) >= 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
@ -748,7 +823,9 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
|
||||
call_state->cb(call_state->cb_opaque);
|
||||
}
|
||||
|
||||
qemu_co_mutex_lock(&s->lock);
|
||||
QLIST_REMOVE(call_state, list);
|
||||
qemu_co_mutex_unlock(&s->lock);
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -851,7 +928,7 @@ BdrvDirtyBitmap *block_copy_dirty_bitmap(BlockCopyState *s)
|
||||
|
||||
void block_copy_set_skip_unallocated(BlockCopyState *s, bool skip)
|
||||
{
|
||||
s->skip_unallocated = skip;
|
||||
qatomic_set(&s->skip_unallocated, skip);
|
||||
}
|
||||
|
||||
void block_copy_set_speed(BlockCopyState *s, uint64_t speed)
|
||||
|
Loading…
Reference in New Issue
Block a user