gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r35329 - gnunet/src/transport


From: gnunet
Subject: [GNUnet-SVN] r35329 - gnunet/src/transport
Date: Sun, 1 Mar 2015 15:32:34 +0100

Author: grothoff
Date: 2015-03-01 15:32:34 +0100 (Sun, 01 Mar 2015)
New Revision: 35329

Modified:
   gnunet/src/transport/plugin_transport_udp.c
   gnunet/src/transport/plugin_transport_udp.h
   gnunet/src/transport/plugin_transport_udp_broadcasting.c
Log:
-simplifying event loop for UDP, separating v4/v6 for better performance (in 
theory at least)

Modified: gnunet/src/transport/plugin_transport_udp.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp.c 2015-03-01 13:55:15 UTC (rev 
35328)
+++ gnunet/src/transport/plugin_transport_udp.c 2015-03-01 14:32:34 UTC (rev 
35329)
@@ -68,6 +68,29 @@
 
 
 /**
+ * UDP Message-Packet header (after defragmentation).
+ */
+struct UDPMessage
+{
+  /**
+   * Message header.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Always zero for now.
+   */
+  uint32_t reserved;
+
+  /**
+   * What is the identity of the sender
+   */
+  struct GNUNET_PeerIdentity sender;
+
+};
+
+
+/**
  * Closure for #append_port().
  */
 struct PrettyPrinterContext
@@ -213,33 +236,7 @@
 };
 
 
-/**
- * Closure for #find_receive_context().
- */
-struct FindReceiveContext
-{
-  /**
-   * Where to store the result.
-   */
-  struct DefragContext *rc;
 
-  /**
-   * Session associated with this context.
-   */
-  struct Session *session;
-
-  /**
-   * Address to find.
-   */
-  const union UdpAddress *udp_addr;
-
-  /**
-   * Number of bytes in @e udp_addr.
-   */
-  size_t udp_addr_len;
-
-};
-
 /**
  * Data structure to track defragmentation contexts based
  * on the source of the UDP traffic.
@@ -263,6 +260,12 @@
   struct GNUNET_CONTAINER_HeapNode *hnode;
 
   /**
+   * Source address this receive context is for (allocated at the
+   * end of the struct).
+   */
+  const union UdpAddress *udp_addr;
+
+  /**
    * Who's message(s) are we defragmenting here?
    * Only initialized once we succeeded and
    * @e have_sender is set.
@@ -270,12 +273,6 @@
   struct GNUNET_PeerIdentity sender;
 
   /**
-   * Source address this receive context is for (allocated at the
-   * end of the struct).
-   */
-  const union UdpAddress *udp_addr;
-
-  /**
    * Length of @e udp_addr.
    */
   size_t udp_addr_len;
@@ -313,7 +310,7 @@
   struct Plugin *plugin;
 
   /**
-   * Handle for GNUNET_FRAGMENT context
+   * Handle for fragmentation.
    */
   struct GNUNET_FRAGMENT_Context *frag;
 
@@ -347,11 +344,6 @@
    */
   size_t on_wire_size;
 
-  /**
-   * FIXME.
-   */
-  unsigned int fragments_used;
-
 };
 
 
@@ -366,22 +358,26 @@
   UMT_UNDEFINED = 0,
 
   /**
-   * Fragment of a message.
+   * This queue entry represents a fragment of a message.
    */
   UMT_MSG_FRAGMENTED = 1,
 
   /**
-   *
+   * This queue entry does not include a message, but merely
+   * represents that we finished sending a fragmented message
+   * (all fragments confirmed, or timeout).
    */
   UMT_MSG_FRAGMENTED_COMPLETE = 2,
 
   /**
-   * Unfragmented message.
+   * This queue entry represents a unfragmented message
+   * (was small enough to not require fragmentation).
    */
   UMT_MSG_UNFRAGMENTED = 3,
 
   /**
-   * Receipt confirmation.
+   * This queue entry represents the acknowledgement of us
+   * receiving a fragment.
    */
   UMT_MSG_ACK = 4
 
@@ -399,24 +395,22 @@
   struct Session *session;
 
   /**
-   * DLL of messages
-   * previous element
+   * DLL of messages, previous element
    */
   struct UDP_MessageWrapper *prev;
 
   /**
-   * DLL of messages
-   * previous element
+   * DLL of messages, next element
    */
   struct UDP_MessageWrapper *next;
 
   /**
-   * Message with size msg_size including UDP specific overhead
+   * Message with @e msg_size bytes including UDP-specific overhead.
    */
   char *msg_buf;
 
   /**
-   * Function to call upon completion of the transmission.
+   * Function to call upon completion of the transmission, can be NULL.
    */
   GNUNET_TRANSPORT_TransmitContinuation cont;
 
@@ -426,29 +420,29 @@
   void *cont_cls;
 
   /**
-   * Fragmentation context
+   * Fragmentation context.
    * frag_ctx == NULL if transport <= MTU
    * frag_ctx != NULL if transport > MTU
    */
   struct UDP_FragmentationContext *frag_ctx;
 
   /**
-   * Message timeout
+   * Message timeout.
    */
   struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * Size of UDP message to send including UDP specific overhead
+   * Size of UDP message to send, including UDP-specific overhead.
    */
   size_t msg_size;
 
   /**
-   * Payload size of original message
+   * Payload size of original message.
    */
   size_t payload_size;
 
   /**
-   * Message type
+   * Message type (what does this entry in the queue represent).
    */
   enum UDP_MessageType msg_type;
 
@@ -466,7 +460,7 @@
   struct GNUNET_MessageHeader header;
 
   /**
-   * Desired delay for flow control
+   * Desired delay for flow control, in us (in NBO).
    */
   uint32_t delay;
 
@@ -497,7 +491,9 @@
     return;
   if (GNUNET_YES == session->in_destroy)
     return; /* already destroyed, just RC>0 left-over actions */
-  memset (&info, 0, sizeof (info));
+  memset (&info,
+          0,
+          sizeof (info));
   info.state = state;
   info.is_inbound = GNUNET_SYSERR; /* hard to say */
   info.num_msg_pending = session->msgs_in_queue;
@@ -522,8 +518,8 @@
  * @param tc the scheduling context (for rescheduling this function again)
  */
 static void
-udp_plugin_select (void *cls,
-                   const struct GNUNET_SCHEDULER_TaskContext *tc);
+udp_plugin_select_v4 (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
 /**
@@ -540,55 +536,61 @@
 
 
 /**
- * (re)schedule select tasks for this plugin.
+ * (re)schedule IPv4-select tasks for this plugin.
  *
  * @param plugin plugin to reschedule
  */
 static void
-schedule_select (struct Plugin *plugin)
+schedule_select_v4 (struct Plugin *plugin)
 {
   struct GNUNET_TIME_Relative min_delay;
   struct UDP_MessageWrapper *udpw;
 
-  if ((GNUNET_YES == plugin->enable_ipv4) && (NULL != plugin->sockv4))
+  if ( (GNUNET_YES == plugin->enable_ipv4) &&
+       (NULL != plugin->sockv4) )
   {
     /* Find a message ready to send:
      * Flow delay from other peer is expired or not set (0) */
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
     for (udpw = plugin->ipv4_queue_head; NULL != udpw; udpw = udpw->next)
       min_delay = GNUNET_TIME_relative_min (min_delay,
-          GNUNET_TIME_absolute_get_remaining (
-              udpw->session->flow_delay_from_other_peer));
+                                            GNUNET_TIME_absolute_get_remaining 
(udpw->session->flow_delay_from_other_peer));
+    if (NULL != plugin->select_task_v4)
+      GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+    plugin->select_task_v4
+      = GNUNET_SCHEDULER_add_read_net (min_delay,
+                                       plugin->sockv4,
+                                       &udp_plugin_select_v4,
+                                       plugin);
+  }
+}
 
