gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28490 - gnunet/src/mesh


From: gnunet
Subject: [GNUnet-SVN] r28490 - gnunet/src/mesh
Date: Sun, 11 Aug 2013 05:31:06 +0200

Author: bartpolot
Date: 2013-08-11 05:31:06 +0200 (Sun, 11 Aug 2013)
New Revision: 28490

Modified:
   gnunet/src/mesh/gnunet-service-mesh-enc.c
Log:
- major refactorization


Modified: gnunet/src/mesh/gnunet-service-mesh-enc.c
===================================================================
--- gnunet/src/mesh/gnunet-service-mesh-enc.c   2013-08-10 21:06:01 UTC (rev 
28489)
+++ gnunet/src/mesh/gnunet-service-mesh-enc.c   2013-08-11 03:31:06 UTC (rev 
28490)
@@ -192,6 +192,11 @@
   struct MeshConnection *c;
 
     /**
+     * Is this message going in the FWD direction of connection c?
+     */
+  int fwd;
+
+    /**
      * Channel this message belongs to, if known.
      */
   struct MeshChannel *ch;
@@ -225,18 +230,8 @@
   struct MeshConnection *c;
 
   /**
-   * Transmission queue to core DLL head
+   * How many messages are in the queue on this connection.
    */
-  struct MeshPeerQueue *queue_head;
-
-  /**
-   * Transmission queue to core DLL tail
-   */
-  struct MeshPeerQueue *queue_tail;
-
-  /**
-   * How many messages are in the queue to this peer.
-   */
   unsigned int queue_n;
 
   /**
@@ -245,11 +240,6 @@
   unsigned int queue_max;
 
   /**
-   * Handle for queued transmissions
-   */
-  struct GNUNET_CORE_TransmitHandle *core_transmit;
-
-  /**
    * ID of the last packet sent towards the peer.
    */
   uint32_t last_pid_sent;
@@ -316,6 +306,30 @@
      */
   struct MeshTunnel2 *tunnel;
 
+    /**
+     * Connections that go through this peer, indexed by tid;
+     */
+  struct GNUNET_CONTAINER_MultiHashMap *connections;
+
+    /**
+     * Handle for queued transmissions
+     */
+  struct GNUNET_CORE_TransmitHandle *core_transmit;
+
+  /**
+   * Transmission queue to core DLL head
+   */
+  struct MeshPeerQueue *queue_head;
+  
+  /**
+   * Transmission queue to core DLL tail
+   */
+  struct MeshPeerQueue *queue_tail;
+
+  /**
+   * How many messages are in the queue to this peer.
+   */
+  unsigned int queue_n;
 };
 
 
@@ -363,7 +377,7 @@
   struct MeshReliableMessage        *tail_sent;
 
     /**
-     * Messages pending
+     * Messages pending to send.
      */
   unsigned int                      n_sent;
 
@@ -374,6 +388,16 @@
   struct MeshReliableMessage        *tail_recv;
 
     /**
+     * Messages received.
+     */
+  unsigned int                      n_recv;
+
+    /**
+     * Can we send data to the client?
+     */
+  int client_ready;
+
+    /**
      * Task to resend/poll in case no ACK is received.
      */
   GNUNET_SCHEDULER_TaskIdentifier   retry_task;
