qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC][PATCH v1 07/12] qmp proxy: core code for proxying qmp


From: Michael Roth
Subject: [Qemu-devel] [RFC][PATCH v1 07/12] qmp proxy: core code for proxying qmp requests to guest
Date: Fri, 25 Mar 2011 14:47:54 -0500

This provides a QmpProxy class, 1 instance of which is shared by all QMP
servers/sessions to send/receive QMP requests/responses between QEMU and
the QEMU guest agent.

A single qmp_proxy_send_request() is the only interface currently needed
by a QMP session; QAPI/QMP's existing async support handles all the work
of doing callbacks and routing responses to the proper session.

Currently the class requires a path to a listening socket that either
corresponds to the chardev that the guest agent is communicating
through, or a local socket so we can communicate with a host-side
"guest" agent for testing purposes.

A subsequent patch will introduce a new chardev that sets up the
socket chardev and initializes the QmpProxy instance to abstract this
away from the user. Unifying this with local "guest" agent support may
not be feasible, so another command-line option may be needed to support
host-side-only testing.

Signed-off-by: Michael Roth <address@hidden>
---
 qmp-core.c       |    8 ++
 qmp-core.h       |    7 +-
 qmp-proxy-core.h |   20 ++++
 qmp-proxy.c      |  335 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 vl.c             |    1 +
 5 files changed, 365 insertions(+), 6 deletions(-)
 create mode 100644 qmp-proxy-core.h
 create mode 100644 qmp-proxy.c

diff --git a/qmp-core.c b/qmp-core.c
index 9f3d182..dab50a1 100644
--- a/qmp-core.c
+++ b/qmp-core.c
@@ -937,7 +937,15 @@ void qmp_async_complete_command(QmpCommandState *cmd, 
QObject *retval, Error *er
     qemu_free(cmd);
 }
 
+extern QmpProxy *qmp_proxy_default;
+
 void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
                         QmpGuestCompletionFunc *cb, void *opaque)
 {
+    if (!qmp_proxy_default) {
+        /* TODO: should set errp here. As written, this path only logs to
+         * stderr and returns: errp is left untouched and cb is not invoked.
+         */
+        fprintf(stderr, "qmp proxy: no guest proxy found\n");
+        return;
+    }
+    qmp_proxy_send_request(qmp_proxy_default, name, args, errp, cb, opaque);
 }
diff --git a/qmp-core.h b/qmp-core.h
index b676020..114d290 100644
--- a/qmp-core.h
+++ b/qmp-core.h
@@ -4,6 +4,7 @@
 #include "monitor.h"
 #include "qmp-marshal-types.h"
 #include "error_int.h"
+#include "qmp-proxy-core.h"
 
 struct QmpCommandState
 {
@@ -85,11 +86,5 @@ int qmp_state_get_fd(QmpState *sess);
     }                                                        \
 } while(0)
 
-typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data, Error 
*err);
-
-void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
-                        QmpGuestCompletionFunc *cb, void *opaque);
-
-
 #endif
 
