[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r166 - GNUnet/src/applications/dht/module
From: grothoff
Subject: [GNUnet-SVN] r166 - GNUnet/src/applications/dht/module
Date: Thu, 3 Feb 2005 02:56:28 -0800 (PST)
Author: grothoff
Date: 2005-02-03 02:56:27 -0800 (Thu, 03 Feb 2005)
New Revision: 166
Modified:
GNUnet/src/applications/dht/module/dht.c
Log:
dht compiles -- finally. Not that it works yet.
Modified: GNUnet/src/applications/dht/module/dht.c
===================================================================
--- GNUnet/src/applications/dht/module/dht.c 2005-02-03 10:28:29 UTC (rev 165)
+++ GNUnet/src/applications/dht/module/dht.c 2005-02-03 10:56:27 UTC (rev 166)
@@ -564,15 +564,6 @@
typedef struct {
/**
- * Number of results currently received (size of the
- * results-array).
- */
- unsigned int count;
- /**
- * The peers that confirmed storing the record so far.
- */
- PeerIdentity * peers;
- /**
* RPC callback to call with the final result set.
*/
Async_RPC_Complete_Callback callback;
@@ -598,10 +589,6 @@
typedef struct {
/**
- * Number of results currently received.
- */
- unsigned int count;
- /**
* RPC callback to call with the final result set.
*/
Async_RPC_Complete_Callback callback;
@@ -2323,13 +2310,14 @@
* @param closure extra argument to callback
* @return handle to stop the async put
*/
-static struct DHT_PUT_RECORD * dht_put_async_start(const DHT_TableId * table,
- const HashCode160 * key,
- unsigned int type, /* REMOVE! */
- cron_t timeout,
- const DataContainer * value,
- DHT_OP_Complete callback,
- void * closure) {
+static struct DHT_PUT_RECORD *
+dht_put_async_start(const DHT_TableId * table,
+ const HashCode160 * key,
+ unsigned int type, /* REMOVE! */
+ cron_t timeout,
+ const DataContainer * value,
+ DHT_OP_Complete callback,
+ void * closure) {
int i;
LocalTableData * ltd;
DHT_PUT_RECORD * ret;
@@ -2744,8 +2732,7 @@
* @return SYSERR on error, OK on success
*/
static int dht_join(Blockstore * datastore,
- const DHT_TableId * table,
- cron_t timeout) {
+ const DHT_TableId * table) {
int i;
ENTER();
@@ -2985,8 +2972,6 @@
*/
static void rpc_DHT_findValue_abort(RPC_DHT_FindValue_Context * fw) {
RPC_Param * results;
- int errorCode;
- int i;
ENTER();
delAbortJob((CronJob) &rpc_DHT_findValue_abort,
@@ -3000,28 +2985,14 @@
fw->get_record = NULL;
/* build RPC reply, call RPC callback */
- results = RPC_paramNew();
- if (fw->count > 0) {
- errorCode = RPC_ERROR_OK;
- for (i=fw->count-1;i>=0;i--) {
- RPC_paramAdd(results,
- "data",
- ntohl(fw->results[i]->size),
- fw->results[i]);
- FREE(fw->results[i]);
- }
- GROW(fw->results,
- fw->count,
- 0);
- } else {
- errorCode = RPC_ERROR_TIMEOUT;
- }
- addOptionalFields(results);
- if (fw->callback != NULL)
+ if (fw->callback != NULL) {
+ results = RPC_paramNew();
+ addOptionalFields(results);
fw->callback(results,
- errorCode,
+ OK,
fw->rpc_context);
- RPC_paramFree(results);
+ RPC_paramFree(results);
+ }
fw->done = YES;
MUTEX_UNLOCK(&fw->lock);
}
@@ -3032,41 +3003,25 @@
* been accumulated this will also stop the cron-job and trigger
* sending the cummulative reply via RPC.
*/
-static void rpc_dht_findValue_callback(const DataContainer * value,
- RPC_DHT_FindValue_Context * fw) {
+static int rpc_dht_findValue_callback(const HashCode160 * key,
+ const DataContainer * value,
+ RPC_DHT_FindValue_Context * fw) {
ENTER();
MUTEX_LOCK(&fw->lock);
GROW(fw->results,
fw->count,
fw->count+1);
- fw->results[fw->count-1].dataLength = value->dataLength;
- fw->results[fw->count-1].data = MALLOC(value->dataLength);
- memcpy(fw->results[fw->count-1].data,
- value->data,
- value->dataLength);
+ fw->results[fw->count-1] = MALLOC(ntohl(value->size));
+ memcpy(fw->results[fw->count-1],
+ value,
+ ntohl(value->size));
MUTEX_UNLOCK(&fw->lock);
- if (stop) {
- /* don't wait for timeout, run now! */
- advanceCronJob((CronJob) &rpc_DHT_findValue_abort,
- 0,
- fw);
- }
+ return OK;
}
-static int addToFindValueResults(const HashCode160 * key,
- const DataContainer * data,
- RPC_DHT_FindValue_Context * ctx) {
- MUTEX_LOCK(&ctx->lock);
- GROW(ctx->results,
- ctx->count,
- ctx->count+1);
- ctx->results[ctx->count-1]
- = MALLOC(ntohl(data->size));
- memcpy(ctx->results[ctx->count-1],
- data,
- ntohl(data->size));
- MUTEX_UNLOCK(&ctx->lock);
- return OK;
+static void rpc_dht_findValue_complete(RPC_DHT_FindValue_Context * ctx) {
+ /* FIXME! */
+
}
/**
@@ -3091,6 +3046,7 @@
unsigned long long * timeout;
unsigned int * type;
unsigned int keysLength;
+ unsigned int dataLength;
RPC_DHT_FindValue_Context * fw_context;
ENTER();
@@ -3138,13 +3094,15 @@
fw_context->get_record
= dht_get_async_start(table,
ntohl(*type),
- keySize / sizeof(HashCode160),
+ keysLength / sizeof(HashCode160),
keys,
ntohll(*timeout),
- NULL, /* FIXME: resultCallback required! */
+ (DataProcessor) &rpc_dht_findValue_callback,
fw_context,
- &rpc_dht_findValue_callback,
+ (DHT_OP_Complete) &rpc_dht_findValue_complete,
fw_context);
+ /* FIXME: manage abort properly, also fix
+ rpc_dht_findValue_complete! */
addAbortJob((CronJob)&rpc_DHT_findValue_abort,
fw_context);
addCronJob((CronJob)&rpc_DHT_findValue_abort,
@@ -3163,8 +3121,6 @@
*/
static void rpc_DHT_store_abort(RPC_DHT_store_Context * fw) {
RPC_Param * results;
- int errorCode;
- int i;
ENTER();
delAbortJob((CronJob) &rpc_DHT_store_abort,
@@ -3178,23 +3134,14 @@
fw->put_record = NULL;
/* build RPC reply, call RPC callback */
- results = RPC_paramNew();
- if (fw->count > 0) {
- errorCode = RPC_ERROR_OK;
- for (i=fw->count-1;i>=0;i--)
- RPC_paramAdd(results,
- "peer",
- sizeof(PeerIdentity),
- &fw->peers[i]);
- } else {
- errorCode = RPC_ERROR_TIMEOUT;
- }
- addOptionalFields(results);
- if (fw->callback != NULL)
+ if (fw->callback != NULL) {
+ results = RPC_paramNew();
+ addOptionalFields(results);
fw->callback(results,
- errorCode,
+ OK,
fw->rpc_context);
- RPC_paramFree(results);
+ RPC_paramFree(results);
+ }
fw->done = YES;
MUTEX_UNLOCK(&fw->lock);
}
@@ -3205,11 +3152,8 @@
* the value, this will also stop the cron-job and trigger
* sending the cummulative reply via RPC.
*/
-static void rpc_dht_store_callback(const PeerIdentity * store,
- RPC_DHT_store_Context * fw) {
- MUTEX_LOCK(&fw->lock);
- fw->confirmed_stores++;
- MUTEX_UNLOCK(&fw->lock);
+static void rpc_dht_store_callback(RPC_DHT_store_Context * fw) {
+ /* FIXME: shutdown coordination! */
}
static void rpc_DHT_store(const PeerIdentity * sender,
@@ -3242,10 +3186,8 @@
&dataLength,
(void**) &timeout)) ||
(dataLength != sizeof(unsigned long long)) ||
- (OK != RPC_paramValueByName(arguments,
- "value",
- &value.dataLength,
- (void**) &value.data)) ) {
+ ((NULL == (value = RPC_paramDataContainerByName(arguments,
+ "value")))) ) {
LOG(LOG_WARNING,
_("Received invalid RPC '%s'.\n"),
"DHT_store");
@@ -3263,12 +3205,8 @@
"DHT_store");
}
MUTEX_UNLOCK(lock);
- fw_context->count
- = 0;
fw_context->done
= NO;
- fw_context->peers
- = NULL;
fw_context->callback
= callback;
fw_context->rpc_context
@@ -3276,16 +3214,20 @@
fw_context->put_record
= dht_put_async_start(table,
key,
+ 0, /* FIXME: type */
ntohll(*timeout),
- &value,
- &rpc_dht_store_callback,
+ value,
+ (DHT_OP_Complete) &rpc_dht_store_callback,
fw_context);
+ /* FIXME: fix shutdown
+ (also fix rpc_dht_store_callback) */
addAbortJob((CronJob)&rpc_DHT_store_abort,
fw_context);
addCronJob((CronJob)&rpc_DHT_store_abort,
ntohll(*timeout),
0,
fw_context);
+ FREE(value);
}
/**
@@ -3298,8 +3240,6 @@
*/
static void rpc_DHT_remove_abort(RPC_DHT_remove_Context * fw) {
RPC_Param * results;
- int errorCode;
- int i;
ENTER();
delAbortJob((CronJob) &rpc_DHT_remove_abort,
@@ -3314,20 +3254,10 @@
/* build RPC reply, call RPC callback */
results = RPC_paramNew();
- if (fw->count > 0) {
- errorCode = RPC_ERROR_OK;
- for (i=fw->count-1;i>=0;i--)
- RPC_paramAdd(results,
- "peer",
- sizeof(PeerIdentity),
- &fw->peers[i]);
- } else {
- errorCode = RPC_ERROR_TIMEOUT;
- }
addOptionalFields(results);
if (fw->callback != NULL)
fw->callback(results,
- errorCode,
+ OK,
fw->rpc_context);
RPC_paramFree(results);
fw->done = YES;
@@ -3340,12 +3270,8 @@
* number of replicas this will also stop the cron-job and trigger
* sending the cummulative reply via RPC.
*/
-static void rpc_dht_remove_callback(const PeerIdentity * store,
- RPC_DHT_remove_Context * fw) {
- ENTER();
- MUTEX_LOCK(&fw->lock);
- fw->confirmed_stores++;
- MUTEX_UNLOCK(&fw->lock);
+static void rpc_dht_remove_callback(RPC_DHT_remove_Context * fw) {
+ /* FIXME: shutdown sequence! */
}
/**
@@ -3395,12 +3321,8 @@
return;
}
- if (OK != RPC_paramValueByName(arguments,
- "value",
- &value.dataLength,
- (void**) &value.data))
- value.dataLength = 0;
-
+ value = RPC_paramDataContainerByName(arguments,
+ "value");
fw_context
= MALLOC(sizeof(RPC_DHT_remove_Context));
MUTEX_CREATE_RECURSIVE(&fw_context->lock);
@@ -3412,12 +3334,8 @@
"DHT_removed");
}
MUTEX_UNLOCK(lock);
- fw_context->count
- = 0;
fw_context->done
= NO;
- fw_context->peers
- = NULL;
fw_context->callback
= callback;
fw_context->rpc_context
@@ -3425,16 +3343,19 @@
fw_context->remove_record
= dht_remove_async_start(table,
key,
+ 0, /* FIXME, type */
ntohll(*timeout),
- (value.dataLength==0) ? NULL : &value,
- &rpc_dht_remove_callback,
+ value,
+ (DHT_OP_Complete) &rpc_dht_remove_callback,
fw_context);
+ /* FIXME: shutdown sequence! */
addAbortJob((CronJob)&rpc_DHT_remove_abort,
fw_context);
addCronJob((CronJob)&rpc_DHT_remove_abort,
ntohll(*timeout),
0,
fw_context);
+ FREE(value);
}
/**
@@ -3530,8 +3451,11 @@
/* for all of our tables, do a PUT on the master table */
request_param = vectorNew(4);
- value.dataLength = sizeof(PeerIdentity);
- value.data = coreAPI->myIdentity;
+ value = MALLOC(sizeof(PeerIdentity) + sizeof(DataContainer));
+ value->size = htonl(sizeof(PeerIdentity) + sizeof(DataContainer));
+ memcpy(&value[1],
+ coreAPI->myIdentity,
+ sizeof(PeerIdentity));
#if DEBUG_DHT
LOG(LOG_CRON,
"'%s' issues DHT_PUTs to advertise tables this peer participates in.\n",
@@ -3553,15 +3477,16 @@
putRecords[putRecordsSize-1]
= dht_put_async_start(&masterTableId,
&tables[i].id,
+ 0, /* FIXME: type */
DHT_MAINTAIN_BUCKET_FREQUENCY,
- &value,
- ALPHA,
+ value,
NULL,
NULL);
putTimes[putTimesSize-1] = now;
}
}
vectorFree(request_param);
+ FREE(value);
/*
for each table that we have joined gather OUR neighbours
@@ -3717,8 +3642,7 @@
masterTableDatastore
= create_datastore_dht_master(i);
dht_join(masterTableDatastore,
- &masterTableId,
- 0);
+ &masterTableId);
addCronJob(&dhtMaintainJob,
0,
DHT_MAINTAIN_FREQUENCY,
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r166 - GNUnet/src/applications/dht/module, grothoff <=