[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r38174 - in gnunet/src: transport util
From: |
gnunet |
Subject: |
[GNUnet-SVN] r38174 - in gnunet/src: transport util |
Date: |
Fri, 21 Oct 2016 18:04:46 +0200 |
Author: grothoff
Date: 2016-10-21 18:04:46 +0200 (Fri, 21 Oct 2016)
New Revision: 38174
Modified:
gnunet/src/transport/transport_api_core.c
gnunet/src/util/client.c
gnunet/src/util/client_new.c
gnunet/src/util/mq.c
Log:
activating client_new implementation, seems to mostly work fine, or better than
the old one
Modified: gnunet/src/transport/transport_api_core.c
===================================================================
--- gnunet/src/transport/transport_api_core.c 2016-10-21 14:26:15 UTC (rev
38173)
+++ gnunet/src/transport/transport_api_core.c 2016-10-21 16:04:46 UTC (rev
38174)
@@ -354,6 +354,25 @@
* @param cls the `struct Neighbour` where the message was sent
*/
static void
+notify_send_done_fin (void *cls)
+{
+ struct Neighbour *n = cls;
+
+ n->timeout_task = NULL;
+ n->is_ready = GNUNET_YES;
+ GNUNET_MQ_impl_send_continue (n->mq);
+}
+
+
+/**
+ * A message from the handler's message queue to a neighbour was
+ * transmitted. Now trigger (possibly delayed) notification of the
+ * neighbour's message queue that we are done and thus ready for
+ * the next message.
+ *
+ * @param cls the `struct Neighbour` where the message was sent
+ */
+static void
notify_send_done (void *cls)
{
struct Neighbour *n = cls;
@@ -364,8 +383,8 @@
{
GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
n->env_size + n->traffic_overhead);
+ n->env = NULL;
n->traffic_overhead = 0;
- n->env = NULL;
}
delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
128);
@@ -375,10 +394,11 @@
GNUNET_MQ_impl_send_continue (n->mq);
return;
}
+ GNUNET_MQ_impl_send_in_flight (n->mq);
/* cannot send even a small message without violating
- quota, wait a before notifying MQ */
+ quota, wait a before allowing MQ to send next message */
n->timeout_task = GNUNET_SCHEDULER_add_delayed (delay,
- ¬ify_send_done,
+ ¬ify_send_done_fin,
n);
}
@@ -411,6 +431,7 @@
GNUNET_MQ_impl_send_continue (mq);
return;
}
+ GNUNET_assert (NULL == n->env);
n->env = GNUNET_MQ_msg_nested_mh (obm,
GNUNET_MESSAGE_TYPE_TRANSPORT_SEND,
msg);
Modified: gnunet/src/util/client.c
===================================================================
--- gnunet/src/util/client.c 2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/client.c 2016-10-21 16:04:46 UTC (rev 38174)
@@ -375,7 +375,7 @@
* @return the message queue, NULL on error
*/
struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+GNUNET_CLIENT_connecTX (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *service_name,
const struct GNUNET_MQ_MessageHandler *handlers,
GNUNET_MQ_ErrorHandler error_handler,
Modified: gnunet/src/util/client_new.c
===================================================================
--- gnunet/src/util/client_new.c 2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/client_new.c 2016-10-21 16:04:46 UTC (rev 38174)
@@ -213,10 +213,9 @@
static void
connect_fail_continuation (struct ClientState *cstate)
{
- LOG (GNUNET_ERROR_TYPE_INFO,
- "Failed to establish TCP connection to `%s:%u', no further addresses to
try.\n",
- cstate->hostname,
- cstate->port);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to establish connection to `%s', no further addresses to
try.\n",
+ cstate->service_name);
GNUNET_break (NULL == cstate->ap_head);
GNUNET_break (NULL == cstate->ap_tail);
GNUNET_break (NULL == cstate->dns_active);
@@ -245,6 +244,7 @@
ssize_t ret;
size_t len;
const char *pos;
+ int notify_in_flight;
cstate->send_task = NULL;
pos = (const char *) cstate->msg;
@@ -262,10 +262,7 @@
GNUNET_MQ_ERROR_WRITE);
return;
}
- if (0 == cstate->msg_off)
- {
- GNUNET_MQ_impl_send_in_flight (cstate->mq);
- }
+ notify_in_flight = (0 == cstate->msg_off);
cstate->msg_off += ret;
if (cstate->msg_off < len)
{
@@ -274,6 +271,8 @@
cstate->sock,
&transmit_ready,
cstate);
+ if (notify_in_flight)
+ GNUNET_MQ_impl_send_in_flight (cstate->mq);
return;
}
cstate->msg = NULL;
@@ -345,6 +344,7 @@
{
/* defer destruction */
cstate->in_destroy = GNUNET_YES;
+ cstate->mq = NULL;
return;
}
if (NULL != cstate->dns_active)
@@ -384,8 +384,12 @@
GNUNET_NO);
if (GNUNET_SYSERR == ret)
{
- GNUNET_MQ_inject_error (cstate->mq,
- GNUNET_MQ_ERROR_READ);
+ if (NULL != cstate->mq)
+ GNUNET_MQ_inject_error (cstate->mq,
+ GNUNET_MQ_ERROR_READ);
+ if (GNUNET_YES == cstate->in_destroy)
+ connection_client_destroy_impl (cstate->mq,
+ cstate);
return;
}
if (GNUNET_YES == cstate->in_destroy)
@@ -723,9 +727,11 @@
#endif
if ( (0 == (cstate->attempts++ % 2)) ||
- (0 == cstate->port) )
+ (0 == cstate->port) ||
+ (NULL == cstate->hostname) )
{
- /* on even rounds, try UNIX first */
+ /* on even rounds, try UNIX first, or always
+ if we do not have a DNS name and TCP port. */
cstate->sock = try_unixpath (cstate->service_name,
cstate->cfg);
if (NULL != cstate->sock)
@@ -732,8 +738,15 @@
{
connect_success_continuation (cstate);
return;
- }
+ }
}
+ if ( (NULL == cstate->hostname) ||
+ (0 == cstate->port) )
+ {
+ /* All options failed. Boo! */
+ connect_fail_continuation (cstate);
+ return;
+ }
cstate->dns_active
= GNUNET_RESOLVER_ip_get (cstate->hostname,
AF_UNSPEC,
@@ -807,11 +820,11 @@
* @return the message queue, NULL on error
*/
struct GNUNET_MQ_Handle *
-GNUNET_CLIENT_connecT2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
- const char *service_name,
- const struct GNUNET_MQ_MessageHandler *handlers,
- GNUNET_MQ_ErrorHandler error_handler,
- void *error_handler_cls)
+GNUNET_CLIENT_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const char *service_name,
+ const struct GNUNET_MQ_MessageHandler *handlers,
+ GNUNET_MQ_ErrorHandler error_handler,
+ void *error_handler_cls)
{
struct ClientState *cstate;
Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c 2016-10-21 14:26:15 UTC (rev 38173)
+++ gnunet/src/util/mq.c 2016-10-21 16:04:46 UTC (rev 38174)
@@ -128,6 +128,11 @@
void *error_handler_cls;
/**
+ * Task to asynchronously run #impl_send_continue().
+ */
+ struct GNUNET_SCHEDULER_Task *send_task;
+
+ /**
* Linked list of messages pending to be sent
*/
struct GNUNET_MQ_Envelope *envelope_head;
@@ -145,23 +150,11 @@
struct GNUNET_MQ_Envelope *current_envelope;
/**
- * GNUNET_YES if the sent notification was called
- * for the current envelope.
- */
- int send_notification_called;
-
- /**
* Map of associations, lazily allocated
*/
struct GNUNET_CONTAINER_MultiHashMap32 *assoc_map;
/**
- * Task scheduled during #GNUNET_MQ_impl_send_continue
- * or #GNUNET_MQ_impl_send_in_flight
- */
- struct GNUNET_SCHEDULER_Task *send_task;
-
- /**
* Functions to call on queue destruction; kept in a DLL.
*/
struct GNUNET_MQ_DestroyNotificationHandle *dnh_head;
@@ -196,9 +189,15 @@
unsigned int queue_length;
/**
- * GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * #GNUNET_YES if GNUNET_MQ_impl_evacuate was called.
+ * FIXME: is this dead?
*/
int evacuate_called;
+
+ /**
+ * #GNUNET_YES if GNUNET_MQ_impl_send_in_flight() was called.
+ */
+ int in_flight;
};
@@ -364,7 +363,7 @@
unsigned int
GNUNET_MQ_get_length (struct GNUNET_MQ_Handle *mq)
{
- return mq->queue_length;
+ return mq->queue_length - (GNUNET_YES == mq->in_flight) ? 1 : 0;
}
@@ -385,7 +384,8 @@
mq->queue_length++;
ev->parent_queue = mq;
/* is the implementation busy? queue it! */
- if (NULL != mq->current_envelope)
+ if ( (NULL != mq->current_envelope) ||
+ (NULL != mq->send_task) )
{
GNUNET_CONTAINER_DLL_insert_tail (mq->envelope_head,
mq->envelope_tail,
@@ -428,35 +428,6 @@
/**
- * Task run to call the send notification for the next queued
- * message, if any. Only useful for implementing message queues,
- * results in undefined behavior if not used carefully.
- *
- * @param cls message queue to send the next message with
- */
-static void
-impl_send_in_flight (void *cls)
-{
- struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
- mq->send_task = NULL;
- /* call is only valid if we're actually currently sending
- * a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- /* can't call cancel from now on anymore */
- current_envelope->parent_queue = NULL;
- if ( (GNUNET_NO == mq->send_notification_called) &&
- (NULL != current_envelope->sent_cb) )
- {
- current_envelope->sent_cb (current_envelope->sent_cls);
- }
- mq->send_notification_called = GNUNET_YES;
-}
-
-
-/**
* Task run to call the send implementation for the next queued
* message, if any. Only useful for implementing message queues,
* results in undefined behavior if not used carefully.
@@ -467,32 +438,19 @@
impl_send_continue (void *cls)
{
struct GNUNET_MQ_Handle *mq = cls;
- struct GNUNET_MQ_Envelope *current_envelope;
-
+
mq->send_task = NULL;
/* call is only valid if we're actually currently sending
* a message */
- current_envelope = mq->current_envelope;
- GNUNET_assert (NULL != current_envelope);
- impl_send_in_flight (mq);
- GNUNET_assert (0 < mq->queue_length);
- mq->queue_length--;
if (NULL == mq->envelope_head)
- {
- mq->current_envelope = NULL;
- }
- else
- {
- mq->current_envelope = mq->envelope_head;
- GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
- mq->envelope_tail,
- mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
- mq->send_impl (mq,
- mq->current_envelope->mh,
- mq->impl_state);
- }
- GNUNET_free (current_envelope);
+ return;
+ mq->current_envelope = mq->envelope_head;
+ GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
+ mq->envelope_tail,
+ mq->current_envelope);
+ mq->send_impl (mq,
+ mq->current_envelope->mh,
+ mq->impl_state);
}
@@ -506,22 +464,32 @@
void
GNUNET_MQ_impl_send_continue (struct GNUNET_MQ_Handle *mq)
{
- /* maybe #GNUNET_MQ_impl_send_in_flight was called? */
- if (NULL != mq->send_task)
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ GNUNET_assert (0 < mq->queue_length);
+ mq->queue_length--;
+ current_envelope = mq->current_envelope;
+ current_envelope->parent_queue = NULL;
+ mq->current_envelope = NULL;
+ GNUNET_assert (NULL == mq->send_task);
+ mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
+ mq);
+ if (NULL != (cb = current_envelope->sent_cb))
{
- GNUNET_SCHEDULER_cancel (mq->send_task);
- }
- mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_continue,
- mq);
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
+ GNUNET_free (current_envelope);
}
/**
* Call the send notification for the current message, but do not
- * try to send the next message until #gnunet_mq_impl_send_continue
+ * try to send the next message until #GNUNET_MQ_impl_send_continue
* is called.
*
- * only useful for implementing message queues, results in undefined
+ * Only useful for implementing message queues, results in undefined
* behavior if not used carefully.
*
* @param mq message queue to send the next message with
@@ -529,9 +497,21 @@
void
GNUNET_MQ_impl_send_in_flight (struct GNUNET_MQ_Handle *mq)
{
- GNUNET_assert (NULL == mq->send_task);
- mq->send_task = GNUNET_SCHEDULER_add_now (&impl_send_in_flight,
- mq);
+ struct GNUNET_MQ_Envelope *current_envelope;
+ GNUNET_MQ_NotifyCallback cb;
+
+ mq->in_flight = GNUNET_YES;
+ /* call is only valid if we're actually currently sending
+ * a message */
+ current_envelope = mq->current_envelope;
+ GNUNET_assert (NULL != current_envelope);
+ /* can't call cancel from now on anymore */
+ current_envelope->parent_queue = NULL;
+ if (NULL != (cb = current_envelope->sent_cb))
+ {
+ current_envelope->sent_cb = NULL;
+ cb (current_envelope->sent_cls);
+ }
}
@@ -1187,7 +1167,6 @@
GNUNET_CONTAINER_DLL_remove (mq->envelope_head,
mq->envelope_tail,
mq->current_envelope);
- mq->send_notification_called = GNUNET_NO;
mq->send_impl (mq,
mq->current_envelope->mh,
mq->impl_state);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r38174 - in gnunet/src: transport util,
gnunet <=