ram.c: Move core decompression code into its own file
[qemu/armbru.git] / migration / ram-compress.c
blobc25562f12d204618d3a6415514e312c4f704a24a
1 /*
2 * QEMU System Emulator
4 * Copyright (c) 2003-2008 Fabrice Bellard
5 * Copyright (c) 2011-2015 Red Hat Inc
7 * Authors:
8 * Juan Quintela <quintela@redhat.com>
10 * Permission is hereby granted, free of charge, to any person obtaining a copy
11 * of this software and associated documentation files (the "Software"), to deal
12 * in the Software without restriction, including without limitation the rights
13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14 * copies of the Software, and to permit persons to whom the Software is
15 * furnished to do so, subject to the following conditions:
17 * The above copyright notice and this permission notice shall be included in
18 * all copies or substantial portions of the Software.
20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 * THE SOFTWARE.
29 #include "qemu/osdep.h"
30 #include "qemu/cutils.h"
32 #include "ram-compress.h"
34 #include "qemu/error-report.h"
35 #include "migration.h"
36 #include "options.h"
37 #include "io/channel-null.h"
38 #include "exec/ram_addr.h"
40 CompressionStats compression_counters;
42 static CompressParam *comp_param;
43 static QemuThread *compress_threads;
44 /* comp_done_cond is used to wake up the migration thread when
45 * one of the compression threads has finished the compression.
46 * comp_done_lock is used to co-work with comp_done_cond.
48 static QemuMutex comp_done_lock;
49 static QemuCond comp_done_cond;
51 struct DecompressParam {
52 bool done;
53 bool quit;
54 QemuMutex mutex;
55 QemuCond cond;
56 void *des;
57 uint8_t *compbuf;
58 int len;
59 z_stream stream;
61 typedef struct DecompressParam DecompressParam;
63 static QEMUFile *decomp_file;
64 static DecompressParam *decomp_param;
65 static QemuThread *decompress_threads;
66 static QemuMutex decomp_done_lock;
67 static QemuCond decomp_done_cond;
69 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
70 RAMBlock *block, ram_addr_t offset,
71 uint8_t *source_buf);
73 static void *do_data_compress(void *opaque)
75 CompressParam *param = opaque;
76 RAMBlock *block;
77 ram_addr_t offset;
78 CompressResult result;
80 qemu_mutex_lock(&param->mutex);
81 while (!param->quit) {
82 if (param->trigger) {
83 block = param->block;
84 offset = param->offset;
85 param->trigger = false;
86 qemu_mutex_unlock(&param->mutex);
88 result = do_compress_ram_page(param->file, &param->stream,
89 block, offset, param->originbuf);
91 qemu_mutex_lock(&comp_done_lock);
92 param->done = true;
93 param->result = result;
94 qemu_cond_signal(&comp_done_cond);
95 qemu_mutex_unlock(&comp_done_lock);
97 qemu_mutex_lock(&param->mutex);
98 } else {
99 qemu_cond_wait(&param->cond, &param->mutex);
102 qemu_mutex_unlock(&param->mutex);
104 return NULL;
107 void compress_threads_save_cleanup(void)
109 int i, thread_count;
111 if (!migrate_compress() || !comp_param) {
112 return;
115 thread_count = migrate_compress_threads();
116 for (i = 0; i < thread_count; i++) {
118 * we use it as a indicator which shows if the thread is
119 * properly init'd or not
121 if (!comp_param[i].file) {
122 break;
125 qemu_mutex_lock(&comp_param[i].mutex);
126 comp_param[i].quit = true;
127 qemu_cond_signal(&comp_param[i].cond);
128 qemu_mutex_unlock(&comp_param[i].mutex);
130 qemu_thread_join(compress_threads + i);
131 qemu_mutex_destroy(&comp_param[i].mutex);
132 qemu_cond_destroy(&comp_param[i].cond);
133 deflateEnd(&comp_param[i].stream);
134 g_free(comp_param[i].originbuf);
135 qemu_fclose(comp_param[i].file);
136 comp_param[i].file = NULL;
138 qemu_mutex_destroy(&comp_done_lock);
139 qemu_cond_destroy(&comp_done_cond);
140 g_free(compress_threads);
141 g_free(comp_param);
142 compress_threads = NULL;
143 comp_param = NULL;
146 int compress_threads_save_setup(void)
148 int i, thread_count;
150 if (!migrate_compress()) {
151 return 0;
153 thread_count = migrate_compress_threads();
154 compress_threads = g_new0(QemuThread, thread_count);
155 comp_param = g_new0(CompressParam, thread_count);
156 qemu_cond_init(&comp_done_cond);
157 qemu_mutex_init(&comp_done_lock);
158 for (i = 0; i < thread_count; i++) {
159 comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
160 if (!comp_param[i].originbuf) {
161 goto exit;
164 if (deflateInit(&comp_param[i].stream,
165 migrate_compress_level()) != Z_OK) {
166 g_free(comp_param[i].originbuf);
167 goto exit;
170 /* comp_param[i].file is just used as a dummy buffer to save data,
171 * set its ops to empty.
173 comp_param[i].file = qemu_file_new_output(
174 QIO_CHANNEL(qio_channel_null_new()));
175 comp_param[i].done = true;
176 comp_param[i].quit = false;
177 qemu_mutex_init(&comp_param[i].mutex);
178 qemu_cond_init(&comp_param[i].cond);
179 qemu_thread_create(compress_threads + i, "compress",
180 do_data_compress, comp_param + i,
181 QEMU_THREAD_JOINABLE);
183 return 0;
185 exit:
186 compress_threads_save_cleanup();
187 return -1;
190 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
191 RAMBlock *block, ram_addr_t offset,
192 uint8_t *source_buf)
194 uint8_t *p = block->host + offset;
195 int ret;
197 if (buffer_is_zero(p, TARGET_PAGE_SIZE)) {
198 return RES_ZEROPAGE;
202 * copy it to a internal buffer to avoid it being modified by VM
203 * so that we can catch up the error during compression and
204 * decompression
206 memcpy(source_buf, p, TARGET_PAGE_SIZE);
207 ret = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
208 if (ret < 0) {
209 qemu_file_set_error(migrate_get_current()->to_dst_file, ret);
210 error_report("compressed data failed!");
211 return RES_NONE;
213 return RES_COMPRESS;
216 static inline void compress_reset_result(CompressParam *param)
218 param->result = RES_NONE;
219 param->block = NULL;
220 param->offset = 0;
223 void flush_compressed_data(int (send_queued_data(CompressParam *)))
225 int idx, thread_count;
227 thread_count = migrate_compress_threads();
229 qemu_mutex_lock(&comp_done_lock);
230 for (idx = 0; idx < thread_count; idx++) {
231 while (!comp_param[idx].done) {
232 qemu_cond_wait(&comp_done_cond, &comp_done_lock);
235 qemu_mutex_unlock(&comp_done_lock);
237 for (idx = 0; idx < thread_count; idx++) {
238 qemu_mutex_lock(&comp_param[idx].mutex);
239 if (!comp_param[idx].quit) {
240 CompressParam *param = &comp_param[idx];
241 send_queued_data(param);
242 compress_reset_result(param);
244 qemu_mutex_unlock(&comp_param[idx].mutex);
248 static inline void set_compress_params(CompressParam *param, RAMBlock *block,
249 ram_addr_t offset)
251 param->block = block;
252 param->offset = offset;
253 param->trigger = true;
256 int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset,
257 int (send_queued_data(CompressParam *)))
259 int idx, thread_count, pages = -1;
260 bool wait = migrate_compress_wait_thread();
262 thread_count = migrate_compress_threads();
263 qemu_mutex_lock(&comp_done_lock);
264 retry:
265 for (idx = 0; idx < thread_count; idx++) {
266 if (comp_param[idx].done) {
267 CompressParam *param = &comp_param[idx];
268 qemu_mutex_lock(&param->mutex);
269 param->done = false;
270 send_queued_data(param);
271 compress_reset_result(param);
272 set_compress_params(param, block, offset);
274 qemu_cond_signal(&param->cond);
275 qemu_mutex_unlock(&param->mutex);
276 pages = 1;
277 break;
282 * wait for the free thread if the user specifies 'compress-wait-thread',
283 * otherwise we will post the page out in the main thread as normal page.
285 if (pages < 0 && wait) {
286 qemu_cond_wait(&comp_done_cond, &comp_done_lock);
287 goto retry;
289 qemu_mutex_unlock(&comp_done_lock);
291 return pages;
294 /* return the size after decompression, or negative value on error */
295 static int
296 qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
297 const uint8_t *source, size_t source_len)
299 int err;
301 err = inflateReset(stream);
302 if (err != Z_OK) {
303 return -1;
306 stream->avail_in = source_len;
307 stream->next_in = (uint8_t *)source;
308 stream->avail_out = dest_len;
309 stream->next_out = dest;
311 err = inflate(stream, Z_NO_FLUSH);
312 if (err != Z_STREAM_END) {
313 return -1;
316 return stream->total_out;
319 static void *do_data_decompress(void *opaque)
321 DecompressParam *param = opaque;
322 unsigned long pagesize;
323 uint8_t *des;
324 int len, ret;
326 qemu_mutex_lock(&param->mutex);
327 while (!param->quit) {
328 if (param->des) {
329 des = param->des;
330 len = param->len;
331 param->des = 0;
332 qemu_mutex_unlock(&param->mutex);
334 pagesize = TARGET_PAGE_SIZE;
336 ret = qemu_uncompress_data(&param->stream, des, pagesize,
337 param->compbuf, len);
338 if (ret < 0 && migrate_get_current()->decompress_error_check) {
339 error_report("decompress data failed");
340 qemu_file_set_error(decomp_file, ret);
343 qemu_mutex_lock(&decomp_done_lock);
344 param->done = true;
345 qemu_cond_signal(&decomp_done_cond);
346 qemu_mutex_unlock(&decomp_done_lock);
348 qemu_mutex_lock(&param->mutex);
349 } else {
350 qemu_cond_wait(&param->cond, &param->mutex);
353 qemu_mutex_unlock(&param->mutex);
355 return NULL;
358 int wait_for_decompress_done(void)
360 int idx, thread_count;
362 if (!migrate_compress()) {
363 return 0;
366 thread_count = migrate_decompress_threads();
367 qemu_mutex_lock(&decomp_done_lock);
368 for (idx = 0; idx < thread_count; idx++) {
369 while (!decomp_param[idx].done) {
370 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
373 qemu_mutex_unlock(&decomp_done_lock);
374 return qemu_file_get_error(decomp_file);
377 void compress_threads_load_cleanup(void)
379 int i, thread_count;
381 if (!migrate_compress()) {
382 return;
384 thread_count = migrate_decompress_threads();
385 for (i = 0; i < thread_count; i++) {
387 * we use it as a indicator which shows if the thread is
388 * properly init'd or not
390 if (!decomp_param[i].compbuf) {
391 break;
394 qemu_mutex_lock(&decomp_param[i].mutex);
395 decomp_param[i].quit = true;
396 qemu_cond_signal(&decomp_param[i].cond);
397 qemu_mutex_unlock(&decomp_param[i].mutex);
399 for (i = 0; i < thread_count; i++) {
400 if (!decomp_param[i].compbuf) {
401 break;
404 qemu_thread_join(decompress_threads + i);
405 qemu_mutex_destroy(&decomp_param[i].mutex);
406 qemu_cond_destroy(&decomp_param[i].cond);
407 inflateEnd(&decomp_param[i].stream);
408 g_free(decomp_param[i].compbuf);
409 decomp_param[i].compbuf = NULL;
411 g_free(decompress_threads);
412 g_free(decomp_param);
413 decompress_threads = NULL;
414 decomp_param = NULL;
415 decomp_file = NULL;
418 int compress_threads_load_setup(QEMUFile *f)
420 int i, thread_count;
422 if (!migrate_compress()) {
423 return 0;
426 thread_count = migrate_decompress_threads();
427 decompress_threads = g_new0(QemuThread, thread_count);
428 decomp_param = g_new0(DecompressParam, thread_count);
429 qemu_mutex_init(&decomp_done_lock);
430 qemu_cond_init(&decomp_done_cond);
431 decomp_file = f;
432 for (i = 0; i < thread_count; i++) {
433 if (inflateInit(&decomp_param[i].stream) != Z_OK) {
434 goto exit;
437 decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
438 qemu_mutex_init(&decomp_param[i].mutex);
439 qemu_cond_init(&decomp_param[i].cond);
440 decomp_param[i].done = true;
441 decomp_param[i].quit = false;
442 qemu_thread_create(decompress_threads + i, "decompress",
443 do_data_decompress, decomp_param + i,
444 QEMU_THREAD_JOINABLE);
446 return 0;
447 exit:
448 compress_threads_load_cleanup();
449 return -1;
452 void decompress_data_with_multi_threads(QEMUFile *f, void *host, int len)
454 int idx, thread_count;
456 thread_count = migrate_decompress_threads();
457 QEMU_LOCK_GUARD(&decomp_done_lock);
458 while (true) {
459 for (idx = 0; idx < thread_count; idx++) {
460 if (decomp_param[idx].done) {
461 decomp_param[idx].done = false;
462 qemu_mutex_lock(&decomp_param[idx].mutex);
463 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
464 decomp_param[idx].des = host;
465 decomp_param[idx].len = len;
466 qemu_cond_signal(&decomp_param[idx].cond);
467 qemu_mutex_unlock(&decomp_param[idx].mutex);
468 break;
471 if (idx < thread_count) {
472 break;
473 } else {
474 qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);