gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21055 - in gnunet/src: dht include


From: gnunet
Subject: [GNUnet-SVN] r21055 - in gnunet/src: dht include
Date: Fri, 20 Apr 2012 19:18:14 +0200

Author: bartpolot
Date: 2012-04-20 19:18:14 +0200 (Fri, 20 Apr 2012)
New Revision: 21055

Modified:
   gnunet/src/dht/dht.h
   gnunet/src/dht/dht_api.c
   gnunet/src/dht/gnunet-service-dht_clients.c
   gnunet/src/dht/gnunet-service-dht_clients.h
   gnunet/src/dht/gnunet-service-dht_neighbours.c
   gnunet/src/include/gnunet_protocols.h
Log:
- Rewritten DHT monitoring


Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h        2012-04-20 12:35:09 UTC (rev 21054)
+++ gnunet/src/dht/dht.h        2012-04-20 17:18:14 UTC (rev 21055)
@@ -249,6 +249,48 @@
 
 
 /**
+ * Message to request monitoring messages, clients --> DHT service.
+ */
+struct GNUNET_DHT_MonitorStartMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_START
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Flag whether to notify about GET messages.
+   */
+  int16_t get GNUNET_PACKED;
+
+  /**
+   * Flag whether to notify about GET_REPONSE messages.
+   */
+  int16_t get_resp GNUNET_PACKED;
+
+  /**
+   * Flag whether to notify about PUT messages.
+   */
+  int16_t put GNUNET_PACKED;
+
+  /**
+   * Flag whether to use the provided key to filter messages.
+   */
+  int16_t filter_key GNUNET_PACKED;
+
+  /**
+   * The key to filter messages by..
+   */
+  GNUNET_HashCode key;
+};
+
+
+/**
  * Message to monitor get requests going through peer, DHT service --> clients.
  */
 struct GNUNET_DHT_MonitorGetMessage
@@ -296,7 +338,7 @@
 /**
  * Message to monitor get results going through peer, DHT service --> clients.
  */
-struct GNUNET_DHT_MonitorGetResultMessage
+struct GNUNET_DHT_MonitorGetRespMessage
 {
   /**
    * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT

Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c    2012-04-20 12:35:09 UTC (rev 21054)
+++ gnunet/src/dht/dht_api.c    2012-04-20 17:18:14 UTC (rev 21055)
@@ -171,11 +171,21 @@
   GNUNET_HashCode *key;
 
   /**
-   * Callback for each received message of interest.
+   * Callback for each received message of type get.
    */
-  GNUNET_DHT_MonitorCB cb;
+  GNUNET_DHT_MonitorGetCB get_cb;
 
   /**
+   * Callback for each received message of type get response.
+   */
+  GNUNET_DHT_MonitorGetRespCB get_resp_cb;
+
+  /**
+   * Callback for each received message of type put.
+   */
+  GNUNET_DHT_MonitorPutCB put_cb;
+
+  /**
    * Closure for cb.
    */
   void *cb_cls;
@@ -533,64 +543,206 @@
   return GNUNET_YES;
 }
 
+/**
+ * Process a get monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor get message from the service.
+ * 
+ * @return GNUNET_OK if everything went fine,
+ *         GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
+                             const struct GNUNET_DHT_MonitorGetMessage *msg)
+{
+  struct GNUNET_DHT_MonitorHandle *h;
+  size_t msize;
 
+  msize = ntohs (msg->header.size);
+  if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
+    return GNUNET_SYSERR;
+
+  h = handle->monitor_head;
+  while (NULL != h)
+  {
+    int type_ok;
+    int key_ok;
+
+    type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+    key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+                                       sizeof (GNUNET_HashCode)) == 0;
+    if (type_ok && key_ok && NULL != h->get_cb)
+    {
+      h->get_cb (h->cb_cls,
+                ntohl (msg->options),
+                (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+                ntohl (msg->hop_count),
+                ntohl (msg->desired_replication_level),
+                ntohl (msg->get_path_length),
+                (struct GNUNET_PeerIdentity *) &msg[1],
+                &msg->key);
+    }
+    h = h->next;
+  }
+  return GNUNET_OK;
+}
+
+
 /**
- * Process a monitoring message from the service.
+ * Process a get response monitor message from the service.
  *
  * @param handle The DHT handle.
- * @param msg Message from the service.
+ * @param msg Monitor get response message from the service.
  * 
  * @return GNUNET_OK if everything went fine,
  *         GNUNET_SYSERR if the message is malformed.
  */
 static int
