qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [RFC PATCH v3 1/9] repagent: Added replication module


From: Ori Mamluk
Subject: [Qemu-devel] [RFC PATCH v3 1/9] repagent: Added replication module
Date: Thu, 5 Apr 2012 15:17:48 +0300

Added build options to ./configure:  --enable-replication --disable-replication

Added a commandline option to enable: -repagent <rep hub IP>

Added the module files under replication.

Added the repagent API usage in block.c

 

Sent as Replication agent patch V1

 

Signed-off-by: Ori Mamluk <address@hidden>

---

Makefile                      |    9 +-

Makefile.objs                 |    6 +

block.c                       |   18 +++

configure                     |   11 ++

qemu-options.hx               |    6 +

replication/qemu-repagent.txt |  104 +++++++++++++

replication/repagent.c        |  327 +++++++++++++++++++++++++++++++++++++++++

replication/repagent.h        |   46 ++++++

replication/repagent_client.c |  138 +++++++++++++++++

replication/repagent_client.h |   36 +++++

replication/repcmd.h          |   59 ++++++++

replication/repcmd_listener.c |  137 +++++++++++++++++

replication/repcmd_listener.h |   32 ++++

replication/rephub_cmds.h     |  151 +++++++++++++++++++

replication/rephub_defs.h     |   40 +++++

vl.c                          |   10 ++

16 files changed, 1126 insertions(+), 4 deletions(-)

mode change 100644 => 100755 Makefile.objs

mode change 100644 => 100755 qemu-options.hx

create mode 100644 replication/qemu-repagent.txt

create mode 100644 replication/repagent.c

create mode 100644 replication/repagent.h

create mode 100644 replication/repagent_client.c

create mode 100644 replication/repagent_client.h

create mode 100644 replication/repcmd.h

create mode 100644 replication/repcmd_listener.c

create mode 100644 replication/repcmd_listener.h

create mode 100644 replication/rephub_cmds.h

create mode 100644 replication/rephub_defs.h

 

diff --git a/Makefile b/Makefile

index 49c775b..fbd77df 100644

--- a/Makefile

+++ b/Makefile

@@ -156,9 +156,9 @@ tools-obj-y = $(oslib-obj-y) $(trace-obj-y) qemu-tool.o qemu-timer.o \

               qemu-timer-common.o main-loop.o notify.o iohandler.o cutils.o async.o

tools-obj-$(CONFIG_POSIX) += compatfd.o

-qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)

-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)

-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)

+qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

+qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y) $(replication-obj-y)

 qemu-bridge-helper$(EXESUF): qemu-bridge-helper.o

qemu-bridge-helper.o: $(GENERATED_HEADERS)

@@ -224,6 +224,7 @@ clean:

               rm -f $(GENERATED_SOURCES)

               rm -rf $(qapi-dir)

               $(MAKE) -C tests/tcg clean

+             rm -f replication/*.{o,d}

               for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \

               if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \

               rm -f $$d/qemu-options.def; \

@@ -390,4 +391,4 @@ tar:

               rm -rf /tmp/$(FILE)

 # Include automatically generated dependency files

--include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d)

+-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d qapi/*.d qga/*.d replication/*.d)

diff --git a/Makefile.objs b/Makefile.objs

old mode 100644

new mode 100755

index b39d76c..a28eefb

--- a/Makefile.objs

+++ b/Makefile.objs

@@ -75,6 +75,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/, $(fsdev-nested-y))

# single QEMU executable should support all CPUs and machines.

 common-obj-y = $(block-obj-y) blockdev.o

+common-obj-y += $(replication-obj-$(CONFIG_REPLICATION))

common-obj-y += $(net-obj-y)

common-obj-y += $(qobject-obj-y)

common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))

@@ -422,6 +423,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o qapi-types.o $(qapi-obj-y)

common-obj-y += qmp.o hmp.o

 ######################################################################

+# replication

+replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o

+replication-obj-y = $(addprefix replication/, $(replication-nested-y))

+

+######################################################################

# guest agent

 qga-nested-y = commands.o guest-agent-command-state.o

diff --git a/block.c b/block.c

index 52ffe14..4809416 100644

--- a/block.c

+++ b/block.c

@@ -32,6 +32,10 @@

#include "qmp-commands.h"

#include "qemu-timer.h"

+#ifdef CONFIG_REPLICATION

+#include "replication/repagent.h"

+#endif

+

#ifdef CONFIG_BSD

#include <sys/types.h>

#include <sys/stat.h>

@@ -749,6 +753,9 @@ int bdrv_open(BlockDriverState *bs, const char *filename, int flags,

         goto unlink_and_fail;

     }

+#ifdef CONFIG_REPLICATION

+    repagent_register_drive(filename,  bs);

+#endif

     /* Open the image */

     ret = bdrv_open_common(bs, filename, flags, drv);

     if (ret < 0) {

@@ -1842,6 +1849,17 @@ static int coroutine_fn bdrv_co_do_writev(BlockDriverState *bs,

         ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);

     }

+

+#ifdef CONFIG_REPLICATION

+    if (bs->device_name[0] != '\0') {

+        /* We split the IO only at the highest stack driver layer.

+           Currently we know that by checking device_name - only

+           highest level (closest to the guest) has that name.

+           */

+           repagent_handle_protected_write(bs, sector_num,

+                nb_sectors, qiov, ret);

+    }

+#endif

     if (bs->dirty_bitmap) {

         set_dirty_bitmap(bs, sector_num, nb_sectors, 1);

     }

diff --git a/configure b/configure

index 66a65d9..f97394f 100755

--- a/configure

+++ b/configure

@@ -189,6 +189,7 @@ spice=""

