# # # add_file "network.cc" # content [2d2c39ab3475fb0bb10d8d2b82e5f1ab196ee3f3] # # add_file "network.hh" # content [08043161052d6397db7ecb78ed2af4781585690a] # # patch "Makefile.am" # from [c7cdc7a6ff99623495792d2e28bebccc79483c67] # to [993653607dcbc4371cd64de6b8f53f0fd4d827c0] # # patch "cmd_netsync.cc" # from [3480dd77ea18cca0fe7842b0d4807f7e7183a76f] # to [49b230c1864bddb3dbd355fa5b611afadffb82e6] # # patch "lua_hooks.cc" # from [7455b70dc877298e9c99d421a6f6e3a219ddfb3e] # to [ddba95946b674d4d21cbbb03b5c1e96c2035a652] # # patch "lua_hooks.hh" # from [3136f049c267df992198a278d5e71ee0ee687562] # to [ddadacf111d6a1423ab026b76b5fb833e56c50fa] # # patch "netsync.cc" # from [a1803f7586c78a891069427ea63ccc6ff578947d] # to [10ad3d3e72257627bfea1b28fe952afc79d772a9] # # patch "netsync.hh" # from [2a4d8e88f8774f885470a658ba101a06bc403eeb] # to [64567ee872332b85777648ed7b1aa8682cdbf664] # # patch "refiner.cc" # from [fd4795cb306c92c03cac8e9463551562c1f14f37] # to [c4bacfb1eadaa252c39136468356585bdeebcc77] # # patch "refiner.hh" # from [66938922057cfdd7aa84b4dcb0d9640c8ac7da1f] # to [1fffe4f5a38822e83891c4f9cf8676b876eccda9] # ============================================================ --- network.cc 2d2c39ab3475fb0bb10d8d2b82e5f1ab196ee3f3 +++ network.cc 2d2c39ab3475fb0bb10d8d2b82e5f1ab196ee3f3 @@ -0,0 +1,858 @@ +#include "netio.hh" +#include "network.hh" +#include "sanity.hh" +#include "uri.hh" + +#include +using std::make_pair; +using std::map; +using std::pair; + +#include +using std::deque; + +#include +using std::set; + +#include +using std::string; + +#include +using std::vector; + +#include +using boost::lexical_cast; + + +#include "netxx/address.h" +#include "netxx/peer.h" +#include "netxx/probe.h" +#include "netxx/socket.h" +#include "netxx/sockopt.h" +#include "netxx/stream.h" +#include "netxx/streamserver.h" +#include "netxx/timeout.h" +#include "netxx_pipe.hh" + +typedef map service_map; +static service_map & get_service_map() +{ + static service_map m; + return m; +} + + +class input_manager +{ + string_queue buffer; + netcmd cmd; + bool have_cmd; + + chained_hmac read_hmac; + +public: + input_manager(bool use_transport_auth) + : have_cmd(false), + read_hmac(constants::netsync_key_initializer, use_transport_auth) + {} + inline bool full() const + { + return buffer.size() >= constants::netcmd_maxsz; + } + inline bool have_netcmd() + { + if (!have_cmd) + { + have_cmd = cmd.read(buffer, read_hmac); + } + return have_cmd; + } + inline void get_netcmd(netcmd & c) + { + I(have_cmd); + c = cmd; + have_cmd = false; + } + inline size_t size() const + { + return buffer.size() + (have_cmd ? cmd.encoded_size() : 0); + } + inline void set_hmac_key(netsync_session_key const & key) + { + read_hmac.set_key(key); + } + Netxx::signed_size_type read_some_from(shared_ptr str); +}; + +class output_manager +{ + // deque of pair + deque< pair > buffer; + // the total data stored in outbuf - this is + // used as a valve to stop too much data + // backing up + size_t buffer_size; + + chained_hmac write_hmac; + +public: + output_manager(bool use_transport_auth) + : buffer_size(0), + write_hmac(constants::netsync_key_initializer, use_transport_auth) + {} + inline bool full() const + { + return buffer_size > constants::bufsz * 10; + } + inline bool empty() const + { + return buffer_size == 0; + } + inline void set_hmac_key(netsync_session_key const & key) + { + write_hmac.set_key(key); + } + void queue_netcmd(netcmd const & cmd); + Netxx::signed_size_type write_some_to(shared_ptr str); +}; + +class session +{ + session(session const & other) + : my_voice(other.my_voice), input(false), output(false), app(other.app) + { I(false); } + session const & operator=(session const &) + { I(false); } +public: + protocol_voice const my_voice; + input_manager input; + output_manager output; + + app_state & app; + string const peer_id; + shared_ptr str; + service * srv; + + time_t last_io_time; + + + session(protocol_voice voice, + shared_ptr str, + utf8 const & addr, + app_state & app); + + + void queue(netcmd const & cmd); + + // at this level, process() includes to take from the input queue + bool can_process(); + state process(transaction_guard & guard); + + state read_some(); + state write_some(); + + Netxx::Probe::ready_type which_events(); +}; + + +state +run_network_loop(bool client, + shared_ptr server, + map > & sessions, + app_state & app, + string const & listenaddr = "???"); + +state +run_network_loop(shared_ptr sess) +{ + map > sessions; + + // Very similar to serve_single_on_stdio(). + if (sess->str->get_socketfd() == -1) + { + // Unix pipes are non-duplex, have two filedescriptors + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(pipe); + sessions[pipe->get_writefd()]=sess; + sessions[pipe->get_readfd()]=sess; + } + else + sessions[sess->str->get_socketfd()]=sess; + + return run_network_loop(true, + shared_ptr(), + sessions, + sess->app); +} + +service::service(int num) + : sess(0) +{ + if (num) + { + service_map & m(get_service_map()); + pair r = m.insert(make_pair(num, this)); + I(r.second); + } +} + +service::~service() +{ +} + +shared_ptr +service::get(int num) +{ + service_map & m(get_service_map()); + service_map::const_iterator i = m.find(num); + I(i != m.end()); + return shared_ptr(i->second->copy()); +} + +void +service::attach(session & s) +{ + sess = &s; +} + +void +service::detach(bool received_error) +{ + sess = 0; + detached(received_error); +} + +void +service::detached(bool received_error) +{ +} + +bool +service::can_receive() +{ + return true; +} + +bool +service::can_process() +{ + return false; +} + +void +service::send(netcmd const & cmd) +{ + I(sess); + sess->queue(cmd); +} + + +client_session::client_session(utf8 const & address, app_state & app) +{ + ignore_sigpipe(); + + shared_ptr server; + uri u; + vector argv; + if (parse_uri(address(), u) + && app.lua.hook_get_netsync_connect_command(u, + global_sanity.debug, + argv)) + { + I(argv.size() > 0); + string cmd = argv[0]; + argv.erase(argv.begin()); + app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); + server.reset(new Netxx::PipeStream(cmd, argv)); + } + else + { +#ifdef USE_IPV6 + bool use_ipv6=true; +#else + bool use_ipv6=false; +#endif + Netxx::Address addr(address().c_str(), + static_cast(constants::netsync_default_port), + use_ipv6); + Netxx::Timeout timeout(static_cast(constants::netsync_timeout_seconds)); + server.reset(new Netxx::Stream(addr, + timeout)); + } + + impl.reset(new session(client_voice, server, address, app)); +} + +client_session::client_session(client_session const & other) +{ + I(false); +} +client_session const & +client_session::operator = (client_session const & other) +{ + I(false); +} + + +bool +client_session::authenticate_as(netsync_session_key const & key) +{ + // This will eventually involve running the network loop a couple times. + I(impl); + I(impl->my_voice == client_voice); + impl->input.set_hmac_key(key); + impl->output.set_hmac_key(key); + return true; +} + +state +client_session::request_service(service * newsrv) +{ + I(impl->my_voice == client_voice); + if (impl->srv) + { + impl->srv->detach(false); + } + newsrv->attach(*impl); + impl->srv = newsrv; + impl->srv->request_service(); + + return run_network_loop(impl); +} + +bool +session::can_process() +{ + if (input.have_netcmd()) + return true; + if (srv) + return srv->can_process(); + else + return false; +} + +state +session::process(transaction_guard & guard) +{ + return srv->process(guard); +} + +Netxx::Probe::ready_type +session::which_events() +{ + Netxx::Probe::ready_type which = Netxx::Probe::ready_oobd; + // Don't ask to read if we still have unprocessed input. + if (!input.full() && !input.have_netcmd()) + { + which = which | Netxx::Probe::ready_read; + } + if (!output.empty()) + { + which = which | Netxx::Probe::ready_write; + } + + return which; +} +//////////////////////////////////////////////////////////////////////// + +static void +drop_session_associated_with_fd(map > & sessions, + Netxx::socket_type fd) +{ + // This is a bit of a hack. Initially all "file descriptors" in + // netsync were full duplex, so we could get away with indexing + // sessions by their file descriptor. + // + // When using pipes in unix, it's no longer true: a session gets + // entered in the session map under its read pipe fd *and* its write + // pipe fd. When we're in such a situation the socket fd is "-1" and + // we downcast to a PipeStream and use its read+write fds. + // + // When using pipes in windows, we use a full duplex pipe (named + // pipe) so the socket-like abstraction holds. + + I(fd != -1); + map >::const_iterator i = sessions.find(fd); + I(i != sessions.end()); + shared_ptr sess = i->second; + fd = sess->str->get_socketfd(); + if (fd != -1) + { + sessions.erase(fd); + } + else + { + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(static_cast(pipe)); + I(pipe->get_writefd() != -1); + I(pipe->get_readfd() != -1); + sessions.erase(pipe->get_readfd()); + sessions.erase(pipe->get_writefd()); + } +} + +static void +arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, + map > & sessions, + set & armed_sessions) +{ + set arm_failed; + for (map >::const_iterator i = sessions.begin(); + i != sessions.end(); ++i) + { + try + { + if (i->second->can_process()) + { + L(FL("fd %d is armed") % i->first); + armed_sessions.insert(i->first); + } + probe.add(*i->second->str, i->second->which_events()); + } + catch (bad_decode & bd) + { + W(F("protocol error while processing peer %s: '%s', marking as bad") + % i->second->peer_id % bd.what); + arm_failed.insert(i->first); + } + } + for (set::const_iterator i = arm_failed.begin(); + i != arm_failed.end(); ++i) + { + drop_session_associated_with_fd(sessions, *i); + } +} + +static void +handle_new_connection(string const & addr, + Netxx::StreamServer & server, + Netxx::Timeout & timeout, + map > & sessions, + app_state & app, + string const & listenaddr) +{ + L(FL("accepting new connection on %s") + % listenaddr); + Netxx::Peer client = server.accept_connection(); + + if (!client) + { + L(FL("accept() returned a dead client")); + } + else + { + P(F("accepted new client connection from %s : %s") + % client.get_address() % lexical_cast(client.get_port())); + + // 'false' here means not to revert changes when the SockOpt + // goes out of scope. + Netxx::SockOpt socket_options(client.get_socketfd(), false); + socket_options.set_non_blocking(); + + shared_ptr str + (new Netxx::Stream(client.get_socketfd(), timeout)); + + /* + shared_ptr sess(new session(source_and_sink_role, server_voice, + "*", "", + app, + ulexical_cast(client), str)); + sess->begin_service(); + */ + shared_ptr sess(new session(server_voice, str, + utf8(lexical_cast(client)), + app)); + sessions.insert(make_pair(client.get_socketfd(), sess)); + } +} + +static void +handle_read_available(Netxx::socket_type fd, + shared_ptr sess, + map > & sessions, + set & armed_sessions, + bool & live_p) +{ + if (sess->read_some()) + { + try + { + if (sess->can_process()) + armed_sessions.insert(fd); + } + catch (bad_decode & bd) + { + W(F("protocol error while processing peer %s: '%s', disconnecting") + % sess->peer_id % bd.what); + drop_session_associated_with_fd(sessions, fd); + live_p = false; + } + } + else + { + /* + switch (sess->protocol_state) + { + case session::working_state: + P(F("peer %s read failed in working state (error)") + % sess->peer_id); + break; + + case session::shutdown_state: + P(F("peer %s read failed in shutdown state " + "(possibly client misreported error)") + % sess->peer_id); + break; + + case session::confirmed_state: + P(F("peer %s read failed in confirmed state (success)") + % sess->peer_id); + break; + } + */ + P(F("peer %s read failed") % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); + live_p = false; + } +} + + +static void +handle_write_available(Netxx::socket_type fd, + shared_ptr sess, + map > & sessions, + bool & live_p) +{ + if (!sess->write_some()) + { + /* + switch (sess->protocol_state) + { + case session::working_state: + P(F("peer %s write failed in working state (error)") + % sess->peer_id); + break; + + case session::shutdown_state: + P(F("peer %s write failed in shutdown state " + "(possibly client misreported error)") + % sess->peer_id); + break; + + case session::confirmed_state: + P(F("peer %s write failed in confirmed state (success)") + % sess->peer_id); + break; + } + */ + P(F("peer %s write failed") % sess->peer_id); + + drop_session_associated_with_fd(sessions, fd); + live_p = false; + } +} + +static void +process_armed_sessions(map > & sessions, + set & armed_sessions, + transaction_guard & guard) +{ + for (set::const_iterator i = armed_sessions.begin(); + i != armed_sessions.end(); ++i) + { + map >::iterator j; + j = sessions.find(*i); + if (j == sessions.end()) + continue; + else + { + shared_ptr sess = j->second; + if (!sess->process(guard)) + { + P(F("peer %s processing finished, disconnecting") + % sess->peer_id); + drop_session_associated_with_fd(sessions, *i); + } + } + } +} + +static void +reap_dead_sessions(map > & sessions, + time_t timeout_seconds) +{ + // Kill any clients which haven't done any i/o inside the timeout period + // or who have exchanged all items and flushed their output buffers. + set dead_clients; + time_t now = ::time(NULL); + for (map >::const_iterator + i = sessions.begin(); i != sessions.end(); ++i) + { + if (i->second->last_io_time + timeout_seconds < now) + { + P(F("fd %d (peer %s) has been idle too long, disconnecting") + % i->first % i->second->peer_id); + dead_clients.insert(i->first); + } + } + for (set::const_iterator i = dead_clients.begin(); + i != dead_clients.end(); ++i) + { + drop_session_associated_with_fd(sessions, *i); + } +} + + + + + +state +run_network_loop(bool client, + shared_ptr server, + map > & sessions, + app_state & app, + string const & listenaddr) +{ + I(!client || sessions.size() == 1); + + unsigned long timeout_seconds + = static_cast(constants::netsync_timeout_seconds); + + Netxx::PipeCompatibleProbe probe; + + Netxx::Timeout + forever, + timeout(static_cast(timeout_seconds)), + instant(0,1); + + unsigned long session_limit = + static_cast(constants::netsync_connection_limit); + + shared_ptr guard; + + set armed_sessions; + while(true) + { + probe.clear(); + armed_sessions.clear(); + + if (server) + { + if (sessions.size() >= session_limit) + W(F("session limit %d reached, some connections " + "will be refused") % session_limit); + else + probe.add(*server); + } + + arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); + + L(FL("i/o probe with %d armed") % armed_sessions.size()); + Netxx::socket_type fd; + Netxx::Timeout how_long; + if (sessions.empty()) + how_long = forever; + else if (armed_sessions.empty()) + how_long = timeout; + else + how_long = instant; + + do + { + Netxx::Probe::result_type res = probe.ready(how_long); + how_long = instant; + Netxx::Probe::ready_type event = res.second; + fd = res.first; + + if (!guard) + guard = shared_ptr(new transaction_guard(app.db)); + + I(guard); + + if (fd == -1) + { + if (armed_sessions.empty()) + L(FL("timed out waiting for I/O (listening on %s)") + % listenaddr); + } + // we either got a new connection + else if (server && fd == *server) + { + handle_new_connection(listenaddr, *server, timeout, + sessions, app, listenaddr); + } + // or an existing session woke up + else + { + map >::iterator i; + i = sessions.find(fd); + if (i == sessions.end()) + { + L(FL("got woken up for action on unknown fd %d") % fd); + } + else + { + probe.remove(*(i->second->str)); + shared_ptr sess = i->second; + bool live_p = true; + + try + { + if (event & Netxx::Probe::ready_read) + handle_read_available(fd, sess, sessions, + armed_sessions, live_p); + + if (live_p && (event & Netxx::Probe::ready_write)) + handle_write_available(fd, sess, sessions, live_p); + } + catch (Netxx::Exception &) + { + P(F("Network error on peer %s, disconnecting") + % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); + } + if (live_p && (event & Netxx::Probe::ready_oobd)) + { + P(F("got OOB from peer %s, disconnecting") + % sess->peer_id); + drop_session_associated_with_fd(sessions, fd); + } + } + } + } + while(fd != -1); + + process_armed_sessions(sessions, armed_sessions, *guard); + reap_dead_sessions(sessions, timeout_seconds); + + if (sessions.empty()) + { + // Let the guard die completely if everything's gone quiet. + guard->commit(); + guard.reset(); + } + + if (client) + { + if (sessions.empty()) + break; + I(sessions.size() == 1); + map >::iterator i = sessions.begin(); + if (!i->second->srv) + break; + } + } + return state::NONE; +} + +shared_ptr +make_server(bool use_ipv6, app_state & app, + string & listenaddr) +{ + Netxx::port_type port; + if (app.opts.bind_port().empty()) + port = std::atoi(app.opts.bind_port().c_str()); + else + port = static_cast(constants::netsync_default_port); + + Netxx::Timeout timeout(static_cast(constants::netsync_timeout_seconds)); + + bool retry = true; + while(retry) + { + retry = false; + try + { + Netxx::Address addr(use_ipv6); + + if (!app.opts.bind_address().empty()) + addr.add_address(app.opts.bind_address().c_str(), port); + else + addr.add_all_addresses(port); + + shared_ptr srv + (new Netxx::StreamServer(addr, timeout)); + + const char * name = addr.get_name(); + listenaddr = (FL("%s : %s") + % (name != NULL ? name : _("")) + % lexical_cast(addr.get_port())).str(); + P(F("beginning service on %s") % listenaddr); + + return srv; + } + catch(Netxx::NetworkException &) + { + if (use_ipv6) + { + use_ipv6 = false; + retry = true; + } + else + throw; + } + catch(Netxx::Exception &) + { + if (use_ipv6) + { + use_ipv6 = false; + retry = true; + } + else + throw; + } + } + + I(false); + return shared_ptr(); +} + +void +serve_connections_forever(utf8 const & addr, app_state & app) +{ +#ifdef USE_IPV6 + bool use_ipv6 = true; +#else + bool use_ipv6 = false; +#endif + ignore_sigpipe(); + string listenaddr; + shared_ptr srv = make_server(use_ipv6, app, listenaddr); + map > sessions; + run_network_loop(false, srv, sessions, app, listenaddr); +} + +void +serve_single_on_stdio(app_state & app) +{ + ignore_sigpipe(); + shared_ptr str(new Netxx::PipeStream(0,1)); + shared_ptr sess(new session(server_voice, str, utf8("stdio"), app)); + + map > sessions; + + if (sess->str->get_socketfd() == -1) + { + // Unix pipes are non-duplex, have two filedescriptors + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(pipe); + sessions[pipe->get_writefd()]=sess; + sessions[pipe->get_readfd()]=sess; + } + else + sessions[sess->str->get_socketfd()]=sess; + + run_network_loop(false, shared_ptr(), sessions, app); +} + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: ============================================================ --- network.hh 08043161052d6397db7ecb78ed2af4781585690a +++ network.hh 08043161052d6397db7ecb78ed2af4781585690a @@ -0,0 +1,100 @@ +#ifndef __NETWORK_HH__ +#define __NETWORK_HH__ + +#include "app_state.hh" +#include "netcmd.hh" +#include "vocab.hh" + +#include +using boost::shared_ptr; + +namespace service_numbers +{ + const int none = 0; + const int netsync = 1; +} + +class transaction_guard; + +typedef enum + { + server_voice, + client_voice + } +protocol_voice; + +class session; + +struct state +{ + enum _state {SERVICE_DONE, SESSION_DONE, ERROR, RUNNING, NONE}; + _state val; + state(_state from) : val(from) {} + operator _state() { return val; } +}; + +class service +{ +protected: + session * sess; +public: + + service(int num); + virtual ~service(); + virtual service * copy() = 0; + static shared_ptr get(int num); + + void attach(session & s); + void detach(bool received_error); +private: + virtual void detached(bool received_error); // default do nothing +public: + + virtual void begin_service() = 0; // called on the server + virtual void request_service() = 0; // called on the client + + // do we have work to do, even without receiving anything? + virtual bool can_process() = 0; // default false + // do work + virtual state process(transaction_guard & guard) = 0; + + // are we willing to accept input? + virtual bool can_receive(); // default true + + virtual state received(netcmd const & cmd, + transaction_guard & guard) = 0; +protected: + void send(netcmd const & cmd); + bool can_send() const; +}; + + +class client_session +{ + shared_ptr impl; + client_session(client_session const & other);//I(false) + client_session const & operator=(client_session const & other);//I(false) +public: + // client + client_session(utf8 const & addr, app_state & app); + + // client + bool authenticate_as(netsync_session_key const & key); + state request_service(service * newsrv); +}; + +void serve_connections_forever(utf8 const & addr, app_state & app); +void serve_single_on_stdio(app_state & app); + + + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: + +#endif + ============================================================ --- Makefile.am c7cdc7a6ff99623495792d2e28bebccc79483c67 +++ Makefile.am 993653607dcbc4371cd64de6b8f53f0fd4d827c0 @@ -45,7 +45,7 @@ MOST_SOURCES = \ schema_migration.cc schema_migration.hh \ refiner.cc refiner.hh \ enumerator.cc enumerator.hh \ - netsync.cc netsync.hh \ + netsync.cc netsync.hh network.cc network.hh \ netxx_pipe.cc netxx_pipe.hh \ netcmd.cc netcmd.hh \ merkle_tree.cc merkle_tree.hh \ ============================================================ --- cmd_netsync.cc 3480dd77ea18cca0fe7842b0d4807f7e7183a76f +++ cmd_netsync.cc 49b230c1864bddb3dbd355fa5b611afadffb82e6 @@ -1,7 +1,8 @@ #include "cmd.hh" #include "diff_patch.hh" #include "netsync.hh" +#include "network.hh" #include "globish.hh" #include "keys.hh" #include "cert.hh" @@ -132,8 +133,12 @@ CMD(push, N_("network"), N_("[ADDRESS[:P find_key_if_needed(addr, app); extract_patterns(args, include_pattern, exclude_pattern, app); - run_netsync_protocol(client_voice, source_role, addr, - include_pattern, exclude_pattern, app); + netsync_service pusher(netsync_service::push, + include_pattern, + exclude_pattern, + app); + client_session sess(addr, app); + sess.request_service(&pusher); } CMD(pull, N_("network"), N_("[ADDRESS[:PORTNUMBER] [PATTERN ...]]"), @@ -148,8 +153,12 @@ CMD(pull, N_("network"), N_("[ADDRESS[:P if (app.opts.signing_key() == "") P(F("doing anonymous pull; use -kKEYNAME if you need authentication")); - run_netsync_protocol(client_voice, sink_role, addr, - include_pattern, exclude_pattern, app); + netsync_service puller(netsync_service::pull, + include_pattern, + exclude_pattern, + app); + client_session sess(addr, app); + sess.request_service(&puller); } CMD(sync, N_("network"), N_("[ADDRESS[:PORTNUMBER] [PATTERN ...]]"), @@ -163,8 +172,12 @@ CMD(sync, N_("network"), N_("[ADDRESS[:P find_key_if_needed(addr, app); extract_patterns(args, include_pattern, exclude_pattern, app); - run_netsync_protocol(client_voice, source_and_sink_role, addr, - include_pattern, exclude_pattern, app); + netsync_service syncer(netsync_service::sync, + include_pattern, + exclude_pattern, + app); + client_session sess(addr, app); + sess.request_service(&syncer); } class dir_cleanup_helper @@ -280,10 +293,15 @@ CMD(clone, N_("network"), N_("ADDRESS[:P // make sure we're back in the original dir so that file: URIs work change_current_working_dir(start_dir); - - run_netsync_protocol(client_voice, sink_role, addr, - include_pattern, exclude_pattern, app); + netsync_service puller(netsync_service::pull, + include_pattern, + exclude_pattern, + app); + client_session sess(addr, app); + sess.request_service(&puller); + + change_current_working_dir(workspace_dir); transaction_guard guard(app.db, false); @@ -409,8 +427,14 @@ CMD_NO_WORKSPACE(serve, N_("network"), " app.db.ensure_open(); - run_netsync_protocol(server_voice, source_and_sink_role, app.opts.bind_address, - globish("*"), globish(""), app); + if (app.opts.bind_stdio) + { + serve_single_on_stdio(app); + } + else + { + serve_connections_forever(app.opts.bind_address, app); + } } // Local Variables: ============================================================ --- lua_hooks.cc 7455b70dc877298e9c99d421a6f6e3a219ddfb3e +++ lua_hooks.cc ddba95946b674d4d21cbbb03b5c1e96c2035a652 @@ -568,8 +568,6 @@ lua_hooks::hook_get_netsync_connect_comm bool lua_hooks::hook_get_netsync_connect_command(uri const & u, - globish const & include_pattern, - globish const & exclude_pattern, bool debug, std::vector & argv) { @@ -581,20 +579,6 @@ lua_hooks::hook_get_netsync_connect_comm ll.push_table(); - if (!include_pattern().empty()) - { - ll.push_str("include"); - ll.push_str(include_pattern()); - ll.set_table(); - } - - if (!exclude_pattern().empty()) - { - ll.push_str("exclude"); - ll.push_str(exclude_pattern()); - ll.set_table(); - } - if (debug) { ll.push_str("debug"); ============================================================ --- lua_hooks.hh 3136f049c267df992198a278d5e71ee0ee687562 +++ lua_hooks.hh ddadacf111d6a1423ab026b76b5fb833e56c50fa @@ -64,8 +64,6 @@ public: // network hooks bool hook_get_netsync_connect_command(uri const & u, - globish const & include_pattern, - globish const & exclude_pattern, bool debug, std::vector & argv); bool hook_use_transport_auth(uri const & u); ============================================================ --- netsync.cc a1803f7586c78a891069427ea63ccc6ff578947d +++ netsync.cc 10ad3d3e72257627bfea1b28fe952afc79d772a9 @@ -32,6 +32,7 @@ #include "netcmd.hh" #include "netio.hh" #include "netsync.hh" +#include "network.hh" #include "numeric_vocab.hh" #include "packet.hh" #include "refiner.hh" @@ -264,99 +265,39 @@ struct netsync_error netsync_error(string const & s): msg(s) {} }; -class -session: + +class netsync: public refiner_callbacks, public enumerator_callbacks { protocol_role role; - protocol_voice const voice; + // protocol_voice const voice; globish const & our_include_pattern; globish const & our_exclude_pattern; globish_matcher our_matcher; + netsync_service & service; // for sending public: app_state & app; - string const peer_id; - shared_ptr str; -private: +public: + bool armed(); - class input_manager - { - string_queue buffer; - netcmd cmd; - bool have_cmd; - - chained_hmac read_hmac; - - public: - input_manager(bool use_transport_auth) - : have_cmd(false), - read_hmac(constants::netsync_key_initializer, use_transport_auth) - {} - inline bool full() const { return buffer.size() >= constants::netcmd_maxsz; } - inline bool have_netcmd() - { - if (!have_cmd) - { - have_cmd = cmd.read(buffer, read_hmac); - } - return have_cmd; - } - inline void get_netcmd(netcmd & c) - { - I(have_cmd); - c = cmd; - have_cmd = false; - } - inline size_t size() const - { - return buffer.size() + (have_cmd ? cmd.encoded_size() : 0); - } - inline void set_hmac_key(netsync_session_key const & key) - { read_hmac.set_key(key); } - Netxx::signed_size_type read_some_from(shared_ptr str); - }; - input_manager input; - - class output_manager - { - // deque of pair - deque< pair > buffer; - // the total data stored in outbuf - this is - // used as a valve to stop too much data - // backing up - size_t buffer_size; - - chained_hmac write_hmac; - - public: - output_manager(bool use_transport_auth) - : buffer_size(0), - write_hmac(constants::netsync_key_initializer, use_transport_auth) - {} - inline bool full() const { return buffer_size > constants::bufsz * 10; } - inline bool empty() const { return buffer_size == 0; } - inline void set_hmac_key(netsync_session_key const & key) - { write_hmac.set_key(key); } - void queue_netcmd(netcmd const & cmd); - Netxx::signed_size_type write_some_to(shared_ptr str); - }; - output_manager output; - + // Interface. public: - bool armed(); + void begin_service(); + void request_service(); + bool can_process() const; + bool process(transaction_guard & guard); + //private: + // The incoming dispatcher. + bool received(netcmd const & cmd, + transaction_guard & guard); private: id remote_peer_key_hash; rsa_keypair_id remote_peer_key_name; bool authenticated; - time_t last_io_time; -public: - bool io_idle_timeout(time_t now, unsigned long timeout); - -private: auto_ptr byte_in_ticker; auto_ptr byte_out_ticker; auto_ptr cert_in_ticker; @@ -430,22 +371,19 @@ public: void note_rev(revision_id const & rev); void note_cert(hexenc const & c); - session(protocol_role role, - protocol_voice voice, + netsync(protocol_role role, globish const & our_include_pattern, globish const & our_exclude_pattern, - app_state & app, - string const & peer, - shared_ptr sock); + netsync_service & wrapper, + app_state & app); - virtual ~session(); + virtual ~netsync(); private: void rev_written_callback(revision_id rid); void key_written_callback(rsa_keypair_id kid); void cert_written_callback(cert const & c); id mk_nonce(); - void mark_recent_io(); void set_session_key(string const & key); void set_session_key(rsa_oaep_sha_data const & key_encrypted); @@ -456,8 +394,8 @@ private: bool received_all_items(); bool finished_working(); - bool can_step(); - void maybe_step(); + bool can_step(); // refiner + void maybe_step(); //refiner public: void maybe_say_goodbye(transaction_guard & guard); @@ -467,15 +405,10 @@ private: void maybe_note_epochs_finished(); void note_item_sent(netcmd_item_type ty, id const & i); -public: - Netxx::Probe::ready_type which_events(); - bool read_some(); - bool write_some(); private: void error(int errcode, string const & errmsg); - void write_netcmd_and_try_flush(netcmd const & cmd); // Outgoing queue-writers. public: // refiner_callbacks @@ -537,10 +470,6 @@ private: delta const & del); bool process_usher_cmd(utf8 const & msg); - // The incoming dispatcher. - bool dispatch_payload(netcmd const & cmd, - transaction_guard & guard); - // Various helpers. void assume_corresponding_role(protocol_role their_role); void respond_to_confirm_cmd(); @@ -554,33 +483,56 @@ private: set const & branches); void send_all_data(netcmd_item_type ty, set const & items); -public: - void begin_service(); - bool process(transaction_guard & guard); }; -size_t session::session_count = 0; -session::session(protocol_role role, - protocol_voice voice, +netsync_service netsync_service::mapped; + +netsync_service::netsync_service() + : service(service_numbers::netsync) +{ +} + +netsync_service::netsync_service(netsync_op what, + globish const & include, + globish const & exclude, + app_state & app) + : service(0) +{ +} + +netsync_service::~netsync_service() +{ +} + +netsync_service::netsync_service(netsync_service const & other) + : service(0) +{ + I(false); +} + +netsync_service const & +netsync_service::operator=(netsync_service const & other) +{ + I(false); +} + + +size_t netsync::session_count = 0; + +netsync::netsync(protocol_role role, globish const & our_include_pattern, globish const & our_exclude_pattern, - app_state & app, - string const & peer, - shared_ptr sock) : + netsync_service & service, + app_state & app) : role(role), - voice(voice), our_include_pattern(our_include_pattern), our_exclude_pattern(our_exclude_pattern), our_matcher(our_include_pattern, our_exclude_pattern), + service(service), app(app), - peer_id(peer), - str(sock), - input(app.opts.use_transport_auth), - output(app.opts.use_transport_auth), remote_peer_key_hash(""), remote_peer_key_name(""), authenticated(false), - last_io_time(::time(NULL)), byte_in_ticker(NULL), byte_out_ticker(NULL), cert_in_ticker(NULL), @@ -604,15 +556,15 @@ session::session(protocol_role role, rev_refiner(revision_item, voice, *this), rev_enumerator(*this, app) { - dbw.set_on_revision_written(boost::bind(&session::rev_written_callback, + dbw.set_on_revision_written(boost::bind(&netsync::rev_written_callback, this, _1)); - dbw.set_on_cert_written(boost::bind(&session::cert_written_callback, + dbw.set_on_cert_written(boost::bind(&netsync::cert_written_callback, this, _1)); - dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback, + dbw.set_on_pubkey_written(boost::bind(&netsync::key_written_callback, this, _1)); } -session::~session() +netsync::~netsync() { if (protocol_state == confirmed_state) error_code = no_error; @@ -688,7 +640,7 @@ bool } bool -session::process_this_rev(revision_id const & rev) +netsync::process_this_rev(revision_id const & rev) { id item; decode_hexenc(rev.inner(), item); @@ -697,7 +649,7 @@ bool } bool -session::queue_this_cert(hexenc const & c) +netsync::queue_this_cert(hexenc const & c) { id item; decode_hexenc(c, item); @@ -706,13 +658,13 @@ bool } bool -session::queue_this_file(hexenc const & f) +netsync::queue_this_file(hexenc const & f) { return file_items_sent.find(file_id(f)) == file_items_sent.end(); } void -session::note_file_data(file_id const & f) +netsync::note_file_data(file_id const & f) { if (role == sink_role) return; @@ -725,7 +677,7 @@ void } void -session::note_file_delta(file_id const & src, file_id const & dst) +netsync::note_file_delta(file_id const & src, file_id const & dst) { if (role == sink_role) return; @@ -739,7 +691,7 @@ void } void -session::note_rev(revision_id const & rev) +netsync::note_rev(revision_id const & rev) { if (role == sink_role) return; @@ -753,7 +705,7 @@ void } void -session::note_cert(hexenc const & c) +netsync::note_cert(hexenc const & c) { if (role == sink_role) return; @@ -767,23 +719,23 @@ session::note_cert(hexenc const & c) } -void session::rev_written_callback(revision_id rid) +void netsync::rev_written_callback(revision_id rid) { written_revisions.push_back(rid); } -void session::key_written_callback(rsa_keypair_id kid) +void netsync::key_written_callback(rsa_keypair_id kid) { written_keys.push_back(kid); } -void session::cert_written_callback(cert const & c) +void netsync::cert_written_callback(cert const & c) { written_certs.push_back(c); } id -session::mk_nonce() +netsync::mk_nonce() { I(this->saved_nonce().size() == 0); char buf[constants::merkle_hash_length_in_bytes]; @@ -793,30 +745,17 @@ session::mk_nonce() I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); return this->saved_nonce; } - +/* void -session::mark_recent_io() +netsync::set_session_key(string const & key) { - last_io_time = ::time(NULL); -} - -bool -session::io_idle_timeout(time_t now, unsigned long timeout) -{ - return static_cast(last_io_time + timeout) - < static_cast(now); -} - -void -session::set_session_key(string const & key) -{ netsync_session_key session_key = netsync_session_key(key); input.set_hmac_key(session_key); output.set_hmac_key(session_key); } void -session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) +netsync::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) { if (app.opts.use_transport_auth) { @@ -828,9 +767,9 @@ session::set_session_key(rsa_oaep_sha_da set_session_key(hmac_key); } } - +*/ void -session::setup_client_tickers() +netsync::setup_client_tickers() { // xgettext: please use short message and try to avoid multibytes chars byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true)); @@ -861,7 +800,7 @@ bool } bool -session::done_all_refinements() +netsync::done_all_refinements() { bool all = rev_refiner.done && cert_refiner.done @@ -890,7 +829,7 @@ bool bool -session::received_all_items() +netsync::received_all_items() { if (role == source_role) return true; @@ -902,7 +841,7 @@ bool } bool -session::finished_working() +netsync::finished_working() { bool all = done_all_refinements() && received_all_items() @@ -912,7 +851,7 @@ bool } bool -session::queued_all_items() +netsync::queued_all_items() { if (role == sink_role) return true; @@ -925,7 +864,7 @@ void void -session::maybe_note_epochs_finished() +netsync::maybe_note_epochs_finished() { // Maybe there are outstanding epoch requests. // These only matter if we're in sink or source-and-sink mode. @@ -974,7 +913,7 @@ void } void -session::note_item_arrived(netcmd_item_type ty, id const & ident) +netsync::note_item_arrived(netcmd_item_type ty, id const & ident) { switch (ty) { @@ -1006,7 +945,7 @@ void void -session::note_item_sent(netcmd_item_type ty, id const & ident) +netsync::note_item_sent(netcmd_item_type ty, id const & ident) { switch (ty) { @@ -1036,20 +975,11 @@ void } void -session::output_manager::queue_netcmd(netcmd const & cmd) +netsync::write_netcmd_and_try_flush(netcmd const & cmd) { - string buf; - cmd.write(buf, write_hmac); - buffer.push_back(make_pair(buf, 0)); - buffer_size += buf.size(); -} - -void -session::write_netcmd_and_try_flush(netcmd const & cmd) -{ if (!encountered_error) { - output.queue_netcmd(cmd); + service.send(cmd); } else L(FL("dropping outgoing netcmd (because we're in error unwind mode)")); @@ -1065,133 +995,16 @@ void // ensure that our peer receives the error message. // Affects read_some, write_some, and process . void -session::error(int errcode, string const & errmsg) +netsync::error(int errcode, string const & errmsg) { error_code = errcode; throw netsync_error(errmsg); } -Netxx::Probe::ready_type -session::which_events() -{ - Netxx::Probe::ready_type which = Netxx::Probe::ready_oobd; - - // Don't ask to read if we still have unprocessed input. - if (!input.full() && !input.have_netcmd()) - { - which = which | Netxx::Probe::ready_read; - } - - if (!output.empty()) - { - which = which | Netxx::Probe::ready_write; - } - - return which; -} - -Netxx::signed_size_type -session::input_manager::read_some_from(shared_ptr str) -{ - I(!full()); - char tmp[constants::bufsz]; - Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); - - if (count > 0) - { - buffer.append(tmp,count); - } - - return count; -} - -bool -session::read_some() -{ - I(!input.full()); - Netxx::signed_size_type count; - if (!encountered_error) - { - count = input.read_some_from(str); - } - else - { - char bit_bucket[constants::bufsz]; - count = str->read(bit_bucket, sizeof(bit_bucket)); - } - - if (count > 0) - { - L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id); - if (encountered_error) - { - L(FL("in error unwind mode, so throwing them into the bit bucket")); - return true; - } - mark_recent_io(); - if (byte_in_ticker.get() != NULL) - (*byte_in_ticker) += count; - bytes_in += count; - return true; - } - else - return false; -} - -Netxx::signed_size_type -session::output_manager::write_some_to(shared_ptr str) -{ - I(!buffer.empty()); - string & to_write(buffer.front().first); - size_t & writepos(buffer.front().second); - size_t writelen = to_write.size() - writepos; - Netxx::signed_size_type count = str->write(to_write.data() + writepos, - min(writelen, - constants::bufsz)); - if (count > 0) - { - if ((size_t)count == writelen) - { - buffer_size -= to_write.size(); - buffer.pop_front(); - } - else - { - writepos += count; - } - } - return count; -} - -bool -session::write_some() -{ - I(!output.empty()); - Netxx::signed_size_type count = output.write_some_to(str); - if (count > 0) - { - L(FL("wrote %d bytes to fd %d (peer %s)") - % count % str->get_socketfd() % peer_id); - mark_recent_io(); - if (byte_out_ticker.get() != NULL) - (*byte_out_ticker) += count; - bytes_out += count; - if (encountered_error && output.empty()) - { - // we've flushed our error message, so it's time to get out. - L(FL("finished flushing output queue in error unwind mode, disconnecting")); - return false; - } - return true; - } - else - return false; -} - // senders void -session::queue_error_cmd(string const & errmsg) +netsync::queue_error_cmd(string const & errmsg) { L(FL("queueing 'error' command")); netcmd cmd; @@ -1200,7 +1013,7 @@ void } void -session::queue_bye_cmd(u8 phase) +netsync::queue_bye_cmd(u8 phase) { L(FL("queueing 'bye' command, phase %d") % static_cast(phase)); @@ -1210,7 +1023,7 @@ void } void -session::queue_done_cmd(netcmd_item_type type, +netsync::queue_done_cmd(netcmd_item_type type, size_t n_items) { string typestr; @@ -1223,7 +1036,7 @@ void } void -session::queue_hello_cmd(rsa_keypair_id const & key_name, +netsync::queue_hello_cmd(rsa_keypair_id const & key_name, base64 const & pub_encoded, id const & nonce) { @@ -1236,7 +1049,7 @@ void } void -session::queue_anonymous_cmd(protocol_role role, +netsync::queue_anonymous_cmd(protocol_role role, globish const & include_pattern, globish const & exclude_pattern, id const & nonce2, @@ -1254,7 +1067,7 @@ void } void -session::queue_auth_cmd(protocol_role role, +netsync::queue_auth_cmd(protocol_role role, globish const & include_pattern, globish const & exclude_pattern, id const & client, @@ -1275,7 +1088,7 @@ void } void -session::queue_confirm_cmd() +netsync::queue_confirm_cmd() { netcmd cmd; cmd.write_confirm_cmd(); @@ -1283,7 +1096,7 @@ void } void -session::queue_refine_cmd(refinement_type ty, merkle_node const & node) +netsync::queue_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; hexenc hpref; @@ -1298,7 +1111,7 @@ void } void -session::queue_data_cmd(netcmd_item_type type, +netsync::queue_data_cmd(netcmd_item_type type, id const & item, string const & dat) { @@ -1332,7 +1145,7 @@ void } void -session::queue_delta_cmd(netcmd_item_type type, +netsync::queue_delta_cmd(netcmd_item_type type, id const & base, id const & ident, delta const & del) @@ -1364,7 +1177,7 @@ bool // processors bool -session::process_error_cmd(string const & errmsg) +netsync::process_error_cmd(string const & errmsg) { // "xxx string" with xxx being digits means there's an error code if (errmsg.size() > 4 && errmsg.substr(3,1) == " ") @@ -1389,7 +1202,7 @@ bool static const var_domain known_servers_domain = var_domain("known-servers"); bool -session::process_hello_cmd(rsa_keypair_id const & their_keyname, +netsync::process_hello_cmd(rsa_keypair_id const & their_keyname, rsa_pub_key const & their_key, id const & nonce) { @@ -1503,7 +1316,7 @@ bool } bool -session::process_anonymous_cmd(protocol_role their_role, +netsync::process_anonymous_cmd(protocol_role their_role, globish const & their_include_pattern, globish const & their_exclude_pattern) { @@ -1584,7 +1397,7 @@ void } void -session::assume_corresponding_role(protocol_role their_role) +netsync::assume_corresponding_role(protocol_role their_role) { // Assume the (possibly degraded) opposite role. switch (their_role) @@ -1606,7 +1419,7 @@ bool } bool -session::process_auth_cmd(protocol_role their_role, +netsync::process_auth_cmd(protocol_role their_role, globish const & their_include_pattern, globish const & their_exclude_pattern, id const & client, @@ -1752,7 +1565,7 @@ bool } bool -session::process_refine_cmd(refinement_type ty, merkle_node const & node) +netsync::process_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; netcmd_item_type_to_string(node.type, typestr); @@ -1785,7 +1598,7 @@ bool } bool -session::process_bye_cmd(u8 phase, +netsync::process_bye_cmd(u8 phase, transaction_guard & guard) { @@ -1866,7 +1679,7 @@ bool } bool -session::process_done_cmd(netcmd_item_type type, size_t n_items) +netsync::process_done_cmd(netcmd_item_type type, size_t n_items) { string typestr; netcmd_item_type_to_string(type, typestr); @@ -1904,13 +1717,13 @@ void } void -session::respond_to_confirm_cmd() +netsync::respond_to_confirm_cmd() { epoch_refiner.begin_refinement(); } bool -session::data_exists(netcmd_item_type type, +netsync::data_exists(netcmd_item_type type, id const & item) { hexenc hitem; @@ -1936,7 +1749,7 @@ void } void -session::load_data(netcmd_item_type type, +netsync::load_data(netcmd_item_type type, id const & item, string & out) { @@ -1999,7 +1812,7 @@ bool } bool -session::process_data_cmd(netcmd_item_type type, +netsync::process_data_cmd(netcmd_item_type type, id const & item, string const & dat) { @@ -2105,7 +1918,7 @@ bool } bool -session::process_delta_cmd(netcmd_item_type type, +netsync::process_delta_cmd(netcmd_item_type type, id const & base, id const & ident, delta const & del) @@ -2139,7 +1952,7 @@ bool } bool -session::process_usher_cmd(utf8 const & msg) +netsync::process_usher_cmd(utf8 const & msg) { if (msg().size()) { @@ -2157,7 +1970,7 @@ void void -session::send_all_data(netcmd_item_type ty, set const & items) +netsync::send_all_data(netcmd_item_type ty, set const & items) { string typestr; netcmd_item_type_to_string(ty, typestr); @@ -2181,7 +1994,7 @@ bool } bool -session::dispatch_payload(netcmd const & cmd, +netsync::dispatch_payload(netcmd const & cmd, transaction_guard & guard) { @@ -2353,7 +2166,7 @@ void // This kicks off the whole cascade starting from "hello". void -session::begin_service() +netsync::begin_service() { keypair kp; if (app.opts.use_transport_auth) @@ -2362,15 +2175,15 @@ bool } bool -session::can_step() +netsync::can_step() { return done_all_refinements() && !rev_enumerator.done() - && !output.full(); + && service.can_send(); } void -session::maybe_step() +netsync::maybe_step() { while (can_step()) { @@ -2379,7 +2192,7 @@ void } void -session::maybe_say_goodbye(transaction_guard & guard) +netsync::maybe_say_goodbye(transaction_guard & guard) { if (voice == client_voice && protocol_state == working_state @@ -2392,22 +2205,12 @@ bool } bool -session::armed() +netsync::can_process() const { - // Don't pack the buffer unnecessarily. - if (output.full()) - return false; - - if (input.have_netcmd()) - return true; - - if (can_step()) - return true; - - return false; + return can_step(); } -bool session::process(transaction_guard & guard) +bool netsync::process(transaction_guard & guard) { if (encountered_error) return true; @@ -2456,684 +2259,6 @@ bool session::process(transaction_guard } } - -static shared_ptr -build_stream_to_server(app_state & app, - globish const & include_pattern, - globish const & exclude_pattern, - utf8 const & address, - Netxx::port_type default_port, - Netxx::Timeout timeout) -{ - shared_ptr server; - uri u; - vector argv; - if (parse_uri(address(), u) - && app.lua.hook_get_netsync_connect_command(u, - include_pattern, - exclude_pattern, - global_sanity.debug, - argv)) - { - I(argv.size() > 0); - string cmd = argv[0]; - argv.erase(argv.begin()); - app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); - return shared_ptr - (new Netxx::PipeStream(cmd, argv)); - - } - else - { -#ifdef USE_IPV6 - bool use_ipv6=true; -#else - bool use_ipv6=false; -#endif - Netxx::Address addr(address().c_str(), - default_port, use_ipv6); - return shared_ptr - (new Netxx::Stream(addr, timeout)); - } -} - -static void -call_server(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - app_state & app, - utf8 const & address, - Netxx::port_type default_port, - unsigned long timeout_seconds) -{ - Netxx::PipeCompatibleProbe probe; - transaction_guard guard(app.db); - - Netxx::Timeout timeout(static_cast(timeout_seconds)), instant(0,1); - - // FIXME: split into labels and convert to ace here. - - P(F("connecting to %s") % address()); - - shared_ptr server - = build_stream_to_server(app, - include_pattern, - exclude_pattern, - address, default_port, - timeout); - - - // 'false' here means not to revert changes when the SockOpt - // goes out of scope. - Netxx::SockOpt socket_options(server->get_socketfd(), false); - socket_options.set_non_blocking(); - - session sess(role, client_voice, - include_pattern, - exclude_pattern, - app, address(), server); - - while (true) - { - bool armed = false; - try - { - armed = sess.armed(); - } - catch (bad_decode & bd) - { - E(false, F("protocol error while processing peer %s: '%s'") - % sess.peer_id % bd.what); - } - - sess.maybe_say_goodbye(guard); - - probe.clear(); - probe.add(*(sess.str), sess.which_events()); - Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout); - Netxx::Probe::ready_type event = res.second; - Netxx::socket_type fd = res.first; - - if (fd == -1 && !armed) - { - E(false, (F("timed out waiting for I/O with " - "peer %s, disconnecting") - % sess.peer_id)); - } - - bool all_io_clean = (event != Netxx::Probe::ready_oobd); - - if (event & Netxx::Probe::ready_read) - all_io_clean = all_io_clean && sess.read_some(); - - if (event & Netxx::Probe::ready_write) - all_io_clean = all_io_clean && sess.write_some(); - - if (armed) - if (!sess.process(guard)) - { - // Commit whatever work we managed to accomplish anyways. - guard.commit(); - - // We failed during processing. This should only happen in - // client voice when we have a decode exception, or received an - // error from our server (which is translated to a decode - // exception). We call these cases E() errors. - E(false, F("processing failure while talking to " - "peer %s, disconnecting") - % sess.peer_id); - return; - } - - if (!all_io_clean) - { - // Commit whatever work we managed to accomplish anyways. - guard.commit(); - - // We had an I/O error. We must decide if this represents a - // user-reported error or a clean disconnect. See protocol - // state diagram in session::process_bye_cmd. - - if (sess.protocol_state == session::confirmed_state) - { - P(F("successful exchange with %s") - % sess.peer_id); - return; - } - else if (sess.encountered_error) - { - P(F("peer %s disconnected after we informed them of error") - % sess.peer_id); - return; - } - else - E(false, (F("I/O failure while talking to " - "peer %s, disconnecting") - % sess.peer_id)); - } - } -} - -static void -drop_session_associated_with_fd(map > & sessions, - Netxx::socket_type fd) -{ - // This is a bit of a hack. Initially all "file descriptors" in - // netsync were full duplex, so we could get away with indexing - // sessions by their file descriptor. - // - // When using pipes in unix, it's no longer true: a session gets - // entered in the session map under its read pipe fd *and* its write - // pipe fd. When we're in such a situation the socket fd is "-1" and - // we downcast to a PipeStream and use its read+write fds. - // - // When using pipes in windows, we use a full duplex pipe (named - // pipe) so the socket-like abstraction holds. - - I(fd != -1); - map >::const_iterator i = sessions.find(fd); - I(i != sessions.end()); - shared_ptr sess = i->second; - fd = sess->str->get_socketfd(); - if (fd != -1) - { - sessions.erase(fd); - } - else - { - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(static_cast(pipe)); - I(pipe->get_writefd() != -1); - I(pipe->get_readfd() != -1); - sessions.erase(pipe->get_readfd()); - sessions.erase(pipe->get_writefd()); - } -} - -static void -arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, - map > & sessions, - set & armed_sessions) -{ - set arm_failed; - for (map >::const_iterator i = sessions.begin(); - i != sessions.end(); ++i) - { - try - { - if (i->second->armed()) - { - L(FL("fd %d is armed") % i->first); - armed_sessions.insert(i->first); - } - probe.add(*i->second->str, i->second->which_events()); - } - catch (bad_decode & bd) - { - W(F("protocol error while processing peer %s: '%s', marking as bad") - % i->second->peer_id % bd.what); - arm_failed.insert(i->first); - } - } - for (set::const_iterator i = arm_failed.begin(); - i != arm_failed.end(); ++i) - { - drop_session_associated_with_fd(sessions, *i); - } -} - -static void -handle_new_connection(Netxx::Address & addr, - Netxx::StreamServer & server, - Netxx::Timeout & timeout, - protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - map > & sessions, - app_state & app) -{ - L(FL("accepting new connection on %s : %s") - % (addr.get_name()?addr.get_name():"") % lexical_cast(addr.get_port())); - Netxx::Peer client = server.accept_connection(); - - if (!client) - { - L(FL("accept() returned a dead client")); - } - else - { - P(F("accepted new client connection from %s : %s") - % client.get_address() % lexical_cast(client.get_port())); - - // 'false' here means not to revert changes when the SockOpt - // goes out of scope. - Netxx::SockOpt socket_options(client.get_socketfd(), false); - socket_options.set_non_blocking(); - - shared_ptr str = - shared_ptr - (new Netxx::Stream(client.get_socketfd(), timeout)); - - shared_ptr sess(new session(role, server_voice, - include_pattern, exclude_pattern, - app, - lexical_cast(client), str)); - sess->begin_service(); - sessions.insert(make_pair(client.get_socketfd(), sess)); - } -} - -static void -handle_read_available(Netxx::socket_type fd, - shared_ptr sess, - map > & sessions, - set & armed_sessions, - bool & live_p) -{ - if (sess->read_some()) - { - try - { - if (sess->armed()) - armed_sessions.insert(fd); - } - catch (bad_decode & bd) - { - W(F("protocol error while processing peer %s: '%s', disconnecting") - % sess->peer_id % bd.what); - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } - } - else - { - switch (sess->protocol_state) - { - case session::working_state: - P(F("peer %s read failed in working state (error)") - % sess->peer_id); - break; - - case session::shutdown_state: - P(F("peer %s read failed in shutdown state " - "(possibly client misreported error)") - % sess->peer_id); - break; - - case session::confirmed_state: - P(F("peer %s read failed in confirmed state (success)") - % sess->peer_id); - break; - } - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } -} - - -static void -handle_write_available(Netxx::socket_type fd, - shared_ptr sess, - map > & sessions, - bool & live_p) -{ - if (!sess->write_some()) - { - switch (sess->protocol_state) - { - case session::working_state: - P(F("peer %s write failed in working state (error)") - % sess->peer_id); - break; - - case session::shutdown_state: - P(F("peer %s write failed in shutdown state " - "(possibly client misreported error)") - % sess->peer_id); - break; - - case session::confirmed_state: - P(F("peer %s write failed in confirmed state (success)") - % sess->peer_id); - break; - } - - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } -} - -static void -process_armed_sessions(map > & sessions, - set & armed_sessions, - transaction_guard & guard) -{ - for (set::const_iterator i = armed_sessions.begin(); - i != armed_sessions.end(); ++i) - { - map >::iterator j; - j = sessions.find(*i); - if (j == sessions.end()) - continue; - else - { - shared_ptr sess = j->second; - if (!sess->process(guard)) - { - P(F("peer %s processing finished, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, *i); - } - } - } -} - -static void -reap_dead_sessions(map > & sessions, - unsigned long timeout_seconds) -{ - // Kill any clients which haven't done any i/o inside the timeout period - // or who have exchanged all items and flushed their output buffers. - set dead_clients; - time_t now = ::time(NULL); - for (map >::const_iterator - i = sessions.begin(); i != sessions.end(); ++i) - { - if (i->second->io_idle_timeout(now, timeout_seconds)) - { - P(F("fd %d (peer %s) has been idle too long, disconnecting") - % i->first % i->second->peer_id); - dead_clients.insert(i->first); - } - } - for (set::const_iterator i = dead_clients.begin(); - i != dead_clients.end(); ++i) - { - drop_session_associated_with_fd(sessions, *i); - } -} - -static void -serve_connections(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - app_state & app, - utf8 const & address, - Netxx::port_type default_port, - unsigned long timeout_seconds, - unsigned long session_limit) -{ - Netxx::PipeCompatibleProbe probe; - - Netxx::Timeout - forever, - timeout(static_cast(timeout_seconds)), - instant(0,1); - - if (!app.opts.bind_port().empty()) - default_port = std::atoi(app.opts.bind_port().c_str()); -#ifdef USE_IPV6 - bool use_ipv6=true; -#else - bool use_ipv6=false; -#endif - // This will be true when we try to bind while using IPv6. See comments - // further down. - bool try_again=false; - - do - { - try - { - try_again = false; - - Netxx::Address addr(use_ipv6); - - if (!app.opts.bind_address().empty()) - addr.add_address(app.opts.bind_address().c_str(), default_port); - else - addr.add_all_addresses (default_port); - - // If se use IPv6 and the initialisation of server fails, we want - // to try again with IPv4. The reason is that someone may have - // downloaded a IPv6-enabled monotone on a system that doesn't - // have IPv6, and which might fail therefore. - // On failure, Netxx::NetworkException is thrown, and we catch - // it further down. - try_again=use_ipv6; - - Netxx::StreamServer server(addr, timeout); - - // If we came this far, whatever we used (IPv6 or IPv4) was - // accepted, so we don't need to try again any more. - try_again=false; - - const char *name = addr.get_name(); - P(F("beginning service on %s : %s") - % (name != NULL ? name : _("")) - % lexical_cast(addr.get_port())); - - map > sessions; - set armed_sessions; - - shared_ptr guard; - - while (true) - { - probe.clear(); - armed_sessions.clear(); - - if (sessions.size() >= session_limit) - W(F("session limit %d reached, some connections " - "will be refused") % session_limit); - else - probe.add(server); - - arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); - - L(FL("i/o probe with %d armed") % armed_sessions.size()); - Netxx::socket_type fd; - Netxx::Timeout how_long; - if (sessions.empty()) - how_long = forever; - else if (armed_sessions.empty()) - how_long = timeout; - else - how_long = instant; - do - { - Netxx::Probe::result_type res = probe.ready(how_long); - how_long = instant; - Netxx::Probe::ready_type event = res.second; - fd = res.first; - - if (!guard) - guard = shared_ptr(new transaction_guard(app.db)); - - I(guard); - - if (fd == -1) - { - if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s : %s)") - % addr.get_name() % lexical_cast(addr.get_port())); - } - - // we either got a new connection - else if (fd == server) - handle_new_connection(addr, server, timeout, role, - include_pattern, exclude_pattern, - sessions, app); - - // or an existing session woke up - else - { - map >::iterator i; - i = sessions.find(fd); - if (i == sessions.end()) - { - L(FL("got woken up for action on unknown fd %d") % fd); - } - else - { - probe.remove(*(i->second->str)); - shared_ptr sess = i->second; - bool live_p = true; - - try - { - if (event & Netxx::Probe::ready_read) - handle_read_available(fd, sess, sessions, - armed_sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_write)) - handle_write_available(fd, sess, sessions, live_p); - } - catch (Netxx::Exception &) - { - P(F("Network error on peer %s, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } - if (live_p && (event & Netxx::Probe::ready_oobd)) - { - P(F("got OOB from peer %s, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } - } - } - } - while (fd != -1); - process_armed_sessions(sessions, armed_sessions, *guard); - reap_dead_sessions(sessions, timeout_seconds); - - if (sessions.empty()) - { - // Let the guard die completely if everything's gone quiet. - guard->commit(); - guard.reset(); - } - } - } - // This exception is thrown when bind() fails somewhere in Netxx. - catch (Netxx::NetworkException &) - { - // If we tried with IPv6 and failed, we want to try again using IPv4. - if (try_again) - { - use_ipv6 = false; - } - // In all other cases, just rethrow the exception. - else - throw; - } - // This exception is thrown when there is no support for the type of - // connection we want to do in the kernel, for example when a socket() - // call fails somewhere in Netxx. - catch (Netxx::Exception &) - { - // If we tried with IPv6 and failed, we want to try again using IPv4. - if (try_again) - { - use_ipv6 = false; - } - // In all other cases, just rethrow the exception. - else - throw; - } - } - while(try_again); - } - -static void -serve_single_connection(shared_ptr sess, - unsigned long timeout_seconds) -{ - Netxx::PipeCompatibleProbe probe; - - Netxx::Timeout - forever, - timeout(static_cast(timeout_seconds)), - instant(0,1); - - P(F("beginning service on %s") % sess->peer_id); - - sess->begin_service(); - - transaction_guard guard(sess->app.db); - - map > sessions; - set armed_sessions; - - if (sess->str->get_socketfd() == -1) - { - // Unix pipes are non-duplex, have two filedescriptors - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(pipe); - sessions[pipe->get_writefd()]=sess; - sessions[pipe->get_readfd()]=sess; - } - else - sessions[sess->str->get_socketfd()]=sess; - - while (!sessions.empty()) - { - probe.clear(); - armed_sessions.clear(); - - arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); - - L(FL("i/o probe with %d armed") % armed_sessions.size()); - Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout - : instant)); - Netxx::Probe::ready_type event = res.second; - Netxx::socket_type fd = res.first; - - if (fd == -1) - { - if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s)") - % sess->peer_id); - } - - // an existing session woke up - else - { - map >::iterator i; - i = sessions.find(fd); - if (i == sessions.end()) - { - L(FL("got woken up for action on unknown fd %d") % fd); - } - else - { - shared_ptr sess = i->second; - bool live_p = true; - - if (event & Netxx::Probe::ready_read) - handle_read_available(fd, sess, sessions, armed_sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_write)) - handle_write_available(fd, sess, sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_oobd)) - { - P(F("got some OOB data on fd %d (peer %s), disconnecting") - % fd % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } - } - } - process_armed_sessions(sessions, armed_sessions, guard); - reap_dead_sessions(sessions, timeout_seconds); - } -} - - void insert_with_parents(revision_id rev, refiner & ref, @@ -3168,7 +2293,7 @@ void } void -session::rebuild_merkle_trees(app_state & app, +netsync::rebuild_merkle_trees(app_state & app, set const & branchnames) { P(F("finding items to synchronize:")); @@ -3318,65 +2443,7 @@ session::rebuild_merkle_trees(app_state epoch_refiner.reindex_local_items(); } -void -run_netsync_protocol(protocol_voice voice, - protocol_role role, - utf8 const & addr, - globish const & include_pattern, - globish const & exclude_pattern, - app_state & app) -{ - if (include_pattern().find_first_of("'\"") != string::npos) - { - W(F("include branch pattern contains a quote character:\n" - "%s") % include_pattern()); - } - if (exclude_pattern().find_first_of("'\"") != string::npos) - { - W(F("exclude branch pattern contains a quote character:\n" - "%s") % exclude_pattern()); - } - - // We do not want to be killed by SIGPIPE from a network disconnect. - ignore_sigpipe(); - - try - { - if (voice == server_voice) - { - if (app.opts.bind_stdio) - { - shared_ptr str(new Netxx::PipeStream(0,1)); - shared_ptr sess(new session(role, server_voice, - include_pattern, exclude_pattern, - app, "stdio", str)); - serve_single_connection(sess,constants::netsync_timeout_seconds); - } - else - serve_connections(role, include_pattern, exclude_pattern, app, - addr, static_cast(constants::netsync_default_port), - static_cast(constants::netsync_timeout_seconds), - static_cast(constants::netsync_connection_limit)); - } - else - { - I(voice == client_voice); - call_server(role, include_pattern, exclude_pattern, app, - addr, static_cast(constants::netsync_default_port), - static_cast(constants::netsync_timeout_seconds)); - } - } - catch (Netxx::NetworkException & e) - { - throw informative_failure((F("network error: %s") % e.what()).str()); - } - catch (Netxx::Exception & e) - { - throw oops((F("network error: %s") % e.what()).str());; - } -} - // Local Variables: // mode: C++ // fill-column: 76 ============================================================ --- netsync.hh 2a4d8e88f8774f885470a658ba101a06bc403eeb +++ netsync.hh 64567ee872332b85777648ed7b1aa8682cdbf664 @@ -13,23 +13,40 @@ #include #include "app_state.hh" -#include "netcmd.hh" +#include "network.hh" #include "vocab.hh" -typedef enum - { - server_voice, - client_voice - } -protocol_voice; +#include +using boost::shared_ptr; -void run_netsync_protocol(protocol_voice voice, - protocol_role role, - utf8 const & addr, - globish const & include_pattern, - globish const & exclude_pattern, - app_state & app); +class netsync; +class netsync_service : public service +{ + shared_ptr impl; + static netsync_service mapped; + friend class netsync; + void send(netcmd const & cmd); + bool can_send() const; + netsync_service(); + netsync_service(netsync_service const & other);//I(false) + netsync_service const & operator=(netsync_service const & other);//I(false) +public: + enum netsync_op {push, pull, sync}; + netsync_service(netsync_op what, + globish const & include, + globish const & exclude, + app_state & app); + ~netsync_service(); + + service * copy(); + void begin_service(); + void request_service(); + bool can_process(); + state process(transaction_guard & guard); + state received(netcmd const & cmd, transaction_guard & guard); +}; + // Local Variables: // mode: C++ // fill-column: 76 ============================================================ --- refiner.cc fd4795cb306c92c03cac8e9463551562c1f14f37 +++ refiner.cc c4bacfb1eadaa252c39136468356585bdeebcc77 @@ -18,7 +18,6 @@ #include "vocab.hh" #include "merkle_tree.hh" #include "netcmd.hh" -#include "netsync.hh" using std::inserter; using std::make_pair; ============================================================ --- refiner.hh 66938922057cfdd7aa84b4dcb0d9640c8ac7da1f +++ refiner.hh 1fffe4f5a38822e83891c4f9cf8676b876eccda9 @@ -15,7 +15,7 @@ #include "vocab.hh" #include "merkle_tree.hh" #include "netcmd.hh" -#include "netsync.hh" +#include "network.hh" // This file defines the "refiner" class, which is a helper encapsulating // the main tricky part of the netsync algorithm. You must construct a