@@ -534,6 +534,9 @@ void multifd_send_terminate_threads(Error *err)
qemu_mutex_lock(&p->mutex);
p->quit = true;
qemu_sem_post(&p->sem);
+ if (migrate_use_rdma()) {
+ qemu_sem_post(&p->sem_sync);
+ }
qemu_mutex_unlock(&p->mutex);
}
}
@@ -3837,6 +3837,19 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
return rdma_block_notification_handle(opaque, data);
case RAM_CONTROL_HOOK:
+ if (migrate_use_multifd()) {
+ int i;
+ MultiFDRecvParams *multifd_recv_param = NULL;
+ int thread_count = migrate_multifd_channels();
+ /* Inform dest recv_thread to poll */
+ for (i = 0; i < thread_count; i++) {
+ if (get_multifd_recv_param(i, &multifd_recv_param)) {
+ return -1;
+ }
+ qemu_sem_post(&multifd_recv_param->sem_sync);
+ }
+ }
+
return qemu_rdma_registration_handle(f, opaque);
default:
@@ -3909,6 +3922,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
trace_qemu_rdma_registration_stop_ram();
+ if (migrate_use_multifd()) {
+ /*
+ * Inform the multifd channels to register memory
+ */
+ int i;
+ int thread_count = migrate_multifd_channels();
+ MultiFDSendParams *multifd_send_param = NULL;
+ for (i = 0; i < thread_count; i++) {
+ ret = get_multifd_send_param(i, &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd(%d)", i);
+ return ret;
+ }
+
+ qemu_sem_post(&multifd_send_param->sem_sync);
+ }
+ }
+
/*
* Make sure that we parallelize the pinning on both sides.
* For very large guests, doing this serially takes a really
@@ -3967,6 +3998,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
rdma->dest_blocks[i].remote_host_addr;
local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
}
+ /* Wait for all multifd channels to complete registration */
+ if (migrate_use_multifd()) {
+ int i;
+ int thread_count = migrate_multifd_channels();
+ MultiFDSendParams *multifd_send_param = NULL;
+ for (i = 0; i < thread_count; i++) {
+ ret = get_multifd_send_param(i, &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd(%d)", i);
+ return ret;
+ }
+
+ qemu_sem_wait(&multifd_send_param->sem);
+ }
+ }
}
trace_qemu_rdma_registration_stop(flags);
@@ -3978,6 +4024,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
goto err;
}
+ if (migrate_use_multifd()) {
+ /*
+ * Inform src send_thread to send FINISHED signal.
+ * Wait for multifd RDMA send threads to poll the CQE.
+ */
+ int i;
+ int thread_count = migrate_multifd_channels();
+ MultiFDSendParams *multifd_send_param = NULL;
+ for (i = 0; i < thread_count; i++) {
+ ret = get_multifd_send_param(i, &multifd_send_param);
+ if (ret < 0) {
+ goto err;
+ }
+
+ qemu_sem_post(&multifd_send_param->sem_sync);
+ }
+ }
+
return 0;
err:
rdma->error_state = ret;
@@ -4355,20 +4419,39 @@ static void *multifd_rdma_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
Error *local_err = NULL;
+ int ret = 0;
+ RDMAControlHeader head = { .len = 0, .repeat = 1 };
trace_multifd_send_thread_start(p->id);
if (multifd_send_initial_packet(p, &local_err) < 0) {
goto out;
}
+ /* wait for semaphore notification to register memory */
+ qemu_sem_wait(&p->sem_sync);
+ if (qemu_rdma_registration(p->rdma) < 0) {
+ goto out;
+ }
+ /*
+ * Inform the main RDMA thread to run when multifd
+ * RDMA thread have completed registration.
+ */
+ qemu_sem_post(&p->sem);
while (true) {
+ qemu_sem_wait(&p->sem_sync);
qemu_mutex_lock(&p->mutex);
if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
}
qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
+
+ /* Send FINISHED to the destination */
+ head.type = RDMA_CONTROL_REGISTER_FINISHED;
+ ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
+ if (ret < 0) {
+ return NULL;
+ }
}
out:
@@ -4406,15 +4489,22 @@ static void multifd_rdma_send_channel_setup(MultiFDSendParams *p)
static void *multifd_rdma_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
+ int ret = 0;
while (true) {
+ qemu_sem_wait(&p->sem_sync);
+
qemu_mutex_lock(&p->mutex);
if (p->quit) {
qemu_mutex_unlock(&p->mutex);
break;
}
qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem_sync);
+ ret = qemu_rdma_registration_handle(p->file, p->c);
+ if (ret < 0) {
+ qemu_file_set_error(p->file, ret);
+ break;
+ }
}
qemu_mutex_lock(&p->mutex);