[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r165 - GNUnet/src/applications/dht/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r165 - GNUnet/src/applications/dht/module |
Date: |
Thu, 3 Feb 2005 02:28:30 -0800 (PST) |
Author: grothoff
Date: 2005-02-03 02:28:29 -0800 (Thu, 03 Feb 2005)
New Revision: 165
Modified:
GNUnet/src/applications/dht/module/dht.c
Log:
down to one screenful of compiler errors. Yayh
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2005-02-03 10:03:16 UTC (rev
164)
+++ GNUnet/src/applications/dht/module/dht.c 2005-02-03 10:28:29 UTC (rev
165)
@@ -430,22 +430,6 @@
FindKNodesContext * kfnc;
/**
- * How many copies should we try to make?
- */
- unsigned int replicationLevel;
-
- /**
- * The set of peers that have responded (and claim to have
- * made a replica).
- */
- PeerIdentity * replicas;
-
- /**
- * Size of the replicas array.
- */
- unsigned int confirmedReplicas; /* size of replicas array! */
-
- /**
* Callback to call upon completion.
*/
DHT_OP_Complete callback;
@@ -455,6 +439,8 @@
*/
void * closure;
+ unsigned int confirmed_stores;
+
/**
* Size of the RPC array.
*/
@@ -497,23 +483,14 @@
*/
DataContainer * value;
+ unsigned int confirmed_stores;
+
/**
* Context of findKNodes (async); NULL if the table was local.
*/
FindKNodesContext * kfnc;
/**
- * How many copies should we try to remove? (Or: how many
- * replicas do we expect to exist?)
- */
- unsigned int replicationLevel;
-
- /**
- * Number of remove confirmations received.
- */
- unsigned int confirmedReplicas;
-
- /**
* Callback to call upon completion.
*/
DHT_OP_Complete callback;
@@ -587,10 +564,6 @@
typedef struct {
/**
- * Maximum number of replicas for this put operation.
- */
- unsigned int replicationLevel;
- /**
* Number of results currently received (size of the
* results-array).
*/
@@ -625,19 +598,10 @@
typedef struct {
/**
- * Maximum number of replicas for this put operation.
+ * Number of results currently received.
*/
- unsigned int replicationLevel;
- /**
- * Number of results currently received (size of the
- * results-array).
- */
unsigned int count;
/**
- * The peers that confirmed storing the record so far.
- */
- PeerIdentity * peers;
- /**
* RPC callback to call with the final result set.
*/
Async_RPC_Complete_Callback callback;
@@ -2248,7 +2212,6 @@
PeerInfo * pos;
unsigned int i;
unsigned int max;
- unsigned int j;
ENTER();
processOptionalFields(responder, results);
@@ -2277,21 +2240,6 @@
&enc);
return;
}
- /* ensure we don't count duplicates! */
- for (j=0;j<record->confirmedReplicas;j++)
- if (hostIdentityEquals(peer,
- &record->replicas[j])) {
- peer = NULL;
- break;
- }
- if (peer != NULL) {
- GROW(record->replicas,
- record->confirmedReplicas,
- record->confirmedReplicas+1);
- record->replicas[record->confirmedReplicas-1] = *peer;
- if (record->callback != NULL)
- record->callback(record->closure);
- }
}
MUTEX_UNLOCK(&record->lock);
}
@@ -2342,10 +2290,9 @@
"timeout",
sizeof(unsigned long long),
&timeout);
- RPC_paramAdd(param,
- "value",
- record->value.dataLength,
- record->value.data);
+ RPC_paramAddDataContainer(param,
+ "value",
+ record->value);
GROW(record->rpc,
record->rpcRepliesExpected,
record->rpcRepliesExpected+1);
@@ -2372,17 +2319,15 @@
* @param key the key to look up
* @param timeout how long to wait until this operation should
* automatically time-out
- * @param replicationLevel how many copies should we make?
* @param callback function to call on successful completion
* @param closure extra argument to callback
* @return handle to stop the async put
*/
static struct DHT_PUT_RECORD * dht_put_async_start(const DHT_TableId * table,
const HashCode160 * key,
- unsigned int type,
+ unsigned int type, /*
REMOVE! */
cron_t timeout,
const DataContainer * value,
- unsigned int
replicationLevel,
DHT_OP_Complete callback,
void * closure) {
int i;
@@ -2393,6 +2338,11 @@
EncName enc;
EncName enc2;
+ if (value == NULL) {
+ BREAK();
+ return NULL;
+ }
+
ENTER();
IFLOG(LOG_DEBUG,
hash2enc(key,
@@ -2412,22 +2362,19 @@
__FUNCTION__);
timeout = 1 * cronHOURS;
}
-
- if (replicationLevel == 0)
- replicationLevel = 1;
ret = MALLOC(sizeof(DHT_PUT_RECORD));
ret->timeout = cronTime(NULL) + timeout;
ret->key = *key;
ret->table = *table;
ret->callback = callback;
ret->closure = closure;
- ret->replicationLevel = replicationLevel;
- ret->value = *value;
+ ret->value = MALLOC(ntohl(value->size));
+ memcpy(ret->value,
+ value,
+ ntohl(value->size));
MUTEX_CREATE_RECURSIVE(&ret->lock);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
- ret->confirmedReplicas = 0;
- ret->replicas = NULL;
ret->kfnc = NULL;
MUTEX_LOCK(lock);
@@ -2446,13 +2393,13 @@
#endif
/* We do participate in the table, it is fair to assume
that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * replicationLevel);
+ hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
count = findLocalNodes(table,
key,
hosts,
- replicationLevel);
+ ALPHA);
/* try adding this peer to hosts */
- k_best_insert(replicationLevel,
+ k_best_insert(ALPHA,
&count,
key,
(HashCode160*) hosts,
@@ -2467,33 +2414,22 @@
for (i=0;i<count;i++) {
if (hostIdentityEquals(coreAPI->myIdentity,
&hosts[i])) {
- if (OK == ltd->store->store(ltd->store->closure,
- key,
- value)) {
- if (callback != NULL)
- callback(coreAPI->myIdentity,
- closure);
- ret->confirmedReplicas++;
- if (replicationLevel == 1) {
- /* that's it then */
- MUTEX_UNLOCK(lock);
- return ret;
- }
- } else {
- /* warning? How to communicate errors? */
- }
+ if (OK == ltd->store->put(ltd->store->closure,
+ key,
+ type,
+ value,
+ 0 /* FIXME: priority */))
+ ret->confirmed_stores++;
break;
}
}
- if (ret->replicationLevel > 0) {
- /* send dht_put_RPC to the other peers */
- for (i=0;i<count;i++)
- if (! hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i]))
- send_dht_put_rpc(&hosts[i],
- ret);
- }
+ /* send dht_put_RPC to the other peers */
+ for (i=0;i<count;i++)
+ if (! hostIdentityEquals(coreAPI->myIdentity,
+ &hosts[i]))
+ send_dht_put_rpc(&hosts[i],
+ ret);
} else {
/* We do not particpate in the table; hence we need to use
findKNodes to find an initial set of peers in that
@@ -2505,7 +2441,7 @@
= findKNodes_start(table,
key,
timeout,
- replicationLevel,
+ ALPHA,
(NodeFoundCallback) &send_dht_put_rpc,
ret);
}
@@ -2533,10 +2469,8 @@
for (i=0;i<record->rpcRepliesExpected;i++)
rpcAPI->RPC_stop(record->rpc[i]);
MUTEX_DESTROY(&record->lock);
- i = record->confirmedReplicas;
- GROW(record->replicas,
- record->confirmedReplicas,
- 0);
+ i = record->confirmed_stores;
+ FREE(record->value);
FREE(record);
if (i > 0)
return OK;
@@ -2565,7 +2499,6 @@
MUTEX_LOCK(&record->lock);
pos = findPeerInfo(responder);
pos->lastActivity = cronTime(NULL);
-
max = RPC_paramCount(results);
for (i=0;i<max;i++) {
if (0 != strcmp("peer",
@@ -2587,10 +2520,7 @@
&enc);
return;
}
- record->confirmedReplicas++;
- if (record->callback != NULL)
- record->callback(peer,
- record->closure);
+ record->confirmed_stores++;
}
MUTEX_UNLOCK(&record->lock);
}
@@ -2641,11 +2571,10 @@
"timeout",
sizeof(unsigned long long),
&timeout);
- if (record->value.dataLength > 0)
- RPC_paramAdd(param,
- "value",
- record->value.dataLength,
- record->value.data);
+ if (record->value != NULL)
+ RPC_paramAddDataContainer(param,
+ "value",
+ record->value);
GROW(record->rpc,
record->rpcRepliesExpected,
record->rpcRepliesExpected+1);
@@ -2671,7 +2600,6 @@
* @param key the key to look up
* @param timeout how long to wait until this operation should
* automatically time-out (relative time)
- * @param replicationLevel how many copies should we make?
* @param callback function to call on successful completion
* @param closure extra argument to callback
* @return handle to stop the async remove
@@ -2682,7 +2610,6 @@
unsigned int type,
cron_t timeout,
const DataContainer * value,
- unsigned int replicationLevel,
DHT_OP_Complete callback,
void * closure) {
int i;
@@ -2703,16 +2630,18 @@
ret->table = *table;
ret->callback = callback;
ret->closure = closure;
- ret->replicationLevel = replicationLevel;
if (value == NULL) {
- ret->value.dataLength = 0;
- ret->value.data = NULL;
- } else
- ret->value = *value;
+ ret->value = NULL;
+ } else {
+ ret->value = MALLOC(ntohl(value->size));
+ memcpy(ret->value,
+ value,
+ ntohl(value->size));
+ }
MUTEX_CREATE_RECURSIVE(&ret->lock);
ret->rpc = NULL;
ret->rpcRepliesExpected = 0;
- ret->confirmedReplicas = 0;
+ ret->confirmed_stores = 0;
ret->kfnc = NULL;
MUTEX_LOCK(lock);
@@ -2722,13 +2651,13 @@
PeerIdentity * hosts;
/* We do participate in the table, it is fair to assume
that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * replicationLevel);
+ hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
count = findLocalNodes(table,
key,
hosts,
- replicationLevel);
+ ALPHA);
/* try adding this peer to hosts */
- k_best_insert(replicationLevel,
+ k_best_insert(ALPHA,
&count,
key,
(HashCode160*) hosts,
@@ -2743,33 +2672,21 @@
for (i=0;i<count;i++) {
if (hostIdentityEquals(coreAPI->myIdentity,
&hosts[i])) {
- if (OK == ltd->store->remove(ltd->store->closure,
- key,
- value)) {
- if (callback != NULL)
- callback(coreAPI->myIdentity,
- closure);
- ret->confirmedReplicas++;
- if (replicationLevel == 1) {
- /* that's it then */
- MUTEX_UNLOCK(lock);
- return ret;
- }
- } else {
- /* warning? How to communicate errors? */
- }
+ if (OK == ltd->store->del(ltd->store->closure,
+ key,
+ type,
+ value))
+ ret->confirmed_stores++;
break;
}
}
- if (ret->replicationLevel > 0) {
- /* send dht_remove_RPC to the other peers */
- for (i=0;i<count;i++)
- if (! hostIdentityEquals(coreAPI->myIdentity,
- &hosts[i]))
- send_dht_remove_rpc(&hosts[i],
- ret);
- }
+ /* send dht_remove_RPC to the other peers */
+ for (i=0;i<count;i++)
+ if (! hostIdentityEquals(coreAPI->myIdentity,
+ &hosts[i]))
+ send_dht_remove_rpc(&hosts[i],
+ ret);
} else {
/* We do not particpate in the table; hence we need to use
findKNodes to find an initial set of peers in that
@@ -2781,7 +2698,7 @@
= findKNodes_start(table,
key,
timeout,
- replicationLevel,
+ ALPHA,
(NodeFoundCallback) &send_dht_remove_rpc,
ret);
}
@@ -2807,7 +2724,8 @@
for (i=0;i<record->rpcRepliesExpected;i++)
rpcAPI->RPC_stop(record->rpc[i]);
MUTEX_DESTROY(&record->lock);
- i = record->confirmedReplicas;
+ i = record->confirmed_stores;
+ FREE(record->value);
FREE(record);
if (i > 0)
return OK;
@@ -2860,9 +2778,9 @@
cls->puts[cls->putsPos]
= dht_put_async_start(&cls->table,
key,
+ 0, /* FIXME: type */
cls->timeout,
value,
- ALPHA,
NULL,
NULL);
cls->putsPos = (cls->putsPos + 1) % cls->maxPuts;
@@ -2926,13 +2844,16 @@
altogether... */
DataContainer * value;
- value.dataLength = sizeof(PeerIdentity);
- value.data = coreAPI->myIdentity;
+ value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
+ value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
+ memcpy(&value[1],
+ coreAPI->myIdentity,
+ sizeof(PeerIdentity));
remRec = dht_remove_async_start(&masterTableId,
table,
+ 0, /* FIXME: type */
timeout,
- &value,
- ALPHA,
+ value,
NULL,
NULL);
} else {
@@ -3286,21 +3207,9 @@
*/
static void rpc_dht_store_callback(const PeerIdentity * store,
RPC_DHT_store_Context * fw) {
- int stop;
-
MUTEX_LOCK(&fw->lock);
- GROW(fw->peers,
- fw->count,
- fw->count+1);
- fw->peers[fw->count-1] = *store;
- stop = fw->count == fw->replicationLevel;
+ fw->confirmed_stores++;
MUTEX_UNLOCK(&fw->lock);
- if (stop) {
- /* don't wait for timeout, run now! */
- advanceCronJob((CronJob) &rpc_DHT_store_abort,
- 0,
- fw);
- }
}
static void rpc_DHT_store(const PeerIdentity * sender,
@@ -3350,11 +3259,8 @@
ltd = getLocalTableData(table);
if (ltd == NULL) {
LOG(LOG_WARNING,
- "RPC for DHT_store received for table that we do not participate
in!\n");
- fw_context->replicationLevel = 1; /* or 0? Well, for now we'll just try
to
- find another peer anyway */
- } else {
- fw_context->replicationLevel = ALPHA;
+ _("RPC for '%s' received for table that we do not participate in!\n"),
+ "DHT_store");
}
MUTEX_UNLOCK(lock);
fw_context->count
@@ -3372,7 +3278,6 @@
key,
ntohll(*timeout),
&value,
- fw_context->replicationLevel,
&rpc_dht_store_callback,
fw_context);
addAbortJob((CronJob)&rpc_DHT_store_abort,
@@ -3437,22 +3342,10 @@
*/
static void rpc_dht_remove_callback(const PeerIdentity * store,
RPC_DHT_remove_Context * fw) {
- int stop;
-
ENTER();
MUTEX_LOCK(&fw->lock);
- GROW(fw->peers,
- fw->count,
- fw->count+1);
- fw->peers[fw->count-1] = *store;
- stop = fw->count == fw->replicationLevel;
+ fw->confirmed_stores++;
MUTEX_UNLOCK(&fw->lock);
- if (stop) {
- /* don't wait for timeout, run now! */
- advanceCronJob((CronJob) &rpc_DHT_remove_abort,
- 0,
- fw);
- }
}
/**
@@ -3515,11 +3408,8 @@
ltd = getLocalTableData(table);
if (ltd == NULL) {
LOG(LOG_DEBUG,
- "RPC for DHT_removed received for table that we do not participate
in!\n");
- fw_context->replicationLevel = 1; /* or 0? Well, for now we'll just try
to
- find another peer anyway */
- } else {
- fw_context->replicationLevel = ALPHA;
+ _("RPC for '%s' received for table that we do not participate in!\n"),
+ "DHT_removed");
}
MUTEX_UNLOCK(lock);
fw_context->count
@@ -3537,7 +3427,6 @@
key,
ntohll(*timeout),
(value.dataLength==0) ? NULL : &value,
- fw_context->replicationLevel,
&rpc_dht_remove_callback,
fw_context);
addAbortJob((CronJob)&rpc_DHT_remove_abort,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r165 - GNUnet/src/applications/dht/module,
grothoff <=