@@ -2001,6 +2001,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
.repeat = 1,
};
+ /* use multifd to send data */
+ if (migrate_use_multifd()) {
+ int channel = get_multifd_RDMA_channel();
+ int ret = 0;
+ MultiFDSendParams *multifd_send_param = NULL;
+ ret = get_multifd_send_param(channel, &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd_send_param(%d)", channel);
+ return -EINVAL;
+ }
+ rdma = (RDMAContext *)multifd_send_param->rdma;
+ block = &(rdma->local_ram_blocks.block[current_index]);
+ }
+
retry:
sge.addr = (uintptr_t)(block->local_host_addr +
(current_addr - block->offset));
@@ -2196,6 +2210,27 @@ retry:
return 0;
}
+static int multifd_rdma_write_flush(void)
+{
+ /* The multifd RDMA threads send data */
+ MultiFDSendParams *multifd_send_param = NULL;
+ RDMAContext *rdma = NULL;
+ MigrationState *s = migrate_get_current();
+ int ret = 0;
+
+ ret = get_multifd_send_param(s->rdma_channel,
+ &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd_send_param(%d)",
+ s->rdma_channel);
+ return ret;
+ }
+ rdma = (RDMAContext *)(multifd_send_param->rdma);
+ rdma->nb_sent++;
+
+ return ret;
+}
+
/*
* Push out any unwritten RDMA operations.
*
@@ -2218,8 +2253,15 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
}
if (ret == 0) {
- rdma->nb_sent++;
- trace_qemu_rdma_write_flush(rdma->nb_sent);
+ if (migrate_use_multifd()) {
+ ret = multifd_rdma_write_flush();
+ if (ret) {
+ return ret;
+ }
+ } else {
+ rdma->nb_sent++;
+ trace_qemu_rdma_write_flush(rdma->nb_sent);
+ }
}
rdma->current_length = 0;
@@ -4061,6 +4103,7 @@ wait_reg_complete:
}
qemu_sem_post(&multifd_send_param->sem_sync);
+ qemu_sem_wait(&multifd_send_param->sem);
}
}
@@ -4443,6 +4486,7 @@ static void *multifd_rdma_send_thread(void *opaque)
Error *local_err = NULL;
int ret = 0;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
+ RDMAContext *rdma = p->rdma;
trace_multifd_send_thread_start(p->id);
if (multifd_send_initial_packet(p, &local_err) < 0) {
@@ -4451,7 +4495,7 @@ static void *multifd_rdma_send_thread(void *opaque)
/* wait for semaphore notification to register memory */
qemu_sem_wait(&p->sem_sync);
- if (qemu_rdma_registration(p->rdma) < 0) {
+ if (qemu_rdma_registration(rdma) < 0) {
goto out;
}
/*
@@ -4467,13 +4511,26 @@ static void *multifd_rdma_send_thread(void *opaque)
break;
}
qemu_mutex_unlock(&p->mutex);
-
+ /* To complete polling(CQE) */
+ while (rdma->nb_sent) {
+ ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
+ if (ret < 0) {
+ error_report("multifd RDMA migration: "
+ "complete polling error!");
+ return NULL;
+ }
+ }
/* Send FINISHED to the destination */
head.type = RDMA_CONTROL_REGISTER_FINISHED;
- ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
+ ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
if (ret < 0) {
+ error_report("multifd RDMA migration: "
+ "sending remote error!");
return NULL;
}
+
+ /* sync main thread */
+ qemu_sem_post(&p->sem);
}
out: