From: grothoff
Subject: [GNUnet-SVN] r165 - GNUnet/src/applications/dht/module
Date: Thu, 3 Feb 2005 02:28:30 -0800 (PST)

Author: grothoff
Date: 2005-02-03 02:28:29 -0800 (Thu, 03 Feb 2005)
New Revision: 165

Modified:
   GNUnet/src/applications/dht/module/dht.c
Log:
down to one screenful of compiler errors.  Yayh
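
For orientation, the patch replaces the per-record replica bookkeeping (a replicationLevel field plus a growing replicas/peers array) with a single confirmed_stores counter, switches stored values to heap-allocated DataContainers whose size field is kept in network byte order, and fixes the RPC fan-out at the ALPHA constant. Below is a minimal standalone sketch of the new value handling, using plain libc calls in place of GNUnet's MALLOC wrapper; the helper names are hypothetical and the DataContainer layout is inferred from the diff, not taken from a header:

  #include <stdlib.h>
  #include <string.h>
  #include <arpa/inet.h>          /* htonl / ntohl */

  /* Inferred layout: 32-bit total size in network byte order (header
     included); the payload bytes follow directly after the header. */
  typedef struct {
    unsigned int size;
  } DataContainer;

  /* Hypothetical helper mirroring the inline construction in the patch:
     allocate a container and copy 'len' payload bytes behind the header. */
  static DataContainer * make_data_container(const void * data,
                                              unsigned int len) {
    DataContainer * dc = malloc(sizeof(DataContainer) + len);
    if (dc == NULL)
      return NULL;
    dc->size = htonl(sizeof(DataContainer) + len);
    memcpy(&dc[1], data, len);
    return dc;
  }

  /* The put/remove records now keep a deep copy of the caller's container
     and count confirmations instead of recording each replica peer. */
  static DataContainer * copy_data_container(const DataContainer * value) {
    DataContainer * copy = malloc(ntohl(value->size));
    if (copy != NULL)
      memcpy(copy, value, ntohl(value->size));
    return copy;
  }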

Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c    2005-02-03 10:03:16 UTC (rev 164)
+++ GNUnet/src/applications/dht/module/dht.c    2005-02-03 10:28:29 UTC (rev 165)
@@ -430,22 +430,6 @@
   FindKNodesContext * kfnc;
 
   /**
-   * How many copies should we try to make?
-   */
-  unsigned int replicationLevel;
-
-  /**
-   * The set of peers that have responded (and claim to have
-   * made a replica).
-   */
-  PeerIdentity * replicas;
-
-  /**
-   * Size of the replicas array.
-   */
-  unsigned int confirmedReplicas; /* size of replicas array! */
-
-  /**
    * Callback to call upon completion.
    */
   DHT_OP_Complete callback;
@@ -455,6 +439,8 @@
    */
   void * closure;
 
+  unsigned int confirmed_stores;
+
   /**
    * Size of the RPC array.
    */
@@ -497,23 +483,14 @@
    */
   DataContainer * value;
 
+  unsigned int confirmed_stores;
+
   /**
    * Context of findKNodes (async); NULL if the table was local.
    */
   FindKNodesContext * kfnc;
 
   /**
-   * How many copies should we try to remove? (Or: how many
-   * replicas do we expect to exist?)
-   */
-  unsigned int replicationLevel;
-
-  /**
-   * Number of remove confirmations received.
-   */
-  unsigned int confirmedReplicas;
-
-  /**
    * Callback to call upon completion.
    */ 
   DHT_OP_Complete callback;
@@ -587,10 +564,6 @@
 
 typedef struct {
   /**
-   * Maximum number of replicas for this put operation.
-   */
-  unsigned int replicationLevel;
-  /**
    * Number of results currently received (size of the
    * results-array).
    */
@@ -625,19 +598,10 @@
 
 typedef struct {
   /**
-   * Maximum number of replicas for this put operation.
+   * Number of results currently received.
    */
-  unsigned int replicationLevel;
-  /**
-   * Number of results currently received (size of the
-   * results-array).
-   */
   unsigned int count;
   /**
-   * The peers that confirmed storing the record so far.
-   */
-  PeerIdentity * peers;
-  /**
    * RPC callback to call with the final result set.
    */
   Async_RPC_Complete_Callback callback;
@@ -2248,7 +2212,6 @@
   PeerInfo * pos;
   unsigned int i;
   unsigned int max;
-  unsigned int j;
 
   ENTER();
   processOptionalFields(responder, results);
@@ -2277,21 +2240,6 @@
          &enc);
       return;
     }
-    /* ensure we don't count duplicates! */
-    for (j=0;j<record->confirmedReplicas;j++)
-      if (hostIdentityEquals(peer,
-                            &record->replicas[j])) {
-       peer = NULL;
-       break;
-      }
-    if (peer != NULL) {
-      GROW(record->replicas,
-          record->confirmedReplicas,
-          record->confirmedReplicas+1);
-      record->replicas[record->confirmedReplicas-1] = *peer;
-      if (record->callback != NULL)
-       record->callback(record->closure);
-    }
   }
   MUTEX_UNLOCK(&record->lock);
 }
@@ -2342,10 +2290,9 @@
               "timeout",
               sizeof(unsigned long long),
               &timeout);
-  RPC_paramAdd(param,
-              "value",
-              record->value.dataLength,
-              record->value.data);
+  RPC_paramAddDataContainer(param,
+                           "value",
+                           record->value);
   GROW(record->rpc,
        record->rpcRepliesExpected,
        record->rpcRepliesExpected+1);
@@ -2372,17 +2319,15 @@
  * @param key the key to look up  
  * @param timeout how long to wait until this operation should
  *        automatically time-out
- * @param replicationLevel how many copies should we make?
  * @param callback function to call on successful completion
  * @param closure extra argument to callback
  * @return handle to stop the async put
  */
 static struct DHT_PUT_RECORD * dht_put_async_start(const DHT_TableId * table,
                                                   const HashCode160 * key,
-                                                  unsigned int type,
+                                                  unsigned int type, /* REMOVE! */
                                                   cron_t timeout,
                                                   const DataContainer * value,
-                                                  unsigned int replicationLevel,
                                                   DHT_OP_Complete callback,
                                                   void * closure) {
   int i;
@@ -2393,6 +2338,11 @@
   EncName enc;
   EncName enc2;
 
+  if (value == NULL) {
+    BREAK();
+    return NULL;
+  }
+
   ENTER();
   IFLOG(LOG_DEBUG,
        hash2enc(key,
@@ -2412,22 +2362,19 @@
        __FUNCTION__);
     timeout = 1 * cronHOURS;    
   }
-
-  if (replicationLevel == 0)
-    replicationLevel = 1;
   ret = MALLOC(sizeof(DHT_PUT_RECORD));
   ret->timeout = cronTime(NULL) + timeout;
   ret->key = *key;
   ret->table = *table;
   ret->callback = callback;
   ret->closure = closure;
-  ret->replicationLevel = replicationLevel;
-  ret->value = *value;
+  ret->value = MALLOC(ntohl(value->size));
+  memcpy(ret->value,
+        value,
+        ntohl(value->size));
   MUTEX_CREATE_RECURSIVE(&ret->lock);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
-  ret->confirmedReplicas = 0;
-  ret->replicas = NULL;
   ret->kfnc = NULL;
   MUTEX_LOCK(lock);
 
@@ -2446,13 +2393,13 @@
 #endif
     /* We do participate in the table, it is fair to assume
        that we know the relevant peers in my neighbour set */
-    hosts = MALLOC(sizeof(PeerIdentity) * replicationLevel);
+    hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
     count = findLocalNodes(table,
                           key,
                           hosts,
-                          replicationLevel);
+                          ALPHA);
     /* try adding this peer to hosts */
-    k_best_insert(replicationLevel,
+    k_best_insert(ALPHA,
                  &count,
                  key,
                  (HashCode160*) hosts,
@@ -2467,33 +2414,22 @@
     for (i=0;i<count;i++) {
       if (hostIdentityEquals(coreAPI->myIdentity,
                             &hosts[i])) {
-       if (OK == ltd->store->store(ltd->store->closure,
-                                   key,
-                                   value)) {
-         if (callback != NULL)
-           callback(coreAPI->myIdentity,
-                    closure); 
-         ret->confirmedReplicas++;
-         if (replicationLevel == 1) {
-           /* that's it then */
-           MUTEX_UNLOCK(lock);
-           return ret;
-         }
-       } else {
-         /* warning?  How to communicate errors? */
-       }
+       if (OK == ltd->store->put(ltd->store->closure,
+                                 key,
+                                 type,
+                                 value,
+                                 0 /* FIXME: priority */)) 
+         ret->confirmed_stores++;
        break;
       }  
     }
 
-    if (ret->replicationLevel > 0) {
-      /* send dht_put_RPC to the other peers */
-      for (i=0;i<count;i++) 
-       if (! hostIdentityEquals(coreAPI->myIdentity,
-                                &hosts[i]))
-         send_dht_put_rpc(&hosts[i],
-                          ret);
-    }
+    /* send dht_put_RPC to the other peers */
+    for (i=0;i<count;i++) 
+      if (! hostIdentityEquals(coreAPI->myIdentity,
+                              &hosts[i]))
+       send_dht_put_rpc(&hosts[i],
+                        ret);  
   } else {
     /* We do not particpate in the table; hence we need to use 
        findKNodes to find an initial set of peers in that
@@ -2505,7 +2441,7 @@
       = findKNodes_start(table,
                         key,
                         timeout,
-                        replicationLevel,
+                        ALPHA,
                         (NodeFoundCallback) &send_dht_put_rpc,
                         ret);
   }  
@@ -2533,10 +2469,8 @@
   for (i=0;i<record->rpcRepliesExpected;i++) 
     rpcAPI->RPC_stop(record->rpc[i]);
   MUTEX_DESTROY(&record->lock);
-  i = record->confirmedReplicas;
-  GROW(record->replicas,
-       record->confirmedReplicas,
-       0);
+  i = record->confirmed_stores;
+  FREE(record->value);
   FREE(record); 
   if (i > 0)
     return OK;
@@ -2565,7 +2499,6 @@
   MUTEX_LOCK(&record->lock);
   pos = findPeerInfo(responder);
   pos->lastActivity = cronTime(NULL);
-
   max = RPC_paramCount(results);
   for (i=0;i<max;i++) {
     if (0 != strcmp("peer",
@@ -2587,10 +2520,7 @@
          &enc);
       return;
     }
-    record->confirmedReplicas++;
-    if (record->callback != NULL)
-      record->callback(peer,
-                      record->closure);       
+    record->confirmed_stores++;
   }
   MUTEX_UNLOCK(&record->lock);
 }
@@ -2641,11 +2571,10 @@
               "timeout",
               sizeof(unsigned long long),
               &timeout);
-  if (record->value.dataLength > 0)
-    RPC_paramAdd(param,
-                "value",
-                record->value.dataLength,
-                record->value.data);
+  if (record->value != NULL)
+    RPC_paramAddDataContainer(param,
+                             "value",
+                             record->value);
   GROW(record->rpc,
        record->rpcRepliesExpected,
        record->rpcRepliesExpected+1);
@@ -2671,7 +2600,6 @@
  * @param key the key to look up  
  * @param timeout how long to wait until this operation should
  *        automatically time-out (relative time)
- * @param replicationLevel how many copies should we make?
  * @param callback function to call on successful completion
  * @param closure extra argument to callback
  * @return handle to stop the async remove
@@ -2682,7 +2610,6 @@
                       unsigned int type,
                       cron_t timeout,
                       const DataContainer * value,
-                      unsigned int replicationLevel,
                       DHT_OP_Complete callback,
                       void * closure) {
   int i;
@@ -2703,16 +2630,18 @@
   ret->table = *table;
   ret->callback = callback;
   ret->closure = closure;
-  ret->replicationLevel = replicationLevel;
   if (value == NULL) {
-    ret->value.dataLength = 0;
-    ret->value.data = NULL;
-  } else
-    ret->value = *value;
+    ret->value = NULL;
+  } else {
+    ret->value = MALLOC(ntohl(value->size));
+    memcpy(ret->value,
+          value,
+          ntohl(value->size));
+  }
   MUTEX_CREATE_RECURSIVE(&ret->lock);
   ret->rpc = NULL;
   ret->rpcRepliesExpected = 0;
-  ret->confirmedReplicas = 0;
+  ret->confirmed_stores = 0;
   ret->kfnc = NULL;
   MUTEX_LOCK(lock);
 
@@ -2722,13 +2651,13 @@
     PeerIdentity * hosts;
     /* We do participate in the table, it is fair to assume
        that we know the relevant peers in my neighbour set */
-    hosts = MALLOC(sizeof(PeerIdentity) * replicationLevel);
+    hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
     count = findLocalNodes(table,
                           key,
                           hosts,
-                          replicationLevel);
+                          ALPHA);
     /* try adding this peer to hosts */
-    k_best_insert(replicationLevel,
+    k_best_insert(ALPHA,
                  &count,
                  key,
                  (HashCode160*) hosts,
@@ -2743,33 +2672,21 @@
     for (i=0;i<count;i++) {
       if (hostIdentityEquals(coreAPI->myIdentity,
                             &hosts[i])) {
-       if (OK == ltd->store->remove(ltd->store->closure,
-                                    key,
-                                    value)) {
-         if (callback != NULL)
-           callback(coreAPI->myIdentity,
-                    closure); 
-         ret->confirmedReplicas++;
-         if (replicationLevel == 1) {
-           /* that's it then */
-           MUTEX_UNLOCK(lock);
-           return ret;
-         }
-       } else {
-         /* warning?  How to communicate errors? */
-       }
+       if (OK == ltd->store->del(ltd->store->closure,
+                                 key,
+                                 type,
+                                 value)) 
+         ret->confirmed_stores++;
        break;
       }  
     }
 
-    if (ret->replicationLevel > 0) {
-      /* send dht_remove_RPC to the other peers */
-      for (i=0;i<count;i++) 
-       if (! hostIdentityEquals(coreAPI->myIdentity,
-                                &hosts[i]))
-         send_dht_remove_rpc(&hosts[i],
-                          ret);
-    }
+    /* send dht_remove_RPC to the other peers */
+    for (i=0;i<count;i++) 
+      if (! hostIdentityEquals(coreAPI->myIdentity,
+                              &hosts[i]))
+       send_dht_remove_rpc(&hosts[i],
+                           ret);  
   } else {
     /* We do not particpate in the table; hence we need to use 
        findKNodes to find an initial set of peers in that
@@ -2781,7 +2698,7 @@
       = findKNodes_start(table,
                         key,
                         timeout,
-                        replicationLevel,
+                        ALPHA,
                         (NodeFoundCallback) &send_dht_remove_rpc,
                         ret);
   }  
@@ -2807,7 +2724,8 @@
   for (i=0;i<record->rpcRepliesExpected;i++) 
     rpcAPI->RPC_stop(record->rpc[i]);
   MUTEX_DESTROY(&record->lock);
-  i = record->confirmedReplicas;
+  i = record->confirmed_stores;
+  FREE(record->value);
   FREE(record); 
   if (i > 0)
     return OK;
@@ -2860,9 +2778,9 @@
   cls->puts[cls->putsPos] 
     = dht_put_async_start(&cls->table,
                          key,
+                         0, /* FIXME: type */
                          cls->timeout,
                          value,
-                         ALPHA,
                          NULL,
                          NULL);
   cls->putsPos = (cls->putsPos + 1) % cls->maxPuts;  
@@ -2926,13 +2844,16 @@
        altogether... */
     DataContainer * value;
     
-    value.dataLength = sizeof(PeerIdentity);
-    value.data = coreAPI->myIdentity;
+    value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
+    value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
+    memcpy(&value[1],
+          coreAPI->myIdentity,
+          sizeof(PeerIdentity));
     remRec = dht_remove_async_start(&masterTableId,
                                    table,
+                                   0, /* FIXME: type */
                                    timeout,
-                                   &value,
-                                   ALPHA,
+                                   value,
                                    NULL,
                                    NULL);
   } else {
@@ -3286,21 +3207,9 @@
  */
 static void rpc_dht_store_callback(const PeerIdentity * store,
                                   RPC_DHT_store_Context * fw) {
-  int stop;
-
   MUTEX_LOCK(&fw->lock);
-  GROW(fw->peers,
-       fw->count,
-       fw->count+1);
-  fw->peers[fw->count-1] = *store;
-  stop = fw->count == fw->replicationLevel;
+  fw->confirmed_stores++;
   MUTEX_UNLOCK(&fw->lock);
-  if (stop) {
-    /* don't wait for timeout, run now! */
-    advanceCronJob((CronJob) &rpc_DHT_store_abort,
-                  0,
-                  fw);
-  }
 }
 
 static void rpc_DHT_store(const PeerIdentity * sender,
@@ -3350,11 +3259,8 @@
   ltd = getLocalTableData(table);
   if (ltd == NULL) {    
     LOG(LOG_WARNING,
-       "RPC for DHT_store received for table that we do not participate in!\n");
-    fw_context->replicationLevel = 1; /* or 0?  Well, for now we'll just try to
-                                        find another peer anyway */
-  } else {
-    fw_context->replicationLevel = ALPHA;
+       _("RPC for '%s' received for table that we do not participate in!\n"),
+       "DHT_store");
   }
   MUTEX_UNLOCK(lock);
   fw_context->count
@@ -3372,7 +3278,6 @@
                          key,
                          ntohll(*timeout),
                          &value,
-                         fw_context->replicationLevel,
                          &rpc_dht_store_callback,
                          fw_context);
   addAbortJob((CronJob)&rpc_DHT_store_abort,
@@ -3437,22 +3342,10 @@
  */
 static void rpc_dht_remove_callback(const PeerIdentity * store,
                                    RPC_DHT_remove_Context * fw) {
-  int stop;
-  
   ENTER();
   MUTEX_LOCK(&fw->lock);
-  GROW(fw->peers,
-       fw->count,
-       fw->count+1);
-  fw->peers[fw->count-1] = *store;
-  stop = fw->count == fw->replicationLevel;
+  fw->confirmed_stores++;
   MUTEX_UNLOCK(&fw->lock);
-  if (stop) {
-    /* don't wait for timeout, run now! */
-    advanceCronJob((CronJob) &rpc_DHT_remove_abort,
-                  0,
-                  fw);
-  }
 }
 
 /**
@@ -3515,11 +3408,8 @@
   ltd = getLocalTableData(table);
   if (ltd == NULL) {    
     LOG(LOG_DEBUG,
-       "RPC for DHT_removed received for table that we do not participate in!\n");
-    fw_context->replicationLevel = 1; /* or 0?  Well, for now we'll just try to
-                                        find another peer anyway */
-  } else {
-    fw_context->replicationLevel = ALPHA;
+       _("RPC for '%s' received for table that we do not participate in!\n"),
+       "DHT_removed");
   }
   MUTEX_UNLOCK(lock);
   fw_context->count
@@ -3537,7 +3427,6 @@
                             key,
                             ntohll(*timeout),
                             (value.dataLength==0) ? NULL : &value,
-                            fw_context->replicationLevel,
                             &rpc_dht_remove_callback,
                             fw_context);
   addAbortJob((CronJob)&rpc_DHT_remove_abort,
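
With replicationLevel gone from the public signatures, callers of the async put/remove API now pass the record type (still marked FIXME above) and leave the fan-out to the module. An illustrative call against the new dht_put_async_start signature; table, key, value and timeout are placeholders assumed to be set up elsewhere:

  struct DHT_PUT_RECORD * put;

  put = dht_put_async_start(&table,
                            &key,
                            0,        /* type -- still a FIXME in this revision */
                            timeout,  /* cron_t, relative */
                            value,    /* DataContainer *, deep-copied by the call */
                            NULL,     /* DHT_OP_Complete callback (optional) */
                            NULL);    /* closure passed to the callback */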