rbd=""

smartcard=""

smartcard_nss=""

+replication=""

usb_redir=""

opengl=""

zlib="yes"

@@ -806,6 +807,10 @@ for opt do

   ;;

   --enable-smartcard-nss) smartcard_nss="yes"

   ;;

+  --disable-replication) replication="no"

+  ;;

+  --enable-replication) replication="yes"

+  ;;

   --disable-usb-redir) usb_redir="no"

   ;;

   --enable-usb-redir) usb_redir="yes"

@@ -1104,6 +1109,7 @@ echo "  --disable-usb-redir      disable usb network redirection support"

echo "  --enable-usb-redir       enable usb network redirection support"

echo "  --disable-guest-agent    disable building of the QEMU Guest Agent"

echo "  --enable-guest-agent     enable building of the QEMU Guest Agent"

+echo "  --enable-replication     enable replication support"

echo ""

echo "NOTE: The object files are built at the place where configure is launched"

exit 1

@@ -2877,6 +2883,7 @@ echo "curses support    $curses"

echo "curl support      $curl"

echo "mingw32 support   $mingw32"

echo "Audio drivers     $audio_drv_list"

+echo "Replication          $replication"

echo "Extra audio cards $audio_card_list"

echo "Block whitelist   $block_drv_whitelist"

echo "Mixer emulation   $mixemu"

@@ -3214,6 +3221,10 @@ if test "$smartcard_nss" = "yes" ; then

   echo "libcacard_cflags=$libcacard_cflags" >> $config_host_mak

fi

+if test "$replication" = "yes" ; then

+  echo "CONFIG_REPLICATION=y" >> $config_host_mak

+fi

+

if test "$usb_redir" = "yes" ; then

   echo "CONFIG_USB_REDIR=y" >> $config_host_mak

fi

diff --git a/qemu-options.hx b/qemu-options.hx

old mode 100644

new mode 100755

index daefce3..15fb938

--- a/qemu-options.hx

+++ b/qemu-options.hx

@@ -2711,3 +2711,9 @@ HXCOMM This is the last statement. Insert new options before this line!

STEXI

@end table

ETEXI

+

+DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,

+    "-repagent [hub IP/name]\n"

+    "                Enable replication support for disks\n"

+    "                hub is the ip or name of the machine running the replication hub.\n",

+    QEMU_ARCH_ALL)

diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt

new file mode 100644

index 0000000..e3b0c1e

--- /dev/null

+++ b/replication/qemu-repagent.txt

@@ -0,0 +1,104 @@

+             repagent - replication agent - a Qemu module for enabling continuous async replication of VM volumes

+

+Introduction

+             This document describes a feature in Qemu - a replication agent (AKA Repagent).

+             The Repagent is a new module that exposes an API to an external replication system (AKA Rephub).

+             This API allows a Rephub to communicate with a Qemu VM and continuously replicate its volumes.

+             The implementation of a Rephub is outside of the scope of this document. There may be several different Rephub

+             implementations using the same repagent in Qemu.

+

+Main features of Repagent

+             Repagent does the following:

+             * Report volumes - report a list of all volumes in a VM to the Rephub.

+             * Report writes to a volume - send all writes made to a protected volume to the Rephub.

+                             The reporting of an IO is asynchronous - i.e. the IO is not delayed by the Repagent to get any acknowledgement from the Rephub.

+                             It is only copied to the Rephub.

+             * Read a protected volume - allows the Rephub to read a protected volume, to enable the protected hub to synchronize the content of a protected volume.

+

+Description of the Repagent module

+

+Build and run options

+             New configure option: --enable-replication

+             New command line option:

+             -repagent [hub IP/name]

+                                                                             Enable replication support for disks

+                                                                             hub is the ip or name of the machine running the replication hub.

+

+Module APIs

+             The Repagent module interfaces two main components:

+             1. The Rephub - An external API based on socket messages

+             2. The generic block layer- block.c

+

+             Rephub message API

+                             The external replication API is a message based API.

+                             We won't go into the structure of the messages here - just the semantics.

+

+                             Messages list

+                                             (The updated list and comments are in rephub_cmds.h)

+

+                                             Messages from the Repagent to the Rephub:

+                                             * Protected write

+                                                             The Repagent sends each write to a protected volume to the hub with the IO status.

+                                                             If the status is bad, the write content is not sent.

+                                             * Report VM volumes

+                                                             The agent reports all the volumes of the VM to the hub.

+                                             * Read Volume Response

+                                                             A response to a Read Volume Request

+                                                             Sends the data read from a protected volume to the hub

+                                             * Agent shutdown

+                                                             Notifies the hub that the agent is about to shut down.

+                                                             This allows a graceful shutdown. Any disconnection of an agent without

+                                                             sending this command will result in a full sync of the VM volumes.

+

+                                             Messages from the Rephub to the Repagent:

+                                             * Start protect

+                                                             The hub instructs the agent to start protecting a volume. When a volume is protected

+                                                             all its writes are sent to the hub.

+                                                             With this command the hub also assigns a volume ID to the given volume name.

+                                             * Read volume request

+                                                             The hub issues a read IO to a protected volume.

+                                                             This command is used during sync - when the hub needs to read unsynchronized

+                                                             sections of a protected volume.

+                                                             This command is a request; the read data is returned by the read volume response message (see above).

+             block.c API

+                             The API to the generic block storage layer contains 3 functionalities:

+                             1. Handle writes to protected volumes

+                                             In bdrv_co_do_writev, each write is reported to the Repagent module.

+                             2. Handle each new volume that registers

+                                             In bdrv_open - each new bottom-level block driver that registers is reported.