-process_monitor_message (struct GNUNET_DHT_Handle *handle,
-                         const struct GNUNET_MessageHeader *msg)
+process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
+                                  const struct GNUNET_DHT_MonitorGetRespMessage
+                                  *msg)
 {
-  struct GNUNET_DHT_MonitorMessage *m;
   struct GNUNET_DHT_MonitorHandle *h;
   size_t msize;
 
-  if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
-      ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
+  msize = ntohs (msg->header.size);
+  if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
     return GNUNET_SYSERR;
-  msize = ntohs (msg->size);
-  if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
-    return GNUNET_SYSERR;
 
-  m = (struct GNUNET_DHT_MonitorMessage *) msg;
   h = handle->monitor_head;
   while (NULL != h)
   {
-    if (h->type == ntohl(m->type) &&
-      (NULL == h->key ||
-       memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
+    int type_ok;
+    int key_ok;
+
+    type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+    key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+                                       sizeof (GNUNET_HashCode)) == 0;
+    if (type_ok && key_ok && NULL != h->get_resp_cb)
     {
       struct GNUNET_PeerIdentity *path;
       uint32_t getl;
       uint32_t putl;
 
-      path = (struct GNUNET_PeerIdentity *) &m[1];
-      getl = ntohl (m->get_path_length);
-      putl = ntohl (m->put_path_length);
-      h->cb (h->cb_cls, ntohs(msg->type),
-             GNUNET_TIME_absolute_ntoh(m->expiration),
-             &m->key,
-             &path[getl], putl, path, getl,
-             ntohl (m->desired_replication_level),
-             ntohl (m->options), ntohl (m->type),
-             (void *) &path[getl + putl],
-             ntohs (msg->size) -
-             sizeof (struct GNUNET_DHT_MonitorMessage) -
-             sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
+      path = (struct GNUNET_PeerIdentity *) &msg[1];
+      getl = ntohl (msg->get_path_length);
+      putl = ntohl (msg->put_path_length);
+      h->get_resp_cb (h->cb_cls,
+                      (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+                      path, getl,
+                      &path[getl], putl,
+                      GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+                      &msg->key,
+                      (void *) &path[getl + putl],
+                      msize -
+                      sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
+                      sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
     }
     h = h->next;
   }
+  return GNUNET_OK;
+}
 
+
+/**
+ * Process a put monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor put message from the service.
+ * 
+ * @return GNUNET_OK if everything went fine,
+ *         GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
+                             const struct GNUNET_DHT_MonitorPutMessage *msg)
+{
+  struct GNUNET_DHT_MonitorHandle *h;
+  size_t msize;
+
+  msize = ntohs (msg->header.size);
+  if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
+    return GNUNET_SYSERR;
+
+  h = handle->monitor_head;
+  while (NULL != h)
+  {
+    int type_ok;
+    int key_ok;
+
+    type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+    key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+                                       sizeof (GNUNET_HashCode)) == 0;
+    if (type_ok && key_ok && NULL != h->put_cb)
+    {
+      struct GNUNET_PeerIdentity *path;
+      uint32_t putl;
+
+      path = (struct GNUNET_PeerIdentity *) &msg[1];
+      putl = ntohl (msg->put_path_length);
+      h->put_cb (h->cb_cls,
+                 ntohl (msg->options),
+                 (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+                 ntohl (msg->hop_count),
+                 ntohl (msg->desired_replication_level),
+                 putl, path,
+                 GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+                 &msg->key,
+                 (void *) &path[putl],
+                 msize -
+                 sizeof (struct GNUNET_DHT_MonitorPutMessage) -
+                 sizeof (struct GNUNET_PeerIdentity) * putl);
+    }
+    h = h->next;
+  }
   return GNUNET_OK;
 }
 
+
 /**
+ * Process a monitoring message from the service: demultiplex for proper type.
+ *
+ * @param handle The DHT handle.
+ * @param msg Message from the service.
+ * 
+ * @return GNUNET_OK if everything went fine,
+ *         GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_message (struct GNUNET_DHT_Handle *handle,
+                         const struct GNUNET_MessageHeader *msg)
+{
+  switch (ntohs (msg->type))
+  {
+    case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
+      return process_monitor_get_message(handle,
+                                         (struct GNUNET_DHT_MonitorGetMessage 
*)
+                                         msg);
+
+    case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
+    {
+      return process_monitor_get_resp_message(
+        handle,
+        (struct GNUNET_DHT_MonitorGetRespMessage *) msg);
+    }
+    case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
+    {
+      return process_monitor_put_message(handle,
+                                         (struct GNUNET_DHT_MonitorPutMessage 
*)
+                                         msg);
+    }
+    case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
+      /* Not implemented yet */
+      GNUNET_break(0);
+      /* Fall through */
+    default:
+      GNUNET_break(0);
+      return GNUNET_SYSERR;
+  }
+}
+
+/**
  * Handler for messages received from the DHT service
  * a demultiplexer which handles numerous message types
  *
@@ -930,7 +1082,9 @@
  * @param handle Handle to the DHT service.
  * @param type Type of blocks that are of interest.
  * @param key Key of data of interest, NULL for all.
- * @param cb Callback to process all monitored data.
+ * @param get_cb Callback to process monitored get messages.
+ * @param get_resp_cb Callback to process monitored get response messages.
+ * @param put_cb Callback to process monitored put messages.
  * @param cb_cls Closure for cb.
  *
  * @return Handle to stop monitoring.
@@ -939,18 +1093,21 @@
 GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
                           enum GNUNET_BLOCK_Type type,
                           const GNUNET_HashCode *key,
-                          GNUNET_DHT_MonitorCB cb,
+                          GNUNET_DHT_MonitorGetCB get_cb,
+                          GNUNET_DHT_MonitorGetRespCB get_resp_cb,
+                          GNUNET_DHT_MonitorPutCB put_cb,
                           void *cb_cls)
 {
   struct GNUNET_DHT_MonitorHandle *h;
-  struct GNUNET_DHT_MonitorMessage *m;
+  struct GNUNET_DHT_MonitorStartMessage *m;
   struct PendingMessage *pending;
 
   h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
   GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
 
-  GNUNET_assert (NULL != cb);
-  h->cb = cb;
+  h->get_cb = get_cb;
+  h->get_resp_cb = get_resp_cb;
+  h->put_cb = put_cb;
   h->cb_cls = cb_cls;
   h->type = type;
   h->dht_handle = handle;
@@ -960,17 +1117,22 @@
     memcpy (h->key, key, sizeof(GNUNET_HashCode));
   }
 
-  pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
+  pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
                            sizeof (struct PendingMessage));
-  m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
+  m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
   pending->msg = &m->header;
   pending->handle = handle;
   pending->free_on_send = GNUNET_YES;
   m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
-  m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage));
+  m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
   m->type = htonl(type);
-  if (NULL != key)
+  m->get = (NULL != get_cb);
+  m->get_resp = (NULL != get_resp_cb);
+  m->put = (NULL != put_cb);
+  if (NULL != key) {
+    m->filter_key = 1;
     memcpy (&m->key, key, sizeof(GNUNET_HashCode));
+  }
   GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
                                pending);
   pending->in_pending_queue = GNUNET_YES;

Modified: gnunet/src/dht/gnunet-service-dht_clients.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_clients.c 2012-04-20 12:35:09 UTC (rev 
21054)
+++ gnunet/src/dht/gnunet-service-dht_clients.c 2012-04-20 17:18:14 UTC (rev 
21055)
@@ -204,6 +204,21 @@
   GNUNET_HashCode         *key;
 
   /**
+   * Flag whether to notify about GET messages.
+   */
+  int16_t get;
+
+  /**
+   * Flag whether to notify about GET_REPONSE messages.
+   */
+  int16_t get_resp;
+
+  /**
+   * Flag whether to notify about PUT messages.
+   */
+  uint16_t put;
+
+  /**
    * Client to notify of these requests.
    */
   struct ClientList             *client;
@@ -490,12 +505,16 @@
                              peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
                              size -
                              sizeof (struct GNUNET_DHT_ClientPutMessage));
-  GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
-    GNUNET_TIME_absolute_ntoh (dht_msg->expiration), &dht_msg->key,
-    1, GDS_NEIGHBOURS_get_id(), 0, NULL,
-    ntohl (dht_msg->desired_replication_level),
-    ntohl (dht_msg->type), &(dht_msg[1].header),
-    size - sizeof (struct GNUNET_DHT_ClientPutMessage));
+  GDS_CLIENTS_process_put (ntohl (dht_msg->options),
+                           ntohl (dht_msg->type),
+                           0,
+                           ntohl (dht_msg->desired_replication_level),
+                           1,
+                           GDS_NEIGHBOURS_get_id(),
+                           GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+                           &dht_msg->key,
+                           &dht_msg[1],
+                           size - sizeof (struct GNUNET_DHT_ClientPutMessage));
   GNUNET_CONTAINER_bloomfilter_free (peer_bf);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -554,11 +573,13 @@
   cqr->type = ntohl (get->type);
   GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-  GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
