gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r30655 - in gnunet/src: multicast psyc


From: gnunet
Subject: [GNUnet-SVN] r30655 - in gnunet/src: multicast psyc
Date: Sun, 10 Nov 2013 00:12:27 +0100

Author: tg
Date: 2013-11-10 00:12:27 +0100 (Sun, 10 Nov 2013)
New Revision: 30655

Modified:
   gnunet/src/multicast/multicast_api.c
   gnunet/src/psyc/gnunet-service-psyc.c
   gnunet/src/psyc/psyc_api.c
   gnunet/src/psyc/test_psyc.c
Log:
psyc: handling messages from multicast and passing them to clients; 
pause/resume fixes

Modified: gnunet/src/multicast/multicast_api.c
===================================================================
--- gnunet/src/multicast/multicast_api.c        2013-11-09 23:12:23 UTC (rev 
30654)
+++ gnunet/src/multicast/multicast_api.c        2013-11-09 23:12:27 UTC (rev 
30655)
@@ -363,11 +363,12 @@
 
   size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
   struct GNUNET_MULTICAST_MessageHeader *msg
-    = GNUNET_malloc (sizeof (*msg) + buf_size);
+    = GNUNET_malloc (buf_size);
+  buf_size -= sizeof (*msg);
   int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
 
   if (! (GNUNET_YES == ret || GNUNET_NO == ret)
-      || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
+      || sizeof (*msg) + buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "MasterTransmitNotify() returned error or invalid message size.\n");
@@ -379,15 +380,15 @@
     return; /* Transmission paused. */
 
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
-  msg->header.size = htons (buf_size);
+  msg->header.size = htons (sizeof (*msg) + buf_size);
   msg->message_id = mh->message_id;
   msg->group_generation = mh->group_generation;
 
   /* FIXME: add fragment ID and signature in the service instead of here */
   msg->fragment_id = orig->next_fragment_id++;
   msg->fragment_offset = mh->fragment_offset;
-  mh->fragment_offset += buf_size;
-  msg->purpose.size = htonl (buf_size
+  mh->fragment_offset += sizeof (*msg) + buf_size;
+  msg->purpose.size = htonl (sizeof (*msg) + buf_size
                              - sizeof (msg->header)
                              - sizeof (msg->hop_counter)
                              - sizeof (msg->signature));

Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c       2013-11-09 23:12:23 UTC (rev 
30654)
+++ gnunet/src/psyc/gnunet-service-psyc.c       2013-11-09 23:12:27 UTC (rev 
30655)
@@ -56,7 +56,7 @@
 static struct GNUNET_PSYCSTORE_Handle *store;
 
 /**
- * channel's pub_key_hash -> struct Channel
+ * Channel's pub_key_hash -> struct Channel
  */
 static struct GNUNET_CONTAINER_MultiHashMap *clients;
 
@@ -70,6 +70,9 @@
 
   char *buf;
   uint16_t size;
+  /**
+   * enum GNUNET_PSYC_DataStatus
+   */
   uint8_t status;
 };
 
@@ -83,15 +86,17 @@
   struct TransmitMessage *tmit_head;
   struct TransmitMessage *tmit_tail;
 
-  char *tmit_buf;
   GNUNET_SCHEDULER_TaskIdentifier tmit_task;
   uint32_t tmit_mod_count;
   uint32_t tmit_mod_recvd;
-  uint16_t tmit_size;
+  /**
+   * enum GNUNET_PSYC_DataStatus
+   */
   uint8_t tmit_status;
 
   uint8_t in_transmit;
   uint8_t is_master;
