[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r163 - GNUnet/src/applications/dht/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r163 - GNUnet/src/applications/dht/module |
Date: |
Thu, 3 Feb 2005 01:23:45 -0800 (PST) |
Author: grothoff
Date: 2005-02-03 01:23:44 -0800 (Thu, 03 Feb 2005)
New Revision: 163
Modified:
GNUnet/src/applications/dht/module/dht.c
Log:
working on DHT
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2005-02-03 08:25:31 UTC (rev
162)
+++ GNUnet/src/applications/dht/module/dht.c 2005-02-03 09:23:44 UTC (rev
163)
@@ -378,13 +378,8 @@
*/
FindKNodesContext * kfnc;
- /**
- * How many more results are we looking for?
- */
- unsigned int maxResults;
+ DHT_OP_Complete callback;
- DHT_GET_Complete callback;
-
void * closure;
/**
@@ -449,7 +444,7 @@
/**
* Callback to call upon completion.
*/
- DHT_PUT_Complete callback;
+ DHT_OP_Complete callback;
/**
* Extra argument to callback.
@@ -517,7 +512,7 @@
/**
* Callback to call upon completion.
*/
- DHT_REMOVE_Complete callback;
+ DHT_OP_Complete callback;
/**
* Extra argument to callback.
@@ -552,11 +547,8 @@
typedef struct {
+
/**
- * Maximum number of results for this get operation.
- */
- unsigned int maxResults;
- /**
* Number of results currently received (size of the
* results-array).
*/
@@ -1146,7 +1138,7 @@
}
FREE(tabs);
- /* here: add other optional fields (HELOs) */
+ /* FIXME: here: add other optional fields (HELOs) */
}
/**
@@ -1502,12 +1494,9 @@
max);
#endif
for (i=0;i<max;i++) {
- value.data = NULL;
- value.dataLength = 0;
- if (OK != RPC_paramValueByPosition(results,
- i,
- &value.dataLength,
- (void**)&value.data)) {
+ value = RPC_paramDataContainerByPosition(results,
+ i);
+ if (value == NULL) {
hash2enc(&responder->hashPubKey,
&enc);
LOG(LOG_WARNING,
@@ -1517,15 +1506,12 @@
return;
}
MUTEX_LOCK(&record->lock);
- if (record->maxResults > 0) {
- record->maxResults--;
- record->resultsFound++;
- if (record->callback != NULL) {
- record->callback(&value,
- record->closure);
- }
- }
+ if (record->callback != NULL)
+ record->resultCallback(&record->keys[0],
+ value,
+ record->resultClosure);
MUTEX_UNLOCK(&record->lock);
+ FREE(value);
}
}
@@ -1555,7 +1541,7 @@
#endif
if (isNotCloserThanMe(&record->table,
peer,
- &record->key))
+ &record->keys[0]))
return; /* refuse! */
cronTime(&now);
if (record->timeout > now)
@@ -1598,7 +1584,7 @@
/**
* Callback called for local results found in
- * dht_get_async_start. Calls the DHT_GET_Complete
+ * dht_get_async_start. Calls the DHT_OP_Complete
* callback with the results found locally.
* A DataProcessor.
*/
@@ -1627,8 +1613,6 @@
* @param key the key to look up
* @param timeout how long to wait until this operation should
* automatically time-out
- * @param maxResults maximum number of results to obtain;
- * also used to determine the level of parallelism; is that wise?
* @param callback function to call on each result
* @param closure extra argument to callback
* @return handle to stop the async get
@@ -1641,7 +1625,7 @@
cron_t timeout,
DataProcessor resultCallback,
void * cls,
- DHT_GET_Complete callback,
+ DHT_OP_Complete callback,
void * closure) {
int i;
LocalTableData * ltd;
@@ -1708,13 +1692,13 @@
#endif
/* We do participate in the table, it is fair to assume
that we know the relevant peers in my neighbour set */
- hosts = MALLOC(sizeof(PeerIdentity) * maxResults);
+ hosts = MALLOC(sizeof(PeerIdentity) * ALPHA);
count = findLocalNodes(table,
&keys[0],
hosts,
- maxResults);
+ ALPHA);
/* try adding this peer to hosts */
- k_best_insert(maxResults,
+ k_best_insert(ALPHA,
&count,
&keys[0],
(HashCode160*) hosts,
@@ -1749,8 +1733,8 @@
break;
}
- if (maxResults > ret->resultsFound) {
- /* if less than maxResults replies were found, send
+ if (ALPHA > ret->resultsFound) {
+ /* if less than ALPHA replies were found, send
dht_get_RPC to the other peers */
for (i=0;i<count;i++) {
if (! hostIdentityEquals(coreAPI->myIdentity,
@@ -1777,7 +1761,7 @@
LOG(LOG_DEBUG,
"I do not participate in the table '%s', finding %d other nodes that
do.\n",
&enc,
- maxResults);
+ ALPHA);
#endif
/* We do not particpate in the table; hence we need to use
findKNodes to find an initial set of peers in that
@@ -1789,7 +1773,7 @@
= findKNodes_start(table,
key,
timeout,
- maxResults,
+ ALPHA,
(NodeFoundCallback) &send_dht_get_rpc,
ret);
}
@@ -1974,7 +1958,7 @@
timeout,
ALPHA - fnc->k, /* level of parallelism
proportional to
number of peers we're looking
for */
-
(DHT_GET_Complete)&findnodes_dht_master_get_callback,
+ &findnodes_dht_master_get_callback,
fnc);
}
}
@@ -2186,7 +2170,7 @@
timeout,
fnc->k, /* level of parallelism proportional to
number of peers we're looking for */
-
(DHT_GET_Complete)&find_k_nodes_dht_master_get_callback,
+ &find_k_nodes_dht_master_get_callback,
fnc);
}
return fnc;
@@ -2369,7 +2353,7 @@
cron_t timeout,
const DataContainer * value,
unsigned int
replicationLevel,
- DHT_PUT_Complete callback,
+ DHT_OP_Complete callback,
void * closure) {
int i;
LocalTableData * ltd;
@@ -2495,6 +2479,8 @@
(NodeFoundCallback) &send_dht_put_rpc,
ret);
}
+ /* FIXME: ensure we call OP_Complete callback
+ after timeout! */
MUTEX_UNLOCK(lock);
return ret;
}
@@ -2667,7 +2653,7 @@
cron_t timeout,
const DataContainer * value,
unsigned int replicationLevel,
- DHT_REMOVE_Complete callback,
+ DHT_OP_Complete callback,
void * closure) {
int i;
LocalTableData * ltd;
@@ -3097,8 +3083,6 @@
*/
static void rpc_dht_findValue_callback(const DataContainer * value,
RPC_DHT_FindValue_Context * fw) {
- int stop;
-
ENTER();
MUTEX_LOCK(&fw->lock);
GROW(fw->results,
@@ -3109,7 +3093,6 @@
memcpy(fw->results[fw->count-1].data,
value->data,
value->dataLength);
- stop = fw->count == fw->maxResults;
MUTEX_UNLOCK(&fw->lock);
if (stop) {
/* don't wait for timeout, run now! */
@@ -3191,8 +3174,6 @@
fw_context
= MALLOC(sizeof(RPC_DHT_FindValue_Context));
MUTEX_CREATE_RECURSIVE(&fw_context->lock);
- fw_context->maxResults
- = ntohl(*maxResults);
fw_context->count
= 0;
fw_context->done
@@ -3211,7 +3192,7 @@
ntohll(*timeout),
NULL, /* FIXME: resultCallback required! */
fw_context,
- (DHT_GET_Complete) &rpc_dht_findValue_callback,
+ &rpc_dht_findValue_callback,
fw_context);
addAbortJob((CronJob)&rpc_DHT_findValue_abort,
fw_context);
@@ -3362,7 +3343,7 @@
ntohll(*timeout),
&value,
fw_context->replicationLevel,
- (DHT_PUT_Complete) &rpc_dht_store_callback,
+ &rpc_dht_store_callback,
fw_context);
addAbortJob((CronJob)&rpc_DHT_store_abort,
fw_context);
@@ -3527,7 +3508,7 @@
ntohll(*timeout),
(value.dataLength==0) ? NULL : &value,
fw_context->replicationLevel,
- (DHT_REMOVE_Complete) &rpc_dht_remove_callback,
+ &rpc_dht_remove_callback,
fw_context);
addAbortJob((CronJob)&rpc_DHT_remove_abort,
fw_context);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r163 - GNUnet/src/applications/dht/module,
grothoff <=