@@ -1242,6 +1266,22 @@
 
 
 /**
+ * Get the neighboring hop of a connection in the given direction.
+ *
+ * @param c Connection.
+ * @param fwd Non-zero to get the next hop, zero to get the previous hop.
+ *
+ * @return Result of connection_get_next_hop() if fwd, else of connection_get_prev_hop().
+ */
+static struct MeshPeer *
+connection_get_hop (struct MeshConnection *c, int fwd)
+{
+  if (fwd)
+    return connection_get_next_hop (c);
+  return connection_get_prev_hop (c);
+}
+
+/**
  * Check if client has registered with the service and has not disconnected
  *
  * @param client the client to check
@@ -1422,6 +1462,29 @@
 
 
 /**
+ * Get the total buffer space for a tunnel in one direction (FWD if fwd, else BCK).
+ */
+static unsigned int
+tunnel_get_buffer (struct MeshTunnel2 *t, int fwd)
+{
+  struct MeshConnection *c;
+  struct MeshFlowControl *fc;
+  unsigned int buffer;
+
+  for (buffer = 0, c = t->connection_head; NULL != c; c = c->next)
+  {
+    if (c->state != MESH_CONNECTION_READY)
+      continue;
+    /* Only READY connections count; each adds (last ACK received - last PID sent). */
+    fc = fwd ? &c->fwd_fc : &c->bck_fc;
+    buffer += fc->last_ack_recv - fc->last_pid_sent;
+  }
+
+  return buffer;
+}
+
+
+/**
  * FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME FIXME 
  * Encrypt data with the tunnel key.
  *
@@ -1482,7 +1545,7 @@
   size_t size;
   uint16_t type;
 
-  neighbor = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
+  neighbor = connection_get_hop (c, fwd);
   if (NULL == neighbor)
   {
     GNUNET_break (0);
@@ -1780,24 +1843,141 @@
 
 
 /**
- * Iterator over all the peers to remove the oldest not-used entry.
+ * Destroy a peer: remove it from the peers map, stop its DHT get, destroy its paths and free it.
  *
+ * @param peer The peer_info to destroy.
+ *
+ * @return Always GNUNET_OK (a failed map removal is only logged).
+ */
+static int
+peer_destroy (struct MeshPeer *peer)
+{
+  struct GNUNET_PeerIdentity id;
+  struct MeshPeerPath *p;
+  struct MeshPeerPath *nextp;
+
+  GNUNET_PEER_resolve (peer->id, &id);
+  GNUNET_PEER_change_rc (peer->id, -1);
+
+  if (GNUNET_YES !=
+      GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer))
+  {
+    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "removing peer %s, not in hashmap\n", GNUNET_i2s (&id));
+  }
+  if (NULL != peer->dhtget)
+  {
+    GNUNET_DHT_get_stop (peer->dhtget);
+  }
+  p = peer->path_head;
+  while (NULL != p)
+  {
+    nextp = p->next; /* saved before p is unlinked and destroyed */
+    GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
+    path_destroy (p);
+    p = nextp;
+  }
+  tunnel_destroy_empty (peer->tunnel); /* NOTE(review): peer->tunnel is NULL when called via peer_timeout() - confirm NULL is accepted */
+  GNUNET_free (peer); /* NOTE(review): peer->connections, the send queue and core_transmit are not released here */
+  return GNUNET_OK;
+}
+
+
+/**
+ * Returns if peer is used (has a tunnel, is neighbor).
+ *
+ * @peer Peer to check.
+ *
+ * @return GNUNET_YES if peer is in use.
+ */
+static int
+peer_is_used (struct MeshPeer *peer)
+{
+  struct MeshPeerPath *p;
+
+  if (NULL != peer->tunnel)
+    return GNUNET_YES;
+
+  for (p = peer->path_head; NULL != p; p = p->next)
+  {
+    if (p->length < 3)
+      return GNUNET_YES;
+  }
+  return GNUNET_NO;
+}
+
+/**
+ * Iterator over all the peers to get the oldest timestamp.
+ *
  * @param cls Closure (unsued).
  * @param key ID of the peer.
  * @param value Peer_Info of the peer.
+ */
+static int
+peer_get_oldest (void *cls,
+                 const struct GNUNET_HashCode *key,
+                 void *value)
+{
+  struct MeshPeer *p = value;
+  struct GNUNET_TIME_Absolute *abs = cls;
+
+  /* Don't count active peers */
+  if (GNUNET_YES == peer_is_used (p))
+    return GNUNET_YES;
+
+  if (abs->abs_value < p->last_contact.abs_value)
+    abs->abs_value = p->last_contact.abs_value;
+
+  return GNUNET_YES;
+}
+
+
+/**
+ * Iterator over all the peers to remove the oldest entry.
  *
- * FIXME implement
+ * @param cls Closure (unsued).
+ * @param key ID of the peer.
+ * @param value Peer_Info of the peer.
  */
 static int
 peer_timeout (void *cls,
               const struct GNUNET_HashCode *key,
               void *value)
 {
+  struct MeshPeer *p = value;
+  struct GNUNET_TIME_Absolute *abs = cls;
+
+  if (p->last_contact.abs_value == abs->abs_value &&
+      GNUNET_NO == peer_is_used (p))
+  {
+    peer_destroy (p);
+    return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
 
 /**
+ * Delete oldest unused peer, using two passes over the peers map.
+ */
+static void
+peer_delete_oldest (void)
+{
+  struct GNUNET_TIME_Absolute abs;
+
+  abs = GNUNET_TIME_UNIT_FOREVER_ABS;
+  /* Pass 1 lets peer_get_oldest update abs; pass 2 destroys an unused peer matching abs. */
+  GNUNET_CONTAINER_multihashmap_iterate (peers,
+                                         &peer_get_oldest,
+                                         &abs);
+  GNUNET_CONTAINER_multihashmap_iterate (peers,
+                                         &peer_timeout,
+                                         &abs);
+}
+
+
+/**
  * Retrieve the MeshPeer stucture associated with the peer, create one
  * and insert it in the appropriate structures if the peer is not known yet.
  *
@@ -1816,9 +1996,7 @@
     peer = GNUNET_new (struct MeshPeer);
     if (GNUNET_CONTAINER_multihashmap_size (peers) > max_peers)
     {
-      GNUNET_CONTAINER_multihashmap_iterate (peers,
-                                             &peer_timeout,
-                                             NULL);
+      peer_delete_oldest ();
     }
     GNUNET_CONTAINER_multihashmap_put (peers, &peer_id->hashPubKey, peer,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
@@ -1921,8 +2099,51 @@
   return best_p;
 }
 
+static int
+queue_is_sendable (struct MeshPeerQueue *q)
+{
+  struct MeshFlowControl *fc;
 
+  /* PID-independent types (ACK, POLL) are always sendable. */
+  switch (q->type)
+  {
+    case GNUNET_MESSAGE_TYPE_MESH_ACK:
+    case GNUNET_MESSAGE_TYPE_MESH_POLL:
+      return GNUNET_YES;
+  }
+
+  /* Others are sendable only if GMC_is_pid_bigger (last_ack_recv, last_pid_sent) holds. */
+  fc = q->fwd ? &q->c->fwd_fc : &q->c->bck_fc;
+  if (GMC_is_pid_bigger (fc->last_ack_recv, fc->last_pid_sent))
+    return GNUNET_YES;
+
+  return GNUNET_NO;
+}
+
+
 /**
+ * Get the first sendable message in the queue towards a peer.
+ *
+ * @param peer The destination peer.
+ *
+ * @return First queued message for which queue_is_sendable() is true, NULL if none.
+ */
+static struct MeshPeerQueue *
+peer_get_first_message (const struct MeshPeer *peer)
+{
+  struct MeshPeerQueue *q;
+
+  for (q = peer->queue_head; NULL != q; q = q->next)
+  {
+    if (queue_is_sendable (q))
+      return q;
+  }
+
+  return NULL;
+}
+
+
+/**
  * Try to establish a new connection to this peer in the given tunnel.
  * If the peer doesn't have any path to it yet, try to get one.
  * If the peer already has some path, send a CREATE CONNECTION towards it.
@@ -1974,6 +2195,35 @@
 
 
 /**
+ * Get the first transmittable message for a connection.
+ *
+ * @param c Connection.
+ * @param fwd Direction: search the next hop's queue if FWD, the previous hop's otherwise.
+ *
+ * @return First sendable message queued for c on that hop, NULL if none.
+ */
+static struct MeshPeerQueue *
+connection_get_first_message (struct MeshConnection *c, int fwd)
+{
+  struct MeshPeerQueue *q;
+  struct MeshPeer *p;
+
+  p = connection_get_hop (c, fwd);
+  /* NOTE(review): p is not NULL-checked here, although other callers check the hop for NULL. */
+  for (q = p->queue_head; NULL != q; q = q->next)
+  {
+    if (q->c != c)
+      continue;
+    if (queue_is_sendable (q))
+      return q;
+  }
+
+  return NULL;
+}
+
+
+
+/**
  * @brief Re-initiate traffic on this connection if necessary.
  *
  * Check if there is traffic queued towards this peer
@@ -1986,23 +2236,21 @@
 static void
 connection_unlock_queue (struct MeshConnection *c, int fwd)
 {
-  struct MeshFlowControl *fc;
   struct MeshPeer *peer;
   struct MeshPeerQueue *q;
   size_t size;
 
-  peer = fwd ? connection_get_next_hop(c) : connection_get_prev_hop(c);
-  fc   = fwd ? &c->fwd_fc                 : &c->bck_fc;
+  peer = connection_get_hop (c, fwd);
 
-  if (NULL != fc->core_transmit)
+  if (NULL != peer->core_transmit)
     return; /* Already unlocked */
 
-  q = fc->queue_head;
+  q = connection_get_first_message (c, fwd);
   if (NULL == q)
     return; /* Nothing to transmit */
 
   size = q->size;
-  fc->core_transmit =
+  peer->core_transmit =
       GNUNET_CORE_notify_transmit_ready (core_handle,
                                          GNUNET_NO,
                                          0,
@@ -2026,6 +2274,7 @@
   struct MeshPeerQueue *q;
   struct MeshPeerQueue *next;
   struct MeshFlowControl *fc;
+  struct MeshPeer *peer;
 
   if (NULL == c)
   {
@@ -2033,7 +2282,9 @@
     return;
   }
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
-  for (q = fc->queue_head; NULL != q; q = next)
+  peer = connection_get_hop (c, fwd);
+
+  for (q = peer->queue_head; NULL != q; q = next)
   {
     next = q->next;
     if (q->c == c)
@@ -2044,12 +2295,12 @@
       queue_destroy (q, GNUNET_YES);
     }
   }
-  if (NULL == fc->queue_head)
+  if (NULL == peer->queue_head)
   {
-    if (NULL != fc->core_transmit)
+    if (NULL != peer->core_transmit)
     {
-      GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
-      fc->core_transmit = NULL;
+      GNUNET_CORE_notify_transmit_ready_cancel (peer->core_transmit);
+      peer->core_transmit = NULL;
     }
     if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
     {
@@ -2061,48 +2312,6 @@
 
 
 /**
- * Destroy the peer_info and free any allocated resources linked to it
- *
- * @param peer The peer_info to destroy.
- *
- * @return GNUNET_OK on success
- */
-static int
-peer_destroy (struct MeshPeer *peer)
-{
-  struct GNUNET_PeerIdentity id;
-  struct MeshPeerPath *p;
-  struct MeshPeerPath *nextp;
-
-  GNUNET_PEER_resolve (peer->id, &id);
-  GNUNET_PEER_change_rc (peer->id, -1);
-
-  if (GNUNET_YES !=
-      GNUNET_CONTAINER_multihashmap_remove (peers, &id.hashPubKey, peer))
-  {
-    GNUNET_break (0);
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "removing peer %s, not in hashmap\n", GNUNET_i2s (&id));
-  }
-  if (NULL != peer->dhtget)
-  {
-    GNUNET_DHT_get_stop (peer->dhtget);
-  }
-  p = peer->path_head;
-  while (NULL != p)
-  {
-    nextp = p->next;
-    GNUNET_CONTAINER_DLL_remove (peer->path_head, peer->path_tail, p);
-    path_destroy (p);
-    p = nextp;
-  }
-  tunnel_destroy_empty (peer->tunnel);
-  GNUNET_free (peer);
-  return GNUNET_OK;
-}
-
-
-/**
  * Remove all paths that rely on a direct connection between p1 and p2
  * from the peer itself and notify all tunnels about it.
  *
@@ -2874,7 +3083,8 @@
 
 
 /**
- * Send up to 64 buffered messages to the client for in order delivery.
+ * Send a buffered message to the client, for in order delivery or
+ * as result of client ACK.
  *
  * @param ch Channel on which to empty the message buffer.
  * @param c Client to send to.
@@ -2887,22 +3097,21 @@
                                    struct MeshChannelReliability *rel)
 {
   struct MeshReliableMessage *copy;
-  struct MeshReliableMessage *next;
   uint32_t *mid;
 
-  if (GNUNET_NO == ch->reliable)
+  if (GNUNET_NO == rel->client_ready)
   {
-    GNUNET_break (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client not ready\n");
     return;
   }
 
   mid = rel == ch->bck_rel ? &ch->mid_recv_fwd : &ch->mid_recv_bck;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_buffered_data\n");
-  for (copy = rel->head_recv; NULL != copy; copy = next)
+  copy = rel->head_recv;
+  if (NULL != copy)
   {
-    next = copy->next;
-    if (copy->mid == *mid)
+    if (copy->mid == *mid || GNUNET_NO == ch->reliable)
     {
       struct GNUNET_MESH_Data *msg = (struct GNUNET_MESH_Data *) &copy[1];
 
@@ -2910,6 +3119,7 @@
                   " have %u! now expecting %u\n",
                   copy->mid, *mid + 1);
       channel_send_client_data (ch, msg, (rel == ch->bck_rel));
+      rel->n_recv--;
       *mid = *mid + 1;
       GNUNET_CONTAINER_DLL_remove (rel->head_recv, rel->tail_recv, copy);
       GNUNET_free (copy);
@@ -2917,7 +3127,7 @@
     else
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  " don't have %u, next is %u\n",
+                  " reliable && don't have %u, next is %u\n",
                   *mid,
                   copy->mid);
       return;
@@ -2928,8 +3138,9 @@
 
 
 /**
- * We have received a message out of order, buffer it until we receive
- * the missing one and we can feed the rest to the client.
+ * We have received a message out of order, or the client is not ready.
+ * Buffer it until we receive an ACK from the client or the missing
+ * message from the channel.
  *
  * @param msg Message to buffer.
  * @param rel Reliability data to the corresponding direction.
@@ -2952,6 +3163,8 @@
   copy->rel = rel;
   memcpy (&copy[1], msg, size);
 
+  rel->n_recv++;
+
   // FIXME do something better than O(n), although n < 64...
   // FIXME start from the end (most messages are the latest ones)
   for (prev = rel->head_recv; NULL != prev; prev = prev->next)
@@ -3121,10 +3334,10 @@
   struct MeshChannelReliability *rel = cls;
   struct MeshReliableMessage *copy;
   struct MeshPeerQueue *q;
-  struct MeshFlowControl *fc;
   struct MeshChannel *ch;
   struct MeshConnection *c;
   struct GNUNET_MESH_Data *payload;
+  struct MeshPeer *hop;
   int fwd;
 
   rel->retry_task = GNUNET_SCHEDULER_NO_TASK;
@@ -3152,8 +3365,8 @@
   payload = (struct GNUNET_MESH_Data *) &copy[1];
   fwd = (rel == ch->fwd_rel);
   c = tunnel_get_connection (ch->t, fwd);
-  fc = fwd ? &c->fwd_fc : &c->bck_fc;
-  for (q = fc->queue_head; NULL != q; q = q->next)
+  hop = connection_get_hop (c, fwd);
+  for (q = hop->queue_head; NULL != q; q = q->next)
   {
     if (ntohs (payload->header.type) == q->type && ch == q->ch)
     {
@@ -3184,7 +3397,58 @@
 }
 
 
+
 /**
+ * Send ACKs (one more PID each) on connections of ch's tunnel, up to (buffer - already allowed).
+ */
+static void
+channel_send_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
+{
+  struct MeshTunnel2 *t = ch->t;
+  struct MeshConnection *c;
+  struct MeshFlowControl *fc;
+  uint32_t allowed;
+  uint32_t to_allow;
+  unsigned int cs;
+
+  /* Count connections, how many messages are already allowed */
+  for (cs = 0, allowed = 0, c = t->connection_head; NULL != c; c = c->next)
+  {
+    fc = fwd ? &c->fwd_fc : &c->bck_fc;
+    if (GMC_is_pid_bigger(fc->last_pid_recv, fc->last_ack_sent))
+    {
+      GNUNET_break (0);
+      continue;
+    }
+    allowed += fc->last_ack_sent - fc->last_pid_recv;
+    cs++; /* NOTE(review): cs is counted but never read in this function */
+  }
+
+  /* Make sure there is no overflow */
+  if (allowed > buffer)
+  {
+    GNUNET_break (0);
+    return;
+  }
+
+  /* Authorize connections to send more data */
+  to_allow = buffer - allowed;
+  for (c = t->connection_head; NULL != c && to_allow > 0; c = c->next)
+  {
+    fc = fwd ? &c->fwd_fc : &c->bck_fc;
+    if (fc->last_ack_sent - fc->last_pid_recv > 64 / 3) /* skip connections with > 21 PIDs already allowed; presumably 64 is the channel buffer size - TODO confirm */
+    {
+      continue;
+    }
+    send_ack (c, fc->last_ack_sent + 1, fwd);
+    to_allow--;
+  }
+  /* Each connection gets at most one extra PID per call, so to_allow may remain > 0. */
+  GNUNET_break (to_allow == 0);
+}
+
+
+/**
  * Send keepalive packets for a connection.
  *
  * @param c Connection to keep alive..
@@ -3275,7 +3539,7 @@
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
-  connection_keepalive (c, GNUNET_YES);
+  connection_maintain (c, GNUNET_YES);
   c->fwd_maintenance_task = GNUNET_SCHEDULER_add_delayed 
(refresh_connection_time,
                                                           
&connection_fwd_keepalive,
                                                           c);
@@ -3291,7 +3555,7 @@
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
-  connection_keepalive (c, GNUNET_NO);
+  connection_maintain (c, GNUNET_NO);
   c->bck_maintenance_task = GNUNET_SCHEDULER_add_delayed 
(refresh_connection_time,
                                                           
&connection_bck_keepalive,
                                                           c);
@@ -3536,7 +3800,7 @@
     return;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying tunnel %s\n",
-              GNUNET_i2s (GNUNET_PEER_resolve2 (c->t->peer->id)));
+              GNUNET_i2s (GNUNET_PEER_resolve2 (t->peer->id)));
 
   if (GNUNET_YES != GNUNET_CONTAINER_multihashmap_remove (tunnels, &t->id, t))
     GNUNET_break (0);
@@ -3867,6 +4131,42 @@
 }
 
 
+/**
+ * Iterator to notify all connections of a broken link. Mark connections
+ * to destroy after all traffic has been sent.
+ *
+ * @param cls Closure (struct MeshPeer * of the disconnected peer).
+ * @param key Current key code (tid).
+ * @param value Value in the hash map (struct MeshConnection *).
+ *
+ * @return GNUNET_YES always, so the iteration continues
+ *         (GNUNET_NO is never returned).
+ */
+static int
+connection_broken (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct MeshPeer *peer = cls;
+  struct MeshConnection *c = value;
+  struct GNUNET_MESH_ConnectionBroken msg;
+  int fwd;
+  /* fwd is true when the lost peer is our previous hop; the queue in the opposite direction (!fwd) is cancelled. */
+  fwd = peer == connection_get_prev_hop (c);
+  connection_cancel_queues (c, !fwd);
+  /* Build the CONNECTION_BROKEN notice naming ourselves and the lost peer. */
+  msg.header.size = htons (sizeof (struct GNUNET_MESH_ConnectionBroken));
+  msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_BROKEN);
+  msg.cid = htonl (c->id);
+  msg.tid = c->t->id;
+  msg.peer1 = my_full_id;
+  msg.peer2 = *GNUNET_PEER_resolve2 (peer->id);
+  send_prebuilt_message_connection (&msg.header, c, NULL, fwd);
+  c->destroy = GNUNET_YES;
+
+  return GNUNET_YES;
+}
+
 
/******************************************************************************/
 /****************      MESH NETWORK HANDLER HELPERS     
***********************/
 
/******************************************************************************/
@@ -3881,10 +4181,12 @@
 static void
 queue_destroy (struct MeshPeerQueue *queue, int clear_cls)
 {
+  struct MeshPeer *peer;
   struct MeshFlowControl *fc;
   int fwd;
 
-  fwd = (queue->peer == connection_get_next_hop (queue->c));
+  fwd = queue->fwd;
+  peer = queue->peer;
   fc = fwd ? &queue->c->fwd_fc : &queue->c->bck_fc;
 
   if (GNUNET_YES == clear_cls)
@@ -3916,9 +4218,18 @@
     }
     GNUNET_free_non_null (queue->cls);
   }
