gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3398 - in GNUnet: . src/applications/fs/ecrs src/applicati


From: grothoff
Subject: [GNUnet-SVN] r3398 - in GNUnet: . src/applications/fs/ecrs src/applications/tbench src/server src/transports
Date: Wed, 13 Sep 2006 22:09:49 -0700 (PDT)

Author: grothoff
Date: 2006-09-13 22:09:41 -0700 (Wed, 13 Sep 2006)
New Revision: 3398

Modified:
   GNUnet/src/applications/fs/ecrs/ecrstest.c
   GNUnet/src/applications/tbench/Makefile.am
   GNUnet/src/applications/tbench/gnunet-tbench.c
   GNUnet/src/applications/tbench/tbench.c
   GNUnet/src/applications/tbench/tbenchtest.c
   GNUnet/src/applications/tbench/tbenchtest_udp.c
   GNUnet/src/server/connection.c
   GNUnet/src/server/gnunetd.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/tcp_helper.c
   GNUnet/todo
Log:
fixing bugs in tbench, transports, core bandwidth allocation and core message 
buffering

Modified: GNUnet/src/applications/fs/ecrs/ecrstest.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/ecrstest.c  2006-09-12 21:15:40 UTC (rev 
3397)
+++ GNUnet/src/applications/fs/ecrs/ecrstest.c  2006-09-14 05:09:41 UTC (rev 
3398)
@@ -302,6 +302,7 @@
   if (sock != NULL)
     connection_destroy(sock);
   GE_ASSERT(NULL, OK == os_daemon_stop(NULL, daemon));
+  GC_free(cfg);
   return (ok == YES) ? 0 : 1;
 }
 

Modified: GNUnet/src/applications/tbench/Makefile.am
===================================================================
--- GNUnet/src/applications/tbench/Makefile.am  2006-09-12 21:15:40 UTC (rev 
3397)
+++ GNUnet/src/applications/tbench/Makefile.am  2006-09-14 05:09:41 UTC (rev 
3398)
@@ -44,6 +44,7 @@
   $(top_builddir)/src/applications/stats/libgnunetstats_api.la \
   $(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
   $(top_builddir)/src/util/loggers/libgnunetutil_logging.la \
+  $(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
   $(top_builddir)/src/util/config_impl/libgnunetutil_config.la \
   $(top_builddir)/src/util/libgnunetutil.la 
 
@@ -53,6 +54,7 @@
   $(top_builddir)/src/applications/stats/libgnunetstats_api.la \
   $(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
   $(top_builddir)/src/util/loggers/libgnunetutil_logging.la \
+  $(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
   $(top_builddir)/src/util/config_impl/libgnunetutil_config.la \
   $(top_builddir)/src/util/libgnunetutil.la 
 

Modified: GNUnet/src/applications/tbench/gnunet-tbench.c
===================================================================
--- GNUnet/src/applications/tbench/gnunet-tbench.c      2006-09-12 21:15:40 UTC 
(rev 3397)
+++ GNUnet/src/applications/tbench/gnunet-tbench.c      2006-09-14 05:09:41 UTC 
(rev 3398)
@@ -82,14 +82,14 @@
     &gnunet_getopt_configure_set_ulong, &messageSize },
   { 'S', "space", "SPACE",
     gettext_noop("sleep for SPACE ms after each a message block"), 1,
-    &gnunet_getopt_configure_set_ulong, &messageTrainSize },
+    &gnunet_getopt_configure_set_ulong, &messageSpacing },
   { 't', "timeout", "TIMEOUT",
     gettext_noop("time to wait for the completion of an iteration (in ms)"), 1,
     &gnunet_getopt_configure_set_ulong, &messageTimeOut },
   COMMAND_LINE_OPTION_VERSION(PACKAGE_VERSION), /* -v */
   { 'X', "xspace", "COUNT",
     gettext_noop("number of messages in a message block"), 1,
-    &gnunet_getopt_configure_set_ulong, &messageSpacing },
+    &gnunet_getopt_configure_set_ulong, &messageTrainSize },
   COMMAND_LINE_OPTION_END,
 };
 
@@ -185,7 +185,7 @@
     GE_ASSERT(ectx, 
              ntohs(buffer->header.size) ==
              sizeof(CS_tbench_reply_MESSAGE));
-    if ((float)buffer->mean_loss <= 0){
+    if ((float)buffer->mean_loss < 0){
       GE_BREAK(ectx, 0);
       messagesPercentLoss = 0.0;
     } else {

Modified: GNUnet/src/applications/tbench/tbench.c
===================================================================
--- GNUnet/src/applications/tbench/tbench.c     2006-09-12 21:15:40 UTC (rev 
3397)
+++ GNUnet/src/applications/tbench/tbench.c     2006-09-14 05:09:41 UTC (rev 
3398)
@@ -56,16 +56,9 @@
  */
 static struct MUTEX * lock;
 
-static struct SEMAPHORE * presem;
-
 static struct SEMAPHORE * postsem;
 
 /**
- * What was the packet number we received?
- */
-static unsigned int lastPacketNumber;
-
-/**
  * What is the current iteration counter? (Used to verify
  * that replies match the current request series).
  */
@@ -82,49 +75,15 @@
 
 static CoreAPIForApplication * coreAPI;
 
+static IterationData * results;
+
 /**
- * Check if we have received a p2p reply,
- * update result counters accordingly.
- *
- * @return 0 if we have received all results, >0 otherwise
+ * Did we receive the last response for the current iteration
+ * before the timeout? If so, when?
  */
-static int pollResults(IterationData * results,
-                      int blocking) {
-  if (results->lossCount == 0)
-    return 0;
-  if (blocking == YES) {
-    if (timeoutOccured == YES)
-      return results->lossCount;
-    SEMAPHORE_DOWN(postsem, YES);
-  } else {
-    if (OK != SEMAPHORE_DOWN(postsem, NO))
-      return results->lossCount;
-  }
-  do {
-    if (timeoutOccured == YES) {
-      SEMAPHORE_UP(presem);
-      return results->lossCount;
-    }
-    if (lastPacketNumber > results->maxPacketNumber) {
-      SEMAPHORE_UP(presem);
-      return results->lossCount;
-    }
-    if (0 == results->packetsReceived[lastPacketNumber]++) {
-      results->lossCount--;
-    } else {
-      results->duplicateCount++;
-#if DEBUG_TBENCH
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-         "Received duplicate message %u from iteration %u\n",
-         lastPacketNumber,
-         currIteration);
-#endif
-    }
-    SEMAPHORE_UP(presem);
-  } while (OK == SEMAPHORE_DOWN(postsem, NO));
-  return results->lossCount;
-}
+static cron_t earlyEnd;
 
+
 /**
  * Another peer send us a tbench request.  Just turn
  * around and send it back.
@@ -134,6 +93,11 @@
   MESSAGE_HEADER * reply;
   const P2P_tbench_MESSAGE * msg;
 
+#if DEBUG_TBENCH
+  GE_LOG(ectx,
+        GE_DEBUG | GE_BULK | GE_USER,
+        "Received tbench request\n");
+#endif
   if ( ntohs(message->size) < sizeof(P2P_tbench_MESSAGE)) {
     GE_BREAK(ectx, 0);
     return SYSERR;
@@ -147,11 +111,12 @@
   }
 
 #if DEBUG_TBENCH
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "Received request %u from iteration %u/%u\n",
-      htonl(msg->packetNum),
-      htonl(msg->iterationNum),
-      htonl(msg->nounce));
+  GE_LOG(ectx,
+        GE_DEBUG | GE_BULK | GE_USER,
+        "Received request %u from iteration %u/%u\n",
+        htonl(msg->packetNum),
+        htonl(msg->iterationNum),
+        htonl(msg->nounce));
 #endif
   reply = MALLOC(ntohs(message->size));
   memcpy(reply,
@@ -160,7 +125,7 @@
   reply->type = htons(P2P_PROTO_tbench_REPLY);
   coreAPI->unicast(sender,
                   reply,
-                  ntohl(msg->priority), /* medium importance */
+                  ntohl(msg->priority),
                   0); /* no delay */
   FREE(reply);
   return OK;
