qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH v3 2/9] repagent: Changed repagent socket to be


From: Ori Mamluk
Subject: [Qemu-devel] [RFC PATCH v3 2/9] repagent: Changed repagent socket to be based on set_fd_handler, converted parsing functions to be non-serial
Date: Thu, 5 Apr 2012 15:17:50 +0300

Use set_fd_handler instead of a listening thread.

Change the reading/parsing function to be state-machine based, because they no longer have their own thread.

---

Makefile                      |    6 +-

Makefile.objs                 |   10 +-

configure                     |    3 +-

replication/repagent_client.c |   28 ++++++-

replication/repcmd_listener.c |  166 +++++++++++++++++++++++++----------------

replication/repcmd_listener.h |    8 +-

6 files changed, 141 insertions(+), 80 deletions(-)

 

diff --git a/Makefile b/Makefile

index fbd77df..b6379fb 100644

--- a/Makefile

+++ b/Makefile

@@ -156,9 +156,9 @@ tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \

               qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o

tools-obj-$(CONFIG_POSIX) += compatfd.o

-qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

+qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)

+qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)

+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)

 qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o

qemu-bridge-helper.o: $(GENERATED_HEADERS)

diff --git a/Makefile.objs b/Makefile.objs

index a28eefb..01413a2 100755

--- a/Makefile.objs

+++ b/Makefile.objs

@@ -30,6 +30,11 @@ block-obj-y += $(coroutine-obj-y) $(qobject-obj-y) $(version-obj-y)

block-obj-$(CONFIG_POSIX) += posix-aio-compat.o

block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o

+# Replication agent

+replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o

+replication-obj-y = $(addprefix replication/, $(replication-nested-y))

+block-obj-y += $(replication-obj-y)

+

block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o

block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o

block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o

@@ -423,11 +428,6 @@ common-obj-y += qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y)

common-obj-y += qmp.o hmp.o

 ######################################################################

-# replication

-replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o

-replication-obj-y = $(addprefix replication/, $(replication-nested-y))

-

-######################################################################

# guest agent

 qga-nested-y = commands.o guest-agent-command-state.o

diff --git a/configure b/configure

index f97394f..83b74c2 100755

--- a/configure

+++ b/configure

@@ -2883,7 +2883,6 @@ echo "curses support    $curses"

echo "curl support      $curl"

echo "mingw32 support   $mingw32"

echo "Audio drivers     $audio_drv_list"

-echo "Replication           $replication"

echo "Extra audio cards $audio_card_list"

echo "Block whitelist   $block_drv_whitelist"

echo "Mixer emulation   $mixemu"

@@ -3904,3 +3903,5 @@ symlink $source_path/Makefile.user $d/Makefile

if test "$docs" = "yes" ; then

   mkdir -p QMP

fi

+

+echo "Replication          $replication"

diff --git a/replication/repagent_client.c b/replication/repagent_client.c

index 4dd9ea4..eaa0a28 100644

--- a/replication/repagent_client.c

+++ b/replication/repagent_client.c

@@ -3,6 +3,7 @@

#include "repcmd_listener.h"

#include "repagent_client.h"

#include "repagent.h"

+#include "main-loop.h"

 #include <string.h>

#include <stdlib.h>

@@ -26,6 +27,15 @@ typedef struct repagent_client_state {

 static repagent_client_state g_client_state = { 0 };

+static void repagent_client_read(void *opaque)

+{

+    printf("repagent_client_read\n");

+    int bytes_read = repcmd_listener_socket_read_next_buf(g_client_state.hsock);

+    if (bytes_read <= 0) {

+        g_client_state.is_connected = 0;

+    }

+}

+

void *repagent_listen(void *pParam)

{

     rephub_params *pServerParams = (rephub_params *) pParam;

@@ -80,13 +90,25 @@ void *repagent_listen(void *pParam)

         }

         retries = 0;

-        g_client_state.is_connected = 1;

         repagent_client_connected();

-        repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);

-        close(g_client_state.hsock);

+        repcmd_listener_init(repagent_process_cmd, NULL);

+        g_client_state.is_connected = 1;

+        static int c;

+        /* repcmd_listener_socket_thread_listener(g_client_state.hsock); */

+        qemu_set_fd_handler(g_client_state.hsock, repagent_client_read, NULL,

+                NULL);