-    GNUNET_TIME_UNIT_FOREVER_ABS, &get->key,
-    0, NULL, 1, GDS_NEIGHBOURS_get_id(),
-    ntohl (get->desired_replication_level),
-    ntohl (get->type), NULL, 0);
+  GDS_CLIENTS_process_get (ntohl (get->options),
+                           ntohl (get->type),
+                           0,
+                           ntohl (get->desired_replication_level),
+                           1,
+                           GDS_NEIGHBOURS_get_id(),
+                           &get->key);
   /* start remote requests */
   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
     GNUNET_SCHEDULER_cancel (retry_task);
@@ -659,15 +680,18 @@
                           const struct GNUNET_MessageHeader *message)
 {
   struct ClientMonitorRecord *r;
-  const struct GNUNET_DHT_MonitorMessage *msg;
+  const struct GNUNET_DHT_MonitorStartMessage *msg;
   unsigned int i;
   char *c;
 
-  msg = (struct GNUNET_DHT_MonitorMessage *) message;
+  msg = (struct GNUNET_DHT_MonitorStartMessage *) message;
   r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
 
   r->client = find_active_client(client);
   r->type = ntohl(msg->type);
+  r->get = msg->get;
+  r->get_resp = msg->get_resp;
+  r->put = msg->put;
   c = (char *) &msg->key;
   for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
   if (sizeof (GNUNET_HashCode) == i)
@@ -1038,33 +1062,101 @@
 
 
 /**
- * Check if some client is monitoring messages of this type and notify
- * him in that case.
+ * Check if some client is monitoring GET messages and notify
+ * them in that case.
  *
- * @param mtype Type of the DHT message.
- * @param exp When will this value expire.
- * @param key Key of the result/request.
- * @param putl number of entries in get_path.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (uint32_t options,
+                         enum GNUNET_BLOCK_Type type,
+                         uint32_t hop_count,
+                         uint32_t desired_replication_level, 
+                         unsigned int path_length,
+                         const struct GNUNET_PeerIdentity *path,
+                         const GNUNET_HashCode * key)
+{
+  struct ClientMonitorRecord *m;
+  struct ClientList **cl;
+  unsigned int cl_size;
+
+  cl = NULL;
+  cl_size = 0;
+  for (m = monitor_head; NULL != m; m = m->next)
+  {
+    if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
+        (NULL == m->key ||
+         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+    {
+      struct PendingMessage *pm;
+      struct GNUNET_DHT_MonitorGetMessage *mmsg;
+      struct GNUNET_PeerIdentity *msg_path;
+      size_t msize;
+      unsigned int i;
+
+      /* Don't send duplicates */
+      for (i = 0; i < cl_size; i++)
+        if (cl[i] == m->client)
+          break;
+      if (i < cl_size)
+        continue;
+      GNUNET_array_append (cl, cl_size, m->client);
+
+      msize = path_length * sizeof (struct GNUNET_PeerIdentity);
+      msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
+      msize += sizeof (struct PendingMessage);
+      pm = (struct PendingMessage *) GNUNET_malloc (msize);
+      mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
+      pm->msg = (struct GNUNET_MessageHeader *) mmsg;
+      mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
+      mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
+      mmsg->options = htonl(options);
+      mmsg->type = htonl(type);
+      mmsg->hop_count = htonl(hop_count);
+      mmsg->desired_replication_level = htonl(desired_replication_level);
+      mmsg->get_path_length = htonl(path_length);
+      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+      if (path_length > 0)
+        memcpy (msg_path, path,
+                path_length * sizeof (struct GNUNET_PeerIdentity));
+      add_pending_message (m->client, pm);
+    }
+  }
+  GNUNET_free_non_null (cl);
+}
+
+
+/**
+ * Check if some client is monitoring GET RESP messages and notify
+ * them in that case.
+ *
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
  * @param put_path peers on the PUT path (or NULL if not recorded).
- * @param getl number of entries in get_path.
- * @param get_path Peers on reply path (or NULL if not recorded).
- * @param desired_replication_level Desired replication level.
- * @param type Type of the result/request.
+ * @param put_path_length number of entries in get_path.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
  * @param data Pointer to the result data.
  * @param size Number of bytes in data.
  */
 void