@@ -172,6 +137,8 @@
 static int handleTBenchReply(const PeerIdentity * sender,
                             const MESSAGE_HEADER * message) {
   const P2P_tbench_MESSAGE * pmsg;
+  unsigned int lastPacketNumber;
+  IterationData * res;
 
   if (ntohs(message->size) < sizeof(P2P_tbench_MESSAGE)) {
     GE_BREAK(ectx, 0);
@@ -184,29 +151,38 @@
     GE_BREAK(ectx, 0);
     return SYSERR;
   }
-#if DEBUG_TBENCH
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "Received response %u from iteration %u/%u\n",
-      htonl(pmsg->packetNum),
-      htonl(pmsg->iterationNum),
-      htonl(pmsg->nounce));
-#endif
-  MUTEX_LOCK(lock);
+  MUTEX_LOCK(lock);  
   if ( (timeoutOccured == NO) &&
-       (presem != NULL) &&
        (postsem != NULL) &&
        (htonl(pmsg->iterationNum) == currIteration) &&
        (htonl(pmsg->nounce) == currNounce) ) {
-    SEMAPHORE_DOWN(presem, YES);
+    res = &results[currIteration];
     lastPacketNumber = ntohl(pmsg->packetNum);
-    SEMAPHORE_UP(postsem);
+    if (lastPacketNumber <= res->maxPacketNumber) {
+      if (0 == res->packetsReceived[lastPacketNumber]++) {
+       res->lossCount--;
+       if (res->lossCount == 0)
+         earlyEnd = get_time();
+      } else {
+       res->duplicateCount++;
+      }      
+    }
+#if DEBUG_TBENCH
+  GE_LOG(ectx,
+        GE_DEBUG | GE_BULK | GE_USER,
+        "Received response %u from iteration %u/%u on time!\n",
+        htonl(pmsg->packetNum),
+        htonl(pmsg->iterationNum),
+        htonl(pmsg->nounce));
+#endif
   } else {
 #if DEBUG_TBENCH
-    GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-       "Received message %u from iteration %u too late (now at iteration 
%u)\n",
-       ntohl(pmsg->packetNum),
-       ntohl(pmsg->iterationNum),
-       currIteration);
+    GE_LOG(ectx, 
+          GE_DEBUG | GE_BULK | GE_USER,
+          "Received message %u from iteration %u too late (now at iteration 
%u)\n",
+          ntohl(pmsg->packetNum),
+          ntohl(pmsg->iterationNum),
+          currIteration);
 #endif
   }
   MUTEX_UNLOCK(lock);
@@ -237,20 +213,25 @@
   cron_t endTime;
   cron_t now;
   cron_t delay;
-  cron_t delayStart;
-  IterationData * results;
   unsigned long long sum_loss;
   unsigned int max_loss;
   unsigned int min_loss;
   cron_t sum_time;
   cron_t min_time;
   cron_t max_time;
-  cron_t earlyEnd;
   double sum_variance_time;
   double sum_variance_loss;
   unsigned int msgCnt;
   unsigned int iterations;
 
+#if DEBUG_TBENCH
+  GE_LOG(ectx,
+        GE_DEBUG | GE_USER | GE_BULK,
+        "Tbench received request from client.\n",
+        msgCnt,
+        size,
+        iterations);
+#endif
   if ( ntohs(message->size) != sizeof(CS_tbench_request_MESSAGE) )
     return SYSERR;
 
@@ -262,12 +243,21 @@
   iterations = ntohl(msg->iterations);
   msgCnt = ntohl(msg->msgCnt);
 #if DEBUG_TBENCH
-  LOG(LOG_MESSAGE,
-      "Tbench runs %u test messages of size %u in %u iterations.\n",
-      msgCnt,
-      size,
-      iterations);
+  GE_LOG(ectx,
+        GE_INFO | GE_USER | GE_BULK,
+        "Tbench runs %u test messages of size %u in %u iterations.\n",
+        msgCnt,
+        size,
+        iterations);
 #endif
+  MUTEX_LOCK(lock);
+  if (results != NULL) {
+    GE_LOG(ectx,
+          GE_WARNING | GE_USER | GE_IMMEDIATE,
+          "Cannot run multiple tbench sessions at the same time!\n");
+    MUTEX_UNLOCK(lock);
+    return SYSERR;
+  }
   results = MALLOC(sizeof(IterationData) * iterations);
 
   p2p = MALLOC(size);
@@ -278,7 +268,6 @@
   p2p->header.type = htons(P2P_PROTO_tbench_REQUEST);
   p2p->priority = msg->priority;
 
-  MUTEX_LOCK(lock);
   for (iteration=0;iteration<iterations;iteration++) {
     results[iteration].maxPacketNumber = msgCnt;
     results[iteration].packetsReceived = MALLOC(msgCnt);
@@ -289,7 +278,6 @@
     results[iteration].duplicateCount = 0;
 
     earlyEnd = 0;
-    presem = SEMAPHORE_CREATE(1);
     postsem = SEMAPHORE_CREATE(0);
     currNounce = weak_randomi(0xFFFFFF);
     p2p->nounce
@@ -316,72 +304,43 @@
                 postsem);
     for (packetNum=0;packetNum<msgCnt;packetNum++){
       now = get_time();
-      if (now > endTime)
-       break; /* timeout */
-
       p2p->packetNum = htonl(packetNum);
 #if DEBUG_TBENCH
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-         "Sending message %u of size %u in iteration %u\n",
-         packetNum,
-         size,
-         iteration);
+      GE_LOG(ectx, 
+            GE_DEBUG | GE_BULK | GE_USER,
+            "Sending message %u of size %u in iteration %u\n",
+            packetNum,
+            size,
+            iteration);
 #endif
       coreAPI->unicast(&msg->receiverId,
                       &p2p->header,
                       ntohl(msg->priority),
                       0); /* no delay */
-      pollResults(&results[iteration], NO);
       if ( (delay != 0) &&
           (htonl(msg->trainSize) != 0) &&
-          (packetNum % htonl(msg->trainSize)) == 0) {
-       delayStart = now;
-       while ( (get_time() < (delayStart+delay)) &&
-               (timeoutOccured == NO) ) {
-         now = get_time();
-         if (delayStart + delay - now  > 5 * cronMILLIS) {
-           pollResults(&results[iteration], NO);
-           PTHREAD_SLEEP(5 * cronMILLIS);
-         } else
-           PTHREAD_SLEEP(delayStart + delay - now);
-       }
-      }        
-      if ( (0 == pollResults(&results[iteration], NO)) &&
-          (earlyEnd == 0) )
-       earlyEnd = get_time();
+          (packetNum % htonl(msg->trainSize)) == 0) 
+       PTHREAD_SLEEP(delay); 
     }