-    if (plugin->select_task != NULL )
-      GNUNET_SCHEDULER_cancel (plugin->select_task);
 
-    /* Schedule with:
-     * - write active set if message is ready
-     * - timeout minimum delay */
-    plugin->select_task = GNUNET_SCHEDULER_add_select (
-        GNUNET_SCHEDULER_PRIORITY_DEFAULT,
-        (0 == min_delay.rel_value_us) ?
-            GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v4,
-        (0 == min_delay.rel_value_us) ? plugin->ws_v4 : NULL,
-        &udp_plugin_select, plugin);
-  }
-  if ((GNUNET_YES == plugin->enable_ipv6) && (NULL != plugin->sockv6))
+/**
+ * (re)schedule IPv6-select tasks for this plugin.
+ *
+ * @param plugin plugin to reschedule
+ */
+static void
+schedule_select_v6 (struct Plugin *plugin)
+{
+  struct GNUNET_TIME_Relative min_delay;
+  struct UDP_MessageWrapper *udpw;
+
+  if ( (GNUNET_YES == plugin->enable_ipv6) &&
+       (NULL != plugin->sockv6) )
   {
     min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
     for (udpw = plugin->ipv6_queue_head; NULL != udpw; udpw = udpw->next)
       min_delay = GNUNET_TIME_relative_min (min_delay,
-          GNUNET_TIME_absolute_get_remaining (
-              udpw->session->flow_delay_from_other_peer));
-
+                                            GNUNET_TIME_absolute_get_remaining 
(udpw->session->flow_delay_from_other_peer));
     if (NULL != plugin->select_task_v6)
       GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
-    plugin->select_task_v6 = GNUNET_SCHEDULER_add_select (
-        GNUNET_SCHEDULER_PRIORITY_DEFAULT,
-        (0 == min_delay.rel_value_us) ?
-            GNUNET_TIME_UNIT_FOREVER_REL : min_delay, plugin->rs_v6,
-        (0 == min_delay.rel_value_us) ? plugin->ws_v6 : NULL,
-        &udp_plugin_select_v6, plugin);
+    plugin->select_task_v6
+      = GNUNET_SCHEDULER_add_read_net (min_delay,
+                                       plugin->sockv6,
+                                       &udp_plugin_select_v6,
+                                       plugin);
   }
 }
 
