[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r35717 - in gnunet/src: include psyc psycstore social
From: |
gnunet |
Subject: |
[GNUnet-SVN] r35717 - in gnunet/src: include psyc psycstore social |
Date: |
Thu, 7 May 2015 14:15:58 +0200 |
Author: tg
Date: 2015-05-07 14:15:58 +0200 (Thu, 07 May 2015)
New Revision: 35717
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/include/gnunet_psyc_util_lib.h
gnunet/src/include/gnunet_psycstore_service.h
gnunet/src/include/gnunet_social_service.h
gnunet/src/psyc/gnunet-service-psyc.c
gnunet/src/psyc/psyc.h
gnunet/src/psyc/psyc_api.c
gnunet/src/psyc/psyc_util_lib.c
gnunet/src/psyc/test_psyc.c
gnunet/src/psycstore/gnunet-service-psycstore.c
gnunet/src/psycstore/plugin_psycstore_sqlite.c
gnunet/src/psycstore/psycstore_api.c
gnunet/src/psycstore/test_psycstore.c
gnunet/src/social/gnunet-service-social.c
gnunet/src/social/social_api.c
gnunet/src/social/test_social.c
Log:
psyc/social: request history & state from psycstore; more documentation, tests,
cleanup
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/include/gnunet_protocols.h 2015-05-07 12:15:58 UTC (rev
35717)
@@ -2222,7 +2222,7 @@
/* 700 */
-/** C->S: client requests channel history from PSYCstore. */
+/** C->S: request channel history replay from PSYCstore. */
#define GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY 701
/** S->C: result for a channel history request */
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/include/gnunet_psyc_service.h 2015-05-07 12:15:58 UTC (rev
35717)
@@ -376,6 +376,106 @@
/* Followed by struct GNUNET_MessageHeader join_response */
};
+
+enum GNUNET_PSYC_HistoryReplayFlags
+{
+ /**
+ * Replay locally available messages.
+ */
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL = 0,
+
+ /**
+ * Replay messages from remote peers if not found locally.
+ */
+ GNUNET_PSYC_HISTORY_REPLAY_REMOTE = 1,
+};
+
+
+struct GNUNET_PSYC_HistoryRequestMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_HISTORY_REPLAY
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * @see enum GNUNET_PSYC_HistoryReplayFlags
+ */
+ uint32_t flags GNUNET_PACKED;
+
+ /**
+ * ID for this operation.
+ */
+ uint64_t op_id GNUNET_PACKED;
+
+ uint64_t start_message_id GNUNET_PACKED;
+
+ uint64_t end_message_id GNUNET_PACKED;
+
+ uint64_t message_limit GNUNET_PACKED;
+
+ /* Followed by NUL-terminated method name prefix. */
+};
+
+
+struct GNUNET_PSYC_StateRequestMessage
+{
+ /**
+ * Types:
+ * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET
+ * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_GET_PREFIX
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * ID for this operation.
+ */
+ uint64_t op_id GNUNET_PACKED;
+
+ /* Followed by NUL-terminated name. */
+};
+
+
+/**** service -> library ****/
+
+
+/**
+ * Answer from service to client about last operation.
+ */
+struct GNUNET_PSYC_OperationResultMessage
+{
+ /**
+ * Types:
+ * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
+ * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id GNUNET_PACKED;
+
+ /**
+ * Status code for the operation.
+ */
+ uint64_t result_code GNUNET_PACKED;
+
+ /* Followed by:
+ * - on error: NUL-terminated error message
+ * - on success: one of the following message types
+ *
+ * For a STATE_RESULT, one of:
+ * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
+ * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
+ * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END
+ */
+};
+
GNUNET_NETWORK_STRUCT_END
@@ -907,23 +1007,6 @@
/**
- * Function called with the result of an asynchronous operation.
- *
- * @param cls
- * Closure.
- * @param result
- * Result of the operation.
- * Usually one of #GNUNET_OK, #GNUNET_YES, #GNUNET_NO, or
#GNUNET_SYSERR.
- * @param err_msg
- * Error message.
- */
-typedef void
-(*GNUNET_PSYC_ResultCallback) (void *cls,
- int64_t result,
- const char *err_msg);
-
-
-/**
* Convert a channel @a master to a @e channel handle to access the @e channel
* APIs.
*
@@ -960,17 +1043,28 @@
* correctly; not doing so correctly will result in either denying other slaves
* access or offering access to channel data to non-members.
*
- * @param channel Channel handle.
- * @param slave_key Identity of channel slave to add.
- * @param announced_at ID of the message that announced the membership change.
- * @param effective_since Addition of slave is in effect since this message ID.
+ * @param channel
+ * Channel handle.
+ * @param slave_key
+ * Identity of channel slave to add.
+ * @param announced_at
+ * ID of the message that announced the membership change.
+ * @param effective_since
+ * Addition of slave is in effect since this message ID.
+ * @param result_cb
+ * Function to call with the result of the operation.
+ * The @e result_code argument is #GNUNET_OK on success, or
+ * #GNUNET_SYSERR on error. In case of an error, the @e data argument
+ * can contain an optional error message.
+ * @param cls
+ * Closure for @a result_cb.
*/
void
GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t announced_at,
uint64_t effective_since,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
@@ -991,33 +1085,33 @@
* denying members access or offering access to channel data to
* non-members.
*
- * @param channel Channel handle.
- * @param slave_key Identity of channel slave to remove.
- * @param announced_at ID of the message that announced the membership change.
+ * @param channel
+ * Channel handle.
+ * @param slave_key
+ * Identity of channel slave to remove.
+ * @param announced_at
+ * ID of the message that announced the membership change.
+ * @param result_cb
+ * Function to call with the result of the operation.
+ * The @e result_code argument is #GNUNET_OK on success, or
+ * #GNUNET_SYSERR on error. In case of an error, the @e data argument
+ * can contain an optional error message.
+ * @param cls
+ * Closure for @a result_cb.
*/
void
GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t announced_at,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
/**
- * Function called to inform a member about stored state values for a channel.
- *
- * @param cls Closure.
- * @param name Name of the state variable. A NULL value indicates that there
- * are no more state variables to be returned.
- * @param value Value of the state variable.
- * @param value_size Number of bytes in @a value.
+ * History request handle.
*/
-typedef void
-(*GNUNET_PSYC_StateVarCallback) (void *cls,
- const char *name,
- const void *value,
- size_t value_size);
+struct GNUNET_PSYC_HistoryRequest;
/**
@@ -1032,22 +1126,28 @@
* Earliest interesting point in history.
* @param end_message_id
* Last (inclusive) interesting point in history.
- * @param finish_cb
- * Function to call when the requested history has been fully replayed
- * (counting message IDs might not suffice, as some messages might be
- * secret and thus the listener would not know the story is finished
- * without being told explicitly)o once this function has been called,
the
- * client must not call GNUNET_PSYC_channel_history_replay_cancel()
anymore.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @param flags
+ * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
+ * @param result_cb
+ * Function to call when the requested history has been fully replayed.
+ * Once this function has been called, the client must not call
+ * GNUNET_PSYC_channel_history_replay_cancel() anymore.
* @param cls
* Closure for the callbacks.
*
* @return Handle to cancel history replay operation.
*/
-void
+struct GNUNET_PSYC_HistoryRequest *
GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *channel,
uint64_t start_message_id,
uint64_t end_message_id,
- GNUNET_PSYC_ResultCallback finish_cb,
+ const char *method_prefix,
+ uint32_t flags,
+ GNUNET_PSYC_MessageCallback message_cb,
+ GNUNET_PSYC_MessagePartCallback
message_part_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
@@ -1061,6 +1161,8 @@
* Which channel should be replayed?
* @param message_limit
* Maximum number of messages to replay.
+ * @param flags
+ * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
* @param finish_cb
* Function to call when the requested history has been fully replayed
* (counting message IDs might not suffice, as some messages might be
@@ -1072,14 +1174,45 @@
*
* @return Handle to cancel history replay operation.
*/
-void
+struct GNUNET_PSYC_HistoryRequest *
GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *channel,
uint64_t message_limit,
- GNUNET_PSYC_ResultCallback
finish_cb,
+ const char *method_prefix,
+ uint32_t flags,
+ GNUNET_PSYC_MessageCallback
message_cb,
+ GNUNET_PSYC_MessagePartCallback
message_part_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
+void
+GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
+ struct GNUNET_PSYC_HistoryRequest
*hr);
+
+
/**
+ * Function called to inform a member about stored state values for a channel.
+ *
+ * @param cls Closure.
+ * @param name Name of the state variable. A NULL value indicates that there
+ * are no more state variables to be returned.
+ * @param value Value of the state variable.
+ * @param value_size Number of bytes in @a value.
+ */
+typedef void
+(*GNUNET_PSYC_StateVarCallback) (void *cls,
+ const char *name,
+ const void *value,
+ size_t value_size);
+
+
+/**
+ * State request handle.
+ */
+struct GNUNET_PSYC_StateRequest;
+
+
+/**
* Retrieve the best matching channel state variable.
*
* If the requested variable name is not present in the state, the nearest
@@ -1100,11 +1233,11 @@
* @param cls
* Closure for the callbacks.
*/
-void
+struct GNUNET_PSYC_StateRequest *
GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *channel,
const char *full_name,
GNUNET_PSYC_StateVarCallback var_cb,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
@@ -1131,14 +1264,24 @@
* @param cls
* Closure for the callbacks.
*/
-void
+struct GNUNET_PSYC_StateRequest *
GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
const char *name_prefix,
GNUNET_PSYC_StateVarCallback var_cb,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls);
+/**
+ * Cancel a state request operation.
+ *
+ * @param sr
+ * Handle for the operation to cancel.
+ */
+void
+GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr);
+
+
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/include/gnunet_psyc_util_lib.h
===================================================================
--- gnunet/src/include/gnunet_psyc_util_lib.h 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/include/gnunet_psyc_util_lib.h 2015-05-07 12:15:58 UTC (rev
35717)
@@ -115,16 +115,24 @@
/**
* Transmit a message.
*
- * @param tmit Transmission handle.
- * @param method_name Which method should be invoked.
- * @param env Environment for the message.
- * Should stay available until the first call to notify_data.
- * Can be NULL if there are no modifiers or @a notify_mod is provided
instead.
- * @param notify_mod Function to call to obtain modifiers.
- * Can be NULL if there are no modifiers or @a env is provided instead.
- * @param notify_data Function to call to obtain fragments of the data.
- * @param notify_cls Closure for @a notify_mod and @a notify_data.
- * @param flags Flags for the message being transmitted.
+ * @param tmit
+ * Transmission handle.
+ * @param method_name
+ * Which method should be invoked.
+ * @param env
+ * Environment for the message.
+ * Should stay available until the first call to notify_data.
+ * Can be NULL if there are no modifiers or @a notify_mod is
+ * provided instead.
+ * @param notify_mod
+ * Function to call to obtain modifiers.
+ * Can be NULL if there are no modifiers or @a env is provided instead.
+ * @param notify_data
+ * Function to call to obtain fragments of the data.
+ * @param notify_cls
+ * Closure for @a notify_mod and @a notify_data.
+ * @param flags
+ * Flags for the message being transmitted.
*
* @return #GNUNET_OK if the transmission was started.
* #GNUNET_SYSERR if another transmission is already going on.
Modified: gnunet/src/include/gnunet_psycstore_service.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_service.h 2015-05-07 12:15:32 UTC
(rev 35716)
+++ gnunet/src/include/gnunet_psycstore_service.h 2015-05-07 12:15:58 UTC
(rev 35717)
@@ -107,15 +107,20 @@
/**
* Function called with the result of an asynchronous operation.
*
+ * @param cls
+ * Closure.
* @param result
- * #GNUNET_YES on success or if the peer was a member,
- * #GNUNET_NO if the peer was not a member,
- * #GNUNET_SYSERR on error,
+ * Result of the operation.
+ * @param err_msg
+ * Error message, or NULL if there's no error.
+ * @param err_msg_size
+ * Size of @a err_msg
*/
typedef void
(*GNUNET_PSYCSTORE_ResultCallback) (void *cls,
int64_t result,
- const char *err_msg);
+ const char *err_msg,
+ uint16_t err_msg_size);
/**
@@ -318,15 +323,15 @@
* @param channel_key
* The channel we are interested in.
* @param slave_key
- * The slave requesting the message. If not NULL, a membership test is
- * performed first and the message is only returned if the slave has
- * access to it.
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
* @param first_message_id
* First message ID to retrieve.
- * Use 0 to get the latest message.
* @param last_message_id
* Last consecutive message ID to retrieve.
- * Use 0 to get the latest message.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
* @param fragment_cb
* Callback to call with the retrieved fragments.
* @param result_cb
@@ -342,6 +347,7 @@
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t first_message_id,
uint64_t last_message_id,
+ const char *method_prefix,
GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
GNUNET_PSYCSTORE_ResultCallback result_cb,
void *cls);
@@ -355,14 +361,16 @@
* @param channel_key
* The channel we are interested in.
* @param slave_key
- * The slave requesting the message. If not NULL, a membership test is
- * performed first and the message is only returned if the slave has
- * access to it.
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
* @param message_limit
* Maximum number of messages to retrieve.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
* @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -374,6 +382,7 @@
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t message_limit,
+ const char *method_prefix,
GNUNET_PSYCSTORE_FragmentCallback
fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls);
Modified: gnunet/src/include/gnunet_social_service.h
===================================================================
--- gnunet/src/include/gnunet_social_service.h 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/include/gnunet_social_service.h 2015-05-07 12:15:58 UTC (rev
35717)
@@ -749,49 +749,81 @@
/**
- * A history lesson.
+ * A history request.
*/
-struct GNUNET_SOCIAL_HistoryLesson;
+struct GNUNET_SOCIAL_HistoryRequest;
+
/**
* Learn about the history of a place.
*
+ * Messages are returned through the @a slicer function
+ * and have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
+ *
+ * @param place
+ * Place we want to learn more about.
+ * @param start_message_id
+ * First historic message we are interested in.
+ * @param end_message_id
+ * Last historic message we are interested in (inclusive).
+ * @param method_prefix
+ * Only retrieve messages with this method prefix.
+ * @param flags
+ * OR'ed GNUNET_PSYC_HistoryReplayFlags
+ * @param slicer
+ * Slicer to use for retrieved messages.
+ * Can be the same as the slicer of the place.
+ * @param result_cb
+ * Function called after all messages retrieved.
+ * NULL if not needed.
+ * @param cls Closure for @a result_cb.
+ */
+struct GNUNET_SOCIAL_HistoryRequest *
+GNUNET_SOCIAL_place_history_replay (struct GNUNET_SOCIAL_Place *plc,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ const char *method_prefix,
+ uint32_t flags,
+ struct GNUNET_SOCIAL_Slicer *slicer,
+ GNUNET_ResultCallback result_cb,
+ void *cls);
+
+
+/**
+ * Learn about the history of a place.
+ *
* Sends messages through the slicer function of the place where
* start_message_id <= message_id <= end_message_id.
* The messages will have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
*
* To get the latest message, use 0 for both the start and end message ID.
*
- * @param place Place we want to learn more about.
- * @param start_message_id First historic message we are interested in.
- * @param end_message_id Last historic message we are interested in
(inclusive).
- * @param slicer Slicer to use to process history. Can be the same as the
- * slicer of the place, as the HISTORIC flag allows
distinguishing
- * old messages from fresh ones.
- * @param finish_cb Function called after the last message in the history
lesson
- * is passed through the @a slicer. NULL if not needed.
- * @param finish_cb_cls Closure for @a finish_cb.
- * @return Handle to abort history lesson, never NULL (multiple lessons
- * at the same time are allowed).
+ * @param place
+ * Place we want to learn more about.
+ * @param message_limit
+ * Maximum number of historic messages we are interested in.
+ * @param result_cb
+ * Function called after all messages retrieved.
+ * NULL if not needed.
+ * @param cls Closure for @a result_cb.
*/
-struct GNUNET_SOCIAL_HistoryLesson *
-GNUNET_SOCIAL_place_get_history (struct GNUNET_SOCIAL_Place *place,
- uint64_t start_message_id,
- uint64_t end_message_id,
- const struct GNUNET_SOCIAL_Slicer *slicer,
- void (*finish_cb)(void *),
- void *finish_cb_cls);
+struct GNUNET_SOCIAL_HistoryRequest *
+GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc,
+ uint64_t message_limit,
+ const char *method_prefix,
+ uint32_t flags,
+ struct GNUNET_SOCIAL_Slicer *slicer,
+ GNUNET_ResultCallback result_cb,
+ void *cls);
-
/**
- * Stop processing messages from the history lesson.
+ * Cancel learning about the history of a place.
*
- * Must not be called after the finish callback of the history lesson is
called.
- *
- * @param hist History lesson to cancel.
+ * @param hist
+ * History lesson to cancel.
*/
void
-GNUNET_SOCIAL_place_get_history_cancel (struct GNUNET_SOCIAL_HistoryLesson
*hist);
+GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest
*hist);
struct GNUNET_SOCIAL_WatchHandle;
@@ -803,7 +835,7 @@
* Place to watch.
* @param object_filter
* Object prefix to match.
- * @param state_var_cb
+ * @param var_cb
* Function to call when an object/state var changes.
* @param cls
* Closure for callback.
@@ -813,7 +845,7 @@
struct GNUNET_SOCIAL_WatchHandle *
GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
const char *object_filter,
- GNUNET_PSYC_StateVarCallback state_var_cb,
+ GNUNET_PSYC_StateVarCallback var_cb,
void *cls);
@@ -830,13 +862,35 @@
/**
- * Look at objects in the place with a matching name prefix.
+ * Look at a particular object in the place.
*
+ * The best matching object is returned (its name might be less specific than
+ * what was requested).
+ *
* @param place
+ * The place to look the object at.
+ * @param full_name
+ * Full name of the object.
+ * @param value_size
+ * Set to the size of the returned value.
+ *
+ * @return NULL if there is no such object at this place.
+ */
+struct GNUNET_SOCIAL_LookHandle *
+GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *plc,
+ const char *full_name,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb,
+ void *cls);
+
+/**
+ * Look for objects in the place with a matching name prefix.
+ *
+ * @param place
* The place to look its objects at.
* @param name_prefix
* Look at objects with names beginning with this value.
- * @param state_var_cb
+ * @param var_cb
* Function to call for each object found.
* @param cls
* Closure for callback function.
@@ -844,10 +898,11 @@
* @return Handle that can be used to stop looking at objects.
*/
struct GNUNET_SOCIAL_LookHandle *
-GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place,
- const char *name_prefix,
- GNUNET_PSYC_StateVarCallback state_var_cb,
- void *cls);
+GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc,
+ const char *name_prefix,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb,
+ void *cls);
/**
@@ -859,24 +914,6 @@
GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh);
-
-/**
- * Look at a particular object in the place.
- *
- * The best matching object is returned (its name might be less specific than
- * what was requested).
- *
- * @param place The place to look the object at.
- * @param full_name Full name of the object.
- * @param value_size Set to the size of the returned value.
- * @return NULL if there is no such object at this place.
- */
-const void *
-GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *place,
- const char *full_name,
- size_t *value_size);
-
-
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: gnunet/src/psyc/gnunet-service-psyc.c
===================================================================
--- gnunet/src/psyc/gnunet-service-psyc.c 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/psyc/gnunet-service-psyc.c 2015-05-07 12:15:58 UTC (rev
35717)
@@ -181,22 +181,38 @@
/**
* List of connected clients.
*/
-struct ClientListItem
+struct Client
{
- struct ClientListItem *prev;
- struct ClientListItem *next;
+ struct Client *prev;
+ struct Client *next;
+
struct GNUNET_SERVER_Client *client;
};
+struct Operation
+{
+ struct Operation *prev;
+ struct Operation *next;
+
+ struct GNUNET_SERVER_Client *client;
+ struct Channel *chn;
+ uint64_t op_id;
+ uint32_t flags;
+};
+
+
/**
* Common part of the client context for both a channel master and slave.
*/
struct Channel
{
- struct ClientListItem *clients_head;
- struct ClientListItem *clients_tail;
+ struct Client *clients_head;
+ struct Client *clients_tail;
+ struct Operation *op_head;
+ struct Operation *op_tail;
+
struct TransmitMessage *tmit_head;
struct TransmitMessage *tmit_tail;
@@ -397,14 +413,6 @@
};
-struct OperationClosure
-{
- struct GNUNET_SERVER_Client *client;
- struct Channel *chn;
- uint64_t op_id;
-};
-
-
static void
transmit_message (struct Channel *chn);
@@ -435,6 +443,28 @@
}
+static struct Operation *
+op_add (struct Channel *chn, struct GNUNET_SERVER_Client *client,
+ uint64_t op_id, uint32_t flags)
+{
+ struct Operation *op = GNUNET_malloc (sizeof (*op));
+ op->client = client;
+ op->chn = chn;
+ op->op_id = op_id;
+ op->flags = flags;
+ GNUNET_CONTAINER_DLL_insert (chn->op_head, chn->op_tail, op);
+ return op;
+}
+
+
+static void
+op_remove (struct Channel *chn, struct Operation *op)
+{
+ GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
+ GNUNET_free (op);
+}
+
+
/**
* Clean up master data structures after a client disconnected.
*/
@@ -541,7 +571,7 @@
chn, (GNUNET_YES == chn->is_master) ? "master" : "slave",
GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = chn->clients_head;
+ struct Client *cli = chn->clients_head;
while (NULL != cli)
{
if (cli->client == client)
@@ -553,6 +583,17 @@
cli = cli->next;
}
+ struct Operation *op = chn->op_head;
+ while (NULL != op)
+ {
+ if (op->client == client)
+ {
+ op->client = NULL;
+ break;
+ }
+ op = op->next;
+ }
+
if (NULL == chn->clients_head)
{ /* Last client disconnected. */
if (NULL != chn->tmit_head)
@@ -574,10 +615,10 @@
client_send_msg (const struct Channel *chn,
const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending message to clients.\n", chn);
- struct ClientListItem *cli = chn->clients_head;
+ struct Client *cli = chn->clients_head;
while (NULL != cli)
{
GNUNET_SERVER_notification_context_add (nc, cli->client);
@@ -596,33 +637,29 @@
* Code to transmit.
* @param op_id
* Operation ID in network byte order.
- * @param err_msg
- * Error message to include (or NULL for none).
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
*/
static void
client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
- int64_t result_code, const char *err_msg)
+ int64_t result_code, const void *data, uint16_t data_size)
{
- struct OperationResult *res;
- size_t err_size = 0;
+ struct GNUNET_OperationResultMessage *res;
- if (NULL != err_msg)
- err_size = strnlen (err_msg,
- GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
- res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
+ res = GNUNET_malloc (sizeof (*res) + data_size);
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
- res->header.size = htons (sizeof (struct OperationResult) + err_size);
- res->result_code = GNUNET_htonll (result_code + INT64_MAX + 1);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll_signed (result_code);
res->op_id = op_id;
- if (0 < err_size)
- {
- memcpy (&res[1], err_msg, err_size);
- ((char *) &res[1])[err_size - 1] = '\0';
- }
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending result to client for operation #%" PRIu64 ": "
- "%" PRId64 " (%s)\n",
- client, GNUNET_ntohll (op_id), result_code, err_msg);
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
@@ -647,7 +684,8 @@
* Membership test result callback used for join requests.
*/
static void
-join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
+join_mem_test_cb (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct JoinMemTestClosure *jcls = cls;
@@ -663,6 +701,12 @@
}
else
{
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Could not perform membership test (%.*s)\n",
+ err_msg_size, err_msg);
+ }
// FIXME: add relays
GNUNET_MULTICAST_join_decision (jcls->jh, result, 0, NULL, NULL);
}
@@ -759,12 +803,13 @@
* Received result of GNUNET_PSYCSTORE_membership_test()
*/
static void
-store_recv_membership_test_result (void *cls, int64_t result, const char
*err_msg)
+store_recv_membership_test_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct GNUNET_MULTICAST_MembershipTestHandle *mth = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 "
(%s)\n",
- mth, result, err_msg);
+ "%p GNUNET_PSYCSTORE_membership_test() returned %" PRId64 "
(%.*s)\n",
+ mth, result, err_msg_size, err_msg);
GNUNET_MULTICAST_membership_test_result (mth, result);
}
@@ -805,12 +850,13 @@
* Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
*/
static void
-store_recv_fragment_replay_result (void *cls, int64_t result, const char
*err_msg)
+store_recv_fragment_replay_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct GNUNET_MULTICAST_ReplayHandle *rh = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%s)\n",
- rh, result, err_msg);
+ "%p Fragment replay: PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ rh, result, err_msg_size, err_msg);
switch (result)
{
@@ -867,7 +913,7 @@
{
struct Channel *chn = cls;
GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, slave_key,
- message_id, message_id,
+ message_id, message_id, NULL,
&store_recv_fragment_replay,
&store_recv_fragment_replay_result, rh);
}
@@ -911,6 +957,42 @@
/**
+ * Initialize PSYC message header.
+ */
+static inline void
+psyc_msg_init (struct GNUNET_PSYC_MessageHeader *pmsg,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t
flags)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+ pmsg->fragment_offset = mmsg->fragment_offset;
+ pmsg->flags = htonl (flags);
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+}
+
+
+/**
+ * Create a new PSYC message from a multicast message for sending it to
clients.
+ */
+static inline struct GNUNET_PSYC_MessageHeader *
+psyc_msg_new (const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint32_t
flags)
+{
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg = GNUNET_malloc (psize);
+ psyc_msg_init (pmsg, mmsg, flags);
+ return pmsg;
+}
+
+
+/**
* Send multicast message to all clients connected to the channel.
*/
static void
@@ -918,24 +1000,13 @@
const struct GNUNET_MULTICAST_MessageHeader *mmsg,
uint32_t flags)
{
- struct GNUNET_PSYC_MessageHeader *pmsg;
- uint16_t size = ntohs (mmsg->header.size);
- uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending multicast message to client. "
"fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n",
chn, GNUNET_ntohll (mmsg->fragment_id),
GNUNET_ntohll (mmsg->message_id));
- pmsg = GNUNET_malloc (psize);
- pmsg->header.size = htons (psize);
- pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
- pmsg->message_id = mmsg->message_id;
- pmsg->fragment_offset = mmsg->fragment_offset;
- pmsg->flags = htonl (flags);
-
- memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+ struct GNUNET_PSYC_MessageHeader *pmsg = psyc_msg_new (mmsg, flags);
client_send_msg (chn, &pmsg->header);
GNUNET_free (pmsg);
}
@@ -1327,12 +1398,13 @@
* Received result of GNUNET_PSYCSTORE_fragment_store().
*/
static void
-store_recv_fragment_store_result (void *cls, int64_t result, const char
*err_msg)
+store_recv_fragment_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct Channel *chn = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 "
(%s)\n",
- chn, result, err_msg);
+ "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 "
(%.*s)\n",
+ chn, result, err_msg_size, err_msg);
}
@@ -1430,7 +1502,7 @@
struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (result - INT32_MIN);
+ res.result_code = GNUNET_htonl_signed (result);
res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1476,7 +1548,7 @@
struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl (result - INT32_MIN);
+ res.result_code = GNUNET_htonl_signed (result);
res.max_message_id = GNUNET_htonll (max_message_id);
if (GNUNET_OK == result || GNUNET_NO == result)
@@ -1566,7 +1638,7 @@
struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl ((uint32_t) GNUNET_OK + INT32_MIN);
+ res.result_code = GNUNET_htonl_signed (GNUNET_OK);
res.max_message_id = GNUNET_htonll (mst->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
@@ -1578,7 +1650,7 @@
"%p Client connected as master to channel %s.\n",
mst, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ struct Client *cli = GNUNET_new (struct Client);
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
@@ -1677,7 +1749,7 @@
struct GNUNET_PSYC_CountersResultMessage res;
res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
res.header.size = htons (sizeof (res));
- res.result_code = htonl ((uint32_t) GNUNET_OK - INT32_MIN);
+ res.result_code = GNUNET_htonl_signed (GNUNET_OK);
res.max_message_id = GNUNET_htonll (chn->max_message_id);
GNUNET_SERVER_notification_context_add (nc, client);
@@ -1716,7 +1788,7 @@
"%p Client connected as slave to channel %s.\n",
slv, GNUNET_h2s (&chn->pub_key_hash));
- struct ClientListItem *cli = GNUNET_new (struct ClientListItem);
+ struct Client *cli = GNUNET_new (struct Client);
cli->client = client;
GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli);
@@ -2119,14 +2191,15 @@
* Received result of GNUNET_PSYCSTORE_membership_store()
*/
static void
-store_recv_membership_store_result (void *cls, int64_t result, const char
*err_msg)
+store_recv_membership_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct MembershipStoreClosure *mcls = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 "
(%s)\n",
- mcls->chn, result, err_msg);
+ "%p GNUNET_PSYCSTORE_membership_store() returned %" PRId64 "
(%.s)\n",
+ mcls->chn, result, err_msg_size, err_msg);
- client_send_result (mcls->client, mcls->op_id, result, err_msg);
+ client_send_result (mcls->client, mcls->op_id, result, err_msg,
err_msg_size);
}
@@ -2165,36 +2238,73 @@
}
+/**
+ * Received a fragment for GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
+ */
static int
store_recv_fragment_history (void *cls,
- struct GNUNET_MULTICAST_MessageHeader *msg,
+ struct GNUNET_MULTICAST_MessageHeader *mmsg,
enum GNUNET_PSYCSTORE_MessageFlags flags)
{
- struct OperationClosure *opcls = cls;
- struct Channel *chn = opcls->chn;
- client_send_mcast_msg (chn, msg, GNUNET_PSYC_MESSAGE_HISTORIC);
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return GNUNET_NO;
+ }
+ struct Channel *chn = op->chn;
+
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t msize = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + msize - sizeof (*mmsg);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + psize);
+ res->header.size = htons (sizeof (*res) + psize);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = op->op_id;
+ res->result_code = GNUNET_htonll_signed (GNUNET_OK);
+
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+ psyc_msg_init (pmsg, mmsg, flags | GNUNET_PSYC_MESSAGE_HISTORIC);
+ memcpy (&res[1], pmsg, psize);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (chn, &res->header);
return GNUNET_YES;
}
/**
- * Received result of GNUNET_PSYCSTORE_fragment_get() for multicast replay.
+ * Received the result of GNUNET_PSYCSTORE_fragment_get(),
+ * in response to a history request from a client.
*/
static void
-store_recv_fragment_history_result (void *cls, int64_t result, const char
*err_msg)
+store_recv_fragment_history_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
- struct OperationClosure *opcls = cls;
+ struct Operation *op = cls;
+ if (NULL == op->client)
+ { /* Requesting client already disconnected. */
+ return;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p History replay #%" PRIu64 ": "
- "PSYCSTORE returned %" PRId64 " (%s)\n",
- opcls->chn, opcls->op_id, result, err_msg);
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, op->op_id, result, err_msg_size, err_msg);
- client_send_result (opcls->client, opcls->op_id, result, err_msg);
+ if (op->flags & GNUNET_PSYC_HISTORY_REPLAY_REMOTE)
+ {
+ /** @todo Multicast replay request for messages not found locally. */
+ }
+
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
}
/**
- * Client requests channel history from PSYCstore.
+ * Client requests channel history.
*/
static void
client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
@@ -2204,26 +2314,39 @@
chn = GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != chn);
- const struct HistoryRequest *
- req = (const struct HistoryRequest *) msg;
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ chn, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ struct Operation *op = op_add (chn, client, req->op_id, ntohl (req->flags));
+
if (0 == req->message_limit)
GNUNET_PSYCSTORE_message_get (store, &chn->pub_key, NULL,
GNUNET_ntohll (req->start_message_id),
GNUNET_ntohll (req->end_message_id),
+ method_prefix,
&store_recv_fragment_history,
- &store_recv_fragment_history_result, opcls);
+ &store_recv_fragment_history_result, op);
else
GNUNET_PSYCSTORE_message_get_latest (store, &chn->pub_key, NULL,
GNUNET_ntohll (req->message_limit),
+ method_prefix,
&store_recv_fragment_history,
&store_recv_fragment_history_result,
- opcls);
+ op);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -2236,19 +2359,19 @@
store_recv_state_var (void *cls, const char *name,
const void *value, size_t value_size)
{
- struct OperationClosure *opcls = cls;
- struct OperationResult *op;
+ struct Operation *op = cls;
+ struct GNUNET_OperationResultMessage *res;
if (NULL != name)
{
uint16_t name_size = strnlen (name, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD) + 1;
struct GNUNET_PSYC_MessageModifier *mod;
- op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + name_size + value_size);
- op->header.size = htons (sizeof (*op) + sizeof (*mod) + name_size +
value_size);
- op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
- op->op_id = opcls->op_id;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + name_size +
value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + name_size +
value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
- mod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
+ mod = (struct GNUNET_PSYC_MessageModifier *) &res[1];
mod->header.size = htons (sizeof (*mod) + name_size + value_size);
mod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
mod->name_size = htons (name_size);
@@ -2260,19 +2383,20 @@
else
{
struct GNUNET_MessageHeader *mod;
- op = GNUNET_malloc (sizeof (*op) + sizeof (*mod) + value_size);
- op->header.size = htons (sizeof (*op) + sizeof (*mod) + value_size);
- op->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
- op->op_id = opcls->op_id;
+ res = GNUNET_malloc (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.size = htons (sizeof (*res) + sizeof (*mod) + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT);
+ res->op_id = op->op_id;
- mod = (struct GNUNET_MessageHeader *) &op[1];
+ mod = (struct GNUNET_MessageHeader *) &res[1];
mod->size = htons (sizeof (*mod) + value_size);
mod->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
memcpy (&mod[1], value, value_size);
}
- GNUNET_SERVER_notification_context_add (nc, opcls->client);
- GNUNET_SERVER_notification_context_unicast (nc, opcls->client, &op->header,
+ // FIXME: client might have been disconnected
+ GNUNET_SERVER_notification_context_add (nc, op->client);
+ GNUNET_SERVER_notification_context_unicast (nc, op->client, &res->header,
GNUNET_NO);
GNUNET_free (op);
return GNUNET_YES;
@@ -2284,15 +2408,17 @@
* or GNUNET_PSYCSTORE_state_get_prefix()
*/
static void
-store_recv_state_result (void *cls, int64_t result, const char *err_msg)
+store_recv_state_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
- struct OperationClosure *opcls = cls;
+ struct Operation *op = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p History replay #%" PRIu64 ": "
- "PSYCSTORE returned %" PRId64 " (%s)\n",
- opcls->chn, opcls->op_id, result, err_msg);
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ op->chn, op->op_id, result, err_msg_size, err_msg);
- client_send_result (opcls->client, opcls->op_id, result, err_msg);
+ // FIXME: client might have been disconnected
+ client_send_result (op->client, op->op_id, result, err_msg, err_msg_size);
}
@@ -2314,18 +2440,15 @@
const char *name = (const char *) &req[1];
if (0 == name_size || '\0' != name[name_size - 1])
{
+ GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
-
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
GNUNET_PSYCSTORE_state_get (store, &chn->pub_key, name,
&store_recv_state_var,
- &store_recv_state_result, opcls);
+ &store_recv_state_result, op);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -2348,20 +2471,16 @@
const char *name = (const char *) &req[1];
if (0 == name_size || '\0' != name[name_size - 1])
{
+ GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
- struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
- opcls->client = client;
- opcls->chn = chn;
- opcls->op_id = req->op_id;
-
+ struct Operation *op = op_add (chn, client, req->op_id, 0);
GNUNET_PSYCSTORE_state_get_prefix (store, &chn->pub_key, name,
&store_recv_state_var,
- &store_recv_state_result, opcls);
+ &store_recv_state_result, op);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
}
Modified: gnunet/src/psyc/psyc.h
===================================================================
--- gnunet/src/psyc/psyc.h 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/psyc/psyc.h 2015-05-07 12:15:58 UTC (rev 35717)
@@ -171,42 +171,6 @@
/**** service -> library ****/
-/**
- * Answer from service to client about last operation.
- */
-struct OperationResult
-{
- /**
- * Types:
- * - GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE
- * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STORY_RESULT
- * - GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_STATE_RESULT
- */
- struct GNUNET_MessageHeader header;
-
- uint32_t reserved GNUNET_PACKED;
-
- /**
- * Operation ID.
- */
- uint64_t op_id GNUNET_PACKED;
-
- /**
- * Status code for the operation.
- */
- uint64_t result_code GNUNET_PACKED;
-
- /* Followed by:
- * - on error: NUL-terminated error message
- * - on success: one of the following message types
- *
- * For a STATE_RESULT, one of:
- * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
- * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT
- * - GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END
- */
-};
-
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/psyc/psyc_api.c
===================================================================
--- gnunet/src/psyc/psyc_api.c 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/psyc/psyc_api.c 2015-05-07 12:15:58 UTC (rev 35717)
@@ -43,33 +43,6 @@
#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
-struct OperationListItem
-{
- struct OperationListItem *prev;
- struct OperationListItem *next;
-
- /**
- * Operation ID.
- */
- uint64_t op_id;
-
- /**
- * Continuation to invoke with the result of an operation.
- */
- GNUNET_PSYC_ResultCallback result_cb;
-
- /**
- * State variable result callback.
- */
- GNUNET_PSYC_StateVarCallback state_var_cb;
-
- /**
- * Closure for the callbacks.
- */
- void *cls;
-};
-
-
/**
* Handle to access PSYC channel operations for both the master and slaves.
*/
@@ -111,21 +84,6 @@
void *disconnect_cls;
/**
- * First operation in the linked list.
- */
- struct OperationListItem *op_head;
-
- /**
- * Last operation in the linked list.
- */
- struct OperationListItem *op_tail;
-
- /**
- * Last operation ID used.
- */
- uint64_t last_op_id;
-
- /**
* Are we polling for incoming messages right now?
*/
uint8_t in_receive;
@@ -204,83 +162,62 @@
};
-/**
- * Get a fresh operation ID to distinguish between PSYCstore requests.
- *
- * @param h Handle to the PSYCstore service.
- * @return next operation id to use
- */
-static uint64_t
-op_get_next_id (struct GNUNET_PSYC_Channel *chn)
+struct GNUNET_PSYC_HistoryRequest
{
- return ++chn->last_op_id;
-}
+ /**
+ * Channel.
+ */
+ struct GNUNET_PSYC_Channel *chn;
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
-/**
- * Find operation by ID.
- *
- * @return Operation, or NULL if none found.
- */
-static struct OperationListItem *
-op_find_by_id (struct GNUNET_PSYC_Channel *chn, uint64_t op_id)
-{
- struct OperationListItem *op = chn->op_head;
- while (NULL != op)
- {
- if (op->op_id == op_id)
- return op;
- op = op->next;
- }
- return NULL;
-}
+ /**
+ * Message handler.
+ */
+ struct GNUNET_PSYC_ReceiveHandle *recv;
+ /**
+ * Function to call when the operation finished.
+ */
+ GNUNET_ResultCallback result_cb;
-static uint64_t
-op_add (struct GNUNET_PSYC_Channel *chn, GNUNET_PSYC_ResultCallback result_cb,
- void *cls)
-{
- if (NULL == result_cb)
- return 0;
+ /**
+ * Closure for @a result_cb.
+ */
+ void *cls;
+};
- struct OperationListItem *op = GNUNET_malloc (sizeof (*op));
- op->op_id = op_get_next_id (chn);
- op->result_cb = result_cb;
- op->cls = cls;
- GNUNET_CONTAINER_DLL_insert_tail (chn->op_head, chn->op_tail, op);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%p Added operation #%" PRIu64 "\n", chn, op->op_id);
- return op->op_id;
-}
-
-
-static int
-op_result (struct GNUNET_PSYC_Channel *chn, uint64_t op_id,
- int64_t result_code, const char *err_msg)
+struct GNUNET_PSYC_StateRequest
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%p Received result for operation #%" PRIu64 ": %" PRId64 " (%s)\n",
- chn, op_id, result_code, err_msg);
- if (0 == op_id)
- return GNUNET_NO;
+ /**
+ * Channel.
+ */
+ struct GNUNET_PSYC_Channel *chn;
- struct OperationListItem *op = op_find_by_id (chn, op_id);
- if (NULL == op)
- {
- LOG (GNUNET_ERROR_TYPE_WARNING,
- "Could not find operation #%" PRIu64 "\n", op_id);
- return GNUNET_NO;
- }
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
- GNUNET_CONTAINER_DLL_remove (chn->op_head, chn->op_tail, op);
+ /**
+ * State variable result callback.
+ */
+ GNUNET_PSYC_StateVarCallback var_cb;
- if (NULL != op->result_cb)
- op->result_cb (op->cls, result_code, err_msg);
+ /**
+ * Function to call when the operation finished.
+ */
+ GNUNET_ResultCallback result_cb;
- GNUNET_free (op);
- return GNUNET_YES;
-}
+ /**
+ * Closure for @a result_cb.
+ */
+ void *cls;
+};
static void
@@ -313,22 +250,97 @@
struct GNUNET_PSYC_Channel *
chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+
uint16_t size = ntohs (msg->size);
- const struct OperationResult *res = (const struct OperationResult *) msg;
- const char *err_msg = NULL;
+ if (size < sizeof (*res))
+ { /* Error, message too small. */
+ GNUNET_break (0);
+ return;
+ }
- if (sizeof (struct OperationResult) < size)
- {
- err_msg = (const char *) &res[1];
- if ('\0' != err_msg[size - sizeof (struct OperationResult) - 1])
- {
- GNUNET_break (0);
- err_msg = NULL;
- }
+ uint16_t data_size = size - sizeof (*res);
+ const char *data = (0 < data_size) ? (void *) &res[1] : NULL;
+ GNUNET_CLIENT_MANAGER_op_result (chn->client, GNUNET_ntohll (res->op_id),
+ GNUNET_ntohll_signed (res->result_code),
+ data, data_size);
+}
+
+
+static void
+op_recv_history_result (void *cls, int64_t result,
+ const void *data, uint16_t data_size)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received history replay result: %" PRId64 ".\n", result);
+
+ struct GNUNET_PSYC_HistoryRequest *hist = cls;
+
+ if (NULL != hist->result_cb)
+ hist->result_cb (hist->cls, result, data, data_size);
+
+ GNUNET_PSYC_receive_destroy (hist->recv);
+ GNUNET_free (hist);
+}
+
+
+static void
+op_recv_state_result (void *cls, int64_t result,
+ const void *data, uint16_t data_size)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received state request result: %" PRId64 ".\n", result);
+
+ struct GNUNET_PSYC_StateRequest *sr = cls;
+
+ if (NULL != sr->result_cb)
+ sr->result_cb (sr->cls, result, data, data_size);
+
+ GNUNET_free (sr);
+}
+
+
+static void
+channel_recv_history_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PSYC_Channel *
+ chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
+
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+ struct GNUNET_PSYC_MessageHeader *
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received historic fragment for message #%" PRIu64 ".\n",
+ chn, GNUNET_ntohll (pmsg->message_id));
+
+ GNUNET_ResultCallback result_cb = NULL;
+ struct GNUNET_PSYC_HistoryRequest *hist = NULL;
+
+ if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
+ GNUNET_ntohll (res->op_id),
+ &result_cb, (void *) &hist))
+ { /* Operation not found. */
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "%p Replay operation not found for historic fragment of message #%"
+ PRIu64 ".\n",
+ chn, GNUNET_ntohll (pmsg->message_id));
+ return;
}
- op_result (chn, GNUNET_ntohll (res->op_id),
- GNUNET_ntohll (res->result_code) + INT64_MIN, err_msg);
+ uint16_t size = ntohs (msg->size);
+ if (size < sizeof (*res) + sizeof (*pmsg))
+ { /* Error, message too small. */
+ GNUNET_break (0);
+ return;
+ }
+
+ GNUNET_PSYC_receive_message (hist->recv,
+ (const struct GNUNET_PSYC_MessageHeader *)
pmsg);
}
@@ -340,12 +352,21 @@
struct GNUNET_PSYC_Channel *
chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn));
- const struct OperationResult *res = (const struct OperationResult *) msg;
- struct OperationListItem *op = op_find_by_id (chn, GNUNET_ntohll
(res->op_id));
- if (NULL == op || NULL == op->state_var_cb)
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+
+ GNUNET_ResultCallback result_cb = NULL;
+ struct GNUNET_PSYC_StateRequest *sr = NULL;
+
+ if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (chn->client,
+ GNUNET_ntohll (res->op_id),
+ &result_cb, (void *) &sr))
+ { /* Operation not found. */
return;
+ }
- const struct GNUNET_MessageHeader *modc = (struct GNUNET_MessageHeader *)
&op[1];
+ const struct GNUNET_MessageHeader *
+ modc = (struct GNUNET_MessageHeader *) &res[1];
uint16_t modc_size = ntohs (modc->size);
if (ntohs (msg->size) - sizeof (*msg) != modc_size)
{
@@ -366,13 +387,13 @@
GNUNET_break (0);
return;
}
- op->state_var_cb (op->cls, name, name + name_size, ntohs
(mod->value_size));
+ sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
- op->state_var_cb (op->cls, NULL, (const char *) &modc[1],
- modc_size - sizeof (*modc));
+ sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
+ modc_size - sizeof (*modc));
break;
}
}
@@ -412,11 +433,12 @@
struct GNUNET_PSYC_CountersResultMessage *
cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
- int32_t result = ntohl (cres->result_code) + INT32_MIN;
+ int32_t result = GNUNET_ntohl_signed (cres->result_code);
if (GNUNET_OK != result && GNUNET_NO != result)
{
- LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master.\n");
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Could not start master: %ld\n", result);
GNUNET_break (0);
+ /* FIXME: disconnect */
}
if (NULL != mst->start_cb)
mst->start_cb (mst->cb_cls, result, GNUNET_ntohll (cres->max_message_id));
@@ -464,11 +486,12 @@
sizeof (struct
GNUNET_PSYC_Channel));
struct GNUNET_PSYC_CountersResultMessage *
cres = (struct GNUNET_PSYC_CountersResultMessage *) msg;
- int32_t result = ntohl (cres->result_code) + INT32_MIN;
+ int32_t result = GNUNET_ntohl_signed (cres->result_code);
if (GNUNET_YES != result && GNUNET_NO != result)
{
LOG (GNUNET_ERROR_TYPE_ERROR, "Could not join slave.\n");
GNUNET_break (0);
+ /* FIXME: disconnect */
}
if (NULL != slv->connect_cb)
slv->connect_cb (slv->cb_cls, result, GNUNET_ntohll
(cres->max_message_id));
@@ -513,13 +536,17 @@
GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST,
sizeof (struct GNUNET_PSYC_JoinRequestMessage), GNUNET_YES },
+ { &channel_recv_history_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
{ &channel_recv_state_result, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
- sizeof (struct OperationResult), GNUNET_YES },
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
{ &channel_recv_result, NULL,
GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
- sizeof (struct OperationResult), GNUNET_YES },
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
{ &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
@@ -545,13 +572,17 @@
GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
+ { &channel_recv_history_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
{ &channel_recv_state_result, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
- sizeof (struct OperationResult), GNUNET_YES },
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
{ &channel_recv_result, NULL,
GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
- sizeof (struct OperationResult), GNUNET_YES },
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
{ &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO },
@@ -1011,17 +1042,28 @@
* correctly; not doing so correctly will result in either denying other slaves
* access or offering access to channel data to non-members.
*
- * @param channel Channel handle.
- * @param slave_key Identity of channel slave to add.
- * @param announced_at ID of the message that announced the membership change.
- * @param effective_since Addition of slave is in effect since this message ID.
+ * @param chn
+ * Channel handle.
+ * @param slave_key
+ * Identity of channel slave to add.
+ * @param announced_at
+ * ID of the message that announced the membership change.
+ * @param effective_since
+ * Addition of slave is in effect since this message ID.
+ * @param result_cb
+ * Function to call with the result of the operation.
+ * The @e result_code argument is #GNUNET_OK on success, or
+ * #GNUNET_SYSERR on error. In case of an error, the @e data argument
+ * can contain an optional error message.
+ * @param cls
+ * Closure for @a result_cb.
*/
void
GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t announced_at,
uint64_t effective_since,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1031,7 +1073,8 @@
req->announced_at = GNUNET_htonll (announced_at);
req->effective_since = GNUNET_htonll (effective_since);
req->did_join = GNUNET_YES;
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
+ result_cb, cls));
GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
@@ -1054,15 +1097,25 @@
* denying members access or offering access to channel data to
* non-members.
*
- * @param channel Channel handle.
- * @param slave_key Identity of channel slave to remove.
- * @param announced_at ID of the message that announced the membership change.
+ * @param chn
+ * Channel handle.
+ * @param slave_key
+ * Identity of channel slave to remove.
+ * @param announced_at
+ * ID of the message that announced the membership change.
+ * @param result_cb
+ * Function to call with the result of the operation.
+ * The @e result_code argument is #GNUNET_OK on success, or
+ * #GNUNET_SYSERR on error. In case of an error, the @e data argument
+ * can contain an optional error message.
+ * @param cls
+ * Closure for @a result_cb.
*/
void
GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t announced_at,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
struct ChannelMembershipStoreRequest *req = GNUNET_malloc (sizeof (*req));
@@ -1071,26 +1124,73 @@
req->slave_key = *slave_key;
req->announced_at = GNUNET_htonll (announced_at);
req->did_join = GNUNET_NO;
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ req->op_id = GNUNET_htonll (GNUNET_CLIENT_MANAGER_op_add (chn->client,
+ result_cb, cls));
GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
+static struct GNUNET_PSYC_HistoryRequest *
+channel_history_replay (struct GNUNET_PSYC_Channel *chn,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ uint64_t message_limit,
+ const char *method_prefix,
+ uint32_t flags,
+ GNUNET_PSYC_MessageCallback message_cb,
+ GNUNET_PSYC_MessagePartCallback message_part_cb,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
+{
+ struct GNUNET_PSYC_HistoryRequestMessage *req;
+ struct GNUNET_PSYC_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
+ hist->chn = chn;
+ hist->recv = GNUNET_PSYC_receive_create (message_cb, message_part_cb, cls);
+ hist->result_cb = result_cb;
+ hist->cls = cls;
+ hist->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
+ &op_recv_history_result, hist);
+
+ GNUNET_assert (NULL != method_prefix);
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ GNUNET_assert ('\0' == method_prefix[method_size - 1]);
+ req = GNUNET_malloc (sizeof (*req) + method_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
+ req->header.size = htons (sizeof (*req) + method_size);
+ req->start_message_id = GNUNET_htonll (start_message_id);
+ req->end_message_id = GNUNET_htonll (end_message_id);
+ req->message_limit = GNUNET_htonll (message_limit);
+ req->flags = htonl (flags);
+ req->op_id = GNUNET_htonll (hist->op_id);
+ memcpy (&req[1], method_prefix, method_size);
+
+ GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
+ return hist;
+}
+
+
/**
* Request to replay a part of the message history of the channel.
*
- * Historic messages (but NOT the state at the time) will be replayed (given to
- * the normal method handlers) if available and if access is permitted.
+ * Historic messages (but NOT the state at the time) will be replayed and given
+ * to the normal method handlers with a #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
*
+ * Messages are retrieved from the local PSYCstore if available,
+ * otherwise requested from the network.
+ *
* @param channel
* Which channel should be replayed?
* @param start_message_id
* Earliest interesting point in history.
* @param end_message_id
* Last (inclusive) interesting point in history.
- * FIXME: @param method_prefix
+ * @param method_prefix
* Retrieve only messages with a matching method prefix.
+ * @param flags
+ * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
* @param result_cb
* Function to call when the requested history has been fully replayed.
* @param cls
@@ -1098,22 +1198,20 @@
*
* @return Handle to cancel history replay operation.
*/
-void
+struct GNUNET_PSYC_HistoryRequest *
GNUNET_PSYC_channel_history_replay (struct GNUNET_PSYC_Channel *chn,
uint64_t start_message_id,
uint64_t end_message_id,
- /* FIXME: const char *method_prefix, */
- GNUNET_PSYC_ResultCallback result_cb,
+ const char *method_prefix,
+ uint32_t flags,
+ GNUNET_PSYC_MessageCallback message_cb,
+ GNUNET_PSYC_MessagePartCallback
message_part_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
- struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
- req->header.size = htons (sizeof (*req));
- req->start_message_id = GNUNET_htonll (start_message_id);
- req->end_message_id = GNUNET_htonll (end_message_id);
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
-
- GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
+ return channel_history_replay (chn, start_message_id, end_message_id, 0,
+ method_prefix, flags,
+ message_cb, message_part_cb, result_cb, cls);
}
@@ -1127,8 +1225,11 @@
* Which channel should be replayed?
* @param message_limit
* Maximum number of messages to replay.
- * FIXME: @param method_prefix
+ * @param method_prefix
* Retrieve only messages with a matching method prefix.
+ * Use NULL or "" to retrieve all.
+ * @param flags
+ * OR'ed enum GNUNET_PSYC_HistoryReplayFlags
* @param result_cb
* Function to call when the requested history has been fully replayed.
* @param cls
@@ -1136,20 +1237,78 @@
*
* @return Handle to cancel history replay operation.
*/
-void
+struct GNUNET_PSYC_HistoryRequest *
GNUNET_PSYC_channel_history_replay_latest (struct GNUNET_PSYC_Channel *chn,
uint64_t message_limit,
- /* FIXME: const char
*method_prefix, */
- GNUNET_PSYC_ResultCallback
result_cb,
+ const char *method_prefix,
+ uint32_t flags,
+ GNUNET_PSYC_MessageCallback
message_cb,
+ GNUNET_PSYC_MessagePartCallback
message_part_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
- struct HistoryRequest *req = GNUNET_malloc (sizeof (*req));
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
- req->header.size = htons (sizeof (*req));
- req->message_limit = GNUNET_htonll (message_limit);
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
+ return channel_history_replay (chn, 0, 0, message_limit, method_prefix,
flags,
+ message_cb, message_part_cb, result_cb, cls);
+}
+
+void
+GNUNET_PSYC_channel_history_replay_cancel (struct GNUNET_PSYC_Channel *channel,
+ struct GNUNET_PSYC_HistoryRequest
*hist)
+{
+ GNUNET_PSYC_receive_destroy (hist->recv);
+ GNUNET_CLIENT_MANAGER_op_cancel (hist->chn->client, hist->op_id);
+ GNUNET_free (hist);
+}
+
+
+/**
+ * Retrieve the best matching channel state variable.
+ *
+ * If the requested variable name is not present in the state, the nearest
+ * less-specific name is matched; for example, requesting "_a_b" will match
"_a"
+ * if "_a_b" does not exist.
+ *
+ * @param channel
+ * Channel handle.
+ * @param full_name
+ * Full name of the requested variable.
+ * The actual variable returned might have a shorter name.
+ * @param var_cb
+ * Function called once when a matching state variable is found.
+ * Not called if there's no matching state variable.
+ * @param result_cb
+ * Function called after the operation finished.
+ * (i.e. all state variables have been returned via @a state_cb)
+ * @param cls
+ * Closure for the callbacks.
+ */
+static struct GNUNET_PSYC_StateRequest *
+channel_state_get (struct GNUNET_PSYC_Channel *chn,
+ uint16_t type, const char *name,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb, void *cls)
+{
+ struct StateRequest *req;
+ struct GNUNET_PSYC_StateRequest *sr = GNUNET_malloc (sizeof (*sr));
+ sr->chn = chn;
+ sr->var_cb = var_cb;
+ sr->result_cb = result_cb;
+ sr->cls = cls;
+ sr->op_id = GNUNET_CLIENT_MANAGER_op_add (chn->client,
+ &op_recv_state_result, sr);
+
+ GNUNET_assert (NULL != name);
+ size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ req = GNUNET_malloc (sizeof (*req) + name_size);
+ req->header.type = htons (type);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->op_id = GNUNET_htonll (sr->op_id);
+ memcpy (&req[1], name, name_size);
+
GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
+ return sr;
}
@@ -1174,21 +1333,16 @@
* @param cls
* Closure for the callbacks.
*/
-void
+struct GNUNET_PSYC_StateRequest *
GNUNET_PSYC_channel_state_get (struct GNUNET_PSYC_Channel *chn,
const char *full_name,
GNUNET_PSYC_StateVarCallback var_cb,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
- size_t name_size = strlen (full_name) + 1;
- struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
- req->header.size = htons (sizeof (*req) + name_size);
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
- memcpy (&req[1], full_name, name_size);
+ return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
+ full_name, var_cb, result_cb, cls);
- GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
}
@@ -1215,21 +1369,29 @@
* @param cls
* Closure for the callbacks.
*/
-void
+struct GNUNET_PSYC_StateRequest *
GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *chn,
const char *name_prefix,
GNUNET_PSYC_StateVarCallback var_cb,
- GNUNET_PSYC_ResultCallback result_cb,
+ GNUNET_ResultCallback result_cb,
void *cls)
{
- size_t name_size = strlen (name_prefix) + 1;
- struct StateRequest *req = GNUNET_malloc (sizeof (*req) + name_size);
- req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_STATE_GET);
- req->header.size = htons (sizeof (*req) + name_size);
- req->op_id = GNUNET_htonll (op_add (chn, result_cb, cls));
- memcpy (&req[1], name_prefix, name_size);
+ return channel_state_get (chn, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
+ name_prefix, var_cb, result_cb, cls);
+}
- GNUNET_CLIENT_MANAGER_transmit (chn->client, &req->header);
+
+/**
+ * Cancel a state request operation.
+ *
+ * @param sr
+ * Handle for the operation to cancel.
+ */
+void
+GNUNET_PSYC_channel_state_get_cancel (struct GNUNET_PSYC_StateRequest *sr)
+{
+ GNUNET_CLIENT_MANAGER_op_cancel (sr->chn->client, sr->op_id);
+ GNUNET_free (sr);
}
/* end of psyc_api.c */
Modified: gnunet/src/psyc/psyc_util_lib.c
===================================================================
--- gnunet/src/psyc/psyc_util_lib.c 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/psyc/psyc_util_lib.c 2015-05-07 12:15:58 UTC (rev 35717)
@@ -326,9 +326,13 @@
* The message part is added to the current message buffer.
* When this buffer is full, it is added to the transmission queue.
*
- * @param tmit Transmission handle.
- * @param msg Message part, or NULL.
- * @param end End of message? #GNUNET_YES or #GNUNET_NO.
+ * @param tmit
+ * Transmission handle.
+ * @param msg
+ * Message part, or NULL.
+ * @param end
+ * End of message?
+ * #GNUNET_YES or #GNUNET_NO.
*/
static void
transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
@@ -632,16 +636,24 @@
/**
* Transmit a message.
*
- * @param tmit Transmission handle.
- * @param method_name Which method should be invoked.
- * @param env Environment for the message.
- * Should stay available until the first call to notify_data.
- * Can be NULL if there are no modifiers or @a notify_mod is provided
instead.
- * @param notify_mod Function to call to obtain modifiers.
- * Can be NULL if there are no modifiers or @a env is provided instead.
- * @param notify_data Function to call to obtain fragments of the data.
- * @param notify_cls Closure for @a notify_mod and @a notify_data.
- * @param flags Flags for the message being transmitted.
+ * @param tmit
+ * Transmission handle.
+ * @param method_name
+ * Which method should be invoked.
+ * @param env
+ * Environment for the message.
+ * Should stay available until the first call to notify_data.
+ * Can be NULL if there are no modifiers or @a notify_mod is
+ * provided instead.
+ * @param notify_mod
+ * Function to call to obtain modifiers.
+ * Can be NULL if there are no modifiers or @a env is provided instead.
+ * @param notify_data
+ * Function to call to obtain fragments of the data.
+ * @param notify_cls
+ * Closure for @a notify_mod and @a notify_data.
+ * @param flags
+ * Flags for the message being transmitted.
*
* @return #GNUNET_OK if the transmission was started.
* #GNUNET_SYSERR if another transmission is already going on.
Modified: gnunet/src/psyc/test_psyc.c
===================================================================
--- gnunet/src/psyc/test_psyc.c 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/psyc/test_psyc.c 2015-05-07 12:15:58 UTC (rev 35717)
@@ -82,7 +82,7 @@
struct TransmitClosure *tmit;
-uint8_t join_req_count;
+uint8_t join_req_count, end_count;
enum
{
@@ -105,7 +105,10 @@
void
master_transmit ();
+void
+master_history_replay_latest ();
+
void master_stopped (void *cls)
{
if (NULL != tmit)
@@ -198,6 +201,134 @@
void
+master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Master got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
+ // FIXME
+}
+
+
+void
+master_message_part_cb (void *cls, uint64_t message_id,
+ uint64_t data_offset, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
+{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message %" PRIu64 "\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Master got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
+
+ switch (test)
+ {
+ case TEST_SLAVE_TRANSMIT:
+ if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Unexpected request flags: %x" PRIu32 "\n", flags);
+ GNUNET_assert (0);
+ return;
+ }
+ // FIXME: check rest of message
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
+ master_transmit ();
+ break;
+
+ case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_MASTER_HISTORY_REPLAY:
+ case TEST_MASTER_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32
"\n",
+ test, flags);
+ GNUNET_assert (0);
+ return;
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+void
+slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Slave got PSYC message fragment of size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, ntohs (msg->header.size), message_id, flags);
+ // FIXME
+}
+
+
+void
+slave_message_part_cb (void *cls, uint64_t message_id,
+ uint64_t data_offset, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
+{
+ if (NULL == msg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error while receiving message " PRIu64 "\n", message_id);
+ return;
+ }
+
+ uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%d: Slave got message part of type %u and size %u "
+ "belonging to message ID %" PRIu64 " with flags %x\n",
+ test, type, size, message_id, flags);
+
+ switch (test)
+ {
+ case TEST_MASTER_TRANSMIT:
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type && 2 == ++end_count)
+ master_history_replay_latest ();
+ break;
+
+ case TEST_SLAVE_HISTORY_REPLAY:
+ case TEST_SLAVE_HISTORY_REPLAY_LATEST:
+ if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Test #%d: Unexpected flags for historic message: %x" PRIu32
"\n",
+ flags);
+ GNUNET_assert (0);
+ return;
+ }
+ break;
+
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+void
state_get_var (void *cls, const char *name, const void *value, size_t
value_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -208,10 +339,12 @@
/*** Slave state_get_prefix() ***/
void
-slave_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+slave_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
+ "slave_state_get_prefix:\t%" PRId64 " (%.s)\n",
+ result, err_msg_size, err_msg);
// FIXME: GNUNET_assert (2 == result);
end ();
}
@@ -230,7 +363,8 @@
void
-master_state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+master_state_get_prefix_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"master_state_get_prefix:\t%" PRId64 " (%s)\n", result, err_msg);
@@ -252,10 +386,12 @@
void
-slave_state_get_result (void *cls, int64_t result, const char *err_msg)
+slave_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
+ "slave_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
// FIXME: GNUNET_assert (2 == result);
master_state_get_prefix ();
}
@@ -274,10 +410,12 @@
void
-master_state_get_result (void *cls, int64_t result, const char *err_msg)
+master_state_get_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "master_state_get:\t%" PRId64 " (%s)\n", result, err_msg);
+ "master_state_get:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
// FIXME: GNUNET_assert (1 == result);
slave_state_get ();
}
@@ -295,10 +433,12 @@
/*** Slave history_replay() ***/
void
-slave_history_replay_result (void *cls, int64_t result, const char *err_msg)
+slave_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
+ "slave_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
GNUNET_assert (9 == result);
master_state_get ();
@@ -309,9 +449,11 @@
slave_history_replay ()
{
test = TEST_SLAVE_HISTORY_REPLAY;
- GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1,
- &slave_history_replay_result,
- NULL);
+ GNUNET_PSYC_channel_history_replay (slv_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
+ &slave_history_replay_result, NULL);
}
@@ -319,10 +461,12 @@
void
-master_history_replay_result (void *cls, int64_t result, const char *err_msg)
+master_history_replay_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "master_history_replay:\t%" PRId64 " (%s)\n", result, err_msg);
+ "master_history_replay:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
GNUNET_assert (9 == result);
slave_history_replay ();
@@ -333,9 +477,11 @@
master_history_replay ()
{
test = TEST_MASTER_HISTORY_REPLAY;
- GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1,
- &master_history_replay_result,
- NULL);
+ GNUNET_PSYC_channel_history_replay (mst_chn, 1, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
+ &master_history_replay_result, NULL);
}
@@ -343,10 +489,12 @@
void
-slave_history_replay_latest_result (void *cls, int64_t result, const char
*err_msg)
+slave_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_history_replay_latest:\t%" PRId64 " (%s)\n", result,
err_msg);
+ "slave_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
GNUNET_assert (9 == result);
master_history_replay ();
@@ -357,7 +505,10 @@
slave_history_replay_latest ()
{
test = TEST_SLAVE_HISTORY_REPLAY_LATEST;
- GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1,
+ GNUNET_PSYC_channel_history_replay_latest (slv_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &slave_message_cb,
+ &slave_message_part_cb,
&slave_history_replay_latest_result,
NULL);
}
@@ -367,10 +518,12 @@
void
-master_history_replay_latest_result (void *cls, int64_t result, const char
*err_msg)
+master_history_replay_latest_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t
err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "master_history_replay_latest:\t%" PRId64 " (%s)\n", result,
err_msg);
+ "master_history_replay_latest:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
GNUNET_assert (9 == result);
slave_history_replay_latest ();
@@ -381,139 +534,16 @@
master_history_replay_latest ()
{
test = TEST_MASTER_HISTORY_REPLAY_LATEST;
- GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1,
+ GNUNET_PSYC_channel_history_replay_latest (mst_chn, 1, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ &master_message_cb,
+ &master_message_part_cb,
&master_history_replay_latest_result,
NULL);
}
void
-master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
- const struct GNUNET_PSYC_MessageHeader *msg)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Test #%d: Master got PSYC message fragment of size %u "
- "belonging to message ID %" PRIu64 " with flags %x\n",
- test, ntohs (msg->header.size), message_id, flags);
- // FIXME
-}
-
-
-void
-master_message_part_cb (void *cls, uint64_t message_id,
- uint64_t data_offset, uint32_t flags,
- const struct GNUNET_MessageHeader *msg)
-{
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message %" PRIu64 "\n", message_id);
- return;
- }
-
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
-
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Test #%d: Master got message part of type %u and size %u "
- "belonging to message ID %" PRIu64 " with flags %x\n",
- test, type, size, message_id, flags);
-
- switch (test)
- {
- case TEST_SLAVE_TRANSMIT:
- if (GNUNET_PSYC_MESSAGE_REQUEST != flags)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Unexpected request flags: %x" PRIu32 "\n", flags);
- GNUNET_assert (0);
- return;
- }
- // FIXME: check rest of message
-
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
- master_transmit ();
- break;
-
- case TEST_MASTER_TRANSMIT:
- break;
-
- case TEST_MASTER_HISTORY_REPLAY:
- case TEST_MASTER_HISTORY_REPLAY_LATEST:
- if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Test #%d: Unexpected flags for historic message: %x" PRIu32
"\n",
- flags);
- GNUNET_assert (0);
- return;
- }
- break;
-
- default:
- GNUNET_assert (0);
- }
-}
-
-
-void
-slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
- const struct GNUNET_PSYC_MessageHeader *msg)
-{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Test #%d: Slave got PSYC message fragment of size %u "
- "belonging to message ID %" PRIu64 " with flags %x\n",
- test, ntohs (msg->header.size), message_id, flags);
- // FIXME
-}
-
-
-void
-slave_message_part_cb (void *cls, uint64_t message_id,
- uint64_t data_offset, uint32_t flags,
- const struct GNUNET_MessageHeader *msg)
-{
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error while receiving message " PRIu64 "\n", message_id);
- return;
- }
-
- uint16_t type = ntohs (msg->type);
- uint16_t size = ntohs (msg->size);
-
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Test #%d: Slave got message part of type %u and size %u "
- "belonging to message ID %" PRIu64 " with flags %x\n",
- test, type, size, message_id, flags);
-
- switch (test)
- {
- case TEST_MASTER_TRANSMIT:
- if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END == type)
- master_history_replay_latest ();
- break;
-
- case TEST_SLAVE_HISTORY_REPLAY:
- case TEST_SLAVE_HISTORY_REPLAY_LATEST:
- if (GNUNET_PSYC_MESSAGE_HISTORIC != flags)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Test #%d: Unexpected flags for historic message: %x" PRIu32
"\n",
- flags);
- GNUNET_assert (0);
- return;
- }
- break;
-
- default:
- GNUNET_assert (0);
- }
-}
-
-
-void
transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
@@ -665,27 +695,31 @@
tmit->data[0] = "slave test";
tmit->data_count = 1;
tmit->slv_tmit
- = GNUNET_PSYC_slave_transmit (slv, "_request_test", tmit_notify_mod,
- tmit_notify_data, tmit,
+ = GNUNET_PSYC_slave_transmit (slv, "_request_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
}
void
-slave_remove_cb (void *cls, int64_t result, const char *err_msg)
+slave_remove_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_remove:\t%" PRId64 " (%s)\n", result, err_msg);
+ "slave_remove:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
slave_transmit ();
}
void
-slave_add_cb (void *cls, int64_t result, const char *err_msg)
+slave_add_cb (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "slave_add:\t%" PRId64 " (%s)\n", result, err_msg);
+ "slave_add:\t%" PRId64 " (%.*s)\n",
+ result, err_msg_size, err_msg);
struct GNUNET_PSYC_Channel *chn = cls;
GNUNET_PSYC_channel_slave_remove (chn, &slave_pub_key, 2,
@@ -775,6 +809,8 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master sending message to all.\n");
test = TEST_MASTER_TRANSMIT;
+ end_count = 0;
+
uint32_t i, j;
char *name_max = "_test_max";
@@ -816,8 +852,8 @@
tmit->data_delay[1] = 3;
tmit->data_count = 4;
tmit->mst_tmit
- = GNUNET_PSYC_master_transmit (mst, "_notice_test", tmit_notify_mod,
- tmit_notify_data, tmit,
+ = GNUNET_PSYC_master_transmit (mst, "_notice_test", &tmit_notify_mod,
+ &tmit_notify_data, tmit,
GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
}
Modified: gnunet/src/psycstore/gnunet-service-psycstore.c
===================================================================
--- gnunet/src/psycstore/gnunet-service-psycstore.c 2015-05-07 12:15:32 UTC
(rev 35716)
+++ gnunet/src/psycstore/gnunet-service-psycstore.c 2015-05-07 12:15:58 UTC
(rev 35717)
@@ -109,7 +109,7 @@
if (NULL != err_msg)
err_size = strnlen (err_msg,
- GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res)) + 1;
+ GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (*res) - 1) +
1;
res = GNUNET_malloc (sizeof (struct OperationResult) + err_size);
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
res->header.size = htons (sizeof (struct OperationResult) + err_size);
@@ -222,7 +222,7 @@
struct StateResult *res;
size_t name_size = strlen (name) + 1;
- /* FIXME: split up value into 64k chunks */
+ /** @todo FIXME: split up value into 64k chunks */
res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
@@ -333,7 +333,7 @@
first_fragment_id, last_fragment_id,
&ret_frags, &send_fragment, &sc);
else
- ret = db->fragment_get_latest (db->cls, &req->channel_key, limit,
+ ret = db->fragment_get_latest (db->cls, &req->channel_key, limit,
&ret_frags, &send_fragment, &sc);
switch (ret)
@@ -373,6 +373,20 @@
{
const struct MessageGetRequest *
req = (const struct MessageGetRequest *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Message get: invalid method prefix. size: %u < %u?\n",
+ size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
struct SendClosure
sc = { .op_id = req->op_id, .client = client,
.channel_key = req->channel_key, .slave_key = req->slave_key,
@@ -384,6 +398,7 @@
uint64_t last_message_id = GNUNET_ntohll (req->last_message_id);
uint64_t limit = GNUNET_ntohll (req->message_limit);
+ /** @todo method_prefix */
if (0 == limit)
ret = db->message_get (db->cls, &req->channel_key,
first_message_id, last_message_id,
@@ -478,7 +493,7 @@
}
-/* FIXME: stop processing further state modify messages after an error */
+/** @todo FIXME: stop processing further state modify messages after an error
*/
static void
handle_state_modify (void *cls,
struct GNUNET_SERVER_Client *client,
@@ -551,7 +566,7 @@
}
-/* FIXME: stop processing further state sync messages after an error */
+/** @todo FIXME: stop processing further state sync messages after an error */
static void
handle_state_sync (void *cls,
struct GNUNET_SERVER_Client *client,
@@ -761,8 +776,7 @@
sizeof (struct FragmentGetRequest) },
{ &handle_message_get, NULL,
- GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET,
- sizeof (struct MessageGetRequest) },
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET, 0 },
{ &handle_message_get_fragment, NULL,
GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT,
Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c 2015-05-07 12:15:32 UTC
(rev 35716)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c 2015-05-07 12:15:58 UTC
(rev 35717)
@@ -376,6 +376,7 @@
"CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id "
"ON membership (channel_id, slave_id);");
+ /** @todo messages table: add method_name column */
sql_exec (plugin->dbh,
"CREATE TABLE IF NOT EXISTS messages (\n"
" channel_id INTEGER NOT NULL REFERENCES channels(id),\n"
@@ -468,6 +469,7 @@
" AND ? <= fragment_id AND fragment_id <= ?;",
&plugin->select_fragments);
+ /** @todo select_messages: add method_prefix filter */
sql_prepare (plugin->dbh,
"SELECT hop_counter, signature, purpose, fragment_id,\n"
" fragment_offset, message_id, group_generation,\n"
@@ -489,6 +491,7 @@
"ORDER BY fragment_id;",
&plugin->select_latest_fragments);
+ /** @todo select_latest_messages: add method_prefix filter */
sql_prepare (plugin->dbh,
"SELECT hop_counter, signature, purpose, fragment_id,\n"
" fragment_offset, message_id, group_generation,\n"
@@ -499,6 +502,7 @@
" (SELECT message_id\n"
" FROM messages\n"
" WHERE channel_id = (SELECT id FROM channels WHERE
pub_key = ?)\n"
+ " GROUP BY message_id\n"
" ORDER BY message_id\n"
" DESC LIMIT ?)\n"
"ORDER BY fragment_id;",
Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/psycstore/psycstore_api.c 2015-05-07 12:15:58 UTC (rev
35717)
@@ -283,7 +283,7 @@
return;
}
if (size == sizeof (struct OperationResult))
- str = NULL;
+ str = "";
op = find_op_by_id (h, GNUNET_ntohll (opres->op_id));
if (NULL == op)
@@ -321,7 +321,7 @@
}
}
if (NULL != op->res_cb)
- op->res_cb (op->cls, result_code, str);
+ op->res_cb (op->cls, result_code, str, size - sizeof (*opres));
GNUNET_free (op);
}
break;
@@ -965,18 +965,19 @@
* @param channel_key
* The channel we are interested in.
* @param slave_key
- * The slave requesting the message. If not NULL, a membership test is
- * performed first and the message is only returned if the slave has
- * access to it.
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
* @param first_message_id
* First message ID to retrieve.
- * Use 0 to get the latest message.
* @param last_message_id
* Last consecutive message ID to retrieve.
- * Use 0 to get the latest message.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @todo Implement method_prefix query.
* @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -989,11 +990,18 @@
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t first_message_id,
uint64_t last_message_id,
+ const char *method_prefix,
GNUNET_PSYCSTORE_FragmentCallback fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls)
{
struct MessageGetRequest *req;
+ if (NULL == method_prefix)
+ method_prefix = "";
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+
struct GNUNET_PSYCSTORE_OperationHandle *
op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
op->h = h;
@@ -1004,7 +1012,7 @@
req = (struct MessageGetRequest *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) req;
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
- req->header.size = htons (sizeof (*req));
+ req->header.size = htons (sizeof (*req) + method_size);
req->channel_key = *channel_key;
req->first_message_id = GNUNET_htonll (first_message_id);
req->last_message_id = GNUNET_htonll (last_message_id);
@@ -1013,6 +1021,8 @@
req->slave_key = *slave_key;
req->do_membership_test = GNUNET_YES;
}
+ memcpy (&req[1], method_prefix, method_size);
+ ((char *) &req[1])[method_size - 1] = '\0';
op->op_id = get_next_op_id (h);
req->op_id = GNUNET_htonll (op->op_id);
@@ -1032,14 +1042,17 @@
* @param channel_key
* The channel we are interested in.
* @param slave_key
- * The slave requesting the message. If not NULL, a membership test is
- * performed first and the message is only returned if the slave has
- * access to it.
+ * The slave requesting the message.
+ * If not NULL, a membership test is performed first
+ * and the message is only returned if the slave has access to it.
* @param message_limit
* Maximum number of messages to retrieve.
+ * @param method_prefix
+ * Retrieve only messages with a matching method prefix.
+ * @todo Implement method_prefix query.
* @param fragment_cb
* Callback to call with the retrieved fragments.
- * @param rcb
+ * @param result_cb
* Callback to call with the result of the operation.
* @param cls
* Closure for the callbacks.
@@ -1051,13 +1064,22 @@
const struct GNUNET_CRYPTO_EddsaPublicKey
*channel_key,
const struct GNUNET_CRYPTO_EcdsaPublicKey
*slave_key,
uint64_t message_limit,
+ const char *method_prefix,
GNUNET_PSYCSTORE_FragmentCallback
fragment_cb,
GNUNET_PSYCSTORE_ResultCallback rcb,
void *cls)
{
struct MessageGetRequest *req;
+
+ if (NULL == method_prefix)
+ method_prefix = "";
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ GNUNET_assert ('\0' == method_prefix[method_size - 1]);
+
struct GNUNET_PSYCSTORE_OperationHandle *
- op = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + method_size);
op->h = h;
op->data_cb = (DataCallback) fragment_cb;
op->res_cb = rcb;
@@ -1066,7 +1088,7 @@
req = (struct MessageGetRequest *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) req;
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
- req->header.size = htons (sizeof (*req));
+ req->header.size = htons (sizeof (*req) + method_size);
req->channel_key = *channel_key;
req->message_limit = GNUNET_ntohll (message_limit);
if (NULL != slave_key)
@@ -1077,6 +1099,7 @@
op->op_id = get_next_op_id (h);
req->op_id = GNUNET_htonll (op->op_id);
+ memcpy (&req[1], method_prefix, method_size);
GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
transmit_next (h);
Modified: gnunet/src/psycstore/test_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_psycstore.c 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/psycstore/test_psycstore.c 2015-05-07 12:15:58 UTC (rev
35717)
@@ -154,7 +154,8 @@
void
-state_reset_result (void *cls, int64_t result, const char *err_msg)
+state_reset_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_reset_result:\t%d\n", result);
@@ -195,7 +196,8 @@
void
-state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+state_get_prefix_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct StateClosure *scls = cls;
op = NULL;
@@ -208,7 +210,8 @@
void
-state_get_result (void *cls, int64_t result, const char *err_msg)
+state_get_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%d\n", result);
@@ -260,7 +263,8 @@
void
-state_modify_result (void *cls, int64_t result, const char *err_msg)
+state_modify_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%d\n", result);
@@ -272,7 +276,8 @@
void
-state_sync_result (void *cls, int64_t result, const char *err_msg)
+state_sync_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -328,7 +333,8 @@
void
-message_get_latest_result (void *cls, int64_t result, const char *err_msg)
+message_get_latest_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -355,7 +361,8 @@
void
-message_get_result (void *cls, int64_t result, const char *err_msg)
+message_get_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -365,13 +372,14 @@
fcls->n = 0;
fcls->n_expected = 3;
op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key,
&slave_pub_key,
- 1, &fragment_result,
+ 1, "", &fragment_result,
&message_get_latest_result, fcls);
}
void
-message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
+message_get_fragment_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -382,14 +390,15 @@
fcls->n_expected = 3;
uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id);
op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key,
- message_id, message_id,
+ message_id, message_id, "",
&fragment_result,
&message_get_result, fcls);
}
void
-fragment_get_latest_result (void *cls, int64_t result, const char *err_msg)
+fragment_get_latest_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -407,7 +416,8 @@
void
-fragment_get_result (void *cls, int64_t result, const char *err_msg)
+fragment_get_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
struct FragmentClosure *fcls = cls;
op = NULL;
@@ -424,7 +434,8 @@
void
-fragment_store_result (void *cls, int64_t result, const char *err_msg)
+fragment_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%d\n", result);
@@ -495,7 +506,8 @@
void
-membership_test_result (void *cls, int64_t result, const char *err_msg)
+membership_test_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%d\n", result);
@@ -506,7 +518,8 @@
void
-membership_store_result (void *cls, int64_t result, const char *err_msg)
+membership_store_result (void *cls, int64_t result,
+ const char *err_msg, uint16_t err_msg_size)
{
op = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%d\n", result);
@@ -517,6 +530,7 @@
&membership_test_result, NULL);
}
+
/**
* Main function of the test, run from scheduler.
*
Modified: gnunet/src/social/gnunet-service-social.c
===================================================================
--- gnunet/src/social/gnunet-service-social.c 2015-05-07 12:15:32 UTC (rev
35716)
+++ gnunet/src/social/gnunet-service-social.c 2015-05-07 12:15:58 UTC (rev
35717)
@@ -137,6 +137,8 @@
struct MessageTransmitQueue *tmit_msgs_head;
struct MessageTransmitQueue *tmit_msgs_tail;
+ struct GNUNET_PSYC_Channel *channel;
+
/**
* Public key of the channel.
*/
@@ -288,6 +290,15 @@
};
+struct OperationClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ struct Place *plc;
+ uint64_t op_id;
+ uint32_t flags;
+};
+
+
static int
psyc_transmit_message (struct Place *plc);
@@ -450,7 +461,7 @@
client_send_msg (const struct Place *plc,
const struct GNUNET_MessageHeader *msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Sending message to clients.\n", plc);
struct ClientListItem *cli = plc->clients_head;
@@ -464,6 +475,46 @@
/**
+ * Send a result code back to the client.
+ *
+ * @param client
+ * Client that should receive the result code.
+ * @param result_code
+ * Code to transmit.
+ * @param op_id
+ * Operation ID in network byte order.
+ * @param data
+ * Data payload or NULL.
+ * @param data_size
+ * Size of @a data.
+ */
+static void
+client_send_result (struct GNUNET_SERVER_Client *client, uint64_t op_id,
+ int64_t result_code, const void *data, uint16_t data_size)
+{
+ struct GNUNET_OperationResultMessage *res;
+
+ res = GNUNET_malloc (sizeof (*res) + data_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE);
+ res->header.size = htons (sizeof (*res) + data_size);
+ res->result_code = GNUNET_htonll_signed (result_code);
+ res->op_id = op_id;
+ if (0 < data_size)
+ memcpy (&res[1], data, data_size);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Sending result to client for operation #%" PRIu64 ": "
+ "%" PRId64 " (size: %u)\n",
+ client, GNUNET_ntohll (op_id), result_code, data_size);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
+/**
* Called after a PSYC master is started.
*/
static void
@@ -603,6 +654,7 @@
&psyc_master_started,
&psyc_recv_join_request,
&psyc_recv_message, NULL, hst);
+ hst->plc.channel = GNUNET_PSYC_master_get_channel (hst->master);
}
else
{
@@ -720,6 +772,7 @@
&gst->origin, gst->relay_count, gst->relays,
&psyc_recv_message, NULL,
&psyc_slave_connected,
&psyc_recv_join_dcsn, gst, join_msg);
+ gst->plc.channel = GNUNET_PSYC_slave_get_channel (gst->slave);
}
else
{
@@ -1483,6 +1536,132 @@
/**
+ * A historic message result arrived from PSYC.
+ */
+static void
+psyc_recv_history_message (void *cls,
+ uint64_t message_id,
+ uint32_t flags,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ struct OperationClosure *opcls = cls;
+ struct Place *plc = opcls->plc;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received historic message #%" PRId64 " (flags: %x)\n",
+ plc, message_id, flags);
+
+ uint16_t size = ntohs (msg->header.size);
+
+ struct GNUNET_OperationResultMessage *
+ res = GNUNET_malloc (sizeof (*res) + size);
+ res->header.size = htons (sizeof (*res) + size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT);
+ res->op_id = opcls->op_id;
+ res->result_code = GNUNET_htonll_signed (GNUNET_OK);
+
+ memcpy (&res[1], msg, size);
+
+ /** @todo FIXME: send only to requesting client */
+ client_send_msg (plc, &res->header);
+}
+
+
+static void
+psyc_recv_history_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ struct OperationClosure *opcls = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p History replay #%" PRIu64 ": "
+ "PSYCSTORE returned %" PRId64 " (%.*s)\n",
+ opcls->plc, GNUNET_ntohll (opcls->op_id), result, err_msg_size,
err_msg);
+
+ // FIXME: place might have been destroyed
+ client_send_result (opcls->client, opcls->op_id, result, err_msg,
err_msg_size);
+}
+
+
+/**
+ * Client requests channel history.
+ */
+static void
+client_recv_history_replay (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Client *
+ ctx = GNUNET_SERVER_client_get_user_context (client, struct Client);
+ GNUNET_assert (NULL != ctx);
+ struct Place *plc = ctx->plc;
+
+ const struct GNUNET_PSYC_HistoryRequestMessage *
+ req = (const struct GNUNET_PSYC_HistoryRequestMessage *) msg;
+ uint16_t size = ntohs (msg->size);
+ const char *method_prefix = (const char *) &req[1];
+
+ if (size < sizeof (*req) + 1
+ || '\0' != method_prefix[size - sizeof (*req) - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "%p History replay #%" PRIu64 ": "
+ "invalid method prefix. size: %u < %u?\n",
+ plc, GNUNET_ntohll (req->op_id), size, sizeof (*req) + 1);
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct OperationClosure *opcls = GNUNET_malloc (sizeof (*opcls));
+ opcls->client = client;
+ opcls->plc = plc;
+ opcls->op_id = req->op_id;
+ opcls->flags = ntohl (req->flags);
+
+ if (0 == req->message_limit)
+ GNUNET_PSYC_channel_history_replay (plc->channel,
+ GNUNET_ntohll (req->start_message_id),
+ GNUNET_ntohll (req->end_message_id),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message, NULL,
+ &psyc_recv_history_result, opcls);
+ else
+ GNUNET_PSYC_channel_history_replay_latest (plc->channel,
+ GNUNET_ntohll
(req->message_limit),
+ method_prefix, opcls->flags,
+ &psyc_recv_history_message,
NULL,
+ &psyc_recv_history_result,
opcls);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static const struct GNUNET_SERVER_MessageHandler handlers[] = {
+ { &client_recv_host_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
+
+ { &client_recv_guest_enter, NULL,
+ GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
+
+ { &client_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
+
+ { &client_recv_psyc_message, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
+
+ { &client_recv_history_replay, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY, 0 },
+#if FIXME
+ { &client_recv_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
+
+ { &client_recv_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 },
+#endif
+ { NULL, NULL, 0, 0 }
+};
+
+
+/**
* Initialize the PSYC service.
*
* @param cls Closure.
@@ -1493,20 +1672,6 @@
run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &client_recv_host_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_HOST_ENTER, 0 },
-
- { &client_recv_guest_enter, NULL,
- GNUNET_MESSAGE_TYPE_SOCIAL_GUEST_ENTER, 0 },
-
- { &client_recv_join_decision, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
-
- { &client_recv_psyc_message, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }
- };
-
cfg = c;
stats = GNUNET_STATISTICS_create ("social", cfg);
hosts = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
Modified: gnunet/src/social/social_api.c
===================================================================
--- gnunet/src/social/social_api.c 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/social/social_api.c 2015-05-07 12:15:58 UTC (rev 35717)
@@ -47,6 +47,7 @@
static struct GNUNET_NAMESTORE_Handle *namestore;
static struct GNUNET_PeerIdentity this_peer;
+
/**
* Handle for a place where social interactions happen.
*/
@@ -239,33 +240,79 @@
};
-struct GNUNET_SOCIAL_WatchHandle
+/**
+ * A talk request.
+ */
+struct GNUNET_SOCIAL_TalkRequest
{
};
-struct GNUNET_SOCIAL_LookHandle
+struct GNUNET_SOCIAL_WatchHandle
{
};
/**
- * A talk request.
+ * A history lesson.
*/
-struct GNUNET_SOCIAL_TalkRequest
+struct GNUNET_SOCIAL_HistoryRequest
{
+ /**
+ * Place.
+ */
+ struct GNUNET_SOCIAL_Place *plc;
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
+
+ /**
+ * Message handler.
+ */
+ struct GNUNET_PSYC_ReceiveHandle *recv;
+
+ /**
+ * Function to call when the operation finished.
+ */
+ GNUNET_ResultCallback result_cb;
+
+ /**
+ * Closure for @a result_cb.
+ */
+ void *cls;
};
-/**
- * A history lesson.
- */
-struct GNUNET_SOCIAL_HistoryLesson
+struct GNUNET_SOCIAL_LookHandle
{
+ /**
+ * Place.
+ */
+ struct GNUNET_SOCIAL_Place *plc;
+ /**
+ * Operation ID.
+ */
+ uint64_t op_id;
+
+ /**
+ * State variable result callback.
+ */
+ GNUNET_PSYC_StateVarCallback var_cb;
+
+ /**
+ * Function to call when the operation finished.
+ */
+ GNUNET_ResultCallback result_cb;
+
+ /**
+ * Closure for @a result_cb.
+ */
+ void *cls;
};
@@ -418,7 +465,7 @@
GNUNET_assert (message_id == slicer->message_id);
}
- LOG (GNUNET_ERROR_TYPE_WARNING,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Slicer received message of type %u and size %u, "
"with ID %" PRIu64 " and method %s\n",
ptype, ntohs (msg->size), message_id, slicer->method_name);
@@ -594,6 +641,165 @@
static void
+place_recv_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_SOCIAL_Place *
+ plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
+
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+
+ uint16_t size = ntohs (msg->size);
+ if (size < sizeof (*res))
+ { /* Error, message too small. */
+ GNUNET_break (0);
+ return;
+ }
+
+ uint16_t data_size = size - sizeof (*res);
+ const char *data = (0 < data_size) ? (const char *) &res[1] : NULL;
+ GNUNET_CLIENT_MANAGER_op_result (plc->client, GNUNET_ntohll (res->op_id),
+ GNUNET_ntohll_signed (res->result_code),
+ data, data_size);
+}
+
+
+static void
+op_recv_history_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received history replay result: %" PRId64 ".\n", result);
+
+ struct GNUNET_SOCIAL_HistoryRequest *hist = cls;
+
+ if (NULL != hist->result_cb)
+ hist->result_cb (hist->cls, result, err_msg, err_msg_size);
+
+ GNUNET_PSYC_receive_destroy (hist->recv);
+ GNUNET_free (hist);
+}
+
+
+static void
+op_recv_state_result (void *cls, int64_t result,
+ const void *err_msg, uint16_t err_msg_size)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received state request result: %" PRId64 ".\n", result);
+
+ struct GNUNET_SOCIAL_LookHandle *look = cls;
+
+ if (NULL != look->result_cb)
+ look->result_cb (look->cls, result, err_msg, err_msg_size);
+
+ GNUNET_free (look);
+}
+
+
+static void
+place_recv_history_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_SOCIAL_Place *
+ plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
+
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+ struct GNUNET_PSYC_MessageHeader *
+ pmsg = (struct GNUNET_PSYC_MessageHeader *) &res[1];
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Received historic fragment for message #%" PRIu64 ".\n",
+ plc, GNUNET_ntohll (pmsg->message_id));
+
+ GNUNET_ResultCallback result_cb = NULL;
+ struct GNUNET_SOCIAL_HistoryRequest *hist = NULL;
+
+ if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client,
+ GNUNET_ntohll (res->op_id),
+ &result_cb, (void *) &hist))
+ { /* Operation not found. */
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "%p Replay operation not found for historic fragment of message #%"
+ PRIu64 ".\n",
+ plc, GNUNET_ntohll (pmsg->message_id));
+ return;
+ }
+
+ uint16_t size = ntohs (msg->size);
+ if (size < sizeof (*res) + sizeof (*pmsg))
+ { /* Error, message too small. */
+ GNUNET_break (0);
+ return;
+ }
+
+ GNUNET_PSYC_receive_message (hist->recv,
+ (const struct GNUNET_PSYC_MessageHeader *)
pmsg);
+}
+
+
+static void
+place_recv_state_result (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_SOCIAL_Place *
+ plc = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*plc));
+
+ const struct GNUNET_OperationResultMessage *
+ res = (const struct GNUNET_OperationResultMessage *) msg;
+
+#if FIXME
+ GNUNET_ResultCallback result_cb = NULL;
+ struct GNUNET_PSYC_StateRequest *sr = NULL;
+
+ if (GNUNET_YES != GNUNET_CLIENT_MANAGER_op_find (plc->client,
+ GNUNET_ntohll (res->op_id),
+ &result_cb, (void *) &sr))
+ { /* Operation not found. */
+ return;
+ }
+
+ const struct GNUNET_MessageHeader *
+ modc = (struct GNUNET_MessageHeader *) &res[1];
+ uint16_t modc_size = ntohs (modc->size);
+ if (ntohs (msg->size) - sizeof (*msg) != modc_size)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ switch (ntohs (modc->type))
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ const struct GNUNET_PSYC_MessageModifier *
+ mod = (const struct GNUNET_PSYC_MessageModifier *) modc;
+
+ const char *name = (const char *) &mod[1];
+ uint16_t name_size = ntohs (mod->name_size);
+ if ('\0' != name[name_size - 1])
+ {
+ GNUNET_break (0);
+ return;
+ }
+ sr->var_cb (sr->cls, name, name + name_size, ntohs (mod->value_size));
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ sr->var_cb (sr->cls, NULL, (const char *) &modc[1],
+ modc_size - sizeof (*modc));
+ break;
+ }
+#endif
+}
+
+
+static void
place_recv_message_ack (void *cls,
struct GNUNET_CLIENT_MANAGER_Connection *client,
const struct GNUNET_MessageHeader *msg)
@@ -752,6 +958,18 @@
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK,
sizeof (struct GNUNET_MessageHeader), GNUNET_NO },
+ { &place_recv_history_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
+ { &place_recv_state_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
+ { &place_recv_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
{ &place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
{ NULL, NULL, 0, 0, GNUNET_NO }
@@ -780,6 +998,18 @@
GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION,
sizeof (struct GNUNET_PSYC_JoinDecisionMessage), GNUNET_YES },
+ { &place_recv_history_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_HISTORY_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
+ { &place_recv_state_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_STATE_RESULT,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
+ { &place_recv_result, NULL,
+ GNUNET_MESSAGE_TYPE_PSYC_RESULT_CODE,
+ sizeof (struct GNUNET_OperationResultMessage), GNUNET_YES },
+
{ &place_recv_disconnect, NULL, 0, 0, GNUNET_NO },
{ NULL, NULL, 0, 0, GNUNET_NO }
@@ -1546,67 +1776,13 @@
/**
- * A history lesson.
- */
-struct GNUNET_SOCIAL_HistoryLesson;
-
-/**
- * Learn about the history of a place.
- *
- * Sends messages through the slicer function of the place where
- * start_message_id <= message_id <= end_message_id.
- * The messages will have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
- *
- * To get the latest message, use 0 for both the start and end message ID.
- *
- * @param place Place we want to learn more about.
- * @param start_message_id First historic message we are interested in.
- * @param end_message_id Last historic message we are interested in
(inclusive).
- * @param slicer Slicer to use to process history. Can be the same as the
- * slicer of the place, as the HISTORIC flag allows
distinguishing
- * old messages from fresh ones.
- * @param finish_cb Function called after the last message in the history
lesson
- * is passed through the @a slicer. NULL if not needed.
- * @param finish_cb_cls Closure for @a finish_cb.
- * @return Handle to abort history lesson, never NULL (multiple lessons
- * at the same time are allowed).
- */
-struct GNUNET_SOCIAL_HistoryLesson *
-GNUNET_SOCIAL_place_get_history (struct GNUNET_SOCIAL_Place *place,
- uint64_t start_message_id,
- uint64_t end_message_id,
- const struct GNUNET_SOCIAL_Slicer *slicer,
- void (*finish_cb)(void *),
- void *finish_cb_cls)
-{
- return NULL;
-}
-
-
-/**
- * Stop processing messages from the history lesson.
- *
- * Must not be called after the finish callback of the history lesson is
called.
- *
- * @param hist History lesson to cancel.
- */
-void
-GNUNET_SOCIAL_place_get_history_cancel (struct GNUNET_SOCIAL_HistoryLesson
*hist)
-{
-
-}
-
-
-struct GNUNET_SOCIAL_WatchHandle;
-
-/**
* Watch a place for changed objects.
*
* @param place
* Place to watch.
* @param object_filter
* Object prefix to match.
- * @param state_var_cb
+ * @param var_cb
* Function to call when an object/state var changes.
* @param cls
* Closure for callback.
@@ -1616,7 +1792,7 @@
struct GNUNET_SOCIAL_WatchHandle *
GNUNET_SOCIAL_place_watch (struct GNUNET_SOCIAL_Place *place,
const char *object_filter,
- GNUNET_PSYC_StateVarCallback state_var_cb,
+ GNUNET_PSYC_StateVarCallback var_cb,
void *cls)
{
return NULL;
@@ -1635,64 +1811,233 @@
}
-struct GNUNET_SOCIAL_LookHandle;
+static struct GNUNET_SOCIAL_HistoryRequest *
+place_history_replay (struct GNUNET_SOCIAL_Place *plc,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ uint64_t message_limit,
+ const char *method_prefix,
+ uint32_t flags,
+ struct GNUNET_SOCIAL_Slicer *slicer,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
+{
+ struct GNUNET_PSYC_HistoryRequestMessage *req;
+ struct GNUNET_SOCIAL_HistoryRequest *hist = GNUNET_malloc (sizeof (*hist));
+ hist->plc = plc;
+ hist->recv = GNUNET_PSYC_receive_create (NULL, &slicer_message, slicer);
+ hist->result_cb = result_cb;
+ hist->cls = cls;
+ hist->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client,
+ &op_recv_history_result, hist);
+ GNUNET_assert (NULL != method_prefix);
+ uint16_t method_size = strnlen (method_prefix,
+ GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ GNUNET_assert ('\0' == method_prefix[method_size - 1]);
+ req = GNUNET_malloc (sizeof (*req) + method_size);
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_HISTORY_REPLAY);
+ req->header.size = htons (sizeof (*req) + method_size);
+ req->start_message_id = GNUNET_htonll (start_message_id);
+ req->end_message_id = GNUNET_htonll (end_message_id);
+ req->message_limit = GNUNET_htonll (message_limit);
+ req->flags = htonl (flags);
+ req->op_id = GNUNET_htonll (hist->op_id);
+ memcpy (&req[1], method_prefix, method_size);
+ GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header);
+ return hist;
+}
+
+
/**
- * Look at objects in the place with a matching name prefix.
+ * Learn about the history of a place.
*
+ * Messages are returned through the @a slicer function
+ * and have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
+ *
* @param place
- * The place to look its objects at.
- * @param name_prefix
- * Look at objects with names beginning with this value.
- * @param state_var_cb
- * Function to call for each object found.
- * @param cls
- * Closure for callback function.
+ * Place we want to learn more about.
+ * @param start_message_id
+ * First historic message we are interested in.
+ * @param end_message_id
+ * Last historic message we are interested in (inclusive).
+ * @param method_prefix
+ * Only retrieve messages with this method prefix.
+ * @param flags
+ * OR'ed GNUNET_PSYC_HistoryReplayFlags
+ * @param slicer
+ * Slicer to use for retrieved messages.
+ * Can be the same as the slicer of the place.
+ * @param result_cb
+ * Function called after all messages retrieved.
+ * NULL if not needed.
+ * @param cls Closure for @a result_cb.
+ */
+struct GNUNET_SOCIAL_HistoryRequest *
+GNUNET_SOCIAL_place_history_replay (struct GNUNET_SOCIAL_Place *plc,
+ uint64_t start_message_id,
+ uint64_t end_message_id,
+ const char *method_prefix,
+ uint32_t flags,
+ struct GNUNET_SOCIAL_Slicer *slicer,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
+{
+ return place_history_replay (plc, start_message_id, end_message_id, 0,
+ method_prefix, flags, slicer, result_cb, cls);
+}
+
+
+/**
+ * Learn about the history of a place.
*
- * @return Handle that can be used to stop looking at objects.
+ * Sends messages through the slicer function of the place where
+ * start_message_id <= message_id <= end_message_id.
+ * The messages will have the #GNUNET_PSYC_MESSAGE_HISTORIC flag set.
+ *
+ * To get the latest message, use 0 for both the start and end message ID.
+ *
+ * @param place
+ * Place we want to learn more about.
+ * @param message_limit
+ * Maximum number of historic messages we are interested in.
+ * @param method_prefix
+ * Only retrieve messages with this method prefix.
+ * @param flags
+ * OR'ed GNUNET_PSYC_HistoryReplayFlags
+ * @param result_cb
+ * Function called after all messages retrieved.
+ * NULL if not needed.
+ * @param cls Closure for @a result_cb.
*/
-struct GNUNET_SOCIAL_LookHandle *
-GNUNET_SOCIAL_place_look (struct GNUNET_SOCIAL_Place *place,
- const char *name_prefix,
- GNUNET_PSYC_StateVarCallback state_var_cb,
- void *cls)
+struct GNUNET_SOCIAL_HistoryRequest *
+GNUNET_SOCIAL_place_history_replay_latest (struct GNUNET_SOCIAL_Place *plc,
+ uint64_t message_limit,
+ const char *method_prefix,
+ uint32_t flags,
+ struct GNUNET_SOCIAL_Slicer *slicer,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
{
- return NULL;
+ return place_history_replay (plc, 0, 0, message_limit, method_prefix, flags,
+ slicer, result_cb, cls);
}
/**
- * Stop looking at objects.
+ * Cancel learning about the history of a place.
*
- * @param lh Look handle to stop.
+ * @param hist
+ * History lesson to cancel.
*/
void
-GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *lh)
+GNUNET_SOCIAL_place_history_replay_cancel (struct GNUNET_SOCIAL_HistoryRequest
*hist)
{
+ GNUNET_PSYC_receive_destroy (hist->recv);
+ GNUNET_CLIENT_MANAGER_op_cancel (hist->plc->client, hist->op_id);
+ GNUNET_free (hist);
+}
+
+/**
+ * Request matching state variables.
+ */
+static struct GNUNET_SOCIAL_LookHandle *
+place_state_get (struct GNUNET_SOCIAL_Place *plc,
+ uint16_t type, const char *name,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb, void *cls)
+{
+ struct GNUNET_PSYC_StateRequestMessage *req;
+ struct GNUNET_SOCIAL_LookHandle *look = GNUNET_malloc (sizeof (*look));
+ look->plc = plc;
+ look->var_cb = var_cb;
+ look->result_cb = result_cb;
+ look->cls = cls;
+ look->op_id = GNUNET_CLIENT_MANAGER_op_add (plc->client,
+ &op_recv_state_result, look);
+
+ GNUNET_assert (NULL != name);
+ size_t name_size = strnlen (name, GNUNET_SERVER_MAX_MESSAGE_SIZE
+ - sizeof (*req)) + 1;
+ req = GNUNET_malloc (sizeof (*req) + name_size);
+ req->header.type = htons (type);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->op_id = GNUNET_htonll (look->op_id);
+ memcpy (&req[1], name, name_size);
+
+ GNUNET_CLIENT_MANAGER_transmit (plc->client, &req->header);
+ return look;
}
-
/**
* Look at a particular object in the place.
*
* The best matching object is returned (its name might be less specific than
* what was requested).
*
- * @param place The place to look the object at.
- * @param full_name Full name of the object.
- * @param value_size Set to the size of the returned value.
+ * @param place
+ * The place to look the object at.
+ * @param full_name
+ * Full name of the object.
+ * @param value_size
+ * Set to the size of the returned value.
+ *
* @return NULL if there is no such object at this place.
*/
-const void *
-GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *place,
+struct GNUNET_SOCIAL_LookHandle *
+GNUNET_SOCIAL_place_look_at (struct GNUNET_SOCIAL_Place *plc,
const char *full_name,
- size_t *value_size)
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
{
- return NULL;
+ return place_state_get (plc, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET,
+ full_name, var_cb, result_cb, cls);
}
+/**
+ * Look for objects in the place with a matching name prefix.
+ *
+ * @param place
+ * The place to look its objects at.
+ * @param name_prefix
+ * Look at objects with names beginning with this value.
+ * @param var_cb
+ * Function to call for each object found.
+ * @param cls
+ * Closure for callback function.
+ *
+ * @return Handle that can be used to stop looking at objects.
+ */
+struct GNUNET_SOCIAL_LookHandle *
+GNUNET_SOCIAL_place_look_for (struct GNUNET_SOCIAL_Place *plc,
+ const char *name_prefix,
+ GNUNET_PSYC_StateVarCallback var_cb,
+ GNUNET_ResultCallback result_cb,
+ void *cls)
+{
+ return place_state_get (plc, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX,
+ name_prefix, var_cb, result_cb, cls);
+}
+
+
+/**
+ * Cancel a state request operation.
+ *
+ * @param sr
+ * Handle for the operation to cancel.
+ */
+void
+GNUNET_SOCIAL_place_look_cancel (struct GNUNET_SOCIAL_LookHandle *look)
+{
+ GNUNET_CLIENT_MANAGER_op_cancel (look->plc->client, look->op_id);
+ GNUNET_free (look);
+}
+
+
/* end of social_api.c */
Modified: gnunet/src/social/test_social.c
===================================================================
--- gnunet/src/social/test_social.c 2015-05-07 12:15:32 UTC (rev 35716)
+++ gnunet/src/social/test_social.c 2015-05-07 12:15:58 UTC (rev 35717)
@@ -75,6 +75,9 @@
struct GNUNET_SOCIAL_Host *hst;
struct GNUNET_SOCIAL_Guest *gst;
+struct GNUNET_SOCIAL_Place *hst_plc;
+struct GNUNET_SOCIAL_Place *gst_plc;
+
struct GuestEnterMessage
{
struct GNUNET_PSYC_Message *msg;
@@ -99,6 +102,8 @@
uint8_t join_req_count;
struct GNUNET_PSYC_Message *join_resp;
+uint32_t counter;
+
enum
{
TEST_NONE = 0,
@@ -106,11 +111,15 @@
TEST_GUEST_RECV_ENTRY_DCSN_REFUSE = 2,
TEST_HOST_ANSWER_DOOR_ADMIT = 3,
TEST_GUEST_RECV_ENTRY_DCSN_ADMIT = 4,
- TEST_HOST_ANNOUNCE = 5,
- TEST_HOST_ANNOUNCE_END = 6,
- TEST_GUEST_TALK = 7,
- TEST_GUEST_LEAVE = 8,
- TEST_HOST_LEAVE = 9,
+ TEST_HOST_ANNOUNCE = 5,
+ TEST_HOST_ANNOUNCE_END = 6,
+ TEST_HOST_ANNOUNCE2 = 7,
+ TEST_HOST_ANNOUNCE2_END = 8,
+ TEST_GUEST_TALK = 9,
+ TEST_GUEST_HISTORY_REPLAY = 10,
+ TEST_GUEST_HISTORY_REPLAY_LATEST = 11,
+ TEST_GUEST_LEAVE = 12,
+ TEST_HOST_LEAVE = 13,
} test;
@@ -148,11 +157,13 @@
{
GNUNET_SOCIAL_guest_leave (gst, GNUNET_NO, NULL, NULL);
gst = NULL;
+ gst_plc = NULL;
}
if (NULL != hst)
{
GNUNET_SOCIAL_host_leave (hst, GNUNET_NO, NULL, NULL);
hst = NULL;
+ hst_plc = NULL;
}
GNUNET_SCHEDULER_shutdown ();
}
@@ -273,6 +284,7 @@
GNUNET_SOCIAL_slicer_destroy (host_slicer);
host_slicer = NULL;
hst = NULL;
+ hst_plc = NULL;
end ();
}
@@ -316,6 +328,7 @@
GNUNET_SOCIAL_slicer_destroy (guest_slicer);
guest_slicer = NULL;
gst = NULL;
+ gst_plc = NULL;
GNUNET_SCHEDULER_add_now (&schedule_host_leave, NULL);
}
@@ -331,6 +344,69 @@
static void
+schedule_guest_leave (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ guest_leave ();
+}
+
+
+static void
+guest_recv_history_replay_latest_result (void *cls, int64_t result,
+ const void *data, uint16_t data_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%u: Guest received latest history replay result: %"
PRId64 "\n"
+ "%.*s\n",
+ test, result, data_size, data);
+ GNUNET_assert (2 == counter); /* message count */
+ GNUNET_assert (7 == result); /* fragment count */
+
+ GNUNET_SCHEDULER_add_now (&schedule_guest_leave, NULL);
+}
+
+
+static void
+guest_history_replay_latest ()
+{
+ test = TEST_GUEST_HISTORY_REPLAY_LATEST;
+ counter = 0;
+ GNUNET_SOCIAL_place_history_replay_latest (gst_plc, 3, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ guest_slicer,
+
&guest_recv_history_replay_latest_result,
+ NULL);
+}
+
+
+static void
+guest_recv_history_replay_result (void *cls, int64_t result,
+ const void *data, uint16_t data_size)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Test #%u: Guest received history replay result: %" PRId64 "\n"
+ "%.*s\n",
+ test, result, data_size, data);
+ GNUNET_assert (2 == counter); /* message count */
+ GNUNET_assert (7 == result); /* fragment count */
+
+ guest_history_replay_latest ();
+}
+
+
+static void
+guest_history_replay ()
+{
+ test = TEST_GUEST_HISTORY_REPLAY;
+ counter = 0;
+ GNUNET_SOCIAL_place_history_replay (gst_plc, 1, 3, "",
+ GNUNET_PSYC_HISTORY_REPLAY_LOCAL,
+ guest_slicer,
+ &guest_recv_history_replay_result,
+ NULL);
+}
+
+
+static void
guest_recv_method (void *cls,
const struct GNUNET_PSYC_MessageMethod *meth,
uint64_t message_id,
@@ -340,8 +416,8 @@
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Test #%u: Guest received method for message ID %" PRIu64 ":\n"
- "%s\n",
- test, message_id, method_name);
+ "%s (flags: %x)\n",
+ test, message_id, method_name, flags);
/* FIXME: check message */
}
@@ -395,9 +471,22 @@
break;
case TEST_HOST_ANNOUNCE_END:
+ host_announce2 ();
+ break;
+
+ case TEST_HOST_ANNOUNCE2:
+ test = TEST_HOST_ANNOUNCE2_END;
+ break;
+
+ case TEST_HOST_ANNOUNCE2_END:
guest_talk ();
break;
+ case TEST_GUEST_HISTORY_REPLAY:
+ case TEST_GUEST_HISTORY_REPLAY_LATEST:
+ counter++;
+ break;
+
default:
GNUNET_assert (0);
}
@@ -466,15 +555,22 @@
{
case TEST_HOST_ANNOUNCE:
test = TEST_HOST_ANNOUNCE_END;
- //host_announce2 ();
break;
case TEST_HOST_ANNOUNCE_END:
+ host_announce2 ();
+ break;
+
+ case TEST_HOST_ANNOUNCE2:
+ test = TEST_HOST_ANNOUNCE2_END;
+ break;
+
+ case TEST_HOST_ANNOUNCE2_END:
guest_talk ();
break;
case TEST_GUEST_TALK:
- guest_leave ();
+ guest_history_replay ();
break;
default:
@@ -535,7 +631,7 @@
static void
host_announce2 ()
{
- test = TEST_HOST_ANNOUNCE;
+ test = TEST_HOST_ANNOUNCE2;
tmit = (struct TransmitClosure) {};
tmit.env = GNUNET_ENV_environment_create ();
@@ -667,6 +763,7 @@
&this_peer, 0, NULL, emsg->msg,
guest_slicer, &guest_recv_local_enter,
&guest_recv_entry_decision, NULL);
+ gst_plc = GNUNET_SOCIAL_guest_get_place (gst);
}
@@ -727,6 +824,7 @@
GNUNET_PSYC_CHANNEL_PRIVATE, host_slicer,
&host_entered, &host_answer_door,
&host_farewell, NULL);
+ hst_plc = GNUNET_SOCIAL_host_get_place (hst);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r35717 - in gnunet/src: include psyc psycstore social,
gnunet <=