diff --git a/qmp-proxy-core.h b/qmp-proxy-core.h
new file mode 100644
index 0000000..47ac85d
--- /dev/null
+++ b/qmp-proxy-core.h
@@ -0,0 +1,20 @@
+#ifndef QMP_PROXY_CORE_H
+#define QMP_PROXY_CORE_H
+
+#define QMP_PROXY_PATH_DEFAULT "/tmp/qmp-proxy.sock"
+
+typedef void (QmpGuestCompletionFunc)(void *opaque, QObject *ret_data,
+                                      Error *err);
+
+void qmp_guest_dispatch(const char *name, const QDict *args, Error **errp,
+                        QmpGuestCompletionFunc *cb, void *opaque);
+
+typedef struct QmpProxy QmpProxy;
+
+void qmp_proxy_send_request(QmpProxy *p, const char *name,
+                            const QDict *args, Error **errp,
+                            QmpGuestCompletionFunc *cb, void *opaque);
+QmpProxy *qmp_proxy_new(const char *channel_path);
+void qmp_proxy_close(QmpProxy *p);
+
+#endif
diff --git a/qmp-proxy.c b/qmp-proxy.c
new file mode 100644
index 0000000..eaa6e6e
--- /dev/null
+++ b/qmp-proxy.c
@@ -0,0 +1,335 @@
+/*
+ * QMP definitions for communicating with guest agent
+ *
+ * Copyright IBM Corp. 2011
+ *
+ * Authors:
+ *  Michael Roth      <address@hidden>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ *
+ */
+
+#include "qmp.h"
+#include "qmp-core.h"
+#include "qemu-queue.h"
+#include "json-parser.h"
+#include "json-streamer.h"
+#include "qemu_socket.h"
+
+#define QMP_SENTINEL 0xFF
+
+typedef struct QmpProxyRequest { /* one request bound for the guest agent */
+    const char *name;            /* command name; sent as "execute" */
+    const QDict *args;           /* command arguments; sent as "arguments" */
+    QmpGuestCompletionFunc *cb;  /* called with the guest's response, or with
+                                  * NULL data and NULL error on cancellation */
+    void *opaque;                /* handed back as the first argument of cb */
+    QString *json;               /* JSON encoding of the request, as written
+                                  * to the channel by the write handler */
+    QTAILQ_ENTRY(QmpProxyRequest) entry; /* link for queued_requests or
+                                          * sent_requests */
+} QmpProxyRequest;
+
+typedef struct QmpProxyWriteState { /* progress of the payload being written */
+    QmpProxyRequest *current_request; /* request being written; NULL while a
+                                       * transport event is being written */
+    const char *buf;   /* text of the payload being written */
+    size_t size;       /* strlen() of buf */
+    size_t pos;        /* number of bytes of buf written so far */
+    bool use_sentinel; /* write a single QMP_SENTINEL byte before buf */
+} QmpProxyWriteState;
+
+typedef struct QmpProxyReadState { /* NOTE(review): no code in this patch
+                                    * reads or writes these fields; the read
+                                    * handler uses a local stack buffer */
+    char *buf;
+    size_t size;
+    size_t pos;
+} QmpProxyReadState;
+
+struct QmpProxy {
+    int fd;           /* socket connected by qmp_proxy_new() */
+    const char *path; /* passed to unlink() by qmp_proxy_close();
+                       * NOTE(review): never assigned in this patch */
+    QmpProxyWriteState write_state;
+    QmpProxyReadState read_state;
+    JSONMessageParser parser; /* fed by the read handler; complete messages
+                               * go to qmp_proxy_process_event() */
+    QTAILQ_HEAD(, QmpProxyRequest) sent_requests;   /* written, awaiting a
+                                                     * "return" from guest */
+    QTAILQ_HEAD(, QmpProxyRequest) queued_requests; /* waiting to be written */
+    QString *xport_event;         /* encoded transport event waiting for the
+                                   * write handler */
+    QString *xport_event_sending; /* transport event the write handler has
+                                   * taken over from xport_event */
+};
+
+static void qmp_proxy_read_handler(void *opaque);
+static void qmp_proxy_write_handler(void *opaque);
+
+/*
+ * Tell the owner of a request that it has been cancelled by invoking its
+ * completion callback with NULL data and a NULL error. Requests that have
+ * no name or no callback are left alone. Always returns 0.
+ */
+static int qmp_proxy_cancel_request(QmpProxy *p, QmpProxyRequest *r)
+{
+    if (r->name != NULL && r->cb != NULL) {
+        r->cb(r->opaque, NULL, NULL);
+    }
+
+    return 0;
+}
+
+/*
+ * Cancel every outstanding request, both those still waiting to be written
+ * (queued_requests) and those waiting for a response (sent_requests).
+ *
+ * Each request is unlinked from the list it actually lives on, its owner is
+ * notified through qmp_proxy_cancel_request(), and the request is released.
+ * name/args are borrowed from the caller of qmp_proxy_send_request() and
+ * are therefore not freed here.
+ *
+ * Always returns 0.
+ */
+static int qmp_proxy_cancel_all(QmpProxy *p)
+{
+    QmpProxyRequest *r, *tmp;
+
+    QTAILQ_FOREACH_SAFE(r, &p->queued_requests, entry, tmp) {
+        QTAILQ_REMOVE(&p->queued_requests, r, entry);
+        qmp_proxy_cancel_request(p, r);
+        if (r->json) {
+            QDECREF(r->json);
+        }
+        qemu_free(r);
+    }
+    QTAILQ_FOREACH_SAFE(r, &p->sent_requests, entry, tmp) {
+        /* these entries are linked into sent_requests, so that is the
+         * head that must be handed to QTAILQ_REMOVE
+         */
+        QTAILQ_REMOVE(&p->sent_requests, r, entry);
+        qmp_proxy_cancel_request(p, r);
+        if (r->json) {
+            QDECREF(r->json);
+        }
+        qemu_free(r);
+    }
+
+    /* the requests are gone; make sure the writer's bookkeeping cannot
+     * keep pointing at one of them or at its encoded payload
+     */
+    p->write_state.current_request = NULL;
+    p->write_state.buf = NULL;
+    p->write_state.pos = 0;
+    p->write_state.size = 0;
+
+    return 0;
+}
+
+/*
+ * Queue a "host_ack" transport event carrying session_id and arm the write
+ * handler so that it gets sent.
+ *
+ * The event is stored JSON-encoded in p->xport_event; an event that is
+ * still pending there is discarded and replaced.
+ */
+static void qmp_proxy_send_host_ack(QmpProxy *p, int session_id)
+{
+    QDict *evt = qdict_new();
+
+    qdict_put_obj(evt, "_xport_event", QOBJECT(qstring_from_str("host_ack")));
+    qdict_put_obj(evt, "_xport_arg_sid", QOBJECT(qint_from_int(session_id)));
+
+    /* only the last ack matters, nuke any outstanding ones. need to rethink
+     * this approach if a host->guest reset event is added
+     */
+    if (p->xport_event) {
+        QDECREF(p->xport_event);
+    }
+    p->xport_event = qobject_to_json(QOBJECT(evt));
+
+    /* only the encoded string is kept; drop the dictionary (and with it
+     * the two values it owns) instead of leaking it
+     */
+    QDECREF(evt);
+
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler,
+                        qmp_proxy_write_handler, p);
+}
+
+/*
+ * JSON message parser callback: handle one complete message received from
+ * the guest agent.
+ *
+ * Two kinds of message are understood:
+ *  - a transport event ("_xport_event"); only "guest_init" is acted upon,
+ *    by cancelling all outstanding requests and queueing a host_ack with
+ *    the session id the guest supplied
+ *  - a response ("return"), which completes the oldest request in
+ *    sent_requests
+ * Anything else is reported on stderr and dropped.
+ */
+static void qmp_proxy_process_event(JSONMessageParser *parser, QList *tokens)
+{
+    QmpProxy *p = container_of(parser, QmpProxy, parser);
+    QmpProxyRequest *r;
+    QObject *obj;
+    QDict *qdict;
+    QString *json;
+    Error *err = NULL;
+    const char *cmd;
+    int session_id;
+
+    fprintf(stderr, "qmp proxy: called\n");
+    obj = json_parser_parse_err(tokens, NULL, &err);
+    if (!obj) {
+        /* NOTE(review): err may have been filled in by the parser and is
+         * not released here -- confirm how errors are freed in this tree
+         */
+        fprintf(stderr, "qmp proxy: failed to parse\n");
+        return;
+    }
+    fprintf(stderr, "qmp proxy: parse successful\n");
+
+    qdict = qobject_to_qdict(obj);
+    if (!qdict) {
+        /* well-formed JSON that is not an object (e.g. a bare list or
+         * number): the key lookups below need a dictionary
+         */
+        fprintf(stderr, "received invalid payload format\n");
+        goto out;
+    }
+
+    /* check for transport-only commands/events */
+    if (qdict_haskey(qdict, "_xport_event")) {
+        cmd = qdict_get_try_str(qdict, "_xport_event");
+        if (cmd && strcmp(cmd, "guest_init") == 0) {
+            /* reset outstanding requests, then send an ack with the
+             * session id they passed us
+             */
+            session_id = qdict_get_try_int(qdict, "_xport_arg_sid", 0);
+            if (!session_id) {
+                fprintf(stderr, "received invalid guest_init event\n");
+            }
+            qmp_proxy_cancel_all(p);
+            qmp_proxy_send_host_ack(p, session_id);
+        }
+        /* any other transport event is silently ignored */
+    } else if (qdict_haskey(qdict, "return")) {
+        fprintf(stderr, "received return\n");
+        r = QTAILQ_FIRST(&p->sent_requests);
+        if (!r) {
+            fprintf(stderr, "received return, but no request queued\n");
+            goto out;
+        }
+        /* XXX: can't assume type here */
+        json = qobject_to_json(obj);
+        fprintf(stderr, "recieved response for cmd: %s\nreturn: %s\n",
+                r->name, qstring_get_str(json));
+        QDECREF(json);
+
+        /* unlink first so the request is off the list while its callback
+         * runs
+         */
+        QTAILQ_REMOVE(&p->sent_requests, r, entry);
+
+        /* TODO: we don't know what kind of qtype the return value is. What
+         * we really need is for the qmp async callbacks to handle
+         * demarshalling for us; for now we just pass the whole response up
+         * the stack, which means everything except commands with no return
+         * value, like guest-ping, will result in errors reported to the
+         * client
+         */
+        if (r->cb) {
+            r->cb(r->opaque, QOBJECT(qdict), NULL);
+        }
+
+        /* the request is complete; nothing else refers to it */
+        if (r->json) {
+            QDECREF(r->json);
+        }
+        qemu_free(r);
+        fprintf(stderr, "done handling response\n");
+
+        /* NOTE(review): the response object is deliberately not released
+         * on this path because it is unclear whether the completion
+         * callback takes over the reference -- confirm and decref here if
+         * it does not
+         */
+        return;
+    } else {
+        fprintf(stderr, "received invalid payload format\n");
+    }
+
+out:
+    qobject_decref(obj);
+}
+
+/*
+ * fd read handler: drain whatever the guest agent channel has to offer and
+ * feed it to the JSON message parser, which calls
+ * qmp_proxy_process_event() for every complete message.
+ *
+ * Returns once read() fails (including EAGAIN on the non-blocking socket)
+ * or reports end-of-file. On end-of-file all outstanding requests are
+ * cancelled and the fd handlers are removed.
+ */
+static void qmp_proxy_read_handler(void *opaque)
+{
+    QmpProxy *p = opaque;
+    char buf[4096];
+    int ret;
+
+    do {
+        /* read at most sizeof(buf) - 1 bytes: one byte is reserved for the
+         * terminator stored below, which would otherwise land past the end
+         * of buf after a full-sized read
+         */
+        ret = read(p->fd, buf, sizeof(buf) - 1);
+        if (ret == -1) {
+            if (errno != EAGAIN && errno != EINTR) {
+                fprintf(stderr, "qmp proxy: error reading request: %s",
+                        strerror(errno));
+            }
+            return;
+        } else if (ret == 0) {
+            /* TODO: is this recoverable? should only happen for hot-unplug
+             * in the chardev case, but for testing via a local guest agent
+             * we may want to do some special handling...
+             */
+            fprintf(stderr, "qmp proxy: connection closed unexpectedly");
+            qmp_proxy_cancel_all(p);
+            qemu_set_fd_handler(p->fd, NULL, NULL, p);
+            return;
+        }
+        buf[ret] = 0;
+        json_message_parser_feed(&p->parser, buf, ret);
+    } while (ret > 0);
+}
+
+/*
+ * fd write handler: push pending payloads to the guest agent channel.
+ *
+ * A pending transport event (p->xport_event) always goes first and is
+ * preceded by a single QMP_SENTINEL byte; it pre-empts a request that is
+ * only partially written, which is then written again from the start
+ * afterwards. Otherwise requests are taken from queued_requests in order
+ * and moved to sent_requests once fully written.
+ *
+ * Progress lives in p->write_state, so when the non-blocking socket cannot
+ * take more data the handler returns and a later invocation carries on
+ * with the same payload where it stopped. When nothing is left to write
+ * the write handler is dropped and only the read handler stays registered.
+ */
+static void qmp_proxy_write_handler(void *opaque)
+{
+    QmpProxy *p = opaque;
+    QmpProxyWriteState *s = &p->write_state;
+    QmpProxyRequest *r;
+    char sentinel = QMP_SENTINEL;
+    int ret;
+
+    /* a request in progress is always the head of queued_requests; if it
+     * is not there any more it was cancelled in the meantime, so forget it
+     */
+    if (s->current_request &&
+        s->current_request != QTAILQ_FIRST(&p->queued_requests)) {
+        s->current_request = NULL;
+        s->buf = NULL;
+        s->pos = 0;
+        s->size = 0;
+        s->use_sentinel = false;
+    }
+
+    for (;;) {
+        if (p->xport_event) {
+            /* take over the pending transport event */
+            s->current_request = NULL;
+            if (p->xport_event_sending) {
+                QDECREF(p->xport_event_sending);
+            }
+            p->xport_event_sending = p->xport_event;
+            p->xport_event = NULL;
+            s->buf = qstring_get_str(p->xport_event_sending);
+            s->pos = 0;
+            s->size = strlen(s->buf);
+            s->use_sentinel = true;
+        } else if (s->pos >= s->size) {
+            /* nothing half-written: start on the next queued request */
+            r = QTAILQ_FIRST(&p->queued_requests);
+            if (r == NULL) {
+                /* no more requests to send for now */
+                qemu_set_fd_handler(p->fd, qmp_proxy_read_handler, NULL, p);
+                return;
+            }
+            s->current_request = r;
+            s->buf = qstring_get_str(s->current_request->json);
+            s->pos = 0;
+            s->size = strlen(s->buf);
+            s->use_sentinel = false;
+        }
+        /* else: carry on with the partially written payload */
+
+        while (s->pos < s->size) {
+            if (s->use_sentinel) {
+                ret = write(p->fd, &sentinel, 1);
+            } else {
+                ret = write(p->fd, s->buf + s->pos, s->size - s->pos);
+            }
+            if (ret == -1) {
+                if (errno != EAGAIN && errno != EINTR) {
+                    fprintf(stderr, "qmp proxy: error sending payload");
+                }
+                /* progress so far stays recorded in p->write_state */
+                return;
+            } else if (ret == 0) {
+                /* TODO: is this recoverable? should only happen for
+                 * hot-unplug in the chardev case, but for testing via a
+                 * local guest agent we may want to do some special
+                 * handling...
+                 */
+                fprintf(stderr, "qmp proxy: connection closed unexpectedly");
+                qmp_proxy_cancel_all(p);
+                s->current_request = NULL;
+                s->buf = NULL;
+                s->pos = 0;
+                s->size = 0;
+                s->use_sentinel = false;
+                qemu_set_fd_handler(p->fd, NULL, NULL, p);
+                return;
+            }
+            if (s->use_sentinel) {
+                /* the sentinel is not part of buf, so pos stays put */
+                s->use_sentinel = false;
+            } else {
+                s->pos += ret;
+            }
+        }
+
+        /* done with this payload. send another if there is one */
+        if (s->current_request) {
+            QTAILQ_REMOVE(&p->queued_requests, s->current_request, entry);
+            QTAILQ_INSERT_TAIL(&p->sent_requests, s->current_request, entry);
+            s->current_request = NULL;
+        } else if (p->xport_event_sending) {
+            QDECREF(p->xport_event_sending);
+            p->xport_event_sending = NULL;
+        }
+        s->buf = NULL;
+        s->pos = 0;
+        s->size = 0;
+        s->use_sentinel = false;
+    }
+}
+
+/*
+ * Queue a command for execution by the guest agent.
+ *
+ * @p:      proxy to send through
+ * @name:   command name, sent as "execute"; borrowed, and kept in the
+ *          request until it completes
+ * @args:   command arguments, sent as "arguments"; borrowed. When NULL the
+ *          "arguments" member is left out
+ * @errp:   currently unused
+ * @cb:     completion callback, invoked with @opaque once a response
+ *          arrives or the request is cancelled
+ * @opaque: caller data for @cb
+ *
+ * The request is encoded to JSON right away and only queued when encoding
+ * succeeded; the write handler is then armed to send it.
+ */
+void qmp_proxy_send_request(QmpProxy *p, const char *name,
+                            const QDict *args, Error **errp,
+                            QmpGuestCompletionFunc *cb, void *opaque)
+{
+    QmpProxyRequest *r = qemu_mallocz(sizeof(*r));
+    QDict *payload = qdict_new();
+
+    /* TODO: don't really need to hold on to name/args after encoding */
+    r->name = name;
+    r->args = args;
+    r->cb = cb;
+    r->opaque = opaque;
+
+    qdict_put_obj(payload, "execute", QOBJECT(qstring_from_str(name)));
+    if (args) {
+        /* TODO: casting a const so we can add it to our dictionary. bad.
+         * The dictionary takes over one reference of whatever is put into
+         * it, and args belongs to the caller, so take a reference of our
+         * own that is dropped again together with payload below.
+         */
+        qobject_incref(QOBJECT((QDict *)args));
+        qdict_put_obj(payload, "arguments", QOBJECT((QDict *)args));
+    }
+
+    r->json = qobject_to_json(QOBJECT(payload));
+    QDECREF(payload);
+    if (!r->json) {
+        /* never queued, so nothing else knows about this request */
+        fprintf(stderr, "qmp proxy: failed to encode request\n");
+        qemu_free(r);
+        return;
+    }
+
+    QTAILQ_INSERT_TAIL(&p->queued_requests, r, entry);
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler,
+                        qmp_proxy_write_handler, p);
+}
+
+/*
+ * Create a proxy connected to the unix socket at channel_path.
+ *
+ * The socket is switched to non-blocking mode and the read handler is
+ * registered right away; the write handler is only armed once there is
+ * something to send.
+ *
+ * Returns the new proxy, or NULL (after printing a message to stderr)
+ * when the connection could not be established.
+ *
+ * NOTE(review): p->path is not set here although qmp_proxy_close() passes
+ * it to unlink() -- confirm what the intended owner of the socket file is.
+ */
+QmpProxy *qmp_proxy_new(const char *channel_path)
+{
+    QmpProxy *p;
+    QemuOpts *opts;
+    int fd;
+    int saved_errno;
+
+    /* connect to guest agent channel */
+    opts = qemu_opts_create(qemu_find_opts("chardev", NULL), NULL, 0, NULL);
+    qemu_opt_set(opts, "path", channel_path, NULL);
+    fd = unix_connect_opts(opts);
+    /* grab errno before any cleanup gets a chance to overwrite it */
+    saved_errno = errno;
+    qemu_opts_del(opts);
+    if (fd == -1) {
+        fprintf(stderr, "qmp proxy: error opening channel: %s",
+                strerror(saved_errno));
+        return NULL;
+    }
+    socket_set_nonblock(fd);
+
+    /* allocate only once the connection exists, so the failure path above
+     * has nothing to release
+     */
+    p = qemu_mallocz(sizeof(*p));
+    p->fd = fd;
+    json_message_parser_init(&p->parser, qmp_proxy_process_event);
+    QTAILQ_INIT(&p->queued_requests);
+    QTAILQ_INIT(&p->sent_requests);
+    qemu_set_fd_handler(p->fd, qmp_proxy_read_handler, NULL, p);
+
+    return p;
+}
+
+/*
+ * Tear down a proxy created by qmp_proxy_new(): stop receiving fd
+ * callbacks, cancel all outstanding requests, close the channel and release
+ * everything the proxy owns. Passing NULL is a no-op.
+ */
+void qmp_proxy_close(QmpProxy *p)
+{
+    if (!p) {
+        return;
+    }
+
+    /* unregister first: the handlers carry p as their opaque pointer and
+     * must not fire once it has been freed
+     */
+    qemu_set_fd_handler(p->fd, NULL, NULL, p);
+
+    qmp_proxy_cancel_all(p);
+    close(p->fd);
+    if (p->path) {
+        unlink(p->path);
+    }
+
+    if (p->xport_event) {
+        QDECREF(p->xport_event);
+    }
+    if (p->xport_event_sending) {
+        QDECREF(p->xport_event_sending);
+    }
+    json_message_parser_destroy(&p->parser);
+    qemu_free(p);
+}
diff --git a/vl.c b/vl.c
index 3fdc7cc..e8c49ef 100644
--- a/vl.c
+++ b/vl.c
@@ -231,6 +231,7 @@ int ctrl_grab = 0;
 unsigned int nb_prom_envs = 0;
 const char *prom_envs[MAX_PROM_ENVS];
 int boot_menu;
+QmpProxy *qmp_proxy_default;
 
 ShutdownEvent qemu_shutdown_event;
 ResetEvent qemu_reset_event;
-- 
1.7.0.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]