/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu/osdep.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "ram.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-socket.h"
#include "yank_functions.h"
/* Multiple fd's */

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1
typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
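
/*
 * Because the struct is packed, the per-channel handshake message is
 * always 4 + 4 + 16 + 1 + 7 + 32 = 64 bytes on the wire; the uuid lets
 * the destination verify a new connection belongs to this migration
 * before spawning a recv thread for it.  A compile-time guard such as
 * the one below (an illustrative addition, not in the original) would
 * pin that layout down:
 */
QEMU_BUILD_BUG_ON(sizeof(MultiFDInit_t) != 64);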
struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we use 'uintptr_t' because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  It means that on
     * 32-bit systems multifd will overflow packet_num sooner, but that
     * should be fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on
     * all hosts, but so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already terminated the threads?  There is a race where we
     * may get an error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;
/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}
/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}
/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    MultiFDPages_t *pages = p->pages;
    int ret;

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    for (int i = 0; i < pages->num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->num * p->page_size;
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}
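
/*
 * Illustration of the IOV layout produced above (explanatory note, not
 * functional code):
 *
 *   !zerocopy:  iov[0] = packet header, iov[1..num] = guest pages, all
 *               written together later by the send thread.
 *   zerocopy:   the header is written above with a plain blocking
 *               write, and the IOV carries only guest pages, so the
 *               kernel may transmit them without copying.
 */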
/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}
/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}
/**
 * nocomp_recv_pages: read the data from the channel into actual pages
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_pages(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }
    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }
    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}
static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv_pages = nocomp_recv_pages
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch the offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->block = NULL;
}
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}
static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}
static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}
void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->num;

    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                       p->next_packet_size);
}
static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel was set up to
     * handle, the stream is bogus; just stop migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
                       p->next_packet_size);

    if (p->normal_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    return 0;
}
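
/*
 * Note on the bounds checks above: pages_alloc, normal_pages and every
 * offset come straight off the wire, so each is validated against the
 * receiver's own limits (p->page_count, pages_alloc, and the ramblock's
 * used_length) before being used to address host memory.  A corrupt or
 * malicious stream therefore fails the migration instead of writing
 * outside the guest RAM mapping.
 */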
static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something went wrong with the
 * current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}
/*
 * How we use multifd_send_state->pages and channel->pages?
 *
 * We create a pages struct for each channel, plus a main one.  Each
 * time we need to send a batch of pages we exchange the main one with
 * the one belonging to the channel that is going to send it.  There
 * are two reasons for that:
 *    - to avoid doing so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct, and we don't
 * need any locking.  It belongs either to the migration thread or to
 * the channel thread.  Switching is safe because the channel has to
 * have finished with its own pages before pending_job can become
 * false, and the migration thread only hands over new pages after
 * observing that.
 *
 * Returns true if succeeded, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make happy gcc */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only the multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is set up before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
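
/*
 * Sketch of the handoff performed above (illustrative only):
 *
 *   migration thread                     channel thread
 *   ----------------                     --------------
 *   wait(channels_ready)
 *   find p with !pending_job
 *   smp_mb_acquire()
 *   swap state->pages <-> p->pages
 *   store_release(pending_job, true)
 *   post(p->sem)            ------->     wait(p->sem)
 *                                        load_acquire(pending_job)
 *                                        send p->pages
 *                                        store_release(pending_job, false)
 *                                        post(channels_ready)
 *
 * The acquire/release pairs are what make the lockless pages swap safe.
 */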
static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush.  It can be because of either:
     *
     * (1) The page is not on the same ramblock as the previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}
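
/*
 * Example of the retry path above: if a caller queues pages from
 * ramblock A and then one from ramblock B, the enqueue of B's page
 * first flushes the pending batch for A (a packet describes exactly
 * one ramblock), then retries with an empty queue and succeeds.
 */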
/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where we
     * get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}
static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}
static int multifd_send_channel_destroy(QIOChannel *send)
{
    return socket_send_channel_destroy(send);
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->registered_yank) {
        migration_ioc_unregister_yank(p->c);
    }
    multifd_send_channel_destroy(p->c);
    p->c = NULL;
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}
static void multifd_send_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}
void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}
static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}
int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }
    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }
    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
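
/*
 * The sync protocol driven above, in short (illustrative summary): for
 * each channel the main thread sets pending_sync and posts p->sem; the
 * channel thread answers with a header-only packet carrying
 * MULTIFD_FLAG_SYNC and posts p->sem_sync; the main thread collects
 * every sem_sync before returning, so all channels are known to have
 * drained their current job.
 */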
static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (multifd_send_initial_packet(p, &local_err) < 0) {
        ret = -1;
        goto out;
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                              0, p->write_flags, &local_err);
            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));
            p->flags = MULTIFD_FLAG_SYNC;
            multifd_send_fill_packet(p);
            ret = qio_channel_write_all(p->c, (void *)p->packet,
                                        p->packet_len, &local_err);
            if (ret != 0) {
                break;
            }
            /* p->next_packet_size will always be zero for a SYNC packet */
            stat64_add(&mig_stats.multifd_bytes, p->packet_len);
            p->flags = 0;
            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);

    return NULL;
}
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannelTLS *tioc = QIO_CHANNEL_TLS(p->c);

    qio_channel_tls_handshake(tioc,
                              multifd_new_send_channel_async,
                              p,
                              NULL,
                              NULL);
    return NULL;
}
static bool multifd_tls_channel_connect(MultiFDSendParams *p,
                                        QIOChannel *ioc,
                                        Error **errp)
{
    MigrationState *s = migrate_get_current();
    const char *hostname = s->hostname;
    QIOChannelTLS *tioc;

    tioc = migration_tls_client_create(ioc, hostname, errp);
    if (!tioc) {
        return false;
    }

    /*
     * Ownership of the socket channel now transfers to the newly
     * created TLS channel, which has already taken a reference.
     */
    object_unref(OBJECT(ioc));
    trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
    qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
    p->c = QIO_CHANNEL(tioc);

    p->tls_thread_created = true;
    qemu_thread_create(&p->tls_thread, "multifd-tls-handshake-worker",
                       multifd_tls_handshake_thread, p,
                       QEMU_THREAD_JOINABLE);
    return true;
}
static bool multifd_channel_connect(MultiFDSendParams *p,
                                    QIOChannel *ioc,
                                    Error **errp)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    p->registered_yank = true;
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
    return true;
}
/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        ret = multifd_channel_connect(p, ioc, &local_err);
    }

out:
    /*
     * Here we're not interested whether creation succeeded, only that
     * it happened at all.
     */
    qemu_sem_post(&multifd_send_state->channels_created);

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    if (!p->c) {
        /*
         * If no channel has been created, drop the initial
         * reference.  Otherwise cleanup happens at
         * multifd_send_channel_destroy()
         */
        object_unref(OBJECT(ioc));
    }
    error_free(local_err);
}
static void multifd_new_send_channel_create(gpointer opaque)
{
    socket_send_channel_create(multifd_new_send_channel_async, opaque);
}
bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
        p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        p->name = g_strdup_printf("multifdsend_%d", i);
        /* We need one extra place for the packet header */
        p->iov = g_new0(struct iovec, page_count + 1);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;
        multifd_new_send_channel_create(p);
    }

    /*
     * Wait until channel creation has started for all channels.  The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            break;
        }
    }

    if (ret) {
        migrate_set_error(s, local_err);
        error_report_err(local_err);
        migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                          MIGRATION_STATUS_FAILED);
        return false;
    }

    return true;
}
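
/*
 * Sizing example for the allocations above (not normative; assuming a
 * 4 KiB target page size): page_count = MULTIFD_PACKET_SIZE / 4 KiB
 * pages per packet, so p->packet occupies sizeof(MultiFDPacket_t) plus
 * one uint64_t offset per page, and p->iov has page_count + 1 entries,
 * the extra one holding the packet header.
 */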
struct {
    MultiFDRecvParams *params;
    /* number of created threads */
    int count;
    /* syncs main thread and channels */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_lock(&p->mutex);
        p->quit = true;
        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        qemu_mutex_unlock(&p->mutex);
    }
}
void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}
static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}
static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}
void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * multifd_recv_thread may be blocked in the MULTIFD_FLAG_SYNC
         * handling code; waking it up is harmless in the cleanup phase.
         */
        qemu_sem_post(&p->sem_sync);

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}
void multifd_recv_sync_main(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        trace_multifd_recv_sync_main_wait(p->id);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags;

        if (p->quit) {
            break;
        }

        ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                       p->packet_len, &local_err);
        if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
            break;
        }

        qemu_mutex_lock(&p->mutex);
        ret = multifd_recv_unfill_packet(p, &local_err);
        if (ret) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }

        flags = p->flags;
        /* recv methods don't know how to handle the SYNC flag */
        p->flags &= ~MULTIFD_FLAG_SYNC;
        qemu_mutex_unlock(&p->mutex);

        if (p->normal_num) {
            ret = multifd_recv_state->ops->recv_pages(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (flags & MULTIFD_FLAG_SYNC) {
            qemu_sem_post(&multifd_recv_state->sem_sync);
            qemu_sem_wait(&p->sem_sync);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);

    return NULL;
}
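
/*
 * Receive-side sync pairing (illustrative summary): when a packet
 * carries MULTIFD_FLAG_SYNC, the recv thread posts
 * multifd_recv_state->sem_sync and then blocks on p->sem_sync until
 * multifd_recv_sync_main() has seen every channel and releases them,
 * mirroring the send-side protocol.
 */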
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
    qatomic_set(&multifd_recv_state->count, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        p->quit = false;
        p->id = i;
        p->packet_len = sizeof(MultiFDPacket_t)
                      + sizeof(uint64_t) * page_count;
        p->packet = g_malloc0(p->packet_len);
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}
bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}
/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    int id;

    id = multifd_recv_initial_packet(ioc, &local_err);
    if (id < 0) {
        multifd_recv_terminate_threads(local_err);
        error_propagate_prepend(errp, local_err,
                                "failed to receive packet"
                                " via multifd channel %d: ",
                                qatomic_read(&multifd_recv_state->count));
        return;
    }
    trace_multifd_recv_new_channel(id);

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}