# # # patch "netsync.cc" # from [c5c9d7589daa10da3a011215097402ed0c445990] # to [228c905f8c8b1d41bb30455ebfc4e805dddbd231] # ============================================================ --- netsync.cc c5c9d7589daa10da3a011215097402ed0c445990 +++ netsync.cc 228c905f8c8b1d41bb30455ebfc4e805dddbd231 @@ -2624,6 +2624,126 @@ call_server(options & opts, } } +static shared_ptr +session_from_server_sync_item(options & opts, + lua_hooks & lua, + project_t & project, + key_store & keys, + Netxx::Timeout const & timeout, + Netxx::port_type const & default_port, + server_initiated_sync_request const & request) +{ + netsync_connection_info info; + info.client.unparsed = utf8(request.address); + info.client.include_pattern = globish(request.include); + info.client.exclude_pattern = globish(request.exclude); + info.client.use_argv = false; + parse_uri(info.client.unparsed(), info.client.u); + + try + { + P(F("connecting to %s") % info.client.unparsed); + shared_ptr server + = build_stream_to_server(opts, lua, + info, default_port, + timeout); + + // 'false' here means not to revert changes when + // the SockOpt goes out of scope. + Netxx::SockOpt socket_options(server->get_socketfd(), false); + socket_options.set_non_blocking(); + + protocol_role role = source_and_sink_role; + if (request.what == "sync") + role = source_and_sink_role; + else if (request.what == "push") + role = source_role; + else if (request.what == "pull") + role = sink_role; + + shared_ptr sess(new session(opts, lua, + project, keys, + role, client_voice, + info.client.include_pattern, + info.client.exclude_pattern, + info.client.unparsed(), + server, true)); + + return sess; + } + catch (Netxx::NetworkException & e) + { + P(F("Network error: %s") % e.what()); + return shared_ptr(); + } +} + +static shared_ptr +make_server(std::list const & addresses, + Netxx::port_type default_port, + Netxx::Timeout timeout, + bool use_ipv6, + Netxx::Address & addr) +{ + try + { + addr = Netxx::Address(use_ipv6); + + if (addresses.empty()) + addr.add_all_addresses(default_port); + else + { + for (std::list::const_iterator it = addresses.begin(); + it != addresses.end(); ++it) + { + const utf8 & address = *it; + if (!address().empty()) + { + size_t l_colon = address().find(':'); + size_t r_colon = address().rfind(':'); + + if (l_colon == r_colon && l_colon == 0) + { + // can't be an IPv6 address as there is only one colon + // must be a : followed by a port + string port_str = address().substr(1); + addr.add_all_addresses(std::atoi(port_str.c_str())); + } + else + addr.add_address(address().c_str(), default_port); + } + } + } + shared_ptr ret(new Netxx::StreamServer(addr, timeout)); + + char const * name; + name = addr.get_name(); + P(F("beginning service on %s : %s") + % (name != NULL ? name : _("")) + % lexical_cast(addr.get_port())); + + return ret; + } + // If we use IPv6 and the initialisation of server fails, we want + // to try again with IPv4. The reason is that someone may have + // downloaded a IPv6-enabled monotone on a system that doesn't + // have IPv6, and which might fail therefore. + catch(Netxx::NetworkException & e) + { + if (use_ipv6) + return make_server(addresses, default_port, timeout, false, addr); + else + throw; + } + catch(Netxx::Exception & e) + { + if (use_ipv6) + return make_server(addresses, default_port, timeout, false, addr); + else + throw; + } +} + static void drop_session_associated_with_fd(map > & sessions, Netxx::socket_type fd) @@ -2860,238 +2980,127 @@ serve_connections(options & opts, #else bool use_ipv6=false; #endif - // This will be true when we try to bind while using IPv6. See comments - // further down. - bool try_again=false; - do - { - try - { - try_again = false; + Netxx::Address addr(use_ipv6); + shared_ptr server = make_server(addresses, + default_port, + timeout, + use_ipv6, + addr); - Netxx::Address addr(use_ipv6); + map > sessions; + set armed_sessions; - if (addresses.empty()) - addr.add_all_addresses(default_port); - else - { - for (std::list::const_iterator it = addresses.begin(); it != addresses.end(); ++it) - { - const utf8 & address = *it; - if (!address().empty()) - { - size_t l_colon = address().find(':'); - size_t r_colon = address().rfind(':'); + shared_ptr guard; - if (l_colon == r_colon && l_colon == 0) - { - // can't be an IPv6 address as there is only one colon - // must be a : followed by a port - string port_str = address().substr(1); - addr.add_all_addresses(std::atoi(port_str.c_str())); - } - else - addr.add_address(address().c_str(), default_port); - } - } - } + while (true) + { + probe.clear(); + armed_sessions.clear(); - // If se use IPv6 and the initialisation of server fails, we want - // to try again with IPv4. The reason is that someone may have - // downloaded a IPv6-enabled monotone on a system that doesn't - // have IPv6, and which might fail therefore. - // On failure, Netxx::NetworkException is thrown, and we catch - // it further down. - try_again=use_ipv6; + if (sessions.size() >= session_limit) + W(F("session limit %d reached, some connections " + "will be refused") % session_limit); + else + probe.add(*server); - Netxx::StreamServer server(addr, timeout); + if (!guard) + guard = shared_ptr + (new transaction_guard(project.db)); - // If we came this far, whatever we used (IPv6 or IPv4) was - // accepted, so we don't need to try again any more. - try_again=false; + I(guard); - const char *name = addr.get_name(); - P(F("beginning service on %s : %s") - % (name != NULL ? name : _("")) - % lexical_cast(addr.get_port())); + int probing = arm_sessions_and_calculate_probe(probe, sessions, + armed_sessions, + *guard); - map > sessions; - set armed_sessions; + while (!server_initiated_sync_requests.empty()) + { + server_initiated_sync_request request + = server_initiated_sync_requests.front(); + server_initiated_sync_requests.pop_front(); + shared_ptr sess + = session_from_server_sync_item(opts, lua, project, keys, + timeout, default_port, + request); - shared_ptr guard; - - while (true) + if (sess) { - probe.clear(); - armed_sessions.clear(); + sessions.insert(make_pair(sess->str->get_socketfd(), sess)); + probe.add(*sess->str, sess->which_events()); + ++probing; + L(FL("Opened connection to %s") % sess->peer_id); + } + } - if (sessions.size() >= session_limit) - W(F("session limit %d reached, some connections " - "will be refused") % session_limit); - else - probe.add(server); + L(FL("i/o probe with %d armed, %d probing") + % armed_sessions.size() % probing); + Netxx::socket_type fd; + Netxx::Timeout how_long; + if (sessions.empty()) + how_long = forever; + else if (armed_sessions.empty()) + how_long = timeout; + else + how_long = instant; + do + { + Netxx::Probe::result_type res = probe.ready(how_long); + how_long = instant; + Netxx::Probe::ready_type event = res.second; + fd = res.first; - if (!guard) - guard = shared_ptr - (new transaction_guard(project.db)); + if (fd == -1) + { + if (armed_sessions.empty()) + L(FL("timed out waiting for I/O (listening on %s : %s)") + % addr.get_name() % lexical_cast(addr.get_port())); + } - I(guard); + // we either got a new connection + else if (fd == *server) + handle_new_connection(opts, lua, project, keys, + addr, *server, timeout, role, + sessions); - int probing = arm_sessions_and_calculate_probe(probe, sessions, - armed_sessions, - *guard); - - while (!server_initiated_sync_requests.empty()) + // or an existing session woke up + else + { + map >::iterator i; + i = sessions.find(fd); + if (i == sessions.end()) { - server_initiated_sync_request request - = server_initiated_sync_requests.front(); - server_initiated_sync_requests.pop_front(); - - netsync_connection_info info; - info.client.unparsed = utf8(request.address); - info.client.include_pattern = globish(request.include); - info.client.exclude_pattern = globish(request.exclude); - info.client.use_argv = false; - parse_uri(info.client.unparsed(), info.client.u); - - try - { - P(F("connecting to %s") % info.client.unparsed); - shared_ptr server - = build_stream_to_server(opts, lua, - info, default_port, - timeout); - - // 'false' here means not to revert changes when - // the SockOpt goes out of scope. - Netxx::SockOpt socket_options(server->get_socketfd(), false); - socket_options.set_non_blocking(); - - protocol_role role = source_and_sink_role; - if (request.what == "sync") - role = source_and_sink_role; - else if (request.what == "push") - role = source_role; - else if (request.what == "pull") - role = sink_role; - - shared_ptr sess(new session(opts, lua, - project, keys, - role, client_voice, - info.client.include_pattern, - info.client.exclude_pattern, - info.client.unparsed(), - server, true)); - - sessions.insert(make_pair(server->get_socketfd(), sess)); - probe.add(*server, sess->which_events()); - ++probing; - L(FL("Opened connection to %s") % sess->peer_id); - } - catch (Netxx::NetworkException & e) - { - P(F("Network error: %s") % e.what()); - } + L(FL("got woken up for action on unknown fd %d") % fd); } - - L(FL("i/o probe with %d armed, %d probing") - % armed_sessions.size() % probing); - Netxx::socket_type fd; - Netxx::Timeout how_long; - if (sessions.empty()) - how_long = forever; - else if (armed_sessions.empty()) - how_long = timeout; else - how_long = instant; - do { - Netxx::Probe::result_type res = probe.ready(how_long); - how_long = instant; - Netxx::Probe::ready_type event = res.second; - fd = res.first; + probe.remove(*(i->second->str)); + shared_ptr sess = i->second; - if (fd == -1) + try { - if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s : %s)") - % addr.get_name() % lexical_cast(addr.get_port())); + handle_io(fd, sess, sessions, event); } - - // we either got a new connection - else if (fd == server) - handle_new_connection(opts, lua, project, keys, - addr, server, timeout, role, - sessions); - - // or an existing session woke up - else + catch (Netxx::Exception &) { - map >::iterator i; - i = sessions.find(fd); - if (i == sessions.end()) - { - L(FL("got woken up for action on unknown fd %d") % fd); - } - else - { - probe.remove(*(i->second->str)); - shared_ptr sess = i->second; - - try - { - handle_io(fd, sess, sessions, event); - } - catch (Netxx::Exception &) - { - P(F("Network error on peer %s, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } - } + P(F("Network error on peer %s, disconnecting") + % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); } } - while (fd != -1); - reap_dead_sessions(sessions, timeout_seconds); - - if (sessions.empty()) - { - // Let the guard die completely if everything's gone quiet. - guard->commit(); - guard.reset(); - } } } - // This exception is thrown when bind() fails somewhere in Netxx. - catch (Netxx::NetworkException &) + while (fd != -1); + reap_dead_sessions(sessions, timeout_seconds); + + if (sessions.empty()) { - // If we tried with IPv6 and failed, we want to try again using IPv4. - if (try_again) - { - use_ipv6 = false; - } - // In all other cases, just rethrow the exception. - else - throw; + // Let the guard die completely if everything's gone quiet. + guard->commit(); + guard.reset(); } - // This exception is thrown when there is no support for the type of - // connection we want to do in the kernel, for example when a socket() - // call fails somewhere in Netxx. - catch (Netxx::Exception &) - { - // If we tried with IPv6 and failed, we want to try again using IPv4. - if (try_again) - { - use_ipv6 = false; - } - // In all other cases, just rethrow the exception. - else - throw; - } } - while(try_again); - } +} static void serve_single_connection(shared_ptr sess,