+  uint8_t disconnected;
 };
 
 /**
@@ -142,6 +147,10 @@
 };
 
 
+static void
+transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay);
+
+
 /**
  * Task run during shutdown.
  *
@@ -163,6 +172,30 @@
   }
 }
 
+
+static void
+client_cleanup (struct Channel *ch)
+{
+  if (ch->is_master)
+  {
+    struct Master *mst = (struct Master *) ch;
+    if (NULL != mst->origin)
+      GNUNET_MULTICAST_origin_stop (mst->origin);
+  }
+  else
+  {
+    struct Slave *slv = (struct Slave *) ch;
+    if (NULL != slv->join_req)
+      GNUNET_free (slv->join_req);
+    if (NULL != slv->relays)
+      GNUNET_free (slv->relays);
+    if (NULL != slv->member)
+      GNUNET_MULTICAST_member_part (slv->member);
+  }
+
+  GNUNET_free (ch);
+}
+
 /**
  * Called whenever a client is disconnected.
  * Frees our resources associated with that client.
@@ -188,30 +221,17 @@
     return;
   }
 
-  if (NULL != ch->tmit_buf)
-  {
-    GNUNET_free (ch->tmit_buf);
-    ch->tmit_buf = NULL;
-  }
+  ch->disconnected = GNUNET_YES;
 
-  if (ch->is_master)
+  /* Send pending messages to multicast before cleanup. */
+  if (NULL != ch->tmit_head)
   {
-    struct Master *mst = (struct Master *) ch;
-    if (NULL != mst->origin)
-      GNUNET_MULTICAST_origin_stop (mst->origin);
+    transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
   }
   else
   {
-    struct Slave *slv = (struct Slave *) ch;
-    if (NULL != slv->join_req)
-      GNUNET_free (slv->join_req);
-    if (NULL != slv->relays)
-      GNUNET_free (slv->relays);
-    if (NULL != slv->member)
-      GNUNET_MULTICAST_member_part (slv->member);
+    client_cleanup (ch);
   }
-
-  GNUNET_free (ch);
 }
 
 void
@@ -259,14 +279,98 @@
 
 }
 
+
 void
+fragment_store_result (void *cls, int64_t result, const char *err_msg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "fragment_store() returned %l (%s)\n", result, err_msg);
+}
+
+/**
+ * Send PSYC messages in an incoming multicast message to a client.
+ */
+int
+send_to_client (void *cls, const struct GNUNET_HashCode *ch_key_hash, void 
*chan)
+{
+  const struct GNUNET_MULTICAST_MessageHeader *msg = cls;
+  struct Channel *ch = chan;
+
+  uint16_t size = ntohs (msg->header.size);
+  uint16_t pos = 0;
+
+  while (sizeof (*msg) + pos < size)
+  {
+    const struct GNUNET_MessageHeader *pmsg
+      = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+    uint16_t psize = ntohs (pmsg->size);
+    if (sizeof (*msg) + pos + psize > size)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  "Ignoring message of type %u with invalid size. "
+                  "(%u + %u + %u > %u)\n", ntohs (pmsg->type),
+                  sizeof (*msg), pos, psize, size);
+      break;
+    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending message of type %u and size %u to client.\n",
+                ntohs (pmsg->type), psize);
+
+    GNUNET_SERVER_notification_context_add (nc, ch->client);
+    GNUNET_SERVER_notification_context_unicast (nc, ch->client, pmsg,
+                                                GNUNET_NO);
+    pos += psize;
+  }
+  return GNUNET_YES;
+}
+
+
+/**
+ * Incoming message fragment from multicast.
+ *
+ * Store it using PSYCstore and send it to all clients of the channel.
+ */
+void
 message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
 {
+  uint16_t type = ntohs (msg->type);
+  uint16_t size = ntohs (msg->size);
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received message of type %u from multicast.\n",
-              ntohs (msg->type));
+              "Received message of type %u and size %u from multicast.\n",
+              type, size);
+
+  struct Channel *ch = cls;
+  struct Master *mst = cls;
+  struct Slave *slv = cls;
+
+  struct GNUNET_CRYPTO_EddsaPublicKey *ch_key
+    = ch->is_master ? &mst->pub_key : &slv->chan_key;
+  struct GNUNET_HashCode *ch_key_hash
+    = ch->is_master ? &mst->pub_key_hash : &slv->chan_key_hash;
+
+  switch (type)
+  {
+  case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
+    GNUNET_PSYCSTORE_fragment_store (store, ch_key,
+                                     (const struct
+                                      GNUNET_MULTICAST_MessageHeader *) msg,
+                                     0, NULL, NULL);
+    GNUNET_CONTAINER_multihashmap_get_multiple (clients, ch_key_hash,
+                                                send_to_client, (void *) msg);
+    break;
+
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Ignoring unknown message of type %u and size %u.\n",
+                type, size);
+  }
 }
 
