COLO: Remove colo_state migration struct
[qemu.git] / migration / colo.c
blob d3163b51c8727dc53d35467fede8d05ed654a3db
/*
 * COarse-grain LOck-stepping Virtual Machines for Non-stop Service (COLO)
 * (a.k.a. Fault Tolerance or Continuous Replication)
 *
 * Copyright (c) 2016 HUAWEI TECHNOLOGIES CO., LTD.
 * Copyright (c) 2016 FUJITSU LIMITED
 * Copyright (c) 2016 Intel Corporation
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or
 * later. See the COPYING file in the top-level directory.
 */
13 #include "qemu/osdep.h"
14 #include "sysemu/sysemu.h"
15 #include "qapi/error.h"
16 #include "qapi/qapi-commands-migration.h"
17 #include "qemu-file-channel.h"
18 #include "migration.h"
19 #include "qemu-file.h"
20 #include "savevm.h"
21 #include "migration/colo.h"
22 #include "block.h"
23 #include "io/channel-buffer.h"
24 #include "trace.h"
25 #include "qemu/error-report.h"
26 #include "migration/failover.h"
27 #include "replication.h"
28 #include "net/colo-compare.h"
29 #include "net/colo.h"
30 #include "block/block.h"
32 static bool vmstate_loading;
33 static Notifier packets_compare_notifier;
35 #define COLO_BUFFER_BASE_SIZE (4 * 1024 * 1024)
37 bool migration_in_colo_state(void)
39 MigrationState *s = migrate_get_current();
41 return (s->state == MIGRATION_STATUS_COLO);
44 bool migration_incoming_in_colo_state(void)
46 MigrationIncomingState *mis = migration_incoming_get_current();
48 return mis && (mis->state == MIGRATION_STATUS_COLO);
51 static bool colo_runstate_is_stopped(void)
53 return runstate_check(RUN_STATE_COLO) || !runstate_is_running();
56 static void secondary_vm_do_failover(void)
58 int old_state;
59 MigrationIncomingState *mis = migration_incoming_get_current();
60 Error *local_err = NULL;
62 /* Can not do failover during the process of VM's loading VMstate, Or
63 * it will break the secondary VM.
65 if (vmstate_loading) {
66 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
67 FAILOVER_STATUS_RELAUNCH);
68 if (old_state != FAILOVER_STATUS_ACTIVE) {
69 error_report("Unknown error while do failover for secondary VM,"
70 "old_state: %s", FailoverStatus_str(old_state));
72 return;
75 migrate_set_state(&mis->state, MIGRATION_STATUS_COLO,
76 MIGRATION_STATUS_COMPLETED);
78 replication_stop_all(true, &local_err);
79 if (local_err) {
80 error_report_err(local_err);
83 if (!autostart) {
84 error_report("\"-S\" qemu option will be ignored in secondary side");
85 /* recover runstate to normal migration finish state */
86 autostart = true;
89 * Make sure COLO incoming thread not block in recv or send,
90 * If mis->from_src_file and mis->to_src_file use the same fd,
91 * The second shutdown() will return -1, we ignore this value,
92 * It is harmless.
94 if (mis->from_src_file) {
95 qemu_file_shutdown(mis->from_src_file);
97 if (mis->to_src_file) {
98 qemu_file_shutdown(mis->to_src_file);
101 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
102 FAILOVER_STATUS_COMPLETED);
103 if (old_state != FAILOVER_STATUS_ACTIVE) {
104 error_report("Incorrect state (%s) while doing failover for "
105 "secondary VM", FailoverStatus_str(old_state));
106 return;
108 /* Notify COLO incoming thread that failover work is finished */
109 qemu_sem_post(&mis->colo_incoming_sem);
110 /* For Secondary VM, jump to incoming co */
111 if (mis->migration_incoming_co) {
112 qemu_coroutine_enter(mis->migration_incoming_co);
116 static void primary_vm_do_failover(void)
118 MigrationState *s = migrate_get_current();
119 int old_state;
120 Error *local_err = NULL;
122 migrate_set_state(&s->state, MIGRATION_STATUS_COLO,
123 MIGRATION_STATUS_COMPLETED);
126 * Wake up COLO thread which may blocked in recv() or send(),
127 * The s->rp_state.from_dst_file and s->to_dst_file may use the
128 * same fd, but we still shutdown the fd for twice, it is harmless.
130 if (s->to_dst_file) {
131 qemu_file_shutdown(s->to_dst_file);
133 if (s->rp_state.from_dst_file) {
134 qemu_file_shutdown(s->rp_state.from_dst_file);
137 old_state = failover_set_state(FAILOVER_STATUS_ACTIVE,
138 FAILOVER_STATUS_COMPLETED);
139 if (old_state != FAILOVER_STATUS_ACTIVE) {
140 error_report("Incorrect state (%s) while doing failover for Primary VM",
141 FailoverStatus_str(old_state));
142 return;
145 replication_stop_all(true, &local_err);
146 if (local_err) {
147 error_report_err(local_err);
148 local_err = NULL;
151 /* Notify COLO thread that failover work is finished */
152 qemu_sem_post(&s->colo_exit_sem);
155 COLOMode get_colo_mode(void)
157 if (migration_in_colo_state()) {
158 return COLO_MODE_PRIMARY;
159 } else if (migration_incoming_in_colo_state()) {
160 return COLO_MODE_SECONDARY;
161 } else {
162 return COLO_MODE_UNKNOWN;
166 void colo_do_failover(MigrationState *s)
168 /* Make sure VM stopped while failover happened. */
169 if (!colo_runstate_is_stopped()) {
170 vm_stop_force_state(RUN_STATE_COLO);
173 if (get_colo_mode() == COLO_MODE_PRIMARY) {
174 primary_vm_do_failover();
175 } else {
176 secondary_vm_do_failover();
180 void qmp_xen_set_replication(bool enable, bool primary,
181 bool has_failover, bool failover,
182 Error **errp)
184 #ifdef CONFIG_REPLICATION
185 ReplicationMode mode = primary ?
186 REPLICATION_MODE_PRIMARY :
187 REPLICATION_MODE_SECONDARY;
189 if (has_failover && enable) {
190 error_setg(errp, "Parameter 'failover' is only for"
191 " stopping replication");
192 return;
195 if (enable) {
196 replication_start_all(mode, errp);
197 } else {
198 if (!has_failover) {
199 failover = NULL;
201 replication_stop_all(failover, failover ? NULL : errp);
203 #else
204 abort();
205 #endif
208 ReplicationStatus *qmp_query_xen_replication_status(Error **errp)
210 #ifdef CONFIG_REPLICATION
211 Error *err = NULL;
212 ReplicationStatus *s = g_new0(ReplicationStatus, 1);
214 replication_get_error_all(&err);
215 if (err) {
216 s->error = true;
217 s->has_desc = true;
218 s->desc = g_strdup(error_get_pretty(err));
219 } else {
220 s->error = false;
223 error_free(err);
224 return s;
225 #else
226 abort();
227 #endif
230 void qmp_xen_colo_do_checkpoint(Error **errp)
232 #ifdef CONFIG_REPLICATION
233 replication_do_checkpoint_all(errp);
234 #else
235 abort();
236 #endif
239 static void colo_send_message(QEMUFile *f, COLOMessage msg,
240 Error **errp)
242 int ret;
244 if (msg >= COLO_MESSAGE__MAX) {
245 error_setg(errp, "%s: Invalid message", __func__);
246 return;
248 qemu_put_be32(f, msg);
249 qemu_fflush(f);
251 ret = qemu_file_get_error(f);
252 if (ret < 0) {
253 error_setg_errno(errp, -ret, "Can't send COLO message");
255 trace_colo_send_message(COLOMessage_str(msg));
258 static void colo_send_message_value(QEMUFile *f, COLOMessage msg,
259 uint64_t value, Error **errp)
261 Error *local_err = NULL;
262 int ret;
264 colo_send_message(f, msg, &local_err);
265 if (local_err) {
266 error_propagate(errp, local_err);
267 return;
269 qemu_put_be64(f, value);
270 qemu_fflush(f);
272 ret = qemu_file_get_error(f);
273 if (ret < 0) {
274 error_setg_errno(errp, -ret, "Failed to send value for message:%s",
275 COLOMessage_str(msg));
279 static COLOMessage colo_receive_message(QEMUFile *f, Error **errp)
281 COLOMessage msg;
282 int ret;
284 msg = qemu_get_be32(f);
285 ret = qemu_file_get_error(f);
286 if (ret < 0) {
287 error_setg_errno(errp, -ret, "Can't receive COLO message");
288 return msg;
290 if (msg >= COLO_MESSAGE__MAX) {
291 error_setg(errp, "%s: Invalid message", __func__);
292 return msg;
294 trace_colo_receive_message(COLOMessage_str(msg));
295 return msg;
298 static void colo_receive_check_message(QEMUFile *f, COLOMessage expect_msg,
299 Error **errp)
301 COLOMessage msg;
302 Error *local_err = NULL;
304 msg = colo_receive_message(f, &local_err);
305 if (local_err) {
306 error_propagate(errp, local_err);
307 return;
309 if (msg != expect_msg) {
310 error_setg(errp, "Unexpected COLO message %d, expected %d",
311 msg, expect_msg);
315 static uint64_t colo_receive_message_value(QEMUFile *f, uint32_t expect_msg,
316 Error **errp)
318 Error *local_err = NULL;
319 uint64_t value;
320 int ret;
322 colo_receive_check_message(f, expect_msg, &local_err);
323 if (local_err) {
324 error_propagate(errp, local_err);
325 return 0;
328 value = qemu_get_be64(f);
329 ret = qemu_file_get_error(f);
330 if (ret < 0) {
331 error_setg_errno(errp, -ret, "Failed to get value for COLO message: %s",
332 COLOMessage_str(expect_msg));
334 return value;
337 static int colo_do_checkpoint_transaction(MigrationState *s,
338 QIOChannelBuffer *bioc,
339 QEMUFile *fb)
341 Error *local_err = NULL;
342 int ret = -1;
344 colo_send_message(s->to_dst_file, COLO_MESSAGE_CHECKPOINT_REQUEST,
345 &local_err);
346 if (local_err) {
347 goto out;
350 colo_receive_check_message(s->rp_state.from_dst_file,
351 COLO_MESSAGE_CHECKPOINT_REPLY, &local_err);
352 if (local_err) {
353 goto out;
355 /* Reset channel-buffer directly */
356 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
357 bioc->usage = 0;
359 qemu_mutex_lock_iothread();
360 if (failover_get_state() != FAILOVER_STATUS_NONE) {
361 qemu_mutex_unlock_iothread();
362 goto out;
364 vm_stop_force_state(RUN_STATE_COLO);
365 qemu_mutex_unlock_iothread();
366 trace_colo_vm_state_change("run", "stop");
368 * Failover request bh could be called after vm_stop_force_state(),
369 * So we need check failover_request_is_active() again.
371 if (failover_get_state() != FAILOVER_STATUS_NONE) {
372 goto out;
375 colo_notify_compares_event(NULL, COLO_EVENT_CHECKPOINT, &local_err);
376 if (local_err) {
377 goto out;
380 /* Disable block migration */
381 migrate_set_block_enabled(false, &local_err);
382 qemu_savevm_state_header(fb);
383 qemu_savevm_state_setup(fb);
384 qemu_mutex_lock_iothread();
385 replication_do_checkpoint_all(&local_err);
386 if (local_err) {
387 qemu_mutex_unlock_iothread();
388 goto out;
390 qemu_savevm_state_complete_precopy(fb, false, false);
391 qemu_mutex_unlock_iothread();
393 qemu_fflush(fb);
395 colo_send_message(s->to_dst_file, COLO_MESSAGE_VMSTATE_SEND, &local_err);
396 if (local_err) {
397 goto out;
400 * We need the size of the VMstate data in Secondary side,
401 * With which we can decide how much data should be read.
403 colo_send_message_value(s->to_dst_file, COLO_MESSAGE_VMSTATE_SIZE,
404 bioc->usage, &local_err);
405 if (local_err) {
406 goto out;
409 qemu_put_buffer(s->to_dst_file, bioc->data, bioc->usage);
410 qemu_fflush(s->to_dst_file);
411 ret = qemu_file_get_error(s->to_dst_file);
412 if (ret < 0) {
413 goto out;
416 colo_receive_check_message(s->rp_state.from_dst_file,
417 COLO_MESSAGE_VMSTATE_RECEIVED, &local_err);
418 if (local_err) {
419 goto out;
422 colo_receive_check_message(s->rp_state.from_dst_file,
423 COLO_MESSAGE_VMSTATE_LOADED, &local_err);
424 if (local_err) {
425 goto out;
428 ret = 0;
430 qemu_mutex_lock_iothread();
431 vm_start();
432 qemu_mutex_unlock_iothread();
433 trace_colo_vm_state_change("stop", "run");
435 out:
436 if (local_err) {
437 error_report_err(local_err);
439 return ret;
442 static void colo_compare_notify_checkpoint(Notifier *notifier, void *data)
444 colo_checkpoint_notify(data);
447 static void colo_process_checkpoint(MigrationState *s)
449 QIOChannelBuffer *bioc;
450 QEMUFile *fb = NULL;
451 int64_t current_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
452 Error *local_err = NULL;
453 int ret;
455 failover_init_state();
457 s->rp_state.from_dst_file = qemu_file_get_return_path(s->to_dst_file);
458 if (!s->rp_state.from_dst_file) {
459 error_report("Open QEMUFile from_dst_file failed");
460 goto out;
463 packets_compare_notifier.notify = colo_compare_notify_checkpoint;
464 colo_compare_register_notifier(&packets_compare_notifier);
467 * Wait for Secondary finish loading VM states and enter COLO
468 * restore.
470 colo_receive_check_message(s->rp_state.from_dst_file,
471 COLO_MESSAGE_CHECKPOINT_READY, &local_err);
472 if (local_err) {
473 goto out;
475 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
476 fb = qemu_fopen_channel_output(QIO_CHANNEL(bioc));
477 object_unref(OBJECT(bioc));
479 qemu_mutex_lock_iothread();
480 replication_start_all(REPLICATION_MODE_PRIMARY, &local_err);
481 if (local_err) {
482 qemu_mutex_unlock_iothread();
483 goto out;
486 vm_start();
487 qemu_mutex_unlock_iothread();
488 trace_colo_vm_state_change("stop", "run");
490 timer_mod(s->colo_delay_timer,
491 current_time + s->parameters.x_checkpoint_delay);
493 while (s->state == MIGRATION_STATUS_COLO) {
494 if (failover_get_state() != FAILOVER_STATUS_NONE) {
495 error_report("failover request");
496 goto out;
499 qemu_sem_wait(&s->colo_checkpoint_sem);
501 ret = colo_do_checkpoint_transaction(s, bioc, fb);
502 if (ret < 0) {
503 goto out;
507 out:
508 /* Throw the unreported error message after exited from loop */
509 if (local_err) {
510 error_report_err(local_err);
513 if (fb) {
514 qemu_fclose(fb);
517 /* Hope this not to be too long to wait here */
518 qemu_sem_wait(&s->colo_exit_sem);
519 qemu_sem_destroy(&s->colo_exit_sem);
522 * It is safe to unregister notifier after failover finished.
523 * Besides, colo_delay_timer and colo_checkpoint_sem can't be
524 * released befor unregister notifier, or there will be use-after-free
525 * error.
527 colo_compare_unregister_notifier(&packets_compare_notifier);
528 timer_del(s->colo_delay_timer);
529 timer_free(s->colo_delay_timer);
530 qemu_sem_destroy(&s->colo_checkpoint_sem);
533 * Must be called after failover BH is completed,
534 * Or the failover BH may shutdown the wrong fd that
535 * re-used by other threads after we release here.
537 if (s->rp_state.from_dst_file) {
538 qemu_fclose(s->rp_state.from_dst_file);
542 void colo_checkpoint_notify(void *opaque)
544 MigrationState *s = opaque;
545 int64_t next_notify_time;
547 qemu_sem_post(&s->colo_checkpoint_sem);
548 s->colo_checkpoint_time = qemu_clock_get_ms(QEMU_CLOCK_HOST);
549 next_notify_time = s->colo_checkpoint_time +
550 s->parameters.x_checkpoint_delay;
551 timer_mod(s->colo_delay_timer, next_notify_time);
554 void migrate_start_colo_process(MigrationState *s)
556 qemu_mutex_unlock_iothread();
557 qemu_sem_init(&s->colo_checkpoint_sem, 0);
558 s->colo_delay_timer = timer_new_ms(QEMU_CLOCK_HOST,
559 colo_checkpoint_notify, s);
561 qemu_sem_init(&s->colo_exit_sem, 0);
562 migrate_set_state(&s->state, MIGRATION_STATUS_ACTIVE,
563 MIGRATION_STATUS_COLO);
564 colo_process_checkpoint(s);
565 qemu_mutex_lock_iothread();
568 static void colo_wait_handle_message(QEMUFile *f, int *checkpoint_request,
569 Error **errp)
571 COLOMessage msg;
572 Error *local_err = NULL;
574 msg = colo_receive_message(f, &local_err);
575 if (local_err) {
576 error_propagate(errp, local_err);
577 return;
580 switch (msg) {
581 case COLO_MESSAGE_CHECKPOINT_REQUEST:
582 *checkpoint_request = 1;
583 break;
584 default:
585 *checkpoint_request = 0;
586 error_setg(errp, "Got unknown COLO message: %d", msg);
587 break;
591 void *colo_process_incoming_thread(void *opaque)
593 MigrationIncomingState *mis = opaque;
594 QEMUFile *fb = NULL;
595 QIOChannelBuffer *bioc = NULL; /* Cache incoming device state */
596 uint64_t total_size;
597 uint64_t value;
598 Error *local_err = NULL;
600 rcu_register_thread();
601 qemu_sem_init(&mis->colo_incoming_sem, 0);
603 migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
604 MIGRATION_STATUS_COLO);
606 failover_init_state();
608 mis->to_src_file = qemu_file_get_return_path(mis->from_src_file);
609 if (!mis->to_src_file) {
610 error_report("COLO incoming thread: Open QEMUFile to_src_file failed");
611 goto out;
614 * Note: the communication between Primary side and Secondary side
615 * should be sequential, we set the fd to unblocked in migration incoming
616 * coroutine, and here we are in the COLO incoming thread, so it is ok to
617 * set the fd back to blocked.
619 qemu_file_set_blocking(mis->from_src_file, true);
621 bioc = qio_channel_buffer_new(COLO_BUFFER_BASE_SIZE);
622 fb = qemu_fopen_channel_input(QIO_CHANNEL(bioc));
623 object_unref(OBJECT(bioc));
625 qemu_mutex_lock_iothread();
626 replication_start_all(REPLICATION_MODE_SECONDARY, &local_err);
627 if (local_err) {
628 qemu_mutex_unlock_iothread();
629 goto out;
631 vm_start();
632 trace_colo_vm_state_change("stop", "run");
633 qemu_mutex_unlock_iothread();
635 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_READY,
636 &local_err);
637 if (local_err) {
638 goto out;
641 while (mis->state == MIGRATION_STATUS_COLO) {
642 int request = 0;
644 colo_wait_handle_message(mis->from_src_file, &request, &local_err);
645 if (local_err) {
646 goto out;
648 assert(request);
649 if (failover_get_state() != FAILOVER_STATUS_NONE) {
650 error_report("failover request");
651 goto out;
654 qemu_mutex_lock_iothread();
655 vm_stop_force_state(RUN_STATE_COLO);
656 trace_colo_vm_state_change("run", "stop");
657 qemu_mutex_unlock_iothread();
659 /* FIXME: This is unnecessary for periodic checkpoint mode */
660 colo_send_message(mis->to_src_file, COLO_MESSAGE_CHECKPOINT_REPLY,
661 &local_err);
662 if (local_err) {
663 goto out;
666 colo_receive_check_message(mis->from_src_file,
667 COLO_MESSAGE_VMSTATE_SEND, &local_err);
668 if (local_err) {
669 goto out;
672 value = colo_receive_message_value(mis->from_src_file,
673 COLO_MESSAGE_VMSTATE_SIZE, &local_err);
674 if (local_err) {
675 goto out;
679 * Read VM device state data into channel buffer,
680 * It's better to re-use the memory allocated.
681 * Here we need to handle the channel buffer directly.
683 if (value > bioc->capacity) {
684 bioc->capacity = value;
685 bioc->data = g_realloc(bioc->data, bioc->capacity);
687 total_size = qemu_get_buffer(mis->from_src_file, bioc->data, value);
688 if (total_size != value) {
689 error_report("Got %" PRIu64 " VMState data, less than expected"
690 " %" PRIu64, total_size, value);
691 goto out;
693 bioc->usage = total_size;
694 qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
696 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_RECEIVED,
697 &local_err);
698 if (local_err) {
699 goto out;
702 qemu_mutex_lock_iothread();
703 qemu_system_reset(SHUTDOWN_CAUSE_NONE);
704 vmstate_loading = true;
705 if (qemu_loadvm_state(fb) < 0) {
706 error_report("COLO: loadvm failed");
707 qemu_mutex_unlock_iothread();
708 goto out;
711 replication_get_error_all(&local_err);
712 if (local_err) {
713 qemu_mutex_unlock_iothread();
714 goto out;
716 /* discard colo disk buffer */
717 replication_do_checkpoint_all(&local_err);
718 if (local_err) {
719 qemu_mutex_unlock_iothread();
720 goto out;
723 vmstate_loading = false;
724 vm_start();
725 trace_colo_vm_state_change("stop", "run");
726 qemu_mutex_unlock_iothread();
728 if (failover_get_state() == FAILOVER_STATUS_RELAUNCH) {
729 failover_set_state(FAILOVER_STATUS_RELAUNCH,
730 FAILOVER_STATUS_NONE);
731 failover_request_active(NULL);
732 goto out;
735 colo_send_message(mis->to_src_file, COLO_MESSAGE_VMSTATE_LOADED,
736 &local_err);
737 if (local_err) {
738 goto out;
742 out:
743 vmstate_loading = false;
744 /* Throw the unreported error message after exited from loop */
745 if (local_err) {
746 error_report_err(local_err);
749 if (fb) {
750 qemu_fclose(fb);
753 /* Hope this not to be too long to loop here */
754 qemu_sem_wait(&mis->colo_incoming_sem);
755 qemu_sem_destroy(&mis->colo_incoming_sem);
756 /* Must be called after failover BH is completed */
757 if (mis->to_src_file) {
758 qemu_fclose(mis->to_src_file);
760 migration_incoming_disable_colo();
762 rcu_unregister_thread();
763 return NULL;