+                             3. Read from a volume

+                                             Repagent calls bdrv_aio_readv to handle read requests coming from the hub.

+

+

+General description of a Rephub  - a replication system the repagent connects to

+             This section describes in high level a sample Rephub - a replication system that uses the repagent API

+             to replicate disks.

+             It describes a simple Rephub that continuously maintains a mirror of the volumes of a VM.

+

+             Say we have a VM we want to protect - call it PVM, say it has 2 volumes - V1, V2.

+             Our Rephub is called SingleRephub - a Rephub protecting a single VM.

+

+             Preparations

+             1. The user chooses a host to run SingleRephub - a different host than PVM, call it Host2

+             2. The user creates two volumes on Host2 - the same sizes as V1 and V2, call them V1R (V1 recovery) and V2R.

+             3. The user runs SingleRephub process on Host2, and gives V1R and V2R as command line arguments.

+                             From now on SingleRephub waits for the protected VM repagent to connect.

+             4. The user runs the protected VM PVM - and uses the switch -repagent <Host2 IP>.

+

+             Runtime

+             1. The repagent module connects to SingleRephub on startup.

+             2. repagent reports V1 and V2 to SingleRephub.

+             3. SingleRephub starts to perform an initial synchronization of the protected volumes-

+                             it reads each protected volume (V1 and V2) - using read volume requests - and copies the data into the

+                             recovery volumes V1R and V2R.

+             4. SingleRephub enters 'protection' mode - each write to the protected volume is sent by the repagent to the Rephub,

+                             and the Rephub performs the write on the matching recovery volume.

+

+             * Note that during stage 3 writes to the protected volumes are not ignored - they're kept in a bitmap,

+                             and will be read again when stage 3 ends, in an iterative converging process.

+

+             This flow continuously maintains an updated recovery volume.

+             If the protected system is damaged, the user can create a new VM on Host2 with the replicated volumes attached to it.

+             The new VM is a replica of the protected system.

+

+

diff --git a/replication/repagent.c b/replication/repagent.c

new file mode 100644

index 0000000..c291915

--- /dev/null

+++ b/replication/repagent.c

@@ -0,0 +1,327 @@

+#include <string.h>

+#include <stdlib.h>

+#include <stdio.h>

+#include <pthread.h>

+#include <stdint.h>

+

+#include "block.h"

+#include "rephub_defs.h"

+#include "block_int.h"

+#include "repagent_client.h"

+#include "repagent.h"

+#include "rephub_cmds.h"

+

+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))

+#define REPAGENT_MAX_NUM_VOLUMES (64)

+#define REPAGENT_VOLUME_ID_NONE (0)

+

+typedef struct RepagentVolume {

+    uint64_t vol_id;

+    const char *vol_path;

+    BlockDriverState *driver_ptr;

+} RepagentVolume;

+

+struct RepAgentState {

+    int is_init;

+    int num_volumes;

+    RepagentVolume *volumes[REPAGENT_MAX_NUM_VOLUMES];

+};

+

+typedef struct RepagentReadVolIo {

+    QEMUIOVector qiov;

+    RepCmdReadVolReq rep_cmd;

+    uint8_t *buf;

+    struct timeval start_time;

+} RepagentReadVolIo;

+

+static int repagent_get_volume_by_name(const char *name);

+static void repagent_report_volumes_to_hub(void);

+static void repagent_vol_read_done(void *opaque, int ret);

+static struct timeval tsub(struct timeval t1, struct timeval t2);

+

+RepAgentState g_rep_agent = { 0 };

+

+void repagent_init(const char *hubname, int port)

+{

+    /* It is the responsibility of the thread to free this struct */

+    rephub_params *pParams = (rephub_params *)g_malloc(sizeof(rephub_params));

+    if (hubname == NULL) {

+        hubname = "127.0.0.1";

+    }

+    if (port == 0) {

+        port = 9010;

+    }

+

+    printf("repagent_init %s\n", hubname);

+

+    pParams->port = port;

+    pParams->name = g_strdup(hubname);

+

+    pthread_t thread_id = 0;

+

+    /* Create the repagent client listener thread */

+    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);

+    pthread_detach(thread_id);

+}

+

+void repagent_register_drive(const char *drive_path,

+        BlockDriverState *driver_ptr)

+{

+    int i;

+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {

+        RepagentVolume *vol = g_rep_agent.volumes[i];

+        if (vol != NULL) {

+            assert(

+                    strcmp(drive_path, vol->vol_path) != 0

+                    && driver_ptr != vol->driver_ptr);

+        }

+    }

+

+    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);

+

+    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",

+            g_rep_agent.num_volumes, drive_path);

+    g_rep_agent.volumes[i] =

+            (RepagentVolume *)g_malloc(sizeof(RepagentVolume));

+    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;

+    /* orim todo strcpy? */

+    g_rep_agent.volumes[i]->vol_path = drive_path;

+

+    /* Orim todo thread-safety? */

+    g_rep_agent.num_volumes++;

+

+    repagent_report_volumes_to_hub();

+}

+

+/* orim todo destruction? */

+

+static RepagentVolume *repagent_get_protected_volume_by_driver(

+        BlockDriverState *bs)

+{

+    /* orim todo optimize search */

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {

+        RepagentVolume *p_vol = g_rep_agent.volumes[i];

+        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {

+            return p_vol;

+        }

+    }

+    return NULL;

+}

+

+void repagent_handle_protected_write(BlockDriverState *bs, int64_t sector_num,

+        int nb_sectors, QEMUIOVector *qiov, int ret_status)