-GDS_CLIENTS_process_monitor (uint16_t mtype,
-                             const struct GNUNET_TIME_Absolute exp,
-                             const GNUNET_HashCode *key,
-                             uint32_t putl,
-                             const struct GNUNET_PeerIdentity *put_path,
-                             uint32_t getl,
-                             const struct GNUNET_PeerIdentity *get_path,
-                             uint32_t desired_replication_level,
-                             enum GNUNET_BLOCK_Type type,
-                             const struct GNUNET_MessageHeader *data,
-                             uint16_t size)
+GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
+                              const struct GNUNET_PeerIdentity *get_path,
+                              unsigned int get_path_length,
+                              const struct GNUNET_PeerIdentity *put_path,
+                              unsigned int put_path_length,
+                              struct GNUNET_TIME_Absolute exp,
+                              const GNUNET_HashCode * key,
+                              const void *data,
+                              size_t size)
 {
   struct ClientMonitorRecord *m;
   struct ClientList **cl;
@@ -1079,7 +1171,7 @@
          memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
     {
       struct PendingMessage *pm;
-      struct GNUNET_DHT_MonitorMessage *mmsg;
+      struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
       struct GNUNET_PeerIdentity *path;
       size_t msize;
       unsigned int i;
@@ -1093,29 +1185,32 @@
       GNUNET_array_append (cl, cl_size, m->client);
 
       msize = size;
-      msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity);
-      msize += sizeof (struct GNUNET_DHT_MonitorMessage);
+      msize += (get_path_length + put_path_length)
+               * sizeof (struct GNUNET_PeerIdentity);
+      msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
       msize += sizeof (struct PendingMessage);
       pm = (struct PendingMessage *) GNUNET_malloc (msize);
-      mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1];
+      mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
       pm->msg = (struct GNUNET_MessageHeader *) mmsg;
       mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