@@ -1347,6 +1349,34 @@
 
 
 /**
+ * Closure for #find_receive_context().
+ */
+struct FindReceiveContext
+{
+  /**
+   * Where to store the result.
+   */
+  struct DefragContext *rc;
+
+  /**
+   * Session associated with this context.
+   */
+  struct Session *session;
+
+  /**
+   * Address to find.
+   */
+  const union UdpAddress *udp_addr;
+
+  /**
+   * Number of bytes in @e udp_addr.
+   */
+  size_t udp_addr_len;
+
+};
+
+
+/**
  * Scan the heap for a receive context with the given address.
  *
  * @param cls the `struct FindReceiveContext`
@@ -1979,13 +2009,12 @@
 {
   struct UDP_FragmentationContext *frag_ctx = cls;
   struct Plugin *plugin = frag_ctx->plugin;
-  struct UDP_MessageWrapper * udpw;
+  struct UDP_MessageWrapper *udpw;
   size_t msg_len = ntohs (msg->size);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Enqueuing fragment with %u bytes\n",
        msg_len);
-  frag_ctx->fragments_used++;
   udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + msg_len);
   udpw->session = frag_ctx->session;
   udpw->msg_buf = (char *) &udpw[1];
@@ -1997,8 +2026,12 @@
   udpw->frag_ctx = frag_ctx;
   udpw->msg_type = UMT_MSG_FRAGMENTED;
   memcpy (udpw->msg_buf, msg, msg_len);
-  enqueue (plugin, udpw);
-  schedule_select (plugin);
+  enqueue (plugin,
+           udpw);
+  if (udpw->session->address->address_length == sizeof (struct IPv4UdpAddress))
+    schedule_select_v4 (plugin);
+  else
+    schedule_select_v6 (plugin);
 }
 
 
@@ -2151,7 +2184,10 @@
   notify_session_monitor (s->plugin,
                           s,
                           GNUNET_TRANSPORT_SS_UPDATE);
-  schedule_select (plugin);
+  if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+    schedule_select_v4 (plugin);
+  else
+    schedule_select_v6 (plugin);
   return udpmlen;
 }
 
@@ -2334,7 +2370,7 @@
 fragment_msg_proc (void *cls,
                    const struct GNUNET_MessageHeader *msg)
 {
-  struct DefragContext *rc = cls;
+  struct DefragContext *dc = cls;
   const struct UDPMessage *um;
 
   if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_TRANSPORT_UDP_MESSAGE)
@@ -2348,13 +2384,13 @@
     return;
   }
   um = (const struct UDPMessage *) msg;
-  rc->sender = um->sender;
-  rc->have_sender = GNUNET_YES;
-  process_udp_message (rc->plugin,
+  dc->sender = um->sender;
+  dc->have_sender = GNUNET_YES;
+  process_udp_message (dc->plugin,
                        um,
-                       rc->udp_addr,
-                       rc->udp_addr_len,
-                       rc->network_type);
+                       dc->udp_addr,
+                       dc->udp_addr_len,
+                       dc->network_type);
 }
 
 
@@ -2373,7 +2409,7 @@
   struct DefragContext *rc = cls;
   size_t msize = sizeof(struct UDP_ACK_Message) + ntohs (msg->size);
   struct UDP_ACK_Message *udp_ack;
-  uint32_t delay = 0;
+  uint32_t delay;
   struct UDP_MessageWrapper *udpw;
   struct Session *s;
   struct GNUNET_HELLO_Address *address;
@@ -2406,6 +2442,8 @@
   }
   if (s->flow_delay_for_other_peer.rel_value_us <= UINT32_MAX)
     delay = s->flow_delay_for_other_peer.rel_value_us;
+  else
+    delay = UINT32_MAX;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending ACK to `%s' including delay of %s\n",
@@ -2431,7 +2469,10 @@
   notify_session_monitor (s->plugin,
                           s,
                           GNUNET_TRANSPORT_SS_UPDATE);
-  schedule_select (rc->plugin);
+  if (s->address->address_length == sizeof (struct IPv4UdpAddress))
+    schedule_select_v4 (rc->plugin);
+  else
+    schedule_select_v6 (rc->plugin);
 }
 
 
@@ -2940,7 +2981,8 @@
 static void
 analyze_send_error (struct Plugin *plugin,
                     const struct sockaddr *sa,
-                    socklen_t slen, int error)
+                    socklen_t slen,
+                    int error)
 {
   enum GNUNET_ATS_Network_Type type;
 
@@ -2989,9 +3031,8 @@
  *
  * @param plugin the plugin
  * @param sock which socket (v4/v6) to send on
- * @return number of bytes transmitted, #GNUNET_SYSERR on failure
  */
