/*
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "migration.h"
#include "migration-stats.h"
#include "qemu-file.h"
#include "threadinfo.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;

static struct {
    MultiFDSendParams *params;
    /* array of pages to be sent */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that 'uintptr_t' is used because it naturally supports atomic
     * operations on both 32-bit and 64-bit hosts.  On 32-bit systems
     * packet_num will overflow sooner, but that should be fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on
     * all hosts, but so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run the terminate-threads path?  There is a race
     * where an error arrives while we are already exiting.
     * We use atomic operations; the only valid values are 0 and 1.
     */
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

static struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i]);
    }
}

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    return;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);
        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}
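
/*
 * Zero-copy note: when zerocopy is enabled, the packet header is written
 * with a plain blocking write while the page payload is queued with
 * QIO_CHANNEL_WRITE_FLAG_ZERO_COPY by the sender thread.  A minimal
 * sketch of the split (illustrative only, condensed from the code paths
 * above and in multifd_send_thread()):
 *
 *     // header: copied into the kernel as usual
 *     qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, errp);
 *
 *     // payload: sent without copying; p->write_flags carries
 *     // QIO_CHANNEL_WRITE_FLAG_ZERO_COPY set in nocomp_send_setup()
 *     qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
 *                                 NULL, 0, p->write_flags, errp);
 */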

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this function does nothing.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this function does nothing.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
    }

    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
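
/*
 * A compression backend makes itself available by filling a MultiFDMethods
 * table and registering it for its MultiFDCompression value.  A minimal
 * sketch, assuming a hypothetical "mycomp" backend (the mycomp_* names and
 * MULTIFD_COMPRESSION_MYCOMP are invented for illustration; real backends
 * such as multifd-zlib.c follow this shape):
 *
 *     static MultiFDMethods multifd_mycomp_ops = {
 *         .send_setup = mycomp_send_setup,
 *         .send_cleanup = mycomp_send_cleanup,
 *         .send_prepare = mycomp_send_prepare,
 *         .recv_setup = mycomp_recv_setup,
 *         .recv_cleanup = mycomp_recv_cleanup,
 *         .recv = mycomp_recv
 *     };
 *
 *     static void multifd_mycomp_register(void)
 *     {
 *         multifd_register_ops(MULTIFD_COMPRESSION_MYCOMP,
 *                              &multifd_mycomp_ops);
 *     }
 *     migration_init(multifd_mycomp_register);
 */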

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch the offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }

    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    if (pages->block) {
        strncpy(packet->ramblock, pages->block->idstr, 256);
    }

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->num;

    trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                       p->next_packet_size);
}
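
/*
 * For reference, the data packet built above has the following on-wire
 * shape (all multi-byte fields big-endian; the exact layout is defined by
 * MultiFDPacket_t in multifd.h).  Simplified sketch of what one channel
 * emits per batch:
 *
 *     magic, version, flags, pages_alloc,
 *     normal_pages, next_packet_size, packet_num,
 *     ramblock[256], offset[0 .. normal_pages - 1]
 *
 * followed by next_packet_size bytes of page data (the pages themselves,
 * written from p->iov by the sender thread).
 */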

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than we have allocated for this
     * channel, stop the migration rather than trusting it.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
                       p->next_packet_size);

    if (p->normal_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something wrong happened with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one "pages" struct for each channel, plus a main one.  Each
 * time we need to send a batch of pages we exchange the one in
 * multifd_send_state with the one in the channel that is sending it.
 * There are two reasons for that:
 *    - we avoid frequent allocations during migration
 *    - it is easy to know what to free at the end of migration
 *
 * This way we always know who owns each "pages" struct and we don't need
 * any locking: it belongs either to the migration thread or to the channel
 * thread.  Switching is safe because the channel thread must have finished
 * with its own struct before pending_job becomes false again, and the
 * migration thread only switches while pending_job is false.
 *
 * Returns true on success, false otherwise.
 */
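
/*
 * A minimal sketch of the ownership exchange described above (illustrative
 * only; the real code below also checks for exit and adds memory barriers):
 *
 *     // migration thread, in multifd_send_pages():
 *     pages = multifd_send_state->pages;      // the batch we just filled
 *     multifd_send_state->pages = p->pages;   // take the channel's empty one
 *     p->pages = pages;                       // hand the full one to channel
 *     qatomic_store_release(&p->pending_job, true);
 *     qemu_sem_post(&p->sem);
 *
 *     // channel thread, in multifd_send_thread(), once the batch is sent:
 *     multifd_pages_reset(p->pages);
 *     qatomic_store_release(&p->pending_job, false);
 */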
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush.  It can be because of either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}
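
/*
 * Typical caller pattern (illustrative; the actual caller lives in the RAM
 * migration code and routes failures through the migration error path):
 *
 *     if (!multifd_queue_page(block, offset)) {
 *         // queueing failed because flushing the current batch failed;
 *         // treat it as a fatal migration error
 *     }
 */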

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to exit the threads twice.  Depending on where we get
     * the error, or if there are two independent errors in two threads at
     * the same time, we can end up calling this function twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}

static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
{
    if (p->c) {
        migration_ioc_unregister_yank(p->c);
        /*
         * An explicit close() on the channel here is normally not
         * required, but can be helpful for "file:" iochannels, where it
         * will include fdatasync() to make sure the data is flushed to the
         * disk backend.
         *
         * The object_unref() cannot guarantee that because: (1) finalize()
         * of the iochannel is only triggered on the last reference, and
         * it's not guaranteed that we always hold the last refcount when
         * reaching here, and, (2) even if finalize() is invoked, it only
         * does a close(fd) without data flush.
         */
        qio_channel_close(p->c, &error_abort);
        object_unref(OBJECT(p->c));
        p->c = NULL;
    }
    qemu_sem_destroy(&p->sem);
    qemu_sem_destroy(&p->sem_sync);
    g_free(p->name);
    p->name = NULL;
    multifd_pages_clear(p->pages);
    p->pages = NULL;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    multifd_send_state->ops->send_cleanup(p, errp);

    return *errp == NULL;
}

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    fd_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }

    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }

    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
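
/*
 * The send-side sync above boils down to the following per-channel
 * handshake (illustrative summary of multifd_send_sync_main() and the
 * pending_sync path in multifd_send_thread() below):
 *
 *     migration thread                        sender thread
 *     ----------------                        -------------
 *     qatomic_set(&p->pending_sync, true)
 *     qemu_sem_post(&p->sem)             -->  wakes up, sees pending_sync
 *                                             sends a MULTIFD_FLAG_SYNC packet
 *                                             qemu_sem_post(&p->sem_sync)
 *     qemu_sem_wait(&p->sem_sync)        <--  sync for this channel is done
 */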

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent, p->total_normal_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}
*p
,
988 MigrationState
*s
= migrate_get_current();
989 const char *hostname
= s
->hostname
;
990 MultiFDTLSThreadArgs
*args
;
993 tioc
= migration_tls_client_create(ioc
, hostname
, errp
);
999 * Ownership of the socket channel now transfers to the newly
1000 * created TLS channel, which has already taken a reference.
1002 object_unref(OBJECT(ioc
));
1003 trace_multifd_tls_outgoing_handshake_start(ioc
, tioc
, hostname
);
1004 qio_channel_set_name(QIO_CHANNEL(tioc
), "multifd-tls-outgoing");
1006 args
= g_new0(MultiFDTLSThreadArgs
, 1);
1010 p
->tls_thread_created
= true;
1011 qemu_thread_create(&p
->tls_thread
, "multifd-tls-handshake-worker",
1012 multifd_tls_handshake_thread
, args
,
1013 QEMU_THREAD_JOINABLE
);

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), the IO channel is always freed here
     * rather than during multifd cleanup: since p->c is not set, the multifd
     * cleanup code doesn't even know it exists.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);

            /* We need one extra place for the packet header */
            p->iov = g_new0(struct iovec, page_count + 1);
        } else {
            p->iov = g_new0(struct iovec, page_count);
        }
        p->name = g_strdup_printf("multifdsend_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            goto err;
        }
    }

    /*
     * Wait until channel creation has started for all channels.  The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            goto err;
        }
    }

    return true;

err:
    migrate_set_error(s, local_err);
    error_report_err(local_err);
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below.  Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job.  Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
*multifd_get_recv_data(void)
1224 return multifd_recv_state
->data
;

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets.  When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync.  If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work.  When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish.  If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: we close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->data);
    p->data = NULL;
    g_free(p->name);
    p->name = NULL;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->iov);
    p->iov = NULL;
    g_free(p->normal);
    p->normal = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }

    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work.  Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done.  Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }

    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
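
/*
 * Receive-side sync, summarized (illustrative; see multifd_recv_thread()
 * below for the authoritative code):
 *
 *     recv thread (per channel)                    migration thread
 *     -------------------------                    ----------------
 *     sees MULTIFD_FLAG_SYNC in a packet
 *     qemu_sem_post(&multifd_recv_state->sem_sync)
 *                                                  waits once per channel
 *                                                  on sem_sync
 *     qemu_sem_wait(&p->sem_sync)             <--  qemu_sem_post(&p->sem_sync)
 *     continues with the next packet
 */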

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;
        p->normal_num = 0;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = !!p->normal_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side.  Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job.  Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved, p->total_normal_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }

    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}