qemu-devel

Re: [Qemu-devel] [RFC PATCH] replication agent module


From: Anthony Liguori
Subject: Re: [Qemu-devel] [RFC PATCH] replication agent module
Date: Tue, 07 Feb 2012 06:12:46 -0600
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.23) Gecko/20110922 Lightning/1.0b2 Thunderbird/3.1.15

Hi,

On 02/07/2012 04:29 AM, Ori Mamluk wrote:
Repagent is a new module that allows an external replication system to
replicate a volume of a Qemu VM.

This RFC patch adds the repagent client module to Qemu.

Please read http://wiki.qemu.org/Contribute/SubmitAPatch

In particular, use a tool like git-send-email and split this patch up into more manageable chunks.

Is there an Open Source Rephub available? As a project policy, adding external APIs specifically for proprietary software is not something we're willing to do.

Regards,

Anthony Liguori

Documentation of the module role and API is in the patch at
replication/qemu-repagent.txt



The main motivation behind the module is to allow replication of VMs in a
virtualization environment like RhevM.

To achieve this we need basic replication support in Qemu.



This is the first submission of this module, which was written as a Proof
Of Concept, and used successfully for replicating and recovering a Qemu VM.

Points and open issues:

*   The module interfaces with the Qemu storage stack at the generic block.c
layer. Is this the right place to intercept/inject IOs?

*   The patch performs IO reads from a new thread (a TCP listener thread).
See repaget_read_vol in repagent.c. These reads are not protected by any
lock – is this OK?

*   VM ID – the replication system implies an environment with several VMs
connected to a central replication system (Rephub).

    This requires some sort of identification for a VM. The current patch
does not include a VM ID – I did not find any adequate ID to use.

    Any suggestions?
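
On the locking question in the list above, one common pattern is to have the listener thread only record each read request, and to let the thread that already owns block I/O pick the requests up and issue them. The following is a minimal sketch of that hand-off using plain pthreads. It is not part of the patch and does not use any Qemu API: ReadReq, ReadQueue, read_queue_init, read_queue_push and read_queue_pop are hypothetical names chosen for illustration, and the request fields simply mirror the volume ID, offset and size carried by a read volume request.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical record for one pending read volume request. */
typedef struct ReadReq {
    uint64_t volume_id;
    uint64_t offset_sectors;
    uint32_t size_sectors;
    struct ReadReq *next;
} ReadReq;

/* Hypothetical FIFO shared by the listener thread and the I/O thread. */
typedef struct ReadQueue {
    pthread_mutex_t lock;
    ReadReq *head;
    ReadReq *tail;
} ReadQueue;

static void read_queue_init(ReadQueue *q)
{
    pthread_mutex_init(&q->lock, NULL);
    q->head = NULL;
    q->tail = NULL;
}

/* Listener thread side: copy the request fields and append to the FIFO.
 * Returns 0 on success, -1 if the allocation fails. */
static int read_queue_push(ReadQueue *q, uint64_t volume_id,
                           uint64_t offset_sectors, uint32_t size_sectors)
{
    ReadReq *r = malloc(sizeof(*r));
    if (r == NULL) {
        return -1;
    }
    r->volume_id = volume_id;
    r->offset_sectors = offset_sectors;
    r->size_sectors = size_sectors;
    r->next = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->tail != NULL) {
        q->tail->next = r;
    } else {
        q->head = r;
    }
    q->tail = r;
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* I/O thread side: detach the oldest request, or return NULL if the queue
 * is empty. The caller owns the returned record and must free() it. */
static ReadReq *read_queue_pop(ReadQueue *q)
{
    ReadReq *r;

    pthread_mutex_lock(&q->lock);
    r = q->head;
    if (r != NULL) {
        q->head = r->next;
        if (q->head == NULL) {
            q->tail = NULL;
        }
        r->next = NULL;
    }
    pthread_mutex_unlock(&q->lock);
    return r;
}
```

With a hand-off like this, the listener thread would call read_queue_push for every request it receives, and the thread that performs block I/O would call read_queue_pop and issue the actual read itself. How that thread gets woken up (a pipe, an eventfd, a periodic poll) is left open here and would depend on how the main loop is structured.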



Appreciate any feedback or suggestions.  Thanks,

Ori.





 From 5a0d88689ddcf325f25fdfca2a2012f1bbf141b9 Mon Sep 17 00:00:00 2001

From: Ori Mamluk <address@hidden(none)>

Date: Tue, 7 Feb 2012 11:12:12 +0200

Subject: [PATCH] Added replication agent module (repagent) to Qemu under

replication directory, added repagent configure and run

options, and the repagent API usage in bloc



Added build options to ./configure:  --enable-replication --disable-replicat

Added a command-line option to enable: -repagent <rep hub IP>

Added the module files under replication.



Signed-off-by: Ori Mamluk <address@hidden>

---

Makefile                      |    9 +-

Makefile.objs                 |    6 +

block.c                       |   20 +++-

configure                     |   11 ++

qemu-options.hx               |    6 +

replication/qemu-repagent.txt |  104 +++++++++++++

replication/repagent.c        |  322
+++++++++++++++++++++++++++++++++++++++++

replication/repagent.h        |   46 ++++++

replication/repagent_client.c |  138 ++++++++++++++++++

replication/repagent_client.h |   36 +++++

replication/repcmd.h          |   59 ++++++++

replication/repcmd_listener.c |  137 +++++++++++++++++

replication/repcmd_listener.h |   32 ++++

replication/rephub_cmds.h     |  150 +++++++++++++++++++

replication/rephub_defs.h     |   40 +++++

vl.c                          |   10 ++

16 files changed, 1121 insertions(+), 5 deletions(-)

mode change 100644 =>  100755 Makefile.objs

mode change 100644 =>  100755 qemu-options.hx

create mode 100755 replication/qemu-repagent.txt

create mode 100644 replication/repagent.c

create mode 100644 replication/repagent.h

create mode 100644 replication/repagent_client.c

create mode 100644 replication/repagent_client.h

create mode 100644 replication/repcmd.h

create mode 100644 replication/repcmd_listener.c

create mode 100644 replication/repcmd_listener.h

create mode 100644 replication/rephub_cmds.h

create mode 100644 replication/rephub_defs.h



diff --git a/Makefile b/Makefile

index 4f6eaa4..a1b3701 100644

--- a/Makefile

+++ b/Makefile

@@ -149,9 +149,9 @@ qemu-img.o qemu-tool.o qemu-nbd.o qemu-io.o cmd.o
qemu-ga.o: $(GENERATED_HEADERS

tools-obj-y = qemu-tool.o $(oslib-obj-y) $(trace-obj-y) \

                qemu-timer-common.o cutils.o

-qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)

-qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)

-qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)

+qemu-img$(EXESUF): qemu-img.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)

+qemu-nbd$(EXESUF): qemu-nbd.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)

+qemu-io$(EXESUF): qemu-io.o cmd.o $(tools-obj-y) $(block-obj-y)
$(replication-obj-y)

  qemu-img-cmds.h: $(SRC_PATH)/qemu-img-cmds.hx

                $(call quiet-command,sh $(SRC_PATH)/scripts/hxtool -h < $< >
$@,"  GEN   $@")

@@ -228,6 +228,7 @@ clean:

                rm -f trace-dtrace.dtrace trace-dtrace.dtrace-timestamp

                rm -f trace-dtrace.h trace-dtrace.h-timestamp

                rm -rf $(qapi-dir)

+             rm -f replication/*.{o,d}

                $(MAKE) -C tests clean

                for d in $(ALL_SUBDIRS) $(QEMULIBS) libcacard; do \

                if test -d $$d; then $(MAKE) -C $$d $@ || exit 1; fi; \

@@ -387,4 +388,4 @@ tar:

                rm -rf /tmp/$(FILE)

  # Include automatically generated dependency files

--include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
qapi/*.d qga/*.d)

+-include $(wildcard *.d audio/*.d slirp/*.d block/*.d net/*.d ui/*.d
qapi/*.d qga/*.d replication/*.d)

diff --git a/Makefile.objs b/Makefile.objs

old mode 100644

new mode 100755

index d7a6539..dbd6f15

--- a/Makefile.objs

+++ b/Makefile.objs

@@ -74,6 +74,7 @@ fsdev-obj-$(CONFIG_VIRTFS) += $(addprefix fsdev/,
$(fsdev-nested-y))

# CPUs and machines.

  common-obj-y = $(block-obj-y) blockdev.o

+common-obj-y += $(replication-obj-$(CONFIG_REPLICATION))

common-obj-y += $(net-obj-y)

common-obj-y += $(qobject-obj-y)

common-obj-$(CONFIG_LINUX) += $(fsdev-obj-$(CONFIG_LINUX))

@@ -413,6 +414,11 @@ common-obj-y += qmp-marshal.o qapi-visit.o
qapi-types.o $(qapi-obj-y)

common-obj-y += qmp.o hmp.o

  ######################################################################

+# replication

+replication-nested-y = repagent_client.o  repagent.o  repcmd_listener.o

+replication-obj-y = $(addprefix replication/, $(replication-nested-y))

+

+######################################################################

# guest agent

  qga-nested-y = guest-agent-commands.o guest-agent-command-state.o

diff --git a/block.c b/block.c

index 9bb236c..f3b8387 100644

--- a/block.c

+++ b/block.c

@@ -31,6 +31,10 @@

#include "qemu-coroutine.h"

#include "qmp-commands.h"

+#ifdef CONFIG_REPLICATION

+#include "replication/repagent.h"

+#endif

+

#ifdef CONFIG_BSD

#include <sys/types.h>

#include <sys/stat.h>

@@ -640,6 +644,9 @@ int bdrv_open(BlockDriverState *bs, const char
*filename, int flags,

          goto unlink_and_fail;

      }

+#ifdef CONFIG_REPLICATION

+    repagent_register_drive(filename,  bs);

+#endif

      /* Open the image */

      ret = bdrv_open_common(bs, filename, flags, drv);

      if (ret < 0) {

@@ -1292,6 +1299,17 @@ static int coroutine_fn
bdrv_co_do_writev(BlockDriverState *bs,

      ret = drv->bdrv_co_writev(bs, sector_num, nb_sectors, qiov);

+

+#ifdef CONFIG_REPLICATION

+    if (bs->device_name[0] != '\0') {

+        /* We split the IO only at the highest stack driver layer.

+           Currently we know that by checking device_name - only

+           highest level (closest to the guest) has that name.

+           */

+           repagent_handle_protected_write(bs, sector_num,

+                nb_sectors, qiov, ret);

+    }

+#endif

      if (bs->dirty_bitmap) {

          set_dirty_bitmap(bs, sector_num, nb_sectors, 1);

      }

@@ -1783,7 +1801,7 @@ int bdrv_has_zero_init(BlockDriverState *bs)

   * 'nb_sectors' is the max value 'pnum' should be set to.

   */

int bdrv_is_allocated(BlockDriverState *bs, int64_t sector_num, int
nb_sectors,

-              int *pnum)

+    int *pnum)

{

      int64_t n;

      if (!bs->drv->bdrv_is_allocated) {

diff --git a/configure b/configure

index 9e5da44..93d600e 100755

--- a/configure

+++ b/configure

@@ -179,6 +179,7 @@ spice=""

rbd=""

smartcard=""

smartcard_nss=""

+replication=""

usb_redir=""

opengl=""

zlib="yes"

@@ -772,6 +773,10 @@ for opt do

    ;;

    --enable-smartcard-nss) smartcard_nss="yes"

    ;;

+  --disable-replication) replication="no"

+  ;;

+  --enable-replication) replication="yes"

+  ;;

    --disable-usb-redir) usb_redir="no"

    ;;

    --enable-usb-redir) usb_redir="yes"

@@ -1067,6 +1072,7 @@ echo "  --disable-usb-redir      disable usb network
redirection support"

echo "  --enable-usb-redir       enable usb network redirection support"

echo "  --disable-guest-agent    disable building of the QEMU Guest Agent"

echo "  --enable-guest-agent     enable building of the QEMU Guest Agent"

+echo "  --enable-replication     enable replication support"

echo ""

echo "NOTE: The object files are built at the place where configure is
launched"

exit 1

@@ -2733,6 +2739,7 @@ echo "curl support      $curl"

echo "check support     $check_utests"

echo "mingw32 support   $mingw32"

echo "Audio drivers     $audio_drv_list"

+echo "Replication          $replication"

echo "Extra audio cards $audio_card_list"

echo "Block whitelist   $block_drv_whitelist"

echo "Mixer emulation   $mixemu"

@@ -3080,6 +3087,10 @@ if test "$smartcard_nss" = "yes" ; then

    echo "CONFIG_SMARTCARD_NSS=y" >> $config_host_mak

fi

+if test "$replication" = "yes" ; then

+  echo "CONFIG_REPLICATION=y" >> $config_host_mak

+fi

+

if test "$usb_redir" = "yes" ; then

    echo "CONFIG_USB_REDIR=y" >> $config_host_mak

fi

diff --git a/qemu-options.hx b/qemu-options.hx

old mode 100644

new mode 100755

index 681eaf1..c97e4f8

--- a/qemu-options.hx

+++ b/qemu-options.hx

@@ -2602,3 +2602,9 @@ HXCOMM This is the last statement. Insert new options
before this line!

STEXI

@end table

ETEXI

+

+DEF("repagent", HAS_ARG, QEMU_OPTION_repagent,

+    "-repagent [hub IP/name]\n"

+    "                Enable replication support for disks\n"

+    "                hub is the ip or name of the machine running the
replication hub.\n",

+    QEMU_ARCH_ALL)

diff --git a/replication/qemu-repagent.txt b/replication/qemu-repagent.txt

new file mode 100755

index 0000000..e3b0c1e

--- /dev/null

+++ b/replication/qemu-repagent.txt

@@ -0,0 +1,104 @@

+             repagent - replication agent - a Qemu module for enabling
continuous async replication of VM volumes

+

+Introduction

+             This document describes a feature in Qemu - a replication
agent (AKA Repagent).

+             The Repagent is a new module that exposes an API to an
external replication system (AKA Rephub).

+             This API allows a Rephub to communicate with a Qemu VM and
continuously replicate its volumes.

+             The implementation of a Rephub is outside the scope of this
document. There may be several different Rephub

+             implementations using the same repagent in Qemu.

+

+Main features of Repagent

+             Repagent does the following:

+             * Report volumes - report a list of all volumes in a VM to
the Rephub.

+             * Report writes to a volume - send all writes made to a
protected volume to the Rephub.

+                             The reporting of an IO is asynchronous - i.e.
the IO is not delayed by the Repagent to get any acknowledgement from the
Rephub.

+                             It is only copied to the Rephub.

+             * Read a protected volume - allows the Rephub to read a
protected volume, to enable the protected hub to synchronize the content of
a protected volume.

+

+Description of the Repagent module

+

+Build and run options

+             New configure option: --enable-replication

+             New command line option:

+             -repagent [hub IP/name]

+
Enable replication support for disks

+
hub is the ip or name of the machine running the replication hub.

+

+Module APIs

+             The Repagent module interfaces two main components:

+             1. The Rephub - An external API based on socket messages

+             2. The generic block layer- block.c

+

+             Rephub message API

+                             The external replication API is a message
based API.

+                             We won't go into the structure of the
messages here - just the semantics.

+

+                             Messages list

+                                             (The updated list and
comments are in rephub_cmds.h)

+

+                                             Messages from the Repagent to
the Rephub:

+                                             * Protected write

+                                                             The Repagent
sends each write to a protected volume to the hub with the IO status.

+                                                             In case the
status is bad the write content is not sent

+                                             * Report VM volumes

+                                                             The agent
reports all the volumes of the VM to the hub.

+                                             * Read Volume Response

+                                                             A response to
a Read Volume Request

+                                                             Sends the
data read from a protected volume to the hub

+                                             * Agent shutdown

+                                                             Notifies the
hub that the agent is about to shutdown.

+                                                             This allows a
graceful shutdown. Any disconnection of an agent without

+                                                             sending this
command will result in a full sync of the VM volumes.

+

+                                             Messages from the Rephub to
the Repagent:

+                                             * Start protect

+                                                             The hub
instructs the agent to start protecting a volume. When a volume is protected

+                                                             all its
writes are sent to the hub.

+                                                             With this
command the hub also assigns a volume ID to the given volume name.

+                                             * Read volume request

+                                                             The hub
issues a read IO to a protected volume.

+                                                             This command
is used during sync - when the hub needs to read unsynchronized

+                                                             sections of a
protected volume.

+                                                             This command
is a request, the read data is returned by the read volume response message
(see above).

+             block.c API

+                             The API to the generic block storage layer
contains 3 functionalities:

+                             1. Handle writes to protected volumes

+                                             In bdrv_co_do_writev, each
write is reported to the Repagent module.

+                             2. Handle each new volume that registers

+                                             In bdrv_open - each new
bottom-level block driver that registers is reported.

+                             3. Read from a volume

+                                             Repagent calls bdrv_aio_readv
to handle read requests coming from the hub.

+

+

+General description of a Rephub  - a replication system the repagent
connects to

+             This section describes at a high level a sample Rephub - a
replication system that uses the repagent API

+             to replicate disks.

+             It describes a simple Rephub that continuously maintains a
mirror of the volumes of a VM.

+

+             Say we have a VM we want to protect - call it PVM, say it has
2 volumes - V1, V2.

+             Our Rephub is called SingleRephub - a Rephub protecting a
single VM.

+

+             Preparations

+             1. The user chooses a host to run SingleRephub - a different
host than PVM, call it Host2

+             2. The user creates two volumes on Host2 - the same sizes as V1
and V2, call them V1R (V1 recovery) and V2R.

+             3. The user runs SingleRephub process on Host2, and gives V1R
and V2R as command line arguments.

+                             From now on SingleRephub waits for the
protected VM repagent to connect.

+             4. The user runs the protected VM PVM - and uses the switch
-repagent <Host2 IP>.

+

+             Runtime

+             1. The repagent module connects to SingleRephub on startup.

+             2. repagent reports V1 and V2 to SingleRephub.

+             3. SingleRephub starts to perform an initial synchronization
of the protected volumes-

+                             it reads each protected volume (V1 and V2) -
using read volume requests - and copies the data into the

+                             recovery volumes V1R and V2R.

+             4. SingleRephub enters 'protection' mode - each write to the
protected volume is sent by the repagent to the Rephub,

+                             and the Rephub performs the write on the
matching recovery volume.

+

+             * Note that during stage 3 writes to the protected volumes
are not ignored - they're kept in a bitmap,

+                             and will be read again when stage 3 ends, in
an iterative converging process.

+

+             This flow continuously maintains an updated recovery volume.

+             If the protected system is damaged, the user can create a new
VM on Host2 with the replicated volumes attached to it.

+             The new VM is a replica of the protected system.

+

+

diff --git a/replication/repagent.c b/replication/repagent.c

new file mode 100644

index 0000000..c66eae7

--- /dev/null

+++ b/replication/repagent.c

@@ -0,0 +1,322 @@

+#include <string.h>

+#include <stdlib.h>

+#include <stdio.h>

+#include <pthread.h>

+#include <stdint.h>

+

+#include "block.h"

+#include "rephub_defs.h"

+#include "block_int.h"

+#include "repagent_client.h"

+#include "repagent.h"

+#include "rephub_cmds.h"

+

+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))

+#define REPAGENT_MAX_NUM_VOLUMES (64)

+#define REPAGENT_VOLUME_ID_NONE (0)

+

+typedef struct RepagentVolume {

+    uint64_t vol_id;

+    const char *vol_path;

+    BlockDriverState *driver_ptr;

+} RepagentVolume;

+

+struct RepAgentState {

+    int is_init;

+    int num_volumes;

+    RepagentVolume * volumes[REPAGENT_MAX_NUM_VOLUMES];

+};

+

+typedef struct RepagentReadVolIo {

+    QEMUIOVector qiov;

+    RepCmdReadVolReq rep_cmd;

+    uint8_t *buf;

+    struct timeval start_time;

+} RepagentReadVolIo;

+

+static int repagent_get_volume_by_name(const char *name);

+static void repagent_report_volumes_to_hub(void);

+static void repagent_vol_read_done(void *opaque, int ret);

+static struct timeval tsub(struct timeval t1, struct timeval t2);

+

+RepAgentState g_rep_agent = { 0 };

+

+void repagent_init(const char *hubname, int port)

+{

+    /* It is the responsibility of the thread to free this struct */

+    rephub_params *pParams = (rephub_params
*)g_malloc(sizeof(rephub_params));

+    if (hubname == NULL) {

+        hubname = "127.0.0.1";

+    }

+    if (port == 0) {

+        port = 9010;

+    }

+

+    printf("repagent_init %s\n", hubname);

+

+    pParams->port = port;

+    pParams->name = g_strdup(hubname);

+

+    pthread_t thread_id = 0;

+

+    /* Create the repagent client listener thread */

+    pthread_create(&thread_id, 0, repagent_listen, (void *) pParams);

+    pthread_detach(thread_id);

+}

+

+void repagent_register_drive(const char *drive_path,

+        BlockDriverState *driver_ptr)

+{

+    int i;

+    for (i = 0; i < g_rep_agent.num_volumes; i++) {

+        RepagentVolume *vol = g_rep_agent.volumes[i];

+        if (vol != NULL) {

+            assert(

+                    strcmp(drive_path, vol->vol_path) != 0

+                    && driver_ptr != vol->driver_ptr);

+        }

+    }

+

+    assert(g_rep_agent.num_volumes < REPAGENT_MAX_NUM_VOLUMES);

+

+    printf("zerto repagent: Registering drive. Num drives %d, path %s\n",

+            g_rep_agent.num_volumes, drive_path);

+    g_rep_agent.volumes[i] =

+            (RepagentVolume *)g_malloc0(sizeof(RepagentVolume));

+    g_rep_agent.volumes[i]->driver_ptr = driver_ptr;

+    /* orim todo strcpy? */

+    g_rep_agent.volumes[i]->vol_path = drive_path;

+

+    /* Orim todo thread-safety? */

+    g_rep_agent.num_volumes++;

+

+    repagent_report_volumes_to_hub();

+}

+

+/* orim todo destruction? */

+

+static RepagentVolume *repagent_get_protected_volume_by_driver(

+        BlockDriverState *bs)

+{

+    /* orim todo optimize search */

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes; i++) {

+        RepagentVolume *p_vol = g_rep_agent.volumes[i];

+        if (p_vol != NULL && p_vol->driver_ptr == (void *) bs) {

+            return p_vol;

+        }

+    }

+    return NULL;

+}

+

+void repagent_handle_protected_write(BlockDriverState *bs, int64_t
sector_num,

+        int nb_sectors, QEMUIOVector *qiov, int ret_status)

+{

+    printf("zerto Protected write offset %lld, size %d, IO return status
%d",

+            (long long int) sector_num, nb_sectors, ret_status);

+    if (bs->filename != NULL) {

+        printf(", filename %s", bs->filename);

+    }

+

+    printf("\n");

+

+    RepagentVolume *p_vol = repagent_get_protected_volume_by_driver(bs);

+    if (p_vol == NULL || p_vol->vol_id == REPAGENT_VOLUME_ID_NONE) {

+        /* Unprotected */

+        printf("Got a write to an unprotected volume.\n");

+        return;

+    }

+

+    /* Report IO to rephub */

+

+    int data_size = qiov->size;

+    if (ret_status < 0) {

+        /* On failed ios we don't send the data to the hub */

+        data_size = 0;

+    }

+    uint8_t *pdata = NULL;

+    RepCmdProtectedWrite *p_cmd = (RepCmdProtectedWrite *) repcmd_new(

+            REPHUB_CMD_PROTECTED_WRITE, data_size, (uint8_t **)&pdata);

+

+    if (ret_status >= 0) {

+        qemu_iovec_to_buffer(qiov, pdata);

+    }

+

+    p_cmd->volume_id = p_vol->vol_id;

+    p_cmd->offset_sectors = sector_num;

+    p_cmd->size_sectors = nb_sectors;

+    p_cmd->ret_status = ret_status;

+

+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {

+        printf("Error sending command\n");

+    }

+}

+

+static void repagent_report_volumes_to_hub(void)

+{

+    /* Report IO to rephub */

+    int i;

+    RepCmdDataReportVmVolumes *p_cmd_data = NULL;

+    RepCmdReportVmVolumes *p_cmd = (RepCmdReportVmVolumes *) repcmd_new(

+            REPHUB_CMD_REPORT_VM_VOLUMES,

+            g_rep_agent.num_volumes * sizeof(RepVmVolumeInfo),

+            (uint8_t **)&p_cmd_data);

+    p_cmd->num_volumes = g_rep_agent.num_volumes;

+    printf("reporting %u volumes\n", g_rep_agent.num_volumes);

+    for (i = 0; i < g_rep_agent.num_volumes; i++) {

+        assert(g_rep_agent.volumes[i] != NULL);

+        printf("reporting volume %s size %u\n",

+                g_rep_agent.volumes[i]->vol_path,

+                (uint32_t) sizeof(p_cmd_data->volumes[i].name));

+        strncpy((char *) p_cmd_data->volumes[i].name,

+                g_rep_agent.volumes[i]->vol_path,

+                sizeof(p_cmd_data->volumes[i].name));

+        p_cmd_data->volumes[i].volume_id = g_rep_agent.volumes[i]->vol_id;

+    }

+    if (repagent_client_send((RepCmd *) p_cmd) != 0) {

+        printf("Error sending command\n");

+    }

+}

+

+int repaget_start_protect(RepCmdStartProtect *pcmd,

+        RepCmdDataStartProtect *pcmd_data)

+{

+    printf("Start protect vol %s, ID %llu\n", pcmd_data->volume_name,

+            (unsigned long long) pcmd->volume_id);

+    int vol_index = repagent_get_volume_by_name(pcmd_data->volume_name);

+    if (vol_index < 0) {

+        printf("The volume doesn't exist\n");

+        return TRUE;

+    }

+    /* orim todo protect */

+    g_rep_agent.volumes[vol_index]->vol_id = pcmd->volume_id;

+

+    return TRUE;

+}

+

+static int repagent_get_volume_by_name(const char *name)

+{

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes; i++) {

+        if (g_rep_agent.volumes[i] != NULL

+                && strcmp(name, g_rep_agent.volumes[i]->vol_path) == 0) {

+            return i;

+        }

+    }

+    return -1;

+}

+

+static int repagent_get_volume_by_id(uint64_t vol_id)

+{

+    int i = 0;

+    for (i = 0; i < g_rep_agent.num_volumes; i++) {

+        if (g_rep_agent.volumes[i] != NULL

+                && g_rep_agent.volumes[i]->vol_id == vol_id) {

+            return i;

+        }

+    }

+    return -1;

+}

+

+int repaget_read_vol(RepCmdReadVolReq *pcmd, uint8_t *pdata)

+{

+    int index = repagent_get_volume_by_id(pcmd->volume_id);

+    int size_bytes = pcmd->size_sectors * 512;

+    if (index < 0) {

+        printf("Vol read - Could not find vol id %llu\n",

+                (unsigned long long int) pcmd->volume_id);

+        RepCmdReadVolRes *p_res_cmd = (RepCmdReadVolRes *) repcmd_new(

+                REPHUB_CMD_READ_VOL_RES, 0, NULL);

+        p_res_cmd->req_id = pcmd->req_id;

+        p_res_cmd->volume_id = pcmd->volume_id;

+        p_res_cmd->is_status_success = FALSE;

+        repagent_client_send((RepCmd *) p_res_cmd);

+        return TRUE;

+    }

+

+    printf("Vol read - driver %p, volId %llu, offset %llu, size %u\n",

+            g_rep_agent.volumes[index]->driver_ptr,

+            (unsigned long long int) pcmd->volume_id,

+            (unsigned long long int) pcmd->offset_sectors,
pcmd->size_sectors);

+

+    {

+        RepagentReadVolIo *read_xact = g_malloc0(
sizeof(RepagentReadVolIo));

+

+/*        BlockDriverAIOCB *acb; */

+

+        ZERO_MEM_OBJ(read_xact);

+

+        qemu_iovec_init(&read_xact->qiov, 1);

+

+        /*read_xact->buf =

+        qemu_blockalign(g_rep_agent.volumes[index]->driver_ptr,
size_bytes); */

+        read_xact->buf = (uint8_t *) g_malloc(size_bytes);

+        read_xact->rep_cmd = *pcmd;

+        qemu_iovec_add(&read_xact->qiov, read_xact->buf, size_bytes);

+

+        gettimeofday(&read_xact->start_time, NULL);

+        /* orim TODO - use the returned acb to cancel the request on
shutdown */

+        /*acb = */bdrv_aio_readv(g_rep_agent.volumes[index]->driver_ptr,

+                read_xact->rep_cmd.offset_sectors, &read_xact->qiov,

+                read_xact->rep_cmd.size_sectors, repagent_vol_read_done,

+                read_xact);

+    }

+

+    return TRUE;

+}

+

+static void repagent_vol_read_done(void *opaque, int ret)

+{

+    struct timeval t2;

+    RepagentReadVolIo *read_xact = (RepagentReadVolIo *) opaque;

+    uint8_t *pdata = NULL;

+    RepCmdReadVolRes *pcmd = (RepCmdReadVolRes *) repcmd_new(

+            REPHUB_CMD_READ_VOL_RES, read_xact->rep_cmd.size_sectors * 512,

+            &pdata);

+    pcmd->req_id = read_xact->rep_cmd.req_id;

+    pcmd->volume_id = read_xact->rep_cmd.volume_id;

+    pcmd->is_status_success = FALSE;

+

+    printf("Protected vol read - volId %llu, offset %llu, size %u\n",

+            (unsigned long long int) read_xact->rep_cmd.volume_id,

+            (unsigned long long int) read_xact->rep_cmd.offset_sectors,

+            read_xact->rep_cmd.size_sectors);

+    gettimeofday(&t2, NULL);

+

+    if (ret >= 0) {

+        /* Read response - send the data to the hub */

+        t2 = tsub(t2, read_xact->start_time);

+        printf("Read prot vol done. Took %u seconds, %u us.",

+                (uint32_t) t2.tv_sec, (uint32_t) t2.tv_usec);

+

+        pcmd->is_status_success = TRUE;

+        /* orim todo optimize - don't copy, use the qiov buffer */

+        qemu_iovec_to_buffer(&read_xact->qiov, pdata);

+    } else {

+        printf("readv failed: %s\n", strerror(-ret));

+    }

+

+    repagent_client_send((RepCmd *) pcmd);

+

+    /*qemu_vfree(read_xact->buf); */

+    g_free(read_xact->buf);

+

+    g_free(read_xact);

+}

+

+static struct timeval tsub(struct timeval t1, struct timeval t2)

+{

+    t1.tv_usec -= t2.tv_usec;

+    if (t1.tv_usec < 0) {

+        t1.tv_usec += 1000000;

+        t1.tv_sec--;

+    }

+    t1.tv_sec -= t2.tv_sec;

+    return t1;

+}

+

+void repagent_client_connected(void)

+{

+    /* orim todo thread protection */

+    repagent_report_volumes_to_hub();

+}

diff --git a/replication/repagent.h b/replication/repagent.h

new file mode 100644

index 0000000..98ccbf2

--- /dev/null

+++ b/replication/repagent.h

@@ -0,0 +1,46 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a
copy

+ * of this software and associated documentation files (the "Software"),
to deal

+ * in the Software without restriction, including without limitation the
rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or
sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included
in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN

+ * THE SOFTWARE.

+ */

+#ifndef REPAGENT_H

+#define REPAGENT_H

+#include<stdint.h>

+

+#include "qemu-common.h"

+

+typedef struct RepAgentState RepAgentState;

+typedef struct RepCmdStartProtect RepCmdStartProtect;

+typedef struct RepCmdDataStartProtect RepCmdDataStartProtect;

+struct RepCmdReadVolReq;

+

+void repagent_init(const char *hubname, int port);

+void repagent_handle_protected_write(BlockDriverState *bs,

+        int64_t sector_num, int nb_sectors, QEMUIOVector *qiov, int
ret_status);

+void repagent_register_drive(const char *drive_path,

+        BlockDriverState *driver_ptr);

+int repaget_start_protect(RepCmdStartProtect *pcmd,

+        RepCmdDataStartProtect *pcmd_data);

+int repaget_read_vol(struct RepCmdReadVolReq *pcmd, uint8_t *pdata);

+void repagent_client_connected(void);

+

+

+#endif /* REPAGENT_H */

diff --git a/replication/repagent_client.c b/replication/repagent_client.c

new file mode 100644

index 0000000..4dd9ea4

--- /dev/null

+++ b/replication/repagent_client.c

@@ -0,0 +1,138 @@

+#include "repcmd.h"

+#include "rephub_cmds.h"

+#include "repcmd_listener.h"

+#include "repagent_client.h"

+#include "repagent.h"

+

+#include <string.h>

+#include <stdlib.h>

+#include <errno.h>

+#include <stdio.h>

+#include <resolv.h>

+#include <sys/socket.h>

+#include <arpa/inet.h>

+#include <netinet/in.h>

+#include <unistd.h>

+

+#define ZERO_MEM_OBJ(pObj) memset(pObj, 0, sizeof(*pObj))

+

+static void repagent_process_cmd(RepCmd *pCmd, uint8_t *pData, void
*clientPtr);

+

+typedef struct repagent_client_state {

+    int is_connected;

+    int is_terminate_receive;

+    int hsock;

+} repagent_client_state;

+

+static repagent_client_state g_client_state = { 0 };

+

+void *repagent_listen(void *pParam)

+{

+    rephub_params *pServerParams = (rephub_params *) pParam;

+    int host_port = pServerParams->port;

+    const char *host_name = pServerParams->name;

+

+    printf("Creating repagent listener thread...\n");

+    g_free(pServerParams);

+

+    struct sockaddr_in my_addr;

+

+    int err;

+    int retries = 0;

+

+    g_client_state.hsock = socket(AF_INET, SOCK_STREAM, 0);

+    if (g_client_state.hsock == -1) {

+        printf("Error initializing socket %d\n", errno);

+        return (void *) -1;

+    }

+

+    int param = 1;

+

+    if ((setsockopt(g_client_state.hsock, SOL_SOCKET, SO_REUSEADDR,

+            (char *)&param, sizeof(int)) == -1)

+            || (setsockopt(g_client_state.hsock, SOL_SOCKET, SO_KEEPALIVE,

+                    (char *)&param, sizeof(int)) == -1)) {

+        printf("Error setting options %d\n", errno);

+        return (void *) -1;

+    }

+

+    my_addr.sin_family = AF_INET;

+    my_addr.sin_port = htons(host_port);

+    memset(&(my_addr.sin_zero), 0, 8);

+

+    my_addr.sin_addr.s_addr = inet_addr(host_name);

+

+    /* Reconnect loop */

+    while (!g_client_state.is_terminate_receive) {

+

+        if (connect(g_client_state.hsock, (struct sockaddr *)&my_addr,

+                sizeof(my_addr)) == -1) {

+            err = errno;

+            if (err != EINPROGRESS) {

+                retries++;

+                fprintf(

+                        stderr,

+                        "Error connecting socket %d. Host %s, port %u. Retry count %d\n",

+                        errno, host_name, host_port, retries);

+                usleep(5 * 1000 * 1000);

+                continue;

+            }

+        }

+        retries = 0;

+

+        g_client_state.is_connected = 1;

+

+        repagent_client_connected();

+        repcmd_listener(g_client_state.hsock, repagent_process_cmd, NULL);

+        close(g_client_state.hsock);

+

+        g_client_state.is_connected = 0;

+    }

+    return 0;

+}

+

+void repagent_process_cmd(RepCmd *pcmd, uint8_t *pdata, void *clientPtr)

+{

+    int is_free_data = 1;

+    printf("Repagent got cmd %d\n", pcmd->hdr.cmdid);

+    switch (pcmd->hdr.cmdid) {

+    case REPHUB_CMD_START_PROTECT: {

+        is_free_data = repaget_start_protect((RepCmdStartProtect *) pcmd,

+                (RepCmdDataStartProtect *) pdata);

+    }

+        break;

+    case REPHUB_CMD_READ_VOL_REQ: {

+        is_free_data = repaget_read_vol((RepCmdReadVolReq *) pcmd, pdata);

+    }

+        break;

+    default:

+        assert(0);

+        break;

+

+    }

+

+    if (is_free_data) {

+        g_free(pdata);

+    }

+}

+

+int repagent_client_send(RepCmd *p_cmd)

+{

+    int bytecount = 0;

+    printf("Send cmd %u, data size %u\n", p_cmd->hdr.cmdid,

+            p_cmd->hdr.data_size_bytes);

+    if (!g_client_state.is_connected) {

+        printf("Not connected to hub\n");

+        return -1;

+    }

+

+    bytecount = send(g_client_state.hsock, p_cmd,

+            sizeof(RepCmd) + p_cmd->hdr.data_size_bytes, 0);

+    if (bytecount < sizeof(RepCmd) + p_cmd->hdr.data_size_bytes) {

+        printf("Bad send %d, errno %d\n", bytecount, errno);

+        return bytecount;

+    }

+

+    /* Success */

+    return 0;

+}

diff --git a/replication/repagent_client.h b/replication/repagent_client.h

new file mode 100644

index 0000000..62a5377

--- /dev/null

+++ b/replication/repagent_client.h

@@ -0,0 +1,36 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPAGENT_CLIENT_H

+#define REPAGENT_CLIENT_H

+#include "repcmd.h"

+

+typedef struct rephub_params {

+    char *name;

+    int port;

+} rephub_params;

+

+void *repagent_listen(void *pParam);

+int repagent_client_send(RepCmd *p_cmd);

+

+#endif /* REPAGENT_CLIENT_H */

diff --git a/replication/repcmd.h b/replication/repcmd.h

new file mode 100644

index 0000000..8c6cf1b

--- /dev/null

+++ b/replication/repcmd.h

@@ -0,0 +1,59 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPCMD_H

+#define REPCMD_H

+

+#include <stdint.h>

+

+#define REPCMD_MAGIC1 (0x1122)

+#define REPCMD_MAGIC2 (0x3344)

+#define REPCMD_NUM_U32_PARAMS (11)

+

+enum RepCmds {

+    REPCMD_FIRST_INVALID                    = 0,

+    REPCMD_FIRST_HUBCMD                     = 1,

+    REPHUB_CMD_PROTECTED_WRITE              = 2,

+    REPHUB_CMD_REPORT_VM_VOLUMES            = 3,

+    REPHUB_CMD_START_PROTECT                = 4,

+    REPHUB_CMD_READ_VOL_REQ                 = 5,

+    REPHUB_CMD_READ_VOL_RES                 = 6,

+    REPHUB_CMD_AGENT_SHUTDOWN               = 7,

+};

+

+typedef struct RepCmdHdr {

+    uint16_t magic1;

+    uint16_t cmdid;

+    uint32_t data_size_bytes;

+} RepCmdHdr;

+

+typedef struct RepCmd {

+    RepCmdHdr hdr;

+    unsigned int parameters[REPCMD_NUM_U32_PARAMS];

+    unsigned int magic2;

+    uint8_t data[0];

+} RepCmd;

+

+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata);

+

+#endif /* REPCMD_H */

diff --git a/replication/repcmd_listener.c b/replication/repcmd_listener.c

new file mode 100644

index 0000000..a211927

--- /dev/null

+++ b/replication/repcmd_listener.c

@@ -0,0 +1,137 @@

+#include <fcntl.h>

+#include <string.h>

+#include <stdlib.h>

+#include <errno.h>

+#include <stdio.h>

+#include <netinet/in.h>

+#include <resolv.h>

+#include <sys/socket.h>

+#include <arpa/inet.h>

+#include <unistd.h>

+#include <pthread.h>

+#include <assert.h>

+

+/* Use the CONFIG_REPLICATION flag to determine whether

+ * we're under a qemu build or a hub build. When under

+ * qemu, use g_malloc */

+#ifdef CONFIG_REPLICATION

+#include <glib.h>

+#define REPCMD_MALLOC g_malloc

+#else

+#define REPCMD_MALLOC malloc

+#endif

+

+#include "repcmd.h"

+#include "repcmd_listener.h"

+

+#define ZERO_MEM_OBJ(pObj) memset((void *)pObj, 0, sizeof(*pObj))

+

+typedef struct RepCmdListenerState {

+    int is_terminate_receive;

+} RepCmdListenerState;

+

+static RepCmdListenerState g_listenerState = { 0 };

+

+/* Returns 0 for initiated termination or socket error value on error */

+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr)

+{

+    RepCmd curCmd;

+    uint8_t *pReadBuf = (uint8_t *)&curCmd;

+    int bytesToGet = sizeof(RepCmd);

+    int bytesGotten = 0;

+    int isGotHeader = 0;

+    uint8_t *pdata = NULL;

+

+    assert(callback != NULL);

+

+    /* receive loop */

+    while (!g_listenerState.is_terminate_receive) {

+        int bytecount;

+

+        bytecount = recv(hsock, pReadBuf + bytesGotten,

+                bytesToGet - bytesGotten, 0);

+        if (bytecount == -1) {

+            fprintf(stderr, "Error receiving data %d\n", errno);

+            return errno;

+        }

+

+        if (bytecount == 0) {

+            printf("Disconnected\n");

+            return 0;

+        }

+        bytesGotten += bytecount;

+/*     printf("Received bytes %d, got %d/%d\n",

+                bytecount, bytesGotten, bytesToGet); */

+        /* print content */

+        if (0) {

+            int i;

+            for (i = 0; i < bytecount; i += 4) {

+                /*printf("%d/%d", i, bytecount/4); */

+                printf("%#x ",

+                        *(int *) (&pReadBuf[bytesGotten - bytecount + i]));

+

+            }

+            printf("\n");

+        }

+        assert(bytesGotten <= bytesToGet);

+        if (bytesGotten == bytesToGet) {

+            int isGotData = 0;

+            bytesGotten = 0;

+            if (!isGotHeader) {

+                /* We just got the header */

+                isGotHeader = 1;

+

+                assert(curCmd.hdr.magic1 == REPCMD_MAGIC1);

+                assert(curCmd.magic2 == REPCMD_MAGIC2);

+                if (curCmd.hdr.data_size_bytes > 0) {

+                    pdata = (uint8_t *)REPCMD_MALLOC(

+                                curCmd.hdr.data_size_bytes);

+/*                    printf("malloc %p\n", pdata); */

+                    pReadBuf = pdata;

+                } else {

+                    /* no data */

+                    isGotData = 1;

+                    pdata = NULL;

+                }

+                bytesToGet = curCmd.hdr.data_size_bytes;

+            } else {

+                isGotData = 1;

+            }

+

+            if (isGotData) {

+                /* Got command and data */

+                (*callback)(&curCmd, pdata, clientPtr);

+

+                /* It's the callee's responsibility to free pdata */

+                pdata = NULL;

+                ZERO_MEM_OBJ(&curCmd);

+                pReadBuf = (uint8_t *)&curCmd;

+                bytesGotten = 0;

+                bytesToGet = sizeof(RepCmd);

+                isGotHeader = 0;

+            }

+        }

+    }

+    return 0;

+}

+

+RepCmd *repcmd_new(int cmd_id, int data_size, uint8_t **p_out_pdata)

+{

+    RepCmd *p_cmd = (RepCmd *)REPCMD_MALLOC(sizeof(RepCmd) + data_size);

+    assert(p_cmd != NULL);

+

+    /* Zero the CMD (not the data) */

+    ZERO_MEM_OBJ(p_cmd);

+

+    p_cmd->hdr.cmdid = cmd_id;

+    p_cmd->hdr.magic1 = REPCMD_MAGIC1;

+    p_cmd->magic2 = REPCMD_MAGIC2;

+    p_cmd->hdr.data_size_bytes = data_size;

+

+    if (p_out_pdata != NULL) {

+        *p_out_pdata = p_cmd->data;

+    }

+

+    return p_cmd;

+}

+

diff --git a/replication/repcmd_listener.h b/replication/repcmd_listener.h

new file mode 100644

index 0000000..c09a12e

--- /dev/null

+++ b/replication/repcmd_listener.h

@@ -0,0 +1,32 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPCMD_LISTENER_H

+#define REPCMD_LISTENER_H

+#include <stdint.h>

+typedef void (*pfn_received_cmd_cb)(RepCmd *pCmd,

+                uint8_t *pData, void *clientPtr);

+

+int repcmd_listener(int hsock, pfn_received_cmd_cb callback, void *clientPtr);

+

+#endif /* REPCMD_LISTENER_H */

diff --git a/replication/rephub_cmds.h b/replication/rephub_cmds.h

new file mode 100644

index 0000000..820c37d

--- /dev/null

+++ b/replication/rephub_cmds.h

@@ -0,0 +1,150 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REPHUB_CMDS_H

+#define REPHUB_CMDS_H

+

+#include <stdint.h>

+#include "repcmd.h"

+#include "rephub_defs.h"

+

+/*********************************************************

+ * RepCmd Report a protected IO

+ *

+ * REPHUB_CMD_PROTECTED_WRITE

+ * Direction: agent->hub

+ *

+ * Any write to a protected volume is sent with this

+ * message to the hub, with its status.

+ * If the status is bad, no data is sent

+ *********************************************************/

+typedef struct RepCmdProtectedWrite {

+    RepCmdHdr hdr;

+    uint64_t volume_id;

+    uint64_t offset_sectors;

+    /* The size field duplicates the RepCmd size,

+     * but it is needed for reporting failed IOs' sizes */

+    uint32_t size_sectors;

+    int ret_status;

+} RepCmdProtectedWrite;

+

+/*********************************************************

+ * RepCmd Report VM volumes

+ *

+ * REPHUB_CMD_REPORT_VM_VOLUMES

+ * Direction: agent->hub

+ *

+ * The agent reports all the volumes of the VM

+ * to the hub.

+ *********************************************************/

+typedef struct RepVmVolumeInfo {

+    char name[REPHUB_MAX_VOL_NAME_LEN];

+    uint64_t volume_id;

+    uint32_t size_mb;

+} RepVmVolumeInfo;

+

+typedef struct RepCmdReportVmVolumes {

+    RepCmdHdr hdr;

+    int num_volumes;

+} RepCmdReportVmVolumes;

+

+typedef struct RepCmdDataReportVmVolumes {

+    RepVmVolumeInfo volumes[0];

+} RepCmdDataReportVmVolumes;

+

+

+/*********************************************************

+ * RepCmd Start protect

+ *

+ * REPHUB_CMD_START_PROTECT

+ * Direction: hub->agent

+ *

+ * The hub instructs the agent to start protecting

+ * a volume. When a volume is protected all its writes

+ * are sent to the hub.

+ * With this command the hub also assigns a volume ID to

+ * the given volume name.

+ *********************************************************/

+typedef struct RepCmdStartProtect {

+    RepCmdHdr hdr;

+    uint64_t volume_id;

+} RepCmdStartProtect;

+

+typedef struct RepCmdDataStartProtect {

+    char volume_name[REPHUB_MAX_VOL_NAME_LEN];

+} RepCmdDataStartProtect;

+

+

+/*********************************************************

+ * RepCmd Read Volume Request

+ *

+ * REPHUB_CMD_READ_VOL_REQ

+ * Direction: hub->agent

+ *

+ * The hub issues a read IO to a protected volume.

+ * This command is used during sync - when the hub needs

+ * to read unsynchronized sections of a protected volume.

+ * This command is a request, the read data is returned

+ * by the response command REPHUB_CMD_READ_VOL_RES

+ *********************************************************/

+typedef struct RepCmdReadVolReq {

+    RepCmdHdr hdr;

+    int req_id;

+    int size_sectors;

+    uint64_t volume_id;

+    uint64_t offset_sectors;

+} RepCmdReadVolReq;

+

+/*********************************************************

+ * RepCmd Read Volume Response

+ *

+ * REPHUB_CMD_READ_VOL_RES

+ * Direction: agent->hub

+ *

+ * A response to REPHUB_CMD_READ_VOL_REQ.

+ * Sends the data read from a protected volume

+ *********************************************************/

+typedef struct RepCmdReadVolRes {

+    RepCmdHdr hdr;

+    int req_id;

+    int is_status_success;

+    uint64_t volume_id;

+} RepCmdReadVolRes;

+

+/*********************************************************

+ * RepCmd Agent shutdown

+ *

+ * REPHUB_CMD_AGENT_SHUTDOWN

+ * Direction: agent->hub

+ *

+ * Notifies the hub that the agent is about to shut down.

+ * This allows a graceful shutdown. Any disconnection

+ * of an agent without sending this command will result

+ * in a full sync of the VM volumes.

+ *********************************************************/

+typedef struct RepCmdAgentShutdown {

+    RepCmdHdr hdr;

+} RepCmdAgentShutdown;

+

+

+#endif /* REPHUB_CMDS_H */

diff --git a/replication/rephub_defs.h b/replication/rephub_defs.h

new file mode 100644

index 0000000..e34e0ce

--- /dev/null

+++ b/replication/rephub_defs.h

@@ -0,0 +1,40 @@

+/*

+ * QEMU System Emulator

+ *

+ * Copyright (c) 2003-2008 Fabrice Bellard

+ *

+ * Permission is hereby granted, free of charge, to any person obtaining a copy

+ * of this software and associated documentation files (the "Software"), to deal

+ * in the Software without restriction, including without limitation the rights

+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell

+ * copies of the Software, and to permit persons to whom the Software is

+ * furnished to do so, subject to the following conditions:

+ *

+ * The above copyright notice and this permission notice shall be included in

+ * all copies or substantial portions of the Software.

+ *

+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR

+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,

+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL

+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER

+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,

+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN

+ * THE SOFTWARE.

+ */

+#ifndef REP_HUB_DEFS_H

+#define REP_HUB_DEFS_H

+

+#include <stdint.h>

+

+#define REPHUB_MAX_VOL_NAME_LEN (1024)

+#define REPHUB_MAX_NUM_VOLUMES (512)

+

+#ifndef TRUE

+    #define TRUE (1)

+#endif

+

+#ifndef FALSE

+    #define FALSE (0)

+#endif

+

+#endif /* REP_HUB_DEFS_H */

diff --git a/vl.c b/vl.c

index 624da0f..506b5dc 100644

--- a/vl.c

+++ b/vl.c

@@ -167,6 +167,7 @@ int main(int argc, char **argv)

  #include "ui/qemu-spice.h"

+#include "replication/repagent.h"

//#define DEBUG_NET

//#define DEBUG_SLIRP

@@ -2307,6 +2308,15 @@ int main(int argc, char **argv, char **envp)

                  drive_add(IF_DEFAULT, popt->index - QEMU_OPTION_hda, optarg,

                            HD_OPTS);

                  break;

+            case QEMU_OPTION_repagent:

+#ifdef CONFIG_REPLICATION

+                repagent_init(optarg, 0);

+#else

+                fprintf(stderr, "Replication support is disabled. "

+                    "Don't use -repagent option.\n");

+                exit(1);

+#endif

+                break;

              case QEMU_OPTION_drive:

                  if (drive_def(optarg) == NULL) {

                      exit(1);
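
A note on the framing used by repagent_client_send() and repcmd_listener() in the patch above: each message is a fixed-size command struct followed by data_size_bytes of payload. The listener already loops on recv() until it has the full header and payload, but the sender issues a single send() and reports any short count as "Bad send". On a stream socket send() may accept fewer bytes than requested, so a retry loop is one way to handle that. The sketch below is only an illustration under stated assumptions: send_all(), recv_all(), DemoHdr and demo_roundtrip() are hypothetical names that do not appear in the patch, DemoHdr merely mirrors the fields of RepCmdHdr, and a local socketpair() stands in for the hub connection.

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper (not in the patch): send exactly len bytes. */
static int send_all(int fd, const void *buf, size_t len)
{
    const uint8_t *p = buf;

    assert(buf != NULL || len == 0);
    while (len > 0) {
        ssize_t n = send(fd, p, len, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;           /* interrupted, retry */
            }
            return -1;              /* real error, errno is set */
        }
        p += n;                     /* short send: advance and retry */
        len -= (size_t)n;
    }
    return 0;
}

/* Hypothetical helper (not in the patch): receive exactly len bytes. */
static int recv_all(int fd, void *buf, size_t len)
{
    uint8_t *p = buf;

    while (len > 0) {
        ssize_t n = recv(fd, p, len, 0);
        if (n < 0) {
            if (errno == EINTR) {
                continue;
            }
            return -1;
        }
        if (n == 0) {
            return -1;              /* peer closed before the full message */
        }
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Assumed stand-in for RepCmdHdr: fixed-width fields only. */
typedef struct DemoHdr {
    uint16_t magic1;
    uint16_t cmdid;
    uint32_t data_size_bytes;
} DemoHdr;

/* Sends header + payload over a socketpair and reads them back.
 * Returns 0 if the round trip preserved both, -1 otherwise. */
int demo_roundtrip(void)
{
    int sv[2];
    const char payload[] = "sector-data";
    DemoHdr out = { 0x1122, 2, sizeof(payload) };
    DemoHdr in;
    char got[sizeof(payload)];
    int rc = -1;

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
        return -1;
    }
    if (send_all(sv[0], &out, sizeof(out)) == 0 &&
        send_all(sv[0], payload, sizeof(payload)) == 0 &&
        recv_all(sv[1], &in, sizeof(in)) == 0 &&
        in.magic1 == 0x1122 &&
        in.data_size_bytes == sizeof(payload) &&
        recv_all(sv[1], got, in.data_size_bytes) == 0 &&
        memcmp(got, payload, sizeof(payload)) == 0) {
        rc = 0;
    }
    close(sv[0]);
    close(sv[1]);
    return rc;
}
```

The sketch keeps the return value of send() in a signed ssize_t and checks it for a negative result before doing any size arithmetic, which avoids comparing a signed count directly against a sizeof expression.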