-static size_t
+static void
 udp_select_send (struct Plugin *plugin,
                  struct GNUNET_NETWORK_Handle *sock)
 {
@@ -3004,103 +3045,102 @@
   struct sockaddr_in6 a6;
   struct UDP_MessageWrapper *udpw;
 
-  /* Find message to send */
-  udpw = remove_timeout_messages_and_select (plugin,
-                                             sock);
-  if (NULL == udpw)
-    return 0; /* No message to send */
-
-  if (sizeof (struct IPv4UdpAddress) == udpw->session->address->address_length)
+  /* Find message(s) to send */
+  while (NULL != (udpw = remove_timeout_messages_and_select (plugin,
+                                                             sock)))
   {
-    u4 = udpw->session->address->address;
-    memset (&a4, 0, sizeof(a4));
-    a4.sin_family = AF_INET;
+    if (sizeof (struct IPv4UdpAddress) == 
udpw->session->address->address_length)
+    {
+      u4 = udpw->session->address->address;
+      memset (&a4, 0, sizeof(a4));
+      a4.sin_family = AF_INET;
 #if HAVE_SOCKADDR_IN_SIN_LEN
-    a4.sin_len = sizeof (a4);
+      a4.sin_len = sizeof (a4);
 #endif
-    a4.sin_port = u4->u4_port;
-    memcpy (&a4.sin_addr,
-           &u4->ipv4_addr,
-           sizeof(struct in_addr));
-    a = (struct sockaddr *) &a4;
-    slen = sizeof (a4);
-  }
-  else if (sizeof (struct IPv6UdpAddress) == 
udpw->session->address->address_length)
-  {
-    u6 = udpw->session->address->address;
-    memset (&a6, 0, sizeof(a6));
-    a6.sin6_family = AF_INET6;
+      a4.sin_port = u4->u4_port;
+      memcpy (&a4.sin_addr,
+              &u4->ipv4_addr,
+              sizeof(struct in_addr));
+      a = (struct sockaddr *) &a4;
+      slen = sizeof (a4);
+    }
+    else if (sizeof (struct IPv6UdpAddress) == 
udpw->session->address->address_length)
+    {
+      u6 = udpw->session->address->address;
+      memset (&a6, 0, sizeof(a6));
+      a6.sin6_family = AF_INET6;
 #if HAVE_SOCKADDR_IN_SIN_LEN
-    a6.sin6_len = sizeof (a6);
+      a6.sin6_len = sizeof (a6);
 #endif
-    a6.sin6_port = u6->u6_port;
-    memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
-    a = (struct sockaddr *) &a6;
-    slen = sizeof (a6);
-  }
-  else
-  {
-    call_continuation (udpw,
-                      GNUNET_OK);
+      a6.sin6_port = u6->u6_port;
+      memcpy (&a6.sin6_addr, &u6->ipv6_addr, sizeof(struct in6_addr));
+      a = (struct sockaddr *) &a6;
+      slen = sizeof (a6);
+    }
+    else
+    {
+      call_continuation (udpw,
+                         GNUNET_OK);
+      dequeue (plugin,
+               udpw);
+      notify_session_monitor (plugin,
+                              udpw->session,
+                              GNUNET_TRANSPORT_SS_UPDATE);
+      GNUNET_free (udpw);
+      continue;
+    }
+    sent = GNUNET_NETWORK_socket_sendto (sock,
+                                         udpw->msg_buf,
+                                         udpw->msg_size,
+                                         a,
+                                         slen);
+    if (GNUNET_SYSERR == sent)
+    {
+      /* Failure */
+      analyze_send_error (plugin,
+                          a,
+                          slen,
+                          errno);
+      call_continuation (udpw,
+                         GNUNET_SYSERR);
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                                "# UDP, total, bytes, sent, failure",
+                                sent,
+                                GNUNET_NO);
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                                "# UDP, total, messages, sent, failure",
+                                1,
+                                GNUNET_NO);
+    }
+    else
+    {
+      /* Success */
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
+           (unsigned int) (udpw->msg_size),
+           GNUNET_i2s (&udpw->session->target),
+           GNUNET_a2s (a, slen),
+           (int ) sent,
+           (sent < 0) ? STRERROR (errno) : "ok");
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                                "# UDP, total, bytes, sent, success",
+                                sent,
+                                GNUNET_NO);
+      GNUNET_STATISTICS_update (plugin->env->stats,
+                                "# UDP, total, messages, sent, success",
+                                1,
+                                GNUNET_NO);
+      if (NULL != udpw->frag_ctx)
+        udpw->frag_ctx->on_wire_size += udpw->msg_size;
+      call_continuation (udpw, GNUNET_OK);
+    }
     dequeue (plugin,
-            udpw);
+             udpw);
     notify_session_monitor (plugin,
                             udpw->session,
                             GNUNET_TRANSPORT_SS_UPDATE);
     GNUNET_free (udpw);