+
+/**
+ * Response from PSYCstore with the current counter values for a channel 
master.
+ */
 void
 master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
                     uint64_t max_message_id, uint64_t max_group_generation,
@@ -299,6 +403,9 @@
 }
 
 
+/**
+ * Response from PSYCstore with the current counter values for a channel slave.
+ */
 void
 slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
                    uint64_t max_message_id, uint64_t max_group_generation,
@@ -332,6 +439,9 @@
 }
 
 
+/**
+ * Handle a connecting client starting a channel master.
+ */
 static void
 handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
                      const struct GNUNET_MessageHeader *msg)
@@ -357,6 +467,9 @@
 }
 
 
+/**
+ * Handle a connecting client joining as a channel slave.
+ */
 static void
 handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
                    const struct GNUNET_MessageHeader *msg)
@@ -389,14 +502,27 @@
 }
 
 
+/**
+ * Send transmission acknowledgement to a client.
+ *
+ * Sent after the last GNUNET_PSYC_MessageModifier and after each
+ * GNUNET_PSYC_MessageData.
+ *
+ * @param ch The channel struct for the client.
+ */
 static void
 send_transmit_ack (struct Channel *ch)
 {
   struct TransmitAck *res = GNUNET_malloc (sizeof (*res));
   res->header.size = htons (sizeof (*res));
   res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK);
-  res->buf_avail = htons (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE - ch->tmit_size);
 
+  res->buf_avail = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+  struct TransmitMessage *tmit_msg = ch->tmit_tail;
+  if (NULL != tmit_msg && GNUNET_PSYC_DATA_CONT == tmit_msg->status)
+    res->buf_avail -= tmit_msg->size;
+  res->buf_avail = htons (res->buf_avail);
+
   GNUNET_SERVER_notification_context_add (nc, ch->client);
   GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
                                               GNUNET_NO);
@@ -404,30 +530,53 @@
 }
 
 
+/**
+ * Callback for the transmit functions of multicast.
+ */
 static int
 transmit_notify (void *cls, size_t *data_size, void *data)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
   struct Channel *ch = cls;
   struct TransmitMessage *msg = ch->tmit_head;
 
-  if (NULL == msg || *data_size < ntohs (msg->size))
+  if (NULL == msg || *data_size < msg->size)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify: nothing to 
send.\n");
     *data_size = 0;
     return GNUNET_NO;
   }
 
-  *data_size = ntohs (msg->size);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "transmit_notify: sending %u bytes.\n", msg->size);
+
+  *data_size = msg->size;
   memcpy (data, msg->buf, *data_size);
 
-  GNUNET_free (ch->tmit_buf);
-  ch->tmit_buf = NULL;
   GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
+  GNUNET_free (msg);
 
-  return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+  int ret = (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
+
+  if (0 == ch->tmit_task)
+  {
+    if (NULL != ch->tmit_head)
+    {
+      transmit_message (ch, GNUNET_TIME_UNIT_ZERO);
+    }
+    else if (ch->disconnected)
+    {
+      /* FIXME: handle partial message (when still in_transmit) */
+      client_cleanup (ch);
+    }
+  }
+
+  return ret;
 }
 
 
+/**
+ * Transmit a message from a channel master to the multicast group.
+ */
 static void
 master_transmit_message (void *cls,
                          const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -449,6 +598,9 @@
 }
 
 
+/**
+ * Transmit a message from a channel slave to the multicast group.
+ */
 static void
 slave_transmit_message (void *cls,
                         const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -468,50 +620,90 @@
 }
 
 