+{

+    printf("zerto Protected write offset %lld, size %d, IO return status %d",

+            (long long int) sector_num, nb_sectors, ret_status);

+    if (bs->filename != NULL) {

+        printf(", filename %s", bs->filename);

+    }

+

+    printf("\n");

+

+    RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);

+    if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {

+        /* Unprotected */

+        printf("Got a write to an unprotected volume.\n");

+        return;

+    }

+

+    /* Report IO to rephub */

+

+    int data_size = qiov->size;

+    if (ret_status < 0) {

+        /* On failed ios we don't send the data to the hub */

+        data_size = 0;

+    }

+    uint8_t *pdata = NULL;

+    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(

+            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **) &pdata);

+

+    if (ret_status >= 0) {

+        qemu_iovec_to_buffer(qiov, pdata);

+    }

+

+    p_cmd->volume_id = p_vol->vol_id;

+    p_cmd->offset_sectors = sector_num;

+    p_cmd->size_sectors = nb_sectors;

+    p_cmd->ret_status = ret_status;

+

+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {

+        printf("Error sending command\n");

+    }

+}

+

+static void repagent_report_volumes_to_hub(void)

+{

+    /* Report IO to rephub */

+    int i;

+    RepCmdDataReportVmVolumes *p_cmd_data = NULL;

+    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(

+            REPHUB_CMD_REPORT_VM_VOLUMES,

+            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),

+            (uint8_t **) &p_cmd_data);

+    p_cmd->num_volumes = g_rep_agent.num_volumes;

+    printf("reporting %u volumes\n", g_rep_agent.num_volumes);

+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {

+        assert(g_rep_agent.volumes[i] != NULL);

+        printf("reporting volume %s size %u\n",

+                g_rep_agent.volumes[i]->vol_path,

+                (uint32_t) sizeof(p_cmd_data->volumes[i].name));

+        strncpy((char *) p_cmd_data->volumes[i].name,

+                g_rep_agent.volumes[i]->vol_path,

+                sizeof(p_cmd_data->volumes[i].name));

+        p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;

+    }

+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {

+        printf("Error sending command\n");

+    }

+}

+

+int repaget_start_protect(RepCmdStartProtect *pcmd,

+        RepCmdDataStartProtect *pcmd_data)

+{

+    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,

+            (unsigned long long) pcmd->volume_id);

+    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);

+    if (g_rep_agent.num_volumes > 0

+            && strcmp(pcmd_data->volume_name, "stam") == 0) {

+        /* Choose the first one for rephub */

+        vol_index = 0;

+    }

+    if (vol_index < 0) {

+        printf("The volume doesn't exist\n");

+        return TRUE;

+    }

+    /* orim todo protect */

+    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;

+

+    return TRUE;

+}

+

+static int repagent_get_volume_by_name(const char *name)

+{

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {

+        if (g_rep_agent.volumes[i] != NULL

+                && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {

+            return i;

+        }

+    }

+    return -1;

+}

+

+static int repagent_get_volume_by_id(uint64_t vol_id)

+{

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes ; i++) {

+        if (g_rep_agent.volumes[i] != NULL

+                && g_rep_agent.volumes[i]->vol_id == vol_id) {

+            return i;

+        }

+    }

+    return -1;

+}

+

+int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)

+{

+    int index = repagent_get_volume_by_id(pcmd->volume_id);

+    int size_bytes = pcmd->size_sectors * 512;

+    if (index < 0) {

+        printf("Vol read - Could not find vol id %llx\n",

+                (unsigned long long int) pcmd->volume_id);

+        RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(

+                REPHUB_CMD_READ_VOL_RES, 0, NULL);

+        p_res_cmd->req_id = pcmd->req_id;

+        p_res_cmd->volume_id = pcmd->volume_id;

+        p_res_cmd->is_status_success = FALSE;

+        repagent_client_send((RepCmd *) p_res_cmd);

+        return TRUE;

+    }

+

+    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",

+            g_rep_agent.volumes[index]->driver_ptr,

+            (unsigned long long int) pcmd->volume_id,

+            (unsigned long long int) pcmd->offset_sectors, pcmd->size_sectors);

+

+    {

+        RepagentReadVolIo *read_xact = calloc(1, sizeof(RepagentReadVolIo));

+

+/*        BlockDriverAIOCB *acb; */

+

+        ZERO_MEM_OBJ(read_xact);

+

+        qemu_iovec_init(&read_xact->qiov, 1);

+

+        /*read_xact->buf =

+        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr, size_bytes); */

+        read_xact->buf = (uint8_t *) g_malloc(size_bytes);

+        read_xact->rep_cmd = *pcmd;

+        qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);

+

+        gettimeofday(&read_xact->start_time, NULL);

+        /* orim TODO - use the returned acb to cancel the request on shutdown */

+        /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,

+                read_xact->rep_cmd.offset_sectors, &read_xact->qiov,

+                read_xact->rep_cmd.size_sectors, repagent_vol_read_done,

+                read_xact);

+    }

+

+    return TRUE;

+}

+

+static void repagent_vol_read_done(void *opaque, int ret)

+{

+    struct timeval t2;

+    RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;

+    uint8_t *pdata = NULL;

+    RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) repcmd_new(

+            REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512,

+            &pdata);

+    pcmd->req_id = read_xact->rep_cmd.req_id;

+    pcmd->volume_id = read_xact->rep_cmd.volume_id;

+    pcmd->is_status_success = FALSE;

+

+    printf("Protected vol read - volId %llu, offset %llu, size %u\n",

+            (unsigned long long int) read_xact->rep_cmd.volume_id,

+            (unsigned long long int) read_xact->rep_cmd.offset_sectors,

+            read_xact->rep_cmd.size_sectors);

