gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3885 - GNUnet/src/server


From: grothoff
Subject: [GNUnet-SVN] r3885 - GNUnet/src/server
Date: Wed, 6 Dec 2006 23:03:56 -0800 (PST)

Author: grothoff
Date: 2006-12-06 23:03:53 -0800 (Wed, 06 Dec 2006)
New Revision: 3885

Modified:
   GNUnet/src/server/connection.c
Log:
improvements and bugfixes in bandwidth scheduling code

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2006-12-05 06:00:28 UTC (rev 3884)
+++ GNUnet/src/server/connection.c      2006-12-07 07:03:53 UTC (rev 3885)
@@ -107,18 +107,12 @@
 
 /**
  * How many ping/pong messages to we want to transmit
- * per SECONDS_INACTIVE_DROP interval? (must be >=4 to
+ * per SECONDS_INACTIVE_DROP interval? (must be >= 4 to
  * keep connection alive with reasonable probability).
  */
 #define TARGET_MSG_SID 8
 
 /**
- * Minimum number of sample messages (per peer) before we recompute
- * traffic assignments?
- */
-#define MINIMUM_SAMPLE_COUNT 8
-
-/**
  * What is the minimum number of bytes per minute that
  * we allocate PER peer? (5 minutes inactivity timeout,
  * 32768 MTU, 8 MSGs => 8 * 32768 / 5 = ~50000 bpm [ ~800 bps])
@@ -126,9 +120,15 @@
 #define MIN_BPM_PER_PEER (TARGET_MSG_SID * EXPECTED_MTU * 60 / SECONDS_INACTIVE_DROP)
 
 /**
+ * Minimum number of sample messages (per peer) before we recompute
+ * traffic assignments?
+ */
+#define MINIMUM_SAMPLE_COUNT 2
+
+/**
  * How often do we expect to re-run the traffic allocation
  * code? (depends on MINIMUM_SAMPLE_COUNT and MIN_BPM_PER_PEER
- * and MTU size). [2 * 32 M / 50 = 5M ]
+ * and MTU size). [2 * 32 M / 50 = 78s ]
  */
 #define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU / MIN_BPM_PER_PEER)
 
@@ -412,7 +412,8 @@
  * @param be the buffer entry
  * @param data context for callee
  */
-typedef void (*BufferEntryCallback) (BufferEntry * be, void *data);
+typedef void (*BufferEntryCallback) (BufferEntry * be, 
+                                    void *data);
 
 /* ***************** globals ********************** */
 
@@ -629,7 +630,8 @@
  * @return the overall priority that was achieved
  */
 static unsigned int
-approximateKnapsack(BufferEntry * be, unsigned int available) {
+approximateKnapsack(BufferEntry * be,
+                   unsigned int available) {
   unsigned int i;
   unsigned int count;
   SendEntry **entries;
@@ -676,7 +678,7 @@
   int *efflen;
   cron_t startTime;
   cron_t endTime;
-  SendEntry **entries;
+  SendEntry ** entries;
   unsigned int count;
 #define VARR(i,j) v[(i)+(j)*(count+1)]
 
@@ -1879,12 +1881,16 @@
     FREENONNULL(be->sendBuffer[i]->closure);
     FREE(be->sendBuffer[i]);
   }
-  GROW(be->sendBuffer, be->sendBufferSize, 0);
+  GROW(be->sendBuffer,
+       be->sendBufferSize,
+       0);
 }
 
 /* ******** inbound bandwidth scheduling ************* */
 
-static void gatherEntries(BufferEntry * be, UTL_Closure * utl) {
+static void gatherEntries(BufferEntry * be,
+                         void * cls) {
+  UTL_Closure * utl = cls;
   utl->e[utl->pos++] = be;
 }
 
@@ -1941,6 +1947,7 @@
   int earlyRun;
   int load;
   int * perm;
+  EncName enc;
 
   MUTEX_LOCK(lock);
   now = get_time();
@@ -1953,14 +1960,12 @@
     MUTEX_UNLOCK(lock);
     return;
   }
-
   activePeerCount = forAllConnectedHosts(NULL, NULL);
   if (activePeerCount == 0) {
     MUTEX_UNLOCK(lock);
     return;                     /* nothing to be done here. */
   }
 
-
   /* if time difference is too small, we don't have enough
      sample data and should NOT update the limits;
      however, if we have FAR to few peers, reschedule
@@ -1980,19 +1985,18 @@
     timeDifference = 1;
 
   /* build an array containing all BEs */
-
   entries = MALLOC(sizeof(BufferEntry *) * activePeerCount);
   utl.pos = 0;
   utl.e = entries;
-  forAllConnectedHosts((BufferEntryCallback) & gatherEntries, &utl);
+  forAllConnectedHosts(&gatherEntries, 
+                      &utl);
 
-
-  /* compute shares */
+  /* compute latest shares based on traffic preferences */
   shares = MALLOC(sizeof(double) * activePeerCount);
   shareSum = 0.0;
