migration: Add the core code of multi-thread compression
Implement the core logic of the multiple thread compression. At this point, multiple thread compression can't co-work with xbzrle yet. Signed-off-by: Liang Li <liang.z.li@intel.com> Signed-off-by: Yang Zhang <yang.z.zhang@intel.com> Signed-off-by: Juan Quintela <quintela@redhat.com>
This commit is contained in:
parent
e2102428c0
commit
20eb617eac
185
arch_init.c
185
arch_init.c
@ -355,12 +355,33 @@ static DecompressParam *decomp_param;
|
|||||||
static QemuThread *decompress_threads;
|
static QemuThread *decompress_threads;
|
||||||
static uint8_t *compressed_data_buf;
|
static uint8_t *compressed_data_buf;
|
||||||
|
|
||||||
|
static int do_compress_ram_page(CompressParam *param);
|
||||||
|
|
||||||
static void *do_data_compress(void *opaque)
|
static void *do_data_compress(void *opaque)
|
||||||
{
|
{
|
||||||
|
CompressParam *param = opaque;
|
||||||
|
|
||||||
while (!quit_comp_thread) {
|
while (!quit_comp_thread) {
|
||||||
|
qemu_mutex_lock(¶m->mutex);
|
||||||
|
/* Re-check the quit_comp_thread in case of
|
||||||
|
* terminate_compression_threads is called just before
|
||||||
|
* qemu_mutex_lock(¶m->mutex) and after
|
||||||
|
* while(!quit_comp_thread), re-check it here can make
|
||||||
|
* sure the compression thread terminate as expected.
|
||||||
|
*/
|
||||||
|
while (!param->start && !quit_comp_thread) {
|
||||||
|
qemu_cond_wait(¶m->cond, ¶m->mutex);
|
||||||
|
}
|
||||||
|
if (!quit_comp_thread) {
|
||||||
|
do_compress_ram_page(param);
|
||||||
|
}
|
||||||
|
param->start = false;
|
||||||
|
qemu_mutex_unlock(¶m->mutex);
|
||||||
|
|
||||||
/* To be done */
|
qemu_mutex_lock(comp_done_lock);
|
||||||
|
param->done = true;
|
||||||
|
qemu_cond_signal(comp_done_cond);
|
||||||
|
qemu_mutex_unlock(comp_done_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -368,9 +389,15 @@ static void *do_data_compress(void *opaque)
|
|||||||
|
|
||||||
static inline void terminate_compression_threads(void)
|
static inline void terminate_compression_threads(void)
|
||||||
{
|
{
|
||||||
quit_comp_thread = true;
|
int idx, thread_count;
|
||||||
|
|
||||||
/* To be done */
|
thread_count = migrate_compress_threads();
|
||||||
|
quit_comp_thread = true;
|
||||||
|
for (idx = 0; idx < thread_count; idx++) {
|
||||||
|
qemu_mutex_lock(&comp_param[idx].mutex);
|
||||||
|
qemu_cond_signal(&comp_param[idx].cond);
|
||||||
|
qemu_mutex_unlock(&comp_param[idx].mutex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void migrate_compress_threads_join(void)
|
void migrate_compress_threads_join(void)
|
||||||
@ -420,6 +447,7 @@ void migrate_compress_threads_create(void)
|
|||||||
* it's ops to empty.
|
* it's ops to empty.
|
||||||
*/
|
*/
|
||||||
comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
|
comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
|
||||||
|
comp_param[i].done = true;
|
||||||
qemu_mutex_init(&comp_param[i].mutex);
|
qemu_mutex_init(&comp_param[i].mutex);
|
||||||
qemu_cond_init(&comp_param[i].cond);
|
qemu_cond_init(&comp_param[i].cond);
|
||||||
qemu_thread_create(compress_threads + i, "compress",
|
qemu_thread_create(compress_threads + i, "compress",
|
||||||
@ -829,6 +857,97 @@ static int ram_save_page(QEMUFile *f, RAMBlock* block, ram_addr_t offset,
|
|||||||
return pages;
|
return pages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int do_compress_ram_page(CompressParam *param)
|
||||||
|
{
|
||||||
|
int bytes_sent, blen;
|
||||||
|
uint8_t *p;
|
||||||
|
RAMBlock *block = param->block;
|
||||||
|
ram_addr_t offset = param->offset;
|
||||||
|
|
||||||
|
p = memory_region_get_ram_ptr(block->mr) + (offset & TARGET_PAGE_MASK);
|
||||||
|
|
||||||
|
bytes_sent = save_page_header(param->file, block, offset |
|
||||||
|
RAM_SAVE_FLAG_COMPRESS_PAGE);
|
||||||
|
blen = qemu_put_compression_data(param->file, p, TARGET_PAGE_SIZE,
|
||||||
|
migrate_compress_level());
|
||||||
|
bytes_sent += blen;
|
||||||
|
|
||||||
|
return bytes_sent;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void start_compression(CompressParam *param)
|
||||||
|
{
|
||||||
|
param->done = false;
|
||||||
|
qemu_mutex_lock(¶m->mutex);
|
||||||
|
param->start = true;
|
||||||
|
qemu_cond_signal(¶m->cond);
|
||||||
|
qemu_mutex_unlock(¶m->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static uint64_t bytes_transferred;
|
||||||
|
|
||||||
|
static void flush_compressed_data(QEMUFile *f)
|
||||||
|
{
|
||||||
|
int idx, len, thread_count;
|
||||||
|
|
||||||
|
if (!migrate_use_compression()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
thread_count = migrate_compress_threads();
|
||||||
|
for (idx = 0; idx < thread_count; idx++) {
|
||||||
|
if (!comp_param[idx].done) {
|
||||||
|
qemu_mutex_lock(comp_done_lock);
|
||||||
|
while (!comp_param[idx].done && !quit_comp_thread) {
|
||||||
|
qemu_cond_wait(comp_done_cond, comp_done_lock);
|
||||||
|
}
|
||||||
|
qemu_mutex_unlock(comp_done_lock);
|
||||||
|
}
|
||||||
|
if (!quit_comp_thread) {
|
||||||
|
len = qemu_put_qemu_file(f, comp_param[idx].file);
|
||||||
|
bytes_transferred += len;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
|
||||||
|
ram_addr_t offset)
|
||||||
|
{
|
||||||
|
param->block = block;
|
||||||
|
param->offset = offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int compress_page_with_multi_thread(QEMUFile *f, RAMBlock *block,
|
||||||
|
ram_addr_t offset,
|
||||||
|
uint64_t *bytes_transferred)
|
||||||
|
{
|
||||||
|
int idx, thread_count, bytes_xmit = -1, pages = -1;
|
||||||
|
|
||||||
|
thread_count = migrate_compress_threads();
|
||||||
|
qemu_mutex_lock(comp_done_lock);
|
||||||
|
while (true) {
|
||||||
|
for (idx = 0; idx < thread_count; idx++) {
|
||||||
|
if (comp_param[idx].done) {
|
||||||
|
bytes_xmit = qemu_put_qemu_file(f, comp_param[idx].file);
|
||||||
|
set_compress_params(&comp_param[idx], block, offset);
|
||||||
|
start_compression(&comp_param[idx]);
|
||||||
|
pages = 1;
|
||||||
|
acct_info.norm_pages++;
|
||||||
|
*bytes_transferred += bytes_xmit;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pages > 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
qemu_cond_wait(comp_done_cond, comp_done_lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
qemu_mutex_unlock(comp_done_lock);
|
||||||
|
|
||||||
|
return pages;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ram_save_compressed_page: compress the given page and send it to the stream
|
* ram_save_compressed_page: compress the given page and send it to the stream
|
||||||
*
|
*
|
||||||
@ -845,8 +964,60 @@ static int ram_save_compressed_page(QEMUFile *f, RAMBlock *block,
|
|||||||
uint64_t *bytes_transferred)
|
uint64_t *bytes_transferred)
|
||||||
{
|
{
|
||||||
int pages = -1;
|
int pages = -1;
|
||||||
|
uint64_t bytes_xmit;
|
||||||
|
MemoryRegion *mr = block->mr;
|
||||||
|
uint8_t *p;
|
||||||
|
int ret;
|
||||||
|
|
||||||
/* To be done*/
|
p = memory_region_get_ram_ptr(mr) + offset;
|
||||||
|
|
||||||
|
bytes_xmit = 0;
|
||||||
|
ret = ram_control_save_page(f, block->offset,
|
||||||
|
offset, TARGET_PAGE_SIZE, &bytes_xmit);
|
||||||
|
if (bytes_xmit) {
|
||||||
|
*bytes_transferred += bytes_xmit;
|
||||||
|
pages = 1;
|
||||||
|
}
|
||||||
|
if (block == last_sent_block) {
|
||||||
|
offset |= RAM_SAVE_FLAG_CONTINUE;
|
||||||
|
}
|
||||||
|
if (ret != RAM_SAVE_CONTROL_NOT_SUPP) {
|
||||||
|
if (ret != RAM_SAVE_CONTROL_DELAYED) {
|
||||||
|
if (bytes_xmit > 0) {
|
||||||
|
acct_info.norm_pages++;
|
||||||
|
} else if (bytes_xmit == 0) {
|
||||||
|
acct_info.dup_pages++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* When starting the process of a new block, the first page of
|
||||||
|
* the block should be sent out before other pages in the same
|
||||||
|
* block, and all the pages in last block should have been sent
|
||||||
|
* out, keeping this order is important, because the 'cont' flag
|
||||||
|
* is used to avoid resending the block name.
|
||||||
|
*/
|
||||||
|
if (block != last_sent_block) {
|
||||||
|
flush_compressed_data(f);
|
||||||
|
pages = save_zero_page(f, block, offset, p, bytes_transferred);
|
||||||
|
if (pages == -1) {
|
||||||
|
set_compress_params(&comp_param[0], block, offset);
|
||||||
|
/* Use the qemu thread to compress the data to make sure the
|
||||||
|
* first page is sent out before other pages
|
||||||
|
*/
|
||||||
|
bytes_xmit = do_compress_ram_page(&comp_param[0]);
|
||||||
|
acct_info.norm_pages++;
|
||||||
|
qemu_put_qemu_file(f, comp_param[0].file);
|
||||||
|
*bytes_transferred += bytes_xmit;
|
||||||
|
pages = 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pages = save_zero_page(f, block, offset, p, bytes_transferred);
|
||||||
|
if (pages == -1) {
|
||||||
|
pages = compress_page_with_multi_thread(f, block, offset,
|
||||||
|
bytes_transferred);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return pages;
|
return pages;
|
||||||
}
|
}
|
||||||
@ -914,8 +1085,6 @@ static int ram_find_and_save_block(QEMUFile *f, bool last_stage,
|
|||||||
return pages;
|
return pages;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint64_t bytes_transferred;
|
|
||||||
|
|
||||||
void acct_update_position(QEMUFile *f, size_t size, bool zero)
|
void acct_update_position(QEMUFile *f, size_t size, bool zero)
|
||||||
{
|
{
|
||||||
uint64_t pages = size / TARGET_PAGE_SIZE;
|
uint64_t pages = size / TARGET_PAGE_SIZE;
|
||||||
@ -1129,6 +1298,7 @@ static int ram_save_iterate(QEMUFile *f, void *opaque)
|
|||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
flush_compressed_data(f);
|
||||||
rcu_read_unlock();
|
rcu_read_unlock();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -1170,6 +1340,7 @@ static int ram_save_complete(QEMUFile *f, void *opaque)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
flush_compressed_data(f);
|
||||||
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
|
ram_control_after_iterate(f, RAM_CONTROL_FINISH);
|
||||||
migration_end();
|
migration_end();
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user