-  GNUNET_CONTAINER_DLL_remove (fc->queue_head, fc->queue_tail, queue);
+  GNUNET_CONTAINER_DLL_remove (peer->queue_head, peer->queue_tail, queue);
 
   fc->queue_n--;
+  peer->queue_n--;
+  if (NULL != queue->c)
+  {
+    queue->c->pending_messages--;
+    if (NULL != queue->c->t)
+    {
+      queue->c->t->pending_messages--;
+    }
+  }
 
   GNUNET_free (queue);
 }
@@ -3928,28 +4239,17 @@
 queue_send (void *cls, size_t size, void *buf)
 {
   struct MeshPeer *peer = cls;
-  const struct GNUNET_PeerIdentity *dst_id;
+  struct MeshFlowControl *fc;
+  struct MeshConnection *c;
   struct GNUNET_MessageHeader *msg;
   struct MeshPeerQueue *queue;
   struct MeshTunnel2 *t;
-  struct MeshFlowControl *fc;
-  struct MeshConnection *c;
+  const struct GNUNET_PeerIdentity *dst_id;
   size_t data_size;
   uint32_t pid;
   uint16_t type;
   int fwd;
 
-  c = queue->c;
-  fwd = (queue->peer == connection_get_next_hop (c));
-  fc = fwd ? &c->fwd_fc : &c->bck_fc;
-
-  if (NULL == fc)
-  {
-    GNUNET_break (0);
-    return 0;
-  }
-  fc->core_transmit = NULL;
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n");
 
   if (NULL == buf || 0 == size)
@@ -3957,22 +4257,27 @@
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Buffer size 0.\n");
     return 0;
   }
