/*
 * Copyright (c) 2003-2008 Fabrice Bellard
 * Copyright (c) 2011-2015 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */
#include "qemu/osdep.h"
#include "qemu/cutils.h"

#include "ram-compress.h"

#include "qemu/error-report.h"
#include "migration.h"
#include "options.h"   /* migrate_compress() and the other compression option helpers */
#include "io/channel-null.h"
#include "exec/ram_addr.h"
CompressionStats compression_counters;

static CompressParam *comp_param;
static QemuThread *compress_threads;
/* comp_done_cond is used to wake up the migration thread when
 * one of the compression threads has finished compressing a page.
 * comp_done_lock is used together with comp_done_cond.
 */
static QemuMutex comp_done_lock;
static QemuCond comp_done_cond;
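
/* Per-thread state handed from the incoming migration code to one decompression thread */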
struct DecompressParam {
    bool done;
    bool quit;
    QemuMutex mutex;
    QemuCond cond;
    void *des;
    uint8_t *compbuf;
    int len;
    z_stream stream;
};
typedef struct DecompressParam DecompressParam;
static QEMUFile *decomp_file;
static DecompressParam *decomp_param;
static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf);
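
/*
 * Worker body for each compression thread: wait until the migration
 * thread queues a page via set_compress_params(), compress it into the
 * per-thread QEMUFile buffer, mark the slot done and signal
 * comp_done_cond, then sleep until the next page or a quit request.
 */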
static void *do_data_compress(void *opaque)
{
    CompressParam *param = opaque;
    RAMBlock *block;
    ram_addr_t offset;
    CompressResult result;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->trigger) {
            block = param->block;
            offset = param->offset;
            param->trigger = false;
            qemu_mutex_unlock(&param->mutex);

            result = do_compress_ram_page(param->file, &param->stream,
                                          block, offset, param->originbuf);

            qemu_mutex_lock(&comp_done_lock);
            param->done = true;
            param->result = result;
            qemu_cond_signal(&comp_done_cond);
            qemu_mutex_unlock(&comp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
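
/*
 * Tear down the compression threads on the source side: ask each
 * properly initialized thread to quit, join it, and release its zlib
 * stream, buffers and dummy QEMUFile, then the shared lock and
 * condition variable.
 */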
void compress_threads_save_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress() || !comp_param) {
        return;
    }

    thread_count = migrate_compress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * we use comp_param[i].file as an indicator of whether the thread
         * is properly initialized or not
         */
        if (!comp_param[i].file) {
            break;
        }

        qemu_mutex_lock(&comp_param[i].mutex);
        comp_param[i].quit = true;
        qemu_cond_signal(&comp_param[i].cond);
        qemu_mutex_unlock(&comp_param[i].mutex);

        qemu_thread_join(compress_threads + i);
        qemu_mutex_destroy(&comp_param[i].mutex);
        qemu_cond_destroy(&comp_param[i].cond);
        deflateEnd(&comp_param[i].stream);
        g_free(comp_param[i].originbuf);
        qemu_fclose(comp_param[i].file);
        comp_param[i].file = NULL;
    }
    qemu_mutex_destroy(&comp_done_lock);
    qemu_cond_destroy(&comp_done_cond);
    g_free(compress_threads);
    g_free(comp_param);
    compress_threads = NULL;
    comp_param = NULL;
}
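
/*
 * Allocate and start the compression threads on the source side. Each
 * thread gets its own zlib stream, a page-sized staging buffer and a
 * QEMUFile backed by a null channel that only buffers the compressed
 * data. Returns 0 on success, -1 on failure (after cleaning up whatever
 * was already set up).
 */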
int compress_threads_save_setup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_compress_threads();
    compress_threads = g_new0(QemuThread, thread_count);
    comp_param = g_new0(CompressParam, thread_count);
    qemu_cond_init(&comp_done_cond);
    qemu_mutex_init(&comp_done_lock);
    for (i = 0; i < thread_count; i++) {
        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
        if (!comp_param[i].originbuf) {
            goto exit;
        }

        if (deflateInit(&comp_param[i].stream,
                        migrate_compress_level()) != Z_OK) {
            g_free(comp_param[i].originbuf);
            goto exit;
        }

        /* comp_param[i].file is just used as a dummy buffer to save data,
         * so back it with a null channel that discards anything flushed.
         */
        comp_param[i].file = qemu_file_new_output(
            QIO_CHANNEL(qio_channel_null_new()));
        comp_param[i].done = true;
        comp_param[i].quit = false;
        qemu_mutex_init(&comp_param[i].mutex);
        qemu_cond_init(&comp_param[i].cond);
        qemu_thread_create(compress_threads + i, "compress",
                           do_data_compress, comp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;

exit:
    compress_threads_save_cleanup();
    return -1;
}
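
/*
 * Compress one guest page into the per-thread QEMUFile buffer. Zero
 * pages are not compressed at all; non-zero pages are first copied
 * into source_buf so the guest cannot modify them mid-compression.
 */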
static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
                                           RAMBlock *block, ram_addr_t offset,
                                           uint8_t *source_buf)
{
    uint8_t *p = block->host + offset;
    int ret;

    if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
        return RES_ZEROPAGE;
    }

    /*
     * copy it to an internal buffer to avoid it being modified by the VM,
     * so that we can catch any error during compression and
     * decompression
     */
    memcpy(source_buf, p, TARGET_PAGE_SIZE);
    ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
    if (ret < 0) {
        qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
        error_report("compressed data failed!");
        return RES_NONE;
    }
    return RES_COMPRESS;
}
static inline void compress_reset_result(CompressParam *param)
{
    param->result = RES_NONE;
    param->block = NULL;
    param->offset = 0;
}
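
/*
 * Wait for every compression thread to finish its current page, then
 * push any buffered compressed data to the migration stream through the
 * send_queued_data() callback and reset each thread's result.
 */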
void flush_compressed_data(int (send_queued_data(CompressParam *)))
{
    int idx, thread_count;

    thread_count = migrate_compress_threads();

    qemu_mutex_lock(&comp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!comp_param[idx].done) {
            qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        }
    }
    qemu_mutex_unlock(&comp_done_lock);

    for (idx = 0; idx < thread_count; idx++) {
        qemu_mutex_lock(&comp_param[idx].mutex);
        if (!comp_param[idx].quit) {
            CompressParam *param = &comp_param[idx];
            send_queued_data(param);
            compress_reset_result(param);
        }
        qemu_mutex_unlock(&comp_param[idx].mutex);
    }
}
static inline void set_compress_params(CompressParam *param, RAMBlock *block,
                                       ram_addr_t offset)
{
    param->block = block;
    param->offset = offset;
    param->trigger = true;
}
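
/*
 * Hand one page to an idle compression thread. The chosen thread's
 * previously buffered data is flushed via send_queued_data() before the
 * new page is queued and the thread is woken. If no thread is idle,
 * either wait for one (compress-wait-thread) or return -1 so the caller
 * sends the page uncompressed from the main thread.
 */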
int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
                                    int (send_queued_data(CompressParam *)))
{
    int idx, thread_count, pages = -1;
    bool wait = migrate_compress_wait_thread();

    thread_count = migrate_compress_threads();
    qemu_mutex_lock(&comp_done_lock);
retry:
    for (idx = 0; idx < thread_count; idx++) {
        if (comp_param[idx].done) {
            CompressParam *param = &comp_param[idx];
            qemu_mutex_lock(&param->mutex);
            param->done = false;
            send_queued_data(param);
            compress_reset_result(param);
            set_compress_params(param, block, offset);

            qemu_cond_signal(&param->cond);
            qemu_mutex_unlock(&param->mutex);
            pages = 1;
            break;
        }
    }

    /*
     * wait for a free thread if the user specifies 'compress-wait-thread',
     * otherwise we will post the page out in the main thread as a normal page.
     */
    if (pages < 0 && wait) {
        qemu_cond_wait(&comp_done_cond, &comp_done_lock);
        goto retry;
    }
    qemu_mutex_unlock(&comp_done_lock);

    return pages;
}
/* return the size after decompression, or negative value on error */
static int
qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
                     const uint8_t *source, size_t source_len)
{
    int err;

    err = inflateReset(stream);
    if (err != Z_OK) {
        return -1;
    }

    stream->avail_in = source_len;
    stream->next_in = (uint8_t *)source;
    stream->avail_out = dest_len;
    stream->next_out = dest;

    err = inflate(stream, Z_NO_FLUSH);
    if (err != Z_STREAM_END) {
        return -1;
    }

    return stream->total_out;
}
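
/*
 * Worker body for each decompression thread: wait until
 * decompress_data_with_multi_threads() queues a compressed page,
 * inflate it directly into guest memory, then mark the slot done and
 * signal decomp_done_cond.
 */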
static void *do_data_decompress(void *opaque)
{
    DecompressParam *param = opaque;
    unsigned long pagesize;
    uint8_t *des;
    int len, ret;

    qemu_mutex_lock(&param->mutex);
    while (!param->quit) {
        if (param->des) {
            des = param->des;
            len = param->len;
            param->des = 0;
            qemu_mutex_unlock(&param->mutex);

            pagesize = TARGET_PAGE_SIZE;

            ret = qemu_uncompress_data(&param->stream, des, pagesize,
                                       param->compbuf, len);
            if (ret < 0 && migrate_get_current()->decompress_error_check) {
                error_report("decompress data failed");
                qemu_file_set_error(decomp_file, ret);
            }

            qemu_mutex_lock(&decomp_done_lock);
            param->done = true;
            qemu_cond_signal(&decomp_done_cond);
            qemu_mutex_unlock(&decomp_done_lock);

            qemu_mutex_lock(&param->mutex);
        } else {
            qemu_cond_wait(&param->cond, &param->mutex);
        }
    }
    qemu_mutex_unlock(&param->mutex);

    return NULL;
}
int wait_for_decompress_done(void)
{
    int idx, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    qemu_mutex_lock(&decomp_done_lock);
    for (idx = 0; idx < thread_count; idx++) {
        while (!decomp_param[idx].done) {
            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
        }
    }
    qemu_mutex_unlock(&decomp_done_lock);
    return qemu_file_get_error(decomp_file);
}
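
/*
 * Tear down the decompression threads on the destination side. This is
 * done in two passes: first wake every initialized thread and ask it to
 * quit, then join each one and release its zlib stream and buffers.
 */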
void compress_threads_load_cleanup(void)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return;
    }
    thread_count = migrate_decompress_threads();
    for (i = 0; i < thread_count; i++) {
        /*
         * we use decomp_param[i].compbuf as an indicator of whether the
         * thread is properly initialized or not
         */
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_mutex_lock(&decomp_param[i].mutex);
        decomp_param[i].quit = true;
        qemu_cond_signal(&decomp_param[i].cond);
        qemu_mutex_unlock(&decomp_param[i].mutex);
    }
    for (i = 0; i < thread_count; i++) {
        if (!decomp_param[i].compbuf) {
            break;
        }

        qemu_thread_join(decompress_threads + i);
        qemu_mutex_destroy(&decomp_param[i].mutex);
        qemu_cond_destroy(&decomp_param[i].cond);
        inflateEnd(&decomp_param[i].stream);
        g_free(decomp_param[i].compbuf);
        decomp_param[i].compbuf = NULL;
    }
    g_free(decompress_threads);
    g_free(decomp_param);
    decompress_threads = NULL;
    decomp_param = NULL;
}
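
/*
 * Allocate and start the decompression threads on the destination side,
 * remembering the incoming QEMUFile so errors can be propagated to it.
 * Returns 0 on success, -1 on failure.
 */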
int compress_threads_load_setup(QEMUFile *f)
{
    int i, thread_count;

    if (!migrate_compress()) {
        return 0;
    }

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    qemu_mutex_init(&decomp_done_lock);
    qemu_cond_init(&decomp_done_cond);
    decomp_file = f;
    for (i = 0; i < thread_count; i++) {
        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
            goto exit;
        }

        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
        qemu_mutex_init(&decomp_param[i].mutex);
        qemu_cond_init(&decomp_param[i].cond);
        decomp_param[i].done = true;
        decomp_param[i].quit = false;
        qemu_thread_create(decompress_threads + i, "decompress",
                           do_data_decompress, decomp_param + i,
                           QEMU_THREAD_JOINABLE);
    }
    return 0;
exit:
    compress_threads_load_cleanup();
    return -1;
}
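
/*
 * Queue one compressed page (len bytes read from f) onto an idle
 * decompression thread, which will inflate it into 'host'. If every
 * thread is busy, wait on decomp_done_cond until one becomes idle.
 */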
void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
{
    int idx, thread_count;

    thread_count = migrate_decompress_threads();
    QEMU_LOCK_GUARD(&decomp_done_lock);
    while (true) {
        for (idx = 0; idx < thread_count; idx++) {
            if (decomp_param[idx].done) {
                decomp_param[idx].done = false;
                qemu_mutex_lock(&decomp_param[idx].mutex);
                qemu_get_buffer(f, decomp_param[idx].compbuf, len);
                decomp_param[idx].des = host;
                decomp_param[idx].len = len;
                qemu_cond_signal(&decomp_param[idx].cond);
                qemu_mutex_unlock(&decomp_param[idx].mutex);
                break;
            }
        }
        if (idx < thread_count) {
            break;
        }
        qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
    }
}