[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r160 - GNUnet/src/applications/dht/module
From: grothoff
Subject: [GNUnet-SVN] r160 - GNUnet/src/applications/dht/module
Date: Wed, 2 Feb 2005 04:33:56 -0800 (PST)
Author: grothoff
Date: 2005-02-02 04:33:55 -0800 (Wed, 02 Feb 2005)
New Revision: 160
Modified:
GNUnet/src/applications/dht/module/cs.c
Log:
making cs.c compile
Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c 2005-02-02 11:40:23 UTC (rev 159)
+++ GNUnet/src/applications/dht/module/cs.c 2005-02-02 12:33:55 UTC (rev 160)
@@ -104,16 +104,14 @@
ClientHandle client;
struct DHT_PUT_RECORD * put_record;
DHT_TableId table;
- unsigned int replicas;
- unsigned int maxReplicas;
+ unsigned int replicas; /* confirmed puts? */
} CS_PUT_RECORD;
typedef struct {
ClientHandle client;
struct DHT_REMOVE_RECORD * remove_record;
DHT_TableId table;
- unsigned int replicas;
- unsigned int maxReplicas;
+ unsigned int replicas; /* confirmed dels? */
} CS_REMOVE_RECORD;
typedef struct {
@@ -121,8 +119,6 @@
struct DHT_GET_RECORD * get_record;
DHT_TableId table;
unsigned int count;
- unsigned int replyCount;
- DataContainer ** replies;
} CS_GET_RECORD;
static CS_GET_RECORD ** getRecords;
@@ -165,6 +161,7 @@
*/
static int tcp_get(void * closure,
unsigned int type,
+ unsigned int prio,
unsigned int keyCount,
const HashCode160 * keys,
DataProcessor resultCallback,
@@ -193,6 +190,7 @@
req->header.size = htons(size);
req->header.type = htons(DHT_CS_PROTO_REQUEST_GET);
req->type = htonl(type);
+ req->priority = htonl(prio);
req->table = handlers->table;
memcpy(&req->keys,
keys,
@@ -216,12 +214,14 @@
*
* @param key the key of the item
* @param value the value to store
+ * @param prio the priority for the store
* @return OK if the value could be stored, SYSERR if not (i.e. out of space)
*/
static int tcp_put(void * closure,
const HashCode160 * key,
unsigned int type,
- const DataContainer * value) {
+ const DataContainer * value,
+ unsigned int prio) {
DHT_CS_REQUEST_PUT * req;
CS_TableHandlers * handlers = closure;
int ret;
@@ -236,6 +236,7 @@
req->table = handlers->table;
req->key = *key;
req->timeout = htonl(0);
+ req->priority = htonl(prio);
memcpy(&req[1],
value,
ntohl(value->size));
@@ -375,8 +376,7 @@
ptr->prereply = SEMAPHORE_NEW(0);
ptr->postreply = SEMAPHORE_NEW(0);
ret = dhtAPI->join(ptr->store,
- &req->table,
- ntohl(req->timeout));
+ &req->table);
if (ret == OK) {
GROW(csHandlers,
csHandlersCount,
@@ -461,7 +461,8 @@
&record->table,
record->replicas)) {
LOG(LOG_FAILURE,
- _("sendAck failed. Terminating connection to client.\n"));
+ _("'%s' failed. Terminating connection to client.\n"),
+ "sendAck");
coreAPI->terminateClientConnection(record->client);
}
for (i=putRecordsSize-1;i>=0;i--)
@@ -481,18 +482,13 @@
*/
static void cs_put_complete_callback(const PeerIdentity * store,
CS_PUT_RECORD * record) {
- int mark = 0;
MUTEX_LOCK(&csLock);
record->replicas++;
- if (record->replicas == record->maxReplicas)
- mark = 1;
MUTEX_UNLOCK(&csLock);
- if (mark == 1) {
- /* trigger cron-job early if replication count reached. */
- advanceCronJob((CronJob) &cs_put_abort,
- 0,
- record);
- }
+ /* trigger cron-job early if replication confirmed. */
+ advanceCronJob((CronJob) &cs_put_abort,
+ 0,
+ record);
}
struct CSPutClosure {
@@ -508,21 +504,28 @@
DHT_CS_REQUEST_PUT * req;
DataContainer * data;
CS_PUT_RECORD * ptr;
+ unsigned int size;
req = cpc->message;
client = cpc->client;
FREE(cpc);
- cont.dataLength = ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_PUT);
- if (cont.dataLength == 0)
- cont.data = NULL;
- else
- cont.data = &((DHT_CS_REQUEST_PUT_GENERIC*)req)->value[0];
-
+ size = ntohs(req->header.size)
+ - sizeof(DHT_CS_REQUEST_PUT)
+ + sizeof(DataContainer);
+ GNUNET_ASSERT(size < 0xFFFF);
+ if (size == 0) {
+ data = NULL;
+ } else {
+ data = MALLOC(size);
+ data->size = htonl(size);
+ memcpy(&data[1],
+ &req[1],
+ size - sizeof(DataContainer));
+ }
ptr = MALLOC(sizeof(CS_PUT_RECORD));
ptr->client = client;
ptr->replicas = 0;
ptr->table = req->table;
- ptr->maxReplicas = 7;
ptr->put_record = NULL;
MUTEX_LOCK(&csLock);
@@ -537,11 +540,12 @@
MUTEX_UNLOCK(&csLock);
ptr->put_record = dhtAPI->put_start(&req->table,
&req->key,
+ ntohl(req->type),
ntohll(req->timeout),
- &cont,
- ptr->maxReplicas,
- (DHT_PUT_Complete) &cs_put_complete_callback,
+ data,
+ (DHT_OP_Complete) &cs_put_complete_callback,
ptr);
+ FREE(data);
FREE(req);
}
@@ -597,18 +601,13 @@
*/
static void cs_remove_complete_callback(const PeerIdentity * store,
CS_REMOVE_RECORD * record) {
- int mark = 0;
MUTEX_LOCK(&csLock);
record->replicas++;
- if (record->replicas == record->maxReplicas)
- mark = 1;
MUTEX_UNLOCK(&csLock);
- if (mark == 1) {
- /* trigger cron-job early if replication count reached. */
- advanceCronJob((CronJob) &cs_remove_abort,
- 0,
- record);
- }
+ /* trigger cron-job early if remove confirmed. */
+ advanceCronJob((CronJob) &cs_remove_abort,
+ 0,
+ record);
}
struct CSRemoveClosure {
@@ -621,24 +620,31 @@
*/
static void csRemoveJob(struct CSRemoveClosure * cpc) {
DHT_CS_REQUEST_REMOVE * req;
- DataContainer cont;
+ DataContainer * data;
CS_REMOVE_RECORD * ptr;
ClientHandle client;
+ unsigned int size;
req = cpc->message;
client = cpc->client;
FREE(cpc);
- cont.dataLength = ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_REMOVE);
- if (cont.dataLength == 0)
- cont.data = NULL;
- else
- cont.data = &((DHT_CS_REQUEST_REMOVE_GENERIC*)req)->value[0];
-
+ size = ntohs(req->header.size)
+ - sizeof(DHT_CS_REQUEST_REMOVE)
+ + sizeof(DataContainer);
+ GNUNET_ASSERT(size < 0xFFFF);
+ if (size == 0) {
+ data = NULL;
+ } else {
+ data = MALLOC(size);
+ data->size = htonl(size);
+ memcpy(&data[1],
+ &req[1],
+ size - sizeof(DataContainer));
+ }
ptr = MALLOC(sizeof(CS_REMOVE_RECORD));
ptr->client = client;
ptr->replicas = 0;
ptr->table = req->table;
- ptr->maxReplicas = 7;
ptr->remove_record = NULL;
addCronJob((CronJob) &cs_remove_abort,
ntohll(req->timeout),
@@ -652,12 +658,13 @@
MUTEX_UNLOCK(&csLock);
ptr->remove_record = dhtAPI->remove_start(&req->table,
&req->key,
+ ntohl(req->type),
ntohll(req->timeout),
- &cont,
- ptr->maxReplicas,
- (DHT_REMOVE_Complete) &cs_remove_complete_callback,
+ data,
+ (DHT_OP_Complete) &cs_remove_complete_callback,
ptr);
FREE(req);
+ FREE(data);
}
/**
@@ -683,51 +690,61 @@
}
-static void cs_get_abort(CS_GET_RECORD * record) {
- int i;
+
+static int cs_get_result_callback(const HashCode160 * key,
+ const DataContainer * value,
+ CS_GET_RECORD * record) {
DHT_CS_REPLY_RESULTS * msg;
size_t n;
+
+ n = sizeof(DHT_CS_REPLY_RESULTS) + ntohl(value->size);
+ msg = MALLOC(n);
+ msg->key = *key;
+ memcpy(&msg[1],
+ value,
+ ntohl(value->size));
+ LOG(LOG_DEBUG,
+ "'%s' processes reply '%.*s'\n",
+ __FUNCTION__,
+ ntohl(value->size) - sizeof(DataContainer),
+ &value[1]);
+ msg->table = record->table;
+ msg->header.size = htons(n);
+ msg->header.type = htons(DHT_CS_PROTO_REPLY_GET);
+ if (OK != coreAPI->sendToClient(record->client,
+ &msg->header)) {
+ LOG(LOG_FAILURE,
+ _("'%s' failed. Terminating connection to client.\n"),
+ "sendToClient");
+ coreAPI->terminateClientConnection(record->client);
+ }
+ FREE(msg);
+ return OK;
+}
+
+static void cs_get_abort(CS_GET_RECORD * record) {
+ int i;
dhtAPI->get_stop(record->get_record);
- for (i=0;i<record->count;i++) {
- n = sizeof(DHT_CS_REPLY_RESULTS) + record->replies[i].dataLength;
- msg = MALLOC(n);
- memcpy(&((DHT_CS_REPLY_RESULTS_GENERIC*)msg)->data[0],
- record->replies[i].data,
- record->replies[i].dataLength);
- LOG(LOG_DEBUG,
- "'%s' processes reply '%.*s'\n",
- __FUNCTION__,
- record->replies[i].dataLength,
- record->replies[i].data);
- FREENONNULL(record->replies[i].data);
- msg->totalResults = htonl(record->count);
- msg->table = record->table;
- msg->header.size = htons(n);
- msg->header.type = htons(DHT_CS_PROTO_REPLY_GET);
- if (OK != coreAPI->sendToClient(record->client,
- &msg->header)) {
+ if (record->count == 0) {
+ if (OK != sendAck(record->client,
+ &record->table,
+ SYSERR)) {
LOG(LOG_FAILURE,
_("'%s' failed. Terminating connection to client.\n"),
- "sendToClient");
+ "sendAck");
coreAPI->terminateClientConnection(record->client);
- }
- }
- GROW(record->replies,
- record->count,
- 0);
- if (record->count == 0) {
+ }
+ } else {
if (OK != sendAck(record->client,
&record->table,
- SYSERR)) {
+ record->count)) {
LOG(LOG_FAILURE,
_("'%s' failed. Terminating connection to client.\n"),
"sendAck");
coreAPI->terminateClientConnection(record->client);
}
}
-
-
MUTEX_LOCK(&csLock);
for (i=getRecordsSize-1;i>=0;i--)
if (getRecords[i] == record) {
@@ -742,37 +759,13 @@
}
/**
- * Notification: peer 'store' agreed to store data.
+ * Notification: peer 'get' operation complete (or timeout)
*/
-static void cs_get_complete_callback(const DataContainer * value,
+static void cs_get_complete_callback(const PeerIdentity * peer,
CS_GET_RECORD * record) {
- DataContainer * copy;
- int mark = 0;
-
- LOG(LOG_EVERYTHING,
- "'%s' called with result '%.*s'!\n",
- __FUNCTION__,
- value->dataLength,
- value->data);
- MUTEX_LOCK(&csLock);
- GROW(record->replies,
- record->count,
- record->count+1);
- copy = &record->replies[record->count-1];
- copy->dataLength = value->dataLength;
- copy->data = MALLOC(copy->dataLength);
- memcpy(copy->data,
- value->data,
- copy->dataLength);
- if (record->count == record->maxReplies)
- mark = 1;
- MUTEX_UNLOCK(&csLock);
- if (mark == 1) {
- /* trigger cron-job early if maxResult count reached. */
- advanceCronJob((CronJob) &cs_get_abort,
- 0,
- record);
- }
+ advanceCronJob((CronJob) &cs_get_abort,
+ 0,
+ record);
}
struct CSGetClosure {
@@ -787,15 +780,16 @@
DHT_CS_REQUEST_GET * req;
CS_GET_RECORD * ptr;
ClientHandle client;
+ unsigned int keyCount;
client = cpc->client;
req = cpc->message;
FREE(cpc);
+ keyCount = 1 + ((ntohs(req->header.size) - sizeof(DHT_CS_REQUEST_GET)) / sizeof(HashCode160));
ptr = MALLOC(sizeof(CS_GET_RECORD));
ptr->client = client;
ptr->count = 0;
- ptr->maxReplies = 7;
ptr->table = req->table;
ptr->get_record = NULL;
@@ -810,10 +804,13 @@
getRecords[getRecordsSize-1] = ptr;
MUTEX_UNLOCK(&csLock);
ptr->get_record = dhtAPI->get_start(&req->table,
- &req->key,
+ ntohl(req->type),
+ keyCount,
+ &req->keys,
ntohll(req->timeout),
- ptr->maxReplies,
- (DHT_GET_Complete) &cs_get_complete_callback,
+ (DataProcessor) &cs_get_result_callback,
+ ptr,
+ (DHT_OP_Complete) &cs_get_complete_callback,
ptr);
return OK;
}
@@ -856,7 +853,8 @@
return SYSERR;
req =(DHT_CS_REPLY_ACK*) message;
LOG(LOG_EVERYTHING,
- "ACK received from client.\n");
+ "'%s' received from client.\n",
+ "DHT_CS_REPLY_ACK");
MUTEX_LOCK(&csLock);
for (i=0;i<csHandlersCount;i++) {
if ( (csHandlers[i]->handler == client) &&
@@ -872,32 +870,36 @@
}
MUTEX_UNLOCK(&csLock);
LOG(LOG_ERROR,
- _("Failed to deliver csACK signal.\n"));
+ _("Failed to deliver '%s' message.\n"),
+ "DHT_CS_REPLY_ACK");
return SYSERR; /* failed to signal */
}
/**
* CS handler for results. Finds the appropriate record
- * and appends the new result. If all results have been
+ * and passes on the new result. If all results have been
* collected, signals using the semaphore.
*/
static int csResults(ClientHandle client,
const CS_HEADER * message) {
DHT_CS_REPLY_RESULTS * req;
CS_TableHandlers * ptr;
- unsigned int tot;
unsigned int dataLength;
- DataContainer * cont;
int i;
- if (ntohs(message->size) < sizeof(DHT_CS_REPLY_RESULTS))
+ if (ntohs(message->size) < sizeof(DHT_CS_REPLY_RESULTS)) {
+ BREAK();
return SYSERR;
+ }
req = (DHT_CS_REPLY_RESULTS*) message;
- tot = ntohl(req->totalResults);
dataLength = ntohs(message->size) - sizeof(DHT_CS_REPLY_RESULTS);
+ if (dataLength != ntohl(req->data.size)) {
+ BREAK();
+ return SYSERR;
+ }
LOG(LOG_EVERYTHING,
- "%d RESULTS received from client.\n",
- tot);
+ "'%s' received from client.\n",
+ "DHT_CS_REPLY_RESULTS");
MUTEX_LOCK(&csLock);
for (i=0;i<csHandlersCount;i++) {
if ( (csHandlers[i]->handler == client) &&
@@ -905,32 +907,24 @@
&req->table)) ) {
ptr = csHandlers[i];
SEMAPHORE_DOWN(ptr->postreply);
- if ( (ptr->status == ptr->maxResults) ||
- (tot > ptr->maxResults) ) {
- MUTEX_UNLOCK(&csLock);
- LOG(LOG_ERROR,
- _("Received more results than allowed!\n"));
- return SYSERR;
- }
LOG(LOG_EVERYTHING,
"'%s' received result '%.*s'!\n",
__FUNCTION__,
- dataLength,
- &((DHT_CS_REPLY_RESULTS_GENERIC*)req)->data[0]);
+ dataLength - sizeof(DataContainer),
+ &(&req->data)[1]);
- ptr->resultCallback(data,
+ ptr->resultCallback(&req->key,
+ &req->data,
ptr->resultCallbackClosure);
ptr->status++;
- if (ptr->status == tot)
- SEMAPHORE_UP(ptr->prereply); /* all replies received, signal! */
MUTEX_UNLOCK(&csLock);
return OK;
}
}
MUTEX_UNLOCK(&csLock);
LOG(LOG_ERROR,
- _("Failed to deliver '%s' content.\n"),
- "CS_REPLY_GET");
+ _("Failed to deliver '%s' message.\n"),
+ "DHT_CS_REPLY_RESULTS");
return SYSERR; /* failed to deliver */
}
@@ -940,7 +934,6 @@
*/
static void csClientExit(ClientHandle client) {
int i;
- int j;
CS_GET_RECORD * gr;
CS_PUT_RECORD * pr;
CS_REMOVE_RECORD * rr;
@@ -973,11 +966,6 @@
0,
gr);
dhtAPI->get_stop(gr->get_record);
- for (j=0;j<gr->count;j++)
- FREENONNULL(gr->replies[j].data);
- GROW(gr->replies,
- gr->count,
- 0);
getRecords[i] = getRecords[getRecordsSize-1];
GROW(getRecords,
getRecordsSize,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r160 - GNUnet/src/applications/dht/module,
grothoff <=