gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r2674 - GNUnet/src/server


From: grothoff
Subject: [GNUnet-SVN] r2674 - GNUnet/src/server
Date: Thu, 27 Apr 2006 22:23:19 -0700 (PDT)

Author: grothoff
Date: 2006-04-27 22:23:16 -0700 (Thu, 27 Apr 2006)
New Revision: 2674

Modified:
   GNUnet/src/server/connection.c
Log:
fixing bug #1049

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2006-04-27 21:51:25 UTC (rev 2673)
+++ GNUnet/src/server/connection.c      2006-04-28 05:23:16 UTC (rev 2674)
@@ -867,10 +867,10 @@
   totalMessageSize = 0;
   (*priority) = 0;
 
-  for(i = be->sendBufferSize - 1; i >= 0; i--)
+  for (i = be->sendBufferSize - 1; i >= 0; i--)
     be->sendBuffer[i]->knapsackSolution = NO;
 
-  if(be->session.mtu == 0) {
+  if (be->session.mtu == 0) {
     totalMessageSize = sizeof(P2P_PACKET_HEADER);
     i = 0;
     /* assumes entries are sorted by priority! */
@@ -928,8 +928,7 @@
          a small message if there is nothing else to do! */
       return 0;
     }
-  }
-  else {                        /* if (be->session.mtu == 0) */
+  } else { /* if (be->session.mtu == 0) */
     /* solve knapsack problem, compute accumulated priority */
     approxProb = getCPULoad();
     if(approxProb > 50) {
@@ -970,17 +969,20 @@
     if(j == 0) {
       LOG(LOG_ERROR,
           _("`%s' selected %d out of %d messages (MTU: %d).\n"),
-          "solveKnapsack",
-          j, be->sendBufferSize, be->session.mtu - sizeof(P2P_PACKET_HEADER));
+          __FUNCTION__,
+          j, be->sendBufferSize,
+         be->session.mtu - sizeof(P2P_PACKET_HEADER));
 
       for(j = 0; j < be->sendBufferSize; j++)
         LOG(LOG_ERROR,
             _("Message details: %u: length %d, priority: %d\n"),
-            j, be->sendBuffer[j]->len, be->sendBuffer[j]->pri);
+            j, 
+           be->sendBuffer[j]->len,
+           be->sendBuffer[j]->pri);
       return 0;
     }
 
-    if(be->available_send_window < be->session.mtu) {
+    if (be->available_send_window < be->session.mtu) {
       /* if we have a very high priority, we may
          want to ignore bandwidth availability (e.g. for HANGUP,
          which  has EXTREME_PRIORITY) */
@@ -1188,6 +1190,55 @@
 }
 
 /**
+ * The MTU has changed.  We may have messages larger than the
+ * MTU in the buffer.  Check if this is the case, and if so,
+ * fragment those messages.
+ * @param be buffer entry whose queued messages are checked
+ *        against be->session.mtu (nothing is done if the MTU is 0)
+ */
+static void fragmentIfNecessary(BufferEntry * be) {
+  SendEntry ** entries;
+  SendEntry * entry;
+  unsigned int i;
+  unsigned int ret;
+  unsigned int j;
+  int changed;
+
+  if (be->session.mtu == 0)
+    return; /* clearly not necessary */
+
+  /* MTU change may require new fragmentation! */
+  changed = YES;
+  while (changed) {
+    changed = NO;
+    entries = be->sendBuffer;
+    ret = be->sendBufferSize;
+    for (i = 0; i < ret; i++) {
+      entry = entries[i];
+      if (entry->len <= be->session.mtu - sizeof(P2P_PACKET_HEADER))
+        continue; /* small enough for the current MTU */
+      /* unlink the oversized entry, then shrink the array by one */
+      ret--;
+      for (j = i; j < ret; j++)
+        entries[j] = entries[j + 1];  /* preserve ordering */
+      GROW(be->sendBuffer, be->sendBufferSize, ret);
+      /* calling fragment will change be->sendBuffer;
+         thus we need to restart from the beginning afterwards... */
+      fragmentation->fragment(&be->session.sender,
+                              be->session.mtu - sizeof(P2P_PACKET_HEADER),
+                              entry->pri,
+                              entry->transmissionTime,
+                              entry->len,
+                              entry->callback,
+                              entry->closure);
+      FREE(entry);
+      changed = YES;
+      break; /* "entries" changed as side-effect of fragment call */
+    }
+  } /* while changed */
+}
+
+/**
  * Try to make sure that the transport service for the given buffer is
  * connected.  If the transport service changes, this function also
  * ensures that the pending messages are properly fragmented (if
@@ -1196,55 +1247,16 @@
  * @return OK on success, NO on error
  */
 static int ensureTransportConnected(BufferEntry * be) {
-  SendEntry **entries;
-  SendEntry *entry;
-  int i;
-  int ret;
-  int j;
-  int changed;
-
-  if(be->session.tsession == NULL) {
-    be->session.tsession = transport->connectFreely(&be->session.sender, YES);
-    if(be->session.tsession == NULL)
-      return NO;
-    be->session.mtu = transport->getMTU(be->session.tsession->ttype);
-    if(be->session.mtu > 0) {
-      /* MTU change may require new fragmentation! */
-      changed = YES;
-      while(changed) {
-        changed = NO;
-        entries = be->sendBuffer;
-        i = 0;
-        ret = be->sendBufferSize;
-        while(i < ret) {
-          entry = entries[i];
-          if(entry->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) {
-            ret--;
-            for(j = i; j < ret; j++)
-              entries[j] = entries[j + 1];  /* preserve ordering */
-            GROW(be->sendBuffer, be->sendBufferSize, ret);
-            /* calling fragment will change be->sendBuffer;
-               thus we need to restart from the beginning afterwards... */
-            fragmentation->fragment(&be->session.sender,
-                                    be->session.mtu -
-                                    sizeof(P2P_PACKET_HEADER), entry->pri,
-                                    entry->transmissionTime, entry->len,
-                                    entry->callback, entry->closure);
-            FREE(entry);
-            changed = YES;
-            break;
-          }
-          else {
-            i++;
-          }
-        }                       /* for all i (until change) */
-      }                         /* while changed */
-    }                           /* if MTU changed */
-  }                             /* if need to reconnect */
+  if (be->session.tsession != NULL) 
+    return OK; /* a transport session already exists */
+  be->session.tsession = transport->connectFreely(&be->session.sender, YES);
+  if (be->session.tsession == NULL)
+    return NO; /* connectFreely did not yield a session */
+  be->session.mtu = transport->getMTU(be->session.tsession->ttype);
+  fragmentIfNecessary(be); /* re-check queued messages against the MTU just obtained */
   return OK;
 }
 
-
 /**
  * Send a buffer; assumes that access is already synchronized.  This
  * message solves the knapsack problem, assembles the message
@@ -1295,7 +1307,7 @@
 #endif
 
   totalMessageSize = selectMessagesToSend(be, &priority);
-  if(totalMessageSize == 0) {
+  if (totalMessageSize == 0) {
     expireSendBufferEntries(be);
     be->inSendBuffer = NO;
     return;                     /* deferr further */
@@ -1347,7 +1359,8 @@
   while(pos != NULL) {
     if(pos->minimumPadding + p <= totalMessageSize) {
       p += pos->callback(&be->session.sender,
-                         &plaintextMsg[p], be->session.mtu - p);
+                         &plaintextMsg[p], 
+                        be->session.mtu - p);
     }
     pos = pos->next;
   }
