backends/host_iommu_device: Introduce HostIOMMUDeviceCaps
[qemu/armbru.git] / migration / multifd.c
blob0b4cbaddfee0891bc93da51a6f3a9191a726ffc4
1 /*
2 * Multifd common code
4 * Copyright (c) 2019-2020 Red Hat Inc
6 * Authors:
7 * Juan Quintela <quintela@redhat.com>
9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
10 * See the COPYING file in the top-level directory.
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
15 #include "qemu/rcu.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
21 #include "file.h"
22 #include "migration.h"
23 #include "migration-stats.h"
24 #include "socket.h"
25 #include "tls.h"
26 #include "qemu-file.h"
27 #include "trace.h"
28 #include "multifd.h"
29 #include "threadinfo.h"
30 #include "options.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
/* Multiple fd's */

/* Magic value carried by multifd packets; checked by the receive side. */
#define MULTIFD_MAGIC 0x11223344U
/* Version carried by multifd packets; checked by the receive side. */
#define MULTIFD_VERSION 1
41 typedef struct {
42 uint32_t magic;
43 uint32_t version;
44 unsigned char uuid[16]; /* QemuUUID */
45 uint8_t id;
46 uint8_t unused1[7]; /* Reserved for future use */
47 uint64_t unused2[4]; /* Reserved for future use */
48 } __attribute__((packed)) MultiFDInit_t;
50 struct {
51 MultiFDSendParams *params;
52 /* array of pages to sent */
53 MultiFDPages_t *pages;
55 * Global number of generated multifd packets.
57 * Note that we used 'uintptr_t' because it'll naturally support atomic
58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems
59 * multifd will overflow the packet_num easier, but that should be
60 * fine.
62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
63 * hosts, however so far it does not support atomic fetch_add() yet.
64 * Make it easy for now.
66 uintptr_t packet_num;
68 * Synchronization point past which no more channels will be
69 * created.
71 QemuSemaphore channels_created;
72 /* send channels ready */
73 QemuSemaphore channels_ready;
75 * Have we already run terminate threads. There is a race when it
76 * happens that we got one error while we are exiting.
77 * We will use atomic operations. Only valid values are 0 and 1.
79 int exiting;
80 /* multifd ops */
81 MultiFDMethods *ops;
82 } *multifd_send_state;
84 struct {
85 MultiFDRecvParams *params;
86 MultiFDRecvData *data;
87 /* number of created threads */
88 int count;
90 * This is always posted by the recv threads, the migration thread
91 * uses it to wait for recv threads to finish assigned tasks.
93 QemuSemaphore sem_sync;
94 /* global number of generated multifd packets */
95 uint64_t packet_num;
96 int exiting;
97 /* multifd ops */
98 MultiFDMethods *ops;
99 } *multifd_recv_state;
101 static bool multifd_use_packets(void)
103 return !migrate_mapped_ram();
106 void multifd_send_channel_created(void)
108 qemu_sem_post(&multifd_send_state->channels_created);
111 static void multifd_set_file_bitmap(MultiFDSendParams *p)
113 MultiFDPages_t *pages = p->pages;
115 assert(pages->block);
117 for (int i = 0; i < p->pages->normal_num; i++) {
118 ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
121 for (int i = p->pages->normal_num; i < p->pages->num; i++) {
122 ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
126 /* Multifd without compression */
129 * nocomp_send_setup: setup send side
131 * @p: Params for the channel that we are using
132 * @errp: pointer to an error
134 static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
136 if (migrate_zero_copy_send()) {
137 p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
140 if (multifd_use_packets()) {
141 /* We need one extra place for the packet header */
142 p->iov = g_new0(struct iovec, p->page_count + 1);
143 } else {
144 p->iov = g_new0(struct iovec, p->page_count);
147 return 0;
151 * nocomp_send_cleanup: cleanup send side
153 * For no compression this function does nothing.
155 * @p: Params for the channel that we are using
156 * @errp: pointer to an error
158 static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
160 g_free(p->iov);
161 p->iov = NULL;
162 return;
165 static void multifd_send_prepare_iovs(MultiFDSendParams *p)
167 MultiFDPages_t *pages = p->pages;
169 for (int i = 0; i < pages->normal_num; i++) {
170 p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
171 p->iov[p->iovs_num].iov_len = p->page_size;
172 p->iovs_num++;
175 p->next_packet_size = pages->normal_num * p->page_size;
179 * nocomp_send_prepare: prepare date to be able to send
181 * For no compression we just have to calculate the size of the
182 * packet.
184 * Returns 0 for success or -1 for error
186 * @p: Params for the channel that we are using
187 * @errp: pointer to an error
189 static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
191 bool use_zero_copy_send = migrate_zero_copy_send();
192 int ret;
194 multifd_send_zero_page_detect(p);
196 if (!multifd_use_packets()) {
197 multifd_send_prepare_iovs(p);
198 multifd_set_file_bitmap(p);
200 return 0;
203 if (!use_zero_copy_send) {
205 * Only !zerocopy needs the header in IOV; zerocopy will
206 * send it separately.
208 multifd_send_prepare_header(p);
211 multifd_send_prepare_iovs(p);
212 p->flags |= MULTIFD_FLAG_NOCOMP;
214 multifd_send_fill_packet(p);
216 if (use_zero_copy_send) {
217 /* Send header first, without zerocopy */
218 ret = qio_channel_write_all(p->c, (void *)p->packet,
219 p->packet_len, errp);
220 if (ret != 0) {
221 return -1;
225 return 0;
229 * nocomp_recv_setup: setup receive side
231 * For no compression this function does nothing.
233 * Returns 0 for success or -1 for error
235 * @p: Params for the channel that we are using
236 * @errp: pointer to an error
238 static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
240 p->iov = g_new0(struct iovec, p->page_count);
241 return 0;
245 * nocomp_recv_cleanup: setup receive side
247 * For no compression this function does nothing.
249 * @p: Params for the channel that we are using
251 static void nocomp_recv_cleanup(MultiFDRecvParams *p)
253 g_free(p->iov);
254 p->iov = NULL;
258 * nocomp_recv: read the data from the channel
260 * For no compression we just need to read things into the correct place.
262 * Returns 0 for success or -1 for error
264 * @p: Params for the channel that we are using
265 * @errp: pointer to an error
267 static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
269 uint32_t flags;
271 if (!multifd_use_packets()) {
272 return multifd_file_recv_data(p, errp);
275 flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
277 if (flags != MULTIFD_FLAG_NOCOMP) {
278 error_setg(errp, "multifd %u: flags received %x flags expected %x",
279 p->id, flags, MULTIFD_FLAG_NOCOMP);
280 return -1;
283 multifd_recv_zero_page_process(p);
285 if (!p->normal_num) {
286 return 0;
289 for (int i = 0; i < p->normal_num; i++) {
290 p->iov[i].iov_base = p->host + p->normal[i];
291 p->iov[i].iov_len = p->page_size;
292 ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
294 return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
297 static MultiFDMethods multifd_nocomp_ops = {
298 .send_setup = nocomp_send_setup,
299 .send_cleanup = nocomp_send_cleanup,
300 .send_prepare = nocomp_send_prepare,
301 .recv_setup = nocomp_recv_setup,
302 .recv_cleanup = nocomp_recv_cleanup,
303 .recv = nocomp_recv
306 static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
307 [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
310 void multifd_register_ops(int method, MultiFDMethods *ops)
312 assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
313 multifd_ops[method] = ops;
316 /* Reset a MultiFDPages_t* object for the next use */
317 static void multifd_pages_reset(MultiFDPages_t *pages)
320 * We don't need to touch offset[] array, because it will be
321 * overwritten later when reused.
323 pages->num = 0;
324 pages->normal_num = 0;
325 pages->block = NULL;
328 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
330 MultiFDInit_t msg = {};
331 size_t size = sizeof(msg);
332 int ret;
334 msg.magic = cpu_to_be32(MULTIFD_MAGIC);
335 msg.version = cpu_to_be32(MULTIFD_VERSION);
336 msg.id = p->id;
337 memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
339 ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
340 if (ret != 0) {
341 return -1;
343 stat64_add(&mig_stats.multifd_bytes, size);
344 return 0;
347 static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
349 MultiFDInit_t msg;
350 int ret;
352 ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
353 if (ret != 0) {
354 return -1;
357 msg.magic = be32_to_cpu(msg.magic);
358 msg.version = be32_to_cpu(msg.version);
360 if (msg.magic != MULTIFD_MAGIC) {
361 error_setg(errp, "multifd: received packet magic %x "
362 "expected %x", msg.magic, MULTIFD_MAGIC);
363 return -1;
366 if (msg.version != MULTIFD_VERSION) {
367 error_setg(errp, "multifd: received packet version %u "
368 "expected %u", msg.version, MULTIFD_VERSION);
369 return -1;
372 if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
373 char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
374 char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);
376 error_setg(errp, "multifd: received uuid '%s' and expected "
377 "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
378 g_free(uuid);
379 g_free(msg_uuid);
380 return -1;
383 if (msg.id > migrate_multifd_channels()) {
384 error_setg(errp, "multifd: received channel id %u is greater than "
385 "number of channels %u", msg.id, migrate_multifd_channels());
386 return -1;
389 return msg.id;
392 static MultiFDPages_t *multifd_pages_init(uint32_t n)
394 MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
396 pages->allocated = n;
397 pages->offset = g_new0(ram_addr_t, n);
399 return pages;
402 static void multifd_pages_clear(MultiFDPages_t *pages)
404 multifd_pages_reset(pages);
405 pages->allocated = 0;
406 g_free(pages->offset);
407 pages->offset = NULL;
408 g_free(pages);
411 void multifd_send_fill_packet(MultiFDSendParams *p)
413 MultiFDPacket_t *packet = p->packet;
414 MultiFDPages_t *pages = p->pages;
415 uint64_t packet_num;
416 uint32_t zero_num = pages->num - pages->normal_num;
417 int i;
419 packet->flags = cpu_to_be32(p->flags);
420 packet->pages_alloc = cpu_to_be32(p->pages->allocated);
421 packet->normal_pages = cpu_to_be32(pages->normal_num);
422 packet->zero_pages = cpu_to_be32(zero_num);
423 packet->next_packet_size = cpu_to_be32(p->next_packet_size);
425 packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
426 packet->packet_num = cpu_to_be64(packet_num);
428 if (pages->block) {
429 strncpy(packet->ramblock, pages->block->idstr, 256);
432 for (i = 0; i < pages->num; i++) {
433 /* there are architectures where ram_addr_t is 32 bit */
434 uint64_t temp = pages->offset[i];
436 packet->offset[i] = cpu_to_be64(temp);
439 p->packets_sent++;
440 p->total_normal_pages += pages->normal_num;
441 p->total_zero_pages += zero_num;
443 trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
444 p->flags, p->next_packet_size);
447 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
449 MultiFDPacket_t *packet = p->packet;
450 int i;
452 packet->magic = be32_to_cpu(packet->magic);
453 if (packet->magic != MULTIFD_MAGIC) {
454 error_setg(errp, "multifd: received packet "
455 "magic %x and expected magic %x",
456 packet->magic, MULTIFD_MAGIC);
457 return -1;
460 packet->version = be32_to_cpu(packet->version);
461 if (packet->version != MULTIFD_VERSION) {
462 error_setg(errp, "multifd: received packet "
463 "version %u and expected version %u",
464 packet->version, MULTIFD_VERSION);
465 return -1;
468 p->flags = be32_to_cpu(packet->flags);
470 packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
472 * If we received a packet that is 100 times bigger than expected
473 * just stop migration. It is a magic number.
475 if (packet->pages_alloc > p->page_count) {
476 error_setg(errp, "multifd: received packet "
477 "with size %u and expected a size of %u",
478 packet->pages_alloc, p->page_count) ;
479 return -1;
482 p->normal_num = be32_to_cpu(packet->normal_pages);
483 if (p->normal_num > packet->pages_alloc) {
484 error_setg(errp, "multifd: received packet "
485 "with %u normal pages and expected maximum pages are %u",
486 p->normal_num, packet->pages_alloc) ;
487 return -1;
490 p->zero_num = be32_to_cpu(packet->zero_pages);
491 if (p->zero_num > packet->pages_alloc - p->normal_num) {
492 error_setg(errp, "multifd: received packet "
493 "with %u zero pages and expected maximum zero pages are %u",
494 p->zero_num, packet->pages_alloc - p->normal_num) ;
495 return -1;
498 p->next_packet_size = be32_to_cpu(packet->next_packet_size);
499 p->packet_num = be64_to_cpu(packet->packet_num);
500 p->packets_recved++;
501 p->total_normal_pages += p->normal_num;
502 p->total_zero_pages += p->zero_num;
504 trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
505 p->flags, p->next_packet_size);
507 if (p->normal_num == 0 && p->zero_num == 0) {
508 return 0;
511 /* make sure that ramblock is 0 terminated */
512 packet->ramblock[255] = 0;
513 p->block = qemu_ram_block_by_name(packet->ramblock);
514 if (!p->block) {
515 error_setg(errp, "multifd: unknown ram block %s",
516 packet->ramblock);
517 return -1;
520 p->host = p->block->host;
521 for (i = 0; i < p->normal_num; i++) {
522 uint64_t offset = be64_to_cpu(packet->offset[i]);
524 if (offset > (p->block->used_length - p->page_size)) {
525 error_setg(errp, "multifd: offset too long %" PRIu64
526 " (max " RAM_ADDR_FMT ")",
527 offset, p->block->used_length);
528 return -1;
530 p->normal[i] = offset;
533 for (i = 0; i < p->zero_num; i++) {
534 uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);
536 if (offset > (p->block->used_length - p->page_size)) {
537 error_setg(errp, "multifd: offset too long %" PRIu64
538 " (max " RAM_ADDR_FMT ")",
539 offset, p->block->used_length);
540 return -1;
542 p->zero[i] = offset;
545 return 0;
548 static bool multifd_send_should_exit(void)
550 return qatomic_read(&multifd_send_state->exiting);
553 static bool multifd_recv_should_exit(void)
555 return qatomic_read(&multifd_recv_state->exiting);
559 * The migration thread can wait on either of the two semaphores. This
560 * function can be used to kick the main thread out of waiting on either of
561 * them. Should mostly only be called when something wrong happened with
562 * the current multifd send thread.
564 static void multifd_send_kick_main(MultiFDSendParams *p)
566 qemu_sem_post(&p->sem_sync);
567 qemu_sem_post(&multifd_send_state->channels_ready);
571 * How we use multifd_send_state->pages and channel->pages?
573 * We create a pages for each channel, and a main one. Each time that
574 * we need to send a batch of pages we interchange the ones between
575 * multifd_send_state and the channel that is sending it. There are
576 * two reasons for that:
577 * - to not have to do so many mallocs during migration
578 * - to make easier to know what to free at the end of migration
580 * This way we always know who is the owner of each "pages" struct,
581 * and we don't need any locking. It belongs to the migration thread
582 * or to the channel thread. Switching is safe because the migration
583 * thread is using the channel mutex when changing it, and the channel
584 * have to had finish with its own, otherwise pending_job can't be
585 * false.
587 * Returns true if succeed, false otherwise.
589 static bool multifd_send_pages(void)
591 int i;
592 static int next_channel;
593 MultiFDSendParams *p = NULL; /* make happy gcc */
594 MultiFDPages_t *pages = multifd_send_state->pages;
596 if (multifd_send_should_exit()) {
597 return false;
600 /* We wait here, until at least one channel is ready */
601 qemu_sem_wait(&multifd_send_state->channels_ready);
604 * next_channel can remain from a previous migration that was
605 * using more channels, so ensure it doesn't overflow if the
606 * limit is lower now.
608 next_channel %= migrate_multifd_channels();
609 for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
610 if (multifd_send_should_exit()) {
611 return false;
613 p = &multifd_send_state->params[i];
615 * Lockless read to p->pending_job is safe, because only multifd
616 * sender thread can clear it.
618 if (qatomic_read(&p->pending_job) == false) {
619 next_channel = (i + 1) % migrate_multifd_channels();
620 break;
625 * Make sure we read p->pending_job before all the rest. Pairs with
626 * qatomic_store_release() in multifd_send_thread().
628 smp_mb_acquire();
629 assert(!p->pages->num);
630 multifd_send_state->pages = p->pages;
631 p->pages = pages;
633 * Making sure p->pages is setup before marking pending_job=true. Pairs
634 * with the qatomic_load_acquire() in multifd_send_thread().
636 qatomic_store_release(&p->pending_job, true);
637 qemu_sem_post(&p->sem);
639 return true;
642 static inline bool multifd_queue_empty(MultiFDPages_t *pages)
644 return pages->num == 0;
647 static inline bool multifd_queue_full(MultiFDPages_t *pages)
649 return pages->num == pages->allocated;
652 static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
654 pages->offset[pages->num++] = offset;
657 /* Returns true if enqueue successful, false otherwise */
658 bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
660 MultiFDPages_t *pages;
662 retry:
663 pages = multifd_send_state->pages;
665 /* If the queue is empty, we can already enqueue now */
666 if (multifd_queue_empty(pages)) {
667 pages->block = block;
668 multifd_enqueue(pages, offset);
669 return true;
673 * Not empty, meanwhile we need a flush. It can because of either:
675 * (1) The page is not on the same ramblock of previous ones, or,
676 * (2) The queue is full.
678 * After flush, always retry.
680 if (pages->block != block || multifd_queue_full(pages)) {
681 if (!multifd_send_pages()) {
682 return false;
684 goto retry;
687 /* Not empty, and we still have space, do it! */
688 multifd_enqueue(pages, offset);
689 return true;
692 /* Multifd send side hit an error; remember it and prepare to quit */
693 static void multifd_send_set_error(Error *err)
696 * We don't want to exit each threads twice. Depending on where
697 * we get the error, or if there are two independent errors in two
698 * threads at the same time, we can end calling this function
699 * twice.
701 if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
702 return;
705 if (err) {
706 MigrationState *s = migrate_get_current();
707 migrate_set_error(s, err);
708 if (s->state == MIGRATION_STATUS_SETUP ||
709 s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
710 s->state == MIGRATION_STATUS_DEVICE ||
711 s->state == MIGRATION_STATUS_ACTIVE) {
712 migrate_set_state(&s->state, s->state,
713 MIGRATION_STATUS_FAILED);
718 static void multifd_send_terminate_threads(void)
720 int i;
722 trace_multifd_send_terminate_threads();
725 * Tell everyone we're quitting. No xchg() needed here; we simply
726 * always set it.
728 qatomic_set(&multifd_send_state->exiting, 1);
731 * Firstly, kick all threads out; no matter whether they are just idle,
732 * or blocked in an IO system call.
734 for (i = 0; i < migrate_multifd_channels(); i++) {
735 MultiFDSendParams *p = &multifd_send_state->params[i];
737 qemu_sem_post(&p->sem);
738 if (p->c) {
739 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
744 * Finally recycle all the threads.
746 for (i = 0; i < migrate_multifd_channels(); i++) {
747 MultiFDSendParams *p = &multifd_send_state->params[i];
749 if (p->tls_thread_created) {
750 qemu_thread_join(&p->tls_thread);
753 if (p->thread_created) {
754 qemu_thread_join(&p->thread);
759 static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
761 if (p->c) {
762 migration_ioc_unregister_yank(p->c);
764 * The object_unref() cannot guarantee the fd will always be
765 * released because finalize() of the iochannel is only
766 * triggered on the last reference and it's not guaranteed
767 * that we always hold the last refcount when reaching here.
769 * Closing the fd explicitly has the benefit that if there is any
770 * registered I/O handler callbacks on such fd, that will get a
771 * POLLNVAL event and will further trigger the cleanup to finally
772 * release the IOC.
774 * FIXME: It should logically be guaranteed that all multifd
775 * channels have no I/O handler callback registered when reaching
776 * here, because migration thread will wait for all multifd channel
777 * establishments to complete during setup. Since
778 * migrate_fd_cleanup() will be scheduled in main thread too, all
779 * previous callbacks should guarantee to be completed when
780 * reaching here. See multifd_send_state.channels_created and its
781 * usage. In the future, we could replace this with an assert
782 * making sure we're the last reference, or simply drop it if above
783 * is more clear to be justified.
785 qio_channel_close(p->c, &error_abort);
786 object_unref(OBJECT(p->c));
787 p->c = NULL;
789 qemu_sem_destroy(&p->sem);
790 qemu_sem_destroy(&p->sem_sync);
791 g_free(p->name);
792 p->name = NULL;
793 multifd_pages_clear(p->pages);
794 p->pages = NULL;
795 p->packet_len = 0;
796 g_free(p->packet);
797 p->packet = NULL;
798 multifd_send_state->ops->send_cleanup(p, errp);
800 return *errp == NULL;
803 static void multifd_send_cleanup_state(void)
805 file_cleanup_outgoing_migration();
806 socket_cleanup_outgoing_migration();
807 qemu_sem_destroy(&multifd_send_state->channels_created);
808 qemu_sem_destroy(&multifd_send_state->channels_ready);
809 g_free(multifd_send_state->params);
810 multifd_send_state->params = NULL;
811 multifd_pages_clear(multifd_send_state->pages);
812 multifd_send_state->pages = NULL;
813 g_free(multifd_send_state);
814 multifd_send_state = NULL;
817 void multifd_send_shutdown(void)
819 int i;
821 if (!migrate_multifd()) {
822 return;
825 multifd_send_terminate_threads();
827 for (i = 0; i < migrate_multifd_channels(); i++) {
828 MultiFDSendParams *p = &multifd_send_state->params[i];
829 Error *local_err = NULL;
831 if (!multifd_send_cleanup_channel(p, &local_err)) {
832 migrate_set_error(migrate_get_current(), local_err);
833 error_free(local_err);
837 multifd_send_cleanup_state();
840 static int multifd_zero_copy_flush(QIOChannel *c)
842 int ret;
843 Error *err = NULL;
845 ret = qio_channel_flush(c, &err);
846 if (ret < 0) {
847 error_report_err(err);
848 return -1;
850 if (ret == 1) {
851 stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
854 return ret;
857 int multifd_send_sync_main(void)
859 int i;
860 bool flush_zero_copy;
862 if (!migrate_multifd()) {
863 return 0;
865 if (multifd_send_state->pages->num) {
866 if (!multifd_send_pages()) {
867 error_report("%s: multifd_send_pages fail", __func__);
868 return -1;
872 flush_zero_copy = migrate_zero_copy_send();
874 for (i = 0; i < migrate_multifd_channels(); i++) {
875 MultiFDSendParams *p = &multifd_send_state->params[i];
877 if (multifd_send_should_exit()) {
878 return -1;
881 trace_multifd_send_sync_main_signal(p->id);
884 * We should be the only user so far, so not possible to be set by
885 * others concurrently.
887 assert(qatomic_read(&p->pending_sync) == false);
888 qatomic_set(&p->pending_sync, true);
889 qemu_sem_post(&p->sem);
891 for (i = 0; i < migrate_multifd_channels(); i++) {
892 MultiFDSendParams *p = &multifd_send_state->params[i];
894 if (multifd_send_should_exit()) {
895 return -1;
898 qemu_sem_wait(&multifd_send_state->channels_ready);
899 trace_multifd_send_sync_main_wait(p->id);
900 qemu_sem_wait(&p->sem_sync);
902 if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
903 return -1;
906 trace_multifd_send_sync_main(multifd_send_state->packet_num);
908 return 0;
911 static void *multifd_send_thread(void *opaque)
913 MultiFDSendParams *p = opaque;
914 MigrationThread *thread = NULL;
915 Error *local_err = NULL;
916 int ret = 0;
917 bool use_packets = multifd_use_packets();
919 thread = migration_threads_add(p->name, qemu_get_thread_id());
921 trace_multifd_send_thread_start(p->id);
922 rcu_register_thread();
924 if (use_packets) {
925 if (multifd_send_initial_packet(p, &local_err) < 0) {
926 ret = -1;
927 goto out;
931 while (true) {
932 qemu_sem_post(&multifd_send_state->channels_ready);
933 qemu_sem_wait(&p->sem);
935 if (multifd_send_should_exit()) {
936 break;
940 * Read pending_job flag before p->pages. Pairs with the
941 * qatomic_store_release() in multifd_send_pages().
943 if (qatomic_load_acquire(&p->pending_job)) {
944 MultiFDPages_t *pages = p->pages;
946 p->iovs_num = 0;
947 assert(pages->num);
949 ret = multifd_send_state->ops->send_prepare(p, &local_err);
950 if (ret != 0) {
951 break;
954 if (migrate_mapped_ram()) {
955 ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
956 p->pages->block, &local_err);
957 } else {
958 ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
959 NULL, 0, p->write_flags,
960 &local_err);
963 if (ret != 0) {
964 break;
967 stat64_add(&mig_stats.multifd_bytes,
968 p->next_packet_size + p->packet_len);
969 stat64_add(&mig_stats.normal_pages, pages->normal_num);
970 stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
972 multifd_pages_reset(p->pages);
973 p->next_packet_size = 0;
976 * Making sure p->pages is published before saying "we're
977 * free". Pairs with the smp_mb_acquire() in
978 * multifd_send_pages().
980 qatomic_store_release(&p->pending_job, false);
981 } else {
983 * If not a normal job, must be a sync request. Note that
984 * pending_sync is a standalone flag (unlike pending_job), so
985 * it doesn't require explicit memory barriers.
987 assert(qatomic_read(&p->pending_sync));
989 if (use_packets) {
990 p->flags = MULTIFD_FLAG_SYNC;
991 multifd_send_fill_packet(p);
992 ret = qio_channel_write_all(p->c, (void *)p->packet,
993 p->packet_len, &local_err);
994 if (ret != 0) {
995 break;
997 /* p->next_packet_size will always be zero for a SYNC packet */
998 stat64_add(&mig_stats.multifd_bytes, p->packet_len);
999 p->flags = 0;
1002 qatomic_set(&p->pending_sync, false);
1003 qemu_sem_post(&p->sem_sync);
1007 out:
1008 if (ret) {
1009 assert(local_err);
1010 trace_multifd_send_error(p->id);
1011 multifd_send_set_error(local_err);
1012 multifd_send_kick_main(p);
1013 error_free(local_err);
1016 rcu_unregister_thread();
1017 migration_threads_remove(thread);
1018 trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages,
1019 p->total_zero_pages);
1021 return NULL;
1024 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);
1026 typedef struct {
1027 MultiFDSendParams *p;
1028 QIOChannelTLS *tioc;
1029 } MultiFDTLSThreadArgs;
1031 static void *multifd_tls_handshake_thread(void *opaque)
1033 MultiFDTLSThreadArgs *args = opaque;
1035 qio_channel_tls_handshake(args->tioc,
1036 multifd_new_send_channel_async,
1037 args->p,
1038 NULL,
1039 NULL);
1040 g_free(args);
1042 return NULL;
1045 static bool multifd_tls_channel_connect(MultiFDSendParams *p,
1046 QIOChannel *ioc,
1047 Error **errp)
1049 MigrationState *s = migrate_get_current();
1050 const char *hostname = s->hostname;
1051 MultiFDTLSThreadArgs *args;
1052 QIOChannelTLS *tioc;
1054 tioc = migration_tls_client_create(ioc, hostname, errp);
1055 if (!tioc) {
1056 return false;
1060 * Ownership of the socket channel now transfers to the newly
1061 * created TLS channel, which has already taken a reference.
1063 object_unref(OBJECT(ioc));
1064 trace_multifd_tls_outgoing_handshake_start(ioc, tioc, hostname);
1065 qio_channel_set_name(QIO_CHANNEL(tioc), "multifd-tls-outgoing");
1067 args = g_new0(MultiFDTLSThreadArgs, 1);
1068 args->tioc = tioc;
1069 args->p = p;
1071 p->tls_thread_created = true;
1072 qemu_thread_create(&p->tls_thread, "mig/src/tls",
1073 multifd_tls_handshake_thread, args,
1074 QEMU_THREAD_JOINABLE);
1075 return true;
1078 void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
1080 qio_channel_set_delay(ioc, false);
1082 migration_ioc_register_yank(ioc);
1083 /* Setup p->c only if the channel is completely setup */
1084 p->c = ioc;
1086 p->thread_created = true;
1087 qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
1088 QEMU_THREAD_JOINABLE);
1092 * When TLS is enabled this function is called once to establish the
1093 * TLS connection and a second time after the TLS handshake to create
1094 * the multifd channel. Without TLS it goes straight into the channel
1095 * creation.
1097 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
1099 MultiFDSendParams *p = opaque;
1100 QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
1101 Error *local_err = NULL;
1102 bool ret;
1104 trace_multifd_new_send_channel_async(p->id);
1106 if (qio_task_propagate_error(task, &local_err)) {
1107 ret = false;
1108 goto out;
1111 trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
1112 migrate_get_current()->hostname);
1114 if (migrate_channel_requires_tls_upgrade(ioc)) {
1115 ret = multifd_tls_channel_connect(p, ioc, &local_err);
1116 if (ret) {
1117 return;
1119 } else {
1120 multifd_channel_connect(p, ioc);
1121 ret = true;
1124 out:
1126 * Here we're not interested whether creation succeeded, only that
1127 * it happened at all.
1129 multifd_send_channel_created();
1131 if (ret) {
1132 return;
1135 trace_multifd_new_send_channel_async_error(p->id, local_err);
1136 multifd_send_set_error(local_err);
1138 * For error cases (TLS or non-TLS), IO channel is always freed here
1139 * rather than when cleanup multifd: since p->c is not set, multifd
1140 * cleanup code doesn't even know its existence.
1142 object_unref(OBJECT(ioc));
1143 error_free(local_err);
1146 static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
1148 if (!multifd_use_packets()) {
1149 return file_send_channel_create(opaque, errp);
1152 socket_send_channel_create(multifd_new_send_channel_async, opaque);
1153 return true;
1156 bool multifd_send_setup(void)
1158 MigrationState *s = migrate_get_current();
1159 Error *local_err = NULL;
1160 int thread_count, ret = 0;
1161 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1162 bool use_packets = multifd_use_packets();
1163 uint8_t i;
1165 if (!migrate_multifd()) {
1166 return true;
1169 thread_count = migrate_multifd_channels();
1170 multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
1171 multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
1172 multifd_send_state->pages = multifd_pages_init(page_count);
1173 qemu_sem_init(&multifd_send_state->channels_created, 0);
1174 qemu_sem_init(&multifd_send_state->channels_ready, 0);
1175 qatomic_set(&multifd_send_state->exiting, 0);
1176 multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
1178 for (i = 0; i < thread_count; i++) {
1179 MultiFDSendParams *p = &multifd_send_state->params[i];
1181 qemu_sem_init(&p->sem, 0);
1182 qemu_sem_init(&p->sem_sync, 0);
1183 p->id = i;
1184 p->pages = multifd_pages_init(page_count);
1186 if (use_packets) {
1187 p->packet_len = sizeof(MultiFDPacket_t)
1188 + sizeof(uint64_t) * page_count;
1189 p->packet = g_malloc0(p->packet_len);
1190 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
1191 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
1193 p->name = g_strdup_printf("mig/src/send_%d", i);
1194 p->page_size = qemu_target_page_size();
1195 p->page_count = page_count;
1196 p->write_flags = 0;
1198 if (!multifd_new_send_channel_create(p, &local_err)) {
1199 return false;
1204 * Wait until channel creation has started for all channels. The
1205 * creation can still fail, but no more channels will be created
1206 * past this point.
1208 for (i = 0; i < thread_count; i++) {
1209 qemu_sem_wait(&multifd_send_state->channels_created);
1212 for (i = 0; i < thread_count; i++) {
1213 MultiFDSendParams *p = &multifd_send_state->params[i];
1215 ret = multifd_send_state->ops->send_setup(p, &local_err);
1216 if (ret) {
1217 break;
1221 if (ret) {
1222 migrate_set_error(s, local_err);
1223 error_report_err(local_err);
1224 migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
1225 MIGRATION_STATUS_FAILED);
1226 return false;
1229 return true;
1232 bool multifd_recv(void)
1234 int i;
1235 static int next_recv_channel;
1236 MultiFDRecvParams *p = NULL;
1237 MultiFDRecvData *data = multifd_recv_state->data;
1240 * next_channel can remain from a previous migration that was
1241 * using more channels, so ensure it doesn't overflow if the
1242 * limit is lower now.
1244 next_recv_channel %= migrate_multifd_channels();
1245 for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
1246 if (multifd_recv_should_exit()) {
1247 return false;
1250 p = &multifd_recv_state->params[i];
1252 if (qatomic_read(&p->pending_job) == false) {
1253 next_recv_channel = (i + 1) % migrate_multifd_channels();
1254 break;
1259 * Order pending_job read before manipulating p->data below. Pairs
1260 * with qatomic_store_release() at multifd_recv_thread().
1262 smp_mb_acquire();
1264 assert(!p->data->size);
1265 multifd_recv_state->data = p->data;
1266 p->data = data;
1269 * Order p->data update before setting pending_job. Pairs with
1270 * qatomic_load_acquire() at multifd_recv_thread().
1272 qatomic_store_release(&p->pending_job, true);
1273 qemu_sem_post(&p->sem);
1275 return true;
1278 MultiFDRecvData *multifd_get_recv_data(void)
1280 return multifd_recv_state->data;
1283 static void multifd_recv_terminate_threads(Error *err)
1285 int i;
1287 trace_multifd_recv_terminate_threads(err != NULL);
1289 if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
1290 return;
1293 if (err) {
1294 MigrationState *s = migrate_get_current();
1295 migrate_set_error(s, err);
1296 if (s->state == MIGRATION_STATUS_SETUP ||
1297 s->state == MIGRATION_STATUS_ACTIVE) {
1298 migrate_set_state(&s->state, s->state,
1299 MIGRATION_STATUS_FAILED);
1303 for (i = 0; i < migrate_multifd_channels(); i++) {
1304 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1307 * The migration thread and channels interact differently
1308 * depending on the presence of packets.
1310 if (multifd_use_packets()) {
1312 * The channel receives as long as there are packets. When
1313 * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
1314 * channel waits for the migration thread to sync. If the
1315 * sync never happens, do it here.
1317 qemu_sem_post(&p->sem_sync);
1318 } else {
1320 * The channel waits for the migration thread to give it
1321 * work. When the migration thread runs out of work, it
1322 * releases the channel and waits for any pending work to
1323 * finish. If we reach here (e.g. due to error) before the
1324 * work runs out, release the channel.
1326 qemu_sem_post(&p->sem);
1330 * We could arrive here for two reasons:
1331 * - normal quit, i.e. everything went fine, just finished
1332 * - error quit: We close the channels so the channel threads
1333 * finish the qio_channel_read_all_eof()
1335 if (p->c) {
1336 qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
1341 void multifd_recv_shutdown(void)
1343 if (migrate_multifd()) {
1344 multifd_recv_terminate_threads(NULL);
1348 static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
1350 migration_ioc_unregister_yank(p->c);
1351 object_unref(OBJECT(p->c));
1352 p->c = NULL;
1353 qemu_mutex_destroy(&p->mutex);
1354 qemu_sem_destroy(&p->sem_sync);
1355 qemu_sem_destroy(&p->sem);
1356 g_free(p->name);
1357 p->name = NULL;
1358 p->packet_len = 0;
1359 g_free(p->packet);
1360 p->packet = NULL;
1361 g_free(p->normal);
1362 p->normal = NULL;
1363 g_free(p->zero);
1364 p->zero = NULL;
1365 multifd_recv_state->ops->recv_cleanup(p);
1368 static void multifd_recv_cleanup_state(void)
1370 qemu_sem_destroy(&multifd_recv_state->sem_sync);
1371 g_free(multifd_recv_state->params);
1372 multifd_recv_state->params = NULL;
1373 g_free(multifd_recv_state->data);
1374 multifd_recv_state->data = NULL;
1375 g_free(multifd_recv_state);
1376 multifd_recv_state = NULL;
1379 void multifd_recv_cleanup(void)
1381 int i;
1383 if (!migrate_multifd()) {
1384 return;
1386 multifd_recv_terminate_threads(NULL);
1387 for (i = 0; i < migrate_multifd_channels(); i++) {
1388 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1390 if (p->thread_created) {
1391 qemu_thread_join(&p->thread);
1394 for (i = 0; i < migrate_multifd_channels(); i++) {
1395 multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
1397 multifd_recv_cleanup_state();
1400 void multifd_recv_sync_main(void)
1402 int thread_count = migrate_multifd_channels();
1403 bool file_based = !multifd_use_packets();
1404 int i;
1406 if (!migrate_multifd()) {
1407 return;
1411 * File-based channels don't use packets and therefore need to
1412 * wait for more work. Release them to start the sync.
1414 if (file_based) {
1415 for (i = 0; i < thread_count; i++) {
1416 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1418 trace_multifd_recv_sync_main_signal(p->id);
1419 qemu_sem_post(&p->sem);
1424 * Initiate the synchronization by waiting for all channels.
1426 * For socket-based migration this means each channel has received
1427 * the SYNC packet on the stream.
1429 * For file-based migration this means each channel is done with
1430 * the work (pending_job=false).
1432 for (i = 0; i < thread_count; i++) {
1433 trace_multifd_recv_sync_main_wait(i);
1434 qemu_sem_wait(&multifd_recv_state->sem_sync);
1437 if (file_based) {
1439 * For file-based loading is done in one iteration. We're
1440 * done.
1442 return;
1446 * Sync done. Release the channels for the next iteration.
1448 for (i = 0; i < thread_count; i++) {
1449 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1451 WITH_QEMU_LOCK_GUARD(&p->mutex) {
1452 if (multifd_recv_state->packet_num < p->packet_num) {
1453 multifd_recv_state->packet_num = p->packet_num;
1456 trace_multifd_recv_sync_main_signal(p->id);
1457 qemu_sem_post(&p->sem_sync);
1459 trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
1462 static void *multifd_recv_thread(void *opaque)
1464 MultiFDRecvParams *p = opaque;
1465 Error *local_err = NULL;
1466 bool use_packets = multifd_use_packets();
1467 int ret;
1469 trace_multifd_recv_thread_start(p->id);
1470 rcu_register_thread();
1472 while (true) {
1473 uint32_t flags = 0;
1474 bool has_data = false;
1475 p->normal_num = 0;
1477 if (use_packets) {
1478 if (multifd_recv_should_exit()) {
1479 break;
1482 ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
1483 p->packet_len, &local_err);
1484 if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
1485 break;
1488 qemu_mutex_lock(&p->mutex);
1489 ret = multifd_recv_unfill_packet(p, &local_err);
1490 if (ret) {
1491 qemu_mutex_unlock(&p->mutex);
1492 break;
1495 flags = p->flags;
1496 /* recv methods don't know how to handle the SYNC flag */
1497 p->flags &= ~MULTIFD_FLAG_SYNC;
1498 has_data = p->normal_num || p->zero_num;
1499 qemu_mutex_unlock(&p->mutex);
1500 } else {
1502 * No packets, so we need to wait for the vmstate code to
1503 * give us work.
1505 qemu_sem_wait(&p->sem);
1507 if (multifd_recv_should_exit()) {
1508 break;
1511 /* pairs with qatomic_store_release() at multifd_recv() */
1512 if (!qatomic_load_acquire(&p->pending_job)) {
1514 * Migration thread did not send work, this is
1515 * equivalent to pending_sync on the sending
1516 * side. Post sem_sync to notify we reached this
1517 * point.
1519 qemu_sem_post(&multifd_recv_state->sem_sync);
1520 continue;
1523 has_data = !!p->data->size;
1526 if (has_data) {
1527 ret = multifd_recv_state->ops->recv(p, &local_err);
1528 if (ret != 0) {
1529 break;
1533 if (use_packets) {
1534 if (flags & MULTIFD_FLAG_SYNC) {
1535 qemu_sem_post(&multifd_recv_state->sem_sync);
1536 qemu_sem_wait(&p->sem_sync);
1538 } else {
1539 p->total_normal_pages += p->data->size / qemu_target_page_size();
1540 p->data->size = 0;
1542 * Order data->size update before clearing
1543 * pending_job. Pairs with smp_mb_acquire() at
1544 * multifd_recv().
1546 qatomic_store_release(&p->pending_job, false);
1550 if (local_err) {
1551 multifd_recv_terminate_threads(local_err);
1552 error_free(local_err);
1555 rcu_unregister_thread();
1556 trace_multifd_recv_thread_end(p->id, p->packets_recved,
1557 p->total_normal_pages,
1558 p->total_zero_pages);
1560 return NULL;
1563 int multifd_recv_setup(Error **errp)
1565 int thread_count;
1566 uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
1567 bool use_packets = multifd_use_packets();
1568 uint8_t i;
1571 * Return successfully if multiFD recv state is already initialised
1572 * or multiFD is not enabled.
1574 if (multifd_recv_state || !migrate_multifd()) {
1575 return 0;
1578 thread_count = migrate_multifd_channels();
1579 multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
1580 multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
1582 multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
1583 multifd_recv_state->data->size = 0;
1585 qatomic_set(&multifd_recv_state->count, 0);
1586 qatomic_set(&multifd_recv_state->exiting, 0);
1587 qemu_sem_init(&multifd_recv_state->sem_sync, 0);
1588 multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
1590 for (i = 0; i < thread_count; i++) {
1591 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1593 qemu_mutex_init(&p->mutex);
1594 qemu_sem_init(&p->sem_sync, 0);
1595 qemu_sem_init(&p->sem, 0);
1596 p->pending_job = false;
1597 p->id = i;
1599 p->data = g_new0(MultiFDRecvData, 1);
1600 p->data->size = 0;
1602 if (use_packets) {
1603 p->packet_len = sizeof(MultiFDPacket_t)
1604 + sizeof(uint64_t) * page_count;
1605 p->packet = g_malloc0(p->packet_len);
1607 p->name = g_strdup_printf("mig/dst/recv_%d", i);
1608 p->normal = g_new0(ram_addr_t, page_count);
1609 p->zero = g_new0(ram_addr_t, page_count);
1610 p->page_count = page_count;
1611 p->page_size = qemu_target_page_size();
1614 for (i = 0; i < thread_count; i++) {
1615 MultiFDRecvParams *p = &multifd_recv_state->params[i];
1616 int ret;
1618 ret = multifd_recv_state->ops->recv_setup(p, errp);
1619 if (ret) {
1620 return ret;
1623 return 0;
1626 bool multifd_recv_all_channels_created(void)
1628 int thread_count = migrate_multifd_channels();
1630 if (!migrate_multifd()) {
1631 return true;
1634 if (!multifd_recv_state) {
1635 /* Called before any connections created */
1636 return false;
1639 return thread_count == qatomic_read(&multifd_recv_state->count);
1643 * Try to receive all multifd channels to get ready for the migration.
1644 * Sets @errp when failing to receive the current channel.
1646 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
1648 MultiFDRecvParams *p;
1649 Error *local_err = NULL;
1650 bool use_packets = multifd_use_packets();
1651 int id;
1653 if (use_packets) {
1654 id = multifd_recv_initial_packet(ioc, &local_err);
1655 if (id < 0) {
1656 multifd_recv_terminate_threads(local_err);
1657 error_propagate_prepend(errp, local_err,
1658 "failed to receive packet"
1659 " via multifd channel %d: ",
1660 qatomic_read(&multifd_recv_state->count));
1661 return;
1663 trace_multifd_recv_new_channel(id);
1664 } else {
1665 id = qatomic_read(&multifd_recv_state->count);
1668 p = &multifd_recv_state->params[id];
1669 if (p->c != NULL) {
1670 error_setg(&local_err, "multifd: received id '%d' already setup'",
1671 id);
1672 multifd_recv_terminate_threads(local_err);
1673 error_propagate(errp, local_err);
1674 return;
1676 p->c = ioc;
1677 object_ref(OBJECT(ioc));
1679 p->thread_created = true;
1680 qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
1681 QEMU_THREAD_JOINABLE);
1682 qatomic_inc(&multifd_recv_state->count);
1685 bool multifd_send_prepare_common(MultiFDSendParams *p)
1687 multifd_send_zero_page_detect(p);
1689 if (!p->pages->normal_num) {
1690 p->next_packet_size = 0;
1691 return false;
1694 multifd_send_prepare_header(p);
1696 return true;