/*
 * Multifd common code
 *
 * Copyright (c) 2019-2020 Red Hat Inc
 *
 * Authors:
 *  Juan Quintela <quintela@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */

#include "qemu/osdep.h"
#include "qemu/cutils.h"
#include "qemu/rcu.h"
#include "exec/target_page.h"
#include "sysemu/sysemu.h"
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "file.h"
#include "migration.h"
#include "migration-stats.h"
#include "socket.h"
#include "tls.h"
#include "qemu-file.h"
#include "trace.h"
#include "multifd.h"
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"

#define MULTIFD_MAGIC 0x11223344U
#define MULTIFD_VERSION 1

typedef struct {
    uint32_t magic;
    uint32_t version;
    unsigned char uuid[16]; /* QemuUUID */
    uint8_t id;
    uint8_t unused1[7];     /* Reserved for future use */
    uint64_t unused2[4];    /* Reserved for future use */
} __attribute__((packed)) MultiFDInit_t;
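
/*
 * Illustrative sketch of the handshake: every send channel writes one
 * MultiFDInit_t before any data packet, so the destination can bind the
 * connection to a channel id and check that it belongs to this very
 * migration (uuid) and speaks the same protocol (magic/version).
 * Approximate wire layout, 64 bytes total, multi-byte fields in big
 * endian:
 *
 *   byte 0        4         8                24  25       32         64
 *        +--------+---------+----------------+---+--------+----------+
 *        | magic  | version | uuid[16]       |id |unused1 | unused2  |
 *        +--------+---------+----------------+---+--------+----------+
 */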

static struct {
    MultiFDSendParams *params;
    /* array of pages to send */
    MultiFDPages_t *pages;
    /*
     * Global number of generated multifd packets.
     *
     * Note that we used 'uintptr_t' because it'll naturally support atomic
     * operations on both 32-bit and 64-bit hosts.  It means on 32-bit
     * systems multifd will overflow packet_num sooner, but that should be
     * fine.
     *
     * Another option is to use QEMU's Stat64, which would be 64 bits on all
     * hosts, but so far it does not support atomic fetch_add().
     * Keep it simple for now.
     */
    uintptr_t packet_num;
    /*
     * Synchronization point past which no more channels will be
     * created.
     */
    QemuSemaphore channels_created;
    /* send channels ready */
    QemuSemaphore channels_ready;
    /*
     * Have we already run terminate threads?  There is a race when it
     * happens that we get one error while we are exiting.
     * We will use atomic operations.  Only valid values are 0 and 1.
     */
    bool exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_send_state;

static struct {
    MultiFDRecvParams *params;
    MultiFDRecvData *data;
    /* number of created threads */
    int count;
    /*
     * This is always posted by the recv threads, the migration thread
     * uses it to wait for recv threads to finish assigned tasks.
     */
    QemuSemaphore sem_sync;
    /* global number of generated multifd packets */
    uint64_t packet_num;
    int exiting;
    /* multifd ops */
    MultiFDMethods *ops;
} *multifd_recv_state;

static bool multifd_use_packets(void)
{
    return !migrate_mapped_ram();
}

void multifd_send_channel_created(void)
{
    qemu_sem_post(&multifd_send_state->channels_created);
}

static void multifd_set_file_bitmap(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    assert(pages->block);

    for (int i = 0; i < p->pages->normal_num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], true);
    }

    for (int i = p->pages->normal_num; i < p->pages->num; i++) {
        ramblock_set_file_bmap_atomic(pages->block, pages->offset[i], false);
    }
}
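
/*
 * Rough background note: in mapped-ram (file) migration every page has a
 * fixed offset in the output file, so instead of parsing a packet stream
 * the destination consults a per-RAMBlock bitmap persisted in the file.
 * The helper above sets the bit for pages that carry data and clears it
 * for pages detected as zero, letting the reader skip the latter.
 */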

/* Multifd without compression */

/**
 * nocomp_send_setup: setup send side
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_setup(MultiFDSendParams *p, Error **errp)
{
    if (migrate_zero_copy_send()) {
        p->write_flags |= QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
    }

    if (multifd_use_packets()) {
        /* We need one extra place for the packet header */
        p->iov = g_new0(struct iovec, p->page_count + 1);
    } else {
        p->iov = g_new0(struct iovec, p->page_count);
    }

    return 0;
}

/**
 * nocomp_send_cleanup: cleanup send side
 *
 * For no compression this only needs to free the iov array.
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static void nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
{
    g_free(p->iov);
    p->iov = NULL;
}

static void multifd_send_prepare_iovs(MultiFDSendParams *p)
{
    MultiFDPages_t *pages = p->pages;

    for (int i = 0; i < pages->normal_num; i++) {
        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
        p->iov[p->iovs_num].iov_len = p->page_size;
        p->iovs_num++;
    }

    p->next_packet_size = pages->normal_num * p->page_size;
}

/**
 * nocomp_send_prepare: prepare data to be able to send
 *
 * For no compression we just have to calculate the size of the
 * packet.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
{
    bool use_zero_copy_send = migrate_zero_copy_send();
    int ret;

    multifd_send_zero_page_detect(p);

    if (!multifd_use_packets()) {
        multifd_send_prepare_iovs(p);
        multifd_set_file_bitmap(p);

        return 0;
    }

    if (!use_zero_copy_send) {
        /*
         * Only !zerocopy needs the header in IOV; zerocopy will
         * send it separately.
         */
        multifd_send_prepare_header(p);
    }

    multifd_send_prepare_iovs(p);
    p->flags |= MULTIFD_FLAG_NOCOMP;

    multifd_send_fill_packet(p);

    if (use_zero_copy_send) {
        /* Send header first, without zerocopy */
        ret = qio_channel_write_all(p->c, (void *)p->packet,
                                    p->packet_len, errp);
        if (ret != 0) {
            return -1;
        }
    }

    return 0;
}

/**
 * nocomp_recv_setup: setup receive side
 *
 * For no compression this only allocates the iov array.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv_setup(MultiFDRecvParams *p, Error **errp)
{
    p->iov = g_new0(struct iovec, p->page_count);
    return 0;
}

/**
 * nocomp_recv_cleanup: cleanup receive side
 *
 * For no compression this only needs to free the iov array.
 *
 * @p: Params for the channel that we are using
 */
static void nocomp_recv_cleanup(MultiFDRecvParams *p)
{
    g_free(p->iov);
    p->iov = NULL;
}

/**
 * nocomp_recv: read the data from the channel
 *
 * For no compression we just need to read things into the correct place.
 *
 * Returns 0 for success or -1 for error
 *
 * @p: Params for the channel that we are using
 * @errp: pointer to an error
 */
static int nocomp_recv(MultiFDRecvParams *p, Error **errp)
{
    uint32_t flags;

    if (!multifd_use_packets()) {
        return multifd_file_recv_data(p, errp);
    }

    flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;

    if (flags != MULTIFD_FLAG_NOCOMP) {
        error_setg(errp, "multifd %u: flags received %x flags expected %x",
                   p->id, flags, MULTIFD_FLAG_NOCOMP);
        return -1;
    }

    multifd_recv_zero_page_process(p);

    if (!p->normal_num) {
        return 0;
    }

    for (int i = 0; i < p->normal_num; i++) {
        p->iov[i].iov_base = p->host + p->normal[i];
        p->iov[i].iov_len = p->page_size;
        ramblock_recv_bitmap_set_offset(p->block, p->normal[i]);
    }

    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
}

static MultiFDMethods multifd_nocomp_ops = {
    .send_setup = nocomp_send_setup,
    .send_cleanup = nocomp_send_cleanup,
    .send_prepare = nocomp_send_prepare,
    .recv_setup = nocomp_recv_setup,
    .recv_cleanup = nocomp_recv_cleanup,
    .recv = nocomp_recv
};

static MultiFDMethods *multifd_ops[MULTIFD_COMPRESSION__MAX] = {
    [MULTIFD_COMPRESSION_NONE] = &multifd_nocomp_ops,
};

void multifd_register_ops(int method, MultiFDMethods *ops)
{
    assert(0 < method && method < MULTIFD_COMPRESSION__MAX);
    multifd_ops[method] = ops;
}
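
/*
 * For reference, a compression backend is expected to register its ops
 * from a constructor, along these lines (sketch, names approximate):
 *
 *   static void multifd_zlib_register(void)
 *   {
 *       multifd_register_ops(MULTIFD_COMPRESSION_ZLIB, &multifd_zlib_ops);
 *   }
 *   migration_init(multifd_zlib_register);
 */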

/* Reset a MultiFDPages_t* object for the next use */
static void multifd_pages_reset(MultiFDPages_t *pages)
{
    /*
     * We don't need to touch offset[] array, because it will be
     * overwritten later when reused.
     */
    pages->num = 0;
    pages->normal_num = 0;
    pages->block = NULL;
}

static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
    MultiFDInit_t msg = {};
    size_t size = sizeof(msg);
    int ret;

    msg.magic = cpu_to_be32(MULTIFD_MAGIC);
    msg.version = cpu_to_be32(MULTIFD_VERSION);
    msg.id = p->id;
    memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));

    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
    if (ret != 0) {
        return -1;
    }
    stat64_add(&mig_stats.multifd_bytes, size);
    return 0;
}

static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
{
    MultiFDInit_t msg;
    int ret;

    ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
    if (ret != 0) {
        return -1;
    }

    msg.magic = be32_to_cpu(msg.magic);
    msg.version = be32_to_cpu(msg.version);

    if (msg.magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet magic %x "
                   "expected %x", msg.magic, MULTIFD_MAGIC);
        return -1;
    }

    if (msg.version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet version %u "
                   "expected %u", msg.version, MULTIFD_VERSION);
        return -1;
    }

    if (memcmp(msg.uuid, &qemu_uuid, sizeof(qemu_uuid))) {
        char *uuid = qemu_uuid_unparse_strdup(&qemu_uuid);
        char *msg_uuid = qemu_uuid_unparse_strdup((const QemuUUID *)msg.uuid);

        error_setg(errp, "multifd: received uuid '%s' and expected "
                   "uuid '%s' for channel %hhd", msg_uuid, uuid, msg.id);
        g_free(uuid);
        g_free(msg_uuid);
        return -1;
    }

    if (msg.id > migrate_multifd_channels()) {
        error_setg(errp, "multifd: received channel id %u is greater than "
                   "number of channels %u", msg.id, migrate_multifd_channels());
        return -1;
    }

    return msg.id;
}

static MultiFDPages_t *multifd_pages_init(uint32_t n)
{
    MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);

    pages->allocated = n;
    pages->offset = g_new0(ram_addr_t, n);

    return pages;
}

static void multifd_pages_clear(MultiFDPages_t *pages)
{
    multifd_pages_reset(pages);
    pages->allocated = 0;
    g_free(pages->offset);
    pages->offset = NULL;
    g_free(pages);
}

void multifd_send_fill_packet(MultiFDSendParams *p)
{
    MultiFDPacket_t *packet = p->packet;
    MultiFDPages_t *pages = p->pages;
    uint64_t packet_num;
    uint32_t zero_num = pages->num - pages->normal_num;
    int i;

    packet->flags = cpu_to_be32(p->flags);
    packet->pages_alloc = cpu_to_be32(p->pages->allocated);
    packet->normal_pages = cpu_to_be32(pages->normal_num);
    packet->zero_pages = cpu_to_be32(zero_num);
    packet->next_packet_size = cpu_to_be32(p->next_packet_size);

    packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
    packet->packet_num = cpu_to_be64(packet_num);

    strncpy(packet->ramblock, pages->block->idstr, 256);

    for (i = 0; i < pages->num; i++) {
        /* there are architectures where ram_addr_t is 32 bit */
        uint64_t temp = pages->offset[i];

        packet->offset[i] = cpu_to_be64(temp);
    }

    p->packets_sent++;
    p->total_normal_pages += pages->normal_num;
    p->total_zero_pages += zero_num;

    trace_multifd_send(p->id, packet_num, pages->normal_num, zero_num,
                       p->flags, p->next_packet_size);
}
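
/*
 * Sketch of the data packet produced above (see MultiFDPacket_t for the
 * authoritative layout; multi-byte fields are big endian on the wire):
 *
 *   magic, version, flags, pages_alloc, normal_pages, zero_pages,
 *   next_packet_size, packet_num, ramblock name (256 bytes),
 *   offset[0] .. offset[normal_num - 1]       <- pages carrying data
 *   offset[normal_num] .. offset[num - 1]     <- zero pages
 *
 * Zero-page offsets travel in the same array, right after the normal
 * ones, which is why zero_num is simply num - normal_num.
 */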

static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
{
    MultiFDPacket_t *packet = p->packet;
    int i;

    packet->magic = be32_to_cpu(packet->magic);
    if (packet->magic != MULTIFD_MAGIC) {
        error_setg(errp, "multifd: received packet "
                   "magic %x and expected magic %x",
                   packet->magic, MULTIFD_MAGIC);
        return -1;
    }

    packet->version = be32_to_cpu(packet->version);
    if (packet->version != MULTIFD_VERSION) {
        error_setg(errp, "multifd: received packet "
                   "version %u and expected version %u",
                   packet->version, MULTIFD_VERSION);
        return -1;
    }

    p->flags = be32_to_cpu(packet->flags);

    packet->pages_alloc = be32_to_cpu(packet->pages_alloc);
    /*
     * If the packet claims more pages than this channel could ever have
     * allocated, just stop the migration.
     */
    if (packet->pages_alloc > p->page_count) {
        error_setg(errp, "multifd: received packet "
                   "with size %u and expected a size of %u",
                   packet->pages_alloc, p->page_count);
        return -1;
    }

    p->normal_num = be32_to_cpu(packet->normal_pages);
    if (p->normal_num > packet->pages_alloc) {
        error_setg(errp, "multifd: received packet "
                   "with %u normal pages and expected maximum pages are %u",
                   p->normal_num, packet->pages_alloc);
        return -1;
    }

    p->zero_num = be32_to_cpu(packet->zero_pages);
    if (p->zero_num > packet->pages_alloc - p->normal_num) {
        error_setg(errp, "multifd: received packet "
                   "with %u zero pages and expected maximum zero pages are %u",
                   p->zero_num, packet->pages_alloc - p->normal_num);
        return -1;
    }

    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
    p->packet_num = be64_to_cpu(packet->packet_num);
    p->packets_recved++;
    p->total_normal_pages += p->normal_num;
    p->total_zero_pages += p->zero_num;

    trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->zero_num,
                       p->flags, p->next_packet_size);

    if (p->normal_num == 0 && p->zero_num == 0) {
        return 0;
    }

    /* make sure that ramblock is 0 terminated */
    packet->ramblock[255] = 0;
    p->block = qemu_ram_block_by_name(packet->ramblock);
    if (!p->block) {
        error_setg(errp, "multifd: unknown ram block %s",
                   packet->ramblock);
        return -1;
    }

    p->host = p->block->host;
    for (i = 0; i < p->normal_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->normal[i] = offset;
    }

    for (i = 0; i < p->zero_num; i++) {
        uint64_t offset = be64_to_cpu(packet->offset[p->normal_num + i]);

        if (offset > (p->block->used_length - p->page_size)) {
            error_setg(errp, "multifd: offset too long %" PRIu64
                       " (max " RAM_ADDR_FMT ")",
                       offset, p->block->used_length);
            return -1;
        }
        p->zero[i] = offset;
    }

    return 0;
}

static bool multifd_send_should_exit(void)
{
    return qatomic_read(&multifd_send_state->exiting);
}

static bool multifd_recv_should_exit(void)
{
    return qatomic_read(&multifd_recv_state->exiting);
}

/*
 * The migration thread can wait on either of the two semaphores.  This
 * function can be used to kick the main thread out of waiting on either of
 * them.  Should mostly only be called when something wrong happened with
 * the current multifd send thread.
 */
static void multifd_send_kick_main(MultiFDSendParams *p)
{
    qemu_sem_post(&p->sem_sync);
    qemu_sem_post(&multifd_send_state->channels_ready);
}

/*
 * How do we use multifd_send_state->pages and channel->pages?
 *
 * We create one pages struct per channel, plus a main one.  Each time
 * we need to send a batch of pages we interchange the one in
 * multifd_send_state with the one in the channel that is sending it.
 * There are two reasons for that:
 *    - to not have to do so many mallocs during migration
 *    - to make it easier to know what to free at the end of migration
 *
 * This way we always know who is the owner of each "pages" struct,
 * and we don't need any locking.  It belongs to the migration thread
 * or to the channel thread.  Switching is safe because the migration
 * thread only takes a channel's pages struct once pending_job is false,
 * and the acquire/release pair on pending_job orders the accesses.
 *
 * Returns true on success, false otherwise.
 */
static bool multifd_send_pages(void)
{
    int i;
    static int next_channel;
    MultiFDSendParams *p = NULL; /* make gcc happy */
    MultiFDPages_t *pages = multifd_send_state->pages;

    if (multifd_send_should_exit()) {
        return false;
    }

    /* We wait here, until at least one channel is ready */
    qemu_sem_wait(&multifd_send_state->channels_ready);

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_channel %= migrate_multifd_channels();
    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_send_should_exit()) {
            return false;
        }
        p = &multifd_send_state->params[i];
        /*
         * Lockless read to p->pending_job is safe, because only multifd
         * sender thread can clear it.
         */
        if (qatomic_read(&p->pending_job) == false) {
            next_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Make sure we read p->pending_job before all the rest.  Pairs with
     * qatomic_store_release() in multifd_send_thread().
     */
    smp_mb_acquire();
    assert(!p->pages->num);
    multifd_send_state->pages = p->pages;
    p->pages = pages;
    /*
     * Making sure p->pages is setup before marking pending_job=true.  Pairs
     * with the qatomic_load_acquire() in multifd_send_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}
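
/*
 * A rough timeline of the pages swap above, for one channel:
 *
 *   migration thread                      channel thread
 *   ----------------                      --------------
 *   fill multifd_send_state->pages
 *   wait channels_ready
 *   swap state->pages <-> p->pages
 *   store_release pending_job = true
 *   post p->sem              ------>      wake up, load_acquire pending_job
 *                                         send p->pages
 *                                         reset p->pages
 *                                         store_release pending_job = false
 *                            <------      post channels_ready
 *
 * The acquire/release pair on pending_job is what publishes p->pages
 * between the two threads without a lock.
 */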

static inline bool multifd_queue_empty(MultiFDPages_t *pages)
{
    return pages->num == 0;
}

static inline bool multifd_queue_full(MultiFDPages_t *pages)
{
    return pages->num == pages->allocated;
}

static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
{
    pages->offset[pages->num++] = offset;
}

/* Returns true if enqueue successful, false otherwise */
bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
{
    MultiFDPages_t *pages;

retry:
    pages = multifd_send_state->pages;

    /* If the queue is empty, we can already enqueue now */
    if (multifd_queue_empty(pages)) {
        pages->block = block;
        multifd_enqueue(pages, offset);
        return true;
    }

    /*
     * Not empty, meanwhile we need a flush.  It can be because of either:
     *
     * (1) The page is not on the same ramblock as previous ones, or,
     * (2) The queue is full.
     *
     * After flush, always retry.
     */
    if (pages->block != block || multifd_queue_full(pages)) {
        if (!multifd_send_pages()) {
            return false;
        }
        goto retry;
    }

    /* Not empty, and we still have space, do it! */
    multifd_enqueue(pages, offset);
    return true;
}

/* Multifd send side hit an error; remember it and prepare to quit */
static void multifd_send_set_error(Error *err)
{
    /*
     * We don't want to run the exit path twice.  Depending on where
     * we get the error, or if there are two independent errors in two
     * threads at the same time, we can end up calling this function
     * twice.
     */
    if (qatomic_xchg(&multifd_send_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_PRE_SWITCHOVER ||
            s->state == MIGRATION_STATUS_DEVICE ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }
}

static void multifd_send_terminate_threads(void)
{
    int i;

    trace_multifd_send_terminate_threads();

    /*
     * Tell everyone we're quitting.  No xchg() needed here; we simply
     * always set it.
     */
    qatomic_set(&multifd_send_state->exiting, 1);

    /*
     * Firstly, kick all threads out; no matter whether they are just idle,
     * or blocked in an IO system call.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_post(&p->sem);
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }

    /*
     * Finally recycle all the threads.
     */
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (p->tls_thread_created) {
            qemu_thread_join(&p->tls_thread);
        }

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
}
*p
, Error
**errp
)
762 migration_ioc_unregister_yank(p
->c
);
764 * The object_unref() cannot guarantee the fd will always be
765 * released because finalize() of the iochannel is only
766 * triggered on the last reference and it's not guaranteed
767 * that we always hold the last refcount when reaching here.
769 * Closing the fd explicitly has the benefit that if there is any
770 * registered I/O handler callbacks on such fd, that will get a
771 * POLLNVAL event and will further trigger the cleanup to finally
774 * FIXME: It should logically be guaranteed that all multifd
775 * channels have no I/O handler callback registered when reaching
776 * here, because migration thread will wait for all multifd channel
777 * establishments to complete during setup. Since
778 * migrate_fd_cleanup() will be scheduled in main thread too, all
779 * previous callbacks should guarantee to be completed when
780 * reaching here. See multifd_send_state.channels_created and its
781 * usage. In the future, we could replace this with an assert
782 * making sure we're the last reference, or simply drop it if above
783 * is more clear to be justified.
785 qio_channel_close(p
->c
, &error_abort
);
786 object_unref(OBJECT(p
->c
));
789 qemu_sem_destroy(&p
->sem
);
790 qemu_sem_destroy(&p
->sem_sync
);
793 multifd_pages_clear(p
->pages
);
798 multifd_send_state
->ops
->send_cleanup(p
, errp
);
800 return *errp
== NULL
;

static void multifd_send_cleanup_state(void)
{
    file_cleanup_outgoing_migration();
    socket_cleanup_outgoing_migration();
    qemu_sem_destroy(&multifd_send_state->channels_created);
    qemu_sem_destroy(&multifd_send_state->channels_ready);
    g_free(multifd_send_state->params);
    multifd_send_state->params = NULL;
    multifd_pages_clear(multifd_send_state->pages);
    multifd_send_state->pages = NULL;
    g_free(multifd_send_state);
    multifd_send_state = NULL;
}

void multifd_send_shutdown(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_send_terminate_threads();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];
        Error *local_err = NULL;

        if (!multifd_send_cleanup_channel(p, &local_err)) {
            migrate_set_error(migrate_get_current(), local_err);
            error_free(local_err);
        }
    }

    multifd_send_cleanup_state();
}

static int multifd_zero_copy_flush(QIOChannel *c)
{
    int ret = 0;
    Error *err = NULL;

    ret = qio_channel_flush(c, &err);
    if (ret < 0) {
        error_report_err(err);
        return -1;
    }
    if (ret == 1) {
        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
    }

    return ret;
}

int multifd_send_sync_main(void)
{
    int i;
    bool flush_zero_copy;

    if (!migrate_multifd()) {
        return 0;
    }

    if (multifd_send_state->pages->num) {
        if (!multifd_send_pages()) {
            error_report("%s: multifd_send_pages fail", __func__);
            return -1;
        }
    }

    flush_zero_copy = migrate_zero_copy_send();

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        trace_multifd_send_sync_main_signal(p->id);

        /*
         * We should be the only user so far, so not possible to be set by
         * others concurrently.
         */
        assert(qatomic_read(&p->pending_sync) == false);
        qatomic_set(&p->pending_sync, true);
        qemu_sem_post(&p->sem);
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        if (multifd_send_should_exit()) {
            return -1;
        }

        qemu_sem_wait(&multifd_send_state->channels_ready);
        trace_multifd_send_sync_main_wait(p->id);
        qemu_sem_wait(&p->sem_sync);

        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
            return -1;
        }
    }

    trace_multifd_send_sync_main(multifd_send_state->packet_num);

    return 0;
}
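
/*
 * Sketch of one sync round on the socket path:
 *
 *   main thread: pending_sync = true, post p->sem       (for each channel)
 *   send thread: sees no pending_job, sends a packet with
 *                MULTIFD_FLAG_SYNC set, posts p->sem_sync
 *   main thread: waits channels_ready + p->sem_sync     (for each channel)
 *
 * On the destination, the SYNC packet makes each recv thread post
 * multifd_recv_state->sem_sync and block on p->sem_sync until
 * multifd_recv_sync_main() releases it, so both sides agree on a
 * barrier between iterations.
 */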

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;
    MigrationThread *thread = NULL;
    Error *local_err = NULL;
    int ret = 0;
    bool use_packets = multifd_use_packets();

    thread = migration_threads_add(p->name, qemu_get_thread_id());

    trace_multifd_send_thread_start(p->id);
    rcu_register_thread();

    if (use_packets) {
        if (multifd_send_initial_packet(p, &local_err) < 0) {
            ret = -1;
            goto out;
        }
    }

    while (true) {
        qemu_sem_post(&multifd_send_state->channels_ready);
        qemu_sem_wait(&p->sem);

        if (multifd_send_should_exit()) {
            break;
        }

        /*
         * Read pending_job flag before p->pages.  Pairs with the
         * qatomic_store_release() in multifd_send_pages().
         */
        if (qatomic_load_acquire(&p->pending_job)) {
            MultiFDPages_t *pages = p->pages;

            p->iovs_num = 0;
            assert(pages->num);

            ret = multifd_send_state->ops->send_prepare(p, &local_err);
            if (ret != 0) {
                break;
            }

            if (migrate_mapped_ram()) {
                ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num,
                                              p->pages->block, &local_err);
            } else {
                ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num,
                                                  NULL, 0, p->write_flags,
                                                  &local_err);
            }

            if (ret != 0) {
                break;
            }

            stat64_add(&mig_stats.multifd_bytes,
                       p->next_packet_size + p->packet_len);
            stat64_add(&mig_stats.normal_pages, pages->normal_num);
            stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);

            multifd_pages_reset(p->pages);
            p->next_packet_size = 0;

            /*
             * Making sure p->pages is published before saying "we're
             * free".  Pairs with the smp_mb_acquire() in
             * multifd_send_pages().
             */
            qatomic_store_release(&p->pending_job, false);
        } else {
            /*
             * If not a normal job, must be a sync request.  Note that
             * pending_sync is a standalone flag (unlike pending_job), so
             * it doesn't require explicit memory barriers.
             */
            assert(qatomic_read(&p->pending_sync));

            if (use_packets) {
                p->flags = MULTIFD_FLAG_SYNC;
                multifd_send_fill_packet(p);
                ret = qio_channel_write_all(p->c, (void *)p->packet,
                                            p->packet_len, &local_err);
                if (ret != 0) {
                    break;
                }
                /* p->next_packet_size will always be zero for a SYNC packet */
                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
                p->flags = 0;
            }

            qatomic_set(&p->pending_sync, false);
            qemu_sem_post(&p->sem_sync);
        }
    }

out:
    if (ret) {
        assert(local_err);
        trace_multifd_send_error(p->id);
        multifd_send_set_error(local_err);
        multifd_send_kick_main(p);
        error_free(local_err);
    }

    rcu_unregister_thread();
    migration_threads_remove(thread);
    trace_multifd_send_thread_end(p->id, p->packets_sent,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque);

typedef struct {
    MultiFDSendParams *p;
    QIOChannelTLS *tioc;
} MultiFDTLSThreadArgs;

static void *multifd_tls_handshake_thread(void *opaque)
{
    MultiFDTLSThreadArgs *args = opaque;

    qio_channel_tls_handshake(args->tioc,
                              multifd_new_send_channel_async,
                              args->p,
                              NULL,
                              NULL);
    g_free(args);

    return NULL;
}
*p
,
1049 MigrationState
*s
= migrate_get_current();
1050 const char *hostname
= s
->hostname
;
1051 MultiFDTLSThreadArgs
*args
;
1052 QIOChannelTLS
*tioc
;
1054 tioc
= migration_tls_client_create(ioc
, hostname
, errp
);
1060 * Ownership of the socket channel now transfers to the newly
1061 * created TLS channel, which has already taken a reference.
1063 object_unref(OBJECT(ioc
));
1064 trace_multifd_tls_outgoing_handshake_start(ioc
, tioc
, hostname
);
1065 qio_channel_set_name(QIO_CHANNEL(tioc
), "multifd-tls-outgoing");
1067 args
= g_new0(MultiFDTLSThreadArgs
, 1);
1071 p
->tls_thread_created
= true;
1072 qemu_thread_create(&p
->tls_thread
, "mig/src/tls",
1073 multifd_tls_handshake_thread
, args
,
1074 QEMU_THREAD_JOINABLE
);

void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc)
{
    qio_channel_set_delay(ioc, false);

    migration_ioc_register_yank(ioc);
    /* Setup p->c only if the channel is completely setup */
    p->c = ioc;

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_send_thread, p,
                       QEMU_THREAD_JOINABLE);
}

/*
 * When TLS is enabled this function is called once to establish the
 * TLS connection and a second time after the TLS handshake to create
 * the multifd channel.  Without TLS it goes straight into the channel
 * creation.
 */
static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
{
    MultiFDSendParams *p = opaque;
    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
    Error *local_err = NULL;
    bool ret;

    trace_multifd_new_send_channel_async(p->id);

    if (qio_task_propagate_error(task, &local_err)) {
        ret = false;
        goto out;
    }

    trace_multifd_set_outgoing_channel(ioc, object_get_typename(OBJECT(ioc)),
                                       migrate_get_current()->hostname);

    if (migrate_channel_requires_tls_upgrade(ioc)) {
        ret = multifd_tls_channel_connect(p, ioc, &local_err);
        if (ret) {
            return;
        }
    } else {
        multifd_channel_connect(p, ioc);
        ret = true;
    }

out:
    /*
     * Here we're not interested whether creation succeeded, only that
     * it happened at all.
     */
    multifd_send_channel_created();

    if (ret) {
        return;
    }

    trace_multifd_new_send_channel_async_error(p->id, local_err);
    multifd_send_set_error(local_err);
    /*
     * For error cases (TLS or non-TLS), IO channel is always freed here
     * rather than when cleaning up multifd: since p->c is not set, multifd
     * cleanup code doesn't even know of its existence.
     */
    object_unref(OBJECT(ioc));
    error_free(local_err);
}
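
/*
 * Approximate control flow when TLS is in use:
 *
 *   socket connect
 *     -> multifd_new_send_channel_async()      (1st call, plain socket)
 *          -> multifd_tls_channel_connect()
 *               -> handshake thread runs qio_channel_tls_handshake()
 *     -> multifd_new_send_channel_async()      (2nd call, TLS channel)
 *          -> multifd_channel_connect()        -> send thread starts
 */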

static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
{
    if (!multifd_use_packets()) {
        return file_send_channel_create(opaque, errp);
    }

    socket_send_channel_create(multifd_new_send_channel_async, opaque);
    return true;
}

bool multifd_send_setup(void)
{
    MigrationState *s = migrate_get_current();
    Error *local_err = NULL;
    int thread_count, ret = 0;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    if (!migrate_multifd()) {
        return true;
    }

    thread_count = migrate_multifd_channels();
    multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
    multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
    multifd_send_state->pages = multifd_pages_init(page_count);
    qemu_sem_init(&multifd_send_state->channels_created, 0);
    qemu_sem_init(&multifd_send_state->channels_ready, 0);
    qatomic_set(&multifd_send_state->exiting, 0);
    multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        qemu_sem_init(&p->sem, 0);
        qemu_sem_init(&p->sem_sync, 0);
        p->id = i;
        p->pages = multifd_pages_init(page_count);

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
            p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
            p->packet->version = cpu_to_be32(MULTIFD_VERSION);
        }
        p->name = g_strdup_printf("mig/src/send_%d", i);
        p->page_size = qemu_target_page_size();
        p->page_count = page_count;
        p->write_flags = 0;

        if (!multifd_new_send_channel_create(p, &local_err)) {
            ret = -1;
        }
    }

    /*
     * Wait until channel creation has started for all channels.  The
     * creation can still fail, but no more channels will be created
     * past this point.
     */
    for (i = 0; i < thread_count; i++) {
        qemu_sem_wait(&multifd_send_state->channels_created);
    }

    if (ret) {
        goto err;
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDSendParams *p = &multifd_send_state->params[i];

        ret = multifd_send_state->ops->send_setup(p, &local_err);
        if (ret) {
            goto err;
        }
    }

    return true;

err:
    migrate_set_error(s, local_err);
    error_report_err(local_err);
    migrate_set_state(&s->state, MIGRATION_STATUS_SETUP,
                      MIGRATION_STATUS_FAILED);
    return false;
}
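
/*
 * Overview of the setup sequence above (approximate):
 *
 *   1. allocate the global state and per-channel params
 *   2. kick off async channel creation (socket connect or TLS handshake)
 *   3. wait on channels_created until every creation attempt has at
 *      least started, so no new channels appear after cleanup begins
 *   4. run the per-channel ops->send_setup() hooks
 *
 * Failures funnel to the err label, which moves the migration to
 * MIGRATION_STATUS_FAILED.
 */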

bool multifd_recv(void)
{
    int i;
    static int next_recv_channel;
    MultiFDRecvParams *p = NULL;
    MultiFDRecvData *data = multifd_recv_state->data;

    /*
     * next_channel can remain from a previous migration that was
     * using more channels, so ensure it doesn't overflow if the
     * limit is lower now.
     */
    next_recv_channel %= migrate_multifd_channels();
    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
        if (multifd_recv_should_exit()) {
            return false;
        }

        p = &multifd_recv_state->params[i];

        if (qatomic_read(&p->pending_job) == false) {
            next_recv_channel = (i + 1) % migrate_multifd_channels();
            break;
        }
    }

    /*
     * Order pending_job read before manipulating p->data below.  Pairs
     * with qatomic_store_release() at multifd_recv_thread().
     */
    smp_mb_acquire();

    assert(!p->data->size);
    multifd_recv_state->data = p->data;
    p->data = data;

    /*
     * Order p->data update before setting pending_job.  Pairs with
     * qatomic_load_acquire() at multifd_recv_thread().
     */
    qatomic_store_release(&p->pending_job, true);
    qemu_sem_post(&p->sem);

    return true;
}

MultiFDRecvData *multifd_get_recv_data(void)
{
    return multifd_recv_state->data;
}

static void multifd_recv_terminate_threads(Error *err)
{
    int i;

    trace_multifd_recv_terminate_threads(err != NULL);

    if (qatomic_xchg(&multifd_recv_state->exiting, 1)) {
        return;
    }

    if (err) {
        MigrationState *s = migrate_get_current();
        migrate_set_error(s, err);
        if (s->state == MIGRATION_STATUS_SETUP ||
            s->state == MIGRATION_STATUS_ACTIVE) {
            migrate_set_state(&s->state, s->state,
                              MIGRATION_STATUS_FAILED);
        }
    }

    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        /*
         * The migration thread and channels interact differently
         * depending on the presence of packets.
         */
        if (multifd_use_packets()) {
            /*
             * The channel receives as long as there are packets.  When
             * packets end (i.e. MULTIFD_FLAG_SYNC is reached), the
             * channel waits for the migration thread to sync.  If the
             * sync never happens, do it here.
             */
            qemu_sem_post(&p->sem_sync);
        } else {
            /*
             * The channel waits for the migration thread to give it
             * work.  When the migration thread runs out of work, it
             * releases the channel and waits for any pending work to
             * finish.  If we reach here (e.g. due to error) before the
             * work runs out, release the channel.
             */
            qemu_sem_post(&p->sem);
        }

        /*
         * We could arrive here for two reasons:
         *  - normal quit, i.e. everything went fine, just finished
         *  - error quit: We close the channels so the channel threads
         *    finish the qio_channel_read_all_eof()
         */
        if (p->c) {
            qio_channel_shutdown(p->c, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
    }
}

void multifd_recv_shutdown(void)
{
    if (migrate_multifd()) {
        multifd_recv_terminate_threads(NULL);
    }
}

static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
{
    migration_ioc_unregister_yank(p->c);
    object_unref(OBJECT(p->c));
    p->c = NULL;
    qemu_mutex_destroy(&p->mutex);
    qemu_sem_destroy(&p->sem_sync);
    qemu_sem_destroy(&p->sem);
    g_free(p->name);
    p->name = NULL;
    p->packet_len = 0;
    g_free(p->packet);
    p->packet = NULL;
    g_free(p->normal);
    p->normal = NULL;
    g_free(p->zero);
    p->zero = NULL;
    multifd_recv_state->ops->recv_cleanup(p);
}

static void multifd_recv_cleanup_state(void)
{
    qemu_sem_destroy(&multifd_recv_state->sem_sync);
    g_free(multifd_recv_state->params);
    multifd_recv_state->params = NULL;
    g_free(multifd_recv_state->data);
    multifd_recv_state->data = NULL;
    g_free(multifd_recv_state);
    multifd_recv_state = NULL;
}

void multifd_recv_cleanup(void)
{
    int i;

    if (!migrate_multifd()) {
        return;
    }

    multifd_recv_terminate_threads(NULL);
    for (i = 0; i < migrate_multifd_channels(); i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        if (p->thread_created) {
            qemu_thread_join(&p->thread);
        }
    }
    for (i = 0; i < migrate_multifd_channels(); i++) {
        multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
    }
    multifd_recv_cleanup_state();
}

void multifd_recv_sync_main(void)
{
    int thread_count = migrate_multifd_channels();
    bool file_based = !multifd_use_packets();
    int i;

    if (!migrate_multifd()) {
        return;
    }

    /*
     * File-based channels don't use packets and therefore need to
     * wait for more work.  Release them to start the sync.
     */
    if (file_based) {
        for (i = 0; i < thread_count; i++) {
            MultiFDRecvParams *p = &multifd_recv_state->params[i];

            trace_multifd_recv_sync_main_signal(p->id);
            qemu_sem_post(&p->sem);
        }
    }

    /*
     * Initiate the synchronization by waiting for all channels.
     *
     * For socket-based migration this means each channel has received
     * the SYNC packet on the stream.
     *
     * For file-based migration this means each channel is done with
     * the work (pending_job=false).
     */
    for (i = 0; i < thread_count; i++) {
        trace_multifd_recv_sync_main_wait(i);
        qemu_sem_wait(&multifd_recv_state->sem_sync);
    }

    if (file_based) {
        /*
         * For file-based migration, loading is done in one iteration.
         * We're done.
         */
        return;
    }

    /*
     * Sync done.  Release the channels for the next iteration.
     */
    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        WITH_QEMU_LOCK_GUARD(&p->mutex) {
            if (multifd_recv_state->packet_num < p->packet_num) {
                multifd_recv_state->packet_num = p->packet_num;
            }
        }
        trace_multifd_recv_sync_main_signal(p->id);
        qemu_sem_post(&p->sem_sync);
    }
    trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}

static void *multifd_recv_thread(void *opaque)
{
    MultiFDRecvParams *p = opaque;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int ret;

    trace_multifd_recv_thread_start(p->id);
    rcu_register_thread();

    while (true) {
        uint32_t flags = 0;
        bool has_data = false;

        if (use_packets) {
            if (multifd_recv_should_exit()) {
                break;
            }

            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
                                           p->packet_len, &local_err);
            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                break;
            }

            qemu_mutex_lock(&p->mutex);
            ret = multifd_recv_unfill_packet(p, &local_err);
            if (ret) {
                qemu_mutex_unlock(&p->mutex);
                break;
            }

            flags = p->flags;
            /* recv methods don't know how to handle the SYNC flag */
            p->flags &= ~MULTIFD_FLAG_SYNC;
            has_data = p->normal_num || p->zero_num;
            qemu_mutex_unlock(&p->mutex);
        } else {
            /*
             * No packets, so we need to wait for the vmstate code to
             * give us work.
             */
            qemu_sem_wait(&p->sem);

            if (multifd_recv_should_exit()) {
                break;
            }

            /* pairs with qatomic_store_release() at multifd_recv() */
            if (!qatomic_load_acquire(&p->pending_job)) {
                /*
                 * Migration thread did not send work, this is
                 * equivalent to pending_sync on the sending
                 * side.  Post sem_sync to notify we reached this
                 * point.
                 */
                qemu_sem_post(&multifd_recv_state->sem_sync);
                continue;
            }

            has_data = !!p->data->size;
        }

        if (has_data) {
            ret = multifd_recv_state->ops->recv(p, &local_err);
            if (ret != 0) {
                break;
            }
        }

        if (use_packets) {
            if (flags & MULTIFD_FLAG_SYNC) {
                qemu_sem_post(&multifd_recv_state->sem_sync);
                qemu_sem_wait(&p->sem_sync);
            }
        } else {
            p->total_normal_pages += p->data->size / qemu_target_page_size();
            p->data->size = 0;
            /*
             * Order data->size update before clearing
             * pending_job.  Pairs with smp_mb_acquire() at
             * multifd_recv().
             */
            qatomic_store_release(&p->pending_job, false);
        }
    }

    if (local_err) {
        multifd_recv_terminate_threads(local_err);
        error_free(local_err);
    }

    rcu_unregister_thread();
    trace_multifd_recv_thread_end(p->id, p->packets_recved,
                                  p->total_normal_pages,
                                  p->total_zero_pages);

    return NULL;
}

int multifd_recv_setup(Error **errp)
{
    int thread_count;
    uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
    bool use_packets = multifd_use_packets();
    uint8_t i;

    /*
     * Return successfully if multiFD recv state is already initialised
     * or multiFD is not enabled.
     */
    if (multifd_recv_state || !migrate_multifd()) {
        return 0;
    }

    thread_count = migrate_multifd_channels();
    multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
    multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);

    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
    multifd_recv_state->data->size = 0;

    qatomic_set(&multifd_recv_state->count, 0);
    qatomic_set(&multifd_recv_state->exiting, 0);
    qemu_sem_init(&multifd_recv_state->sem_sync, 0);
    multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];

        qemu_mutex_init(&p->mutex);
        qemu_sem_init(&p->sem_sync, 0);
        qemu_sem_init(&p->sem, 0);
        p->pending_job = false;
        p->id = i;

        p->data = g_new0(MultiFDRecvData, 1);
        p->data->size = 0;

        if (use_packets) {
            p->packet_len = sizeof(MultiFDPacket_t)
                          + sizeof(uint64_t) * page_count;
            p->packet = g_malloc0(p->packet_len);
        }
        p->name = g_strdup_printf("mig/dst/recv_%d", i);
        p->normal = g_new0(ram_addr_t, page_count);
        p->zero = g_new0(ram_addr_t, page_count);
        p->page_count = page_count;
        p->page_size = qemu_target_page_size();
    }

    for (i = 0; i < thread_count; i++) {
        MultiFDRecvParams *p = &multifd_recv_state->params[i];
        int ret;

        ret = multifd_recv_state->ops->recv_setup(p, errp);
        if (ret) {
            return ret;
        }
    }
    return 0;
}

bool multifd_recv_all_channels_created(void)
{
    int thread_count = migrate_multifd_channels();

    if (!migrate_multifd()) {
        return true;
    }

    if (!multifd_recv_state) {
        /* Called before any connections created */
        return false;
    }

    return thread_count == qatomic_read(&multifd_recv_state->count);
}

/*
 * Try to receive all multifd channels to get ready for the migration.
 * Sets @errp when failing to receive the current channel.
 */
void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
{
    MultiFDRecvParams *p;
    Error *local_err = NULL;
    bool use_packets = multifd_use_packets();
    int id;

    if (use_packets) {
        id = multifd_recv_initial_packet(ioc, &local_err);
        if (id < 0) {
            multifd_recv_terminate_threads(local_err);
            error_propagate_prepend(errp, local_err,
                                    "failed to receive packet"
                                    " via multifd channel %d: ",
                                    qatomic_read(&multifd_recv_state->count));
            return;
        }
        trace_multifd_recv_new_channel(id);
    } else {
        id = qatomic_read(&multifd_recv_state->count);
    }

    p = &multifd_recv_state->params[id];
    if (p->c != NULL) {
        error_setg(&local_err, "multifd: received id '%d' already setup",
                   id);
        multifd_recv_terminate_threads(local_err);
        error_propagate(errp, local_err);
        return;
    }
    p->c = ioc;
    object_ref(OBJECT(ioc));

    p->thread_created = true;
    qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                       QEMU_THREAD_JOINABLE);
    qatomic_inc(&multifd_recv_state->count);
}

bool multifd_send_prepare_common(MultiFDSendParams *p)
{
    multifd_send_zero_page_detect(p);

    if (!p->pages->normal_num) {
        p->next_packet_size = 0;
        return false;
    }

    multifd_send_prepare_header(p);

    return true;
}