@@ -1443,19 +1456,22 @@
   unsigned long long queueSize;
 
   ENTRY();
-  if((se == NULL) || (se->len == 0)) {
+  if ( (se == NULL) || 
+       (se->len == 0) ) {
     BREAK();
     FREENONNULL(se);
     return;
   }
-  if((be->session.mtu != 0) &&
-     (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER))) {
+  if ( (be->session.mtu != 0) &&
+       (se->len > be->session.mtu - sizeof(P2P_PACKET_HEADER)) ) {
     /* this message is so big that it must be fragmented! */
     fragmentation->fragment(&be->session.sender,
                             be->session.mtu - sizeof(P2P_PACKET_HEADER),
                             se->pri,
                             se->transmissionTime,
-                            se->len, se->callback, se->closure);
+                            se->len,
+                           se->callback, 
+                           se->closure);
     FREE(se);
     return;
   }
@@ -1463,13 +1479,17 @@
 #if DEBUG_CONNECTION
   IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
   LOG(LOG_DEBUG,
-      "adding message of size %d to buffer of host %s.\n", se->len, &enc);
+      "adding message of size %d to buffer of host `%s'\n",
+      se->len,
+      &enc);
 #endif
   if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
     /* as long as we do not have a confirmed
        connection, do NOT queue messages! */
 #if DEBUG_CONNECTION
-    LOG(LOG_DEBUG, "not connected to %s, message dropped\n", &enc);
+    LOG(LOG_DEBUG, 
+       "not connected to `%s', message dropped\n",
+       &enc);
 #endif
     FREE(se->closure);
     FREE(se);
@@ -1494,7 +1514,8 @@
 #if DEBUG_CONNECTION
       LOG(LOG_DEBUG,
           "queueSize (%llu) >= %d, refusing to queue message.\n",
-          queueSize, MAX_SEND_BUFFER_SIZE);
+          queueSize, 
+         MAX_SEND_BUFFER_SIZE);
 #endif
       FREE(se->closure);
       FREE(se);
@@ -2500,7 +2521,8 @@
  * @param tsession the transport session that is for grabs
  * @param sender the identity of the other node
  */
-void considerTakeover(const PeerIdentity * sender, TSession * tsession) {
+void considerTakeover(const PeerIdentity * sender, 
+                     TSession * tsession) {
   BufferEntry *be;
 
   ENTRY();
@@ -2530,9 +2552,10 @@
             transport->disconnect(be->session.tsession);
           be->session.tsession = tsession;
           be->session.mtu = transport->getMTU(tsession->ttype);
+         fragmentIfNecessary(be);
         }
-      }                         /* end if cheaper AND possible */
-    }                           /* end if connected */
+      } /* end if cheaper AND possible */
+    } /* end if connected */
   }
   MUTEX_UNLOCK(&lock);
   transport->disconnect(tsession);





reply via email to

[Prev in Thread] Current Thread [Next in Thread]