+/**
+ * Schedule message transmission from a channel to the multicast group.
+ *
+ * @param ch The channel.
+ * @param delay Transmission delay.
+ */
+static void
+transmit_message (struct Channel *ch, struct GNUNET_TIME_Relative delay)
+{
+  if (0 != ch->tmit_task)
+    GNUNET_SCHEDULER_cancel (ch->tmit_task);
+
+  ch->tmit_task
+    = ch->is_master
+    ? GNUNET_SCHEDULER_add_delayed (delay, master_transmit_message, ch)
+    : GNUNET_SCHEDULER_add_delayed (delay, slave_transmit_message, ch);
+}
+
+/**
+ * Queue incoming message parts from a client for transmission, and send them 
to
+ * the multicast group when the buffer is full or reached the end of message.
+ *
+ * @param ch Channel struct for the client.
+ * @param msg Message from the client.
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR.
+ */
 static int
-buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
+queue_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
 {
   uint16_t size = ntohs (msg->size);
   struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
+  struct TransmitMessage *tmit_msg = ch->tmit_tail;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Queueing message of type %u and size %u "
+              "for transmission to multicast.\n",
+              ntohs (msg->type), size);
+
   if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
     return GNUNET_SYSERR;
 
-  if (0 == ch->tmit_size)
+  if (NULL == tmit_msg
+      || tmit_msg->status != GNUNET_PSYC_DATA_CONT
+      || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < tmit_msg->size + size)
   {
-    ch->tmit_buf = GNUNET_malloc (size);
-    memcpy (ch->tmit_buf, msg, size);
-    ch->tmit_size = size;
-  }
-  else if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE <= ch->tmit_size + size)
-  {
-    ch->tmit_buf = GNUNET_realloc (ch->tmit_buf, ch->tmit_size + size);
-    memcpy (ch->tmit_buf + ch->tmit_size, msg, size);
-    ch->tmit_size += size;
-  }
-
-  if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE
-      < ch->tmit_size + sizeof (struct GNUNET_PSYC_MessageData))
-  {
-    struct TransmitMessage *tmit_msg = GNUNET_new (struct TransmitMessage);
-    tmit_msg->buf = (char *) msg;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Appending message qto new buffer.\n");
+    /* Start filling up new buffer */
+    tmit_msg = GNUNET_new (struct TransmitMessage);
+    tmit_msg->buf = GNUNET_malloc (size);
+    memcpy (tmit_msg->buf, msg, size);
     tmit_msg->size = size;
     tmit_msg->status = ch->tmit_status;
     GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-    tmit_delay = GNUNET_TIME_UNIT_ZERO;
   }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Appending message to existing buffer.\n");
+    /* Append to existing buffer */
+    tmit_msg->buf = GNUNET_realloc (tmit_msg->buf, tmit_msg->size + size);
+    memcpy (tmit_msg->buf + tmit_msg->size, msg, size);
+    tmit_msg->size += size;
+    tmit_msg->status = ch->tmit_status;
+  }
 
-  if (0 != ch->tmit_task)
-    GNUNET_SCHEDULER_cancel (ch->tmit_task);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "tmit_size: %u\n", tmit_msg->size);
 
-  ch->tmit_task
-    = ch->is_master
-    ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
-    : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
+  /* Wait a bit for the remaining message parts from the client
+     if there's still some space left in the buffer. */
+  if (GNUNET_PSYC_DATA_CONT == tmit_msg->status
+      && (tmit_msg->size + sizeof (struct GNUNET_PSYC_MessageData)
+          < GNUNET_MULTICAST_FRAGMENT_MAX_SIZE))
+    tmit_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2);
 
+  transmit_message (ch, tmit_delay);
+
   return GNUNET_OK;
 }
 
+/**
+ * Incoming method from a client.
+ */
 static void
 handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
                         const struct GNUNET_MessageHeader *msg)