-  queue = fc->queue_head;
 
-  /* Queue has no traffic */
+  /* Initialize */
+  queue = peer_get_first_message (peer);
   if (NULL == queue)
   {
     GNUNET_break (0); /* Core tmt_rdy should've been canceled */
     return 0;
   }
+  queue->peer->core_transmit = NULL;
+  c = queue->c;
+  fwd = queue->fwd;
+  fc = fwd ? &c->fwd_fc : &c->bck_fc;
 
+
   dst_id = GNUNET_PEER_resolve2 (peer->id);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   towards %s\n", GNUNET_i2s 
(dst_id));
   /* Check if buffer size is enough for the message */
   if (queue->size > size)
   {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   not enough room, reissue\n");
-      fc->core_transmit =
+      peer->core_transmit =
           GNUNET_CORE_notify_transmit_ready (core_handle,
                                              GNUNET_NO,
                                              0,
@@ -4028,8 +4333,6 @@
       data_size = 0;
   }
 
-  fc->queue_n--;
-
   if (0 < drop_percent &&
       GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK, 101) < drop_percent)
   {
@@ -4055,12 +4358,12 @@
   }
 
   /* If more data in queue, send next */
-  queue = fc->queue_head;
+  queue = peer_get_first_message (peer);
   if (NULL != queue)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "*   more data!\n");