+        while (g_client_state.is_connected) {

+            printf("Connected (%d)...\n", c++);

+            usleep(1 * 1000 * 1000);

+        }

+        /* Unregister */

+        qemu_set_fd_handler(g_client_state.hsock, NULL, NULL, NULL);

+        printf("Disconnected\n");

         g_client_state.is_connected = 0;

+        close(g_client_state.hsock);

+

     }

     return 0;

}

diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c

index a211927..c1ce97f 100644

--- a/replication/repcmd_listener.c

+++ b/replication/repcmd_listener.c

@@ -26,93 +26,129 @@

 #define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))

+

+typedef struct RepCmdRxCmdState {

+    RepCmd curCmd;

+    uint8_t *pReadBuf;

+    int bytesToGet;

+    int bytesGotten;

+    int isGotHeader;

+    uint8_t *pdata;

+} RepCmdRxCmdState;

+

typedef struct RepCmdListenerState {

     int is_terminate_receive;

+    pfn_received_cmd_cb  receive_cb;

+    void *opaque;

+    int hsock;

+    RepCmdRxCmdState cur_cmd;

} RepCmdListenerState;

 static RepCmdListenerState g_listenerState = { 0 };

-/* Returns 0 for initiated termination or socket error value on error */

-int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)

+static int repcmd_listener_process_rx(int bytecount);

+

+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque)

{

-    RepCmd curCmd;

-    uint8_t *pReadBuf = (uint8_t *) &curCmd;

-    int bytesToGet = sizeof(RepCmd);

-    int bytesGotten = 0;

-    int isGotHeader = 0;

-    uint8_t *pdata = NULL;

+    ZERO_MEM_OBJ(&g_listenerState);

+    g_listenerState.receive_cb = callback;

+    g_listenerState.opaque = opaque;

-    assert(callback != NULL);

+    g_listenerState.cur_cmd.bytesToGet = sizeof(RepCmd);

+    g_listenerState.cur_cmd.pReadBuf =

+            (uint8_t *) &g_listenerState.cur_cmd.curCmd;

+}

+int repcmd_listener_socket_read_next_buf(int hsock)

+{

+    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;

+    int bytecount = recv(hsock, cmd_state->pReadBuf + cmd_state->bytesGotten,

+            cmd_state->bytesToGet - cmd_state->bytesGotten, 0);

+    return repcmd_listener_process_rx(bytecount);

+}

+

+/* Returns 0 for initiated termination or socket error value on error */

+int repcmd_listener_socket_thread_listener(int hsock)