@@ -524,18 +716,16 @@
 
   if (GNUNET_NO != ch->in_transmit)
   {
-    // FIXME: already transmitting a message, send back error message.
+    /* FIXME: already transmitting a message, send back error message. */
     return;
   }
 
   ch->in_transmit = GNUNET_YES;
-  ch->tmit_buf = NULL;
-  ch->tmit_size = 0;
   ch->tmit_mod_recvd = 0;
   ch->tmit_mod_count = ntohl (meth->mod_count);
   ch->tmit_status = GNUNET_PSYC_DATA_CONT;
 
-  buffer_message (ch, msg);
+  queue_message (ch, msg);
 
   if (0 == ch->tmit_mod_count)
     send_transmit_ack (ch);
@@ -544,18 +734,23 @@
 };
 
 
+/**
+ * Incoming modifier from a client.
+ */
 static void
 handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
                           const struct GNUNET_MessageHeader *msg)
 {
+  /*
   const struct GNUNET_PSYC_MessageModifier *mod
     = (const struct GNUNET_PSYC_MessageModifier *) msg;
+  */
   struct Channel *ch
     = GNUNET_SERVER_client_get_user_context (client, struct Channel);
   GNUNET_assert (NULL != ch);
 
   ch->tmit_mod_recvd++;
-  buffer_message (ch, msg);
+  queue_message (ch, msg);
 
   if (ch->tmit_mod_recvd == ch->tmit_mod_count)
     send_transmit_ack (ch);
@@ -564,6 +759,9 @@
 };
 
 
+/**
+ * Incoming data from a client.
+ */
 static void
 handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *msg)
@@ -575,7 +773,7 @@
   GNUNET_assert (NULL != ch);
 
   ch->tmit_status = ntohs (data->status);
-  buffer_message (ch, msg);
+  queue_message (ch, msg);
   send_transmit_ack (ch);
 
   if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)

Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c  2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/psyc/psyc_api.c  2013-11-09 23:12:27 UTC (rev 30655)
@@ -69,12 +69,12 @@
   /**
    * Head of operations to transmit.
    */
-  struct OperationHandle *transmit_head;
+  struct OperationHandle *tmit_head;
 
   /**
    * Tail of operations to transmit.
    */
-  struct OperationHandle *transmit_tail;
+  struct OperationHandle *tmit_tail;
 
   /**
    * Message to send on reconnect.
@@ -116,6 +116,16 @@
    * Buffer space available for transmitting the next data fragment.
    */
   uint16_t tmit_buf_avail;
+
+  /**
+   * Is transmission paused?
+   */
+  uint8_t tmit_paused;
+
+  /**
+   * Are we still waiting for a PSYC_TRANSMIT_ACK?
+   */
+  uint8_t tmit_ack_pending;
 };
 
 
@@ -243,6 +253,11 @@
 transmit_next (struct GNUNET_PSYC_Channel *ch);
 
 
+/**
+ * Request data from client to transmit.
+ *
+ * @param mst Master handle.
+ */
 static void
 master_transmit_data (struct GNUNET_PSYC_Master *mst)
 {
@@ -268,12 +283,13 @@
   default:
     mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
     data_size = 0;
-    LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
+    LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error.\n");
   }
 
   if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
   {
     /* Transmission paused, nothing to send. */
+    ch->tmit_paused = GNUNET_YES;
     GNUNET_free (op);
   }
   else
@@ -281,7 +297,8 @@
     GNUNET_assert (data_size <= ch->tmit_buf_avail);
     pdata->header.size = htons (sizeof (*pdata) + data_size);
     pdata->status = htons (mst->tmit->status);
-    GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, 
op);
+    GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
+    ch->tmit_ack_pending = GNUNET_YES;
     transmit_next (ch);
   }
 }
@@ -305,7 +322,6 @@
   struct CountersResult *cres;
   struct TransmitAck *tack;
 