-      mmsg->header.type = htons (mtype);
-      mmsg->expiration = GNUNET_TIME_absolute_hton(exp);
-      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
-      mmsg->put_path_length = htonl(putl);
-      mmsg->get_path_length = htonl(getl);
-      mmsg->desired_replication_level = htonl (desired_replication_level);
+      mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
+      mmsg->type = htonl(type);
+      mmsg->put_path_length = htonl(put_path_length);
+      mmsg->get_path_length = htonl(get_path_length);
       path = (struct GNUNET_PeerIdentity *) &mmsg[1];
-      if (putl > 0)
+      if (put_path_length > 0)
       {
-        memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity));
-        path = &path[putl];
+        memcpy (path, put_path,
+                put_path_length * sizeof (struct GNUNET_PeerIdentity));
+        path = &path[put_path_length];
       }
-      if (getl > 0)
-        memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity));
+      if (get_path_length > 0)
+        memcpy (path, get_path,
+                get_path_length * sizeof (struct GNUNET_PeerIdentity));
+      mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
+      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
       if (size > 0)
-        memcpy (&path[getl], data, size);
+        memcpy (&path[get_path_length], data, size);
       add_pending_message (m->client, pm);
     }
   }
@@ -1124,6 +1219,90 @@
 
 
 /**
+ * Check if some client is monitoring PUT messages and notify
+ * them in that case.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+GDS_CLIENTS_process_put (uint32_t options,
+                         enum GNUNET_BLOCK_Type type,
+                         uint32_t hop_count,
+                         uint32_t desired_replication_level, 
+                         unsigned int path_length,
+                         const struct GNUNET_PeerIdentity *path,
+                         struct GNUNET_TIME_Absolute exp,
+                         const GNUNET_HashCode * key,
+                         const void *data,
+                         size_t size)
+{
+  struct ClientMonitorRecord *m;
+  struct ClientList **cl;
+  unsigned int cl_size;
+
+  cl = NULL;
+  cl_size = 0;
+  for (m = monitor_head; NULL != m; m = m->next)
+  {
+    if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
+        (NULL == m->key ||
+         memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+    {
+      struct PendingMessage *pm;
+      struct GNUNET_DHT_MonitorPutMessage *mmsg;
+      struct GNUNET_PeerIdentity *msg_path;
+      size_t msize;
+      unsigned int i;
+
+      /* Don't send duplicates */
+      for (i = 0; i < cl_size; i++)
+        if (cl[i] == m->client)
+          break;
+      if (i < cl_size)
+        continue;
+      GNUNET_array_append (cl, cl_size, m->client);
+
+      msize = size;
+      msize += path_length * sizeof (struct GNUNET_PeerIdentity);
+      msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
+      msize += sizeof (struct PendingMessage);
+      pm = (struct PendingMessage *) GNUNET_malloc (msize);
+      mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
+      pm->msg = (struct GNUNET_MessageHeader *) mmsg;
+      mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
+      mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
+      mmsg->options = htonl(options);
+      mmsg->type = htonl(type);
+      mmsg->hop_count = htonl(hop_count);
+      mmsg->desired_replication_level = htonl(desired_replication_level);
+      mmsg->put_path_length = htonl(path_length);
+      msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+      if (path_length > 0)
+      {
+        memcpy (msg_path, path,
+                path_length * sizeof (struct GNUNET_PeerIdentity));
+      }
+      mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
+      memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+      if (size > 0)
+        memcpy (&msg_path[path_length], data, size);
+      add_pending_message (m->client, pm);
+    }
+  }
+  GNUNET_free_non_null (cl);
+}
+
+
+/**
  * Initialize client subsystem.
  *
  * @param server the initialized server
@@ -1140,8 +1319,8 @@
      GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
      sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
     {&handle_dht_local_monitor, NULL,
-     GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
-     sizeof (struct GNUNET_DHT_MonitorMessage)},
+     GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
+     sizeof (struct GNUNET_DHT_MonitorStartMessage)},
     {NULL, NULL, 0, 0}
   };
   forward_map = GNUNET_CONTAINER_multihashmap_create (1024);

Modified: gnunet/src/dht/gnunet-service-dht_clients.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht_clients.h 2012-04-20 12:35:09 UTC (rev 
21054)
+++ gnunet/src/dht/gnunet-service-dht_clients.h 2012-04-20 17:18:14 UTC (rev 
21055)
@@ -57,35 +57,79 @@
 
 
 /**
- * Check if some client is monitoring messages of this type and notify
- * him in that case.
+ * Check if some client is monitoring GET messages and notify
+ * them in that case.
  *
- * @param mtype Type of the DHT message.
- * @param exp When will this value expire.
- * @param key Key of the result/request.
- * @param putl number of entries in get_path.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (uint32_t options,
+                         enum GNUNET_BLOCK_Type type,
+                         uint32_t hop_count,
+                         uint32_t desired_replication_level, 
+                         unsigned int path_length,
+                         const struct GNUNET_PeerIdentity *path,
+                         const GNUNET_HashCode * key);
+
+/**
+ * Check if some client is monitoring GET RESP messages and notify
+ * them in that case.
+ *
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
  * @param put_path peers on the PUT path (or NULL if not recorded).
- * @param getl number of entries in get_path.
- * @param get_path Peers on reply path (or NULL if not recorded).
- * @param desired_replication_level Desired replication level.
- * @param type Type of the result/request.
+ * @param put_path_length number of entries in get_path.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
  * @param data Pointer to the result data.
  * @param size Number of bytes in data.
  */
 void