+    gettimeofday(&t2, NULL);

+

+    if (ret >= 0) {

+        /* Read response - send the data to the hub */

+        t2 = tsub(t2, read_xact->start_time);

+        printf("Read prot vol done. Took %u seconds, %u us.",

+                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);

+

+        pcmd->is_status_success = TRUE;

+        /* orim todo optimize - don't copy, use the qiov buffer */

+        qemu_iovec_to_buffer(&read_xact->qiov, pdata);

+    } else {

+        printf("readv failed: %s\n", strerror(-ret));

+    }

+

+    repagent_client_send((RepCmd *) pcmd);

+

+    /*qemu_vfree(read_xact->buf); */

+    g_free(read_xact->buf);

+

+    g_free(read_xact);

+}

+

+static struct timeval tsub(struct timeval t1, struct timeval t2)

+{

+    t1.tv_usec -= t2.tv_usec;

+    if (t1.tv_usec < 0) {

+        t1.tv_usec += 1000000;

+        t1.tv_sec--;

+    }

+    t1.tv_sec -= t2.tv_sec;

+    return t1;

+}

+

+void repagent_client_connected(void)

+{

+    /* orim todo thread protection */

+    repagent_report_volumes_to_hub();

+}

diff --git a/replication/repagent.h b/replication/repagent.h

new file mode 100644

index 0000000..98ccbf2

--- /dev/null

+++ b/replication/repagent.h

@@ -0,0 +1,46 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPAGENT_H

+#define REPAGENT_H

+#include <stdint.h>

+

+#include "qemu-common.h"

+

+typedef struct RepAgentState RepAgentState;

+typedef struct RepCmdStartProtect RepCmdStartProtect;

+typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;

+struct RepCmdReadVolReq;

+

+void repagent_init(const char *hubname, int port);

+void repagent_handle_protected_write(BlockDriverState *bs,

+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int ret_status);

+void repagent_register_drive(const char *drive_path,

+        BlockDriverState *driver_ptr);

+int repaget_start_protect(RepCmdStartProtect *pcmd,

+        RepCmdDataStartProtect *pcmd_data);

+int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata);

+void repagent_client_connected(void);

+

+

+#endif /* REPAGENT_H */

diff --git a/replication/repagent_client.c b/replication/repagent_client.c

new file mode 100644

index 0000000..4dd9ea4

--- /dev/null

+++ b/replication/repagent_client.c

@@ -0,0 +1,138 @@

+#include "repcmd.h"

+#include "rephub_cmds.h"

+#include "repcmd_listener.h"

+#include "repagent_client.h"

+#include "repagent.h"

+

+#include <string.h>

+#include <stdlib.h>

+#include <errno.h>

+#include <stdio.h>

+#include <resolv.h>

+#include <sys/socket.h>

+#include <arpa/inet.h>

+#include <netinet/in.h>

+#include <unistd.h>

+

+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))

+

+static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void *clientPtr);

+

+typedef struct repagent_client_state {

+    int is_connected;

+    int is_terminate_receive;

+    int hsock;

+} repagent_client_state;

+

+static repagent_client_state g_client_state = { 0 };

+

+void *repagent_listen(void *pParam)

+{

+    rephub_params *pServerParams = (rephub_params *) pParam;

+    int host_port = pServerParams->port;

+    const char *host_name = pServerParams->name;

+

+    printf("Creating repagent listener thread...\n");

+    g_free(pServerParams);

+

+    struct sockaddr_in my_addr;

+

+    int err;

+    int retries = 0;

+

+    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);

+    if (g_client_state.hsock == -1) {

+        printf("Error initializing socket %d\n", errno);

+        return (void *) -1;

+    }

+

+    int param = 1;

+

+    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,

+            (char *) &param, sizeof(int)) == -1)

+            || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,

+                    (char *) &param, sizeof(int)) == -1)) {

+        printf("Error setting options %d\n", errno);

+        return (void *) -1;

+    }

+

+    my_addr.sin_family = AF_INET;

+    my_addr.sin_port = htons(host_port);

+    memset(&(my_addr.sin_zero), 0, 8);

+

+    my_addr.sin_addr.s_addr = inet_addr(host_name);

+

+    /* Reconnect loop */

+    while (!g_client_state.is_terminate_receive) {

+

+        if (connect(g_client_state.hsock, (struct sockaddr *) &my_addr,

+                sizeof(my_addr)) == -1) {

+            err = errno;

+            if (err != EINPROGRESS) {

+                retries++;

+                fprintf(

+                        stderr,

+                        "Error connecting socket %d. Host %s, port %u. Retry count %d\n",

+                        errno, host_name, host_port, retries);

+                usleep(5 * 1000 * 1000);

+                continue;

+            }

+        }

+        retries = 0;

+

+        g_client_state.is_connected = 1;

+

+        repagent_client_connected();

+        repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);

+        close(g_client_state.hsock);

+

+        g_client_state.is_connected = 0;

+    }

+    return 0;

+}

+

+void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)

+{

+    int is_free_data = 1;

+    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);

+    switch (pcmd->hdr.cmdid) {

+    case REPHUB_CMD_START_PROTECT: {

+        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,

+                (RepCmdDataStartProtect *) pdata);

+    }

+        break;

+    case REPHUB_CMD_READ_VOL_REQ: {

+        is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata);

+    }

+        break;

+    default:

+        assert(0);

+        break;

+

+    }

+

+    if (is_free_data) {

+        g_free(pdata);

+    }

+}

+

+int repagent_client_send(RepCmd *p_cmd)

+{

+    int bytecount = 0;

+    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,

+            p_cmd->hdr.data_size_bytes);

