[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28626 - gnunet/src/mesh
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28626 - gnunet/src/mesh |
Date: |
Wed, 14 Aug 2013 19:21:18 +0200 |
Author: bartpolot
Date: 2013-08-14 19:21:18 +0200 (Wed, 14 Aug 2013)
New Revision: 28626
Modified:
gnunet/src/mesh/gnunet-service-mesh-enc.c
Log:
- rewrite flow control towards clients
Modified: gnunet/src/mesh/gnunet-service-mesh-enc.c
===================================================================
--- gnunet/src/mesh/gnunet-service-mesh-enc.c 2013-08-14 15:37:25 UTC (rev
28625)
+++ gnunet/src/mesh/gnunet-service-mesh-enc.c 2013-08-14 17:21:18 UTC (rev
28626)
@@ -240,6 +240,11 @@
unsigned int queue_max;
/**
+ * Next ID to use.
+ */
+ uint32_t next_pid;
+
+ /**
* ID of the last packet sent towards the peer.
*/
uint32_t last_pid_sent;
@@ -755,6 +760,11 @@
* ID of the client, mainly for debug messages
*/
unsigned int id;
+
+ /**
+ * Is this client prevented from sending more data? (We "owe" him an ACK).
+ */
+ int blocked;
};
@@ -1451,22 +1461,30 @@
*
* @param ch Channel on which to send the ACK.
* @param c Client to whom send the ACK.
- * @param is_fwd Set to GNUNET_YES for FWD ACK (dest->owner)
+ * @param fwd Set to GNUNET_YES for FWD ACK (dest->owner)
*/
static void
send_local_ack (struct MeshChannel *ch,
struct MeshClient *c,
- int is_fwd)
+ int fwd)
{
struct GNUNET_MESH_LocalAck msg;
+ if (NULL == c
+ || ( fwd && (0 == ch->lid_root || c != ch->root))
+ || (!fwd && (0 == ch->lid_dest || c != ch->dest)) )
+ {
+ GNUNET_break (0);
+ return;
+ }
msg.header.size = htons (sizeof (msg));
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_LOCAL_ACK);
- msg.channel_id = htonl (is_fwd ? ch->lid_root : ch->lid_dest);
+ msg.channel_id = htonl (fwd ? ch->lid_root : ch->lid_dest);
GNUNET_SERVER_notification_context_unicast (nc,
c->handle,
&msg.header,
GNUNET_NO);
+ c->blocked = GNUNET_NO;
}
@@ -1633,6 +1651,7 @@
return;
}
msg->ttl = htonl (ttl - 1);
+ msg->pid = htonl (fwd ? c->fwd_fc.next_pid++ : c->bck_fc.next_pid++);
}
queue_add (data,
@@ -1659,6 +1678,7 @@
int fwd)
{
struct MeshConnection *c;
+ struct MeshFlowControl *fc;
uint16_t type;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send on Tunnel %s\n",
@@ -1669,14 +1689,18 @@
GNUNET_break (GNUNET_YES == t->destroy);
return;
}
+ fc = fwd ? &c->fwd_fc : &c->bck_fc;
type = ntohs (msg->header.type);
switch (type)
{
case GNUNET_MESSAGE_TYPE_MESH_FWD:
case GNUNET_MESSAGE_TYPE_MESH_BCK:
+ case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_CREATE:
+ case GNUNET_MESSAGE_TYPE_MESH_CHANNEL_DESTROY:
msg->cid = htonl (c->id);
msg->tid = t->id;
msg->ttl = default_ttl;
+ msg->pid = fc->next_pid++;
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "unkown type %s\n",
@@ -2295,6 +2319,25 @@
/**
+ * Is this peer the first one on the connection?
+ *
+ * @param c Connection.
+ * @param fwd Is this about fwd traffic?
+ *
+ * @return GNUNET_YES if origin, GNUNET_NO if relay/terminal.
+ */
+static int
+connection_is_origin (struct MeshConnection *c, int fwd)
+{
+ if (!fwd && c->own_pos == c->path->length - 1)
+ return GNUNET_YES;
+ if (fwd && c->own_pos == 0)
+ return GNUNET_YES;
+ return GNUNET_NO;
+}
+
+
+/**
* Is this peer the last one on the connection?
*
* @param c Connection.
@@ -3445,12 +3488,34 @@
}
+/**
+ * Send an ACK to a client is needed.
+ *
+ * @param ch Channel this is regarding.
+ * @param fwd Is this about fwd traffic? (ACk goes the opposite direction).
+ */
+static void
+channel_send_client_ack (struct MeshChannel *ch, int fwd)
+{
+ struct MeshClient *c;
+ /* Client to receive the ACK (fwd indicates traffic to be ACK'd) */
+ c = fwd ? ch->root : ch->dest;
+
+ if (GNUNET_YES == c->blocked)
+ send_local_ack (ch, c, fwd);
+}
+
+
/**
* Send ACK on one or more connections due to buffer space to the client.
+ *
+ * @param ch Channel which has some free buffer space.
+ * @param buffer Buffer space.
+ * @param fwd Is this in the FWD direction?
*/
static void
-channel_send_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
+channel_send_connection_ack (struct MeshChannel *ch, uint32_t buffer, int fwd)
{
struct MeshTunnel2 *t = ch->t;
struct MeshConnection *c;
@@ -3748,6 +3813,7 @@
static void
fc_init (struct MeshFlowControl *fc)
{
+ fc->next_pid = 0;
fc->last_pid_sent = (uint32_t) -1; /* Next (expected) = 0 */
fc->last_pid_recv = (uint32_t) -1;
fc->last_ack_sent = (uint32_t) 0;
@@ -4111,7 +4177,7 @@
peer2s (c->t->peer),
c->id);
- if (connection_is_terminal (c, GNUNET_NO)) /* If local, leave. */
+ if (connection_is_origin (c, GNUNET_YES)) /* If local, leave. */
return;
connection_destroy (c);
@@ -4140,7 +4206,7 @@
peer2s (c->t->peer),
c->id);
- if (connection_is_terminal (c, GNUNET_YES)) /* If local, leave. */
+ if (connection_is_origin (c, GNUNET_NO)) /* If local, leave. */
return;
connection_destroy (c);
@@ -4172,7 +4238,7 @@
if (GNUNET_SCHEDULER_NO_TASK != *ti)
GNUNET_SCHEDULER_cancel (*ti);
- if (connection_is_terminal (c, !fwd)) /* Endpoint */
+ if (connection_is_origin (c, fwd)) /* Endpoint */
{
f = fwd ? &connection_fwd_keepalive : &connection_bck_keepalive;
*ti = GNUNET_SCHEDULER_add_delayed (refresh_connection_time, f, c);
@@ -4371,14 +4437,14 @@
break;
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path create\n");
- if (connection_is_terminal (c, GNUNET_NO))
+ if (connection_is_origin (c, GNUNET_YES))
data_size = send_core_connection_create (queue->c, size, buf);
else
data_size = send_core_data_raw (queue->cls, size, buf);
break;
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path ack\n");
- if (connection_is_terminal (c, GNUNET_YES))
+ if (connection_is_origin (c, GNUNET_NO))
data_size = send_core_connection_ack (queue->c, size, buf);
else
data_size = send_core_data_raw (queue->cls, size, buf);
@@ -4398,8 +4464,6 @@
GNUNET_MESH_DEBUG_M2S (queue->type));
data_size = 0;
}
- /* Free queue, but cls was freed by send_core_* */
- queue_destroy (queue, GNUNET_NO);
/* Send ACK if needed, after accounting for sent ID in fc->queue_n */
switch (type)
@@ -4408,12 +4472,18 @@
case GNUNET_MESSAGE_TYPE_MESH_BCK:
pid = ntohl ( ((struct GNUNET_MESH_Encrypted *) buf)->pid );
fc->last_pid_sent = pid;
- connection_send_ack (c, fwd);
+ if (NULL != queue->ch)
+ channel_send_client_ack (queue->ch, fwd);
+ else
+ connection_send_ack (c, fwd);
break;
default:
break;
}
+ /* Free queue, but cls was freed by send_core_* */
+ queue_destroy (queue, GNUNET_NO);
+
/* If more data in queue, send next */
queue = peer_get_first_message (peer);
if (NULL != queue)
@@ -4511,6 +4581,7 @@
priority = 50;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "priority %d\n", priority);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "fc %p\n", fc);
if (fc->queue_n >= fc->queue_max && 0 == priority)
{
GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
@@ -4522,6 +4593,8 @@
return; /* Drop this message */
}
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "pid %u\n", fc->last_pid_sent);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ack %u\n", fc->last_ack_recv);
if (GMC_is_pid_bigger (fc->last_pid_sent + 1, fc->last_ack_recv) &&
GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
{
@@ -5304,6 +5377,7 @@
return GNUNET_OK;
}
+ /* Message not for us: forward to next hop */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " not for us, retransmitting...\n");
ttl = ntohl (msg->ttl);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " ttl: %u\n", ttl);
@@ -5317,7 +5391,6 @@
GNUNET_STATISTICS_update (stats, "# messages forwarded", 1, GNUNET_NO);
send_prebuilt_message_connection (&msg->header, c, NULL, fwd);
- connection_send_ack (c, fwd);
return GNUNET_OK;
}
@@ -5992,6 +6065,7 @@
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " by client %u\n", c->id);
+ c->blocked = GNUNET_YES;
msg = (struct GNUNET_MESH_LocalData *) message;
@@ -6140,7 +6214,7 @@
rel->client_ready = GNUNET_YES;
channel_send_client_buffered_data (ch, c, rel);
- channel_send_ack (ch, 64 - rel->n_recv, fwd);
+ channel_send_connection_ack (ch, 64 - rel->n_recv, fwd);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28626 - gnunet/src/mesh,
gnunet <=