-    return GNUNET_SYSERR;
   }
-  sent = GNUNET_NETWORK_socket_sendto (sock,
-                                       udpw->msg_buf,
-                                       udpw->msg_size,
-                                       a,
-                                       slen);
-  if (GNUNET_SYSERR == sent)
-  {
-    /* Failure */
-    analyze_send_error (plugin,
-                       a,
-                       slen,
-                       errno);
-    call_continuation (udpw,
-                      GNUNET_SYSERR);
-    GNUNET_STATISTICS_update (plugin->env->stats,
-                             "# UDP, total, bytes, sent, failure",
-                             sent,
-                             GNUNET_NO);
-    GNUNET_STATISTICS_update (plugin->env->stats,
-                             "# UDP, total, messages, sent, failure",
-                             1,
-                             GNUNET_NO);
-  }
-  else
-  {
-    /* Success */
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "UDP transmitted %u-byte message to  `%s' `%s' (%d: %s)\n",
-         (unsigned int) (udpw->msg_size),
-         GNUNET_i2s (&udpw->session->target),
-         GNUNET_a2s (a, slen),
-         (int ) sent,
-         (sent < 0) ? STRERROR (errno) : "ok");
-    GNUNET_STATISTICS_update (plugin->env->stats,
-                              "# UDP, total, bytes, sent, success",
-                              sent,
-                              GNUNET_NO);
-    GNUNET_STATISTICS_update (plugin->env->stats,
-                              "# UDP, total, messages, sent, success",
-                              1,
-                              GNUNET_NO);
-    if (NULL != udpw->frag_ctx)
-      udpw->frag_ctx->on_wire_size += udpw->msg_size;
-    call_continuation (udpw, GNUNET_OK);
-  }
-  dequeue (plugin, udpw);
-  notify_session_monitor (plugin,
-                          udpw->session,
-                          GNUNET_TRANSPORT_SS_UPDATE);
-  GNUNET_free (udpw);
-  return sent;
 }
 
 
@@ -3110,26 +3150,26 @@
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
+ * @param tc the scheduling context
  */
 static void
-udp_plugin_select (void *cls,
-                   const struct GNUNET_SCHEDULER_TaskContext *tc)
+udp_plugin_select_v4 (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct Plugin *plugin = cls;
 
-  plugin->select_task = NULL;
+  plugin->select_task_v4 = NULL;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY))
-      && (NULL != plugin->sockv4)
-      && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv4)))
-    udp_select_read (plugin, plugin->sockv4);
-  if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
-      && (NULL != plugin->sockv4) && (NULL != plugin->ipv4_queue_head)
-      && (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv4)))
-    udp_select_send (plugin, plugin->sockv4);
-  schedule_select (plugin);
+  if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
+      (NULL != plugin->sockv4) &&
+      (GNUNET_NETWORK_fdset_isset (tc->read_ready,
+                                   plugin->sockv4)))
+    udp_select_read (plugin,
+                     plugin->sockv4);
+  udp_select_send (plugin,
+                   plugin->sockv4);
+  schedule_select_v4 (plugin);
 }
 
 
@@ -3139,7 +3179,7 @@
  * Then reschedule this function to be called again once more is available.
  *
  * @param cls the plugin handle