+    if (!g_client_state.is_connected) {

+        printf("Not connected to hub\n");

+        return -1;

+    }

+

+    bytecount = send(g_client_state.hsock, p_cmd,

+            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);

+    if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {

+        printf("Bad send %d, errno %d\n", bytecount, errno);

+        return bytecount;

+    }

+

+    /* Success */

+    return 0;

+}

diff --git a/replication/repagent_client.h b/replication/repagent_client.h

new file mode 100644

index 0000000..62a5377

--- /dev/null

+++ b/replication/repagent_client.h

@@ -0,0 +1,36 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPAGENT_CLIENT_H

+#define REPAGENT_CLIENT_H

+#include "repcmd.h"

+

+typedef struct rephub_params { /* NOTE(review): looks like hub connection parameters - confirm */

+    char *name; /* presumably the hub host name or address - TODO confirm */

+    int port;   /* presumably the hub port number - TODO confirm */

+} rephub_params;

+

+void *repagent_listen(void *pParam);

+int repagent_client_send(RepCmd *p_cmd);

+

+#endif /* REPAGENT_CLIENT_H */

diff --git a/replication/repcmd.h b/replication/repcmd.h

new file mode 100644

index 0000000..8c6cf1b

--- /dev/null

+++ b/replication/repcmd.h

@@ -0,0 +1,59 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPCMD_H

+#define REPCMD_H

+

+#include <stdint.h>

+

+#define REPCMD_MAGIC1 (0x1122)

+#define REPCMD_MAGIC2 (0x3344)

+#define REPCMD_NUM_U32_PARAMS (11)

+

+enum RepCmds { /* Command ids carried in RepCmdHdr.cmdid */

+    REPCMD_FIRST_INVALID                    = 0, /* NOTE(review): presumably "no command" - confirm */

+    REPCMD_FIRST_HUBCMD                     = 1, /* NOTE(review): presumably a range marker - confirm */

+    REPHUB_CMD_PROTECTED_WRITE              = 2, /* agent->hub, see RepCmdProtectedWrite */

+    REPHUB_CMD_REPORT_VM_VOLUMES            = 3, /* agent->hub, see RepCmdReportVmVolumes */

+    REPHUB_CMD_START_PROTECT                = 4, /* hub->agent, see RepCmdStartProtect */

+    REPHUB_CMD_READ_VOL_REQ                 = 5, /* hub->agent, see RepCmdReadVolReq */

+    REPHUB_CMD_READ_VOL_RES                 = 6, /* agent->hub, see RepCmdReadVolRes */

+    REPHUB_CMD_AGENT_SHUTDOWN               = 7, /* agent->hub, see RepCmdAgentShutdown */

+};

+

+typedef struct RepCmdHdr { /* Leading fields shared by every command struct */

+    uint16_t magic1;          /* set to REPCMD_MAGIC1 by repcmd_new(), checked by repcmd_listener() */

+    uint16_t cmdid;           /* command id; repcmd_new() stores its cmd_id argument here */

+    uint32_t data_size_bytes; /* number of payload bytes that follow the command; 0 when none */

+} RepCmdHdr;

+

+typedef struct RepCmd { /* Generic fixed-size command; repcmd_listener() receives sizeof(RepCmd) bytes per command */

+    RepCmdHdr hdr;

+    unsigned int parameters[REPCMD_NUM_U32_PARAMS]; /* NOTE(review): presumably overlaid by the RepCmdXxx structs a RepCmd is cast to - confirm */

+    unsigned int magic2; /* set to REPCMD_MAGIC2 by repcmd_new(), checked by repcmd_listener() */

+    uint8_t data[0];     /* start of the payload when allocated through repcmd_new() */

+} RepCmd;

+

+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);

+

+#endif /* REPCMD_H */

diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c

new file mode 100644

index 0000000..a211927

--- /dev/null

+++ b/replication/repcmd_listener.c

@@ -0,0 +1,137 @@

+#include <fcntl.h>

+#include <string.h>

+#include <stdlib.h>

+#include <errno.h>

+#include <stdio.h>

+#include <netinet/in.h>

+#include <resolv.h>

+#include <sys/socket.h>

+#include <arpa/inet.h>

+#include <unistd.h>

+#include <pthread.h>

+#include <assert.h>

+

+/* Use the CONFIG_REPLICATION flag to determine whether

+ * we're under a qemu build or a hub build. When under

+ * qemu, use g_malloc. */

+#ifdef CONFIG_REPLICATION

+#include <glib.h>

+#define REPCMD_MALLOC g_malloc

+#else

+#define REPCMD_MALLOC malloc

+#endif

+

+#include "repcmd.h"

+#include "repcmd_listener.h"

+

+#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))

+

+typedef struct RepCmdListenerState { /* State consulted by repcmd_listener() */

+    int is_terminate_receive; /* non-zero makes the receive loop in repcmd_listener() exit */

+} RepCmdListenerState;

+

+static RepCmdListenerState g_listenerState = { 0 }; /* file-wide listener state; starts with termination not requested */

+

+/* Returns 0 for initiated termination or socket error value on error */

+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)

