[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r27930 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r27930 - msh/src |
Date: |
Thu, 11 Jul 2013 17:06:38 +0200 |
Author: harsha
Date: 2013-07-11 17:06:37 +0200 (Thu, 11 Jul 2013)
New Revision: 27930
Added:
msh/src/bitmap.c
msh/src/bitmap.h
msh/src/test_bitmap.c
Modified:
msh/src/
msh/src/Makefile.am
msh/src/common.h
msh/src/mshd.c
msh/src/scheduler.c
msh/src/scheduler.h
Log:
- address verfication
Index: msh/src
===================================================================
--- msh/src 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src 2013-07-11 15:06:37 UTC (rev 27930)
Property changes on: msh/src
___________________________________________________________________
Modified: svn:ignore
## -5,5 +5,4 ##
mping
test-suite.log
test-scheduler*
-test-scheduler*.log
-test-scheduler*.trs
+test-bitmap*
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/Makefile.am 2013-07-11 15:06:37 UTC (rev 27930)
@@ -2,14 +2,16 @@
mping_SOURCES = mping.c
-mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h common.h
+mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h \
+ common.h bitmap.c bitmap.h
mshd_LDADD = -levent
mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
mshd_LDFLAGS = $(LIBEVENT_LDFLAGS)
check_PROGRAMS = \
test-scheduler \
- test-scheduler-socket
+ test-scheduler-socket \
+ test-bitmap
test_scheduler_SOURCES = test_scheduler.c scheduler.c scheduler.h common.h \
util.c util.h common.h
@@ -23,6 +25,8 @@
test_scheduler_socket_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
test_scheduler_socket_LDFLAGS = $(LIBEVENT_LDFLAGS)
+test_bitmap_SOURCES = test_bitmap.c bitmap.c bitmap.h
+
TESTS = \
test-scheduler \
test-scheduler-socket
Added: msh/src/bitmap.c
===================================================================
--- msh/src/bitmap.c (rev 0)
+++ msh/src/bitmap.c 2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,159 @@
+#include "common.h"
+#include "bitmap.h"
+
+/**
+ * @file bitmap
+ * @brief implementation of bitmap array
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+/**
+ * Handler to the bitmap
+ */
+struct BitMap
+{
+ /**
+ * Number of relevant bits in the bitmap
+ */
+ unsigned int len;
+
+ /**
+ * The size of the barray. Should be enough to hold size number of bits
+ */
+ unsigned int array_size;
+
+ /**
+ * The bitmap array
+ */
+ uint32_t barray[0];
+
+};
+
+#define ELE_BITSIZE (8 * sizeof (bm->barray[0]))
+
+/**
+ * Initialize bitmap array
+ *
+ * @param len the number of bits to be present in the bitmap
+ * @return handle to the bitmap
+ */
+struct BitMap *
+bitmap_create (unsigned int len)
+{
+ struct BitMap *bm;
+ unsigned int array_size;
+
+ array_size = (len + ELE_BITSIZE - 1) / ELE_BITSIZE;
+ bm = MSH_malloc (sizeof (struct BitMap) + (array_size * sizeof
+ (bm->barray[0])) );
+ bm->len = len;
+ bm->array_size = array_size;
+ return bm;
+}
+
+
+/**
+ * Destroy a bitmap handle and free its resources
+ *
+ * @param bm the bitmap to destroy
+ */
+void
+bitmap_destroy (struct BitMap *bm)
+{
+ free (bm);
+}
+
+
+/**
+ * Clear all the bits in the bitmap
+ *
+ * @param bm the handle to the bitmap
+ */
+void
+bitmap_clear (struct BitMap *bm)
+{
+ (void) memset (bm->barray, 0, bm->array_size * sizeof(bm->barray[0]));
+}
+
+
+/**
+ * Set the bit at given index to the given value
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to set
+ * @param val the value; should be either 1 or 0
+ */
+void
+bitmap_set (struct BitMap *bm, unsigned int id, int val)
+{
+ unsigned int off;
+ unsigned int bitidx;
+ typeof (bm->barray[0]) one;
+
+ MSH_assert (id < bm->len);
+ MSH_assert ( (0 == val) || (1 == val) );
+ off = id / ELE_BITSIZE;
+ bitidx = id % ELE_BITSIZE;
+ MSH_assert (off < bm->array_size);
+ one = (typeof (bm->barray[0])) 1; /* cast */
+ one = one << bitidx;
+ if (1 == val)
+ bm->barray[off] |= one;
+ else if (0 != (bm->barray[off] & one))
+ bm->barray[off] ^= one;
+}
+
+
+/**
+ * Checks if the given bit in the bit array is set to not
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to set
+ * @return 1 if the given bit is set; 0 otherwise
+ */
+int
+bitmap_isbitset (struct BitMap *bm, unsigned int id)
+{
+ unsigned int off;
+ unsigned int bitidx;
+ typeof (bm->barray[0]) one;
+
+ MSH_assert (id < bm->len);
+ off = id / ELE_BITSIZE;
+ bitidx = id % ELE_BITSIZE;
+ MSH_assert (off < bm->array_size);
+ one = (typeof (bm->barray[0])) 1; /* cast */
+ return (0 == (bm->barray[off] & (one << bitidx))) ? 0 : 1;
+}
+
+
+/**
+ * Check if all relevant bits in the bitmap are set
+ *
+ * @param bm the handle to the bitmap
+ * @return 1 if all the bits are set; 0 if not
+ */
+int
+bitmap_allset (struct BitMap *bm)
+{
+ unsigned int off;
+ unsigned int bitidx;
+ unsigned int cnt;
+ typeof (bm->barray[0]) max;
+
+ max = (typeof (bm->barray[0])) 0;
+ max = max - 1;
+ off = bm->len / ELE_BITSIZE;
+ bitidx = bm->len % ELE_BITSIZE;
+ MSH_assert (off < bm->array_size);
+ for (cnt = 0; cnt < off; cnt ++)
+ {
+ if (0 != max ^ bm->barray[cnt])
+ return 0;
+ }
+ if (0 == bitidx)
+ return 1;
+ max = (typeof (bm->barray[0])) 1;
+ max = (max << bitidx) - 1;
+ return (max == (bm->barray[off] & max)) ? 1 : 0;
+}
Added: msh/src/bitmap.h
===================================================================
--- msh/src/bitmap.h (rev 0)
+++ msh/src/bitmap.h 2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,70 @@
+/**
+ * @file bitmap.h
+ * @brief interface for bitmap array
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#ifndef BITMAP_H_
+#define BITMAP_H_
+
+
+/**
+ * Opaque handle to the bitmap
+ */
+struct BitMap;
+
+
+/**
+ * Initialize bitmap array
+ *
+ * @param len the number of bits to be present in the bitmap
+ * @return handle to the bitmap
+ */
+struct BitMap *
+bitmap_create (unsigned int len);
+
+
+/**
+ * Destroy a bitmap handle and free its resources
+ *
+ * @param bm the bitmap to destroy
+ */
+void
+bitmap_destroy (struct BitMap *bm);
+
+
+/**
+ * Clear all the bits in the bitmap
+ *
+ * @param bm the handle to the bitmap
+ */
+void
+bitmap_clear (struct BitMap *bm);
+
+
+/**
+ * Set the bit at given index to the given value
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to set
+ * @param val the value; should be either 1 or 0
+ */
+void
+bitmap_set (struct BitMap *bm, unsigned int id, int val);
+
+
+/**
+ * Checks if the given bit in the bit array is set to not
+ *
+ * @param bm the handle to the bitmap
+ * @param id the index of the bit to set
+ * @return 1 if the given bit is set; 0 otherwise
+ */
+int
+bitmap_isbitset (struct BitMap *bm, unsigned int id);
+
+
+
+#endif /* #ifndef BITMAP_H_ */
+
+/* End of bitmap.h */
Modified: msh/src/common.h
===================================================================
--- msh/src/common.h 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/common.h 2013-07-11 15:06:37 UTC (rev 27930)
@@ -1,3 +1,12 @@
+/**
+ * @file common.h
+ * @brief common header which is to be included in all sources
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#ifndef COMMON_H_
+#define COMMON_H_
+
#ifndef _GNU_SOURCE
#define _GNU_SOURCE 1
#endif
@@ -203,3 +212,7 @@
(element)->next->prev = (element)->prev; \
(element)->next = NULL; \
(element)->prev = NULL; } while (0)
+
+#endif /* COMMON_H_ */
+
+/* End of common.h */
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/mshd.c 2013-07-11 15:06:37 UTC (rev 27930)
@@ -3,42 +3,123 @@
#include "util.h"
#include "scheduler.h"
#include "mtypes.h"
+#include "bitmap.h"
+
/**
- * The port number of our local socket
+ * An address of an instance
*/
-uint16_t lport;
+struct InstanceAddr
+{
+ /**
+ * The length of the instance address
+ */
+ socklen_t addrlen;
+ /**
+ * The instance address to be followed here
+ */
+ struct sockaddr saddr[0];
+};
+
/**
- * The number of total mshd processes
+ * Instance address information
*/
-static int nproc;
+struct InstanceAddrInfo
+{
+ /**
+ * Array of addresses
+ */
+ struct InstanceAddr **addrs;
+ /**
+ * Number of addresses in the above array
+ */
+ unsigned int naddrs;
+
+ /**
+ * The MPI id of the instance to whom these addresses belong to
+ */
+ unsigned int source;
+};
+
+
/**
- * Rank of this process
+ * Context for verifying addresses
*/
-static int rank;
+struct VerifyAddressesCtx
+{
+ /**
+ * The DLL next ptr
+ */
+ struct VerifyAddressesCtx *next;
+ /**
+ * The DLL prev ptr
+ */
+ struct VerifyAddressesCtx *prev;
+
+ /**
+ * The instance addresses
+ */
+ struct InstanceAddrInfo *iainfo;
+
+ /**
+ * The socket open handle to the instance address
+ */
+ struct SocketOpenHandle *soh;
+
+ /**
+ * close task handle
+ */
+ struct Task *close_task;
+
+ /**
+ * The index of the address being verified in association with this context
+ */
+ unsigned int naddr;
+
+ /**
+ * The socket file descriptor associated with the connection used to verify
+ * the address
+ */
+ int sock;
+};
+
+
+struct ReadContext
+{
+ struct ReadContext *next;
+
+ struct ReadContext *prev;
+
+ /* struct sockaddr_in addr; */
+
+ /* socklen_t addrlen; */
+
+ struct Task *task;
+};
+
+
/**
- * Array of our IP addresses in network-byte format
+ * DLL head for address verification contexts
*/
-static in_addr_t *s_addrs;
+static struct VerifyAddressesCtx *vactx_head;
/**
- * Number of IP addresses
+ * DLL tail for address verification contexts
*/
-static unsigned int nips;
+static struct VerifyAddressesCtx *vactx_tail;
/**
- * Current IP verification round
+ * Task for finalising a round
*/
-static unsigned int round;
+static struct Task *finalise_task;
/**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
+ * Array of our IP addresses in network-byte format
*/
-static unsigned int rwidth;
+static in_addr_t *s_addrs;
/**
* Tasks for handling SIGINT and SIGTERM
@@ -56,60 +137,75 @@
static struct Task *atask;
/**
- * Array for checking which MPI processes have verified our addresses in the
+ * Bitmap for checking which MPI processes have verified our addresses in the
* current round
*/
-static uint8_t *barray;
+static struct BitMap *bitmap;
-static size_t barray_size;
+/**
+ * Instances addresses learnt in the current round
+ */
+struct InstanceAddrInfo **riainfos;
-static void
-barray_init ()
-{
- barray_size = (rwidth + sizeof (barray[0]) - 1) / sizeof (barray[0]);
- barray = MSH_malloc (barray_size);
-}
+/**
+ * head for read context DLL
+ */
+static struct ReadContext *rhead;
-static void
-barray_destroy ()
-{
- free (barray);
- barray = NULL;
-}
+/**
+ * tail for read context DLL
+ */
+static struct ReadContext *rtail;
-static void
-barray_clear ()
-{
- (void) memset (barray, 0, barray_size);
-}
+/**
+ * The number of total mshd processes
+ */
+static int nproc;
-static void
-barray_set (unsigned int id)
+/**
+ * Rank of this process
+ */
+static int rank;
+
+/**
+ * The listen socket for the current round
+ */
+static int listen_sock;
+
+/**
+ * Number of IP addresses
+ */
+static unsigned int nips;
+
+/**
+ * Current IP verification round
+ */
+static unsigned int round;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+static unsigned int rwidth;
+
+/**
+ * The port number of our local socket
+ */
+uint16_t lport;
+
+
+static char *
+saddr2str (const struct sockaddr *addr, const socklen_t addrlen)
{
- unsigned int off;
- unsigned int idx;
- typeof (barray[0]) one;
-
- off = id / sizeof (barray[0]);
- idx = id % sizeof (barray[0]);
- MSH_assert (off < barray_size);
- one = (typeof (barray[0])) 1; /* cast */
- MSH_assert (0 == (barray[off] & (one << idx)) );
- barray[off] = barray[off] | (one << idx);
-}
+ static char hostip[NI_MAXHOST];
-static int
-barray_isset (unsigned int id)
-{
- unsigned int off;
- unsigned int idx;
- typeof (barray[0]) one;
-
- off = id / sizeof (barray[0]);
- idx = id % sizeof (barray[0]);
- MSH_assert (off < barray_size);
- one = (typeof (barray[0])) 1; /* cast */
- return (0 == (barray[off] & (one << idx)) ) ? 0 : 1;
+ if (0 != getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0,
+ NI_NUMERICHOST))
+ {
+ LOG_STRERROR ("getnameinfo");
+ return NULL;
+ }
+ return hostip;
}
@@ -132,17 +228,14 @@
const struct sockaddr *netmask,
socklen_t addrlen)
{
- char hostip[NI_MAXHOST];
+ char *hostip;
const struct sockaddr_in *inaddr;
if (sizeof (struct sockaddr_in) != addrlen)
return MSH_OK; /* Only consider IPv4 for now */
- if (0 !=
- getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST))
- {
- LOG_STRERROR ("getnameinfo");
+ hostip = saddr2str (addr, addrlen);
+ if (NULL == hostip)
return MSH_OK;
- }
inaddr = (const struct sockaddr_in *) addr;
MSH_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
@@ -150,24 +243,6 @@
}
-struct ReadContext
-{
- struct ReadContext *next;
-
- struct ReadContext *prev;
-
- /* struct sockaddr_in addr; */
-
- /* socklen_t addrlen; */
-
- struct Task *task;
-};
-
-static struct ReadContext *rhead;
-
-static struct ReadContext *rtail;
-
-
/**
* Task to read from socket
*
@@ -190,7 +265,7 @@
MSH_close (sock);
return;
}
- rsize = read (sock, &cid, sizeof (cid));
+ rsize = read (sock, &cid, sizeof (uint32_t));
if (rsize < 0)
{
LOG_STRERROR ("read");
@@ -202,8 +277,9 @@
goto err_ret;
}
cid = ntohl (cid);
- if (!barray_isset (cid))
- barray_set (cid);
+ LOG_DEBUG ("%d: read id %u from connection\n", rank, cid);
+ /* if (!barray_isset (cid)) */
+ /* barray_set (cid); */
MSH_close (sock);
return;
@@ -234,7 +310,7 @@
(void) close (sock);
return;
}
- LOG_DEBUG ("Got a connect\n");
+ LOG_DEBUG ("%d: Got a connect\n", rank);
if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
{
LOG_STRERROR ("accept4");
@@ -245,25 +321,286 @@
rctx = MSH_malloc (sizeof (struct ReadContext));
DLL_insert_tail (rhead, rtail, rctx);
rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
+ /* resume accepting connections on the listen sock */
+ atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
}
+/**
+ * Task for running a round
+ *
+ * @param nosock we have no sockets associated with this callback
+ * @param flags EV_* flags
+ * @param cls NULL
+ */
+static void
+run_round (evutil_socket_t nosock, short flags, void *cls);
+
+
+/**
+ * Schedules next round
+ */
+static void
+schedule_next_round ()
+{
+ int trounds;
+
+ MSH_assert (NULL == rtask);
+ /* Number of rounds required to contact all processes except ourselves
(rwidth
+ in parallel in each round) */
+ trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+ if (round < trounds)
+ rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+ else
+ LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
+}
+
+
+/**
+ * Free an instance's address information
+ *
+ * @param iainfos the instance address information
+ */
+static void
+free_instance_addresses (struct InstanceAddrInfo *iainfos)
+{
+ unsigned int cnt;
+
+ for (cnt = 0; cnt < iainfos->naddrs; cnt++)
+ free (iainfos->addrs[cnt]);
+ free (iainfos->addrs);
+ free (iainfos);
+}
+
+
+/**
+ * Callback trigger to finalise a round
+ *
+ * @param sock -1 do not use this
+ * @param flags EV_* flags
+ * @param cls
+ */
+static void
+finalise_round (evutil_socket_t sock, short flags, void *cls)
+{
+ struct VerifyAddressesCtx *ctx;
+ unsigned int cnt;
+
+ scheduler_remove (finalise_task);
+ finalise_task = NULL;
+ while (NULL != (ctx = vactx_head))
+ {
+ if (NULL != ctx->soh)
+ scheduler_open_socket_cancel (ctx->soh);
+ if (NULL != ctx->close_task)
+ {
+ MSH_close (ctx->sock);
+ scheduler_remove (ctx->close_task);
+ }
+ DLL_remove (vactx_head, vactx_tail, ctx);
+ free (ctx);
+ }
+ for (cnt = 0; cnt < rwidth; cnt++)
+ free_instance_addresses (riainfos[cnt]);
+ if (IS_SHUTDOWN_EVENT (flags))
+ return;
+ MSH_close (listen_sock);
+ listen_sock = -1;
+ scheduler_remove (atask);
+ atask = NULL;
+ if (1 != bitmap_allset (bitmap))
+ {
+ LOG_ERROR ("Could not verify addresses of all hosts\n");
+ scheduler_shutdown ();
+ return;
+ }
+ round++;
+ schedule_next_round ();
+}
+
+
+/**
+ * Callback triggered when the data on the sock is written. This function
+ * closes the socket.
+ *
+ * @param sock the socket file descriptor
+ * @param flags EV_* flags
+ * @param cls context for verifying addresses
+ */
+static void
+socket_close_cb (evutil_socket_t sock, short flags, void *cls)
+{
+ struct VerifyAddressesCtx *ctx = cls;
+ int lb;
+
+ scheduler_remove (ctx->close_task);
+ ctx->close_task = NULL;
+ if (ctx->sock == sock)
+ MSH_close (sock);
+ else if (-1 == sock)
+ MSH_break (0);
+ if (IS_SHUTDOWN_EVENT (flags))
+ {
+ DLL_remove (vactx_head, vactx_tail, ctx);
+ free (ctx);
+ return;
+ }
+ /* FIXME: add the addresses associated with the contex to the mapping */
+ lb = rank - round * rwidth - rwidth + nproc;
+ MSH_assert (0 <= lb);
+ lb %= nproc;
+ MSH_assert (lb <= ctx->iainfo->source);
+ bitmap_set (bitmap, ctx->iainfo->source - lb, 1);
+ return;
+}
+
+
+/**
+ * Callback triggered when a socket connection is ready to be written to
+ *
+ * @param sockfd the file descriptor of the socket which is ready to be written
+ * to
+ * @param cls context information for verifying an instance address
+ */
+static void
+socket_open_cb (int sockfd, void *cls)
+{
+ struct VerifyAddressesCtx *ctx = cls;
+ struct InstanceAddr *iaddr;
+ uint32_t id;
+
+ ctx->soh = NULL;
+ if (-1 == sockfd)
+ {
+ MSH_break (0);
+ /* FIXME: Check if we already got a mapping for the instance */
+ goto err_ret;
+ }
+ iaddr = ctx->iainfo->addrs[ctx->naddr];
+ LOG_DEBUG ("%d: Opened a connection to %s\n", rank,
+ saddr2str (iaddr->saddr, iaddr->addrlen));
+ ctx->sock = sockfd;
+ id = htonl ((uint32_t) rank);
+ if (sizeof (uint32_t) != write (sockfd, &id, sizeof (uint32_t)))
+ {
+ MSH_break (0); /* FIXME: handle error */
+ MSH_close (sockfd);
+ goto err_ret;
+ }
+ ctx->close_task =
+ scheduler_add_socket (sockfd, EV_WRITE, &socket_close_cb, ctx, NULL);
+ return;
+
+ err_ret:
+ DLL_remove (vactx_head, vactx_tail, ctx);
+ free (ctx);
+}
+
+
+/**
+ * Verify the addresses of an instance by connecting to the instance's listen
+ * socket
+ *
+ * @param iainfo the instance's address information
+ * @return MSH_OK upon success initialisation of the connection to instance's
+ * listen socket (this does not mean that the connection is
+ * established or an address is verified); MSH_SYSERR upon error
+ */
static int
+verify_addresses (struct InstanceAddrInfo *iainfo)
+{
+ struct VerifyAddressesCtx *ctx;
+ struct InstanceAddr *iaddr;
+ unsigned int cnt;
+
+ for (cnt = 0; cnt < iainfo->naddrs; cnt++)
+ {
+ iaddr = iainfo->addrs[cnt];
+ ctx = MSH_malloc (sizeof (struct VerifyAddressesCtx));
+ ctx->naddr = cnt;
+ ctx->soh = scheduler_open_socket (iaddr->saddr, iaddr->addrlen,
+ &socket_open_cb, ctx);
+ ctx->iainfo = iainfo;
+ ctx->sock = -1;
+ if (NULL == ctx->soh)
+ {
+ MSH_break (0);
+ free (ctx);
+ return MSH_SYSERR;
+ }
+ DLL_insert_tail (vactx_head, vactx_tail, ctx);
+ }
+ return MSH_OK;
+}
+
+
+/**
+ * Parse a verfication message from a source for its address information
+ *
+ * @param msg the message to parse
+ * @param source the MPI id of the instance which has sent this message
+ * @return the instance's address information
+ */
+static struct InstanceAddrInfo *
+parse_verify_address_msg (struct MSH_MSG_VerifyAddress *msg, int source)
+{
+ struct InstanceAddr *addr;
+ struct sockaddr_in *inaddr;
+ struct InstanceAddrInfo *iainfo;
+ size_t size;
+ socklen_t addrlen;
+ uint16_t port;
+ uint16_t nips;
+ uint16_t cnt;
+
+ size = ntohs (msg->header.size);
+ nips = ntohs (msg->nips);
+ if (size != (sizeof (struct MSH_MSG_VerifyAddress)
+ + (sizeof (uint32_t) * nips)))
+ {
+ LOG_ERROR ("Parsing failed\n");
+ return NULL;
+ }
+ iainfo = MSH_malloc (sizeof (struct InstanceAddrInfo));
+ iainfo->source = source;
+ for (cnt = 0; cnt < nips; cnt++)
+ {
+ addrlen = sizeof (struct sockaddr_in); /* IPv4 */
+ addr = MSH_malloc (sizeof (struct InstanceAddr) + addrlen);
+ addr->addrlen = addrlen;
+ inaddr = (struct sockaddr_in *) addr->saddr;
+ inaddr->sin_family = AF_INET;
+ /* assign directly as address and port already in NB format */
+ inaddr->sin_port = msg->port;
+ inaddr->sin_addr.s_addr = (in_addr_t) msg->ipaddrs[cnt];
+ MSH_array_append (iainfo->addrs, iainfo->naddrs, addr);
+ }
+ return iainfo;
+}
+
+
+/**
+ * Receives the IP addresses to verify in the current round from instances
+ *
+ * @return an array containing the instance addresses; NULL upon a receive
error
+ */
+static struct InstanceAddrInfo **
receive_addresses ()
{
- struct MSH_MSG_VerifyAddress **rmsgs;
+ struct InstanceAddrInfo **iainfos;
MPI_Status status;
- int rsize;
- int lb;
- int up;
- int source;
- int ret;
int cnt;
- ret = MSH_SYSERR;
- rmsgs = MSH_malloc (sizeof (struct MSH_MSG_VerifyAddress *) * rwidth);
+ iainfos = MSH_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
for (cnt=0; cnt < rwidth; cnt++)
{
+ struct MSH_MSG_VerifyAddress *msg;
+ int rsize;
+ int lb;
+ int up;
+ int source;
+ int ret;
+
MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
&status);
MPI_Get_elements (&status, MPI_BYTE, &rsize);
@@ -288,29 +625,32 @@
MSH_break (0);
goto err_ret;
}
- rmsgs[cnt] = MSH_malloc (rsize);
- if (MPI_SUCCESS != MPI_Recv (rmsgs[cnt], rsize, MPI_BYTE, source,
+ msg = MSH_malloc (rsize);
+ if (MPI_SUCCESS != MPI_Recv (msg, rsize, MPI_BYTE, source,
MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
MPI_STATUS_IGNORE))
{
MSH_break (0);
goto err_ret;
}
+ if (NULL == (iainfos[cnt] = parse_verify_address_msg (msg, source)))
+ {
+ free (msg);
+ goto err_ret;
+ }
+ free (msg);
LOG_DEBUG ("%d: Received message of size %d from %d\n", rank, rsize,
source);
}
- /* remove this later on and do something useful */
- for (cnt = 0; cnt < rwidth; cnt++)
+ return iainfos;
+
+ err_ret:
+ for (cnt=0; cnt < rwidth; cnt++)
{
- MSH_free_non_null (rmsgs[cnt]);
- rmsgs[cnt] = NULL;
+ if (NULL != iainfos[cnt])
+ free_instance_addresses (iainfos[cnt]);
}
- ret = MSH_OK;
-
- err_ret:
- for (cnt = 0; cnt < rwidth; cnt++)
- MSH_free_non_null (rmsgs[cnt]);
- free (rmsgs);
- return ret;
+ free (iainfos);
+ return NULL;
}
@@ -321,7 +661,7 @@
* @return MSH_OK on success; MSH_SYSERR upon error
*/
static int
-send_receive_addresses ()
+send_addresses ()
{
struct MSH_MSG_VerifyAddress *msg;
struct MSH_MSG_VerifyAddress *cpys;
@@ -338,7 +678,9 @@
msg->port = htons (lport);
msg->nips = htons (nips);
for (cnt = 0; cnt < nips; cnt++)
+ {
msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+ }
width = rwidth;
if ( (0 != ( (nproc - 1) % rwidth)) && (round == ( (nproc - 1) / rwidth)) )
width = (nproc - 1) % rwidth;
@@ -366,7 +708,7 @@
MSH_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
}
- goto end;
+ goto err_ret;
}
for (cnt=0; cnt < width; cnt++)
{
@@ -378,10 +720,8 @@
free (cpys);
cpys = NULL;
}
- if (MSH_SYSERR == receive_addresses ())
- goto end;
-
- end:
+
+ err_ret:
MSH_free_non_null (cpys);
MSH_free_non_null (sreqs);
return (MPI_SUCCESS == ret) ? MSH_OK : MSH_SYSERR;
@@ -389,17 +729,22 @@
/**
- * Verify IP addresses of all the hosts where mshd services are running
+ * This functions opens a listen socket, sends this instance's IP addresses to
+ * other instances and receives their IP addresses, starts accepting
connections
+ * on listen socket and verifies the IP addresses of other instances by
+ * connecting to their listen sockets
*
* @return MSH_OK if verification is successful; MSH_SYSERR upon error (an
error
* message is logged)
*/
static int
-verify_addresses ()
+run_round_ ()
{
struct sockaddr_in addr;
+ struct timeval tv;
socklen_t addrlen;
int sock;
+ unsigned int cnt;
addrlen = sizeof (struct sockaddr_in);
(void) memset (&addr, 0, addrlen);
@@ -412,21 +757,27 @@
MSH_break (0);
goto clo_ret;
}
- LOG_DEBUG ("Bound to local port %u\n", lport);
if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
{
MSH_break (0);
goto clo_ret;
}
- if (MSH_SYSERR == send_receive_addresses ())
+ if (MSH_SYSERR == send_addresses ())
goto clo_ret;
- MSH_close (sock); /* FIXME: remove later */
+ if (NULL == (riainfos = receive_addresses ()))
+ goto clo_ret;
atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
{
MSH_break (0);
goto clo_ret;
}
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ for (cnt = 0; NULL != riainfos[cnt]; cnt++)
+ verify_addresses (riainfos[cnt]);
+ listen_sock = sock;
+ finalise_task = scheduler_add (&finalise_round, NULL, &tv);
return MSH_OK;
clo_ret:
@@ -465,49 +816,14 @@
* @param cls NULL
*/
static void
-run_round (evutil_socket_t nosock, short flags, void *cls);
-
-
-/**
- * Schedules next round
- */
-static void
-schedule_next_round ()
-{
- int trounds;
-
- MSH_assert (NULL == rtask);
- /* Number of rounds required to contact all processes except ourselves
(rwidth
- in parallel in each round) */
- trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
- if (round < trounds)
- rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
- else
- LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
-}
-
-
-/**
- * Task for running a round
- *
- * @param nosock we have no sockets associated with this callback
- * @param flags EV_* flags
- * @param cls NULL
- */
-static void
run_round (evutil_socket_t nosock, short flags, void *cls)
{
scheduler_remove (rtask);
rtask = NULL;
if (IS_SHUTDOWN_EVENT (flags))
return;
- if (MSH_OK == verify_addresses ())
- {
- round++;
- scheduler_remove (atask);
- atask = NULL;
- schedule_next_round ();
- }
+ if (MSH_OK != run_round_ ())
+ scheduler_shutdown ();
}
@@ -531,6 +847,9 @@
}
+/**
+ * Prints help message for this program
+ */
static void
print_help ()
{
@@ -546,6 +865,14 @@
fprintf (stderr, "%s", msg);
}
+
+/**
+ * The execution start point
+ *
+ * @param argc the number of arguments
+ * @param argv the argument strings
+ * @return 0 for successful termination; 1 for termination upon error
+ */
int
main (int argc, char **argv)
{
@@ -555,6 +882,7 @@
ret = 1;
rwidth = 1;
+ listen_sock = -1;
while (-1 != (c = getopt (argc, argv, "hw:")))
{
switch (c)
@@ -614,17 +942,21 @@
LOG_ERROR ("No IP addresses found\n");
goto fail;
}
- barray_init ();
+ bitmap = bitmap_create (rwidth);
if (MSH_OK != scheduler_run (&run, NULL))
{
MSH_break (0);
- barray_destroy ();
goto fail;
}
- barray_destroy ();
+ //barray_destroy ();
ret = 0;
fail:
+ if (NULL != bitmap)
+ {
+ bitmap_destroy (bitmap);
+ bitmap = NULL;
+ }
MSH_break (MPI_SUCCESS == MPI_Finalize());
MSH_free_non_null (s_addrs);
//libevent_global_shutdown ();
Modified: msh/src/scheduler.c
===================================================================
--- msh/src/scheduler.c 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/scheduler.c 2013-07-11 15:06:37 UTC (rev 27930)
@@ -6,6 +6,10 @@
#include "scheduler.h"
+/**
+ * variable for 0 time. Externalised in scheduler.h
+ */
+struct timeval tv_immediate;
struct Task
{
@@ -32,10 +36,25 @@
*/
static struct Task *ttail;
-
+/**
+ * Our event base
+ */
static struct event_base *ebase;
+/**
+ * Adds a task which is to be executed when one of the given events are
+ * triggered on the given socket or upon the expiry of the given timeout
+ *
+ * @param sock the sock to wait for
+ * @param flags EV_* events; the callback cb
+ * @param cb the callback to call when one of the events marked in flags are
+ * triggered on for the sock
+ * @param cls closure for the callback
+ * @param tv how long should we wait for the events. Upon this value the cb is
+ * called with EV_TIMEOUT flag. Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
struct Task *
scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb,
void *cls, const struct timeval *tv)
@@ -55,6 +74,15 @@
}
+/**
+ * Adds a task which is to be executed after given interval
+ *
+ * @param cb the callback to call for executing the task
+ * @param cls closure for the above callback
+ * @param tv the interval after which the task has to be executed; NULL to
+ * denote infinite delay
+ * @return handle for task; NULL upon error
+ */
struct Task *
scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv)
{
@@ -62,6 +90,17 @@
}
+/**
+ * Add a task to be executed upon reception of a signal or upon the expiry of a
+ * given timeout
+ *
+ * @param signal the signal to wait for
+ * @param cb the callback to call upon reception of the signal
+ * @param cls closure for the above callback
+ * @param tv how long should we wait for the signal before. Upon this value
the cb is
+ * called with EV_TIMEOUT flag. Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
struct Task *
scheduler_add_signal (int signal, event_callback_fn cb, void *cls,
const struct timeval *tv)
@@ -70,6 +109,12 @@
}
+/**
+ * Remove a task. All tasks are to be removed (even after their respective
+ * callbacks are executed)
+ *
+ * @param task the task handle to remove
+ */
void
scheduler_remove (struct Task *task)
{
@@ -79,6 +124,14 @@
free (task);
}
+/**
+ * Shutdowns the scheduler. All pending tasks are executed (their respective
+ * callbacks will be called). Use IS_SHUTDOWN_EVENT() to check if the
callbacks
+ * are called upon scheduler's shutdown. It is not possible to add any tasks
+ * after this function is called.
+ *
+ * @see IS_SHUTDOWN_EVENT
+ */
void
scheduler_shutdown ()
{
@@ -91,6 +144,15 @@
}
+/**
+ * Run the scheduler loop by calling the given callback. This function returns
+ * once all tasks are finished or after a call to scheduler_shutdown() (which
+ * causes all waiting tasks to be executed)
+ *
+ * @param cb the callback to call when the scheduler is ready. Further tasks
+ * can be added through this callback.
+ * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
+ */
int
scheduler_run (event_callback_fn cb, void *cls)
{
@@ -114,17 +176,41 @@
}
+/**
+ * Handle to be returned from scheduler_open_socket()
+ */
struct SocketOpenHandle
{
+ /**
+ * the function to call when the socket is ready
+ */
socket_open_fn cb;
+ /**
+ * The closure for the above callback
+ */
void *cls;
+ /**
+ * The task associated with the socket. Will be executed when the connection
+ * on the socket is ready
+ */
struct Task *task;
+ /**
+ * The file descriptor of the socket
+ */
int sock;
};
+
+/**
+ * Callback that will be called when the socket is ready for reading
+ *
+ * @param sock the file descriptor of the socket
+ * @param flags EV_* flags
+ * @param cls the closure
+ */
static void
open_socket_cb (evutil_socket_t sock, short flags, void *cls)
{
@@ -163,6 +249,17 @@
}
+/**
+ * Open a socket, connect it to the target address and schedule a task to be
+ * executed when the connection is ready.
+ *
+ * @param addr the target address to connect
+ * @param addrlen the length of the addr
+ * @param cb the callback to call to signal success or failure
+ * @param cls the closure for the above callback
+ * @return a handle which can be used to cancel the task to be executed when
the
+ * connection is ready; NULL upon error
+ */
struct SocketOpenHandle *
scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
socket_open_fn cb, void *cls)
@@ -181,6 +278,12 @@
return h;
}
+
+/**
+ * Cancel a handle created with scheduler_open_socket()
+ *
+ * @param h the handle to cancel
+ */
void
scheduler_open_socket_cancel (struct SocketOpenHandle *h)
{
Modified: msh/src/scheduler.h
===================================================================
--- msh/src/scheduler.h 2013-07-11 15:04:18 UTC (rev 27929)
+++ msh/src/scheduler.h 2013-07-11 15:06:37 UTC (rev 27930)
@@ -1,37 +1,151 @@
+/**
+ * @file scheduler.h
+ * @brief interface for task scheduler based on libevent
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#ifndef SCHEDULER_H_
+#define SCHEDULER_H_
+
#include "common.h"
#include "event2/event.h"
-static struct timeval tv_immediate;
+extern struct timeval tv_immediate;
+/**
+ * Use this for scheduling tasks immediately
+ */
#define TV_IMMEDIATE &tv_immediate
+/**
+ * Returns true if the flags denote a shutdown event
+ */
#define IS_SHUTDOWN_EVENT(flags) ((flags & (EV_READ | EV_WRITE | EV_TIMEOUT))
== (EV_READ | EV_WRITE | EV_TIMEOUT))
+
+/**
+ * Opaque handle for a task
+ */
struct Task;
+
+/**
+ * Adds a task which is to be executed when one of the given events are
+ * triggered on the given socket or upon the expiry of the given timeout
+ *
+ * @param sock the sock to wait for
+ * @param flags EV_* events; the callback cb
+ * @param cb the callback to call when one of the events marked in flags are
+ * triggered on for the sock
+ * @param cls closure for the callback
+ * @param tv how long should we wait for the events. Upon this value the cb is
+ * called with EV_TIMEOUT flag. Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
struct Task *
scheduler_add_socket (evutil_socket_t sock, short flags, event_callback_fn cb,
void *cls, const struct timeval *tv);
+
+/**
+ * Adds a task which is to be executed after given interval
+ *
+ * @param cb the callback to call for executing the task
+ * @param cls closure for the above callback
+ * @param tv the interval after which the task has to be executed; NULL to
+ * denote infinite delay
+ * @return handle for task; NULL upon error
+ */
struct Task *
scheduler_add (event_callback_fn cb, void *cls, const struct timeval *tv);
+/**
+ * Add a task to be executed upon reception of a signal or upon the expiry of a
+ * given timeout
+ *
+ * @param signal the signal to wait for
+ * @param cb the callback to call upon reception of the signal
+ * @param cls closure for the above callback
+ * @param tv how long should we wait for the signal before. Upon this value
the cb is
+ * called with EV_TIMEOUT flag. Use NULL to wait forever.
+ * @return handle for the task; NULL upon error
+ */
struct Task *
scheduler_add_signal (int signal, event_callback_fn cb, void *cls,
const struct timeval *tv);
+
+/**
+ * Remove a task. All tasks are to be removed (even after their respective
+ * callbacks are executed)
+ *
+ * @param task the task handle to remove
+ */
void
scheduler_remove (struct Task *task);
+
+/**
+ * Shutdowns the scheduler. All pending tasks are executed (their respective
+ * callbacks will be called). Use IS_SHUTDOWN_EVENT() to check if the
callbacks
+ * are called upon scheduler's shutdown. It is not possible to add any tasks
+ * after this function is called.
+ *
+ * @see IS_SHUTDOWN_EVENT
+ */
void
scheduler_shutdown ();
+
+/**
+ * Run the scheduler loop by calling the given callback. This function returns
+ * once all tasks are finished or after a call to scheduler_shutdown() (which
+ * causes all waiting tasks to be executed)
+ *
+ * @param cb the callback to call when the scheduler is ready. Further tasks
+ * can be added through this callback.
+ * @return MSH_OK if all tasks are successfully executed; MSH_SYSERR upon error
+ */
int
scheduler_run (event_callback_fn cb, void *cls);
+
+/**
+ * The type of the function which is used as a callback argument to
+ * scheduler_open_socket(). The callback will be called when a socket
+ * connection is either successfully established or failed
+ *
+ * @param sockfd the socket file descriptor; upon failure its value is -1
+ * @param cls the closure for this callback as passed to
scheduler_open_socket()
+ */
typedef void (* socket_open_fn) (int sockfd, void *cls);
+
+/**
+ * Open a socket, connect it to the target address and schedule a task to be
+ * executed when the connection is ready.
+ *
+ * @param addr the target address to connect
+ * @param addrlen the length of the addr
+ * @param cb the callback to call to signal success or failure
+ * @param cls the closure for the above callback
+ * @return a handle which can be used to cancel the task to be executed when
the
+ * connection is ready; NULL upon error
+ */
struct SocketOpenHandle *
scheduler_open_socket (const struct sockaddr *addr, const socklen_t addrlen,
socket_open_fn cb, void *cls);
+
+
+/**
+ * Cancel a handle created with scheduler_open_socket()
+ *
+ * @param h the handle to cancel
+ */
+void
+scheduler_open_socket_cancel (struct SocketOpenHandle *h);
+
+#endif /* SCHEDULER_H_ */
+
+/* End of scheduler.h */
Added: msh/src/test_bitmap.c
===================================================================
--- msh/src/test_bitmap.c (rev 0)
+++ msh/src/test_bitmap.c 2013-07-11 15:06:37 UTC (rev 27930)
@@ -0,0 +1,43 @@
+/**
+ * @file test_bitmap.c
+ * @brief testcase for bitmap
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include "bitmap.h"
+
+#define FAIL_TEST(cond,ret) \
+do { if (!cond){fprintf (stderr, "Assertion failed at %s:%d\n", __FILE__,
__LINE__); ret;} }while(0)
+
+int main ()
+{
+ struct BitMap *bm;
+ unsigned int len = 80;
+ unsigned int cnt;
+
+ bm = bitmap_create (len);
+ bitmap_set (bm, 13, 1);
+ MSH_assert (1 == bitmap_isbitset (bm, 13));
+ for (cnt = 0; cnt < len; cnt++)
+ {
+ if (cnt == 13)
+ continue;
+ FAIL_TEST (0 == bitmap_isbitset (bm, cnt), return 1);
+ }
+ bitmap_clear (bm);
+ for (cnt = 0; cnt < len; cnt++)
+ {
+ FAIL_TEST (0 == bitmap_isbitset (bm, cnt), return 1);
+ }
+ bitmap_destroy (bm);
+ len = 9;
+ bm = bitmap_create (len);
+ for (cnt = 0; cnt < len; cnt++)
+ {
+ bitmap_set (bm, cnt, 1);
+ }
+ FAIL_TEST (1 == bitmap_allset (bm), return 1);
+ bitmap_destroy (bm);
+ return 0;
+}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r27930 - msh/src,
gnunet <=