gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 29/32: transport (quic): mq handling fixes


From: gnunet
Subject: [gnunet] 29/32: transport (quic): mq handling fixes
Date: Tue, 18 Jul 2023 17:16:18 +0200

This is an automated email from the git hooks/post-receive script.

marshall pushed a commit to branch master
in repository gnunet.

commit b37caeb2b2b453aa4cf8c704f2ce84eec4dbc170
Author: marshall <stmr@umich.edu>
AuthorDate: Wed Jul 12 14:21:57 2023 -0400

    transport (quic): mq handling fixes
---
 src/transport/gnunet-communicator-quic.c | 196 ++++++++++++++++++++++++++++---
 1 file changed, 180 insertions(+), 16 deletions(-)

diff --git a/src/transport/gnunet-communicator-quic.c b/src/transport/gnunet-communicator-quic.c
index cdbd05477..ab1b7b20b 100644
--- a/src/transport/gnunet-communicator-quic.c
+++ b/src/transport/gnunet-communicator-quic.c
@@ -70,6 +70,11 @@ struct PeerAddress
    */
   socklen_t address_len;
 
+  /**
+   * The QUIC connection associated with this peer
+  */
+  struct quic_conn *conn;
+
   /**
    * Default message queue we are providing for the #ch.
    */
@@ -170,7 +175,7 @@ struct QUIC_header
  * ASSUMES: connection is established to peer
 */
 static void
-recv_from_streams (quiche_conn *conn, char*stream_buf, size_t buf_size)
+recv_from_streams (quiche_conn *conn, char *stream_buf, size_t buf_size)
 {
   uint64_t s = 0;
   quiche_stream_iter *readable;
@@ -336,6 +341,55 @@ flush_egress (struct quic_conn *conn)
 }
 
 