-GDS_CLIENTS_process_monitor (uint16_t mtype,
-                             const struct GNUNET_TIME_Absolute exp,
-                             const GNUNET_HashCode *key,
-                             uint32_t putl,
-                             const struct GNUNET_PeerIdentity *put_path,
-                             uint32_t getl,
-                             const struct GNUNET_PeerIdentity *get_path,
-                             uint32_t desired_replication_level,
-                             enum GNUNET_BLOCK_Type type,
-                             const struct GNUNET_MessageHeader *data,
-                             uint16_t size);
+GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
+                              const struct GNUNET_PeerIdentity *get_path,
+                              unsigned int get_path_length,
+                              const struct GNUNET_PeerIdentity *put_path,
+                              unsigned int put_path_length,
+                              struct GNUNET_TIME_Absolute exp,
+                              const GNUNET_HashCode * key,
+                              const void *data,
+                              size_t size);
 
 /**
+ * Check if some client is monitoring PUT messages and notify
+ * them in that case.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+GDS_CLIENTS_process_put (uint32_t options,
+                         enum GNUNET_BLOCK_Type type,
+                         uint32_t hop_count,
+                         uint32_t desired_replication_level, 
+                         unsigned int path_length,
+                         const struct GNUNET_PeerIdentity *path,
+                         struct GNUNET_TIME_Absolute exp,
+                         const GNUNET_HashCode * key,
+                         const void *data,
+                         size_t size);
+
+/**
  * Initialize client subsystem.
  *
  * @param server the initialized server

Modified: gnunet/src/dht/gnunet-service-dht_neighbours.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_neighbours.c      2012-04-20 12:35:09 UTC 
(rev 21054)
+++ gnunet/src/dht/gnunet-service-dht_neighbours.c      2012-04-20 17:18:14 UTC 
(rev 21055)
@@ -1617,10 +1617,15 @@
                                pp, payload, payload_size);
   }
   GNUNET_CONTAINER_bloomfilter_free (bf);
-  GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
-    GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key,
-    putlen, put_path, 0, NULL, ntohl(put->desired_replication_level),
-    ntohl (put->type), payload, payload_size);
+  GDS_CLIENTS_process_put (options,
+                           ntohl (put->type),
+                           ntohl (put->hop_count),
+                           ntohl (put->desired_replication_level),
+                           putlen, put_path,
+                           GNUNET_TIME_absolute_ntoh (put->expiration_time),
+                           &put->key,
+                           payload,
+                           payload_size);
   return GNUNET_YES;
 }
 
@@ -1827,9 +1832,12 @@
   }
 
   /* FIXME Path */
