/*
 * QEMU System Emulator
 *
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2011-2015 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
#include "qemu/osdep.h"
#include "qemu/cutils.h"

#include "ram-compress.h"

#include "qemu/error-report.h"
#include "qemu/stats64.h"
#include "migration.h"
#include "options.h"
#include "io/channel-null.h"
#include "exec/target_page.h"
#include "exec/ramblock.h"
#include "ram.h"
#include "migration-stats.h"
CompressionStats compression_counters;

static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished the compression.
 * comp_done_lock is used together with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
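
/*
 * Per-thread state for the decompression workers.  The load side fills
 * in ->compbuf/->des/->len and signals ->cond; the worker clears ->des
 * when it picks the request up and sets ->done once the page has been
 * inflated into guest memory.
 */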
struct DecompressParam {
    bool done;
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    void *des;
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf);
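
/*
 * Compression worker thread body.  Each thread sleeps on its own
 * condition variable until the migration thread sets ->trigger, then
 * compresses the requested page into its private QEMUFile buffer and
 * reports the outcome through ->done/->result and comp_done_cond.
 */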
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    CompressResult result;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->trigger) {
            block = param->block;
            offset = param->offset;
            param->trigger = false;
            qemu_mutex_unlock(&param->mutex);

            result = do_compress_ram_page(param->file, &param->stream,
                                          block, offset, param->originbuf);

            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->result = result;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
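
/*
 * Tear down the compression threads on the source side: ask every
 * properly initialized worker to quit, join it and release its zlib
 * stream, origin buffer and dummy QEMUFile.
 */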
void compress_threads_save_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress() || !comp_param) {
        return;
    }

    thread_count = migrate_compress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * we use it as an indicator which shows if the thread is
         * properly initialized or not
         */
        if (!comp_param[i].file) {
            break;
        }

        qemu_mutex_lock(&comp_param[i].mutex);
        comp_param[i].quit = true;
        qemu_cond_signal(&comp_param[i].cond);
        qemu_mutex_unlock(&comp_param[i].mutex);

        qemu_thread_join(compress_threads + i);
        qemu_mutex_destroy(&comp_param[i].mutex);
        qemu_cond_destroy(&comp_param[i].cond);
        deflateEnd(&comp_param[i].stream);
        g_free(comp_param[i].originbuf);
        qemu_fclose(comp_param[i].file);
        comp_param[i].file = NULL;
    }
    qemu_mutex_destroy(&comp_done_lock);
    qemu_cond_destroy(&comp_done_cond);
    g_free(compress_threads);
    g_free(comp_param);
    compress_threads = NULL;
    comp_param = NULL;
}
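
/*
 * Start the compression threads on the source side.  Returns 0 on
 * success and -1 on failure, in which case any partially initialized
 * state is torn down again via compress_threads_save_cleanup().
 */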
int compress_threads_save_setup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_compress_threads();
    compress_threads = g_new0(QemuThread, thread_count);
    comp_param = g_new0(CompressParam, thread_count);
    qemu_cond_init(&comp_done_cond);
    qemu_mutex_init(&comp_done_lock);
    for (i = 0; i < thread_count; i++) {
        comp_param[i].originbuf = g_try_malloc(qemu_target_page_size());
        if (!comp_param[i].originbuf) {
            goto exit;
        }

        if (deflateInit(&comp_param[i].stream,
                        migrate_compress_level()) != Z_OK) {
            g_free(comp_param[i].originbuf);
            goto exit;
        }

        /* comp_param[i].file is just used as a dummy buffer to save data,
         * set its ops to empty.
         */
        comp_param[i].file = qemu_file_new_output(
            QIO_CHANNEL(qio_channel_null_new()));
        comp_param[i].done = true;
        comp_param[i].quit = false;
        qemu_mutex_init(&comp_param[i].mutex);
        qemu_cond_init(&comp_param[i].cond);
        qemu_thread_create(compress_threads + i, "compress",
                           do_data_compress, comp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;

exit:
    compress_threads_save_cleanup();
    return -1;
}
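
/*
 * Compress a single guest page into @f.  Pages that are entirely zero
 * are not compressed at all (RES_ZEROPAGE); on a zlib error the outgoing
 * migration stream is marked as failed and RES_NONE is returned.
 */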
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf)
{
    uint8_t *p = block->host + offset;
    size_t page_size = qemu_target_page_size();
    int ret;

    assert(qemu_file_buffer_empty(f));

    if (buffer_is_zero(p, page_size)) {
        return RES_ZEROPAGE;
    }

    /*
     * copy it to an internal buffer to avoid it being modified by the VM,
     * so that we can catch errors during compression and decompression
     */
    memcpy(source_buf, p, page_size);
    ret = qemu_put_compression_data(f, stream, source_buf, page_size);
    if (ret < 0) {
        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
        error_report("compressed data failed!");
        qemu_fflush(f);
        return RES_NONE;
    }
    return RES_COMPRESS;
}
static inline void compress_reset_result(CompressParam *param)
{
    param->result = RES_NONE;
    param->block = NULL;
    param->offset = 0;
}
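
/*
 * Wait until every compression thread has finished its current page,
 * then hand any queued output to @send_queued_data and reset the
 * per-thread result state.
 */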
void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
    int thread_count = migrate_compress_threads();

    qemu_mutex_lock(&comp_done_lock);
    for (int i = 0; i < thread_count; i++) {
        while (!comp_param[i].done) {
            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        }
    }
    qemu_mutex_unlock(&comp_done_lock);

    for (int i = 0; i < thread_count; i++) {
        qemu_mutex_lock(&comp_param[i].mutex);
        if (!comp_param[i].quit) {
            CompressParam *param = &comp_param[i];
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&comp_param[i].mutex);
    }
}
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->block = block;
    param->offset = offset;
    param->trigger = true;
}
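
/*
 * Hand one page to an idle compression thread.  Returns 1 if a thread
 * took the page, or -1 if all threads are busy and compress-wait-thread
 * is disabled, in which case the caller sends the page uncompressed.
 */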
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
                                    int (send_queued_data(CompressParam *)))
{
    int thread_count, pages = -1;
    bool wait = migrate_compress_wait_thread();

    thread_count = migrate_compress_threads();
    qemu_mutex_lock(&comp_done_lock);
retry:
    for (int i = 0; i < thread_count; i++) {
        if (comp_param[i].done) {
            CompressParam *param = &comp_param[i];
            qemu_mutex_lock(&param->mutex);
            param->done = false;
            send_queued_data(param);
            assert(qemu_file_buffer_empty(param->file));
            compress_reset_result(param);
            set_compress_params(param, block, offset);

            qemu_cond_signal(&param->cond);
            qemu_mutex_unlock(&param->mutex);
            pages = 1;
            break;
        }
    }

    /*
     * wait for a free thread if the user specifies 'compress-wait-thread';
     * otherwise the page is sent out in the main thread as a normal page.
     */
    if (pages < 0 && wait) {
        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        goto retry;
    }
    qemu_mutex_unlock(&comp_done_lock);

    return pages;
}
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    int err;

    err = inflateReset(stream);
    if (err != Z_OK) {
        return -1;
    }

    stream->avail_in = source_len;
    stream->next_in = (uint8_t *)source;
    stream->avail_out = dest_len;
    stream->next_out = dest;

    err = inflate(stream, Z_NO_FLUSH);
    if (err != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}
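
/*
 * Decompression worker thread body: wait until the load thread queues a
 * compressed page, inflate it straight into guest memory and report
 * completion through decomp_done_cond.
 */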
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            des = param->des;
            len = param->len;
            param->des = NULL;
            qemu_mutex_unlock(&param->mutex);

            pagesize = qemu_target_page_size();

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
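
/*
 * Block until all decompression threads are idle, then return any error
 * recorded on the incoming migration stream.
 */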
int wait_for_decompress_done(void)
{
    if (!migrate_compress()) {
        return 0;
    }

    int thread_count = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (int i = 0; i < thread_count; i++) {
        while (!decomp_param[i].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);
    return qemu_file_get_error(decomp_file);
}
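
/*
 * Tear down the decompression threads on the destination side.  The
 * quit flag is raised for every initialized worker before any of them
 * is joined.
 */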
void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }

    thread_count = migrate_decompress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * we use it as an indicator which shows if the thread is
         * properly initialized or not
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
    decomp_file = NULL;
}
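
/*
 * Start the decompression threads on the destination side and remember
 * @f so that decompression errors can be propagated to the incoming
 * migration stream.  Returns 0 on success, -1 on failure.
 */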
int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    /*
     * set compression_counters memory to zero for a new migration
     */
    memset(&compression_counters, 0, sizeof(compression_counters));

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        size_t compbuf_size = compressBound(qemu_target_page_size());
        decomp_param[i].compbuf = g_malloc0(compbuf_size);
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}
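
/*
 * Queue one compressed page (already positioned in @f) on an idle
 * decompression thread, waiting on decomp_done_cond if all of them are
 * currently busy.
 */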
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
    int thread_count = migrate_decompress_threads();
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (int i = 0; i < thread_count; i++) {
            if (decomp_param[i].done) {
                decomp_param[i].done = false;
                qemu_mutex_lock(&decomp_param[i].mutex);
                qemu_get_buffer(f, decomp_param[i].compbuf, len);
                decomp_param[i].des = host;
                decomp_param[i].len = len;
                qemu_cond_signal(&decomp_param[i].cond);
                qemu_mutex_unlock(&decomp_param[i].mutex);
                return;
            }
        }
        qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
    }
}
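
/* Fill the compression statistics into MigrationInfo for query-migrate. */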
void populate_compress(MigrationInfo *info)
{
    if (!migrate_compress()) {
        return;
    }
    info->compression = g_malloc0(sizeof(*info->compression));
    info->compression->pages = compression_counters.pages;
    info->compression->busy = compression_counters.busy;
    info->compression->busy_rate = compression_counters.busy_rate;
    info->compression->compressed_size = compression_counters.compressed_size;
    info->compression->compression_rate = compression_counters.compression_rate;
}
uint64_t ram_compressed_pages(void)
{
    return compression_counters.pages;
}
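
/*
 * Account one completed compression request: @bytes_xmit is what was
 * actually written to the stream for this page, including the 8-byte
 * page header carrying RAM_SAVE_FLAG_CONTINUE.
 */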
void update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
{
    ram_transferred_add(bytes_xmit);

    if (param->result == RES_ZEROPAGE) {
        stat64_add(&mig_stats.zero_pages, 1);
        return;
    }

    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
    compression_counters.compressed_size += bytes_xmit - 8;
    compression_counters.pages++;
}