- * @param tc the scheduling context (for rescheduling this function again)
+ * @param tc the scheduling context
  */
 static void
 udp_plugin_select_v6 (void *cls,
@@ -3150,14 +3190,15 @@
   plugin->select_task_v6 = NULL;
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if (((tc->reason & GNUNET_SCHEDULER_REASON_READ_READY) != 0)
-      && (NULL != plugin->sockv6)
-      && (GNUNET_NETWORK_fdset_isset (tc->read_ready, plugin->sockv6)))
-    udp_select_read (plugin, plugin->sockv6);
-  if ((0 != (tc->reason & GNUNET_SCHEDULER_REASON_WRITE_READY))
-      && (NULL != plugin->sockv6) && (plugin->ipv6_queue_head != NULL )&&
-      (GNUNET_NETWORK_fdset_isset (tc->write_ready, plugin->sockv6)) 
)udp_select_send (plugin, plugin->sockv6);
-  schedule_select (plugin);
+  if ( (0 != (tc->reason & GNUNET_SCHEDULER_REASON_READ_READY)) &&
+       (NULL != plugin->sockv6) &&
+       (GNUNET_NETWORK_fdset_isset (tc->read_ready,
+                                    plugin->sockv6)) )
+    udp_select_read (plugin,
+                     plugin->sockv6);
+  udp_select_send (plugin,
+                   plugin->sockv6);
+  schedule_select_v6 (plugin);
 }
 
 
@@ -3365,35 +3406,8 @@
          _("Failed to open UDP sockets\n"));
     return 0; /* No sockets created, return */
   }
-
-  /* Create file descriptors */
-  if (plugin->enable_ipv4 == GNUNET_YES)
-  {
-    plugin->rs_v4 = GNUNET_NETWORK_fdset_create ();
-    plugin->ws_v4 = GNUNET_NETWORK_fdset_create ();
-    GNUNET_NETWORK_fdset_zero (plugin->rs_v4);
-    GNUNET_NETWORK_fdset_zero (plugin->ws_v4);
-    if (NULL != plugin->sockv4)
-    {
-      GNUNET_NETWORK_fdset_set (plugin->rs_v4, plugin->sockv4);
-      GNUNET_NETWORK_fdset_set (plugin->ws_v4, plugin->sockv4);
-    }
-  }
-
-  if (plugin->enable_ipv6 == GNUNET_YES)
-  {
-    plugin->rs_v6 = GNUNET_NETWORK_fdset_create ();
-    plugin->ws_v6 = GNUNET_NETWORK_fdset_create ();
-    GNUNET_NETWORK_fdset_zero (plugin->rs_v6);
-    GNUNET_NETWORK_fdset_zero (plugin->ws_v6);
-    if (NULL != plugin->sockv6)
-    {
-      GNUNET_NETWORK_fdset_set (plugin->rs_v6, plugin->sockv6);
-      GNUNET_NETWORK_fdset_set (plugin->ws_v6, plugin->sockv6);
-    }
-  }
-
-  schedule_select (plugin);
+  schedule_select_v4 (plugin);
+  schedule_select_v6 (plugin);
   plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
                                      GNUNET_NO,
                                      plugin->port,
@@ -3719,12 +3733,12 @@
     return NULL;
   }
   stop_broadcast (plugin);
-  if (plugin->select_task != NULL)
+  if (NULL != plugin->select_task_v4)
   {
-    GNUNET_SCHEDULER_cancel (plugin->select_task);
-    plugin->select_task = NULL;
+    GNUNET_SCHEDULER_cancel (plugin->select_task_v4);
+    plugin->select_task_v4 = NULL;
   }
-  if (plugin->select_task_v6 != NULL)
+  if (NULL != plugin->select_task_v6)
   {
     GNUNET_SCHEDULER_cancel (plugin->select_task_v6);
     plugin->select_task_v6 = NULL;
@@ -3739,8 +3753,6 @@
                     GNUNET_NETWORK_socket_close (plugin->sockv4));
       plugin->sockv4 = NULL;
     }