-    while ( (timeoutOccured == NO) &&
-           (get_time() < endTime) ) {
-      if ( (0 == pollResults(&results[iteration], YES) ) &&
-          (earlyEnd == 0) )
-       earlyEnd = get_time();
-      PTHREAD_SLEEP(5 * cronMILLIS);
-    }
-
-    /* make sure to unblock waiting jobs */
-    timeoutOccured = YES;
-    SEMAPHORE_UP(presem);
-
+    SEMAPHORE_DOWN(postsem, YES);
     MUTEX_LOCK(lock);
     if (earlyEnd == 0)
-      earlyEnd = now;
+      earlyEnd = get_time();
     results[iteration].totalTime
       = earlyEnd - startTime;
     FREE(results[iteration].packetsReceived);
-    cron_suspend(coreAPI->cron,
-                NO);
-    cron_del_job(coreAPI->cron,
-                &semaUp,
-                0,
-                postsem);
-    cron_resume_jobs(coreAPI->cron,
-                    NO);
-    SEMAPHORE_DESTROY(presem);
     SEMAPHORE_DESTROY(postsem);
-    presem = NULL;
     postsem = NULL;
   }
   MUTEX_UNLOCK(lock);
+#if DEBUG_TBENCH
+  GE_LOG(ectx, 
+        GE_DEBUG | GE_BULK | GE_USER,
+        "Done waiting for response.\n",
+        packetNum,
+        size,
+        iteration);
+#endif
 
   sum_loss = 0;
   sum_time = 0;
@@ -427,6 +386,7 @@
   reply.variance_time = sum_variance_time/(iterations-1);
   reply.variance_loss = sum_variance_loss/(iterations-1);
   FREE(results);
+  results = NULL;
   return coreAPI->sendToClient(client,
                               &reply.header);
 }
@@ -455,7 +415,7 @@
   GE_ASSERT(capi->ectx,
            0 == GC_set_configuration_value_string(capi->cfg,
                                                   capi->ectx,
-                                                 "ABOUT",
+                                                  "ABOUT",
                                                   "tbench",
                                                   gettext_noop("allows 
profiling of direct "
                                                                "peer-to-peer 
connections")));

Modified: GNUnet/src/applications/tbench/tbenchtest.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest.c 2006-09-12 21:15:40 UTC (rev 
3397)
+++ GNUnet/src/applications/tbench/tbenchtest.c 2006-09-14 05:09:41 UTC (rev 
3398)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing 
authors)
+     (C) 2001, 2002, 2004, 2005, 2006 Christian Grothoff (and other 
contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -27,15 +27,11 @@
 #include "platform.h"
 #include "gnunet_protocols.h"
 #include "gnunet_stats_lib.h"
+#include "gnunet_util_crypto.h"
 #include "gnunet_util_config_impl.h"
 #include "gnunet_util_network_client.h"
 #include "tbench.h"
 
-/**
- * Identity of peer 2.
- */
-static PeerIdentity peer2;
-
 static int test(struct ClientServerConnection * sock,
                unsigned int messageSize,
                unsigned int messageCnt,
@@ -43,11 +39,15 @@
                cron_t messageSpacing,
                unsigned int messageTrainSize,
                cron_t messageTimeOut /* in milli-seconds */) {
+  PeerIdentity peer2;
   int ret;
   CS_tbench_request_MESSAGE msg;
   CS_tbench_reply_MESSAGE * buffer;
   float messagesPercentLoss;
 
+  
enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77FOPDA8F1VIKESLSNBO",
+          &peer2.hashPubKey);
+
   printf(_("Using %u messages of size %u for %u times.\n"),
         messageCnt,
         messageSize,
@@ -64,7 +64,7 @@
   msg.receiverId  = peer2;
 
   if (SYSERR == connection_write(sock,
-                             &msg.header))
+                                &msg.header))
     return -1;
   ret = 0;
 
@@ -211,7 +211,7 @@
     printf(_("Running benchmark...\n"));
     /* 'slow' pass: wait for bandwidth negotiation! */
     if (ret == 0)
-      ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+      ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 5 * cronSECONDS);
     checkConnected(sock);
     /* 'blast' pass: hit bandwidth limits! */
     for (i=8;i<60000;i*=2) {

Modified: GNUnet/src/applications/tbench/tbenchtest_udp.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest_udp.c     2006-09-12 21:15:40 UTC 
(rev 3397)
+++ GNUnet/src/applications/tbench/tbenchtest_udp.c     2006-09-14 05:09:41 UTC 
(rev 3398)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing 
authors)
+     (C) 2001, 2002, 2004, 2005, 2006 Christian Grothoff (and other 
contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -27,15 +27,11 @@
 #include "platform.h"
 #include "gnunet_protocols.h"
 #include "gnunet_stats_lib.h"
+#include "gnunet_util_crypto.h"
 #include "gnunet_util_config_impl.h"
 #include "gnunet_util_network_client.h"
 #include "tbench.h"
 
-/**
- * Identity of peer 2.
- */
-static PeerIdentity peer2;
-
 static int test(struct ClientServerConnection * sock,
                unsigned int messageSize,
                unsigned int messageCnt,
@@ -43,11 +39,14 @@
                cron_t messageSpacing,
                unsigned int messageTrainSize,
                cron_t messageTimeOut /* in milli-seconds */) {
+  PeerIdentity peer2; 
   int ret;
   CS_tbench_request_MESSAGE msg;
   CS_tbench_reply_MESSAGE * buffer;
   float messagesPercentLoss;
 
+  
enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77FOPDA8F1VIKESLSNBO",
+          &peer2.hashPubKey);
   printf(_("Using %u messages of size %u for %u times.\n"),
         messageCnt,
         messageSize,
@@ -165,8 +164,8 @@
 #endif
   /* in case existing hellos have expired */
   PTHREAD_SLEEP(30 * cronSECONDS);
-  system("cp peer1/data/hosts/* peer2/data/hosts/");
-  system("cp peer2/data/hosts/* peer1/data/hosts/");
+  system("cp peer1-udp/data/hosts/* peer2-udp/data/hosts/");
+  system("cp peer2-udp/data/hosts/* peer1-udp/data/hosts/");
   ret = 0;
 #if START_PEERS
   if (daemon1 != -1) {
@@ -211,7 +210,7 @@
     printf(_("Running benchmark...\n"));
     /* 'slow' pass: wait for bandwidth negotiation! */
     if (ret == 0)
-      ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+      ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 5 * cronSECONDS);
     checkConnected(sock);
     /* 'blast' pass: hit bandwidth limits! */
     for (i=8;i<60000;i*=2) {

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/server/connection.c      2006-09-14 05:09:41 UTC (rev 3398)
@@ -128,7 +128,7 @@
 /**
  * How often do we expect to re-run the traffic allocation
  * code? (depends on MINIMUM_SAMPLE_COUNT and MIN_BPM_PER_PEER
- * and MTU size).
+ * and MTU size). [2 * 32 M / 50 = 5M ]
  */
 #define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU / 
MIN_BPM_PER_PEER)
 
@@ -532,9 +532,14 @@
   }
   *dst = 0;
 
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "%s: Sender `%s', key `%s', IV %u msg CRC %u\n",
-      prefix, &enc, skey, *((int *) iv), crc);
+  GE_LOG(ectx,
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "%s: Sender `%s', key `%s', IV %u msg CRC %u\n",
+        prefix, 
+        &enc, 
+        skey,
+        *((int *) iv),
+        crc);
 }
 #endif
 
