diff mbox series

[v8,07/20] multi-process: add co-routines to communicate with remote

Message ID b57493752ed0ec04f44df915413e325acf641882.1596217462.git.jag.raman@oracle.com
State New
Headers show
Series None | expand

Commit Message

Jag Raman July 31, 2020, 6:20 p.m. UTC
From: Elena Ufimtseva <elena.ufimtseva@oracle.com>

process to avoid blocking the main loop during the message exchanges.
To be used by proxy device.

Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
---
 include/io/mpqemu-link.h | 15 +++++++++
 io/mpqemu-link.c         | 82 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+)
diff mbox series

Patch

diff --git a/include/io/mpqemu-link.h b/include/io/mpqemu-link.h
index ae7008e..8591ad2 100644
--- a/include/io/mpqemu-link.h
+++ b/include/io/mpqemu-link.h
@@ -14,6 +14,7 @@ 
 #include "qom/object.h"
 #include "qemu/thread.h"
 #include "io/channel.h"
+#include "io/channel-socket.h"
 
 #define REMOTE_MAX_FDS 8
 
@@ -27,6 +28,7 @@ 
  */
 typedef enum {
     INIT = 0,
+    RET_MSG,
     MAX = INT_MAX,
 } MPQemuCmd;
 
@@ -64,6 +66,19 @@  typedef struct {
     uint8_t *data2;
 } MPQemuMsg;
 
+struct MPQemuRequest {
+    MPQemuMsg *msg;
+    QIOChannel *ioc;
+    Coroutine *co;
+    bool finished;
+    int error;
+    long ret;
+};
+
+typedef struct MPQemuRequest MPQemuRequest;
+
+uint64_t mpqemu_msg_send_and_await_reply(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp);
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc, Error **errp);
 void mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc, Error **errp);
 
diff --git a/io/mpqemu-link.c b/io/mpqemu-link.c
index dcefa42..d4dd0fe 100644
--- a/io/mpqemu-link.c
+++ b/io/mpqemu-link.c
@@ -16,6 +16,8 @@ 
 #include "qapi/error.h"
 #include "qemu/iov.h"
 #include "qemu/error-report.h"
+#include "qemu/main-loop.h"
+#include "io/channel-socket.h"
 
 void mpqemu_msg_send(MPQemuMsg *msg, QIOChannel *ioc, Error **errp)
 {
@@ -132,6 +134,86 @@  void mpqemu_msg_recv(MPQemuMsg *msg, QIOChannel *ioc, Error **errp)
     }
 }
 
+/* Use in proxy only as it clobbers fd handlers. */
+static void coroutine_fn mpqemu_msg_send_co(void *data)
+{
+    MPQemuRequest *req = (MPQemuRequest *)data;
+    MPQemuMsg msg_reply = {0};
+    Error *local_err = NULL;
+
+    if (!req->ioc) {
+        error_report("No channel available to send command %d",
+                     req->msg->cmd);
+        req->finished = true;
+        req->error = -EINVAL;
+        return;
+    }
+
+    req->co = qemu_coroutine_self();
+    mpqemu_msg_send(req->msg, req->ioc, &local_err);
+    if (local_err) {
+        error_report("ERROR: failed to send command to remote %d, ",
+                     req->msg->cmd);
+        req->finished = true;
+        req->error = -EINVAL;
+        return;
+    }
+
+    mpqemu_msg_recv(&msg_reply, req->ioc, &local_err);
+    if (local_err) {
+        error_report("ERROR: failed to get a reply for command %d, "
+                     "errno %s",
+                     req->msg->cmd, strerror(errno));
+        req->error = -EIO;
+    } else {
+        if (!mpqemu_msg_valid(&msg_reply) || msg_reply.cmd != RET_MSG) {
+            error_report("ERROR: Invalid reply received for command %d",
+                         req->msg->cmd);
+            req->error = -EINVAL;
+        } else {
+            req->ret = msg_reply.data1.u64;
+        }
+    }
+    req->finished = true;
+}
+
+/*
+ * Create if needed and enter co-routine to send the message to the
+ * remote channel ioc and wait for the reply.
+ * Returns the value from the reply message, sets the error on failure.
+ */
+
+uint64_t mpqemu_msg_send_and_await_reply(MPQemuMsg *msg, QIOChannel *ioc,
+                                  Error **errp)
+{
+    MPQemuRequest req = {0};
+    uint64_t ret = UINT64_MAX;
+
+    req.ioc = ioc;
+    if (!req.ioc) {
+        error_setg(errp, "Channel is set to NULL");
+        return ret;
+    }
+
+    req.msg = msg;
+    req.ret = 0;
+    req.finished = false;
+
+    req.co = qemu_coroutine_create(mpqemu_msg_send_co, &req);
+    qemu_coroutine_enter(req.co);
+
+    while (!req.finished) {
+        aio_poll(qemu_get_aio_context(), true);
+    }
+    if (req.error) {
+        error_setg(errp, "Error exchanging message with remote process, "
+                        "error %d", req.error);
+    }
+    ret = req.ret;
+
+    return ret;
+}
+
 bool mpqemu_msg_valid(MPQemuMsg *msg)
 {
     if (msg->cmd >= MAX && msg->cmd < 0) {