+/**
+ * Reschedule the idle timeout of a peer due to activity.
+ *
+ * @param peer peer address for which the timeout should be rescheduled
+ */
+static void
+reschedule_peer_timeout (struct PeerAddress *peer)
+{
+  peer->timeout =
+    GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
+  GNUNET_CONTAINER_heap_update_cost (peer->hn,
+                                     peer->timeout.abs_value_us);
+}
+
+
+/**
+ * Destroys the state of a peer due to timeout or shutdown.
+ *
+ * @param peer entity to close down
+ */
+static void
+peer_destroy (struct PeerAddress *peer)
+{
+
+  peer->peer_destroy_called = GNUNET_YES;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Disconnecting peer for peer `%s'\n",
+              GNUNET_i2s (&peer->target));
+  if (NULL != peer->d_qh)
+  {
+    GNUNET_TRANSPORT_communicator_mq_del (peer->d_qh);
+    peer->d_qh = NULL;
+  }
+  GNUNET_assert (GNUNET_YES ==
+                 GNUNET_CONTAINER_multipeermap_remove (peers,
+                                                       &peer->target,
+                                                       peer));
+  GNUNET_assert (peer == GNUNET_CONTAINER_heap_remove_node (peer->hn));
+  GNUNET_STATISTICS_set (stats,
+                         "# peers active",
+                         GNUNET_CONTAINER_multipeermap_size (peers),
+                         GNUNET_NO);
+  GNUNET_free (peer->address);
+  GNUNET_free (peer->foreign_addr);
+  GNUNET_free (peer);
+}
+
+
 /**
  * Signature of functions implementing the sending functionality of a
  * message queue.
@@ -351,6 +405,14 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
 {
   struct PeerAddress *peer = impl_state;
   uint16_t msize = ntohs (msg->size);
+  struct quic_conn *q_conn = peer->conn;
+
+  if (NULL == q_conn->conn)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "peer never established quic connection\n");
+    return;
+  }
 
   GNUNET_assert (mq == peer->d_mq);
   if (msize > peer->d_mtu)
@@ -365,6 +427,7 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
     return;
   }
   reschedule_peer_timeout (peer);
+
   // if (-1 == GNUNET_NETWORK_socket_sendto (udp_sock,
   //                                         dgram,
   //                                         sizeof(dgram),
@@ -380,6 +443,65 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
 }
 
 
+/**
+ * Signature of functions implementing the destruction of a message
+ * queue.  Implementations must not free @a mq, but should take care
+ * of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state our `struct PeerAddress`
+ */
+static void
+mq_destroy_d (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  struct PeerAddress *peer = impl_state;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Default MQ destroyed\n");
+  if (mq == peer->d_mq)
+  {
+    peer->d_mq = NULL;
+    if (GNUNET_YES != peer->peer_destroy_called)
+      peer_destroy (peer);
+  }
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ *
+ * @param mq message queue
+ * @param impl_state our `struct PeerAddress`
+ */
+static void
+mq_cancel (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+  /* Cancellation is impossible with QUIC; bail */
+  GNUNET_assert (0);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate
+ * error code and the same closure specified at the creation of
+ * the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls our `struct PeerAddress`
+ * @param error error code
+ */
+static void
+mq_error (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct PeerAddress *peer = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+              "MQ error in queue to %s: %d\n",
+              GNUNET_i2s (&peer->target),
+              (int) error);
+  peer_destroy (peer);
+}
+
+
 /**
  * Convert UDP bind specification to a `struct sockaddr *`
  *
@@ -557,10 +679,10 @@ setup_peer_mq (struct PeerAddress *peer)
     peer->d_mq = GNUNET_MQ_queue_for_callbacks (&mq_send_d,
                                                 &mq_destroy_d,
                                                 &mq_cancel,
-                                                receiver,
+                                                peer,
                                                 NULL,
                                                 &mq_error,
-                                                receiver);
+                                                peer);
   peer->d_qh =
     GNUNET_TRANSPORT_communicator_mq_add (ch,
                                           &peer->target,
@@ -574,6 +696,44 @@ setup_peer_mq (struct PeerAddress *peer)
 }
 
 
+/**
+ * Taken from: UDP communicator
+ * Converts @a address to the address string format used by this
+ * communicator in HELLOs.
+ *
+ * @param address the address to convert, must be AF_INET or AF_INET6.
+ * @param address_len number of bytes in @a address
+ * @return string representation of @a address
+ */
+static char *
+sockaddr_to_udpaddr_string (const struct sockaddr *address,
+                            socklen_t address_len)
+{
+  char *ret;
+
+  switch (address->sa_family)
+  {
+  case AF_INET:
+    GNUNET_asprintf (&ret,
+                     "%s-%s",
+                     COMMUNICATOR_ADDRESS_PREFIX,
+                     GNUNET_a2s (address, address_len));
+    break;
+
+  case AF_INET6:
+    GNUNET_asprintf (&ret,
+                     "%s-%s",
+                     COMMUNICATOR_ADDRESS_PREFIX,
+                     GNUNET_a2s (address, address_len));
+    break;
+
+  default:
+    GNUNET_assert (0);
+  }
+  return ret;
+}
+
+
 /**
  * Function called when the transport service has received a
  * backchannel message for this communicator (!) via a different return
@@ -682,9 +842,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
   socklen_t in_len;
   uint8_t scid[LOCAL_CONN_ID_LEN];
 
-  struct quic_conn *conn;
+  struct quic_conn *q_conn;
   char *bindto;
-  socklen_t in_len;
+  socklen_t local_in_len;
   struct sockaddr *local_addr;
 
   if (GNUNET_OK !=
@@ -696,9 +856,9 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                                COMMUNICATOR_CONFIG_SECTION,
                                "BINDTO");
-    return;
+    return GNUNET_SYSERR;
   }
-  local_addr = udp_address_to_sockaddr (bindto, &in_len);
+  local_addr = udp_address_to_sockaddr (bindto, &local_in_len);
 
   if (0 != strncmp (address,
                     COMMUNICATOR_ADDRESS_PREFIX "-",
@@ -739,24 +899,27 @@ mq_init (void *cls, const struct GNUNET_PeerIdentity *peer_id, const
   */
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_STRONG, scid,
                               LOCAL_CONN_ID_LEN);
-  conn = GNUNET_new (struct quic_conn);
-  GNUNET_memcpy (conn->cid, scid, LOCAL_CONN_ID_LEN);
+  q_conn = GNUNET_new (struct quic_conn);
+  GNUNET_memcpy (q_conn->cid, scid, LOCAL_CONN_ID_LEN);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Attempting to perform handshake with peer\n");
-  conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN,
-                               local_addr,
-                               in_len, peer->address, peer->address_len,
-                               config);
+              "Attempting to perform QUIC handshake with peer\n");
+  q_conn->conn = quiche_connect (peer->foreign_addr, scid, LOCAL_CONN_ID_LEN,
+                                 local_addr,
+                                 local_in_len, peer->address, peer->address_len,
+                                 config);
+
+  peer->conn = q_conn;
   /**
    * Insert connection into hashmap
   */
   struct GNUNET_HashCode key;
-  GNUNET_CRYPTO_hash (conn->cid, LOCAL_CONN_ID_LEN, &key);
-  GNUNET_CONTAINER_multihashmap_put (conn_map, &key, conn,
+  GNUNET_CRYPTO_hash (q_conn->cid, LOCAL_CONN_ID_LEN, &key);
+  GNUNET_CONTAINER_multihashmap_put (conn_map, &key, q_conn,
                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
   setup_peer_mq (peer);
   if (NULL == timeout_task)
     timeout_task = GNUNET_SCHEDULER_add_now (&check_timeouts, NULL);
+  GNUNET_free (local_addr);
   return GNUNET_OK;
 }
 
@@ -1035,6 +1198,7 @@ sock_read (void *cls)
     quiche_conn_free (conn->conn);
     GNUNET_free (conn);
   }
+  GNUNET_free (local_addr);
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]