@@ -854,14 +859,16 @@
       / (be->max_bpm * cronMINUTES / cronMILLIS)  /* bytes per ms */
       /2;                       /* some head-room */
   }
-  /* Also: allow at least MINIMUM_SAMPLE_COUNT knapsack
+  /* Also: allow at least 2 * MINIMUM_SAMPLE_COUNT knapsack
      solutions for any MIN_SAMPLE_TIME! */
-  if(be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
-    be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
+  if (be->MAX_SEND_FREQUENCY > 2 * MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
+    be->MAX_SEND_FREQUENCY = 2 * MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
 
-  if(be->lastSendAttempt + be->MAX_SEND_FREQUENCY > get_time()) {
-#if DEBUG_CONNECTION
-    GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Send frequency too high 
(CPU load), send deferred.\n");
+  if (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > get_time()) {
+#if DEBUG_CONNECTION || 1
+    GE_LOG(ectx,
+          GE_DEBUG | GE_REQUEST | GE_USER, 
+          "Send frequency too high (CPU load), send deferred.\n");
 #endif
     return NO;                  /* frequency too high, wait */
   }
@@ -882,6 +889,7 @@
   int i;
   int j;
   int approxProb;
+  cron_t deadline;
 
   totalMessageSize = 0;
   (*priority) = 0;
@@ -891,45 +899,56 @@
 
   if (be->session.mtu == 0) {
     totalMessageSize = sizeof(P2P_PACKET_HEADER);
+    deadline = (cron_t) -1L; /* infinity */
+
     i = 0;
     /* assumes entries are sorted by priority! */
-    while(i < be->sendBufferSize) {
+    while (i < be->sendBufferSize) {
       entry = be->sendBuffer[i];
-      if((totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
-         (entry->pri >= EXTREME_PRIORITY)) {
+      if ( (totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
+          (entry->pri >= EXTREME_PRIORITY)) {
         entry->knapsackSolution = YES;
+       if (entry->transmissionTime < deadline)
+         deadline = entry->transmissionTime;
         (*priority) += entry->pri;
 #if DEBUG_CONNECTION
-        GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Selecting msg %u with 
length %u\n", i, entry->len);
+        GE_LOG(ectx, 
+              GE_DEBUG | GE_REQUEST | GE_USER, 
+              "Selecting msg %u with length %u\n", 
+              i, 
+              entry->len);
 #endif
         totalMessageSize += entry->len;
-      }
-      else {
+      } else {
         entry->knapsackSolution = NO;
         break;
       }
       i++;
     }
-    if((i == 0) && (be->sendBuffer[i]->len > be->available_send_window))
+    if ( (i == 0) &&
+        (be->sendBuffer[i]->len > be->available_send_window)) {
       return 0;                 /* always wait for the highest-priority
                                    message (otherwise large messages may
                                    starve! */
-    while((i < be->sendBufferSize) &&
-          (be->available_send_window > totalMessageSize)) {
+    }
+    while ( (i < be->sendBufferSize) &&
+           (be->available_send_window > totalMessageSize)) {
       entry = be->sendBuffer[i];
-      if((entry->len + totalMessageSize <=
-          be->available_send_window) &&
-         (totalMessageSize + entry->len < MAX_BUFFER_SIZE)) {
+      if ( (entry->len + totalMessageSize <= be->available_send_window) &&
+          (totalMessageSize + entry->len < MAX_BUFFER_SIZE)) {
         entry->knapsackSolution = YES;
+       if (entry->transmissionTime < deadline)
+         deadline = entry->transmissionTime;   
 #if DEBUG_CONNECTION
-        GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Selecting msg %u with 
length %u\n", i, entry->len);
+        GE_LOG(ectx, 
+              GE_DEBUG | GE_REQUEST | GE_USER,
+              "Selecting msg %u with length %u\n", i, entry->len);
 #endif
         totalMessageSize += entry->len;
         (*priority) += entry->pri;
-      }
-      else {
+      } else {
         entry->knapsackSolution = NO;
-        if(totalMessageSize == sizeof(P2P_PACKET_HEADER)) {
+        if (totalMessageSize == sizeof(P2P_PACKET_HEADER)) {
           /* if the highest-priority message does not yet
              fit, wait for send window to grow so that
              we can get it out (otherwise we would starve
@@ -939,10 +958,11 @@
       }
       i++;
     }
-    if((totalMessageSize == sizeof(P2P_PACKET_HEADER)) ||
-       (((*priority) < EXTREME_PRIORITY) &&
-        ((totalMessageSize / sizeof(P2P_PACKET_HEADER)) < 4) &&
-        (weak_randomi(16) != 0))) {
+    if ( (totalMessageSize == sizeof(P2P_PACKET_HEADER)) ||
+        ( ((*priority) < EXTREME_PRIORITY) &&
+          ((totalMessageSize / sizeof(P2P_PACKET_HEADER)) < 4) &&
+          (deadline > get_time() + 500 * cronMILLIS) &&
+          (weak_randomi(16) != 0) ) ) {
       /* randomization necessary to ensure we eventually send
          a small message if there is nothing else to do! */
       return 0;
@@ -963,7 +983,10 @@
                                           be->session.mtu -
                                           sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-        FPRINTF(prioFile, "%llu 0 %d\n", get_time(), priority);
+        FPRINTF(prioFile,
+               "%llu 0 %d\n", 
+               get_time(),
+               priority);
 #endif
       }
       else {
@@ -971,7 +994,10 @@
                                     be->session.mtu -
                                     sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-        FPRINTF(prioFile, "%llu 1 %d\n", get_time(), priority);
+        FPRINTF(prioFile,
+               "%llu 1 %d\n", 
+               get_time(), 
+               priority);
 #endif
       }
     }
@@ -980,26 +1006,32 @@
                                   be->session.mtu -
                                   sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-      FPRINTF(prioFile, "%llu 2 %d\n", get_time(), priority);
+      FPRINTF(prioFile,
+             "%llu 2 %d\n", 
+             get_time(),
+             priority);
 #endif
     }
     j = 0;
     for(i = 0; i < be->sendBufferSize; i++)
       if(be->sendBuffer[i]->knapsackSolution == YES)
         j++;
-    if(j == 0) {
-      GE_LOG(ectx, GE_ERROR | GE_BULK | GE_DEVELOPER,
-          _("`%s' selected %d out of %d messages (MTU: %d).\n"),
-          __FUNCTION__,
-          j, be->sendBufferSize,
-         be->session.mtu - sizeof(P2P_PACKET_HEADER));
-
+    if (j == 0) {
+      GE_LOG(ectx,
+            GE_ERROR | GE_BULK | GE_DEVELOPER,
+            _("`%s' selected %d out of %d messages (MTU: %d).\n"),
+            __FUNCTION__,
+            j,
+            be->sendBufferSize,
+            be->session.mtu - sizeof(P2P_PACKET_HEADER));
+      
       for(j = 0; j < be->sendBufferSize; j++)
-        GE_LOG(ectx, GE_ERROR | GE_BULK | GE_DEVELOPER,
-            _("Message details: %u: length %d, priority: %d\n"),
-            j, 
-           be->sendBuffer[j]->len,
-           be->sendBuffer[j]->pri);
+        GE_LOG(ectx, 
+              GE_ERROR | GE_BULK | GE_DEVELOPER,
+              _("Message details: %u: length %d, priority: %d\n"),
+              j, 
+              be->sendBuffer[j]->len,
+              be->sendBuffer[j]->pri);
       return 0;
     }
 
@@ -1007,11 +1039,12 @@
       /* if we have a very high priority, we may
          want to ignore bandwidth availability (e.g. for HANGUP,
          which  has EXTREME_PRIORITY) */
-      if((*priority) < EXTREME_PRIORITY) {
+      if ((*priority) < EXTREME_PRIORITY) {
 #if DEBUG_CONNECTION
-        GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-            "bandwidth limits prevent sending (send window %u too small).\n",
-            be->available_send_window);
+        GE_LOG(ectx,
+              GE_DEBUG | GE_REQUEST | GE_USER,
+              "bandwidth limits prevent sending (send window %u too small).\n",
+              be->available_send_window);
 #endif
         return 0;               /* can not send, BPS available is too small */
       }
@@ -1039,7 +1072,9 @@
   be->lastSendAttempt = get_time();
   expired = be->lastSendAttempt - SECONDS_PINGATTEMPT * cronSECONDS;
 #if DEBUG_CONNECTION
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "policy prevents sending 
message\n");
+  GE_LOG(ectx,
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "policy prevents sending message\n");
 #endif
 
   load = os_cpu_get_load(ectx, cfg);
@@ -1067,10 +1102,11 @@
       continue;
     if(entry->transmissionTime <= expired) {
 #if DEBUG_CONNECTION
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-          "expiring message, expired %ds ago, queue size is %llu (bandwidth 
stressed)\n",
-          (int) ((get_time() - entry->transmissionTime) / cronSECONDS),
-          usedBytes);
+      GE_LOG(ectx,
+            GE_DEBUG | GE_REQUEST | GE_USER,
+            "expiring message, expired %ds ago, queue size is %llu (bandwidth 
stressed)\n",
+            (int) ((get_time() - entry->transmissionTime) / cronSECONDS),
+            usedBytes);
 #endif
       if (stats != NULL) {
         stats->change(stat_messagesDropped, 1);
@@ -1308,30 +1344,48 @@
     GE_BREAK(ectx, 0);
     return;
   }
-  if((be->status != STAT_UP) ||
-     (be->sendBufferSize == 0) || (be->inSendBuffer == YES)) {
+  if ( (be->status != STAT_UP) ||
+       (be->sendBufferSize == 0) || 
+       (be->inSendBuffer == YES) ) {
     return;                     /* must not run */
   }
   be->inSendBuffer = YES;
 
-  if((OK != ensureTransportConnected(be)) ||
-     (be->sendBufferSize == 0) || (OK != checkSendFrequency(be))) {
+  if ( (OK != ensureTransportConnected(be)) ||
+       (be->sendBufferSize == 0) || 
+       (OK != checkSendFrequency(be)) ){ 
     be->inSendBuffer = NO;
+#if 0
+    GE_LOG(ectx,
+          GE_DEBUG | GE_DEVELOPER | GE_BULK,
+          "Will not try to send: %d %d %d\n",
+          (OK != ensureTransportConnected(be)),
+          (be->sendBufferSize == 0),
+          (OK != checkSendFrequency(be)));
+#endif
     return;
   }
 
   /* test if receiver has enough bandwidth available!  */
   updateCurBPS(be);
 #if DEBUG_CONNECTION
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "receiver window available: %lld bytes (MTU: %u)\n",
-      be->available_send_window, be->session.mtu);
+  GE_LOG(ectx,
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "receiver window available: %lld bytes (MTU: %u)\n",
+        be->available_send_window,
+        be->session.mtu);
 #endif
 
   totalMessageSize = selectMessagesToSend(be, &priority);
   if (totalMessageSize == 0) {
     expireSendBufferEntries(be);
     be->inSendBuffer = NO;
+#if DEBUG_CONNECTION
+    GE_LOG(ectx,
+          GE_DEBUG | GE_DEVELOPER | GE_BULK,
+          "No messages selected for sending (%d)\n",
+          be->available_send_window);
+#endif
     return;                     /* deferr further */
   }
   GE_ASSERT(ectx, totalMessageSize > sizeof(P2P_PACKET_HEADER));
@@ -1339,8 +1393,11 @@
   /* check if we (sender) have enough bandwidth available
      if so, trigger callbacks on selected entries; if either
      fails, return (but clean up garbage) */
-  if((SYSERR == outgoingCheck(priority)) ||
-     (0 == prepareSelectedMessages(be))) {
+  if ((SYSERR == outgoingCheck(priority)) ||
+      (0 == prepareSelectedMessages(be))) {
+    GE_LOG(ectx,
+          GE_DEBUG | GE_DEVELOPER | GE_BULK,
+          "Insufficient bandwidth or priority to send message\n");
     expireSendBufferEntries(be);
     be->inSendBuffer = NO;
     return;                     /* deferr further */
@@ -1365,7 +1422,11 @@
       continue;
     if(entry->knapsackSolution == YES) {
 #if DEBUG_CONNECTION
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Queuing msg %u with 
length %u\n", perm[i], entry->len);
+      GE_LOG(ectx, 
+            GE_DEBUG | GE_REQUEST | GE_USER, 
+            "Queuing msg %u with length %u\n", 
+            perm[i],
+            entry->len);
 #endif
       GE_ASSERT(ectx, entry->callback == NULL);
       GE_ASSERT(ectx, p + entry->len <= totalMessageSize);
@@ -1414,14 +1475,22 @@
                     (const INITVECTOR *) encryptedMsg,  /* IV */
                      &((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber);
 #if DEBUG_CONNECTION
-  printMsg("Encrypting P2P data", &be->session.sender,
-           &be->skey_local, (const INITVECTOR *) encryptedMsg,
+  printMsg("Encrypting P2P data",
+          &be->session.sender,
+           &be->skey_local, 
+          (const INITVECTOR *) encryptedMsg,
            crc32N(&((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber,
                   ret));
 #endif
   if(stats != NULL)
     stats->change(stat_encrypted, p - sizeof(HashCode512));
   GE_ASSERT(ectx, be->session.tsession != NULL);
+#if DEBUG_CONNECTION
+  GE_LOG(ectx,
+        GE_DEBUG | GE_DEVELOPER | GE_BULK,
+        "Asking transport to send message with priority %u\n",
+        priority);
+#endif
   ret = transport->send(be->session.tsession, 
                        encryptedMsg,
                        p,
@@ -1512,19 +1581,24 @@
   }
 
 #if DEBUG_CONNECTION
-  IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, 
hash2enc(&be->session.sender.hashPubKey, &enc));
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-      "adding message of size %d to buffer of host `%s'\n",
-      se->len,
-      &enc);
+  IF_GELOG(ectx,
+          GE_DEBUG | GE_REQUEST | GE_USER, 
+          hash2enc(&be->session.sender.hashPubKey, 
+                   &enc));
+  GE_LOG(ectx,
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "adding message of size %d to buffer of host `%s'\n",
+        se->len,
+        &enc);
 #endif
   if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
     /* as long as we do not have a confirmed
        connection, do NOT queue messages! */
 #if DEBUG_CONNECTION
-    GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, 
-       "not connected to `%s', message dropped\n",
-       &enc);
+    GE_LOG(ectx,
+          GE_DEBUG | GE_REQUEST | GE_USER, 
+          "not connected to `%s', message dropped\n",
+          &enc);
 #endif
     FREE(se->closure);
     FREE(se);
@@ -1534,23 +1608,24 @@
   for(i = 0; i < be->sendBufferSize; i++)
     queueSize += be->sendBuffer[i]->len;
 
-  if(queueSize >= MAX_SEND_BUFFER_SIZE) {
+  if (queueSize >= MAX_SEND_BUFFER_SIZE) {
     /* first, try to remedy! */
     sendBuffer(be);
     /* did it work? */
 
     queueSize = 0;
-    for(i = 0; i < be->sendBufferSize; i++)
+    for (i = 0; i < be->sendBufferSize; i++)
       queueSize += be->sendBuffer[i]->len;
 
-    if(queueSize >= MAX_SEND_BUFFER_SIZE) {
+    if (queueSize >= MAX_SEND_BUFFER_SIZE) {
       /* we need to enforce some hard limit here, otherwise we may take
          FAR too much memory (200 MB easily) */
 #if DEBUG_CONNECTION
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-          "queueSize (%llu) >= %d, refusing to queue message.\n",
-          queueSize, 
-         MAX_SEND_BUFFER_SIZE);
+      GE_LOG(ectx, 
+            GE_DEBUG | GE_REQUEST | GE_USER,
+            "queueSize (%llu) >= %d, refusing to queue message.\n",
+            queueSize, 
+            MAX_SEND_BUFFER_SIZE);
 #endif
       FREE(se->closure);
       FREE(se);
@@ -1847,12 +1922,13 @@
   int firstRound;
   int earlyRun;
   int load;
+  int * perm;
 
   MUTEX_LOCK(lock);
   now = get_time();
 
   /* if this is the first round, don't bother... */
-  if(lastRoundStart == 0) {
+  if (lastRoundStart == 0) {
     /* no allocation the first time this function is called! */
     lastRoundStart = now;
     forAllConnectedHosts(&resetRecentlyReceived, NULL);
@@ -1861,7 +1937,7 @@
   }
 
   activePeerCount = forAllConnectedHosts(NULL, NULL);
-  if(activePeerCount == 0) {
+  if (activePeerCount == 0) {
     MUTEX_UNLOCK(lock);
     return;                     /* nothing to be done here. */
   }
@@ -1874,15 +1950,15 @@
      to the limits anyway) */
   timeDifference = now - lastRoundStart;
   earlyRun = 0;
-  if(timeDifference < MIN_SAMPLE_TIME) {
+  if (timeDifference < MIN_SAMPLE_TIME) {
     earlyRun = 1;
-    if(activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
+    if (activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
       MUTEX_UNLOCK(lock);
       return;                   /* don't update too frequently, we need at 
least some
                                    semi-representative sampling! */
     }
   }
-  if(timeDifference == 0)
+  if (timeDifference == 0)
     timeDifference = 1;
 
   /* build an array containing all BEs */
@@ -1904,28 +1980,28 @@
   }
 
   /* normalize distribution */
-  if(shareSum >= 0.00001) {     /* avoid numeric glitches... */
+  if (shareSum >= 0.00001) {     /* avoid numeric glitches... */
     for(u = 0; u < activePeerCount; u++)
       shares[u] = shares[u] / shareSum;
-  }
-  else {
+  } else {
+    /* proportional shareing */
     for(u = 0; u < activePeerCount; u++)
       shares[u] = 1 / activePeerCount;
   }
 
   /* compute how much bandwidth we can bargain with */
   minCon = minConnect();
-  if(minCon > activePeerCount)
+  if (minCon > activePeerCount)
     minCon = activePeerCount;
   schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
   load = os_network_monitor_get_load(load_monitor,
                                     Download);
-  if(load > 100) {
+  if (load > 100) {
     /* take counter measures! */
     schedulableBandwidth = schedulableBandwidth * 100 / load;
     /* make sure we do not take it down too far */
-    if((schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
-       (max_bpm > minCon * MIN_BPM_PER_PEER * 2))
+    if ( (schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
+        (max_bpm > minCon * MIN_BPM_PER_PEER * 2) )
       schedulableBandwidth = minCon * MIN_BPM_PER_PEER / 2;
   }
 
@@ -1943,34 +2019,42 @@
 #if DEBUG_CONNECTION
     if(adjustedRR[u] > entries[u]->idealized_limit) {
       EncName enc;
-      IF_GELOG(ectx, GE_INFO | GE_BULK | GE_USER, 
hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
-      GE_LOG(ectx, GE_INFO | GE_BULK | GE_USER,
-          "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
-          &enc, adjustedRR[u], entries[u]->idealized_limit);
+      IF_GELOG(ectx,
+              GE_INFO | GE_BULK | GE_USER, 
+              hash2enc(&entries[u]->session.sender.hashPubKey, 
+                       &enc));
+      GE_LOG(ectx, 
+            GE_INFO | GE_BULK | GE_USER,
+            "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
+            &enc, 
+            adjustedRR[u], 
+            entries[u]->idealized_limit);
     }
 #endif
     /* Check for peers grossly exceeding send limits. Be a bit
      * reasonable and make the check against the max value we have
      * sent to this peer (assume announcements may have got lost).
      */
-    if((earlyRun == 0) &&
-       (adjustedRR[u] > 2 * MAX_BUF_FACT *
-        entries[u]->max_transmitted_limit) &&
-       (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
+    if ( (earlyRun == 0) &&
+        (adjustedRR[u] > 2 * MAX_BUF_FACT *
+         entries[u]->max_transmitted_limit) &&
+        (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
       EncName enc;
 
       entries[u]->violations++;
       entries[u]->recently_received = 0;  /* "clear" slate */
       if (entries[u]->violations > 10) {
-        IF_GELOG(ectx, GE_INFO | GE_BULK | GE_USER,
-              hash2enc(&entries[u]->session.sender.hashPubKey,
-                      &enc));
-        GE_LOG(ectx, GE_INFO | GE_BULK | GE_USER,
-            "blacklisting `%s': sent repeatedly %llu bpm "
-            "(limit %u bpm, target %u bpm)\n",
-            &enc,
-            adjustedRR[u],
-            entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
+        IF_GELOG(ectx, 
+                GE_INFO | GE_BULK | GE_USER,
+                hash2enc(&entries[u]->session.sender.hashPubKey,
+                         &enc));
+        GE_LOG(ectx, 
+              GE_INFO | GE_BULK | GE_USER,
+              "blacklisting `%s': sent repeatedly %llu bpm "
+              "(limit %u bpm, target %u bpm)\n",
+              &enc,
+              adjustedRR[u],
+              entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
         identity->blacklistHost(&entries[u]->session.sender,
                                 1 / topology->getSaturation(), YES);
         shutdownConnection(entries[u]);
@@ -1981,11 +2065,10 @@
         u--;
         continue;
       }
-    }
-    else {
-      if((earlyRun == 0) &&
-         (adjustedRR[u] < entries[u]->max_transmitted_limit / 2) &&
-         (entries[u]->violations > 0)) {
+    } else {
+      if ( (earlyRun == 0) &&
+          (adjustedRR[u] < entries[u]->max_transmitted_limit / 2) &&
+          (entries[u]->violations > 0)) {
         /* allow very low traffic volume to
            balance out (rare) times of high
            volume */
@@ -1993,7 +2076,7 @@
       }
     }
 
-    if(adjustedRR[u] < MIN_BPM_PER_PEER / 2)
+    if (adjustedRR[u] < MIN_BPM_PER_PEER / 2)
       adjustedRR[u] = MIN_BPM_PER_PEER / 2;
     /* even if we received NO traffic, allow
        at least MIN_BPM_PER_PEER */
@@ -2015,84 +2098,73 @@
   firstRound = YES;
   for (u = 0; u < activePeerCount; u++)
     entries[u]->idealized_limit = 0;
-  while((schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
-        (activePeerCount > 0) && (didAssign == YES)) {
+  while ( (schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
+         (activePeerCount > 0) && 
+         (didAssign == YES) ) {
     didAssign = NO;
     decrementSB = 0;
-    for(u = 0; u < activePeerCount; u++) {
+    for (u = 0; u < activePeerCount; u++) {
       /* always allow allocating MIN_BPM_PER_PEER */
-      if((firstRound == NO) ||
-         (entries[u]->idealized_limit < adjustedRR[u] * 2)) {
+      if ( (firstRound == NO) ||
+          (entries[u]->idealized_limit < adjustedRR[u] * 2) ) {
         unsigned int share;
 
-        share =
+       share =
           entries[u]->idealized_limit +
           (unsigned int) (shares[u] * schedulableBandwidth);
-        if(share < entries[u]->idealized_limit)
+        if (share < entries[u]->idealized_limit)
           share = 0xFFFFFFFF;   /* int overflow */
-        if((share > adjustedRR[u] * 2) && (firstRound == YES))
+        if ( (share > adjustedRR[u] * 2) && (firstRound == YES))
           share = adjustedRR[u] * 2;
-        if(share > entries[u]->idealized_limit) {
+        if ( (share < MIN_BPM_PER_PEER) &&
+            (minCon > 0) ) {
+          /* use one of the minCon's to keep the connection! */
+          share += MIN_BPM_PER_PEER;
+          decrementSB -= MIN_BPM_PER_PEER; /* do not count */
+          minCon--;
+        }
+        if (share > entries[u]->idealized_limit) {
           decrementSB += share - entries[u]->idealized_limit;
           didAssign = YES;
         }
-        if((share < MIN_BPM_PER_PEER) && (minCon > 0)) {
-          /* use one of the minCon's to keep the connection! */
-          decrementSB -= share;
-          share = MIN_BPM_PER_PEER;
-          minCon--;
-        }
+       if (share > 0)
+         didAssign = YES;
         entries[u]->idealized_limit = share;
       }
-    }
-    if(decrementSB > schedulableBandwidth) {
+    } /* end for all peers */
+    if (decrementSB < schedulableBandwidth) {
       schedulableBandwidth -= decrementSB;
-    }
-    else {
+    } else {
       schedulableBandwidth = 0;
       break;
     }
-    if((activePeerCount > 0) && (didAssign == NO)) {
-      int *perm = permute(WEAK, activePeerCount);
+    if ( (activePeerCount > 0) && 
+        (didAssign == NO) ) {
+      perm = permute(WEAK, activePeerCount);
       /* assign also to random "worthless" (zero-share) peers */
-      for(u = 0; u < activePeerCount; u++) {
+      for (u = 0; u < activePeerCount; u++) {
         unsigned int v = perm[u]; /* use perm to avoid preference to 
low-numbered slots */
-        if((firstRound == NO) ||
-           (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
+        if ( (firstRound == NO) ||
+            (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
           unsigned int share;
 
           share =
             entries[v]->idealized_limit +
             (unsigned int) (schedulableBandwidth);
-          if(share < entries[u]->idealized_limit)
+          if (share < entries[u]->idealized_limit)
             share = 0xFFFFFFFF; /* int overflow */
-          if((firstRound == YES) && (share > adjustedRR[u] * 2))
+          if ((firstRound == YES) && (share > adjustedRR[u] * 2))
             share = adjustedRR[u] * 2;
-          schedulableBandwidth -= share - entries[v]->idealized_limit;
-          entries[v]->idealized_limit = share;
+
+          schedulableBandwidth -= share - entries[v]->idealized_limit; 
+         entries[v]->idealized_limit = share;
         }
       }
       FREE(perm);
       perm = NULL;
 
-      if((schedulableBandwidth > 0) && (activePeerCount > 0)) {
-        /* assign rest disregarding traffic limits */
-        perm = permute(WEAK, activePeerCount);
-        for(u = 0; u < activePeerCount; u++) {
-          unsigned int share;
-
-          share =
-            entries[perm[u]]->idealized_limit +
-            (unsigned int) (schedulableBandwidth / activePeerCount);
-          if(share > entries[perm[u]]->idealized_limit) /* no int-overflow? */
-            entries[perm[u]]->idealized_limit = share;
-        }
-        schedulableBandwidth = 0;
-        FREE(perm);
-        perm = NULL;
-      }
-    }                           /* didAssign == NO? */
-    if(firstRound == YES) {
+    }  /* didAssign == NO? */
+    if (firstRound == YES) {
       /* keep some bandwidth off the market
          for new connections */
       schedulableBandwidth /= 2;
@@ -2100,11 +2172,31 @@
     firstRound = NO;
   }                             /* while bandwidth to distribute */
 
+  if ( (schedulableBandwidth > 0) && 
+       (activePeerCount > 0) ) {
+    /* assign rest disregarding traffic limits */
+    perm = permute(WEAK, activePeerCount);
+    for(u = 0; u < activePeerCount; u++) {
+      unsigned int share;
+      
+      share =
+       entries[perm[u]]->idealized_limit +
+       (unsigned int) (schedulableBandwidth / activePeerCount);
+      if (share > entries[perm[u]]->idealized_limit) { /* no int-overflow? */
+       entries[perm[u]]->idealized_limit = share;
+      } else {
+       entries[perm[u]]->idealized_limit = 0xFFFF0000;     
+      }
+    }
+    schedulableBandwidth = 0;
+    FREE(perm);
+    perm = NULL;
+  }
 
   /* randomly add the remaining MIN_BPM_PER_PEER to minCon peers; yes, this 
will
      yield some fluctuation, but some amount of fluctuation should be
      good since it creates opportunities. */
-  if(activePeerCount > 0)
+  if (activePeerCount > 0)
     for(u = 0; u < minCon; u++)
       entries[weak_randomi(activePeerCount)]->idealized_limit
         += MIN_BPM_PER_PEER;
@@ -2115,13 +2207,30 @@
 #if DEBUG_CONNECTION
     EncName enc;
 
-    IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, 
hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
-    GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-        "inbound limit for peer %u: %s set to %u bpm\n",
-        u, &enc, entries[u]->idealized_limit);
+    IF_GELOG(ectx,
+            GE_DEBUG | GE_BULK | GE_USER, 
+            hash2enc(&entries[u]->session.sender.hashPubKey, 
+                     &enc));
+    GE_LOG(ectx, 
+          GE_DEBUG | GE_BULK | GE_USER,
+          "inbound limit for peer %u: %s set to %u bpm\n",
+          u, 
+          &enc, 
+          entries[u]->idealized_limit);
 #endif
-    entries[u]->current_connection_value /= 2.0;
-    entries[u]->recently_received /= 2;
+    if ( (timeDifference > 50) &&
+        (weak_randomi(timeDifference + 1) > 50) )
+      entries[u]->current_connection_value /= 2.0;
+    decrementSB = entries[u]->idealized_limit * timeDifference / cronMINUTES / 
2;
+    if ( (decrementSB == 0) &&
+        (weak_randomi(timeDifference + 1) != 0) )
+      decrementSB = 1;
+    if (entries[u]->recently_received >= decrementSB) {
+      entries[u]->recently_received
+       -= decrementSB;
+    } else {
+      entries[u]->recently_received = 0;
+    }
   }
 
   /* free memory */
@@ -2129,19 +2238,22 @@
   FREE(shares);
   FREE(entries);
   for (u = 0; u < CONNECTION_MAX_HOSTS_; u++) {
-    BufferEntry *be = CONNECTION_buffer_[u];
-    if(be == NULL)
+    BufferEntry * be = CONNECTION_buffer_[u];
+    if (be == NULL)
       continue;
     if (be->idealized_limit < MIN_BPM_PER_PEER) {
-#if DEBUG_CONNECTION || 1
+#if DEBUG_CONNECTION
       EncName enc;
 
-      IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-           hash2enc(&be->session.sender.hashPubKey, 
-                    &enc));
-      GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-          "Number of connections too high, shutting down low-traffic 
connection to `%s' (had only %u bpm)\n",
-          &enc, be->idealized_limit);
+      IF_GELOG(ectx,
+              GE_DEBUG | GE_REQUEST | GE_USER,
+              hash2enc(&be->session.sender.hashPubKey, 
+                       &enc));
+      GE_LOG(ectx,
+            GE_DEBUG | GE_REQUEST | GE_USER,
+            "Number of connections too high, shutting down low-traffic 
connection to `%s' (had only %u bpm)\n",
+            &enc,
+            be->idealized_limit);
 #endif
       /* We need to avoid giving a too low limit (especially 0, which
         would indicate a plaintex msg).  So we set the limit to the
@@ -2374,7 +2486,10 @@
 
   be->max_bpm = ntohl(msg->bandwidth);
 #if DEBUG_CONNECTION
-  GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Received bandwidth cap of %u 
bpm\n", be->max_bpm);
+  GE_LOG(ectx, 
+        GE_DEBUG | GE_REQUEST | GE_USER,
+        "Received bandwidth cap of %u bpm\n", 
+        be->max_bpm);
 #endif
   if(be->available_send_window >= be->max_bpm) {
     be->available_send_window = be->max_bpm;
@@ -2472,9 +2587,10 @@
   if(be != NULL) {
     be->isAlive = get_time();
     identity->whitelistHost(peer);
-    if(((be->status & STAT_SETKEY_SENT) > 0) &&
-       ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
-       (OK == ensureTransportConnected(be)) && (be->status != STAT_UP)) {
+    if( ((be->status & STAT_SETKEY_SENT) > 0) &&
+       ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
+       (OK == ensureTransportConnected(be)) && 
+       (be->status != STAT_UP) ) {
       be->status = STAT_UP;
       be->lastSequenceNumberReceived = 0;
       be->lastSequenceNumberSend = 1;
@@ -3082,7 +3198,8 @@
   ENTRY();
   MUTEX_LOCK(lock);
   be = addHost(hostId, YES);
-  if((be != NULL) && (be->status != STAT_DOWN)) {
+  if ((be != NULL) &&
+      (be->status != STAT_DOWN)) {
     SendEntry *entry;
 
     entry = MALLOC(sizeof(SendEntry));
@@ -3094,8 +3211,7 @@
     entry->closure = closure;
     entry->knapsackSolution = NO;
     appendToBuffer(be, entry);
-  }
-  else {
+  } else {
     FREENONNULL(closure);
   }
   MUTEX_UNLOCK(lock);
@@ -3116,17 +3232,21 @@
   char *closure;
   unsigned short len;
 
-  if(msg == NULL) {
+  if (msg == NULL) {
     /* little hack for topology,
        which cannot do this directly
        due to cyclic dependencies! */
-    if(getBandwidthAssignedTo(receiver) == 0)
+    if (getBandwidthAssignedTo(receiver) == 0)
       session->tryConnect(receiver);
     return;
   }
   len = ntohs(msg->size);
-  if(len == 0)
+  if (len == 0) {
+    GE_LOG(ectx,
+          GE_DEBUG | GE_BULK | GE_DEVELOPER,
+          "Empty message send (hopefully used to initiate connection 
attempt)\n");
     return;
+  }
   closure = MALLOC(len);
   memcpy(closure, msg, len);
   unicastCallback(receiver,
@@ -3199,13 +3319,14 @@
  * @param node the identity of the other peer
  * @param preference how much should the traffic preference be increased?
  */
-void updateTrafficPreference(const PeerIdentity * node, double preference) {
+void updateTrafficPreference(const PeerIdentity * node, 
+                            double preference) {
   BufferEntry *be;
 
   ENTRY();
   MUTEX_LOCK(lock);
   be = lookForHost(node);
-  if(be != NULL)
+  if (be != NULL)
     be->current_connection_value += preference;
   MUTEX_UNLOCK(lock);
 }
@@ -3224,9 +3345,14 @@
   MUTEX_LOCK(lock);
   be = lookForHost(node);
   if(be != NULL) {
-    IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, 
hash2enc(&node->hashPubKey, &enc));
-    GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
-        "Closing connection to `%s' as requested by application.\n", &enc);
+    IF_GELOG(ectx,
+            GE_DEBUG | GE_REQUEST | GE_USER,
+            hash2enc(&node->hashPubKey,
+                     &enc));
+    GE_LOG(ectx, 
+          GE_DEBUG | GE_REQUEST | GE_USER,
+          "Closing connection to `%s' as requested by application.\n",
+          &enc);
     shutdownConnection(be);
   }
   MUTEX_UNLOCK(lock);

Modified: GNUnet/src/server/gnunetd.c
===================================================================
--- GNUnet/src/server/gnunetd.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/server/gnunetd.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -199,7 +199,7 @@
            "Sorry, your C compiler did not properly align the C structs. 
Aborting.\n");
     return -1;
   }
-  ectx = GE_create_context_stderr(NO, 
+  ectx = GE_create_context_stderr(YES, 
                                  GE_DEBUG |
                                  GE_WARNING | GE_ERROR | GE_FATAL |
                                  GE_USER | GE_ADMIN | GE_DEVELOPER |

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/transports/tcp.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -31,7 +31,7 @@
 #include "platform.h"
 #include "ip.h"
 
-#define DEBUG_TCP YES
+#define DEBUG_TCP NO
 
 /**
  * after how much time of the core not being associated with a tcp

Modified: GNUnet/src/transports/tcp_helper.c
===================================================================
--- GNUnet/src/transports/tcp_helper.c  2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/transports/tcp_helper.c  2006-09-14 05:09:41 UTC (rev 3398)
@@ -310,6 +310,12 @@
   memcpy(&mp[1],
         msg,
         size);
+#if DEBUG_TCP
+  GE_LOG(ectx,
+        GE_DEBUG | GE_DEVELOPER | GE_BULK,
+        "Transport asks select to queue message of size %u\n",
+        size);
+#endif
   ok = select_write(selector,
                    tcpSession->sock,
                    mp,

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/todo 2006-09-14 05:09:41 UTC (rev 3398)
@@ -18,10 +18,9 @@
     + loggers: SMTP logger
     + use new loggers in for CS error reporting
   * make testcases compile & pass again:
-    + tbench -- compiles
-    + gap
-    + fs/namespace
-    + fs/fsui
+    + gap -- does not yet compile
+    + fs/namespace -- does not yet compile
+    + fs/fsui -- downloadtest does not yet compile
     + dht/tools, dht/module
   * transports:
     + SMTP/HTTP: do not yet compile (commented out from build)
@@ -34,7 +33,6 @@
       - state
       - advertising,
       - ecrs_core
-      - tbench
       - tracekit
       - gap
       - rpc





reply via email to

[Prev in Thread] Current Thread [Next in Thread]