diff --git a/migration/qemu-file.c b/migration/qemu-file.c index bb63c779cc..bafe3a0c0d 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -658,8 +658,32 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } -/* Compress size bytes of data start at p with specific compression - * level and store the compressed data to the buffer of f. +/* return the size after compression, or negative value on error */ +static int qemu_compress_data(z_stream *stream, uint8_t *dest, size_t dest_len, + const uint8_t *source, size_t source_len) +{ + int err; + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = source_len; + stream->next_in = (uint8_t *)source; + stream->avail_out = dest_len; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->next_out - dest; +} + +/* Compress size bytes of data start at p and store the compressed + * data to the buffer of f. * * When f is not writable, return -1 if f has no space to save the * compressed data. @@ -667,9 +691,8 @@ uint64_t qemu_get_be64(QEMUFile *f) * do fflush first, if f still has no space to save the compressed * data, return -1. */ - -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, - int level) +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, + const uint8_t *p, size_t size) { ssize_t blen = IO_BUF_SIZE - f->buf_index - sizeof(int32_t); @@ -683,8 +706,10 @@ ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, return -1; } } - if (compress2(f->buf + f->buf_index + sizeof(int32_t), (uLongf *)&blen, - (Bytef *)p, size, level) != Z_OK) { + + blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t), + blen, p, size); + if (blen < 0) { error_report("Compress Failed!"); return 0; } diff --git a/migration/qemu-file.h b/migration/qemu-file.h index f4f356ab12..2ccfcfb2a8 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -25,6 +25,8 @@ #ifndef MIGRATION_QEMU_FILE_H #define MIGRATION_QEMU_FILE_H +#include + /* Read a chunk of data from a file at the given position. The pos argument * can be ignored if the file is only be used for streaming. The number of * bytes actually read should be returned. @@ -132,8 +134,8 @@ bool qemu_file_is_writable(QEMUFile *f); size_t qemu_peek_buffer(QEMUFile *f, uint8_t **buf, size_t size, size_t offset); size_t qemu_get_buffer_in_place(QEMUFile *f, uint8_t **buf, size_t size); -ssize_t qemu_put_compression_data(QEMUFile *f, const uint8_t *p, size_t size, - int level); +ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream, + const uint8_t *p, size_t size); int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src); /* diff --git a/migration/ram.c b/migration/ram.c index 409c847a76..a21514a469 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -269,6 +269,7 @@ struct CompressParam { QemuCond cond; RAMBlock *block; ram_addr_t offset; + z_stream stream; }; typedef struct CompressParam CompressParam; @@ -299,7 +300,7 @@ static QemuThread *decompress_threads; static QemuMutex decomp_done_lock; static QemuCond decomp_done_cond; -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset); static void *do_data_compress(void *opaque) @@ -316,7 +317,7 @@ static void *do_data_compress(void *opaque) param->block = NULL; qemu_mutex_unlock(¶m->mutex); - do_compress_ram_page(param->file, block, offset); + do_compress_ram_page(param->file, ¶m->stream, block, offset); qemu_mutex_lock(&comp_done_lock); param->done = true; @@ -357,10 +358,19 @@ static void compress_threads_save_cleanup(void) terminate_compression_threads(); thread_count = migrate_compress_threads(); for (i = 0; i < thread_count; i++) { + /* + * we use it as a indicator which shows if the thread is + * properly init'd or not + */ + if (!comp_param[i].file) { + break; + } qemu_thread_join(compress_threads + i); - qemu_fclose(comp_param[i].file); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); + deflateEnd(&comp_param[i].stream); + qemu_fclose(comp_param[i].file); + comp_param[i].file = NULL; } qemu_mutex_destroy(&comp_done_lock); qemu_cond_destroy(&comp_done_cond); @@ -370,12 +380,12 @@ static void compress_threads_save_cleanup(void) comp_param = NULL; } -static void compress_threads_save_setup(void) +static int compress_threads_save_setup(void) { int i, thread_count; if (!migrate_use_compression()) { - return; + return 0; } thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); @@ -383,6 +393,11 @@ static void compress_threads_save_setup(void) qemu_cond_init(&comp_done_cond); qemu_mutex_init(&comp_done_lock); for (i = 0; i < thread_count; i++) { + if (deflateInit(&comp_param[i].stream, + migrate_compress_level()) != Z_OK) { + goto exit; + } + /* comp_param[i].file is just used as a dummy buffer to save data, * set its ops to empty. */ @@ -395,6 +410,11 @@ static void compress_threads_save_setup(void) do_data_compress, comp_param + i, QEMU_THREAD_JOINABLE); } + return 0; + +exit: + compress_threads_save_cleanup(); + return -1; } /* Multiple fd's */ @@ -1031,7 +1051,7 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) return pages; } -static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, +static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block, ram_addr_t offset) { RAMState *rs = ram_state; @@ -1040,8 +1060,7 @@ static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, bytes_sent = save_page_header(rs, f, block, offset | RAM_SAVE_FLAG_COMPRESS_PAGE); - blen = qemu_put_compression_data(f, p, TARGET_PAGE_SIZE, - migrate_compress_level()); + blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE); if (blen < 0) { bytes_sent = 0; qemu_file_set_error(migrate_get_current()->to_dst_file, blen); @@ -2214,9 +2233,14 @@ static int ram_save_setup(QEMUFile *f, void *opaque) RAMState **rsp = opaque; RAMBlock *block; + if (compress_threads_save_setup()) { + return -1; + } + /* migration has already setup the bitmap, reuse it. */ if (!migration_in_colo_state()) { if (ram_init_all(rsp) != 0) { + compress_threads_save_cleanup(); return -1; } } @@ -2236,7 +2260,6 @@ static int ram_save_setup(QEMUFile *f, void *opaque) } rcu_read_unlock(); - compress_threads_save_setup(); ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP);