/* * QEMU System Emulator * * Copyright (c) 2003-2008 Fabrice Bellard * Copyright (c) 2011-2015 Red Hat Inc * * Authors: * Juan Quintela * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ #include "qemu/osdep.h" #include "qemu/cutils.h" #include "ram-compress.h" #include "qemu/error-report.h" #include "qemu/stats64.h" #include "migration.h" #include "options.h" #include "io/channel-null.h" #include "exec/target_page.h" #include "exec/ramblock.h" #include "ram.h" #include "migration-stats.h" CompressionStats compression_counters; static CompressParam *comp_param; static QemuThread *compress_threads; /* comp_done_cond is used to wake up the migration thread when * one of the compression threads has finished the compression. * comp_done_lock is used to co-work with comp_done_cond. */ static QemuMutex comp_done_lock; static QemuCond comp_done_cond; struct DecompressParam { bool done; bool quit; QemuMutex mutex; QemuCond cond; void *des; uint8_t *compbuf; int len; z_stream stream; }; typedef struct DecompressParam DecompressParam; static QEMUFile *decomp_file; static DecompressParam *decomp_param; static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset, uint8_t *source_buf); static void *do_data_compress(void *opaque) { CompressParam *param = opaque; RAMBlock *block; ram_addr_t offset; CompressResult result; qemu_mutex_lock(¶m->mutex); while (!param->quit) { if (param->trigger) { block = param->block; offset = param->offset; param->trigger = false; qemu_mutex_unlock(¶m->mutex); result = do_compress_ram_page(param->file, ¶m->stream, block, offset, param->originbuf); qemu_mutex_lock(&comp_done_lock); param->done = true; param->result = result; qemu_cond_signal(&comp_done_cond); qemu_mutex_unlock(&comp_done_lock); qemu_mutex_lock(¶m->mutex); } else { qemu_cond_wait(¶m->cond, ¶m->mutex); } } qemu_mutex_unlock(¶m->mutex); return NULL; } void compress_threads_save_cleanup(void) { int i, thread_count; if (!migrate_compress() || !comp_param) { return; } thread_count = migrate_compress_threads(); for (i = 0; i < thread_count; i++) { /* * we use it as a indicator which shows if the thread is * properly init'd or not */ if (!comp_param[i].file) { break; } qemu_mutex_lock(&comp_param[i].mutex); comp_param[i].quit = true; qemu_cond_signal(&comp_param[i].cond); qemu_mutex_unlock(&comp_param[i].mutex); qemu_thread_join(compress_threads + i); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); deflateEnd(&comp_param[i].stream); g_free(comp_param[i].originbuf); qemu_fclose(comp_param[i].file); comp_param[i].file = NULL; } qemu_mutex_destroy(&comp_done_lock); qemu_cond_destroy(&comp_done_cond); g_free(compress_threads); g_free(comp_param); compress_threads = NULL; comp_param = NULL; } int compress_threads_save_setup(void) { int i, thread_count; if (!migrate_compress()) { return 0; } thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); qemu_cond_init(&comp_done_cond); qemu_mutex_init(&comp_done_lock); for (i = 0; i < thread_count; i++) { comp_param[i].originbuf = g_try_malloc(qemu_target_page_size()); if (!comp_param[i].originbuf) { goto exit; } if (deflateInit(&comp_param[i].stream, migrate_compress_level()) != Z_OK) { g_free(comp_param[i].originbuf); goto exit; } /* comp_param[i].file is just used as a dummy buffer to save data, * set its ops to empty. */ comp_param[i].file = qemu_file_new_output( QIO_CHANNEL(qio_channel_null_new())); comp_param[i].done = true; comp_param[i].quit = false; qemu_mutex_init(&comp_param[i].mutex); qemu_cond_init(&comp_param[i].cond); qemu_thread_create(compress_threads + i, "compress", do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); } return 0; exit: compress_threads_save_cleanup(); return -1; } static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset, uint8_t *source_buf) { uint8_t *p = block->host + offset; size_t page_size = qemu_target_page_size(); int ret; assert(qemu_file_buffer_empty(f)); if (buffer_is_zero(p, page_size)) { return RES_ZEROPAGE; } /* * copy it to a internal buffer to avoid it being modified by VM * so that we can catch up the error during compression and * decompression */ memcpy(source_buf, p, page_size); ret = qemu_put_compression_data(f, stream, source_buf, page_size); if (ret < 0) { qemu_file_set_error(migrate_get_current()->to_dst_file, ret); error_report("compressed data failed!"); qemu_fflush(f); return RES_NONE; } return RES_COMPRESS; } static inline void compress_reset_result(CompressParam *param) { param->result = RES_NONE; param->block = NULL; param->offset = 0; } void flush_compressed_data(int (send_queued_data(CompressParam *))) { int thread_count = migrate_compress_threads(); qemu_mutex_lock(&comp_done_lock); for (int i = 0; i < thread_count; i++) { while (!comp_param[i].done) { qemu_cond_wait(&comp_done_cond, &comp_done_lock); } } qemu_mutex_unlock(&comp_done_lock); for (int i = 0; i < thread_count; i++) { qemu_mutex_lock(&comp_param[i].mutex); if (!comp_param[i].quit) { CompressParam *param = &comp_param[i]; send_queued_data(param); assert(qemu_file_buffer_empty(param->file)); compress_reset_result(param); } qemu_mutex_unlock(&comp_param[i].mutex); } } static inline void set_compress_params(CompressParam *param, RAMBlock *block, ram_addr_t offset) { param->block = block; param->offset = offset; param->trigger = true; } int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset, int (send_queued_data(CompressParam *))) { int thread_count, pages = -1; bool wait = migrate_compress_wait_thread(); thread_count = migrate_compress_threads(); qemu_mutex_lock(&comp_done_lock); retry: for (int i = 0; i < thread_count; i++) { if (comp_param[i].done) { CompressParam *param = &comp_param[i]; qemu_mutex_lock(¶m->mutex); param->done = false; send_queued_data(param); assert(qemu_file_buffer_empty(param->file)); compress_reset_result(param); set_compress_params(param, block, offset); qemu_cond_signal(¶m->cond); qemu_mutex_unlock(¶m->mutex); pages = 1; break; } } /* * wait for the free thread if the user specifies 'compress-wait-thread', * otherwise we will post the page out in the main thread as normal page. */ if (pages < 0 && wait) { qemu_cond_wait(&comp_done_cond, &comp_done_lock); goto retry; } qemu_mutex_unlock(&comp_done_lock); return pages; } /* return the size after decompression, or negative value on error */ static int qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len, const uint8_t *source, size_t source_len) { int err; err = inflateReset(stream); if (err != Z_OK) { return -1; } stream->avail_in = source_len; stream->next_in = (uint8_t *)source; stream->avail_out = dest_len; stream->next_out = dest; err = inflate(stream, Z_NO_FLUSH); if (err != Z_STREAM_END) { return -1; } return stream->total_out; } static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; unsigned long pagesize; uint8_t *des; int len, ret; qemu_mutex_lock(¶m->mutex); while (!param->quit) { if (param->des) { des = param->des; len = param->len; param->des = 0; qemu_mutex_unlock(¶m->mutex); pagesize = qemu_target_page_size(); ret = qemu_uncompress_data(¶m->stream, des, pagesize, param->compbuf, len); if (ret < 0 && migrate_get_current()->decompress_error_check) { error_report("decompress data failed"); qemu_file_set_error(decomp_file, ret); } qemu_mutex_lock(&decomp_done_lock); param->done = true; qemu_cond_signal(&decomp_done_cond); qemu_mutex_unlock(&decomp_done_lock); qemu_mutex_lock(¶m->mutex); } else { qemu_cond_wait(¶m->cond, ¶m->mutex); } } qemu_mutex_unlock(¶m->mutex); return NULL; } int wait_for_decompress_done(void) { if (!migrate_compress()) { return 0; } int thread_count = migrate_decompress_threads(); qemu_mutex_lock(&decomp_done_lock); for (int i = 0; i < thread_count; i++) { while (!decomp_param[i].done) { qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); } } qemu_mutex_unlock(&decomp_done_lock); return qemu_file_get_error(decomp_file); } void compress_threads_load_cleanup(void) { int i, thread_count; if (!migrate_compress()) { return; } thread_count = migrate_decompress_threads(); for (i = 0; i < thread_count; i++) { /* * we use it as a indicator which shows if the thread is * properly init'd or not */ if (!decomp_param[i].compbuf) { break; } qemu_mutex_lock(&decomp_param[i].mutex); decomp_param[i].quit = true; qemu_cond_signal(&decomp_param[i].cond); qemu_mutex_unlock(&decomp_param[i].mutex); } for (i = 0; i < thread_count; i++) { if (!decomp_param[i].compbuf) { break; } qemu_thread_join(decompress_threads + i); qemu_mutex_destroy(&decomp_param[i].mutex); qemu_cond_destroy(&decomp_param[i].cond); inflateEnd(&decomp_param[i].stream); g_free(decomp_param[i].compbuf); decomp_param[i].compbuf = NULL; } g_free(decompress_threads); g_free(decomp_param); decompress_threads = NULL; decomp_param = NULL; decomp_file = NULL; } int compress_threads_load_setup(QEMUFile *f) { int i, thread_count; if (!migrate_compress()) { return 0; } /* * set compression_counters memory to zero for a new migration */ memset(&compression_counters, 0, sizeof(compression_counters)); thread_count = migrate_decompress_threads(); decompress_threads = g_new0(QemuThread, thread_count); decomp_param = g_new0(DecompressParam, thread_count); qemu_mutex_init(&decomp_done_lock); qemu_cond_init(&decomp_done_cond); decomp_file = f; for (i = 0; i < thread_count; i++) { if (inflateInit(&decomp_param[i].stream) != Z_OK) { goto exit; } size_t compbuf_size = compressBound(qemu_target_page_size()); decomp_param[i].compbuf = g_malloc0(compbuf_size); qemu_mutex_init(&decomp_param[i].mutex); qemu_cond_init(&decomp_param[i].cond); decomp_param[i].done = true; decomp_param[i].quit = false; qemu_thread_create(decompress_threads + i, "decompress", do_data_decompress, decomp_param + i, QEMU_THREAD_JOINABLE); } return 0; exit: compress_threads_load_cleanup(); return -1; } void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len) { int thread_count = migrate_decompress_threads(); QEMU_LOCK_GUARD(&decomp_done_lock); while (true) { for (int i = 0; i < thread_count; i++) { if (decomp_param[i].done) { decomp_param[i].done = false; qemu_mutex_lock(&decomp_param[i].mutex); qemu_get_buffer(f, decomp_param[i].compbuf, len); decomp_param[i].des = host; decomp_param[i].len = len; qemu_cond_signal(&decomp_param[i].cond); qemu_mutex_unlock(&decomp_param[i].mutex); return; } } qemu_cond_wait(&decomp_done_cond, &decomp_done_lock); } } void populate_compress(MigrationInfo *info) { if (!migrate_compress()) { return; } info->compression = g_malloc0(sizeof(*info->compression)); info->compression->pages = compression_counters.pages; info->compression->busy = compression_counters.busy; info->compression->busy_rate = compression_counters.busy_rate; info->compression->compressed_size = compression_counters.compressed_size; info->compression->compression_rate = compression_counters.compression_rate; } uint64_t ram_compressed_pages(void) { return compression_counters.pages; } void update_compress_thread_counts(const CompressParam *param, int bytes_xmit) { ram_transferred_add(bytes_xmit); if (param->result == RES_ZEROPAGE) { stat64_add(&mig_stats.zero_pages, 1); return; } /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */ compression_counters.compressed_size += bytes_xmit - 8; compression_counters.pages++; }