-  GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
-    GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL,
-    ntohl (get->desired_replication_level), type, NULL, 0);
+  GDS_CLIENTS_process_get (options,
+                           type,
+                           ntohl(get->hop_count),
+                           ntohl(get->desired_replication_level),
+                           0, NULL,
+                           &get->key);
 
   /* P2P forwarding */
   if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
@@ -1963,10 +1971,16 @@
                          xget_path, data, data_size);
   }
 
-  GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
-    GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key,
-    put_path_length, put_path, get_path_length, get_path,
-    0, type, data, data_size);
+  GDS_CLIENTS_process_get_resp (type,
+                                get_path,
+                                get_path_length,
+                                put_path,
+                                put_path_length,
+                                GNUNET_TIME_absolute_ntoh (
+                                  prm->expiration_time),
+                                &prm->key,
+                                data,
+                                data_size);
 
   return GNUNET_YES;
 }

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2012-04-20 12:35:09 UTC (rev 
21054)
+++ gnunet/src/include/gnunet_protocols.h       2012-04-20 17:18:14 UTC (rev 
21055)
@@ -524,26 +524,36 @@
 #define GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT 148
 
 /**
- * Request / receive information about transiting GETs
+ * Receive information about transiting GETs
  */
 #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET             149
 
 /**
- * Request / receive information about transiting GET responses
+ * Receive information about transiting GET responses
  */
 #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP        150
 
 /**
- * Request / receive information about transiting PUTs
+ * Receive information about transiting PUTs
  */
 #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT             151
 
 /**
- * Request / receive information about transiting PUT responses (TODO)
+ * Receive information about transiting PUT responses (TODO)
  */
 #define GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP        152
 
+/**
+ * Request information about transiting messages
+ */
+#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_START             153
 
+/**
+ * Stop information about transiting messages
+ */
+#define GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP             154
+
+
 
/*******************************************************************************
  * HOSTLIST message types
  
******************************************************************************/




reply via email to

[Prev in Thread] Current Thread [Next in Thread]