-    if (NULL == fc->core_transmit) {
-      fc->core_transmit =
+    if (NULL == peer->core_transmit) {
+      peer->core_transmit =
           GNUNET_CORE_notify_transmit_ready(core_handle,
                                             0,
                                             0,
@@ -4156,7 +4459,6 @@
     return; /* Drop this message */
   }
 
-  fc->queue_n++;
   if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) &&
       GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
     fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
@@ -4169,14 +4471,15 @@
   queue->peer = dst;
   queue->c = c;
   queue->ch = ch;
+  queue->fwd = fwd;
   if (100 <= priority)
-    GNUNET_CONTAINER_DLL_insert (fc->queue_head, fc->queue_tail, queue);
+    GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
   else
-    GNUNET_CONTAINER_DLL_insert_tail (fc->queue_head, fc->queue_tail, queue);
+    GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
 
-  if (NULL == fc->core_transmit)
+  if (NULL == dst->core_transmit)
   {
-    fc->core_transmit =
+    dst->core_transmit =
         GNUNET_CORE_notify_transmit_ready (core_handle,
                                            0,
                                            0,
@@ -4188,6 +4491,8 @@
   }
   c->pending_messages++;
   c->t->pending_messages++;
+  fc->queue_n++;
+  dst->queue_n++;
 }
 
 
@@ -4829,7 +5134,7 @@
   fc = fwd ? &c->fwd_fc : &c->bck_fc;
 
   /* Check if origin is as expected */
-  neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
+  neighbor = connection_get_hop (c, fwd);
   if (peer_get (peer)->id != neighbor->id)
   {
     GNUNET_break_op (0);
@@ -5145,7 +5450,7 @@
         GNUNET_YES : GNUNET_NO;
 
   /* Check if origin is as expected */
-  neighbor = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
+  neighbor = connection_get_hop (c, fwd);
   if (peer_get (peer)->id != neighbor->id)
   {
     GNUNET_break_op (0);
@@ -5651,7 +5956,6 @@
       copy = GNUNET_malloc (sizeof (struct MeshReliableMessage)
                             + sizeof(struct GNUNET_MESH_Data)
                             + size);
-
       copy->mid = *mid;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "!!! DATA %u\n", copy->mid);
       copy->timestamp = GNUNET_TIME_absolute_get ();
@@ -5685,11 +5989,10 @@
     payload->chid = htonl (ch->gid);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "  calling generic handler...\n");
-    if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
-      handle_data (ch->t, payload, GNUNET_YES);
-    else
-      handle_data (ch->t, payload, GNUNET_NO);
+    send_prebuilt_message_channel (&payload->header, ch, fwd);
   }