+{

+    RepCmd curCmd;

+    uint8_t *pReadBuf = (uint8_t *) &curCmd;

+    int bytesToGet = sizeof(RepCmd);

+    int bytesGotten = 0;

+    int isGotHeader = 0;

+    uint8_t *pdata = NULL;

+

+    assert(callback != NULL);

+

+    /* receive loop */

+    while (!g_listenerState.is_terminate_receive) {

+        int bytecount;

+

+        bytecount = recv(hsock, pReadBuf + bytesGotten,

+                bytesToGet - bytesGotten, 0);

+        if (bytecount == -1) {

+            fprintf(stderr, "Error receiving data %d\n", errno);

+            return errno;

+        }

+

+        if (bytecount == 0) {

+            printf("Disconnected\n");

+            return 0;

+        }

+        bytesGotten += bytecount;

+/*     printf("Recieved bytes %d, got %d/%d\n",

+                bytecount, bytesGotten, bytesToGet); */

+        /* print content */

+        if (0) {

+            int i;

+            for (i = 0; i < bytecount ; i += 4) {

+                /*printf("%d/%d", i, bytecount/4); */

+                printf("%#x ",

+                        *(int *) (&pReadBuf[bytesGotten - bytecount + i]));

+

+            }

+            printf("\n");

+        }

+        assert(bytesGotten <= bytesToGet);

+        if (bytesGotten == bytesToGet) {

+            int isGotData = 0;

+            bytesGotten = 0;

+            if (!isGotHeader) {

+                /* We just got the header */

+                isGotHeader = 1;

+

+                assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);

+                assert(curCmd.magic2 == REPCMD_MAGIC2);

+                if (curCmd.hdr.data_size_bytes > 0) {

+                    pdata = (uint8_t *)REPCMD_MALLOC(

+                                curCmd.hdr.data_size_bytes);

+/*                    printf("malloc %p\n", pdata); */

+                    pReadBuf = pdata;

+                } else {

+                    /* no data */

+                    isGotData = 1;

+                    pdata = NULL;

+                }

+                bytesToGet = curCmd.hdr.data_size_bytes;

+            } else {

+                isGotData = 1;

+            }

+

+            if (isGotData) {

+                /* Got command and data */

+                (*callback)(&curCmd, pdata, clientPtr);

+

+                /* It's the callee responsibility to free pData */

+                pdata = NULL;

+                ZERO_MEM_OBJ(&curCmd);

+                pReadBuf = (uint8_t *) &curCmd;

+                bytesGotten = 0;

+                bytesToGet = sizeof(RepCmd);

+                isGotHeader = 0;

+            }

+        }

+    }

+    return 0;

+}

+

+/* Allocate a command with room for data_size payload bytes after it.
+ *
+ * The command part is zeroed and its magic values, cmdid and
+ * data_size_bytes are filled in; the payload bytes are left uninitialized.
+ *
+ * cmd_id      - value stored in hdr.cmdid
+ * data_size   - payload size in bytes; must not be negative
+ * p_out_pdata - when not NULL, receives a pointer to the payload area
+ *
+ * Returns the new command, allocated with REPCMD_MALLOC. */
+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)
+{
+    RepCmd *p_cmd;
+
+    /* A negative size would wrap around in the size computation below and
+     * could yield a buffer smaller than the command itself. */
+    assert(data_size >= 0);
+
+    p_cmd = REPCMD_MALLOC(sizeof(RepCmd) + (size_t) data_size);
+    assert(p_cmd != NULL);
+
+    /* Zero the CMD (not the data) */
+    ZERO_MEM_OBJ(p_cmd);
+
+    p_cmd->hdr.cmdid = cmd_id;
+    p_cmd->hdr.magic1 = REPCMD_MAGIC1;
+    p_cmd->magic2 = REPCMD_MAGIC2;
+    p_cmd->hdr.data_size_bytes = data_size;
+
+    if (p_out_pdata != NULL) {
+        *p_out_pdata = p_cmd->data;
+    }
+
+    return p_cmd;
+}

+

diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h

new file mode 100644

index 0000000..c09a12e

--- /dev/null

+++ b/replication/repcmd_listener.h

@@ -0,0 +1,32 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPCMD_LISTENER_H

+#define REPCMD_LISTENER_H

+#include <stdint.h>

+typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,

+                uint8_t *pData, void *clientPtr); /* pData: payload buffer or NULL; the callback must free it */

+

+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr); /* receive loop; clientPtr is passed through to callback */

+

+#endif /* REPCMD_LISTENER_H */

diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h

new file mode 100644

index 0000000..3bd4eb4

--- /dev/null

+++ b/replication/rephub_cmds.h

@@ -0,0 +1,151 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPHUB_CMDS_H

+#define REPHUB_CMDS_H

+

+#include <stdint.h>

+#include "repcmd.h"

+#include "rephub_defs.h"

+

+/*********************************************************

+ * RepCmd Report a protected IO

+ *

+ * REPHUB_CMD_PROTECTED_WRITE

+ * Direction: agent->hub

+ *

+ * Any write to a protected volume is sent with this

+ * message to the hub, with its status.

+ * In case the status is bad, no data is sent.

+ *********************************************************/

+typedef struct RepCmdProtectedWrite {

+    RepCmdHdr hdr;           /* common command header */

+    uint64_t volume_id;      /* id of the protected volume that was written */

+    uint64_t offset_sectors; /* presumably the start of the write, in sectors - confirm */

+    /* The size field duplicates the RepCmd size,

+     * but it is needed for reporting failed IOs' sizes */

+    uint32_t size_sectors;

+    int ret_status;          /* status of the write reported to the hub */

+} RepCmdProtectedWrite;

+

+/*********************************************************

+ * RepCmd Report VM volumes

+ *

+ * REPHUB_CMD_REPORT_VM_VOLUMES

+ * Direction: agent->hub

+ *

+ * The agent reports all the volumes of the VM

+ * to the hub.

+ *********************************************************/

