[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r27614 - gnunet/src/set
From: |
gnunet |
Subject: |
[GNUnet-SVN] r27614 - gnunet/src/set |
Date: |
Wed, 26 Jun 2013 12:06:52 +0200 |
Author: dold
Date: 2013-06-26 12:06:52 +0200 (Wed, 26 Jun 2013)
New Revision: 27614
Modified:
gnunet/src/set/gnunet-service-set.c
gnunet/src/set/gnunet-service-set.h
gnunet/src/set/gnunet-service-set_intersection.c
gnunet/src/set/gnunet-service-set_union.c
gnunet/src/set/set_protocol.h
Log:
- fixed tunnel context
- moved logic out of specific operations
Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-06-26 09:50:44 UTC (rev 27613)
+++ gnunet/src/set/gnunet-service-set.c 2013-06-26 10:06:52 UTC (rev 27614)
@@ -28,6 +28,55 @@
/**
+ * Peer that has connected to us, but is not yet evaluating a set operation.
+ * Once the peer has sent a request, and the client has
+ * accepted or rejected it, this information will be deleted.
+ */
+struct Incoming
+{
+ /**
+ * Incoming peers are held in a linked list
+ */
+ struct Incoming *next;
+
+ /**
+ * Incoming peers are held in a linked list
+ */
+ struct Incoming *prev;
+
+ /**
+ * Detail information about the operation.
+ */
+ struct OperationSpecification *spec;
+
+ /**
+ * The identity of the requesting peer.
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Tunnel to the peer.
+ */
+ struct GNUNET_MESH_Tunnel *tunnel;
+
+ /**
+ * Unique request id for the request from
+ * a remote peer, sent to the client, which will
+ * accept or reject the request.
+ * Set to '0' iff the request has not been
+ * suggested yet.
+ */
+ uint32_t suggest_id;
+
+ /**
+ * Timeout task, if the incoming peer has not been accepted
+ * after the timeout, it will be disconnected.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+};
+
+
+/**
* Configuration of our local peer.
* (Not declared 'static' as also needed in gnunet-service-set_union.c)
*/
@@ -77,7 +126,7 @@
* used to identify incoming operation requests from remote peers,
* that the client can choose to accept or refuse.
*/
-static uint32_t accept_id = 1;
+static uint32_t suggest_id = 1;
/**
@@ -131,7 +180,7 @@
struct Incoming *incoming;
for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
- if (incoming->accept_id == id)
+ if (incoming->suggest_id == id)
return incoming;
return NULL;
}
@@ -145,6 +194,14 @@
static void
listener_destroy (struct Listener *listener)
{
+ /* If the client is not dead yet, destroy it.
+ * The client's destroy callback will destroy the listener again. */
+ if (NULL != listener->client)
+ {
+ GNUNET_SERVER_client_disconnect (listener->client);
+ listener->client = NULL;
+ return;
+ }
if (NULL != listener->client_mq)
{
GNUNET_MQ_destroy (listener->client_mq);
@@ -163,6 +220,14 @@
static void
set_destroy (struct Set *set)
{
+ /* If the client is not dead yet, destroy it.
+ * The client's destroy callback will destroy the set again. */
+ if (NULL != set->client)
+ {
+ GNUNET_SERVER_client_disconnect (set->client);
+ set->client = NULL;
+ return;
+ }
switch (set->operation)
{
case GNUNET_SET_OPERATION_INTERSECTION:
@@ -195,10 +260,16 @@
set = set_get (client);
if (NULL != set)
+ {
+ set->client = NULL;
set_destroy (set);
+ }
listener = listener_get (client);
if (NULL != listener)
+ {
+ listener->client = NULL;
listener_destroy (listener);
+ }
}
@@ -210,6 +281,13 @@
static void
incoming_destroy (struct Incoming *incoming)
{
+ if (NULL != incoming->tunnel)
+ {
+ struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
+ incoming->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (t);
+ return;
+ }
GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
GNUNET_free (incoming);
}
@@ -246,16 +324,17 @@
struct GNUNET_MQ_Envelope *mqm;
struct GNUNET_SET_RequestMessage *cmsg;
- GNUNET_assert (GNUNET_NO == incoming->suggested);
- incoming->suggested = GNUNET_YES;
+ GNUNET_assert (0 == incoming->suggest_id);
+ GNUNET_assert (NULL != incoming->spec);
+ incoming->suggest_id = suggest_id++;
GNUNET_SCHEDULER_cancel (incoming->timeout_task);
mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
- incoming->context_msg);
+ incoming->spec->context_msg);
GNUNET_assert (NULL != mqm);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id
%u\n", incoming->accept_id);
- cmsg->accept_id = htonl (incoming->accept_id);
- cmsg->peer_id = incoming->tc->peer;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id
%u\n", incoming->suggest_id);
+ cmsg->accept_id = htonl (incoming->suggest_id);
+ cmsg->peer_id = incoming->spec->peer;
GNUNET_MQ_send (listener->client_mq, mqm);
}
@@ -280,6 +359,7 @@
struct Incoming *incoming;
const struct OperationRequestMessage *msg = (const struct
OperationRequestMessage *) mh;
struct Listener *listener;
+ struct OperationSpecification *spec;
if (CONTEXT_INCOMING != tc->type)
{
@@ -289,21 +369,27 @@
return GNUNET_SYSERR;
}
- incoming = tc->data;
+ incoming = tc->data.incoming;
- if (GNUNET_YES == incoming->received_request)
+ if (NULL != incoming->spec)
{
/* double operation request */
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- incoming->accept_id = accept_id++;
- incoming->context_msg =
+ spec = GNUNET_new (struct OperationSpecification);
+ spec->context_msg =
GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg));
+ spec->operation = ntohl (msg->operation);
+ spec->app_id = msg->app_id;
+ spec->salt = ntohl (msg->salt);
+ spec->peer = incoming->peer;
- if ( (NULL != incoming->context_msg) &&
- (ntohs (incoming->context_msg->size) >
GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+ incoming->spec = spec;
+
+ if ( (NULL != spec->context_msg) &&
+ (ntohs (spec->context_msg->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE)
)
{
GNUNET_break_op (0);
return GNUNET_SYSERR;
@@ -405,12 +491,12 @@
listener->operation, GNUNET_h2s (&listener->app_id));
for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
{
- if ( (GNUNET_NO == incoming->received_request) ||
- (GNUNET_YES == incoming->suggested) )
+ if ( (NULL == incoming->spec) ||
+ (0 != incoming->suggest_id) )
continue;
- if (listener->operation != incoming->operation)
+ if (listener->operation != incoming->spec->operation)
continue;
- if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id, &incoming->app_id))
+ if (0 != GNUNET_CRYPTO_hash_cmp (&listener->app_id,
&incoming->spec->app_id))
continue;
incoming_suggest (incoming, listener);
}
@@ -483,8 +569,7 @@
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
- /* the incoming peer will be destroyed in the tunnel end handler */
- GNUNET_MESH_tunnel_destroy (incoming->tc->tunnel);
+ GNUNET_MESH_tunnel_destroy (incoming->tunnel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -542,6 +627,10 @@
const struct GNUNET_MessageHeader *m)
{
struct Set *set;
+ struct TunnelContext *tc;
+ struct GNUNET_MESH_Tunnel *tunnel;
+ struct GNUNET_SET_EvaluateMessage *msg;
+ struct OperationSpecification *spec;
set = set_get (client);
if (NULL == set)
@@ -551,13 +640,27 @@
return;
}
+ msg = (struct GNUNET_SET_EvaluateMessage *) m;
+ tc = GNUNET_new (struct TunnelContext);
+ spec = GNUNET_new (struct OperationSpecification);
+ spec->operation = set->operation;
+ spec->app_id = msg->app_id;
+ spec->salt = ntohl (msg->salt);
+ spec->peer = msg->target_peer;
+
+ tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
+ GNUNET_APPLICATION_TYPE_SET);
+
switch (set->operation)
{
case GNUNET_SET_OPERATION_INTERSECTION:
+ tc->type = CONTEXT_OPERATION_INTERSECTION;
//_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m,
set);
break;
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
+ tc->type = CONTEXT_OPERATION_UNION;
+ tc->data.union_op =
+ _GSS_union_evaluate (spec, tunnel);
break;
default:
GNUNET_assert (0);
@@ -601,6 +704,9 @@
struct Set *set;
struct Incoming *incoming;
struct GNUNET_SET_AcceptRejectMessage *msg = (struct
GNUNET_SET_AcceptRejectMessage *) mh;
+ struct GNUNET_MESH_Tunnel *tunnel;
+ struct TunnelContext *tc;
+ struct OperationSpecification *spec;
incoming = get_incoming (ntohl (msg->accept_reject_id));
@@ -623,13 +729,20 @@
return;
}
+ tc = GNUNET_new (struct TunnelContext);
+ tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &incoming->spec->peer,
+ GNUNET_APPLICATION_TYPE_SET);
+ spec = GNUNET_new (struct OperationSpecification);
+
switch (set->operation)
{
case GNUNET_SET_OPERATION_INTERSECTION:
+ tc->type = CONTEXT_OPERATION_INTERSECTION;
// _GSS_intersection_accept (msg, set, incoming);
break;
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_accept (msg, set, incoming);
+ tc->type = CONTEXT_OPERATION_UNION;
+ tc->data.union_op = _GSS_union_accept (spec, tunnel);
break;
default:
GNUNET_assert (0);
@@ -719,11 +832,9 @@
GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
tc = GNUNET_new (struct TunnelContext);
incoming = GNUNET_new (struct Incoming);
- incoming->tc = tc;
- tc->peer = *initiator;
- tc->tunnel = tunnel;
- tc->mq = GNUNET_MESH_mq_create (tunnel);
- tc->data = incoming;
+ incoming->peer = *initiator;
+ incoming->tunnel = tunnel;
+ tc->data.incoming = incoming;
tc->type = CONTEXT_INCOMING;
incoming->timeout_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
incoming_timeout_cb, incoming);
@@ -750,22 +861,13 @@
{
struct TunnelContext *ctx = tunnel_ctx;
- /* tunnel is dead already */
- ctx->tunnel = NULL;
-
- if (NULL != ctx->mq)
- {
- GNUNET_MQ_destroy (ctx->mq);
- ctx->mq = NULL;
- }
-
switch (ctx->type)
{
case CONTEXT_INCOMING:
- incoming_destroy ((struct Incoming *) ctx->data);
+ incoming_destroy (ctx->data.incoming);
break;
case CONTEXT_OPERATION_UNION:
- _GSS_union_operation_destroy ((struct UnionEvaluateOperation *)
ctx->data);
+ _GSS_union_operation_destroy (ctx->data.union_op);
break;
case CONTEXT_OPERATION_INTERSECTION:
GNUNET_assert (0);
Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-06-26 09:50:44 UTC (rev 27613)
+++ gnunet/src/set/gnunet-service-set.h 2013-06-26 10:06:52 UTC (rev 27614)
@@ -42,14 +42,22 @@
struct IntersectionState;
+/* FIXME: cfuchs */
+struct IntersectionOperation;
+
+
/**
* Extra state required for set union.
*/
struct UnionState;
+/**
+ * State of a union operation being evaluated.
+ */
struct UnionEvaluateOperation;
+
/**
* A set that supports a specific operation
* with other peers.
@@ -94,6 +102,50 @@
/**
+ * Detail information about an operation.
+ */
+struct OperationSpecification
+{
+ /**
+ * The type of the operation.
+ */
+ enum GNUNET_SET_OperationType operation;
+
+ /**
+ * The remote peer we evaluate the operation with
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * Application ID for the operation, used to distinguish
+ * multiple operations of the same type with the same peer.
+ */
+ struct GNUNET_HashCode app_id;
+
+ /**
+ * Context message, may be NULL.
+ */
+ struct GNUNET_MessageHeader *context_msg;
+
+ /**
+ * Salt to use for the operation.
+ */
+ uint32_t salt;
+
+ /**
+ * ID used to identify responses to a client.
+ */
+ uint32_t client_request_id;
+
+ /**
+ * Set associated with the operation, NULL until the spec has been associated
+ * with a set.
+ */
+ struct Set *set;
+};
+
+
+/**
* A listener is inhabited by a client, and
* waits for evaluation requests from remote peers.
*/
@@ -121,12 +173,13 @@
struct GNUNET_MQ_Handle *client_mq;
/**
- * Type of operation supported for this set
+ * The type of the operation.
*/
enum GNUNET_SET_OperationType operation;
/**
- * Application id of intereset for this listener.
+ * Application ID for the operation, used to distinguish
+ * multiple operations of the same type with the same peer.
*/
struct GNUNET_HashCode app_id;
};
@@ -137,81 +190,53 @@
* Once the peer has sent a request, and the client has
* accepted or rejected it, this information will be deleted.
*/
-struct Incoming
-{
- /**
- * Incoming peers are held in a linked list
- */
- struct Incoming *next;
+struct Incoming;
- /**
- * Incoming peers are held in a linked list
- */
- struct Incoming *prev;
+/**
+ * Different types a tunnel can be.
+ */
+enum TunnelContextType {
/**
- * Tunnel context, stores information about
- * the tunnel and its peer.
+ * Tunnel is waiting for a set request from the remote peer,
+ * or for the ack/nack of the client for a received request.
*/
- struct TunnelContext *tc;
+ CONTEXT_INCOMING,
/**
- * GNUNET_YES if the incoming peer has sent
- * an operation request (and we are waiting
- * for the client to ack/nack), GNUNET_NO otherwise.
+ * The tunnel performs a union operation.
*/
- int received_request;
+ CONTEXT_OPERATION_UNION,
/**
- * App code, set once the peer has
- * requested an operation
+ * The tunnel performs an intersection operation.
*/
- struct GNUNET_HashCode app_id;
+ CONTEXT_OPERATION_INTERSECTION,
+};
- /**
- * Context message, set once the peer
- * has requested an operation.
- */
- struct GNUNET_MessageHeader *context_msg;
+/**
+ * State associated with the tunnel, dependent on
+ * tunnel type.
+ */
+union TunnelContextData
+{
/**
- * Salt the peer has requested to use for the
- * operation
+ * Valid for tag 'CONTEXT_INCOMING'
*/
- uint16_t salt;
+ struct Incoming *incoming;
/**
- * Operation the other peer wants to do
+ * Valid for tag 'CONTEXT_OPERATION_UNION'
*/
- enum GNUNET_SET_OperationType operation;
+ struct UnionEvaluateOperation *union_op;
/**
- * Has the incoming request been suggested to
- * a client listener yet?
+ * Valid for tag 'CONTEXT_OPERATION_INTERSECTION'
*/
- int suggested;
-
- /**
- * Unique request id for the request from
- * a remote peer, sent to the client, which will
- * accept or reject the request.
- */
- uint32_t accept_id;
-
- /**
- * Timeout task, if the incoming peer has not been accepted
- * after the timeout, it will be disconnected.
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+ struct IntersectionEvaluateOperation *intersection_op;
};
-
-enum TunnelContextType {
- CONTEXT_INCOMING,
- CONTEXT_OPERATION_UNION,
- CONTEXT_OPERATION_INTERSECTION,
-};
-
/**
* Information about a tunnel we are connected to.
* Used as tunnel context with mesh.
@@ -219,21 +244,6 @@
struct TunnelContext
{
/**
- * The mesh tunnel that has this context
- */
- struct GNUNET_MESH_Tunnel *tunnel;
-
- /**
- * The peer on the other side.
- */
- struct GNUNET_PeerIdentity peer;
-
- /**
- * Handle to the message queue for the tunnel.
- */
- struct GNUNET_MQ_Handle *mq;
-
- /**
* Type of the tunnel.
*/
enum TunnelContextType type;
@@ -242,7 +252,7 @@
* State associated with the tunnel, dependent on
* tunnel type.
*/
- void *data;
+ union TunnelContextData data;
};
@@ -268,11 +278,14 @@
* Evaluate a union operation with
* a remote peer.
*
- * @param m the evaluate request message from the client
+ * @param spec specification of the operation to evaluate
+ * @param tunnel tunnel already connected to the partner peer
* @param set the set to evaluate the operation with
+ * @return a handle to the operation
*/
-void
-_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set);
+struct UnionEvaluateOperation *
+_GSS_union_evaluate (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel);
/**
@@ -308,13 +321,13 @@
/**
* Accept an union operation request from a remote peer
*
- * @param m the accept message from the client
- * @param set the set of the client
- * @param incoming information about the requesting remote peer
+ * @param spec all necessary information about the operation
+ * @param tunnel open tunnel to the partner's peer
+ * @return operation
*/
-void
-_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
- struct Incoming *incoming);
+struct UnionEvaluateOperation *
+_GSS_union_accept (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel);
/**
Modified: gnunet/src/set/gnunet-service-set_intersection.c
===================================================================
--- gnunet/src/set/gnunet-service-set_intersection.c 2013-06-26 09:50:44 UTC
(rev 27613)
+++ gnunet/src/set/gnunet-service-set_intersection.c 2013-06-26 10:06:52 UTC
(rev 27614)
@@ -124,10 +124,9 @@
struct GNUNET_MessageHeader *context_msg;
/**
- * Tunnel context for the peer we
- * evaluate the union operation with.
+ * Tunnel to the other peer.
*/
- struct TunnelContext *tc;
+ struct GNUNET_MESH_Tunnel *tunnel;
/**
* Request ID to multiplex set operations to
@@ -397,12 +396,11 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
- if (NULL != eo->tc)
+ if (NULL != eo->tunnel)
{
- GNUNET_MQ_destroy (eo->tc->mq);
- GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
- GNUNET_free (eo->tc);
- eo->tc = NULL;
+ GNUNET_MESH_tunnel_destroy (eo->tunnel);
+ /* wait for the final destruction by the tunnel cleaner */
+ return;
}
if (NULL != eo->remote_ibf)
@@ -432,10 +430,8 @@
eo);
GNUNET_free (eo);
-
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
-
/* FIXME: do a garbage collection of the set generations */
}
@@ -1355,7 +1351,6 @@
* @param cls closure
* @param tunnel mesh tunnel
* @param tunnel_ctx tunnel context
- * @param sender ???
* @param mh message to process
* @return ???
*/
@@ -1363,7 +1358,6 @@
_GSS_union_handle_p2p_message (void *cls,
struct GNUNET_MESH_Tunnel *tunnel,
void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *mh)
{
struct TunnelContext *tc = *tunnel_ctx;
@@ -1371,7 +1365,6 @@
if (CONTEXT_OPERATION_UNION != tc->type)
{
- /* FIXME: kill the tunnel */
/* never kill mesh */
return GNUNET_OK;
}
Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c 2013-06-26 09:50:44 UTC (rev
27613)
+++ gnunet/src/set/gnunet-service-set_union.c 2013-06-26 10:06:52 UTC (rev
27614)
@@ -103,39 +103,22 @@
struct UnionEvaluateOperation
{
/**
- * Local set the operation is evaluated on.
+ * Tunnel to the remote peer.
*/
- struct Set *set;
+ struct GNUNET_MESH_Tunnel *tunnel;
/**
- * Peer with the remote set
+ * Detail information about the set operation,
+ * including the set to use.
*/
- struct GNUNET_PeerIdentity peer;
+ struct OperationSpecification *spec;
/**
- * Application-specific identifier
+ * Message queue for the peer.
*/
- struct GNUNET_HashCode app_id;
+ struct GNUNET_MQ_Handle *mq;
/**
- * Context message, given to us
- * by the client, may be NULL.
- */
- struct GNUNET_MessageHeader *context_msg;
-
- /**
- * Tunnel context for the peer we
- * evaluate the union operation with.
- */
- struct TunnelContext *tc;
-
- /**
- * Request ID to multiplex set operations to
- * the client inhabiting the set.
- */
- uint32_t request_id;
-
- /**
* Number of ibf buckets received
*/
unsigned int ibf_buckets_received;
@@ -167,11 +150,6 @@
enum UnionOperationPhase phase;
/**
- * Salt to use for this operation.
- */
- uint16_t salt;
-
- /**
* Generation in which the operation handle
* was created.
*/
@@ -395,16 +373,17 @@
void
_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
{
+ struct UnionState *st = eo->spec->set->state.u;
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
-
- if (NULL != eo->tc)
+
+ if (NULL != eo->tunnel)
{
- GNUNET_MQ_destroy (eo->tc->mq);
- GNUNET_MESH_tunnel_destroy (eo->tc->tunnel);
- GNUNET_free (eo->tc);
- eo->tc = NULL;
+ struct GNUNET_MESH_Tunnel *t = eo->tunnel;
+ eo->tunnel = NULL;
+ GNUNET_MESH_tunnel_destroy (t);
}
-
+
if (NULL != eo->remote_ibf)
{
ibf_destroy (eo->remote_ibf);
@@ -427,8 +406,8 @@
eo->key_to_element = NULL;
}
- GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
- eo->set->state.u->ops_tail,
+ GNUNET_CONTAINER_DLL_remove (st->ops_head,
+ st->ops_tail,
eo);
GNUNET_free (eo);
@@ -449,13 +428,13 @@
static void
fail_union_operation (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *msg;
- mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
- msg->request_id = htonl (eo->request_id);
- GNUNET_MQ_send (eo->set->client_mq, mqm);
+ msg->request_id = htonl (eo->spec->client_request_id);
+ GNUNET_MQ_send (eo->spec->set->client_mq, ev);
_GSS_union_operation_destroy (eo);
}
@@ -490,27 +469,27 @@
static void
send_operation_request (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct OperationRequestMessage *msg;
- mqm = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
- eo->context_msg);
+ ev = GNUNET_MQ_msg_nested_mh (msg,
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
+ eo->spec->context_msg);
- if (NULL == mqm)
+ if (NULL == ev)
{
/* the context message is too large */
GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (eo->set->client);
+ GNUNET_SERVER_client_disconnect (eo->spec->set->client);
return;
}
msg->operation = htons (GNUNET_SET_OPERATION_UNION);
- msg->app_id = eo->app_id;
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ msg->app_id = eo->spec->app_id;
+ GNUNET_MQ_send (eo->mq, ev);
- if (NULL != eo->context_msg)
+ if (NULL != eo->spec->context_msg)
{
- GNUNET_free (eo->context_msg);
- eo->context_msg = NULL;
+ GNUNET_free (eo->spec->context_msg);
+ eo->spec->context_msg = NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
@@ -565,7 +544,7 @@
struct IBF_Key ibf_key;
struct KeyEntry *k;
- ibf_key = get_ibf_key (&ee->element_hash, eo->salt);
+ ibf_key = get_ibf_key (&ee->element_hash, eo->spec->salt);
k = GNUNET_new (struct KeyEntry);
k->element = ee;
k->ibf_key = ibf_key;
@@ -644,9 +623,9 @@
if (NULL == eo->key_to_element)
{
unsigned int len;
- len = GNUNET_CONTAINER_multihashmap_size (eo->set->state.u->elements);
+ len = GNUNET_CONTAINER_multihashmap_size
(eo->spec->set->state.u->elements);
eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
- GNUNET_CONTAINER_multihashmap_iterate (eo->set->state.u->elements,
+ GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements,
init_key_to_element_iterator, eo);
}
if (NULL != eo->local_ibf)
@@ -678,7 +657,7 @@
while (buckets_sent < (1 << ibf_order))
{
unsigned int buckets_in_message;
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct IBFMessage *msg;
buckets_in_message = (1 << ibf_order) - buckets_sent;
@@ -686,14 +665,14 @@
if (buckets_in_message > MAX_BUCKETS_PER_MESSAGE)
buckets_in_message = MAX_BUCKETS_PER_MESSAGE;
- mqm = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
+ ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
msg->order = ibf_order;
msg->offset = htons (buckets_sent);
ibf_write_slice (ibf, buckets_sent,
buckets_in_message, &msg[1]);
buckets_sent += buckets_in_message;
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ GNUNET_MQ_send (eo->mq, ev);
}
eo->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
@@ -708,14 +687,15 @@
static void
send_strata_estimator (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *strata_msg;
+ struct UnionState *st = eo->spec->set->state.u;
- mqm = GNUNET_MQ_msg_header_extra (strata_msg,
+ ev = GNUNET_MQ_msg_header_extra (strata_msg,
SE_STRATA_COUNT * IBF_BUCKET_SIZE *
SE_IBF_SIZE,
GNUNET_MESSAGE_TYPE_SET_P2P_SE);
- strata_estimator_write (eo->set->state.u->se, &strata_msg[1]);
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ strata_estimator_write (st->se, &strata_msg[1]);
+ GNUNET_MQ_send (eo->mq, ev);
eo->phase = PHASE_EXPECT_IBF;
}
@@ -797,12 +777,12 @@
while (NULL != ke)
{
const struct GNUNET_SET_Element *const element = &ke->element->element;
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *mh;
GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
- mqm = GNUNET_MQ_msg_header_extra (mh, element->size,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
- if (NULL == mqm)
+ ev = GNUNET_MQ_msg_header_extra (mh, element->size,
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+ if (NULL == ev)
{
/* element too large */
GNUNET_break (0);
@@ -810,7 +790,7 @@
}
memcpy (&mh[1], element->data, element->size);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ GNUNET_MQ_send (eo->mq, ev);
ke = ke->next_colliding;
}
return GNUNET_NO;
@@ -882,11 +862,11 @@
}
if (GNUNET_NO == res)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending
DONE\n");
- mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+ GNUNET_MQ_send (eo->mq, ev);
break;
}
if (1 == side)
@@ -895,15 +875,15 @@
}
else
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_MessageHeader *msg;
/* FIXME: before sending the request, check if we may just have the
element */
/* FIXME: merge multiple requests */
- mqm = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
+ ev = GNUNET_MQ_msg_header_extra (msg, sizeof (struct IBF_Key),
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
*(struct IBF_Key *) &msg[1] = key;
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ GNUNET_MQ_send (eo->mq, ev);
}
}
ibf_destroy (diff_ibf);
@@ -980,21 +960,21 @@
send_client_element (struct UnionEvaluateOperation *eo,
struct GNUNET_SET_Element *element)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
- GNUNET_assert (0 != eo->request_id);
- mqm = GNUNET_MQ_msg_extra (rm, element->size,
GNUNET_MESSAGE_TYPE_SET_RESULT);
- if (NULL == mqm)
+ GNUNET_assert (0 != eo->spec->client_request_id);
+ ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ if (NULL == ev)
{
- GNUNET_MQ_discard (mqm);
+ GNUNET_MQ_discard (ev);
GNUNET_break (0);
return;
}
rm->result_status = htons (GNUNET_SET_STATUS_OK);
- rm->request_id = htonl (eo->request_id);
+ rm->request_id = htonl (eo->spec->client_request_id);
memcpy (&rm[1], element->data, element->size);
- GNUNET_MQ_send (eo->set->client_mq, mqm);
+ GNUNET_MQ_send (eo->spec->set->client_mq, ev);
}
@@ -1009,14 +989,13 @@
static void
send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
{
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_SET_ResultMessage *rm;
- GNUNET_assert (0 != eo->request_id);
- mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
- rm->request_id = htonl (eo->request_id);
+ ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ rm->request_id = htonl (eo->spec->client_request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
- GNUNET_MQ_send (eo->set->client_mq, mqm);
+ GNUNET_MQ_send (eo->spec->set->client_mq, ev);
}
@@ -1123,13 +1102,13 @@
if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
{
/* we got all requests, but still have to send our elements as response */
- struct GNUNET_MQ_Envelope *mqm;
+ struct GNUNET_MQ_Envelope *ev;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after
elements\n");
eo->phase = PHASE_FINISHED;
- mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
- GNUNET_MQ_notify_sent (mqm, peer_done_sent_cb, eo);
- GNUNET_MQ_send (eo->tc->mq, mqm);
+ ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
+ GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
+ GNUNET_MQ_send (eo->mq, ev);
return;
}
if (eo->phase == PHASE_EXPECT_ELEMENTS)
@@ -1148,80 +1127,69 @@
* Evaluate a union operation with
* a remote peer.
*
- * @param m the evaluate request message from the client
+ * @param spec specification of the operation to evaluate
+ * @param tunnel tunnel already connected to the partner peer
* @param set the set to evaluate the operation with
+ * @return a handle to the operation
*/
-void
-_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
+struct UnionEvaluateOperation *
+_GSS_union_evaluate (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel)
{
struct UnionEvaluateOperation *eo;
- struct GNUNET_MessageHeader *context_msg;
+ struct UnionState *st = spec->set->state.u;
eo = GNUNET_new (struct UnionEvaluateOperation);
- eo->peer = m->target_peer;
- eo->set = set;
- eo->request_id = htonl (m->request_id);
- GNUNET_assert (0 != eo->request_id);
- eo->se = strata_estimator_dup (set->state.u->se);
- eo->salt = ntohs (m->salt);
- eo->app_id = m->app_id;
-
- context_msg = GNUNET_MQ_extract_nested_mh (m);
- if (NULL != context_msg)
- {
- eo->context_msg = GNUNET_copy_message (context_msg);
- }
+ eo->se = strata_estimator_dup (spec->set->state.u->se);
+ eo->spec = spec;
+ eo->tunnel = tunnel;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"evaluating union operation, (app %s)\n",
- GNUNET_h2s (&eo->app_id));
+ GNUNET_h2s (&eo->spec->app_id));
- eo->tc = GNUNET_new (struct TunnelContext);
- eo->tc->tunnel = GNUNET_MESH_tunnel_create (mesh, eo->tc, &eo->peer,
- GNUNET_APPLICATION_TYPE_SET);
- GNUNET_assert (NULL != eo->tc->tunnel);
- eo->tc->peer = eo->peer;
- eo->tc->mq = GNUNET_MESH_mq_create (eo->tc->tunnel);
/* we started the operation, thus we have to send the operation request */
eo->phase = PHASE_EXPECT_SE;
- GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
- eo->set->state.u->ops_tail,
+ GNUNET_CONTAINER_DLL_insert (st->ops_head,
+ st->ops_tail,
eo);
send_operation_request (eo);
+
+ return eo;
}
/**
* Accept an union operation request from a remote peer
*
- * @param m the accept message from the client
- * @param set the set of the client
- * @param incoming information about the requesting remote peer
+ * @param spec all necessary information about the operation
+ * @param tunnel open tunnel to the partner's peer
+ * @return operation
*/
-void
-_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
- struct Incoming *incoming)
+struct UnionEvaluateOperation *
+_GSS_union_accept (struct OperationSpecification *spec,
+ struct GNUNET_MESH_Tunnel *tunnel)
{
struct UnionEvaluateOperation *eo;
+ struct UnionState *st = spec->set->state.u;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
eo = GNUNET_new (struct UnionEvaluateOperation);
- eo->tc = incoming->tc;
- eo->generation_created = set->state.u->current_generation++;
- eo->set = set;
- eo->salt = ntohs (incoming->salt);
- GNUNET_assert (0 != ntohl (m->request_id));
- eo->request_id = ntohl (m->request_id);
- eo->se = strata_estimator_dup (set->state.u->se);
+ eo->generation_created = st->current_generation++;
+ eo->spec = spec;
+ eo->tunnel = tunnel;
+ eo->se = strata_estimator_dup (st->se);
/* transfer ownership of mq and socket from incoming to eo */
- GNUNET_CONTAINER_DLL_insert (eo->set->state.u->ops_head,
- eo->set->state.u->ops_tail,
+ GNUNET_CONTAINER_DLL_insert (st->ops_head,
+ st->ops_tail,
eo);
/* kick off the operation */
send_strata_estimator (eo);
+
+ return eo;
}
@@ -1370,11 +1338,10 @@
if (CONTEXT_OPERATION_UNION != tc->type)
{
- GNUNET_break_op (0);
return GNUNET_SYSERR;
}
- eo = tc->data;
+ eo = tc->data.union_op;
switch (ntohs (mh->type))
{
Modified: gnunet/src/set/set_protocol.h
===================================================================
--- gnunet/src/set/set_protocol.h 2013-06-26 09:50:44 UTC (rev 27613)
+++ gnunet/src/set/set_protocol.h 2013-06-26 10:06:52 UTC (rev 27614)
@@ -42,9 +42,14 @@
/**
* Operation to request, values from 'enum GNUNET_SET_OperationType'
*/
- uint32_t operation;
+ uint32_t operation GNUNET_PACKED;
/**
+ * Salt to use for this operation.
+ */
+ uint32_t salt;
+
+ /**
* Application-specific identifier of the request.
*/
struct GNUNET_HashCode app_id;
@@ -52,20 +57,6 @@
/* rest: optional message */
};
-struct ElementRequestMessage
-{
- /**
- * Type: GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Salt the keys in the body use
- */
- uint8_t salt;
-};
-
-
struct IBFMessage
{
/**
@@ -80,15 +71,20 @@
uint8_t order;
/**
- * Salt used when hashing elements for this IBF.
+ * Padding, must be 0.
*/
- uint8_t salt;
+ uint8_t reserved;
/**
* Offset of the strata in the rest of the message
*/
uint16_t offset GNUNET_PACKED;
+ /**
+ * Salt used when hashing elements for this IBF.
+ */
+ uint32_t salt;
+
/* rest: strata */
};
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r27614 - gnunet/src/set,
gnunet <=