+  if (tunnel_get_buffer (ch->t, fwd) > 0)
+    send_local_ack (ch, c, fwd);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "receive done OK\n");
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
@@ -5709,11 +6012,14 @@
                   const struct GNUNET_MessageHeader *message)
 {
   struct GNUNET_MESH_LocalAck *msg;
+  struct MeshChannelReliability *rel;
   struct MeshChannel *ch;
   struct MeshClient *c;
   MESH_ChannelNumber chid;
+  int fwd;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got a local ACK\n");
+
   /* Sanity check for client registration */
   if (NULL == (c = client_get (client)))
   {
@@ -5725,7 +6031,7 @@
 
   msg = (struct GNUNET_MESH_LocalAck *) message;
 
-  /* Tunnel exists? */
+  /* Channel exists? */
   chid = ntohl (msg->channel_id);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  on channel %X\n", chid);
   ch = channel_get_by_local_id (c, chid);
@@ -5738,20 +6044,13 @@
     return;
   }
 
-  /* Does client own tunnel? I.E: Is this an ACK for BCK traffic? */
-  if (chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV)
-  {
-    /* The client owns the channel, ACK is for data to_origin, send BCK ACK. */
-    ch->prev_fc.last_ack_recv++;
-    tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_NO);
-  }
-  else
-  {
-    /* The client doesn't own the channel, this ACK is for FWD traffic. */
-    t->next_fc.last_ack_recv++;
-    tunnel_send_ack (t, GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK, GNUNET_YES);
-  }
+  fwd = chid < GNUNET_MESH_LOCAL_CHANNEL_ID_SERV;
+  rel = fwd ? ch->fwd_rel : ch->bck_rel;
 
