4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2011-2015 Red Hat Inc
8 * Juan Quintela <quintela@redhat.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
32 #include "ram-compress.h"
34 #include "qemu/error-report.h"
35 #include "qemu/stats64.h"
36 #include "migration.h"
38 #include "io/channel-null.h"
39 #include "exec/target_page.h"
40 #include "exec/ramblock.h"
42 #include "migration-stats.h"
48 int64_t compressed_size
;
49 double compression_rate
;
50 /* compression statistics since the beginning of the period */
51 /* amount of count that no free thread to compress data */
52 uint64_t compress_thread_busy_prev
;
53 /* amount bytes after compression */
54 uint64_t compressed_size_prev
;
55 /* amount of compressed pages */
56 uint64_t compress_pages_prev
;
57 } compression_counters
;
59 static CompressParam
*comp_param
;
60 static QemuThread
*compress_threads
;
61 /* comp_done_cond is used to wake up the migration thread when
62 * one of the compression threads has finished the compression.
63 * comp_done_lock is used to co-work with comp_done_cond.
65 static QemuMutex comp_done_lock
;
66 static QemuCond comp_done_cond
;
68 struct DecompressParam
{
78 typedef struct DecompressParam DecompressParam
;
80 static QEMUFile
*decomp_file
;
81 static DecompressParam
*decomp_param
;
82 static QemuThread
*decompress_threads
;
83 static QemuMutex decomp_done_lock
;
84 static QemuCond decomp_done_cond
;
86 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
87 RAMBlock
*block
, ram_addr_t offset
,
90 static void *do_data_compress(void *opaque
)
92 CompressParam
*param
= opaque
;
95 CompressResult result
;
97 qemu_mutex_lock(¶m
->mutex
);
98 while (!param
->quit
) {
100 block
= param
->block
;
101 offset
= param
->offset
;
102 param
->trigger
= false;
103 qemu_mutex_unlock(¶m
->mutex
);
105 result
= do_compress_ram_page(param
->file
, ¶m
->stream
,
106 block
, offset
, param
->originbuf
);
108 qemu_mutex_lock(&comp_done_lock
);
110 param
->result
= result
;
111 qemu_cond_signal(&comp_done_cond
);
112 qemu_mutex_unlock(&comp_done_lock
);
114 qemu_mutex_lock(¶m
->mutex
);
116 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
119 qemu_mutex_unlock(¶m
->mutex
);
124 void compress_threads_save_cleanup(void)
128 if (!migrate_compress() || !comp_param
) {
132 thread_count
= migrate_compress_threads();
133 for (i
= 0; i
< thread_count
; i
++) {
135 * we use it as a indicator which shows if the thread is
136 * properly init'd or not
138 if (!comp_param
[i
].file
) {
142 qemu_mutex_lock(&comp_param
[i
].mutex
);
143 comp_param
[i
].quit
= true;
144 qemu_cond_signal(&comp_param
[i
].cond
);
145 qemu_mutex_unlock(&comp_param
[i
].mutex
);
147 qemu_thread_join(compress_threads
+ i
);
148 qemu_mutex_destroy(&comp_param
[i
].mutex
);
149 qemu_cond_destroy(&comp_param
[i
].cond
);
150 deflateEnd(&comp_param
[i
].stream
);
151 g_free(comp_param
[i
].originbuf
);
152 qemu_fclose(comp_param
[i
].file
);
153 comp_param
[i
].file
= NULL
;
155 qemu_mutex_destroy(&comp_done_lock
);
156 qemu_cond_destroy(&comp_done_cond
);
157 g_free(compress_threads
);
159 compress_threads
= NULL
;
163 int compress_threads_save_setup(void)
167 if (!migrate_compress()) {
170 thread_count
= migrate_compress_threads();
171 compress_threads
= g_new0(QemuThread
, thread_count
);
172 comp_param
= g_new0(CompressParam
, thread_count
);
173 qemu_cond_init(&comp_done_cond
);
174 qemu_mutex_init(&comp_done_lock
);
175 for (i
= 0; i
< thread_count
; i
++) {
176 comp_param
[i
].originbuf
= g_try_malloc(qemu_target_page_size());
177 if (!comp_param
[i
].originbuf
) {
181 if (deflateInit(&comp_param
[i
].stream
,
182 migrate_compress_level()) != Z_OK
) {
183 g_free(comp_param
[i
].originbuf
);
187 /* comp_param[i].file is just used as a dummy buffer to save data,
188 * set its ops to empty.
190 comp_param
[i
].file
= qemu_file_new_output(
191 QIO_CHANNEL(qio_channel_null_new()));
192 comp_param
[i
].done
= true;
193 comp_param
[i
].quit
= false;
194 qemu_mutex_init(&comp_param
[i
].mutex
);
195 qemu_cond_init(&comp_param
[i
].cond
);
196 qemu_thread_create(compress_threads
+ i
, "compress",
197 do_data_compress
, comp_param
+ i
,
198 QEMU_THREAD_JOINABLE
);
203 compress_threads_save_cleanup();
207 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
208 RAMBlock
*block
, ram_addr_t offset
,
211 uint8_t *p
= block
->host
+ offset
;
212 size_t page_size
= qemu_target_page_size();
215 assert(qemu_file_buffer_empty(f
));
217 if (buffer_is_zero(p
, page_size
)) {
222 * copy it to a internal buffer to avoid it being modified by VM
223 * so that we can catch up the error during compression and
226 memcpy(source_buf
, p
, page_size
);
227 ret
= qemu_put_compression_data(f
, stream
, source_buf
, page_size
);
229 qemu_file_set_error(migrate_get_current()->to_dst_file
, ret
);
230 error_report("compressed data failed!");
237 static inline void compress_reset_result(CompressParam
*param
)
239 param
->result
= RES_NONE
;
244 void compress_flush_data(void)
246 int thread_count
= migrate_compress_threads();
248 if (!migrate_compress()) {
252 qemu_mutex_lock(&comp_done_lock
);
253 for (int i
= 0; i
< thread_count
; i
++) {
254 while (!comp_param
[i
].done
) {
255 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
258 qemu_mutex_unlock(&comp_done_lock
);
260 for (int i
= 0; i
< thread_count
; i
++) {
261 qemu_mutex_lock(&comp_param
[i
].mutex
);
262 if (!comp_param
[i
].quit
) {
263 CompressParam
*param
= &comp_param
[i
];
264 compress_send_queued_data(param
);
265 assert(qemu_file_buffer_empty(param
->file
));
266 compress_reset_result(param
);
268 qemu_mutex_unlock(&comp_param
[i
].mutex
);
272 static inline void set_compress_params(CompressParam
*param
, RAMBlock
*block
,
275 param
->block
= block
;
276 param
->offset
= offset
;
277 param
->trigger
= true;
281 * Return true when it compress a page
283 bool compress_page_with_multi_thread(RAMBlock
*block
, ram_addr_t offset
,
284 int (send_queued_data(CompressParam
*)))
287 bool wait
= migrate_compress_wait_thread();
289 thread_count
= migrate_compress_threads();
290 qemu_mutex_lock(&comp_done_lock
);
293 for (int i
= 0; i
< thread_count
; i
++) {
294 if (comp_param
[i
].done
) {
295 CompressParam
*param
= &comp_param
[i
];
296 qemu_mutex_lock(¶m
->mutex
);
298 send_queued_data(param
);
299 assert(qemu_file_buffer_empty(param
->file
));
300 compress_reset_result(param
);
301 set_compress_params(param
, block
, offset
);
303 qemu_cond_signal(¶m
->cond
);
304 qemu_mutex_unlock(¶m
->mutex
);
305 qemu_mutex_unlock(&comp_done_lock
);
310 qemu_mutex_unlock(&comp_done_lock
);
311 compression_counters
.busy
++;
315 * wait for a free thread if the user specifies
316 * 'compress-wait-thread', otherwise we will post the page out
317 * in the main thread as normal page.
319 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
323 /* return the size after decompression, or negative value on error */
325 qemu_uncompress_data(z_stream
*stream
, uint8_t *dest
, size_t dest_len
,
326 const uint8_t *source
, size_t source_len
)
330 err
= inflateReset(stream
);
335 stream
->avail_in
= source_len
;
336 stream
->next_in
= (uint8_t *)source
;
337 stream
->avail_out
= dest_len
;
338 stream
->next_out
= dest
;
340 err
= inflate(stream
, Z_NO_FLUSH
);
341 if (err
!= Z_STREAM_END
) {
345 return stream
->total_out
;
348 static void *do_data_decompress(void *opaque
)
350 DecompressParam
*param
= opaque
;
351 unsigned long pagesize
;
355 qemu_mutex_lock(¶m
->mutex
);
356 while (!param
->quit
) {
361 qemu_mutex_unlock(¶m
->mutex
);
363 pagesize
= qemu_target_page_size();
365 ret
= qemu_uncompress_data(¶m
->stream
, des
, pagesize
,
366 param
->compbuf
, len
);
367 if (ret
< 0 && migrate_get_current()->decompress_error_check
) {
368 error_report("decompress data failed");
369 qemu_file_set_error(decomp_file
, ret
);
372 qemu_mutex_lock(&decomp_done_lock
);
374 qemu_cond_signal(&decomp_done_cond
);
375 qemu_mutex_unlock(&decomp_done_lock
);
377 qemu_mutex_lock(¶m
->mutex
);
379 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
382 qemu_mutex_unlock(¶m
->mutex
);
387 int wait_for_decompress_done(void)
389 if (!migrate_compress()) {
393 int thread_count
= migrate_decompress_threads();
394 qemu_mutex_lock(&decomp_done_lock
);
395 for (int i
= 0; i
< thread_count
; i
++) {
396 while (!decomp_param
[i
].done
) {
397 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);
400 qemu_mutex_unlock(&decomp_done_lock
);
401 return qemu_file_get_error(decomp_file
);
404 void compress_threads_load_cleanup(void)
408 if (!migrate_compress()) {
411 thread_count
= migrate_decompress_threads();
412 for (i
= 0; i
< thread_count
; i
++) {
414 * we use it as a indicator which shows if the thread is
415 * properly init'd or not
417 if (!decomp_param
[i
].compbuf
) {
421 qemu_mutex_lock(&decomp_param
[i
].mutex
);
422 decomp_param
[i
].quit
= true;
423 qemu_cond_signal(&decomp_param
[i
].cond
);
424 qemu_mutex_unlock(&decomp_param
[i
].mutex
);
426 for (i
= 0; i
< thread_count
; i
++) {
427 if (!decomp_param
[i
].compbuf
) {
431 qemu_thread_join(decompress_threads
+ i
);
432 qemu_mutex_destroy(&decomp_param
[i
].mutex
);
433 qemu_cond_destroy(&decomp_param
[i
].cond
);
434 inflateEnd(&decomp_param
[i
].stream
);
435 g_free(decomp_param
[i
].compbuf
);
436 decomp_param
[i
].compbuf
= NULL
;
438 g_free(decompress_threads
);
439 g_free(decomp_param
);
440 decompress_threads
= NULL
;
445 int compress_threads_load_setup(QEMUFile
*f
)
449 if (!migrate_compress()) {
454 * set compression_counters memory to zero for a new migration
456 memset(&compression_counters
, 0, sizeof(compression_counters
));
458 thread_count
= migrate_decompress_threads();
459 decompress_threads
= g_new0(QemuThread
, thread_count
);
460 decomp_param
= g_new0(DecompressParam
, thread_count
);
461 qemu_mutex_init(&decomp_done_lock
);
462 qemu_cond_init(&decomp_done_cond
);
464 for (i
= 0; i
< thread_count
; i
++) {
465 if (inflateInit(&decomp_param
[i
].stream
) != Z_OK
) {
469 size_t compbuf_size
= compressBound(qemu_target_page_size());
470 decomp_param
[i
].compbuf
= g_malloc0(compbuf_size
);
471 qemu_mutex_init(&decomp_param
[i
].mutex
);
472 qemu_cond_init(&decomp_param
[i
].cond
);
473 decomp_param
[i
].done
= true;
474 decomp_param
[i
].quit
= false;
475 qemu_thread_create(decompress_threads
+ i
, "decompress",
476 do_data_decompress
, decomp_param
+ i
,
477 QEMU_THREAD_JOINABLE
);
481 compress_threads_load_cleanup();
485 void decompress_data_with_multi_threads(QEMUFile
*f
, void *host
, int len
)
487 int thread_count
= migrate_decompress_threads();
488 QEMU_LOCK_GUARD(&decomp_done_lock
);
490 for (int i
= 0; i
< thread_count
; i
++) {
491 if (decomp_param
[i
].done
) {
492 decomp_param
[i
].done
= false;
493 qemu_mutex_lock(&decomp_param
[i
].mutex
);
494 qemu_get_buffer(f
, decomp_param
[i
].compbuf
, len
);
495 decomp_param
[i
].des
= host
;
496 decomp_param
[i
].len
= len
;
497 qemu_cond_signal(&decomp_param
[i
].cond
);
498 qemu_mutex_unlock(&decomp_param
[i
].mutex
);
502 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);
506 void populate_compress(MigrationInfo
*info
)
508 if (!migrate_compress()) {
511 info
->compression
= g_malloc0(sizeof(*info
->compression
));
512 info
->compression
->pages
= compression_counters
.pages
;
513 info
->compression
->busy
= compression_counters
.busy
;
514 info
->compression
->busy_rate
= compression_counters
.busy_rate
;
515 info
->compression
->compressed_size
= compression_counters
.compressed_size
;
516 info
->compression
->compression_rate
= compression_counters
.compression_rate
;
519 uint64_t compress_ram_pages(void)
521 return compression_counters
.pages
;
524 void update_compress_thread_counts(const CompressParam
*param
, int bytes_xmit
)
526 ram_transferred_add(bytes_xmit
);
528 if (param
->result
== RES_ZEROPAGE
) {
529 stat64_add(&mig_stats
.zero_pages
, 1);
533 /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
534 compression_counters
.compressed_size
+= bytes_xmit
- 8;
535 compression_counters
.pages
++;
538 void compress_update_rates(uint64_t page_count
)
540 if (!migrate_compress()) {
543 compression_counters
.busy_rate
= (double)(compression_counters
.busy
-
544 compression_counters
.compress_thread_busy_prev
) / page_count
;
545 compression_counters
.compress_thread_busy_prev
=
546 compression_counters
.busy
;
548 double compressed_size
= compression_counters
.compressed_size
-
549 compression_counters
.compressed_size_prev
;
550 if (compressed_size
) {
551 double uncompressed_size
= (compression_counters
.pages
-
552 compression_counters
.compress_pages_prev
) *
553 qemu_target_page_size();
555 /* Compression-Ratio = Uncompressed-size / Compressed-size */
556 compression_counters
.compression_rate
=
557 uncompressed_size
/ compressed_size
;
559 compression_counters
.compress_pages_prev
=
560 compression_counters
.pages
;
561 compression_counters
.compressed_size_prev
=
562 compression_counters
.compressed_size
;