-  for(u = 0; u < activePeerCount; u++) {
+  for (u = 0; u < activePeerCount; u++) {
     shares[u] = SHARE_DISTRIBUTION_FUNCTION(entries[u]);
-    if(shares[u] < 0.0)
+    if (shares[u] < 0.0)
       shares[u] = 0.0;
     shareSum += shares[u];
   }
@@ -2011,32 +2015,27 @@
   minCon = minConnect();
   if (minCon > activePeerCount)
     minCon = activePeerCount;
-  schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
+  if (max_bpm > minCon * MIN_BPM_PER_PEER) {
+    schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
+  } else {
+    schedulableBandwidth = 0;
+    minCon = max_bpm / MIN_BPM_PER_PEER;
+  }
   load = os_network_monitor_get_load(load_monitor,
                                     Download);
-  if (load > 100) {
-    /* take counter measures! */
-    schedulableBandwidth = schedulableBandwidth * 100 / load;
-    /* make sure we do not take it down too far */
-    if ( (schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
-        (max_bpm > minCon * MIN_BPM_PER_PEER * 2) )
-      schedulableBandwidth = minCon * MIN_BPM_PER_PEER / 2;
-  }
+  if (load > 100) /* take counter measure */
+    schedulableBandwidth = schedulableBandwidth * 100 / load;  
 
+  /* compute recent activity profile of the peer */
   adjustedRR = MALLOC(sizeof(long long) * activePeerCount);
-
-  /* reset idealized limits; if we want a smoothed-limits
-     algorithm, we'd need to compute the new limits separately
-     and then merge the values; but for now, let's just go
-     hardcore and adjust all values rapidly */
-  GE_ASSERT(ectx, timeDifference != 0);
-  for(u = 0; u < activePeerCount; u++) {
-    adjustedRR[u] =
-      entries[u]->recently_received * cronMINUTES / timeDifference / 2;
-
+  GE_ASSERT(ectx, 
+           timeDifference != 0);
+  for (u=0;u<activePeerCount;u++) {
+    adjustedRR[u] 
+      = entries[u]->recently_received * cronMINUTES / timeDifference / 2;
+ 
 #if DEBUG_CONNECTION
-    if(adjustedRR[u] > entries[u]->idealized_limit) {
-      EncName enc;
+    if (adjustedRR[u] > entries[u]->idealized_limit) {
       IF_GELOG(ectx,
               GE_INFO | GE_BULK | GE_USER,
               hash2enc(&entries[u]->session.sender.hashPubKey,
@@ -2057,8 +2056,6 @@
         (adjustedRR[u] > 2 * MAX_BUF_FACT *
          entries[u]->max_transmitted_limit) &&
         (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
-      EncName enc;
-
       entries[u]->violations++;
       entries[u]->recently_received = 0;  /* "clear" slate */
       if (entries[u]->violations > 10) {
@@ -2074,7 +2071,8 @@
               adjustedRR[u],
               entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
         identity->blacklistHost(&entries[u]->session.sender,
-                                1 / topology->getSaturation(), YES);
+                                1 / topology->getSaturation(), 
+                               YES);
         shutdownConnection(entries[u]);
         activePeerCount--;
         entries[u] = entries[activePeerCount];
@@ -2093,11 +2091,13 @@
         entries[u]->violations--;
       }
     }
-
-    if (adjustedRR[u] < MIN_BPM_PER_PEER / 2)
-      adjustedRR[u] = MIN_BPM_PER_PEER / 2;
     /* even if we received NO traffic, allow
        at least MIN_BPM_PER_PEER */
+    if (adjustedRR[u] < MIN_BPM_PER_PEER)
+      adjustedRR[u] = MIN_BPM_PER_PEER;
+    /* initial adjustedRR's should reflect aged value
+       from previous idealized_limit / iteration */
+    adjustedRR[u] = (entries[u]->idealized_limit * 3 + adjustedRR[u]) / 4;
   }
 
   /* now distribute the schedulableBandwidth according
@@ -2111,10 +2111,11 @@
      potentially under-allocated.  Since there's always some
      (unencrypted) traffic that we're not quite accounting for anyway,
      that's probably not so bad. */
+
   didAssign = YES;
   /* in the first round we cap by 2* previous utilization */
   firstRound = YES;
-  for (u = 0; u < activePeerCount; u++)
+  for (u = 0; u < activePeerCount; u++) 
     entries[u]->idealized_limit = 0;
   while ( (schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
          (activePeerCount > 0) &&
@@ -2122,7 +2123,6 @@
     didAssign = NO;
     decrementSB = 0;
     for (u = 0; u < activePeerCount; u++) {
-      /* always allow allocating MIN_BPM_PER_PEER */
       if ( (firstRound == NO) ||
           (entries[u]->idealized_limit < adjustedRR[u] * 2) ) {
         unsigned int share;
@@ -2132,8 +2132,9 @@
           (unsigned int) (shares[u] * schedulableBandwidth);
         if (share < entries[u]->idealized_limit)
           share = 0xFFFFFFFF;   /* int overflow */
-        if ( (share > adjustedRR[u] * 2) && (firstRound == YES))
+        if ( (share > adjustedRR[u] * 2) && (firstRound == YES) )
           share = adjustedRR[u] * 2;
+       /* always allow allocating MIN_BPM_PER_PEER */
         if ( (share < MIN_BPM_PER_PEER) &&
             (minCon > 0) ) {
           /* use one of the minCon's to keep the connection! */
@@ -2143,11 +2144,12 @@
         }
         if (share > entries[u]->idealized_limit) {
           decrementSB += share - entries[u]->idealized_limit;
-          didAssign = YES;
-        }
-        entries[u]->idealized_limit = share;
+          didAssign = YES;        
+         entries[u]->idealized_limit = share;
+       }
       }
     } /* end for all peers */
+
     if (decrementSB < schedulableBandwidth) {
       schedulableBandwidth -= decrementSB;
     } else {
@@ -2161,29 +2163,29 @@
       for (u = 0; u < activePeerCount; u++) {
         unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
         if ( (firstRound == NO) ||
-            (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
+            (entries[v]->idealized_limit < adjustedRR[v] * 2)) {
           unsigned int share;
 
           share =
             entries[v]->idealized_limit +
             (unsigned int) (schedulableBandwidth);
-          if (share < entries[u]->idealized_limit)
+          if (share < entries[v]->idealized_limit)
             share = 0xFFFFFFFF; /* int overflow */
-          if ((firstRound == YES) && (share > adjustedRR[u] * 2))
-            share = adjustedRR[u] * 2;
-
-          schedulableBandwidth -= share - entries[v]->idealized_limit;
-         entries[v]->idealized_limit = share;
+          if ( (firstRound == YES) && (share > adjustedRR[v] * 2) )
+            share = adjustedRR[v] * 2;
+         if (share > entries[v]->idealized_limit) {
+           schedulableBandwidth -= share - entries[v]->idealized_limit;
+           entries[v]->idealized_limit = share;
+         }
         }
       }
       FREE(perm);
       perm = NULL;
-
     }  /* didAssign == NO? */
     if (firstRound == YES) {
       /* keep some bandwidth off the market
          for new connections */
-      schedulableBandwidth /= 2;
+      schedulableBandwidth = (schedulableBandwidth * 7) / 8;
     }
     firstRound = NO;
   }                             /* while bandwidth to distribute */
@@ -2194,14 +2196,15 @@
     perm = permute(WEAK, activePeerCount);
     for(u = 0; u < activePeerCount; u++) {
       unsigned int share;
+      unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
 
       share =
-       entries[perm[u]]->idealized_limit +
+       entries[v]->idealized_limit +
        (unsigned int) (schedulableBandwidth / activePeerCount);
-      if (share > entries[perm[u]]->idealized_limit) { /* no int-overflow? */
-       entries[perm[u]]->idealized_limit = share;
+      if (share > entries[v]->idealized_limit) { /* no int-overflow? */
+       entries[v]->idealized_limit = share;
       } else {
-       entries[perm[u]]->idealized_limit = 0xFFFF0000; 
+       entries[v]->idealized_limit = 0xFFFF0000;       
       }
     }
     schedulableBandwidth = 0;
@@ -2213,16 +2216,14 @@
      yield some fluctuation, but some amount of fluctuation should be
      good since it creates opportunities. */
   if (activePeerCount > 0)
-    for(u = 0; u < minCon; u++)
+    for (u=0;u<minCon;u++)
       entries[weak_randomi(activePeerCount)]->idealized_limit
         += MIN_BPM_PER_PEER;
 
   /* prepare for next round */
   lastRoundStart = now;
-  for(u = 0; u < activePeerCount; u++) {
+  for (u=0;u<activePeerCount;u++) {
 #if DEBUG_CONNECTION
-    EncName enc;
-
     IF_GELOG(ectx,
             GE_DEBUG | GE_BULK | GE_USER,
             hash2enc(&entries[u]->session.sender.hashPubKey,
@@ -2236,17 +2237,15 @@
 #endif
     if ( (timeDifference > 50) &&
         (weak_randomi(timeDifference + 1) > 50) )
-      entries[u]->current_connection_value /= 2.0;
+      entries[u]->current_connection_value *= 0.9; /* age */
     decrementSB = entries[u]->idealized_limit * timeDifference / cronMINUTES / 2;
     if ( (decrementSB == 0) &&
         (weak_randomi(timeDifference + 1) != 0) )
       decrementSB = 1;
-    if (entries[u]->recently_received >= decrementSB) {
-      entries[u]->recently_received
-       -= decrementSB;
-    } else {
-      entries[u]->recently_received = 0;
-    }
+    if (entries[u]->recently_received >= decrementSB) 
+      entries[u]->recently_received -= decrementSB;
+    else 
+      entries[u]->recently_received = 0;    
   }
 
   /* free memory */
@@ -2259,8 +2258,6 @@
       continue;
     if (be->idealized_limit < MIN_BPM_PER_PEER) {
 #if DEBUG_CONNECTION
-      EncName enc;
-
       IF_GELOG(ectx,
               GE_DEBUG | GE_REQUEST | GE_USER,
               hash2enc(&be->session.sender.hashPubKey,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]