+  rel->client_ready = GNUNET_YES;
+  channel_send_client_buffered_data (ch, c, rel);
+  channel_send_ack (ch, 64 - rel->n_recv, fwd);
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
   return;
@@ -5777,7 +6076,7 @@
   struct GNUNET_MESH_LocalMonitor *msg;
 
   msg = GNUNET_malloc (sizeof(struct GNUNET_MESH_LocalMonitor));
-  msg->channel_id = htonl (ch->id);
+  msg->channel_id = htonl (ch->gid);
   msg->header.size = htons (sizeof (struct GNUNET_MESH_LocalMonitor));
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_INFO_TUNNELS);
 
@@ -5855,7 +6154,8 @@
               c->id,
               &msg->owner,
               ntohl (msg->channel_id));
-  ch = channel_get (&msg->owner, ntohl (msg->channel_id));
+//   ch = channel_get (&msg->owner, ntohl (msg->channel_id));
+  ch = NULL; // FIXME
   if (NULL == ch)
   {
     /* We don't know the tunnel */
@@ -5920,13 +6220,13 @@
 static void
 core_connect (void *cls, const struct GNUNET_PeerIdentity *peer)
 {
-  struct MeshPeer *peer_info;
+  struct MeshPeer *pi;
   struct MeshPeerPath *path;
 
   DEBUG_CONN ("Peer connected\n");
   DEBUG_CONN ("     %s\n", GNUNET_i2s (&my_full_id));
-  peer_info = peer_get (peer);
-  if (myid == peer_info->id)
+  pi = peer_get (peer);
+  if (myid == pi->id)
   {
     DEBUG_CONN ("     (self)\n");
     path = path_new (1);
@@ -5935,13 +6235,15 @@
   {
     DEBUG_CONN ("     %s\n", GNUNET_i2s (peer));
     path = path_new (2);
-    path->peers[1] = peer_info->id;
-    GNUNET_PEER_change_rc (peer_info->id, 1);
+    path->peers[1] = pi->id;
+    GNUNET_PEER_change_rc (pi->id, 1);
     GNUNET_STATISTICS_update (stats, "# peers", 1, GNUNET_NO);
   }
   path->peers[0] = myid;
   GNUNET_PEER_change_rc (myid, 1);
-  peer_add_path (peer_info, path, GNUNET_YES);
+  peer_add_path (pi, path, GNUNET_YES);
+
+  pi->connections = GNUNET_CONTAINER_multihashmap_create (32, GNUNET_YES);
   return;
 }
 
@@ -5956,9 +6258,6 @@
 core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
 {
   struct MeshPeer *pi;
-  struct MeshPeerQueue *q;
-  struct MeshPeerQueue *n;
-  struct MeshFlowControl *fc;
 
   DEBUG_CONN ("Peer disconnected\n");
   pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey);
@@ -5967,33 +6266,19 @@
     GNUNET_break (0);
     return;
   }
-  fc = pi->fc;
-  if (NULL != fc)
-  {
-    GNUNET_break (0);
-    return;
-  }
-  pi->fc = NULL;
 
-  q = fc->queue_head;
-  while (NULL != q)
-  {
-      n = q->next;
-      queue_destroy (q, GNUNET_YES);
-      q = n;
-  }
-  if (NULL != fc->core_transmit)
-    GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
-  if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
-    GNUNET_SCHEDULER_cancel (fc->poll_task);
+  peer_remove_path (pi, myid, pi->id);
 
-  peer_remove_path (pi, pi->id, myid);
+  GNUNET_CONTAINER_multihashmap_iterate (pi->connections,
+                                         connection_broken,
+                                         pi);
+  GNUNET_CONTAINER_multihashmap_destroy (pi->connections);
+  pi->connections = NULL;
   if (myid == pi->id)
   {
     DEBUG_CONN ("     (self)\n");
   }
   GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
-  GNUNET_free (fc);
 
   return;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]