multifd: Use normal pages array on the send side
We are only sending normal pages through multifd channels. Later on this series, we are going to also send zero pages. We are going to detect if a page is zero or non zero in the multifd channel thread, not on the main thread. So we receive an array of pages page->offset[N] And we will end with: p->normal[N - zero_pages] p->zero[zero_pages]. In this patch, we just copy all the pages in offset to normal. for (i = 0; i < pages->num; i++) { p->narmal[p->normal_num] = pages->offset[i]; p->normal_num++: } Later in the series this becomes: for (i = 0; i < pages->num; i++) { if (buffer_is_zero(page->offset[i])) { p->zerol[p->zero_num] = pages->offset[i]; p->zero_num++: } else { p->narmal[p->normal_num] = pages->offset[i]; p->normal_num++: } } Signed-off-by: Juan Quintela <quintela@redhat.com> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com> --- Improving comment (dave) Renaming num_normal_pages to total_normal_pages (peter)
This commit is contained in:
parent
c27779a215
commit
815956f039
@ -106,16 +106,16 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
|
|||||||
int ret;
|
int ret;
|
||||||
uint32_t i;
|
uint32_t i;
|
||||||
|
|
||||||
for (i = 0; i < p->pages->num; i++) {
|
for (i = 0; i < p->normal_num; i++) {
|
||||||
uint32_t available = z->zbuff_len - out_size;
|
uint32_t available = z->zbuff_len - out_size;
|
||||||
int flush = Z_NO_FLUSH;
|
int flush = Z_NO_FLUSH;
|
||||||
|
|
||||||
if (i == p->pages->num - 1) {
|
if (i == p->normal_num - 1) {
|
||||||
flush = Z_SYNC_FLUSH;
|
flush = Z_SYNC_FLUSH;
|
||||||
}
|
}
|
||||||
|
|
||||||
zs->avail_in = page_size;
|
zs->avail_in = page_size;
|
||||||
zs->next_in = p->pages->block->host + p->pages->offset[i];
|
zs->next_in = p->pages->block->host + p->normal[i];
|
||||||
|
|
||||||
zs->avail_out = available;
|
zs->avail_out = available;
|
||||||
zs->next_out = z->zbuff + out_size;
|
zs->next_out = z->zbuff + out_size;
|
||||||
|
@ -121,13 +121,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
|
|||||||
z->out.size = z->zbuff_len;
|
z->out.size = z->zbuff_len;
|
||||||
z->out.pos = 0;
|
z->out.pos = 0;
|
||||||
|
|
||||||
for (i = 0; i < p->pages->num; i++) {
|
for (i = 0; i < p->normal_num; i++) {
|
||||||
ZSTD_EndDirective flush = ZSTD_e_continue;
|
ZSTD_EndDirective flush = ZSTD_e_continue;
|
||||||
|
|
||||||
if (i == p->pages->num - 1) {
|
if (i == p->normal_num - 1) {
|
||||||
flush = ZSTD_e_flush;
|
flush = ZSTD_e_flush;
|
||||||
}
|
}
|
||||||
z->in.src = p->pages->block->host + p->pages->offset[i];
|
z->in.src = p->pages->block->host + p->normal[i];
|
||||||
z->in.size = page_size;
|
z->in.size = page_size;
|
||||||
z->in.pos = 0;
|
z->in.pos = 0;
|
||||||
|
|
||||||
|
@ -89,13 +89,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
|
|||||||
MultiFDPages_t *pages = p->pages;
|
MultiFDPages_t *pages = p->pages;
|
||||||
size_t page_size = qemu_target_page_size();
|
size_t page_size = qemu_target_page_size();
|
||||||
|
|
||||||
for (int i = 0; i < p->pages->num; i++) {
|
for (int i = 0; i < p->normal_num; i++) {
|
||||||
p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
|
p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
|
||||||
p->iov[p->iovs_num].iov_len = page_size;
|
p->iov[p->iovs_num].iov_len = page_size;
|
||||||
p->iovs_num++;
|
p->iovs_num++;
|
||||||
}
|
}
|
||||||
|
|
||||||
p->next_packet_size = p->pages->num * page_size;
|
p->next_packet_size = p->normal_num * page_size;
|
||||||
p->flags |= MULTIFD_FLAG_NOCOMP;
|
p->flags |= MULTIFD_FLAG_NOCOMP;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -262,7 +262,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
|
|||||||
|
|
||||||
packet->flags = cpu_to_be32(p->flags);
|
packet->flags = cpu_to_be32(p->flags);
|
||||||
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
|
packet->pages_alloc = cpu_to_be32(p->pages->allocated);
|
||||||
packet->pages_used = cpu_to_be32(p->pages->num);
|
packet->pages_used = cpu_to_be32(p->normal_num);
|
||||||
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
|
packet->next_packet_size = cpu_to_be32(p->next_packet_size);
|
||||||
packet->packet_num = cpu_to_be64(p->packet_num);
|
packet->packet_num = cpu_to_be64(p->packet_num);
|
||||||
|
|
||||||
@ -270,9 +270,9 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
|
|||||||
strncpy(packet->ramblock, p->pages->block->idstr, 256);
|
strncpy(packet->ramblock, p->pages->block->idstr, 256);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (i = 0; i < p->pages->num; i++) {
|
for (i = 0; i < p->normal_num; i++) {
|
||||||
/* there are architectures where ram_addr_t is 32 bit */
|
/* there are architectures where ram_addr_t is 32 bit */
|
||||||
uint64_t temp = p->pages->offset[i];
|
uint64_t temp = p->normal[i];
|
||||||
|
|
||||||
packet->offset[i] = cpu_to_be64(temp);
|
packet->offset[i] = cpu_to_be64(temp);
|
||||||
}
|
}
|
||||||
@ -559,6 +559,8 @@ void multifd_save_cleanup(void)
|
|||||||
p->packet = NULL;
|
p->packet = NULL;
|
||||||
g_free(p->iov);
|
g_free(p->iov);
|
||||||
p->iov = NULL;
|
p->iov = NULL;
|
||||||
|
g_free(p->normal);
|
||||||
|
p->normal = NULL;
|
||||||
multifd_send_state->ops->send_cleanup(p, &local_err);
|
multifd_send_state->ops->send_cleanup(p, &local_err);
|
||||||
if (local_err) {
|
if (local_err) {
|
||||||
migrate_set_error(migrate_get_current(), local_err);
|
migrate_set_error(migrate_get_current(), local_err);
|
||||||
@ -643,12 +645,17 @@ static void *multifd_send_thread(void *opaque)
|
|||||||
qemu_mutex_lock(&p->mutex);
|
qemu_mutex_lock(&p->mutex);
|
||||||
|
|
||||||
if (p->pending_job) {
|
if (p->pending_job) {
|
||||||
uint32_t used = p->pages->num;
|
|
||||||
uint64_t packet_num = p->packet_num;
|
uint64_t packet_num = p->packet_num;
|
||||||
uint32_t flags = p->flags;
|
uint32_t flags = p->flags;
|
||||||
p->iovs_num = 1;
|
p->iovs_num = 1;
|
||||||
|
p->normal_num = 0;
|
||||||
|
|
||||||
if (used) {
|
for (int i = 0; i < p->pages->num; i++) {
|
||||||
|
p->normal[p->normal_num] = p->pages->offset[i];
|
||||||
|
p->normal_num++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->normal_num) {
|
||||||
ret = multifd_send_state->ops->send_prepare(p, &local_err);
|
ret = multifd_send_state->ops->send_prepare(p, &local_err);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
qemu_mutex_unlock(&p->mutex);
|
qemu_mutex_unlock(&p->mutex);
|
||||||
@ -658,12 +665,12 @@ static void *multifd_send_thread(void *opaque)
|
|||||||
multifd_send_fill_packet(p);
|
multifd_send_fill_packet(p);
|
||||||
p->flags = 0;
|
p->flags = 0;
|
||||||
p->num_packets++;
|
p->num_packets++;
|
||||||
p->num_pages += used;
|
p->total_normal_pages += p->normal_num;
|
||||||
p->pages->num = 0;
|
p->pages->num = 0;
|
||||||
p->pages->block = NULL;
|
p->pages->block = NULL;
|
||||||
qemu_mutex_unlock(&p->mutex);
|
qemu_mutex_unlock(&p->mutex);
|
||||||
|
|
||||||
trace_multifd_send(p->id, packet_num, used, flags,
|
trace_multifd_send(p->id, packet_num, p->normal_num, flags,
|
||||||
p->next_packet_size);
|
p->next_packet_size);
|
||||||
|
|
||||||
p->iov[0].iov_len = p->packet_len;
|
p->iov[0].iov_len = p->packet_len;
|
||||||
@ -713,7 +720,7 @@ out:
|
|||||||
qemu_mutex_unlock(&p->mutex);
|
qemu_mutex_unlock(&p->mutex);
|
||||||
|
|
||||||
rcu_unregister_thread();
|
rcu_unregister_thread();
|
||||||
trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
|
trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -913,6 +920,7 @@ int multifd_save_setup(Error **errp)
|
|||||||
p->tls_hostname = g_strdup(s->hostname);
|
p->tls_hostname = g_strdup(s->hostname);
|
||||||
/* We need one extra place for the packet header */
|
/* We need one extra place for the packet header */
|
||||||
p->iov = g_new0(struct iovec, page_count + 1);
|
p->iov = g_new0(struct iovec, page_count + 1);
|
||||||
|
p->normal = g_new0(ram_addr_t, page_count);
|
||||||
socket_send_channel_create(multifd_new_send_channel_async, p);
|
socket_send_channel_create(multifd_new_send_channel_async, p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,14 +104,18 @@ typedef struct {
|
|||||||
/* thread local variables */
|
/* thread local variables */
|
||||||
/* packets sent through this channel */
|
/* packets sent through this channel */
|
||||||
uint64_t num_packets;
|
uint64_t num_packets;
|
||||||
/* pages sent through this channel */
|
/* non zero pages sent through this channel */
|
||||||
uint64_t num_pages;
|
uint64_t total_normal_pages;
|
||||||
/* syncs main thread and channels */
|
/* syncs main thread and channels */
|
||||||
QemuSemaphore sem_sync;
|
QemuSemaphore sem_sync;
|
||||||
/* buffers to send */
|
/* buffers to send */
|
||||||
struct iovec *iov;
|
struct iovec *iov;
|
||||||
/* number of iovs used */
|
/* number of iovs used */
|
||||||
uint32_t iovs_num;
|
uint32_t iovs_num;
|
||||||
|
/* Pages that are not zero */
|
||||||
|
ram_addr_t *normal;
|
||||||
|
/* num of non zero pages */
|
||||||
|
uint32_t normal_num;
|
||||||
/* used for compression methods */
|
/* used for compression methods */
|
||||||
void *data;
|
void *data;
|
||||||
} MultiFDSendParams;
|
} MultiFDSendParams;
|
||||||
|
@ -124,13 +124,13 @@ multifd_recv_sync_main_wait(uint8_t id) "channel %u"
|
|||||||
multifd_recv_terminate_threads(bool error) "error %d"
|
multifd_recv_terminate_threads(bool error) "error %d"
|
||||||
multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
|
multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
|
||||||
multifd_recv_thread_start(uint8_t id) "%u"
|
multifd_recv_thread_start(uint8_t id) "%u"
|
||||||
multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u"
|
multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u"
|
||||||
multifd_send_error(uint8_t id) "channel %u"
|
multifd_send_error(uint8_t id) "channel %u"
|
||||||
multifd_send_sync_main(long packet_num) "packet num %ld"
|
multifd_send_sync_main(long packet_num) "packet num %ld"
|
||||||
multifd_send_sync_main_signal(uint8_t id) "channel %u"
|
multifd_send_sync_main_signal(uint8_t id) "channel %u"
|
||||||
multifd_send_sync_main_wait(uint8_t id) "channel %u"
|
multifd_send_sync_main_wait(uint8_t id) "channel %u"
|
||||||
multifd_send_terminate_threads(bool error) "error %d"
|
multifd_send_terminate_threads(bool error) "error %d"
|
||||||
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
|
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %" PRIu64
|
||||||
multifd_send_thread_start(uint8_t id) "%u"
|
multifd_send_thread_start(uint8_t id) "%u"
|
||||||
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
|
multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
|
||||||
multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
|
multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
|
||||||
|
Loading…
Reference in New Issue
Block a user