|
From: | Michael Roth |
Subject: | [Qemu-devel] Re: [RFC][PATCH v7 04/16] virtagent: bi-directional RPC handling logic |
Date: | Mon, 07 Mar 2011 16:35:44 -0600 |
User-agent: | Mozilla/5.0 (X11; U; Linux i686 (x86_64); en-US; rv:1.9.2.13) Gecko/20101207 Thunderbird/3.1.7 |
On 03/07/2011 03:24 PM, Adam Litke wrote:
On Mon, 2011-03-07 at 14:10 -0600, Michael Roth wrote:This implements the state machine/logic used to manage send/receive/execute phases of RPCs we send or receive. It does so using a set of abstract methods we implement with the application and transport level code which will follow. Signed-off-by: Michael Roth<address@hidden> --- virtagent-manager.c | 326 +++++++++++++++++++++++++++++++++++++++++++++++++++ virtagent-manager.h | 130 ++++++++++++++++++++ 2 files changed, 456 insertions(+), 0 deletions(-) create mode 100644 virtagent-manager.c create mode 100644 virtagent-manager.h diff --git a/virtagent-manager.c b/virtagent-manager.c new file mode 100644 index 0000000..51d26a3 --- /dev/null +++ b/virtagent-manager.c @@ -0,0 +1,326 @@ +/* + * virtagent - job queue management + * + * Copyright IBM Corp. 2011 + * + * Authors: + * Michael Roth<address@hidden> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include "virtagent-common.h" + +typedef struct VAServerJob { + char tag[64]; + void *opaque; + VAServerJobOps ops; + QTAILQ_ENTRY(VAServerJob) next; + enum { + VA_SERVER_JOB_STATE_NEW = 0, + VA_SERVER_JOB_STATE_BUSY, + VA_SERVER_JOB_STATE_EXECUTED, + VA_SERVER_JOB_STATE_SENT, + VA_SERVER_JOB_STATE_DONE, + } state; +} VAServerJob; + +typedef struct VAClientJob { + char tag[64]; + void *opaque; + void *resp_opaque; + VAClientJobOps ops; + QTAILQ_ENTRY(VAClientJob) next; + enum { + VA_CLIENT_JOB_STATE_NEW = 0, + VA_CLIENT_JOB_STATE_BUSY, + VA_CLIENT_JOB_STATE_SENT, + VA_CLIENT_JOB_STATE_READ, + VA_CLIENT_JOB_STATE_DONE, + } state; +} VAClientJob; + +#define SEND_COUNT_MAX 1 +#define EXECUTE_COUNT_MAX 4It's not immediately clear what the difference between SEND_COUNT_MAX and EXECUTE_COUNT_MAX is. Some comments would help. Also, will the code work if these numbers are changed? If not, a note about what someone needs to look at when changing these would seem appropriate.
Basically the SEND_COUNT_MAX is the number of RPCs the client can have in flight at a time. EXECUTE_COUNT_MAX is the number of jobs the server can execute concurrently/asynchronously (execute as in actually do the "execute corresponding RPC" phase of a server job's lifecycle).
These should be tweakable without much side-effect. These aren't currently that important since a monitor tends to limit us to 1 RPC at a time, and the guest agent doesn't make any substantial use of guest->host RPCs atm, so SEND_COUNT_MAX has little impact.
We don't currently execute RPCs concurrently/asynchronously either, so EXECUTE_COUNT_MAX doesn't do much. But when threaded RPC execution is re-implemented this will come back into play. I'll make sure to add some comments on this.
+ +struct VAManager { + int send_count; /* sends in flight */ + int execute_count; /* number of jobs currently executing */ + QTAILQ_HEAD(, VAServerJob) server_jobs; + QTAILQ_HEAD(, VAClientJob) client_jobs; +}; + +/* server job operations/helpers */ + +static VAServerJob *va_server_job_by_tag(VAManager *m, const char *tag) +{ + VAServerJob *j; + QTAILQ_FOREACH(j,&m->server_jobs, next) { + if (strcmp(j->tag, tag) == 0) { + return j; + } + } + return NULL; +} + +int va_server_job_add(VAManager *m, const char *tag, void *opaque, + VAServerJobOps ops) +{ + VAServerJob *j = qemu_mallocz(sizeof(VAServerJob)); + TRACE("called");Qemu has a good tracing infrastructure. If this is trace point is useful enough to keep around, it should try to use that. If it's not that important, I'd remove it entirely. I believe this has been flagged in an earlier RFC too.
These are really just to aid in development. I plan on NOOPing these via the DEBUG_VA flag before merge. Can also remove them if it's too nasty. Only a very small subset of these would be useful for the trace facility, I'll have a better idea of which ones once I stop relying on the TRACE() stuff.
+ j->state = VA_SERVER_JOB_STATE_NEW; + j->ops = ops; + j->opaque = opaque; + memset(j->tag, 0, 64); + pstrcpy(j->tag, 63, tag);Magic numbers. Should use something like #define TAG_LEN 64+ QTAILQ_INSERT_TAIL(&m->server_jobs, j, next); + va_kick(m); + return 0; +} + +static void va_server_job_execute(VAServerJob *j) +{ + TRACE("called"); + j->state = VA_SERVER_JOB_STATE_BUSY; + j->ops.execute(j->opaque, j->tag); +} + +/* TODO: need a way to pass information back */ +void va_server_job_execute_done(VAManager *m, const char *tag) +{ + VAServerJob *j = va_server_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("server job with tag \"%s\" not found", tag); + return; + } + j->state = VA_SERVER_JOB_STATE_EXECUTED; + va_kick(m); +} + +static void va_server_job_send(VAServerJob *j) +{ + TRACE("called"); + j->state = VA_SERVER_JOB_STATE_BUSY; + j->ops.send(j->opaque, j->tag); +} + +void va_server_job_send_done(VAManager *m, const char *tag) +{ + VAServerJob *j = va_server_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("server job with tag \"%s\" not found", tag); + return; + } + j->state = VA_SERVER_JOB_STATE_SENT; + m->send_count--; + va_kick(m); +} + +static void va_server_job_callback(VAServerJob *j) +{ + TRACE("called"); + j->state = VA_SERVER_JOB_STATE_BUSY; + if (j->ops.callback) { + j->ops.callback(j->opaque, j->tag); + } + j->state = VA_SERVER_JOB_STATE_DONE; +} + +void va_server_job_cancel(VAManager *m, const char *tag) +{ + VAServerJob *j = va_server_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("server job with tag \"%s\" not found", tag); + return; + } + /* TODO: need to decrement sends/execs in flight appropriately */ + /* make callback and move to done state, kick() will handle cleanup */ + va_server_job_callback(j); + va_kick(m); +} + +/* client job operations */ + +static VAClientJob *va_client_job_by_tag(VAManager *m, const char *tag) +{ + VAClientJob *j; + QTAILQ_FOREACH(j,&m->client_jobs, next) { + if (strcmp(j->tag, tag) == 0) 
{ + return j; + } + } + return NULL; +} + +int va_client_job_add(VAManager *m, const char *tag, void *opaque, + VAClientJobOps ops) +{ + VAClientJob *j = qemu_mallocz(sizeof(VAClientJob)); + TRACE("called"); + j->ops = ops; + j->opaque = opaque; + memset(j->tag, 0, 64); + pstrcpy(j->tag, 63, tag); + QTAILQ_INSERT_TAIL(&m->client_jobs, j, next); + va_kick(m); + return 0; +} + +static void va_client_job_send(VAClientJob *j) +{ + TRACE("called"); + j->state = VA_CLIENT_JOB_STATE_BUSY; + j->ops.send(j->opaque, j->tag); +} + +void va_client_job_send_done(VAManager *m, const char *tag) +{ + VAClientJob *j = va_client_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("client job with tag \"%s\" not found", tag); + return; + } + j->state = VA_CLIENT_JOB_STATE_SENT; + m->send_count--; + va_kick(m); +} + +void va_client_job_read_done(VAManager *m, const char *tag, void *resp) +{ + VAClientJob *j = va_client_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("client job with tag \"%s\" not found", tag); + return; + } + j->state = VA_CLIENT_JOB_STATE_READ; + j->resp_opaque = resp; + va_kick(m); +} + +static void va_client_job_callback(VAClientJob *j) +{ + TRACE("called"); + j->state = VA_CLIENT_JOB_STATE_BUSY; + if (j->ops.callback) { + j->ops.callback(j->opaque, j->resp_opaque, j->tag); + } + j->state = VA_CLIENT_JOB_STATE_DONE; +} + +void va_client_job_cancel(VAManager *m, const char *tag) +{ + VAClientJob *j = va_client_job_by_tag(m, tag); + TRACE("called"); + if (!j) { + LOG("client job with tag \"%s\" not found", tag); + return; + } + /* TODO: need to decrement sends/execs in flight appropriately */ + /* make callback and move to done state, kick() will handle cleanup */ + va_client_job_callback(j); + va_kick(m); +} + +/* general management functions */ + +VAManager *va_manager_new(void) +{ + VAManager *m = qemu_mallocz(sizeof(VAManager)); + QTAILQ_INIT(&m->client_jobs); + QTAILQ_INIT(&m->server_jobs); + return m; +} + +static void 
va_process_server_job(VAManager *m, VAServerJob *sj) +{ + switch (sj->state) { + case VA_SERVER_JOB_STATE_NEW: + TRACE("marker"); + va_server_job_execute(sj); + break; + case VA_SERVER_JOB_STATE_EXECUTED: + TRACE("marker"); + if (m->send_count< SEND_COUNT_MAX) { + TRACE("marker"); + va_server_job_send(sj); + m->send_count++; + } + break; + case VA_SERVER_JOB_STATE_SENT: + TRACE("marker"); + va_server_job_callback(sj); + break; + case VA_SERVER_JOB_STATE_BUSY: + TRACE("marker, server job currently busy"); + break; + case VA_SERVER_JOB_STATE_DONE: + TRACE("marker"); + QTAILQ_REMOVE(&m->server_jobs, sj, next); + break; + default: + LOG("error, unknown server job state"); + break; + } +} + +static void va_process_client_job(VAManager *m, VAClientJob *cj) +{ + switch (cj->state) { + case VA_CLIENT_JOB_STATE_NEW: + TRACE("marker"); + if (m->send_count< SEND_COUNT_MAX) { + TRACE("marker"); + va_client_job_send(cj); + m->send_count++; + } + break; + case VA_CLIENT_JOB_STATE_SENT: + TRACE("marker"); + //nothing to do here, awaiting read_done() + break; + case VA_CLIENT_JOB_STATE_READ: + TRACE("marker"); + va_client_job_callback(cj); + break; + case VA_CLIENT_JOB_STATE_DONE: + TRACE("marker"); + QTAILQ_REMOVE(&m->client_jobs, cj, next); + break; + case VA_CLIENT_JOB_STATE_BUSY: + TRACE("marker, client job currently busy"); + break; + default: + LOG("error, unknown client job state"); + break; + } +} + +void va_kick(VAManager *m) +{ + VAServerJob *sj, *sj_tmp; + VAClientJob *cj, *cj_tmp; + + TRACE("called"); + TRACE("send_count: %u, execute_count: %u", m->send_count, m->execute_count); + + /* TODO: make sure there is no starvation of jobs/operations here */ + + /* look for any work to be done among pending server jobs */ + QTAILQ_FOREACH_SAFE(sj,&m->server_jobs, next, sj_tmp) { + TRACE("marker, server tag: %s", sj->tag); + va_process_server_job(m, sj); + } + + /* look for work to be done among pending client jobs */ + QTAILQ_FOREACH_SAFE(cj,&m->client_jobs, next, cj_tmp) { + 
TRACE("marker, client tag: %s", cj->tag); + va_process_client_job(m, cj); + } +} diff --git a/virtagent-manager.h b/virtagent-manager.h new file mode 100644 index 0000000..7b463fb --- /dev/null +++ b/virtagent-manager.h @@ -0,0 +1,130 @@ +#ifndef VIRTAGENT_MANAGER_H +#define VIRTAGENT_MANAGER_H + +#include "qemu-common.h" +#include "qemu-queue.h" + +/* + * Protocol Overview: + * + * The virtagent protocol depends on a state machine to manage communication + * over a single connection stream, currently a virtio or isa serial channel. + * The basic characterization of the work being done is that clients + * send/handle client jobs locally, which are then read/handled remotely as + * server jobs. A client job consists of a request which is sent, and a + * response which is eventually recieved. A server job consists of a request + * which is recieved from the other end, and a response which is sent back."i before e, except after c ..." (I misspell receive all the time too).
TIL about vim's spell check feature :)
+ * + * Server jobs are given priority over client jobs, i.e. if we send a client + * job (our request) and receive a server job (their request), rather than + * await a response to the client job, we immediately begin processing the + * server job and then send back the response. This prevents us from being + * deadlocked in a situation where both sides have sent a client job and are + * awaiting the response before handling the other side's client job. + * + * Multiple in-flight requests are supported, but high request rates can + * potentially starve out the other side's client jobs / requests, so well- + * behaved participants should periodically back off on high request rates, or + * limit themselves to 1 request at a time (anything more than 1 can still + * potentially remove any window for the other end to service its own + * client jobs, since we can begin sending the next request before it begins + * sending the response for the 2nd). + * + * On a related note, in the future, bidirectional user/session-level guest + * agents may also be supported via a forwarding service made available + * through the system-level guest agent. In this case it is up to the + * system-level agent to handle forwarding requests in such a way that we + * don't starve the host-side service out sheerly by having too many + * sessions/users trying to send RPCs at a constant rate. This would be + * supported through this job Manager via an additional "forwarder" job type. + * + * To encapsulate some of this logic, we define here a "Manager" class, which + * provides an abstract interface to a state machine which handles most of + * the above logic transparently to the transport/application-level code. + * This also makes it possible to utilize alternative + * transport/application-level protocols in the future. + * + */ + +/* + * Two types of jobs are generated from various components of virtagent. + * Each job type has a priority, and a set of prioritized functions as well. 
+ * + * The read handler generates new server jobs as it receives requests from + * the channel. Server jobs make progress through the following operations. + * + * EXECUTE->EXECUTE_DONE->SEND->SEND_DONE + * + * EXECUTE (provided by user, manager calls) + * When server jobs are added, eventually (as execution slots become + * available) an execute() will be called to begin executing the job. An + * error value will be returned if there is no room in the queue for another + * server job. + * + * EXECUTE_DONE (provided by manager, user calls) + * As server jobs complete, execute_completed() is called to update execution + * status of that job (failure/success), inject the payload, and kick off the + * next operation. + * + * SEND (provided by user, manager calls) + * Eventually the send() operation is made. This will cause the send handler + * to begin sending the response. + * + * SEND_DONE (provided by manager, user calls) + * Upon completion of that send, the send_completed() operation will be + * called. This will free up the job, and kick off the next operation. + */Very helpful protocol overview. Thanks for adding this.+typedef int (va_job_op)(void *opaque, const char *tag); +typedef struct VAServerJobOps { + va_job_op *execute; + va_job_op *send; + va_job_op *callback; +} VAServerJobOps; + +/* + * The client component generates new client jobs as they're made by + * virtagent in response to monitored events or user-issued commands. + * Client jobs progress via the following operations. + * + * SEND->SEND_DONE->READ_DONE + * + * SEND (provided by user, called by manager) + * After client jobs are added, send() will eventually be called to queue + * the job up for xmit over the channel. + * + * SEND_DONE (provided by manager, called by user) + * Upon completion of the send, send_completed() should be called with + * failure/success indication. 
+ * + * READ_DONE (provided by manager, called by user) + * When a response for the request is read back via the transport layer, + * read_done() will be called by the user to indicate success/failure, + * inject the response, and make the associated callback. + */ +typedef int (va_client_job_cb)(void *opaque, void *resp_opaque, + const char *tag); +typedef struct VAClientJobOps { + va_job_op *send; + va_client_job_cb *callback; +} VAClientJobOps; + +typedef struct VAManager VAManager; + +VAManager *va_manager_new(void); +void va_kick(VAManager *m); + +/* interfaces for server jobs */ +int va_server_job_add(VAManager *m, const char *tag, void *opaque, + VAServerJobOps ops); +void va_server_job_execute_done(VAManager *m, const char *tag); +void va_server_job_send_done(VAManager *m, const char *tag); +void va_server_job_cancel(VAManager *m, const char *tag); + +/* interfaces for client jobs */ +int va_client_job_add(VAManager *m, const char *tag, void *opaque, + VAClientJobOps ops); +void va_client_job_cancel(VAManager *m, const char *tag); +void va_client_job_send_done(VAManager *m, const char *tag); +void va_client_job_read_done(VAManager *m, const char *tag, void *resp); + +#endif /* VIRTAGENT_MANAGER_H */
[Prev in Thread] | Current Thread | [Next in Thread] |