4 * Copyright (c) 2019-2020 Red Hat Inc
7 * Juan Quintela <quintela@redhat.com>
9 * This work is licensed under the terms of the GNU GPL, version 2 or later.
10 * See the COPYING file in the top-level directory.
13 #include "qemu/osdep.h"
14 #include "qemu/cutils.h"
16 #include "exec/target_page.h"
17 #include "sysemu/sysemu.h"
18 #include "exec/ramblock.h"
19 #include "qemu/error-report.h"
20 #include "qapi/error.h"
22 #include "migration.h"
23 #include "migration-stats.h"
26 #include "qemu-file.h"
29 #include "threadinfo.h"
31 #include "qemu/yank.h"
32 #include "io/channel-file.h"
33 #include "io/channel-socket.h"
34 #include "yank_functions.h"
38 #define MULTIFD_MAGIC 0x11223344U
39 #define MULTIFD_VERSION 1
44 unsigned char uuid
[16]; /* QemuUUID */
46 uint8_t unused1
[7]; /* Reserved for future use */
47 uint64_t unused2
[4]; /* Reserved for future use */
48 } __attribute__((packed
)) MultiFDInit_t
;
51 MultiFDSendParams
*params
;
52 /* array of pages to sent */
53 MultiFDPages_t
*pages
;
55 * Global number of generated multifd packets.
57 * Note that we used 'uintptr_t' because it'll naturally support atomic
58 * operations on both 32bit / 64 bits hosts. It means on 32bit systems
59 * multifd will overflow the packet_num easier, but that should be
62 * Another option is to use QEMU's Stat64 then it'll be 64 bits on all
63 * hosts, however so far it does not support atomic fetch_add() yet.
64 * Make it easy for now.
68 * Synchronization point past which no more channels will be
71 QemuSemaphore channels_created
;
72 /* send channels ready */
73 QemuSemaphore channels_ready
;
75 * Have we already run terminate threads. There is a race when it
76 * happens that we got one error while we are exiting.
77 * We will use atomic operations. Only valid values are 0 and 1.
82 } *multifd_send_state
;
85 MultiFDRecvParams
*params
;
86 MultiFDRecvData
*data
;
87 /* number of created threads */
90 * This is always posted by the recv threads, the migration thread
91 * uses it to wait for recv threads to finish assigned tasks.
93 QemuSemaphore sem_sync
;
94 /* global number of generated multifd packets */
99 } *multifd_recv_state
;
static bool multifd_use_packets(void)
{
    /* Mapped-ram file migration writes raw pages; only socket-style
     * migration frames data in multifd packets. */
    return !migrate_mapped_ram();
}
106 void multifd_send_channel_created(void)
108 qemu_sem_post(&multifd_send_state
->channels_created
);
111 static void multifd_set_file_bitmap(MultiFDSendParams
*p
)
113 MultiFDPages_t
*pages
= p
->pages
;
115 assert(pages
->block
);
117 for (int i
= 0; i
< p
->pages
->normal_num
; i
++) {
118 ramblock_set_file_bmap_atomic(pages
->block
, pages
->offset
[i
], true);
121 for (int i
= p
->pages
->normal_num
; i
< p
->pages
->num
; i
++) {
122 ramblock_set_file_bmap_atomic(pages
->block
, pages
->offset
[i
], false);
126 /* Multifd without compression */
129 * nocomp_send_setup: setup send side
131 * @p: Params for the channel that we are using
132 * @errp: pointer to an error
134 static int nocomp_send_setup(MultiFDSendParams
*p
, Error
**errp
)
136 if (migrate_zero_copy_send()) {
137 p
->write_flags
|= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
;
144 * nocomp_send_cleanup: cleanup send side
146 * For no compression this function does nothing.
148 * @p: Params for the channel that we are using
149 * @errp: pointer to an error
151 static void nocomp_send_cleanup(MultiFDSendParams
*p
, Error
**errp
)
156 static void multifd_send_prepare_iovs(MultiFDSendParams
*p
)
158 MultiFDPages_t
*pages
= p
->pages
;
160 for (int i
= 0; i
< pages
->normal_num
; i
++) {
161 p
->iov
[p
->iovs_num
].iov_base
= pages
->block
->host
+ pages
->offset
[i
];
162 p
->iov
[p
->iovs_num
].iov_len
= p
->page_size
;
166 p
->next_packet_size
= pages
->normal_num
* p
->page_size
;
170 * nocomp_send_prepare: prepare date to be able to send
172 * For no compression we just have to calculate the size of the
175 * Returns 0 for success or -1 for error
177 * @p: Params for the channel that we are using
178 * @errp: pointer to an error
180 static int nocomp_send_prepare(MultiFDSendParams
*p
, Error
**errp
)
182 bool use_zero_copy_send
= migrate_zero_copy_send();
185 multifd_send_zero_page_detect(p
);
187 if (!multifd_use_packets()) {
188 multifd_send_prepare_iovs(p
);
189 multifd_set_file_bitmap(p
);
194 if (!use_zero_copy_send
) {
196 * Only !zerocopy needs the header in IOV; zerocopy will
197 * send it separately.
199 multifd_send_prepare_header(p
);
202 multifd_send_prepare_iovs(p
);
203 p
->flags
|= MULTIFD_FLAG_NOCOMP
;
205 multifd_send_fill_packet(p
);
207 if (use_zero_copy_send
) {
208 /* Send header first, without zerocopy */
209 ret
= qio_channel_write_all(p
->c
, (void *)p
->packet
,
210 p
->packet_len
, errp
);
220 * nocomp_recv_setup: setup receive side
222 * For no compression this function does nothing.
224 * Returns 0 for success or -1 for error
226 * @p: Params for the channel that we are using
227 * @errp: pointer to an error
229 static int nocomp_recv_setup(MultiFDRecvParams
*p
, Error
**errp
)
235 * nocomp_recv_cleanup: setup receive side
237 * For no compression this function does nothing.
239 * @p: Params for the channel that we are using
241 static void nocomp_recv_cleanup(MultiFDRecvParams
*p
)
246 * nocomp_recv: read the data from the channel
248 * For no compression we just need to read things into the correct place.
250 * Returns 0 for success or -1 for error
252 * @p: Params for the channel that we are using
253 * @errp: pointer to an error
255 static int nocomp_recv(MultiFDRecvParams
*p
, Error
**errp
)
259 if (!multifd_use_packets()) {
260 return multifd_file_recv_data(p
, errp
);
263 flags
= p
->flags
& MULTIFD_FLAG_COMPRESSION_MASK
;
265 if (flags
!= MULTIFD_FLAG_NOCOMP
) {
266 error_setg(errp
, "multifd %u: flags received %x flags expected %x",
267 p
->id
, flags
, MULTIFD_FLAG_NOCOMP
);
271 multifd_recv_zero_page_process(p
);
273 if (!p
->normal_num
) {
277 for (int i
= 0; i
< p
->normal_num
; i
++) {
278 p
->iov
[i
].iov_base
= p
->host
+ p
->normal
[i
];
279 p
->iov
[i
].iov_len
= p
->page_size
;
280 ramblock_recv_bitmap_set_offset(p
->block
, p
->normal
[i
]);
282 return qio_channel_readv_all(p
->c
, p
->iov
, p
->normal_num
, errp
);
285 static MultiFDMethods multifd_nocomp_ops
= {
286 .send_setup
= nocomp_send_setup
,
287 .send_cleanup
= nocomp_send_cleanup
,
288 .send_prepare
= nocomp_send_prepare
,
289 .recv_setup
= nocomp_recv_setup
,
290 .recv_cleanup
= nocomp_recv_cleanup
,
294 static MultiFDMethods
*multifd_ops
[MULTIFD_COMPRESSION__MAX
] = {
295 [MULTIFD_COMPRESSION_NONE
] = &multifd_nocomp_ops
,
298 void multifd_register_ops(int method
, MultiFDMethods
*ops
)
300 assert(0 < method
&& method
< MULTIFD_COMPRESSION__MAX
);
301 multifd_ops
[method
] = ops
;
304 /* Reset a MultiFDPages_t* object for the next use */
305 static void multifd_pages_reset(MultiFDPages_t
*pages
)
308 * We don't need to touch offset[] array, because it will be
309 * overwritten later when reused.
312 pages
->normal_num
= 0;
316 static int multifd_send_initial_packet(MultiFDSendParams
*p
, Error
**errp
)
318 MultiFDInit_t msg
= {};
319 size_t size
= sizeof(msg
);
322 msg
.magic
= cpu_to_be32(MULTIFD_MAGIC
);
323 msg
.version
= cpu_to_be32(MULTIFD_VERSION
);
325 memcpy(msg
.uuid
, &qemu_uuid
.data
, sizeof(msg
.uuid
));
327 ret
= qio_channel_write_all(p
->c
, (char *)&msg
, size
, errp
);
331 stat64_add(&mig_stats
.multifd_bytes
, size
);
335 static int multifd_recv_initial_packet(QIOChannel
*c
, Error
**errp
)
340 ret
= qio_channel_read_all(c
, (char *)&msg
, sizeof(msg
), errp
);
345 msg
.magic
= be32_to_cpu(msg
.magic
);
346 msg
.version
= be32_to_cpu(msg
.version
);
348 if (msg
.magic
!= MULTIFD_MAGIC
) {
349 error_setg(errp
, "multifd: received packet magic %x "
350 "expected %x", msg
.magic
, MULTIFD_MAGIC
);
354 if (msg
.version
!= MULTIFD_VERSION
) {
355 error_setg(errp
, "multifd: received packet version %u "
356 "expected %u", msg
.version
, MULTIFD_VERSION
);
360 if (memcmp(msg
.uuid
, &qemu_uuid
, sizeof(qemu_uuid
))) {
361 char *uuid
= qemu_uuid_unparse_strdup(&qemu_uuid
);
362 char *msg_uuid
= qemu_uuid_unparse_strdup((const QemuUUID
*)msg
.uuid
);
364 error_setg(errp
, "multifd: received uuid '%s' and expected "
365 "uuid '%s' for channel %hhd", msg_uuid
, uuid
, msg
.id
);
371 if (msg
.id
> migrate_multifd_channels()) {
372 error_setg(errp
, "multifd: received channel id %u is greater than "
373 "number of channels %u", msg
.id
, migrate_multifd_channels());
380 static MultiFDPages_t
*multifd_pages_init(uint32_t n
)
382 MultiFDPages_t
*pages
= g_new0(MultiFDPages_t
, 1);
384 pages
->allocated
= n
;
385 pages
->offset
= g_new0(ram_addr_t
, n
);
390 static void multifd_pages_clear(MultiFDPages_t
*pages
)
392 multifd_pages_reset(pages
);
393 pages
->allocated
= 0;
394 g_free(pages
->offset
);
395 pages
->offset
= NULL
;
399 void multifd_send_fill_packet(MultiFDSendParams
*p
)
401 MultiFDPacket_t
*packet
= p
->packet
;
402 MultiFDPages_t
*pages
= p
->pages
;
404 uint32_t zero_num
= pages
->num
- pages
->normal_num
;
407 packet
->flags
= cpu_to_be32(p
->flags
);
408 packet
->pages_alloc
= cpu_to_be32(p
->pages
->allocated
);
409 packet
->normal_pages
= cpu_to_be32(pages
->normal_num
);
410 packet
->zero_pages
= cpu_to_be32(zero_num
);
411 packet
->next_packet_size
= cpu_to_be32(p
->next_packet_size
);
413 packet_num
= qatomic_fetch_inc(&multifd_send_state
->packet_num
);
414 packet
->packet_num
= cpu_to_be64(packet_num
);
417 strncpy(packet
->ramblock
, pages
->block
->idstr
, 256);
420 for (i
= 0; i
< pages
->num
; i
++) {
421 /* there are architectures where ram_addr_t is 32 bit */
422 uint64_t temp
= pages
->offset
[i
];
424 packet
->offset
[i
] = cpu_to_be64(temp
);
428 p
->total_normal_pages
+= pages
->normal_num
;
429 p
->total_zero_pages
+= zero_num
;
431 trace_multifd_send(p
->id
, packet_num
, pages
->normal_num
, zero_num
,
432 p
->flags
, p
->next_packet_size
);
435 static int multifd_recv_unfill_packet(MultiFDRecvParams
*p
, Error
**errp
)
437 MultiFDPacket_t
*packet
= p
->packet
;
440 packet
->magic
= be32_to_cpu(packet
->magic
);
441 if (packet
->magic
!= MULTIFD_MAGIC
) {
442 error_setg(errp
, "multifd: received packet "
443 "magic %x and expected magic %x",
444 packet
->magic
, MULTIFD_MAGIC
);
448 packet
->version
= be32_to_cpu(packet
->version
);
449 if (packet
->version
!= MULTIFD_VERSION
) {
450 error_setg(errp
, "multifd: received packet "
451 "version %u and expected version %u",
452 packet
->version
, MULTIFD_VERSION
);
456 p
->flags
= be32_to_cpu(packet
->flags
);
458 packet
->pages_alloc
= be32_to_cpu(packet
->pages_alloc
);
460 * If we received a packet that is 100 times bigger than expected
461 * just stop migration. It is a magic number.
463 if (packet
->pages_alloc
> p
->page_count
) {
464 error_setg(errp
, "multifd: received packet "
465 "with size %u and expected a size of %u",
466 packet
->pages_alloc
, p
->page_count
) ;
470 p
->normal_num
= be32_to_cpu(packet
->normal_pages
);
471 if (p
->normal_num
> packet
->pages_alloc
) {
472 error_setg(errp
, "multifd: received packet "
473 "with %u normal pages and expected maximum pages are %u",
474 p
->normal_num
, packet
->pages_alloc
) ;
478 p
->zero_num
= be32_to_cpu(packet
->zero_pages
);
479 if (p
->zero_num
> packet
->pages_alloc
- p
->normal_num
) {
480 error_setg(errp
, "multifd: received packet "
481 "with %u zero pages and expected maximum zero pages are %u",
482 p
->zero_num
, packet
->pages_alloc
- p
->normal_num
) ;
486 p
->next_packet_size
= be32_to_cpu(packet
->next_packet_size
);
487 p
->packet_num
= be64_to_cpu(packet
->packet_num
);
489 p
->total_normal_pages
+= p
->normal_num
;
490 p
->total_zero_pages
+= p
->zero_num
;
492 trace_multifd_recv(p
->id
, p
->packet_num
, p
->normal_num
, p
->zero_num
,
493 p
->flags
, p
->next_packet_size
);
495 if (p
->normal_num
== 0 && p
->zero_num
== 0) {
499 /* make sure that ramblock is 0 terminated */
500 packet
->ramblock
[255] = 0;
501 p
->block
= qemu_ram_block_by_name(packet
->ramblock
);
503 error_setg(errp
, "multifd: unknown ram block %s",
508 p
->host
= p
->block
->host
;
509 for (i
= 0; i
< p
->normal_num
; i
++) {
510 uint64_t offset
= be64_to_cpu(packet
->offset
[i
]);
512 if (offset
> (p
->block
->used_length
- p
->page_size
)) {
513 error_setg(errp
, "multifd: offset too long %" PRIu64
514 " (max " RAM_ADDR_FMT
")",
515 offset
, p
->block
->used_length
);
518 p
->normal
[i
] = offset
;
521 for (i
= 0; i
< p
->zero_num
; i
++) {
522 uint64_t offset
= be64_to_cpu(packet
->offset
[p
->normal_num
+ i
]);
524 if (offset
> (p
->block
->used_length
- p
->page_size
)) {
525 error_setg(errp
, "multifd: offset too long %" PRIu64
526 " (max " RAM_ADDR_FMT
")",
527 offset
, p
->block
->used_length
);
536 static bool multifd_send_should_exit(void)
538 return qatomic_read(&multifd_send_state
->exiting
);
541 static bool multifd_recv_should_exit(void)
543 return qatomic_read(&multifd_recv_state
->exiting
);
547 * The migration thread can wait on either of the two semaphores. This
548 * function can be used to kick the main thread out of waiting on either of
549 * them. Should mostly only be called when something wrong happened with
550 * the current multifd send thread.
552 static void multifd_send_kick_main(MultiFDSendParams
*p
)
554 qemu_sem_post(&p
->sem_sync
);
555 qemu_sem_post(&multifd_send_state
->channels_ready
);
559 * How we use multifd_send_state->pages and channel->pages?
561 * We create a pages for each channel, and a main one. Each time that
562 * we need to send a batch of pages we interchange the ones between
563 * multifd_send_state and the channel that is sending it. There are
564 * two reasons for that:
565 * - to not have to do so many mallocs during migration
566 * - to make easier to know what to free at the end of migration
568 * This way we always know who is the owner of each "pages" struct,
569 * and we don't need any locking. It belongs to the migration thread
570 * or to the channel thread. Switching is safe because the migration
571 * thread is using the channel mutex when changing it, and the channel
572 * have to had finish with its own, otherwise pending_job can't be
575 * Returns true if succeed, false otherwise.
577 static bool multifd_send_pages(void)
580 static int next_channel
;
581 MultiFDSendParams
*p
= NULL
; /* make happy gcc */
582 MultiFDPages_t
*pages
= multifd_send_state
->pages
;
584 if (multifd_send_should_exit()) {
588 /* We wait here, until at least one channel is ready */
589 qemu_sem_wait(&multifd_send_state
->channels_ready
);
592 * next_channel can remain from a previous migration that was
593 * using more channels, so ensure it doesn't overflow if the
594 * limit is lower now.
596 next_channel
%= migrate_multifd_channels();
597 for (i
= next_channel
;; i
= (i
+ 1) % migrate_multifd_channels()) {
598 if (multifd_send_should_exit()) {
601 p
= &multifd_send_state
->params
[i
];
603 * Lockless read to p->pending_job is safe, because only multifd
604 * sender thread can clear it.
606 if (qatomic_read(&p
->pending_job
) == false) {
607 next_channel
= (i
+ 1) % migrate_multifd_channels();
613 * Make sure we read p->pending_job before all the rest. Pairs with
614 * qatomic_store_release() in multifd_send_thread().
617 assert(!p
->pages
->num
);
618 multifd_send_state
->pages
= p
->pages
;
621 * Making sure p->pages is setup before marking pending_job=true. Pairs
622 * with the qatomic_load_acquire() in multifd_send_thread().
624 qatomic_store_release(&p
->pending_job
, true);
625 qemu_sem_post(&p
->sem
);
630 static inline bool multifd_queue_empty(MultiFDPages_t
*pages
)
632 return pages
->num
== 0;
635 static inline bool multifd_queue_full(MultiFDPages_t
*pages
)
637 return pages
->num
== pages
->allocated
;
640 static inline void multifd_enqueue(MultiFDPages_t
*pages
, ram_addr_t offset
)
642 pages
->offset
[pages
->num
++] = offset
;
645 /* Returns true if enqueue successful, false otherwise */
646 bool multifd_queue_page(RAMBlock
*block
, ram_addr_t offset
)
648 MultiFDPages_t
*pages
;
651 pages
= multifd_send_state
->pages
;
653 /* If the queue is empty, we can already enqueue now */
654 if (multifd_queue_empty(pages
)) {
655 pages
->block
= block
;
656 multifd_enqueue(pages
, offset
);
661 * Not empty, meanwhile we need a flush. It can because of either:
663 * (1) The page is not on the same ramblock of previous ones, or,
664 * (2) The queue is full.
666 * After flush, always retry.
668 if (pages
->block
!= block
|| multifd_queue_full(pages
)) {
669 if (!multifd_send_pages()) {
675 /* Not empty, and we still have space, do it! */
676 multifd_enqueue(pages
, offset
);
680 /* Multifd send side hit an error; remember it and prepare to quit */
681 static void multifd_send_set_error(Error
*err
)
684 * We don't want to exit each threads twice. Depending on where
685 * we get the error, or if there are two independent errors in two
686 * threads at the same time, we can end calling this function
689 if (qatomic_xchg(&multifd_send_state
->exiting
, 1)) {
694 MigrationState
*s
= migrate_get_current();
695 migrate_set_error(s
, err
);
696 if (s
->state
== MIGRATION_STATUS_SETUP
||
697 s
->state
== MIGRATION_STATUS_PRE_SWITCHOVER
||
698 s
->state
== MIGRATION_STATUS_DEVICE
||
699 s
->state
== MIGRATION_STATUS_ACTIVE
) {
700 migrate_set_state(&s
->state
, s
->state
,
701 MIGRATION_STATUS_FAILED
);
706 static void multifd_send_terminate_threads(void)
710 trace_multifd_send_terminate_threads();
713 * Tell everyone we're quitting. No xchg() needed here; we simply
716 qatomic_set(&multifd_send_state
->exiting
, 1);
719 * Firstly, kick all threads out; no matter whether they are just idle,
720 * or blocked in an IO system call.
722 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
723 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
725 qemu_sem_post(&p
->sem
);
727 qio_channel_shutdown(p
->c
, QIO_CHANNEL_SHUTDOWN_BOTH
, NULL
);
732 * Finally recycle all the threads.
734 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
735 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
737 if (p
->tls_thread_created
) {
738 qemu_thread_join(&p
->tls_thread
);
741 if (p
->thread_created
) {
742 qemu_thread_join(&p
->thread
);
747 static bool multifd_send_cleanup_channel(MultiFDSendParams
*p
, Error
**errp
)
750 migration_ioc_unregister_yank(p
->c
);
752 * The object_unref() cannot guarantee the fd will always be
753 * released because finalize() of the iochannel is only
754 * triggered on the last reference and it's not guaranteed
755 * that we always hold the last refcount when reaching here.
757 * Closing the fd explicitly has the benefit that if there is any
758 * registered I/O handler callbacks on such fd, that will get a
759 * POLLNVAL event and will further trigger the cleanup to finally
762 * FIXME: It should logically be guaranteed that all multifd
763 * channels have no I/O handler callback registered when reaching
764 * here, because migration thread will wait for all multifd channel
765 * establishments to complete during setup. Since
766 * migrate_fd_cleanup() will be scheduled in main thread too, all
767 * previous callbacks should guarantee to be completed when
768 * reaching here. See multifd_send_state.channels_created and its
769 * usage. In the future, we could replace this with an assert
770 * making sure we're the last reference, or simply drop it if above
771 * is more clear to be justified.
773 qio_channel_close(p
->c
, &error_abort
);
774 object_unref(OBJECT(p
->c
));
777 qemu_sem_destroy(&p
->sem
);
778 qemu_sem_destroy(&p
->sem_sync
);
781 multifd_pages_clear(p
->pages
);
788 multifd_send_state
->ops
->send_cleanup(p
, errp
);
790 return *errp
== NULL
;
793 static void multifd_send_cleanup_state(void)
795 file_cleanup_outgoing_migration();
796 socket_cleanup_outgoing_migration();
797 qemu_sem_destroy(&multifd_send_state
->channels_created
);
798 qemu_sem_destroy(&multifd_send_state
->channels_ready
);
799 g_free(multifd_send_state
->params
);
800 multifd_send_state
->params
= NULL
;
801 multifd_pages_clear(multifd_send_state
->pages
);
802 multifd_send_state
->pages
= NULL
;
803 g_free(multifd_send_state
);
804 multifd_send_state
= NULL
;
807 void multifd_send_shutdown(void)
811 if (!migrate_multifd()) {
815 multifd_send_terminate_threads();
817 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
818 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
819 Error
*local_err
= NULL
;
821 if (!multifd_send_cleanup_channel(p
, &local_err
)) {
822 migrate_set_error(migrate_get_current(), local_err
);
823 error_free(local_err
);
827 multifd_send_cleanup_state();
830 static int multifd_zero_copy_flush(QIOChannel
*c
)
835 ret
= qio_channel_flush(c
, &err
);
837 error_report_err(err
);
841 stat64_add(&mig_stats
.dirty_sync_missed_zero_copy
, 1);
847 int multifd_send_sync_main(void)
850 bool flush_zero_copy
;
852 if (!migrate_multifd()) {
855 if (multifd_send_state
->pages
->num
) {
856 if (!multifd_send_pages()) {
857 error_report("%s: multifd_send_pages fail", __func__
);
862 flush_zero_copy
= migrate_zero_copy_send();
864 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
865 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
867 if (multifd_send_should_exit()) {
871 trace_multifd_send_sync_main_signal(p
->id
);
874 * We should be the only user so far, so not possible to be set by
875 * others concurrently.
877 assert(qatomic_read(&p
->pending_sync
) == false);
878 qatomic_set(&p
->pending_sync
, true);
879 qemu_sem_post(&p
->sem
);
881 for (i
= 0; i
< migrate_multifd_channels(); i
++) {
882 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
884 if (multifd_send_should_exit()) {
888 qemu_sem_wait(&multifd_send_state
->channels_ready
);
889 trace_multifd_send_sync_main_wait(p
->id
);
890 qemu_sem_wait(&p
->sem_sync
);
892 if (flush_zero_copy
&& p
->c
&& (multifd_zero_copy_flush(p
->c
) < 0)) {
896 trace_multifd_send_sync_main(multifd_send_state
->packet_num
);
901 static void *multifd_send_thread(void *opaque
)
903 MultiFDSendParams
*p
= opaque
;
904 MigrationThread
*thread
= NULL
;
905 Error
*local_err
= NULL
;
907 bool use_packets
= multifd_use_packets();
909 thread
= migration_threads_add(p
->name
, qemu_get_thread_id());
911 trace_multifd_send_thread_start(p
->id
);
912 rcu_register_thread();
915 if (multifd_send_initial_packet(p
, &local_err
) < 0) {
922 qemu_sem_post(&multifd_send_state
->channels_ready
);
923 qemu_sem_wait(&p
->sem
);
925 if (multifd_send_should_exit()) {
930 * Read pending_job flag before p->pages. Pairs with the
931 * qatomic_store_release() in multifd_send_pages().
933 if (qatomic_load_acquire(&p
->pending_job
)) {
934 MultiFDPages_t
*pages
= p
->pages
;
939 ret
= multifd_send_state
->ops
->send_prepare(p
, &local_err
);
944 if (migrate_mapped_ram()) {
945 ret
= file_write_ramblock_iov(p
->c
, p
->iov
, p
->iovs_num
,
946 p
->pages
->block
, &local_err
);
948 ret
= qio_channel_writev_full_all(p
->c
, p
->iov
, p
->iovs_num
,
949 NULL
, 0, p
->write_flags
,
957 stat64_add(&mig_stats
.multifd_bytes
,
958 p
->next_packet_size
+ p
->packet_len
);
959 stat64_add(&mig_stats
.normal_pages
, pages
->normal_num
);
960 stat64_add(&mig_stats
.zero_pages
, pages
->num
- pages
->normal_num
);
962 multifd_pages_reset(p
->pages
);
963 p
->next_packet_size
= 0;
966 * Making sure p->pages is published before saying "we're
967 * free". Pairs with the smp_mb_acquire() in
968 * multifd_send_pages().
970 qatomic_store_release(&p
->pending_job
, false);
973 * If not a normal job, must be a sync request. Note that
974 * pending_sync is a standalone flag (unlike pending_job), so
975 * it doesn't require explicit memory barriers.
977 assert(qatomic_read(&p
->pending_sync
));
980 p
->flags
= MULTIFD_FLAG_SYNC
;
981 multifd_send_fill_packet(p
);
982 ret
= qio_channel_write_all(p
->c
, (void *)p
->packet
,
983 p
->packet_len
, &local_err
);
987 /* p->next_packet_size will always be zero for a SYNC packet */
988 stat64_add(&mig_stats
.multifd_bytes
, p
->packet_len
);
992 qatomic_set(&p
->pending_sync
, false);
993 qemu_sem_post(&p
->sem_sync
);
1000 trace_multifd_send_error(p
->id
);
1001 multifd_send_set_error(local_err
);
1002 multifd_send_kick_main(p
);
1003 error_free(local_err
);
1006 rcu_unregister_thread();
1007 migration_threads_remove(thread
);
1008 trace_multifd_send_thread_end(p
->id
, p
->packets_sent
, p
->total_normal_pages
,
1009 p
->total_zero_pages
);
1014 static void multifd_new_send_channel_async(QIOTask
*task
, gpointer opaque
);
1017 MultiFDSendParams
*p
;
1018 QIOChannelTLS
*tioc
;
1019 } MultiFDTLSThreadArgs
;
1021 static void *multifd_tls_handshake_thread(void *opaque
)
1023 MultiFDTLSThreadArgs
*args
= opaque
;
1025 qio_channel_tls_handshake(args
->tioc
,
1026 multifd_new_send_channel_async
,
1035 static bool multifd_tls_channel_connect(MultiFDSendParams
*p
,
1039 MigrationState
*s
= migrate_get_current();
1040 const char *hostname
= s
->hostname
;
1041 MultiFDTLSThreadArgs
*args
;
1042 QIOChannelTLS
*tioc
;
1044 tioc
= migration_tls_client_create(ioc
, hostname
, errp
);
1050 * Ownership of the socket channel now transfers to the newly
1051 * created TLS channel, which has already taken a reference.
1053 object_unref(OBJECT(ioc
));
1054 trace_multifd_tls_outgoing_handshake_start(ioc
, tioc
, hostname
);
1055 qio_channel_set_name(QIO_CHANNEL(tioc
), "multifd-tls-outgoing");
1057 args
= g_new0(MultiFDTLSThreadArgs
, 1);
1061 p
->tls_thread_created
= true;
1062 qemu_thread_create(&p
->tls_thread
, "multifd-tls-handshake-worker",
1063 multifd_tls_handshake_thread
, args
,
1064 QEMU_THREAD_JOINABLE
);
1068 void multifd_channel_connect(MultiFDSendParams
*p
, QIOChannel
*ioc
)
1070 qio_channel_set_delay(ioc
, false);
1072 migration_ioc_register_yank(ioc
);
1073 /* Setup p->c only if the channel is completely setup */
1076 p
->thread_created
= true;
1077 qemu_thread_create(&p
->thread
, p
->name
, multifd_send_thread
, p
,
1078 QEMU_THREAD_JOINABLE
);
1082 * When TLS is enabled this function is called once to establish the
1083 * TLS connection and a second time after the TLS handshake to create
1084 * the multifd channel. Without TLS it goes straight into the channel
1087 static void multifd_new_send_channel_async(QIOTask
*task
, gpointer opaque
)
1089 MultiFDSendParams
*p
= opaque
;
1090 QIOChannel
*ioc
= QIO_CHANNEL(qio_task_get_source(task
));
1091 Error
*local_err
= NULL
;
1094 trace_multifd_new_send_channel_async(p
->id
);
1096 if (qio_task_propagate_error(task
, &local_err
)) {
1101 trace_multifd_set_outgoing_channel(ioc
, object_get_typename(OBJECT(ioc
)),
1102 migrate_get_current()->hostname
);
1104 if (migrate_channel_requires_tls_upgrade(ioc
)) {
1105 ret
= multifd_tls_channel_connect(p
, ioc
, &local_err
);
1110 multifd_channel_connect(p
, ioc
);
1116 * Here we're not interested whether creation succeeded, only that
1117 * it happened at all.
1119 multifd_send_channel_created();
1125 trace_multifd_new_send_channel_async_error(p
->id
, local_err
);
1126 multifd_send_set_error(local_err
);
1128 * For error cases (TLS or non-TLS), IO channel is always freed here
1129 * rather than when cleanup multifd: since p->c is not set, multifd
1130 * cleanup code doesn't even know its existence.
1132 object_unref(OBJECT(ioc
));
1133 error_free(local_err
);
1136 static bool multifd_new_send_channel_create(gpointer opaque
, Error
**errp
)
1138 if (!multifd_use_packets()) {
1139 return file_send_channel_create(opaque
, errp
);
1142 socket_send_channel_create(multifd_new_send_channel_async
, opaque
);
1146 bool multifd_send_setup(void)
1148 MigrationState
*s
= migrate_get_current();
1149 Error
*local_err
= NULL
;
1150 int thread_count
, ret
= 0;
1151 uint32_t page_count
= MULTIFD_PACKET_SIZE
/ qemu_target_page_size();
1152 bool use_packets
= multifd_use_packets();
1155 if (!migrate_multifd()) {
1159 thread_count
= migrate_multifd_channels();
1160 multifd_send_state
= g_malloc0(sizeof(*multifd_send_state
));
1161 multifd_send_state
->params
= g_new0(MultiFDSendParams
, thread_count
);
1162 multifd_send_state
->pages
= multifd_pages_init(page_count
);
1163 qemu_sem_init(&multifd_send_state
->channels_created
, 0);
1164 qemu_sem_init(&multifd_send_state
->channels_ready
, 0);
1165 qatomic_set(&multifd_send_state
->exiting
, 0);
1166 multifd_send_state
->ops
= multifd_ops
[migrate_multifd_compression()];
1168 for (i
= 0; i
< thread_count
; i
++) {
1169 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
1171 qemu_sem_init(&p
->sem
, 0);
1172 qemu_sem_init(&p
->sem_sync
, 0);
1174 p
->pages
= multifd_pages_init(page_count
);
1177 p
->packet_len
= sizeof(MultiFDPacket_t
)
1178 + sizeof(uint64_t) * page_count
;
1179 p
->packet
= g_malloc0(p
->packet_len
);
1180 p
->packet
->magic
= cpu_to_be32(MULTIFD_MAGIC
);
1181 p
->packet
->version
= cpu_to_be32(MULTIFD_VERSION
);
1183 /* We need one extra place for the packet header */
1184 p
->iov
= g_new0(struct iovec
, page_count
+ 1);
1186 p
->iov
= g_new0(struct iovec
, page_count
);
1188 p
->name
= g_strdup_printf("multifdsend_%d", i
);
1189 p
->page_size
= qemu_target_page_size();
1190 p
->page_count
= page_count
;
1193 if (!multifd_new_send_channel_create(p
, &local_err
)) {
1199 * Wait until channel creation has started for all channels. The
1200 * creation can still fail, but no more channels will be created
1203 for (i
= 0; i
< thread_count
; i
++) {
1204 qemu_sem_wait(&multifd_send_state
->channels_created
);
1207 for (i
= 0; i
< thread_count
; i
++) {
1208 MultiFDSendParams
*p
= &multifd_send_state
->params
[i
];
1210 ret
= multifd_send_state
->ops
->send_setup(p
, &local_err
);
1217 migrate_set_error(s
, local_err
);
1218 error_report_err(local_err
);
1219 migrate_set_state(&s
->state
, MIGRATION_STATUS_SETUP
,
1220 MIGRATION_STATUS_FAILED
);
1227 bool multifd_recv(void)
1230 static int next_recv_channel
;
1231 MultiFDRecvParams
*p
= NULL
;
1232 MultiFDRecvData
*data
= multifd_recv_state
->data
;
1235 * next_channel can remain from a previous migration that was
1236 * using more channels, so ensure it doesn't overflow if the
1237 * limit is lower now.
1239 next_recv_channel
%= migrate_multifd_channels();
1240 for (i
= next_recv_channel
;; i
= (i
+ 1) % migrate_multifd_channels()) {
1241 if (multifd_recv_should_exit()) {
1245 p
= &multifd_recv_state
->params
[i
];
1247 if (qatomic_read(&p
->pending_job
) == false) {
1248 next_recv_channel
= (i
+ 1) % migrate_multifd_channels();
1254 * Order pending_job read before manipulating p->data below. Pairs
1255 * with qatomic_store_release() at multifd_recv_thread().
1259 assert(!p
->data
->size
);
1260 multifd_recv_state
->data
= p
->data
;
1264 * Order p->data update before setting pending_job. Pairs with
1265 * qatomic_load_acquire() at multifd_recv_thread().
1267 qatomic_store_release(&p
->pending_job
, true);
1268 qemu_sem_post(&p
->sem
);
1273 MultiFDRecvData
*multifd_get_recv_data(void)
1275 return multifd_recv_state
->data
;
/*
 * Ask every recv channel thread to terminate.
 *
 * Safe against concurrent/repeated invocation: the atomic exchange on
 * 'exiting' guarantees the termination sequence runs exactly once.
 *
 * @err: the error that triggered termination, or NULL for a normal quit.
 */
static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    /* Only the first caller performs the teardown; later ones bail out. */
    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets. When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync. If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work. When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish. If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}
1336 void multifd_recv_shutdown(void)
1338 if (migrate_multifd()) {
1339 multifd_recv_terminate_threads(NULL
);
1343 static void multifd_recv_cleanup_channel(MultiFDRecvParams
*p
)
1345 migration_ioc_unregister_yank(p
->c
);
1346 object_unref(OBJECT(p
->c
));
1348 qemu_mutex_destroy(&p
->mutex
);
1349 qemu_sem_destroy(&p
->sem_sync
);
1350 qemu_sem_destroy(&p
->sem
);
1362 multifd_recv_state
->ops
->recv_cleanup(p
);
/*
 * Free the global recv state.  Must only run after every channel has
 * been cleaned up (multifd_recv_cleanup_channel()); per-channel
 * resources are not touched here.
 */
static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    /* NULL the global so multifd_recv_setup() can re-initialise later. */
    multifd_recv_state = NULL;
}
/*
 * Tear down the whole recv side: signal termination, join every
 * created thread, then free per-channel and global state.  The join
 * must complete before any cleanup so no thread touches freed memory.
 * No-op when multifd is disabled.
 */
void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }
    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /* Only join channels whose thread actually got created. */
        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}
/*
 * Migration-thread side of the recv synchronization point.  Waits for
 * all channels to reach the sync, then (for packet-based migration)
 * releases them for the next iteration.
 */
void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work. Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based loading is done in one iteration. We're
         * done.
         */
        return;
    }

    /*
     * Sync done. Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /* Track the highest packet number seen across all channels. */
        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
/*
 * Per-channel recv thread body.
 *
 * Packet mode: reads packets from its own channel, decodes them and
 * honours the SYNC flag via sem_sync handshakes with the migration
 * thread.
 *
 * Packetless (file) mode: waits on p->sem for the migration thread to
 * hand it work through p->data / p->pending_job, using acquire/release
 * ordering to publish results back.
 */
static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            /* Snapshot flags under the lock; SYNC is handled below. */
            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side. Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            /* Hand the payload to the compression backend. */
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                /* Tell the migration thread we reached the sync point,
                 * then wait for it to release us again. */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job. Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}
/*
 * Allocate and initialise the global recv state and every per-channel
 * MultiFDRecvParams, then run the compression backend's per-channel
 * setup.  The global state must be fully initialised before any
 * channel thread can be created (multifd_recv_new_channel()).
 *
 * Returns 0 on success (including when already initialised or multifd
 * is disabled); a backend error code otherwise, with @errp set.
 */
int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    int i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    /* Shared slot the migration thread uses to pass work (file mode). */
    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);

        if (use_packets) {
            /* One header plus one u64 offset per page in a full packet. */
            p->packet_len = sizeof(MultiFDPacket_t)
                + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("multifdrecv_%d", i);
        p->iov = g_new0(struct iovec, page_count);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}
/*
 * Return true once every expected recv channel has been created, or
 * trivially when multifd is disabled.
 */
bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}
/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 *
 * In packet mode the channel identifies itself via an initial packet;
 * in packetless mode channels are numbered in arrival order.  On
 * success the channel's recv thread is started and the created-channel
 * count is bumped (consumed by multifd_recv_all_channels_created()).
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        /* Packetless channels carry no id; assign in arrival order. */
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        /* Two channels claimed the same id: fatal for the migration. */
        error_setg(&local_err, "multifd: received id '%d' already setup'",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}
1683 bool multifd_send_prepare_common(MultiFDSendParams
*p
)
1685 multifd_send_zero_page_detect(p
);
1687 if (!p
->pages
->normal_num
) {
1688 p
->next_packet_size
= 0;
1692 multifd_send_prepare_header(p
);