+typedef struct RepVmVolumeInfo { /* One volume entry of a REPHUB_CMD_REPORT_VM_VOLUMES payload */

+    char name[REPHUB_MAX_VOL_NAME_LEN]; /* volume name, at most REPHUB_MAX_VOL_NAME_LEN bytes */

+    uint64_t volume_id;

+    uint32_t size_mb;  /* presumably the volume size in megabytes - confirm */

+    uint32_t padding;  /* explicit pad after size_mb; NOTE(review): presumably keeps the layout 8-byte aligned - confirm */

+} RepVmVolumeInfo;

+

+typedef struct RepCmdReportVmVolumes {

+    RepCmdHdr hdr;   /* common command header */

+    int num_volumes; /* presumably the number of RepVmVolumeInfo entries in the payload - confirm */

+} RepCmdReportVmVolumes;

+

+typedef struct RepCmdDataReportVmVolumes { /* Payload view for REPHUB_CMD_REPORT_VM_VOLUMES */

+    RepVmVolumeInfo volumes[0]; /* zero-length array: entries start at the beginning of the payload */

+} RepCmdDataReportVmVolumes;

+

+

+/*********************************************************

+ * RepCmd Start protect

+ *

+ * REPHUB_CMD_START_PROTECT

+ * Direction: hub->agent

+ *

+ * The hub instructs the agent to start protecting

+ * a volume. When a volume is protected all its writes

+ * are sent to the hub.

+ * With this command the hub also assigns a volume ID to

+ * the given volume name.

+ *********************************************************/

+typedef struct RepCmdStartProtect {

+    RepCmdHdr hdr;      /* common command header */

+    uint64_t volume_id; /* id the hub assigns to the volume named in the payload */

+} RepCmdStartProtect;

+

+typedef struct RepCmdDataStartProtect { /* Payload view for REPHUB_CMD_START_PROTECT */

+    char volume_name[REPHUB_MAX_VOL_NAME_LEN]; /* name of the volume to start protecting */

+} RepCmdDataStartProtect;

+

+

+/*********************************************************

+ * RepCmd Read Volume Request

+ *

+ * REPHUB_CMD_READ_VOL_REQ

+ * Direction: hub->agent

+ *

+ * The hub issues a read IO to a protected volume.

+ * This command is used during sync - when the hub needs

+ * to read unsynchronized sections of a protected volume.

+ * This command is a request, the read data is returned

+ * by the response command REPHUB_CMD_READ_VOL_RES

+ *********************************************************/

+typedef struct RepCmdReadVolReq {

+    RepCmdHdr hdr;           /* common command header */

+    int req_id;              /* presumably echoed in RepCmdReadVolRes.req_id to match the response - confirm */

+    int size_sectors;        /* presumably the length of the read, in sectors - confirm */

+    uint64_t volume_id;      /* id of the protected volume to read */

+    uint64_t offset_sectors; /* presumably the start of the read, in sectors - confirm */

+} RepCmdReadVolReq;

+

+/*********************************************************

+ * RepCmd Read Volume Response

+ *

+ * REPHUB_CMD_READ_VOL_RES

+ * Direction: agent->hub

+ *

+ * A response to REPHUB_CMD_READ_VOL_REQ.

+ * Sends the data read from a protected volume

+ *********************************************************/

+typedef struct RepCmdReadVolRes {

+    RepCmdHdr hdr;         /* common command header */

+    int req_id;            /* presumably the req_id of the request being answered - confirm */

+    int is_status_success; /* presumably non-zero when the read succeeded - confirm */

+    uint64_t volume_id;    /* id of the protected volume that was read */

+} RepCmdReadVolRes;

+

+/*********************************************************

+ * RepCmd Agent shutdown

+ *

+ * REPHUB_CMD_AGENT_SHUTDOWN

+ * Direction: agent->hub

+ *

+ * Notifies the hub that the agent is about to shutdown.

+ * This allows a graceful shutdown. Any disconnection

+ * of an agent without sending this command will result

+ * in a full sync of the VM volumes.

+ *********************************************************/

+typedef struct RepCmdAgentShutdown {

+    RepCmdHdr hdr; /* common command header; this command declares no other fields */

+} RepCmdAgentShutdown;

+

+

+#endif /* REPHUB_CMDS_H */

diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h

new file mode 100644

index 0000000..e34e0ce

--- /dev/null

+++ b/replication/rephub_defs.h

@@ -0,0 +1,40 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REP_HUB_DEFS_H

+#define REP_HUB_DEFS_H

+

+#include <stdint.h>

+

+#define REPHUB_MAX_VOL_NAME_LEN (1024)

+#define REPHUB_MAX_NUM_VOLUMES (512)

+

+#ifndef TRUE

+    #define TRUE (1)

+#endif

+

+#ifndef FALSE

+    #define FALSE (0)

+#endif

+

+#endif /* REP_HUB_DEFS_H */

diff --git a/vl.c b/vl.c

index 97ab2b9..7f8f14c 100644

--- a/vl.c

+++ b/vl.c

@@ -167,6 +167,7 @@ int main(int argc, char **argv)

 #include "ui/qemu-spice.h"

+#include "replication/repagent.h"

//#define DEBUG_NET

//#define DEBUG_SLIRP

@@ -2411,6 +2412,15 @@ int main(int argc, char **argv, char **envp)

                 drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,

                           HD_OPTS);

                 break;

+            case QEMU_OPTION_repagent:

+#ifdef CONFIG_REPLICATION

+                repagent_init(optarg, 0);

+#else

+                fprintf(stderr, "Replication support is disabled. "

+                    "Don't use -repagent option.\n");

+                exit(1);

+#endif

+                break;

             case QEMU_OPTION_drive:

                 if (drive_def(optarg) == NULL) {

                     exit(1);

--

1.7.6.5


reply via email to

[Prev in Thread] Current Thread [Next in Thread]