+{

+    int ret = 0;

     /* receive loop */

     while (!g_listenerState.is_terminate_receive) {

-        int bytecount;

-

-        bytecount = recv(hsock, pReadBuf + bytesGotten,

-                bytesToGet - bytesGotten, 0);

-        if (bytecount == -1) {

-            fprintf(stderr, "Error receiving data %d\n", errno);

-            return errno;

+        ret = repcmd_listener_socket_read_next_buf(hsock);

+        if (ret <= 0) {

+            return ret;

         }

+    }

+    return 0;

+}

-        if (bytecount == 0) {

-            printf("Disconnected\n");

-            return 0;

-        }

-        bytesGotten += bytecount;

+static int repcmd_listener_process_rx(int bytecount)

+{

+    RepCmdRxCmdState *cmd_state = &g_listenerState.cur_cmd;

+    if (bytecount == -1) {

+        fprintf(stderr, "Error receiving data %d\n", errno);

+        return errno;

+    }

+

+    if (bytecount == 0) {

+        printf("Disconnected\n");

+        return 0;

+    }

+    cmd_state->bytesGotten += bytecount;

/*     printf("Recieved bytes %d, got %d/%d\n",

-                bytecount, bytesGotten, bytesToGet); */

-        /* print content */

-        if (0) {

-            int i;

-            for (i = 0; i < bytecount ; i += 4) {

-                /*printf("%d/%d", i, bytecount/4); */

-                printf("%#x ",

-                        *(int *) (&pReadBuf[bytesGotten - bytecount + i]));

+            bytecount, cmd_state->bytesGotten, cmd_state->bytesToGet); */

+    /* print content */

+    if (0) {

+        int i;

+        for (i = 0; i < bytecount ; i += 4) {

+            /*printf("%d/%d", i, bytecount/4); */

+            printf(

+                    "%#x ",

+                    *(int *) (&cmd_state->pReadBuf[cmd_state->bytesGotten

+                            - bytecount + i]));

-            }

-            printf("\n");

         }

-        assert(bytesGotten <= bytesToGet);

-        if (bytesGotten == bytesToGet) {

-            int isGotData = 0;

-            bytesGotten = 0;

-            if (!isGotHeader) {

-                /* We just got the header */

-                isGotHeader = 1;

-

-                assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);

-                assert(curCmd.magic2 == REPCMD_MAGIC2);

-                if (curCmd.hdr.data_size_bytes > 0) {

-                    pdata = (uint8_t *)REPCMD_MALLOC(

-                                curCmd.hdr.data_size_bytes);

-/*                    printf("malloc %p\n", pdata); */

-                    pReadBuf = pdata;

-                } else {

-                    /* no data */

-                    isGotData = 1;

-                    pdata = NULL;

-                }

-                bytesToGet = curCmd.hdr.data_size_bytes;

+        printf("\n");

+    }

+    assert(cmd_state->bytesGotten <= cmd_state->bytesToGet);

+    if (cmd_state->bytesGotten == cmd_state->bytesToGet) {

+        int isGotData = 0;

+        cmd_state->bytesGotten = 0;

+        if (!cmd_state->isGotHeader) {

+            /* We just got the header */

+            cmd_state->isGotHeader = 1;

+

+            assert(cmd_state->curCmd.hdr.magic1 == REPCMD_MAGIC1);

+            assert(cmd_state->curCmd.magic2 == REPCMD_MAGIC2);

+            if (cmd_state->curCmd.hdr.data_size_bytes > 0) {

+                cmd_state->pdata = (uint8_t *)REPCMD_MALLOC(

+                        cmd_state->curCmd.hdr.data_size_bytes);

+/*                    printf("malloc %p\n", cmd_state->pdata); */

+                cmd_state->pReadBuf = cmd_state->pdata;

             } else {

+                /* no data */

                 isGotData = 1;

+                cmd_state->pdata = NULL;

             }

+            cmd_state->bytesToGet = cmd_state->curCmd.hdr.data_size_bytes;

+        } else {

+            isGotData = 1;

+        }

-            if (isGotData) {

-                /* Got command and data */

-                (*callback)(&curCmd, pdata, clientPtr);

-

-                /* It's the callee responsibility to free pData */

-                pdata = NULL;

-                ZERO_MEM_OBJ(&curCmd);

-                pReadBuf = (uint8_t *) &curCmd;

-                bytesGotten = 0;

-                bytesToGet = sizeof(RepCmd);

-                isGotHeader = 0;

-            }

+        if (isGotData) {

+            /* Got command and data */

+            (*g_listenerState.receive_cb)(&cmd_state->curCmd, cmd_state->pdata,

+                    g_listenerState.opaque);

+

+            /* It's the callee responsibility to free cmd_state->pdata */

+            cmd_state->pdata = NULL;

+            ZERO_MEM_OBJ(&cmd_state->curCmd);

+            cmd_state->pReadBuf = (uint8_t *) &cmd_state->curCmd;

+            cmd_state->bytesGotten = 0;

+            cmd_state->bytesToGet = sizeof(RepCmd);

+            cmd_state->isGotHeader = 0;

         }

     }

-    return 0;

+    return bytecount;

}

 RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)

diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h

index c09a12e..19b9ea9 100644

--- a/replication/repcmd_listener.h

+++ b/replication/repcmd_listener.h

@@ -24,9 +24,11 @@

#ifndef REPCMD_LISTENER_H

#define REPCMD_LISTENER_H

#include <stdint.h>

-typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,

-                uint8_t *pData, void *clientPtr);

+typedef void (*pfn_received_cmd_cb)(RepCmd *pcmd,

+                uint8_t *pdata, void *opaque);

-int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr);

+void repcmd_listener_init(pfn_received_cmd_cb callback, void *opaque);

+int repcmd_listener_socket_read_next_buf(int hsock);

+int repcmd_listener_socket_thread_listener(int hsock);

 #endif /* REPCMD_LISTENER_H */

--

1.7.6.5


reply via email to

[Prev in Thread] Current Thread [Next in Thread]