-    GNUNET_NETWORK_fdset_destroy (plugin->rs_v4);
-    GNUNET_NETWORK_fdset_destroy (plugin->ws_v4);
   }
   if (GNUNET_YES == plugin->enable_ipv6)
   {
@@ -3749,9 +3761,6 @@
       GNUNET_break (GNUNET_OK ==
                     GNUNET_NETWORK_socket_close (plugin->sockv6));
       plugin->sockv6 = NULL;
-
-      GNUNET_NETWORK_fdset_destroy (plugin->rs_v6);
-      GNUNET_NETWORK_fdset_destroy (plugin->ws_v6);
     }
   }
   if (NULL != plugin->nat)

Modified: gnunet/src/transport/plugin_transport_udp.h
===================================================================
--- gnunet/src/transport/plugin_transport_udp.h 2015-03-01 13:55:15 UTC (rev 
35328)
+++ gnunet/src/transport/plugin_transport_udp.h 2015-03-01 14:32:34 UTC (rev 
35329)
@@ -121,29 +121,14 @@
 
 
 /**
- * UDP Message-Packet header (after defragmentation).
+ * Information we track for each message in the queue.
  */
-struct UDPMessage
-{
-  /**
-   * Message header.
-   */
-  struct GNUNET_MessageHeader header;
+struct UDP_MessageWrapper;
 
-  /**
-   * Always zero for now.
-   */
-  uint32_t reserved;
 
-  /**
-   * What is the identity of the sender
-   */
-  struct GNUNET_PeerIdentity sender;
-
-};
-
-struct UDP_MessageWrapper;
-
+/**
+ * Closure for #append_port().
+ */
 struct PrettyPrinterContext;
 
 
@@ -172,7 +157,7 @@
   /**
    * ID of select task for IPv4
    */
-  struct GNUNET_SCHEDULER_Task *select_task;
+  struct GNUNET_SCHEDULER_Task *select_task_v4;
 
   /**
    * ID of select task for IPv6
@@ -205,31 +190,11 @@
   struct GNUNET_NAT_Handle *nat;
 
   /**
-   * FD Read set
-   */
-  struct GNUNET_NETWORK_FDSet *rs_v4;
-
-  /**
-   * FD Write set
-   */
-  struct GNUNET_NETWORK_FDSet *ws_v4;
-
-  /**
    * The read socket for IPv4
    */
   struct GNUNET_NETWORK_Handle *sockv4;
 
   /**
-   * FD Read set
-   */
-  struct GNUNET_NETWORK_FDSet *rs_v6;
-
-  /**
-   * FD Write set
-   */
-  struct GNUNET_NETWORK_FDSet *ws_v6;
-
-  /**
    * The read socket for IPv6
    */
   struct GNUNET_NETWORK_Handle *sockv6;
@@ -347,6 +312,17 @@
 };
 
 
+/**
+ * Function called for a quick conversion of the binary address to
+ * a numeric address.  Note that the caller must not free the
+ * address and that the next call to this function is allowed
+ * to override the address again.
+ *
+ * @param cls closure
+ * @param addr binary address (a `union UdpAddress`)
+ * @param addrlen length of the @a addr
+ * @return string representing the same address
+ */
 const char *
 udp_address_to_string (void *cls,
                        const void *addr,

Modified: gnunet/src/transport/plugin_transport_udp_broadcasting.c
===================================================================
--- gnunet/src/transport/plugin_transport_udp_broadcasting.c    2015-03-01 
13:55:15 UTC (rev 35328)
+++ gnunet/src/transport/plugin_transport_udp_broadcasting.c    2015-03-01 
14:32:34 UTC (rev 35329)
@@ -527,6 +527,13 @@
 }
 
 
+/**
+ * Setup broadcasting subsystem.
+ *
+ * @param plugin
+ * @param server_addrv6
+ * @param server_addrv4
+ */
 void
 setup_broadcast (struct Plugin *plugin,
                  struct sockaddr_in6 *server_addrv6,
@@ -577,6 +584,11 @@
 }
 
 
+/**
+ * Stop broadcasting subsystem.
+ *
+ * @param plugin
+ */
 void
 stop_broadcast (struct Plugin *plugin)
 {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]