4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2011-2015 Red Hat Inc
8 * Juan Quintela <quintela@redhat.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
32 #include "ram-compress.h"
34 #include "qemu/error-report.h"
35 #include "migration.h"
37 #include "io/channel-null.h"
38 #include "exec/target_page.h"
39 #include "exec/ramblock.h"
/* Global counters for the compression path; non-static, read by callers. */
CompressionStats compression_counters;

/* Per-worker-thread compression state; non-NULL once setup has run. */
static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished the compression.
 * comp_done_lock is used to co-work with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
52 struct DecompressParam
{
62 typedef struct DecompressParam DecompressParam
;
/* Migration stream; decompression errors are recorded on it via
 * qemu_file_set_error() and read back in wait_for_decompress_done(). */
static QEMUFile *decomp_file;
/* Per-worker-thread decompression state. */
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
/* decomp_done_cond/decomp_done_lock: wake waiters whenever a decompression
 * worker finishes a page (see wait_for_decompress_done()). */
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
70 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
71 RAMBlock
*block
, ram_addr_t offset
,
74 static void *do_data_compress(void *opaque
)
76 CompressParam
*param
= opaque
;
79 CompressResult result
;
81 qemu_mutex_lock(¶m
->mutex
);
82 while (!param
->quit
) {
85 offset
= param
->offset
;
86 param
->trigger
= false;
87 qemu_mutex_unlock(¶m
->mutex
);
89 result
= do_compress_ram_page(param
->file
, ¶m
->stream
,
90 block
, offset
, param
->originbuf
);
92 qemu_mutex_lock(&comp_done_lock
);
94 param
->result
= result
;
95 qemu_cond_signal(&comp_done_cond
);
96 qemu_mutex_unlock(&comp_done_lock
);
98 qemu_mutex_lock(¶m
->mutex
);
100 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
103 qemu_mutex_unlock(¶m
->mutex
);
108 void compress_threads_save_cleanup(void)
112 if (!migrate_compress() || !comp_param
) {
116 thread_count
= migrate_compress_threads();
117 for (i
= 0; i
< thread_count
; i
++) {
119 * we use it as a indicator which shows if the thread is
120 * properly init'd or not
122 if (!comp_param
[i
].file
) {
126 qemu_mutex_lock(&comp_param
[i
].mutex
);
127 comp_param
[i
].quit
= true;
128 qemu_cond_signal(&comp_param
[i
].cond
);
129 qemu_mutex_unlock(&comp_param
[i
].mutex
);
131 qemu_thread_join(compress_threads
+ i
);
132 qemu_mutex_destroy(&comp_param
[i
].mutex
);
133 qemu_cond_destroy(&comp_param
[i
].cond
);
134 deflateEnd(&comp_param
[i
].stream
);
135 g_free(comp_param
[i
].originbuf
);
136 qemu_fclose(comp_param
[i
].file
);
137 comp_param
[i
].file
= NULL
;
139 qemu_mutex_destroy(&comp_done_lock
);
140 qemu_cond_destroy(&comp_done_cond
);
141 g_free(compress_threads
);
143 compress_threads
= NULL
;
147 int compress_threads_save_setup(void)
151 if (!migrate_compress()) {
154 thread_count
= migrate_compress_threads();
155 compress_threads
= g_new0(QemuThread
, thread_count
);
156 comp_param
= g_new0(CompressParam
, thread_count
);
157 qemu_cond_init(&comp_done_cond
);
158 qemu_mutex_init(&comp_done_lock
);
159 for (i
= 0; i
< thread_count
; i
++) {
160 comp_param
[i
].originbuf
= g_try_malloc(qemu_target_page_size());
161 if (!comp_param
[i
].originbuf
) {
165 if (deflateInit(&comp_param
[i
].stream
,
166 migrate_compress_level()) != Z_OK
) {
167 g_free(comp_param
[i
].originbuf
);
171 /* comp_param[i].file is just used as a dummy buffer to save data,
172 * set its ops to empty.
174 comp_param
[i
].file
= qemu_file_new_output(
175 QIO_CHANNEL(qio_channel_null_new()));
176 comp_param
[i
].done
= true;
177 comp_param
[i
].quit
= false;
178 qemu_mutex_init(&comp_param
[i
].mutex
);
179 qemu_cond_init(&comp_param
[i
].cond
);
180 qemu_thread_create(compress_threads
+ i
, "compress",
181 do_data_compress
, comp_param
+ i
,
182 QEMU_THREAD_JOINABLE
);
187 compress_threads_save_cleanup();
191 static CompressResult
do_compress_ram_page(QEMUFile
*f
, z_stream
*stream
,
192 RAMBlock
*block
, ram_addr_t offset
,
195 uint8_t *p
= block
->host
+ offset
;
196 size_t page_size
= qemu_target_page_size();
199 assert(qemu_file_buffer_empty(f
));
201 if (buffer_is_zero(p
, page_size
)) {
206 * copy it to a internal buffer to avoid it being modified by VM
207 * so that we can catch up the error during compression and
210 memcpy(source_buf
, p
, page_size
);
211 ret
= qemu_put_compression_data(f
, stream
, source_buf
, page_size
);
213 qemu_file_set_error(migrate_get_current()->to_dst_file
, ret
);
214 error_report("compressed data failed!");
221 static inline void compress_reset_result(CompressParam
*param
)
223 param
->result
= RES_NONE
;
228 void flush_compressed_data(int (send_queued_data(CompressParam
*)))
230 int idx
, thread_count
;
232 thread_count
= migrate_compress_threads();
234 qemu_mutex_lock(&comp_done_lock
);
235 for (idx
= 0; idx
< thread_count
; idx
++) {
236 while (!comp_param
[idx
].done
) {
237 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
240 qemu_mutex_unlock(&comp_done_lock
);
242 for (idx
= 0; idx
< thread_count
; idx
++) {
243 qemu_mutex_lock(&comp_param
[idx
].mutex
);
244 if (!comp_param
[idx
].quit
) {
245 CompressParam
*param
= &comp_param
[idx
];
246 send_queued_data(param
);
247 assert(qemu_file_buffer_empty(param
->file
));
248 compress_reset_result(param
);
250 qemu_mutex_unlock(&comp_param
[idx
].mutex
);
254 static inline void set_compress_params(CompressParam
*param
, RAMBlock
*block
,
257 param
->block
= block
;
258 param
->offset
= offset
;
259 param
->trigger
= true;
262 int compress_page_with_multi_thread(RAMBlock
*block
, ram_addr_t offset
,
263 int (send_queued_data(CompressParam
*)))
265 int idx
, thread_count
, pages
= -1;
266 bool wait
= migrate_compress_wait_thread();
268 thread_count
= migrate_compress_threads();
269 qemu_mutex_lock(&comp_done_lock
);
271 for (idx
= 0; idx
< thread_count
; idx
++) {
272 if (comp_param
[idx
].done
) {
273 CompressParam
*param
= &comp_param
[idx
];
274 qemu_mutex_lock(¶m
->mutex
);
276 send_queued_data(param
);
277 assert(qemu_file_buffer_empty(param
->file
));
278 compress_reset_result(param
);
279 set_compress_params(param
, block
, offset
);
281 qemu_cond_signal(¶m
->cond
);
282 qemu_mutex_unlock(¶m
->mutex
);
289 * wait for the free thread if the user specifies 'compress-wait-thread',
290 * otherwise we will post the page out in the main thread as normal page.
292 if (pages
< 0 && wait
) {
293 qemu_cond_wait(&comp_done_cond
, &comp_done_lock
);
296 qemu_mutex_unlock(&comp_done_lock
);
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    int err;

    err = inflateReset(stream);
    if (err != Z_OK) {
        return -1;
    }

    stream->avail_in = source_len;
    stream->next_in = (uint8_t *)source;  /* zlib's next_in is non-const */
    stream->avail_out = dest_len;
    stream->next_out = dest;

    err = inflate(stream, Z_NO_FLUSH);
    /* a whole page must decompress in one shot, so expect stream end */
    if (err != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}
326 static void *do_data_decompress(void *opaque
)
328 DecompressParam
*param
= opaque
;
329 unsigned long pagesize
;
333 qemu_mutex_lock(¶m
->mutex
);
334 while (!param
->quit
) {
339 qemu_mutex_unlock(¶m
->mutex
);
341 pagesize
= qemu_target_page_size();
343 ret
= qemu_uncompress_data(¶m
->stream
, des
, pagesize
,
344 param
->compbuf
, len
);
345 if (ret
< 0 && migrate_get_current()->decompress_error_check
) {
346 error_report("decompress data failed");
347 qemu_file_set_error(decomp_file
, ret
);
350 qemu_mutex_lock(&decomp_done_lock
);
352 qemu_cond_signal(&decomp_done_cond
);
353 qemu_mutex_unlock(&decomp_done_lock
);
355 qemu_mutex_lock(¶m
->mutex
);
357 qemu_cond_wait(¶m
->cond
, ¶m
->mutex
);
360 qemu_mutex_unlock(¶m
->mutex
);
365 int wait_for_decompress_done(void)
367 int idx
, thread_count
;
369 if (!migrate_compress()) {
373 thread_count
= migrate_decompress_threads();
374 qemu_mutex_lock(&decomp_done_lock
);
375 for (idx
= 0; idx
< thread_count
; idx
++) {
376 while (!decomp_param
[idx
].done
) {
377 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);
380 qemu_mutex_unlock(&decomp_done_lock
);
381 return qemu_file_get_error(decomp_file
);
384 void compress_threads_load_cleanup(void)
388 if (!migrate_compress()) {
391 thread_count
= migrate_decompress_threads();
392 for (i
= 0; i
< thread_count
; i
++) {
394 * we use it as a indicator which shows if the thread is
395 * properly init'd or not
397 if (!decomp_param
[i
].compbuf
) {
401 qemu_mutex_lock(&decomp_param
[i
].mutex
);
402 decomp_param
[i
].quit
= true;
403 qemu_cond_signal(&decomp_param
[i
].cond
);
404 qemu_mutex_unlock(&decomp_param
[i
].mutex
);
406 for (i
= 0; i
< thread_count
; i
++) {
407 if (!decomp_param
[i
].compbuf
) {
411 qemu_thread_join(decompress_threads
+ i
);
412 qemu_mutex_destroy(&decomp_param
[i
].mutex
);
413 qemu_cond_destroy(&decomp_param
[i
].cond
);
414 inflateEnd(&decomp_param
[i
].stream
);
415 g_free(decomp_param
[i
].compbuf
);
416 decomp_param
[i
].compbuf
= NULL
;
418 g_free(decompress_threads
);
419 g_free(decomp_param
);
420 decompress_threads
= NULL
;
425 int compress_threads_load_setup(QEMUFile
*f
)
429 if (!migrate_compress()) {
433 thread_count
= migrate_decompress_threads();
434 decompress_threads
= g_new0(QemuThread
, thread_count
);
435 decomp_param
= g_new0(DecompressParam
, thread_count
);
436 qemu_mutex_init(&decomp_done_lock
);
437 qemu_cond_init(&decomp_done_cond
);
439 for (i
= 0; i
< thread_count
; i
++) {
440 if (inflateInit(&decomp_param
[i
].stream
) != Z_OK
) {
444 size_t compbuf_size
= compressBound(qemu_target_page_size());
445 decomp_param
[i
].compbuf
= g_malloc0(compbuf_size
);
446 qemu_mutex_init(&decomp_param
[i
].mutex
);
447 qemu_cond_init(&decomp_param
[i
].cond
);
448 decomp_param
[i
].done
= true;
449 decomp_param
[i
].quit
= false;
450 qemu_thread_create(decompress_threads
+ i
, "decompress",
451 do_data_decompress
, decomp_param
+ i
,
452 QEMU_THREAD_JOINABLE
);
456 compress_threads_load_cleanup();
460 void decompress_data_with_multi_threads(QEMUFile
*f
, void *host
, int len
)
462 int idx
, thread_count
;
464 thread_count
= migrate_decompress_threads();
465 QEMU_LOCK_GUARD(&decomp_done_lock
);
467 for (idx
= 0; idx
< thread_count
; idx
++) {
468 if (decomp_param
[idx
].done
) {
469 decomp_param
[idx
].done
= false;
470 qemu_mutex_lock(&decomp_param
[idx
].mutex
);
471 qemu_get_buffer(f
, decomp_param
[idx
].compbuf
, len
);
472 decomp_param
[idx
].des
= host
;
473 decomp_param
[idx
].len
= len
;
474 qemu_cond_signal(&decomp_param
[idx
].cond
);
475 qemu_mutex_unlock(&decomp_param
[idx
].mutex
);
479 if (idx
< thread_count
) {
482 qemu_cond_wait(&decomp_done_cond
, &decomp_done_lock
);