gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32387 - gnunet/src/testbed


From: gnunet
Subject: [GNUnet-SVN] r32387 - gnunet/src/testbed
Date: Tue, 18 Feb 2014 16:43:38 +0100

Author: harsha
Date: 2014-02-18 16:43:38 +0100 (Tue, 18 Feb 2014)
New Revision: 32387

Modified:
   gnunet/src/testbed/testbed_api_operations.c
Log:
Consider the resources from failed operations as overloaded and do not use them
until the parallelism is refreshed.

This commit also fixes a bug where the parallelism is set to 0 and hence no
progress can be made.


Modified: gnunet/src/testbed/testbed_api_operations.c
===================================================================
--- gnunet/src/testbed/testbed_api_operations.c 2014-02-18 12:51:51 UTC (rev 32386)
+++ gnunet/src/testbed/testbed_api_operations.c 2014-02-18 15:43:38 UTC (rev 32387)
@@ -29,8 +29,19 @@
 #include "testbed_api_operations.h"
 #include "testbed_api_sd.h"
 
+/**
+ * The number of readings containing past operations' timing information that we
+ * keep track of for adaptive queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 10
 
 /**
+ * The number of parallel operations we start with by default for adaptive
+ * queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
+
+/**
  * An entry in the operation queue
  */
 struct QueueEntry
@@ -141,7 +152,6 @@
    * Number of operations that have failed
    */
   unsigned int nfailed;
-
 };
 
 
@@ -216,10 +226,16 @@
    * Max number of operations which can be active at any time in this queue.
    * This value can be changed either by calling
    * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
-   * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
+   * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
    */
   unsigned int max_active;
 
+  /**
+   * The number of resources occupied by failed operations in the current shot.
+   * This is only relevant if the operation queue is of type
+   * #OPERATION_QUEUE_TYPE_ADAPTIVE
+   */
+  unsigned int overload;
 };
 
 
@@ -613,6 +629,7 @@
   unsigned int n_ops;
   unsigned int n_evict_entries;
   unsigned int need;
+  unsigned int max;
   int deficit;
   int rval;
 
@@ -623,14 +640,22 @@
   evict_entries = NULL;
   n_evict_entries = 0;
   rval = GNUNET_YES;
-  if (opq->active > opq->max_active)
+  if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
   {
+    GNUNET_assert (NULL != opq->fctx);
+    GNUNET_assert (opq->max_active >= opq->overload);
+    max = opq->max_active - opq->overload;
+  }
+  else
+    max = opq->max_active;
+  if (opq->active > max)
+  {
     rval = GNUNET_NO;
     goto ret;
   }
-  if ((opq->active + need) <= opq->max_active)
+  if ((opq->active + need) <= max)
     goto ret;
-  deficit = need - (opq->max_active - opq->active);
+  deficit = need - (max - opq->active);
   for (entry = opq->nq_head;
        (0 < deficit) && (NULL != entry);
        entry = entry->next)
@@ -825,6 +850,7 @@
   n = GNUNET_MIN (n ,fctx->max_active_bound);
   fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
   fctx->nfailed = 0;
+  FPRINTF (stderr, "Parallelism: %u\n", n);
   for (cnt = 0; cnt < n; cnt++)
   {
     tslot = &fctx->tslots_freeptr[cnt];
@@ -881,14 +907,19 @@
     adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
     return;
   }
+  if (-1 == sd)
+    adaptive_queue_set_max_active (queue, queue->max_active + 1);
+  if (sd <= -2)
+    adaptive_queue_set_max_active (queue, queue->max_active * 2);
+  if (1 == queue->max_active)
+  {
+    adaptive_queue_set_max_active (queue, 1);
+    return;
+  }
   if (1 == sd)
     adaptive_queue_set_max_active (queue, queue->max_active - 1);
   if (2 <= sd)
     adaptive_queue_set_max_active (queue, queue->max_active / 2);
-  if (-1 == sd)
-    adaptive_queue_set_max_active (queue, queue->max_active + 1);
-  if (sd <= -2)
-    adaptive_queue_set_max_active (queue, queue->max_active * 2);
 
 #if 0                           /* old algorithm */
   if (sd < 0)
@@ -934,6 +965,7 @@
   struct GNUNET_TIME_Relative t;
   struct TimeSlot *tslot;
   struct FeedbackCtx *fctx;
+  unsigned int i;
 
   t = GNUNET_TIME_absolute_get_duration (op->tstart);
   while (NULL != (tslot = op->tslots_head)) /* update time slots */
@@ -945,7 +977,14 @@
     GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
                                       tslot);
     if (op->failed)
+    {
       fctx->nfailed++;
+      for (i = 0; i < op->nqueues; i++)
+        if (queue == op->queues[i])
+            break;
+      GNUNET_assert (i != op->nqueues);
+      op->queues[i]->overload += op->nres[i];
+    }
     tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
     if (0 != tslot->nvals++)
       continue;
@@ -1004,9 +1043,9 @@
   {
     fctx = GNUNET_new (struct FeedbackCtx);
     fctx->max_active_bound = max_active;
-    fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+    fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
     queue->fctx = fctx;
-    adaptive_queue_set_max_active (queue, 4); /* start with 4 */
+    adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
   }
   return queue;
 }
@@ -1090,6 +1129,7 @@
   struct QueueEntry *entry;
 
   queue->max_active = max_active;
+  queue->overload = 0;
   while ( (queue->active > queue->max_active)
           && (NULL != (entry = queue->rq_head)) )
     defer (entry->op);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]