-
   if (NULL == msg)
   {
     reschedule_connect (ch);
@@ -317,7 +333,8 @@
   uint16_t type = ntohs (msg->type);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received message of type %d from PSYC service\n", type);
+       "Received message of type %d and size %u from PSYC service\n",
+       type, size);
 
   switch (type)
   {
@@ -328,10 +345,16 @@
   case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
     size_eq = sizeof (struct TransmitAck);
     break;
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+    size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+    size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+    size_min = sizeof (struct GNUNET_PSYC_MessageData);
   }
 
   if (! ((0 < size_eq && size == size_eq)
-         || (0 < size_min && size >= size_min)))
+         || (0 < size_min && size_min <= size)))
   {
     GNUNET_break (0);
     reschedule_connect (ch);
@@ -370,7 +393,9 @@
       else
       {
         ch->tmit_buf_avail = ntohs (tack->buf_avail);
-        master_transmit_data (mst);
+        ch->tmit_ack_pending = GNUNET_NO;
+        if (GNUNET_NO == ch->tmit_paused)
+          master_transmit_data (mst);
       }
     }
     else
@@ -378,6 +403,18 @@
       /* TODO: slave */
     }
     break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+
+    break;
+
+  case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+
+    break;
   }
 
   GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
@@ -397,9 +434,9 @@
 send_next_message (void *cls, size_t size, void *buf)
 {
   struct GNUNET_PSYC_Channel *ch = cls;
-  struct OperationHandle *op = ch->transmit_head;
+  struct OperationHandle *op = ch->tmit_head;
   size_t ret;
-
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
   ch->th = NULL;
   if (NULL == op->msg)
     return 0;
@@ -409,15 +446,12 @@
     reschedule_connect (ch);
     return 0;
   }
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Sending message of type %d to PSYC service\n",
-       ntohs (op->msg->type));
   memcpy (buf, op->msg, ret);
 
-  GNUNET_CONTAINER_DLL_remove (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, op);
   GNUNET_free (op);
 
-  if (NULL != ch->transmit_head)
+  if (NULL != ch->tmit_head)
     transmit_next (ch);
 
   if (GNUNET_NO == ch->in_receive)
@@ -438,10 +472,11 @@
 static void
 transmit_next (struct GNUNET_PSYC_Channel *ch)
 {
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
   if (NULL != ch->th || NULL == ch->client)
     return;
 
-  struct OperationHandle *op = ch->transmit_head;
+  struct OperationHandle *op = ch->tmit_head;
   if (NULL == op)
     return;
 
@@ -472,14 +507,14 @@
   ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg);
   GNUNET_assert (NULL != ch->client);
 
-  if (NULL == ch->transmit_head ||
-      ch->transmit_head->msg->type != ch->reconnect_msg->type)
+  if (NULL == ch->tmit_head ||
+      ch->tmit_head->msg->type != ch->reconnect_msg->type)
   {
     uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
     struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
     memcpy (&op[1], ch->reconnect_msg, reconn_size);
     op->msg = (struct GNUNET_MessageHeader *) &op[1];
-    GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+    GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, op);
   }
   transmit_next (ch);
 }
@@ -496,7 +531,7 @@
   struct GNUNET_PSYC_Channel *ch = c;
 
   GNUNET_assert (NULL != ch);
-  if (ch->transmit_head != ch->transmit_tail)
+  if (ch->tmit_head != ch->tmit_tail)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
          "Disconnecting while there are still outstanding messages!\n");
@@ -654,7 +689,7 @@
   memcpy (&pmod[1], mod->name, name_size);
   memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
 
-  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
   return GNUNET_YES;
 }
 
@@ -699,7 +734,7 @@
   pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count 
(env));
   memcpy (&pmeth[1], method_name, size);
 
-  GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+  GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, op);
   GNUNET_ENV_environment_iterate (env, send_modifier, master);
   transmit_next (ch);
 
@@ -720,7 +755,12 @@
 void
 GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle 
*th)
 {
-  master_transmit_data (th->master);
+  struct GNUNET_PSYC_Channel *ch = &th->master->ch;
+  if (GNUNET_NO == ch->tmit_ack_pending)
+  {
+    ch->tmit_paused = GNUNET_NO;
+    master_transmit_data (th->master);
+  }
 }
 
 
@@ -938,8 +978,8 @@
   slvadd->header.size = htons (sizeof (*slvadd));
   slvadd->announced_at = GNUNET_htonll (announced_at);
   slvadd->effective_since = GNUNET_htonll (effective_since);
-  GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head,
-                                    channel->transmit_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
+                                    channel->tmit_tail,
                                     op);
   transmit_next (channel);
 }
@@ -979,8 +1019,8 @@
   slvrm->header.type = GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM;
   slvrm->header.size = htons (sizeof (*slvrm));
   slvrm->announced_at = GNUNET_htonll (announced_at);
-  GNUNET_CONTAINER_DLL_insert_tail (channel->transmit_head,
-                                    channel->transmit_tail,
+  GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head,
+                                    channel->tmit_tail,
                                     op);
   transmit_next (channel);
 }

Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2013-11-09 23:12:23 UTC (rev 30654)
+++ gnunet/src/psyc/test_psyc.c 2013-11-09 23:12:27 UTC (rev 30655)
@@ -144,10 +144,12 @@
   return GNUNET_OK;
 }
 
+
 struct TransmitClosure
 {
   struct GNUNET_PSYC_MasterTransmitHandle *handle;
   uint8_t n;
+  uint8_t paused;
   uint8_t fragment_count;
   char *fragments[16];
   uint16_t fragment_sizes[16];
@@ -157,8 +159,9 @@
 static void
 transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
   struct TransmitClosure *tmit = cls;
+  tmit->paused = GNUNET_NO;
   GNUNET_PSYC_master_transmit_resume (tmit->handle);
 }
 
@@ -167,33 +170,36 @@
 transmit_notify (void *cls, size_t *data_size, void *data)
 {
   struct TransmitClosure *tmit = cls;
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-              "Transmit notify: %lu bytes\n", *data_size);
-
-  if (tmit->fragment_count <= tmit->n)
-    return GNUNET_YES;
-
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Transmit notify: %lu bytes available, "
+              "processing fragment %u/%u.\n",
+              *data_size, tmit->n + 1, tmit->fragment_count);
   GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
 
-  *data_size = tmit->fragment_sizes[tmit->n];
-  memcpy (data, tmit->fragments[tmit->n], *data_size);
-  tmit->n++;
-
-  if (tmit->n == tmit->fragment_count - 1)
+  if (GNUNET_YES == tmit->paused && tmit->n == tmit->fragment_count - 1)
   {
     /* Send last fragment later. */
-    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume,
-                                  tmit);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n");
+    tmit->paused = GNUNET_YES;
+    GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+                                  (GNUNET_TIME_UNIT_SECONDS, 3),
+                                  &transmit_resume, tmit);
     *data_size = 0;
     return GNUNET_NO;
   }
-  return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
+
+  GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
+  *data_size = tmit->fragment_sizes[tmit->n];
+  memcpy (data, tmit->fragments[tmit->n], *data_size);
+
+  return ++tmit->n < tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
 }
 
 void
 master_started (void *cls, uint64_t max_message_id)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", 
max_message_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Master started: %lu\n", max_message_id);
 
   struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
   GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
@@ -202,11 +208,13 @@
                                   "_foo_bar", "foo bar baz", 11);
 
   struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
-  tmit->fragment_count = 2;
-  tmit->fragments[0] = "foo bar";
-  tmit->fragment_sizes[0] = 7;
-  tmit->fragments[1] = "baz!";
-  tmit->fragment_sizes[1] = 4;
+  tmit->fragment_count = 3;
+  tmit->fragments[0] = "foo";
+  tmit->fragment_sizes[0] = 4;
+  tmit->fragments[1] = "foo bar";
+  tmit->fragment_sizes[1] = 7;
+  tmit->fragments[2] = "foo bar baz";
+  tmit->fragment_sizes[2] = 11;
   tmit->handle
     = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
                                    GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]