From d32ca5ad7988328f95db6a26beb374c55154c77b Mon Sep 17 00:00:00 2001 From: Juan Quintela Date: Wed, 22 Jan 2020 16:16:07 +0100 Subject: [PATCH] multifd: Split multifd code into its own file Signed-off-by: Juan Quintela Reviewed-by: Dr. David Alan Gilbert --- migration/Makefile.objs | 1 + migration/migration.c | 1 + migration/multifd.c | 899 ++++++++++++++++++++++++++++++++++++ migration/multifd.h | 139 ++++++ migration/ram.c | 988 +--------------------------------------- migration/ram.h | 7 - 6 files changed, 1041 insertions(+), 994 deletions(-) create mode 100644 migration/multifd.c create mode 100644 migration/multifd.h diff --git a/migration/Makefile.objs b/migration/Makefile.objs index a4f3bafd86..d3623d5f9b 100644 --- a/migration/Makefile.objs +++ b/migration/Makefile.objs @@ -7,6 +7,7 @@ common-obj-y += qemu-file-channel.o common-obj-y += xbzrle.o postcopy-ram.o common-obj-y += qjson.o common-obj-y += block-dirty-bitmap.o +common-obj-y += multifd.o common-obj-$(CONFIG_RDMA) += rdma.o diff --git a/migration/migration.c b/migration/migration.c index adc7d08e93..3a21a4686c 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -53,6 +53,7 @@ #include "monitor/monitor.h" #include "net/announce.h" #include "qemu/queue.h" +#include "multifd.h" #define MAX_THROTTLE (32 << 20) /* Migration transfer speed throttling */ diff --git a/migration/multifd.c b/migration/multifd.c new file mode 100644 index 0000000000..b3e8ae9bcc --- /dev/null +++ b/migration/multifd.c @@ -0,0 +1,899 @@ +/* + * Multifd common code + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#include "qemu/osdep.h" +#include "qemu/rcu.h" +#include "exec/target_page.h" +#include "sysemu/sysemu.h" +#include "exec/ramblock.h" +#include "qemu/error-report.h" +#include "qapi/error.h" +#include "ram.h" +#include "migration.h" +#include "socket.h" +#include "qemu-file.h" +#include "trace.h" +#include "multifd.h" + +/* Multiple fd's */ + +#define MULTIFD_MAGIC 0x11223344U +#define MULTIFD_VERSION 1 + +typedef struct { + uint32_t magic; + uint32_t version; + unsigned char uuid[16]; /* QemuUUID */ + uint8_t id; + uint8_t unused1[7]; /* Reserved for future use */ + uint64_t unused2[4]; /* Reserved for future use */ +} __attribute__((packed)) MultiFDInit_t; + +static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) +{ + MultiFDInit_t msg = {}; + int ret; + + msg.magic = cpu_to_be32(MULTIFD_MAGIC); + msg.version = cpu_to_be32(MULTIFD_VERSION); + msg.id = p->id; + memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); + + ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + return 0; +} + +static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) +{ + MultiFDInit_t msg; + int ret; + + ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); + if (ret != 0) { + return -1; + } + + msg.magic = be32_to_cpu(msg.magic); + msg.version = be32_to_cpu(msg.version); + + if (msg.magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet magic %x " + "expected %x", msg.magic, MULTIFD_MAGIC); + return -1; + } + + if (msg.version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { + char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); + char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); + + error_setg(errp, "multifd: received uuid '%s' and expected " + "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); + g_free(uuid); + g_free(msg_uuid); + return -1; + } + + if (msg.id > migrate_multifd_channels()) { + error_setg(errp, "multifd: received channel version %d " + "expected %d", msg.version, MULTIFD_VERSION); + return -1; + } + + return msg.id; +} + +static MultiFDPages_t *multifd_pages_init(size_t size) +{ + MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); + + pages->allocated = size; + pages->iov = g_new0(struct iovec, size); + pages->offset = g_new0(ram_addr_t, size); + + return pages; +} + +static void multifd_pages_clear(MultiFDPages_t *pages) +{ + pages->used = 0; + pages->allocated = 0; + pages->packet_num = 0; + pages->block = NULL; + g_free(pages->iov); + pages->iov = NULL; + g_free(pages->offset); + pages->offset = NULL; + g_free(pages); +} + +static void multifd_send_fill_packet(MultiFDSendParams *p) +{ + MultiFDPacket_t *packet = p->packet; + int i; + + packet->flags = cpu_to_be32(p->flags); + packet->pages_alloc = cpu_to_be32(p->pages->allocated); + packet->pages_used = cpu_to_be32(p->pages->used); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); + packet->packet_num = cpu_to_be64(p->packet_num); + + if (p->pages->block) { + strncpy(packet->ramblock, p->pages->block->idstr, 256); + } + + for (i = 0; i < p->pages->used; i++) { + /* there are architectures where ram_addr_t is 32 bit */ + uint64_t temp = p->pages->offset[i]; + + packet->offset[i] = cpu_to_be64(temp); + } +} + +static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) +{ + MultiFDPacket_t *packet = p->packet; + uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + RAMBlock *block; + int i; + + packet->magic = be32_to_cpu(packet->magic); + if (packet->magic != MULTIFD_MAGIC) { + error_setg(errp, "multifd: received packet " + "magic %x and expected magic %x", + packet->magic, MULTIFD_MAGIC); + return -1; + } + + packet->version = be32_to_cpu(packet->version); + if (packet->version != MULTIFD_VERSION) { + error_setg(errp, "multifd: received packet " + "version %d and expected version %d", + packet->version, MULTIFD_VERSION); + return -1; + } + + p->flags = be32_to_cpu(packet->flags); + + packet->pages_alloc = be32_to_cpu(packet->pages_alloc); + /* + * If we received a packet that is 100 times bigger than expected + * just stop migration. It is a magic number. + */ + if (packet->pages_alloc > pages_max * 100) { + error_setg(errp, "multifd: received packet " + "with size %d and expected a maximum size of %d", + packet->pages_alloc, pages_max * 100) ; + return -1; + } + /* + * We received a packet that is bigger than expected but inside + * reasonable limits (see previous comment). Just reallocate. + */ + if (packet->pages_alloc > p->pages->allocated) { + multifd_pages_clear(p->pages); + p->pages = multifd_pages_init(packet->pages_alloc); + } + + p->pages->used = be32_to_cpu(packet->pages_used); + if (p->pages->used > packet->pages_alloc) { + error_setg(errp, "multifd: received packet " + "with %d pages and expected maximum pages are %d", + p->pages->used, packet->pages_alloc) ; + return -1; + } + + p->next_packet_size = be32_to_cpu(packet->next_packet_size); + p->packet_num = be64_to_cpu(packet->packet_num); + + if (p->pages->used == 0) { + return 0; + } + + /* make sure that ramblock is 0 terminated */ + packet->ramblock[255] = 0; + block = qemu_ram_block_by_name(packet->ramblock); + if (!block) { + error_setg(errp, "multifd: unknown ram block %s", + packet->ramblock); + return -1; + } + + for (i = 0; i < p->pages->used; i++) { + uint64_t offset = be64_to_cpu(packet->offset[i]); + + if (offset > (block->used_length - qemu_target_page_size())) { + error_setg(errp, "multifd: offset too long %" PRIu64 + " (max " RAM_ADDR_FMT ")", + offset, block->max_length); + return -1; + } + p->pages->iov[i].iov_base = block->host + offset; + p->pages->iov[i].iov_len = qemu_target_page_size(); + } + + return 0; +} + +struct { + MultiFDSendParams *params; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* send channels ready */ + QemuSemaphore channels_ready; + /* + * Have we already run terminate threads. There is a race when it + * happens that we got one error while we are exiting. + * We will use atomic operations. Only valid values are 0 and 1. + */ + int exiting; +} *multifd_send_state; + +/* + * How we use multifd_send_state->pages and channel->pages? + * + * We create a pages for each channel, and a main one. Each time that + * we need to send a batch of pages we interchange the ones between + * multifd_send_state and the channel that is sending it. There are + * two reasons for that: + * - to not have to do so many mallocs during migration + * - to make easier to know what to free at the end of migration + * + * This way we always know who is the owner of each "pages" struct, + * and we don't need any locking. It belongs to the migration thread + * or to the channel thread. Switching is safe because the migration + * thread is using the channel mutex when changing it, and the channel + * have to had finish with its own, otherwise pending_job can't be + * false. + */ + +static int multifd_send_pages(QEMUFile *f) +{ + int i; + static int next_channel; + MultiFDSendParams *p = NULL; /* make happy gcc */ + MultiFDPages_t *pages = multifd_send_state->pages; + uint64_t transferred; + + if (atomic_read(&multifd_send_state->exiting)) { + return -1; + } + + qemu_sem_wait(&multifd_send_state->channels_ready); + for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (p->quit) { + error_report("%s: channel %d has already quit!", __func__, i); + qemu_mutex_unlock(&p->mutex); + return -1; + } + if (!p->pending_job) { + p->pending_job++; + next_channel = (i + 1) % migrate_multifd_channels(); + break; + } + qemu_mutex_unlock(&p->mutex); + } + assert(!p->pages->used); + assert(!p->pages->block); + + p->packet_num = multifd_send_state->packet_num++; + multifd_send_state->pages = p->pages; + p->pages = pages; + transferred = ((uint64_t) pages->used) * qemu_target_page_size() + + p->packet_len; + qemu_file_update_transfer(f, transferred); + ram_counters.multifd_bytes += transferred; + ram_counters.transferred += transferred;; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + + return 1; +} + +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages = multifd_send_state->pages; + + if (!pages->block) { + pages->block = block; + } + + if (pages->block == block) { + pages->offset[pages->used] = offset; + pages->iov[pages->used].iov_base = block->host + offset; + pages->iov[pages->used].iov_len = qemu_target_page_size(); + pages->used++; + + if (pages->used < pages->allocated) { + return 1; + } + } + + if (multifd_send_pages(f) < 0) { + return -1; + } + + if (pages->block != block) { + return multifd_queue_page(f, block, offset); + } + + return 1; +} + +static void multifd_send_terminate_threads(Error *err) +{ + int i; + + trace_multifd_send_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_PRE_SWITCHOVER || + s->state == MIGRATION_STATUS_DEVICE || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + /* + * We don't want to exit each threads twice. Depending on where + * we get the error, or if there are two independent errors in two + * threads at the same time, we can end calling this function + * twice. + */ + if (atomic_xchg(&multifd_send_state->exiting, 1)) { + return; + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + qemu_sem_post(&p->sem); + qemu_mutex_unlock(&p->mutex); + } +} + +void multifd_save_cleanup(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + multifd_send_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + if (p->running) { + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + socket_send_channel_destroy(p->c); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_send_state->channels_ready); + g_free(multifd_send_state->params); + multifd_send_state->params = NULL; + multifd_pages_clear(multifd_send_state->pages); + multifd_send_state->pages = NULL; + g_free(multifd_send_state); + multifd_send_state = NULL; +} + +void multifd_send_sync_main(QEMUFile *f) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + if (multifd_send_state->pages->used) { + if (multifd_send_pages(f) < 0) { + error_report("%s: multifd_send_pages fail", __func__); + return; + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_signal(p->id); + + qemu_mutex_lock(&p->mutex); + + if (p->quit) { + error_report("%s: channel %d has already quit", __func__, i); + qemu_mutex_unlock(&p->mutex); + return; + } + + p->packet_num = multifd_send_state->packet_num++; + p->flags |= MULTIFD_FLAG_SYNC; + p->pending_job++; + qemu_file_update_transfer(f, p->packet_len); + ram_counters.multifd_bytes += p->packet_len; + ram_counters.transferred += p->packet_len; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + trace_multifd_send_sync_main_wait(p->id); + qemu_sem_wait(&p->sem_sync); + } + trace_multifd_send_sync_main(multifd_send_state->packet_num); +} + +static void *multifd_send_thread(void *opaque) +{ + MultiFDSendParams *p = opaque; + Error *local_err = NULL; + int ret = 0; + uint32_t flags = 0; + + trace_multifd_send_thread_start(p->id); + rcu_register_thread(); + + if (multifd_send_initial_packet(p, &local_err) < 0) { + ret = -1; + goto out; + } + /* initial packet */ + p->num_packets = 1; + + while (true) { + qemu_sem_wait(&p->sem); + + if (atomic_read(&multifd_send_state->exiting)) { + break; + } + qemu_mutex_lock(&p->mutex); + + if (p->pending_job) { + uint32_t used = p->pages->used; + uint64_t packet_num = p->packet_num; + flags = p->flags; + + p->next_packet_size = used * qemu_target_page_size(); + multifd_send_fill_packet(p); + p->flags = 0; + p->num_packets++; + p->num_pages += used; + p->pages->used = 0; + p->pages->block = NULL; + qemu_mutex_unlock(&p->mutex); + + trace_multifd_send(p->id, packet_num, used, flags, + p->next_packet_size); + + ret = qio_channel_write_all(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret != 0) { + break; + } + + if (used) { + ret = qio_channel_writev_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + qemu_mutex_lock(&p->mutex); + p->pending_job--; + qemu_mutex_unlock(&p->mutex); + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&p->sem_sync); + } + qemu_sem_post(&multifd_send_state->channels_ready); + } else if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } else { + qemu_mutex_unlock(&p->mutex); + /* sometimes there are spurious wakeups */ + } + } + +out: + if (local_err) { + trace_multifd_send_error(p->id); + multifd_send_terminate_threads(local_err); + } + + /* + * Error happen, I will exit, but I can't just leave, tell + * who pay attention to me. + */ + if (ret != 0) { + qemu_sem_post(&p->sem_sync); + qemu_sem_post(&multifd_send_state->channels_ready); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) +{ + MultiFDSendParams *p = opaque; + QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); + Error *local_err = NULL; + + trace_multifd_new_send_channel_async(p->id); + if (qio_task_propagate_error(task, &local_err)) { + migrate_set_error(migrate_get_current(), local_err); + /* Error happen, we need to tell who pay attention to me */ + qemu_sem_post(&multifd_send_state->channels_ready); + qemu_sem_post(&p->sem_sync); + /* + * Although multifd_send_thread is not created, but main migration + * thread neet to judge whether it is running, so we need to mark + * its status. + */ + p->quit = true; + } else { + p->c = QIO_CHANNEL(sioc); + qio_channel_set_delay(p->c, false); + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, + QEMU_THREAD_JOINABLE); + } +} + +int multifd_save_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); + multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); + multifd_send_state->pages = multifd_pages_init(page_count); + qemu_sem_init(&multifd_send_state->channels_ready, 0); + atomic_set(&multifd_send_state->exiting, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDSendParams *p = &multifd_send_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem, 0); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->pending_job = 0; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); + p->packet->version = cpu_to_be32(MULTIFD_VERSION); + p->name = g_strdup_printf("multifdsend_%d", i); + socket_send_channel_create(multifd_new_send_channel_async, p); + } + return 0; +} + +struct { + MultiFDRecvParams *params; + /* number of created threads */ + int count; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; + /* global number of generated multifd packets */ + uint64_t packet_num; +} *multifd_recv_state; + +static void multifd_recv_terminate_threads(Error *err) +{ + int i; + + trace_multifd_recv_terminate_threads(err != NULL); + + if (err) { + MigrationState *s = migrate_get_current(); + migrate_set_error(s, err); + if (s->state == MIGRATION_STATUS_SETUP || + s->state == MIGRATION_STATUS_ACTIVE) { + migrate_set_state(&s->state, s->state, + MIGRATION_STATUS_FAILED); + } + } + + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + p->quit = true; + /* + * We could arrive here for two reasons: + * - normal quit, i.e. everything went fine, just finished + * - error quit: We close the channels so the channel threads + * finish the qio_channel_read_all_eof() + */ + if (p->c) { + qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); + } + qemu_mutex_unlock(&p->mutex); + } +} + +int multifd_load_cleanup(Error **errp) +{ + int i; + int ret = 0; + + if (!migrate_use_multifd()) { + return 0; + } + multifd_recv_terminate_threads(NULL); + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + if (p->running) { + p->quit = true; + /* + * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, + * however try to wakeup it without harm in cleanup phase. + */ + qemu_sem_post(&p->sem_sync); + qemu_thread_join(&p->thread); + } + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + object_unref(OBJECT(p->c)); + p->c = NULL; + qemu_mutex_destroy(&p->mutex); + qemu_sem_destroy(&p->sem_sync); + g_free(p->name); + p->name = NULL; + multifd_pages_clear(p->pages); + p->pages = NULL; + p->packet_len = 0; + g_free(p->packet); + p->packet = NULL; + } + qemu_sem_destroy(&multifd_recv_state->sem_sync); + g_free(multifd_recv_state->params); + multifd_recv_state->params = NULL; + g_free(multifd_recv_state); + multifd_recv_state = NULL; + + return ret; +} + +void multifd_recv_sync_main(void) +{ + int i; + + if (!migrate_use_multifd()) { + return; + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + trace_multifd_recv_sync_main_wait(p->id); + qemu_sem_wait(&multifd_recv_state->sem_sync); + } + for (i = 0; i < migrate_multifd_channels(); i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (multifd_recv_state->packet_num < p->packet_num) { + multifd_recv_state->packet_num = p->packet_num; + } + qemu_mutex_unlock(&p->mutex); + trace_multifd_recv_sync_main_signal(p->id); + qemu_sem_post(&p->sem_sync); + } + trace_multifd_recv_sync_main(multifd_recv_state->packet_num); +} + +static void *multifd_recv_thread(void *opaque) +{ + MultiFDRecvParams *p = opaque; + Error *local_err = NULL; + int ret; + + trace_multifd_recv_thread_start(p->id); + rcu_register_thread(); + + while (true) { + uint32_t used; + uint32_t flags; + + if (p->quit) { + break; + } + + ret = qio_channel_read_all_eof(p->c, (void *)p->packet, + p->packet_len, &local_err); + if (ret == 0) { /* EOF */ + break; + } + if (ret == -1) { /* Error */ + break; + } + + qemu_mutex_lock(&p->mutex); + ret = multifd_recv_unfill_packet(p, &local_err); + if (ret) { + qemu_mutex_unlock(&p->mutex); + break; + } + + used = p->pages->used; + flags = p->flags; + trace_multifd_recv(p->id, p->packet_num, used, flags, + p->next_packet_size); + p->num_packets++; + p->num_pages += used; + qemu_mutex_unlock(&p->mutex); + + if (used) { + ret = qio_channel_readv_all(p->c, p->pages->iov, + used, &local_err); + if (ret != 0) { + break; + } + } + + if (flags & MULTIFD_FLAG_SYNC) { + qemu_sem_post(&multifd_recv_state->sem_sync); + qemu_sem_wait(&p->sem_sync); + } + } + + if (local_err) { + multifd_recv_terminate_threads(local_err); + } + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + rcu_unregister_thread(); + trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); + + return NULL; +} + +int multifd_load_setup(Error **errp) +{ + int thread_count; + uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); + uint8_t i; + + if (!migrate_use_multifd()) { + return 0; + } + thread_count = migrate_multifd_channels(); + multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); + multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); + atomic_set(&multifd_recv_state->count, 0); + qemu_sem_init(&multifd_recv_state->sem_sync, 0); + + for (i = 0; i < thread_count; i++) { + MultiFDRecvParams *p = &multifd_recv_state->params[i]; + + qemu_mutex_init(&p->mutex); + qemu_sem_init(&p->sem_sync, 0); + p->quit = false; + p->id = i; + p->pages = multifd_pages_init(page_count); + p->packet_len = sizeof(MultiFDPacket_t) + + sizeof(uint64_t) * page_count; + p->packet = g_malloc0(p->packet_len); + p->name = g_strdup_printf("multifdrecv_%d", i); + } + return 0; +} + +bool multifd_recv_all_channels_created(void) +{ + int thread_count = migrate_multifd_channels(); + + if (!migrate_use_multifd()) { + return true; + } + + return thread_count == atomic_read(&multifd_recv_state->count); +} + +/* + * Try to receive all multifd channels to get ready for the migration. + * - Return true and do not set @errp when correctly receving all channels; + * - Return false and do not set @errp when correctly receiving the current one; + * - Return false and set @errp when failing to receive the current channel. + */ +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) +{ + MultiFDRecvParams *p; + Error *local_err = NULL; + int id; + + id = multifd_recv_initial_packet(ioc, &local_err); + if (id < 0) { + multifd_recv_terminate_threads(local_err); + error_propagate_prepend(errp, local_err, + "failed to receive packet" + " via multifd channel %d: ", + atomic_read(&multifd_recv_state->count)); + return false; + } + trace_multifd_recv_new_channel(id); + + p = &multifd_recv_state->params[id]; + if (p->c != NULL) { + error_setg(&local_err, "multifd: received id '%d' already setup'", + id); + multifd_recv_terminate_threads(local_err); + error_propagate(errp, local_err); + return false; + } + p->c = ioc; + object_ref(OBJECT(ioc)); + /* initial packet */ + p->num_packets = 1; + + p->running = true; + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, + QEMU_THREAD_JOINABLE); + atomic_inc(&multifd_recv_state->count); + return atomic_read(&multifd_recv_state->count) == + migrate_multifd_channels(); +} + diff --git a/migration/multifd.h b/migration/multifd.h new file mode 100644 index 0000000000..d8b0205977 --- /dev/null +++ b/migration/multifd.h @@ -0,0 +1,139 @@ +/* + * Multifd common functions + * + * Copyright (c) 2019-2020 Red Hat Inc + * + * Authors: + * Juan Quintela + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + */ + +#ifndef QEMU_MIGRATION_MULTIFD_H +#define QEMU_MIGRATION_MULTIFD_H + +int multifd_save_setup(Error **errp); +void multifd_save_cleanup(void); +int multifd_load_setup(Error **errp); +int multifd_load_cleanup(Error **errp); +bool multifd_recv_all_channels_created(void); +bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); +void multifd_recv_sync_main(void); +void multifd_send_sync_main(QEMUFile *f); +int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset); + +#define MULTIFD_FLAG_SYNC (1 << 0) + +/* This value needs to be a multiple of qemu_target_page_size() */ +#define MULTIFD_PACKET_SIZE (512 * 1024) + +typedef struct { + uint32_t magic; + uint32_t version; + uint32_t flags; + /* maximum number of allocated pages */ + uint32_t pages_alloc; + uint32_t pages_used; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + uint64_t packet_num; + uint64_t unused[4]; /* Reserved for future use */ + char ramblock[256]; + uint64_t offset[]; +} __attribute__((packed)) MultiFDPacket_t; + +typedef struct { + /* number of used pages */ + uint32_t used; + /* number of allocated pages */ + uint32_t allocated; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* offset of each page */ + ram_addr_t *offset; + /* pointer to each page */ + struct iovec *iov; + RAMBlock *block; +} MultiFDPages_t; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* sem where to wait for more work */ + QemuSemaphore sem; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* thread has work to do */ + int pending_job; + /* array of pages to sent */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDSendParams; + +typedef struct { + /* this fields are not changed once the thread is created */ + /* channel number */ + uint8_t id; + /* channel thread name */ + char *name; + /* channel thread id */ + QemuThread thread; + /* communication channel */ + QIOChannel *c; + /* this mutex protects the following parameters */ + QemuMutex mutex; + /* is this channel thread running */ + bool running; + /* should this thread finish */ + bool quit; + /* array of pages to receive */ + MultiFDPages_t *pages; + /* packet allocated len */ + uint32_t packet_len; + /* pointer to the packet */ + MultiFDPacket_t *packet; + /* multifd flags for each packet */ + uint32_t flags; + /* global number of generated multifd packets */ + uint64_t packet_num; + /* thread local variables */ + /* size of the next packet that contains pages */ + uint32_t next_packet_size; + /* packets sent through this channel */ + uint64_t num_packets; + /* pages sent through this channel */ + uint64_t num_pages; + /* syncs main thread and channels */ + QemuSemaphore sem_sync; +} MultiFDRecvParams; + +#endif + diff --git a/migration/ram.c b/migration/ram.c index 3abd41ad33..ed23ed1c7c 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -36,7 +36,6 @@ #include "xbzrle.h" #include "ram.h" #include "migration.h" -#include "socket.h" #include "migration/register.h" #include "migration/misc.h" #include "qemu-file.h" @@ -53,9 +52,9 @@ #include "migration/colo.h" #include "block.h" #include "sysemu/sysemu.h" -#include "qemu/uuid.h" #include "savevm.h" #include "qemu/iov.h" +#include "multifd.h" /***********************************************************/ /* ram save/restore */ @@ -575,991 +574,6 @@ exit: return -1; } -/* Multiple fd's */ - -#define MULTIFD_MAGIC 0x11223344U -#define MULTIFD_VERSION 1 - -#define MULTIFD_FLAG_SYNC (1 << 0) - -/* This value needs to be a multiple of qemu_target_page_size() */ -#define MULTIFD_PACKET_SIZE (512 * 1024) - -typedef struct { - uint32_t magic; - uint32_t version; - unsigned char uuid[16]; /* QemuUUID */ - uint8_t id; - uint8_t unused1[7]; /* Reserved for future use */ - uint64_t unused2[4]; /* Reserved for future use */ -} __attribute__((packed)) MultiFDInit_t; - -typedef struct { - uint32_t magic; - uint32_t version; - uint32_t flags; - /* maximum number of allocated pages */ - uint32_t pages_alloc; - uint32_t pages_used; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - uint64_t packet_num; - uint64_t unused[4]; /* Reserved for future use */ - char ramblock[256]; - uint64_t offset[]; -} __attribute__((packed)) MultiFDPacket_t; - -typedef struct { - /* number of used pages */ - uint32_t used; - /* number of allocated pages */ - uint32_t allocated; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* offset of each page */ - ram_addr_t *offset; - /* pointer to each page */ - struct iovec *iov; - RAMBlock *block; -} MultiFDPages_t; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* sem where to wait for more work */ - QemuSemaphore sem; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* thread has work to do */ - int pending_job; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDSendParams; - -typedef struct { - /* this fields are not changed once the thread is created */ - /* channel number */ - uint8_t id; - /* channel thread name */ - char *name; - /* channel thread id */ - QemuThread thread; - /* communication channel */ - QIOChannel *c; - /* this mutex protects the following parameters */ - QemuMutex mutex; - /* is this channel thread running */ - bool running; - /* should this thread finish */ - bool quit; - /* array of pages to receive */ - MultiFDPages_t *pages; - /* packet allocated len */ - uint32_t packet_len; - /* pointer to the packet */ - MultiFDPacket_t *packet; - /* multifd flags for each packet */ - uint32_t flags; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* thread local variables */ - /* size of the next packet that contains pages */ - uint32_t next_packet_size; - /* packets sent through this channel */ - uint64_t num_packets; - /* pages sent through this channel */ - uint64_t num_pages; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; -} MultiFDRecvParams; - -static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) -{ - MultiFDInit_t msg = {}; - int ret; - - msg.magic = cpu_to_be32(MULTIFD_MAGIC); - msg.version = cpu_to_be32(MULTIFD_VERSION); - msg.id = p->id; - memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid)); - - ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - return 0; -} - -static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) -{ - MultiFDInit_t msg; - int ret; - - ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp); - if (ret != 0) { - return -1; - } - - msg.magic = be32_to_cpu(msg.magic); - msg.version = be32_to_cpu(msg.version); - - if (msg.magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet magic %x " - "expected %x", msg.magic, MULTIFD_MAGIC); - return -1; - } - - if (msg.version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) { - char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid); - char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid); - - error_setg(errp, "multifd: received uuid '%s' and expected " - "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id); - g_free(uuid); - g_free(msg_uuid); - return -1; - } - - if (msg.id > migrate_multifd_channels()) { - error_setg(errp, "multifd: received channel version %d " - "expected %d", msg.version, MULTIFD_VERSION); - return -1; - } - - return msg.id; -} - -static MultiFDPages_t *multifd_pages_init(size_t size) -{ - MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1); - - pages->allocated = size; - pages->iov = g_new0(struct iovec, size); - pages->offset = g_new0(ram_addr_t, size); - - return pages; -} - -static void multifd_pages_clear(MultiFDPages_t *pages) -{ - pages->used = 0; - pages->allocated = 0; - pages->packet_num = 0; - pages->block = NULL; - g_free(pages->iov); - pages->iov = NULL; - g_free(pages->offset); - pages->offset = NULL; - g_free(pages); -} - -static void multifd_send_fill_packet(MultiFDSendParams *p) -{ - MultiFDPacket_t *packet = p->packet; - int i; - - packet->flags = cpu_to_be32(p->flags); - packet->pages_alloc = cpu_to_be32(p->pages->allocated); - packet->pages_used = cpu_to_be32(p->pages->used); - packet->next_packet_size = cpu_to_be32(p->next_packet_size); - packet->packet_num = cpu_to_be64(p->packet_num); - - if (p->pages->block) { - strncpy(packet->ramblock, p->pages->block->idstr, 256); - } - - for (i = 0; i < p->pages->used; i++) { - /* there are architectures where ram_addr_t is 32 bit */ - uint64_t temp = p->pages->offset[i]; - - packet->offset[i] = cpu_to_be64(temp); - } -} - -static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp) -{ - MultiFDPacket_t *packet = p->packet; - uint32_t pages_max = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - RAMBlock *block; - int i; - - packet->magic = be32_to_cpu(packet->magic); - if (packet->magic != MULTIFD_MAGIC) { - error_setg(errp, "multifd: received packet " - "magic %x and expected magic %x", - packet->magic, MULTIFD_MAGIC); - return -1; - } - - packet->version = be32_to_cpu(packet->version); - if (packet->version != MULTIFD_VERSION) { - error_setg(errp, "multifd: received packet " - "version %d and expected version %d", - packet->version, MULTIFD_VERSION); - return -1; - } - - p->flags = be32_to_cpu(packet->flags); - - packet->pages_alloc = be32_to_cpu(packet->pages_alloc); - /* - * If we received a packet that is 100 times bigger than expected - * just stop migration. It is a magic number. - */ - if (packet->pages_alloc > pages_max * 100) { - error_setg(errp, "multifd: received packet " - "with size %d and expected a maximum size of %d", - packet->pages_alloc, pages_max * 100) ; - return -1; - } - /* - * We received a packet that is bigger than expected but inside - * reasonable limits (see previous comment). Just reallocate. - */ - if (packet->pages_alloc > p->pages->allocated) { - multifd_pages_clear(p->pages); - p->pages = multifd_pages_init(packet->pages_alloc); - } - - p->pages->used = be32_to_cpu(packet->pages_used); - if (p->pages->used > packet->pages_alloc) { - error_setg(errp, "multifd: received packet " - "with %d pages and expected maximum pages are %d", - p->pages->used, packet->pages_alloc) ; - return -1; - } - - p->next_packet_size = be32_to_cpu(packet->next_packet_size); - p->packet_num = be64_to_cpu(packet->packet_num); - - if (p->pages->used == 0) { - return 0; - } - - /* make sure that ramblock is 0 terminated */ - packet->ramblock[255] = 0; - block = qemu_ram_block_by_name(packet->ramblock); - if (!block) { - error_setg(errp, "multifd: unknown ram block %s", - packet->ramblock); - return -1; - } - - for (i = 0; i < p->pages->used; i++) { - uint64_t offset = be64_to_cpu(packet->offset[i]); - - if (offset > (block->used_length - qemu_target_page_size())) { - error_setg(errp, "multifd: offset too long %" PRIu64 - " (max " RAM_ADDR_FMT ")", - offset, block->max_length); - return -1; - } - p->pages->iov[i].iov_base = block->host + offset; - p->pages->iov[i].iov_len = qemu_target_page_size(); - } - - return 0; -} - -struct { - MultiFDSendParams *params; - /* array of pages to sent */ - MultiFDPages_t *pages; - /* global number of generated multifd packets */ - uint64_t packet_num; - /* send channels ready */ - QemuSemaphore channels_ready; - /* - * Have we already run terminate threads. There is a race when it - * happens that we got one error while we are exiting. - * We will use atomic operations. Only valid values are 0 and 1. - */ - int exiting; -} *multifd_send_state; - -/* - * How we use multifd_send_state->pages and channel->pages? - * - * We create a pages for each channel, and a main one. Each time that - * we need to send a batch of pages we interchange the ones between - * multifd_send_state and the channel that is sending it. There are - * two reasons for that: - * - to not have to do so many mallocs during migration - * - to make easier to know what to free at the end of migration - * - * This way we always know who is the owner of each "pages" struct, - * and we don't need any locking. It belongs to the migration thread - * or to the channel thread. Switching is safe because the migration - * thread is using the channel mutex when changing it, and the channel - * have to had finish with its own, otherwise pending_job can't be - * false. - */ - -static int multifd_send_pages(QEMUFile *f) -{ - int i; - static int next_channel; - MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; - uint64_t transferred; - - if (atomic_read(&multifd_send_state->exiting)) { - return -1; - } - - qemu_sem_wait(&multifd_send_state->channels_ready); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { - p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (p->quit) { - error_report("%s: channel %d has already quit!", __func__, i); - qemu_mutex_unlock(&p->mutex); - return -1; - } - if (!p->pending_job) { - p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); - break; - } - qemu_mutex_unlock(&p->mutex); - } - assert(!p->pages->used); - assert(!p->pages->block); - - p->packet_num = multifd_send_state->packet_num++; - multifd_send_state->pages = p->pages; - p->pages = pages; - transferred = ((uint64_t) pages->used) * qemu_target_page_size() - + p->packet_len; - qemu_file_update_transfer(f, transferred); - ram_counters.multifd_bytes += transferred; - ram_counters.transferred += transferred;; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - - return 1; -} - -static int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) -{ - MultiFDPages_t *pages = multifd_send_state->pages; - - if (!pages->block) { - pages->block = block; - } - - if (pages->block == block) { - pages->offset[pages->used] = offset; - pages->iov[pages->used].iov_base = block->host + offset; - pages->iov[pages->used].iov_len = qemu_target_page_size(); - pages->used++; - - if (pages->used < pages->allocated) { - return 1; - } - } - - if (multifd_send_pages(f) < 0) { - return -1; - } - - if (pages->block != block) { - return multifd_queue_page(f, block, offset); - } - - return 1; -} - -static void multifd_send_terminate_threads(Error *err) -{ - int i; - - trace_multifd_send_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_PRE_SWITCHOVER || - s->state == MIGRATION_STATUS_DEVICE || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - /* - * We don't want to exit each threads twice. Depending on where - * we get the error, or if there are two independent errors in two - * threads at the same time, we can end calling this function - * twice. - */ - if (atomic_xchg(&multifd_send_state->exiting, 1)) { - return; - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - qemu_sem_post(&p->sem); - qemu_mutex_unlock(&p->mutex); - } -} - -void multifd_save_cleanup(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - if (p->running) { - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - socket_send_channel_destroy(p->c); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_send_state->channels_ready); - g_free(multifd_send_state->params); - multifd_send_state->params = NULL; - multifd_pages_clear(multifd_send_state->pages); - multifd_send_state->pages = NULL; - g_free(multifd_send_state); - multifd_send_state = NULL; -} - -static void multifd_send_sync_main(QEMUFile *f) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - if (multifd_send_state->pages->used) { - if (multifd_send_pages(f) < 0) { - error_report("%s: multifd_send_pages fail", __func__); - return; - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_signal(p->id); - - qemu_mutex_lock(&p->mutex); - - if (p->quit) { - error_report("%s: channel %d has already quit", __func__, i); - qemu_mutex_unlock(&p->mutex); - return; - } - - p->packet_num = multifd_send_state->packet_num++; - p->flags |= MULTIFD_FLAG_SYNC; - p->pending_job++; - qemu_file_update_transfer(f, p->packet_len); - ram_counters.multifd_bytes += p->packet_len; - ram_counters.transferred += p->packet_len; - qemu_mutex_unlock(&p->mutex); - qemu_sem_post(&p->sem); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - trace_multifd_send_sync_main_wait(p->id); - qemu_sem_wait(&p->sem_sync); - } - trace_multifd_send_sync_main(multifd_send_state->packet_num); -} - -static void *multifd_send_thread(void *opaque) -{ - MultiFDSendParams *p = opaque; - Error *local_err = NULL; - int ret = 0; - uint32_t flags = 0; - - trace_multifd_send_thread_start(p->id); - rcu_register_thread(); - - if (multifd_send_initial_packet(p, &local_err) < 0) { - ret = -1; - goto out; - } - /* initial packet */ - p->num_packets = 1; - - while (true) { - qemu_sem_wait(&p->sem); - - if (atomic_read(&multifd_send_state->exiting)) { - break; - } - qemu_mutex_lock(&p->mutex); - - if (p->pending_job) { - uint32_t used = p->pages->used; - uint64_t packet_num = p->packet_num; - flags = p->flags; - - p->next_packet_size = used * qemu_target_page_size(); - multifd_send_fill_packet(p); - p->flags = 0; - p->num_packets++; - p->num_pages += used; - p->pages->used = 0; - p->pages->block = NULL; - qemu_mutex_unlock(&p->mutex); - - trace_multifd_send(p->id, packet_num, used, flags, - p->next_packet_size); - - ret = qio_channel_write_all(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret != 0) { - break; - } - - if (used) { - ret = qio_channel_writev_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - qemu_mutex_lock(&p->mutex); - p->pending_job--; - qemu_mutex_unlock(&p->mutex); - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&p->sem_sync); - } - qemu_sem_post(&multifd_send_state->channels_ready); - } else if (p->quit) { - qemu_mutex_unlock(&p->mutex); - break; - } else { - qemu_mutex_unlock(&p->mutex); - /* sometimes there are spurious wakeups */ - } - } - -out: - if (local_err) { - trace_multifd_send_error(p->id); - multifd_send_terminate_threads(local_err); - } - - /* - * Error happen, I will exit, but I can't just leave, tell - * who pay attention to me. - */ - if (ret != 0) { - qemu_sem_post(&p->sem_sync); - qemu_sem_post(&multifd_send_state->channels_ready); - } - - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) -{ - MultiFDSendParams *p = opaque; - QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task)); - Error *local_err = NULL; - - trace_multifd_new_send_channel_async(p->id); - if (qio_task_propagate_error(task, &local_err)) { - migrate_set_error(migrate_get_current(), local_err); - /* Error happen, we need to tell who pay attention to me */ - qemu_sem_post(&multifd_send_state->channels_ready); - qemu_sem_post(&p->sem_sync); - /* - * Although multifd_send_thread is not created, but main migration - * thread neet to judge whether it is running, so we need to mark - * its status. - */ - p->quit = true; - } else { - p->c = QIO_CHANNEL(sioc); - qio_channel_set_delay(p->c, false); - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_send_thread, p, - QEMU_THREAD_JOINABLE); - } -} - -int multifd_save_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); - multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); - multifd_send_state->pages = multifd_pages_init(page_count); - qemu_sem_init(&multifd_send_state->channels_ready, 0); - atomic_set(&multifd_send_state->exiting, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDSendParams *p = &multifd_send_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem, 0); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->pending_job = 0; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); - p->packet->version = cpu_to_be32(MULTIFD_VERSION); - p->name = g_strdup_printf("multifdsend_%d", i); - socket_send_channel_create(multifd_new_send_channel_async, p); - } - return 0; -} - -struct { - MultiFDRecvParams *params; - /* number of created threads */ - int count; - /* syncs main thread and channels */ - QemuSemaphore sem_sync; - /* global number of generated multifd packets */ - uint64_t packet_num; -} *multifd_recv_state; - -static void multifd_recv_terminate_threads(Error *err) -{ - int i; - - trace_multifd_recv_terminate_threads(err != NULL); - - if (err) { - MigrationState *s = migrate_get_current(); - migrate_set_error(s, err); - if (s->state == MIGRATION_STATUS_SETUP || - s->state == MIGRATION_STATUS_ACTIVE) { - migrate_set_state(&s->state, s->state, - MIGRATION_STATUS_FAILED); - } - } - - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - p->quit = true; - /* - * We could arrive here for two reasons: - * - normal quit, i.e. everything went fine, just finished - * - error quit: We close the channels so the channel threads - * finish the qio_channel_read_all_eof() - */ - if (p->c) { - qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL); - } - qemu_mutex_unlock(&p->mutex); - } -} - -int multifd_load_cleanup(Error **errp) -{ - int i; - int ret = 0; - - if (!migrate_use_multifd()) { - return 0; - } - multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - if (p->running) { - p->quit = true; - /* - * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code, - * however try to wakeup it without harm in cleanup phase. - */ - qemu_sem_post(&p->sem_sync); - qemu_thread_join(&p->thread); - } - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - object_unref(OBJECT(p->c)); - p->c = NULL; - qemu_mutex_destroy(&p->mutex); - qemu_sem_destroy(&p->sem_sync); - g_free(p->name); - p->name = NULL; - multifd_pages_clear(p->pages); - p->pages = NULL; - p->packet_len = 0; - g_free(p->packet); - p->packet = NULL; - } - qemu_sem_destroy(&multifd_recv_state->sem_sync); - g_free(multifd_recv_state->params); - multifd_recv_state->params = NULL; - g_free(multifd_recv_state); - multifd_recv_state = NULL; - - return ret; -} - -static void multifd_recv_sync_main(void) -{ - int i; - - if (!migrate_use_multifd()) { - return; - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - trace_multifd_recv_sync_main_wait(p->id); - qemu_sem_wait(&multifd_recv_state->sem_sync); - } - for (i = 0; i < migrate_multifd_channels(); i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_lock(&p->mutex); - if (multifd_recv_state->packet_num < p->packet_num) { - multifd_recv_state->packet_num = p->packet_num; - } - qemu_mutex_unlock(&p->mutex); - trace_multifd_recv_sync_main_signal(p->id); - qemu_sem_post(&p->sem_sync); - } - trace_multifd_recv_sync_main(multifd_recv_state->packet_num); -} - -static void *multifd_recv_thread(void *opaque) -{ - MultiFDRecvParams *p = opaque; - Error *local_err = NULL; - int ret; - - trace_multifd_recv_thread_start(p->id); - rcu_register_thread(); - - while (true) { - uint32_t used; - uint32_t flags; - - if (p->quit) { - break; - } - - ret = qio_channel_read_all_eof(p->c, (void *)p->packet, - p->packet_len, &local_err); - if (ret == 0) { /* EOF */ - break; - } - if (ret == -1) { /* Error */ - break; - } - - qemu_mutex_lock(&p->mutex); - ret = multifd_recv_unfill_packet(p, &local_err); - if (ret) { - qemu_mutex_unlock(&p->mutex); - break; - } - - used = p->pages->used; - flags = p->flags; - trace_multifd_recv(p->id, p->packet_num, used, flags, - p->next_packet_size); - p->num_packets++; - p->num_pages += used; - qemu_mutex_unlock(&p->mutex); - - if (used) { - ret = qio_channel_readv_all(p->c, p->pages->iov, - used, &local_err); - if (ret != 0) { - break; - } - } - - if (flags & MULTIFD_FLAG_SYNC) { - qemu_sem_post(&multifd_recv_state->sem_sync); - qemu_sem_wait(&p->sem_sync); - } - } - - if (local_err) { - multifd_recv_terminate_threads(local_err); - } - qemu_mutex_lock(&p->mutex); - p->running = false; - qemu_mutex_unlock(&p->mutex); - - rcu_unregister_thread(); - trace_multifd_recv_thread_end(p->id, p->num_packets, p->num_pages); - - return NULL; -} - -int multifd_load_setup(Error **errp) -{ - int thread_count; - uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); - uint8_t i; - - if (!migrate_use_multifd()) { - return 0; - } - thread_count = migrate_multifd_channels(); - multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); - multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); - atomic_set(&multifd_recv_state->count, 0); - qemu_sem_init(&multifd_recv_state->sem_sync, 0); - - for (i = 0; i < thread_count; i++) { - MultiFDRecvParams *p = &multifd_recv_state->params[i]; - - qemu_mutex_init(&p->mutex); - qemu_sem_init(&p->sem_sync, 0); - p->quit = false; - p->id = i; - p->pages = multifd_pages_init(page_count); - p->packet_len = sizeof(MultiFDPacket_t) - + sizeof(uint64_t) * page_count; - p->packet = g_malloc0(p->packet_len); - p->name = g_strdup_printf("multifdrecv_%d", i); - } - return 0; -} - -bool multifd_recv_all_channels_created(void) -{ - int thread_count = migrate_multifd_channels(); - - if (!migrate_use_multifd()) { - return true; - } - - return thread_count == atomic_read(&multifd_recv_state->count); -} - -/* - * Try to receive all multifd channels to get ready for the migration. - * - Return true and do not set @errp when correctly receving all channels; - * - Return false and do not set @errp when correctly receiving the current one; - * - Return false and set @errp when failing to receive the current channel. - */ -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) -{ - MultiFDRecvParams *p; - Error *local_err = NULL; - int id; - - id = multifd_recv_initial_packet(ioc, &local_err); - if (id < 0) { - multifd_recv_terminate_threads(local_err); - error_propagate_prepend(errp, local_err, - "failed to receive packet" - " via multifd channel %d: ", - atomic_read(&multifd_recv_state->count)); - return false; - } - trace_multifd_recv_new_channel(id); - - p = &multifd_recv_state->params[id]; - if (p->c != NULL) { - error_setg(&local_err, "multifd: received id '%d' already setup'", - id); - multifd_recv_terminate_threads(local_err); - error_propagate(errp, local_err); - return false; - } - p->c = ioc; - object_ref(OBJECT(ioc)); - /* initial packet */ - p->num_packets = 1; - - p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); - atomic_inc(&multifd_recv_state->count); - return atomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); -} - /** * save_page_header: write page header to wire * diff --git a/migration/ram.h b/migration/ram.h index 42be471d52..a553d40751 100644 --- a/migration/ram.h +++ b/migration/ram.h @@ -41,13 +41,6 @@ int xbzrle_cache_resize(int64_t new_size, Error **errp); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_total(void); -int multifd_save_setup(Error **errp); -void multifd_save_cleanup(void); -int multifd_load_setup(Error **errp); -int multifd_load_cleanup(Error **errp); -bool multifd_recv_all_channels_created(void); -bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp); - uint64_t ram_pagesize_summary(void); int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len); void acct_update_position(QEMUFile *f, size_t size, bool zero);