# # # patch "netsync.cc" # from [bae6df9d0412407356a431eb242087afa547bfaa] # to [c5c9d7589daa10da3a011215097402ed0c445990] # ============================================================ --- netsync.cc bae6df9d0412407356a431eb242087afa547bfaa +++ netsync.cc c5c9d7589daa10da3a011215097402ed0c445990 @@ -465,20 +465,20 @@ private: bool queued_all_items(); bool received_all_items(); bool finished_working(); -public: void maybe_step(); void maybe_say_goodbye(transaction_guard & guard); -private: void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); void note_item_sent(netcmd_item_type ty, id const & i); public: - Netxx::Probe::ready_type which_events() const; + Netxx::Probe::ready_type which_events(); + bool do_io(Netxx::Probe::ready_type what); + bool do_work(transaction_guard & guard); +private: bool read_some(); bool write_some(); -private: void error(int errcode, string const & errmsg); @@ -556,8 +556,8 @@ public: void send_all_data(netcmd_item_type ty, set const & items); public: void begin_service(); +private: bool process(transaction_guard & guard); -private: bool initiated_by_server; }; @@ -889,6 +889,7 @@ session::done_all_refinements() if (all && !set_totals) { + L(FL("All refinements done for peer %s") % peer_id); if (cert_out_ticker.get()) cert_out_ticker->set_total(cert_refiner.items_to_send.size()); @@ -1085,23 +1086,59 @@ Netxx::Probe::ready_type } Netxx::Probe::ready_type -session::which_events() const +session::which_events() { Netxx::Probe::ready_type ret = Netxx::Probe::ready_oobd; if (!outbuf.empty()) { + L(FL("probing write on %s") % peer_id); ret = ret | Netxx::Probe::ready_write; } // Only ask to read if we're not armed, don't go storing // 128 MB at a time unless we think we need to. - if (inbuf.size() < constants::netcmd_maxsz && !armed) + if (inbuf.size() < constants::netcmd_maxsz && !arm()) { + L(FL("probing read on %s") % peer_id); ret = ret | Netxx::Probe::ready_read; } return ret; } bool +session::do_io(Netxx::Probe::ready_type what) +{ + bool ok = true; + if (what & Netxx::Probe::ready_read) + { + if (!read_some()) + ok = false; + } + if (what & Netxx::Probe::ready_write) + { + if (!write_some()) + ok = false; + } + if (what & Netxx::Probe::ready_oobd) + { + ok = false; + } + return ok; +} + +bool +session::do_work(transaction_guard & guard) +{ + if (process(guard)) + { + maybe_step(); + maybe_say_goodbye(guard); + return true; + } + else + return false; +} + +bool session::read_some() { I(inbuf.size() < constants::netcmd_maxsz); @@ -2428,8 +2465,8 @@ bool session::process(transaction_guard guard.maybe_checkpoint(sz); if (!ret) - L(FL("finishing processing with '%d' packet") - % cmd.get_cmd_code()); + L(FL("peer %s finishing processing with '%d' packet") + % peer_id % cmd.get_cmd_code()); return ret; } catch (bad_decode & bd) @@ -2530,9 +2567,21 @@ call_server(options & opts, % sess.peer_id % bd.what); } - sess.maybe_step(); - sess.maybe_say_goodbye(guard); + if (!sess.do_work(guard)) + { + // Commit whatever work we managed to accomplish anyways. + guard.commit(); + // We failed during processing. This should only happen in + // client voice when we have a decode exception, or received an + // error from our server (which is translated to a decode + // exception). We call these cases E() errors. + E(false, F("processing failure while talking to " + "peer %s, disconnecting") + % sess.peer_id); + return; + } + probe.clear(); probe.add(*(sess.str), sess.which_events()); Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout); @@ -2546,31 +2595,7 @@ call_server(options & opts, % sess.peer_id)); } - bool all_io_clean = (event != Netxx::Probe::ready_oobd); - - if (event & Netxx::Probe::ready_read) - all_io_clean = all_io_clean && sess.read_some(); - - if (event & Netxx::Probe::ready_write) - all_io_clean = all_io_clean && sess.write_some(); - - if (armed) - if (!sess.process(guard)) - { - // Commit whatever work we managed to accomplish anyways. - guard.commit(); - - // We failed during processing. This should only happen in - // client voice when we have a decode exception, or received an - // error from our server (which is translated to a decode - // exception). We call these cases E() errors. - E(false, F("processing failure while talking to " - "peer %s, disconnecting") - % sess.peer_id); - return; - } - - if (!all_io_clean) + if (!sess.do_io(event)) { // Commit whatever work we managed to accomplish anyways. guard.commit(); @@ -2636,33 +2661,65 @@ drop_session_associated_with_fd(map const & fds, + shared_ptr sess) +{ + Netxx::socket_type fd = sess->str->get_socketfd(); + + if (fd == -1) + { + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(pipe); + if (fds.find(pipe->get_readfd()) != fds.end()) + return true; + if (fds.find(pipe->get_writefd()) != fds.end()) + return true; + } + else if (fds.find(fd) != fds.end()) + return true; + + return false; +} + +static int arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, map > & sessions, set & armed_sessions, transaction_guard & guard) { + int probing = 0; set arm_failed; for (map >::const_iterator i = sessions.begin(); i != sessions.end(); ++i) { - i->second->maybe_step(); - i->second->maybe_say_goodbye(guard); - try + // stdio sessions are entered twice + if (session_is_in_fd_set(arm_failed, i->second)) + continue; + if (!i->second->do_work(guard)) { - if (i->second->arm()) - { - L(FL("fd %d is armed") % i->first); - armed_sessions.insert(i->first); - } - probe.add(*i->second->str, i->second->which_events()); + arm_failed.insert(i->first); } - catch (bad_decode & bd) + else { - W(F("protocol error while processing peer %s: '%s', marking as bad") - % i->second->peer_id % bd.what); - arm_failed.insert(i->first); + try + { + if (i->second->arm()) + { + L(FL("fd %d is armed") % i->first); + armed_sessions.insert(i->first); + } + probe.add(*i->second->str, i->second->which_events()); + ++probing; + } + catch (bad_decode & bd) + { + W(F("protocol error while processing peer %s: '%s', marking as bad") + % i->second->peer_id % bd.what); + arm_failed.insert(i->first); + } } } for (set::const_iterator i = arm_failed.begin(); @@ -2670,6 +2727,7 @@ arm_sessions_and_calculate_probe(Netxx:: { drop_session_associated_with_fd(sessions, *i); } + return probing; } static void @@ -2715,107 +2773,41 @@ static void } static void -handle_read_available(Netxx::socket_type fd, - shared_ptr sess, - map > & sessions, - set & armed_sessions, - bool & live_p) +handle_io(Netxx::socket_type fd, + shared_ptr sess, + map > & sessions, + Netxx::Probe::ready_type what) { - if (sess->read_some()) + if (!sess->do_io(what)) { - try + if (what & Netxx::Probe::ready_oobd) { - if (sess->arm()) - armed_sessions.insert(fd); - } - catch (bad_decode & bd) - { - W(F("protocol error while processing peer %s: '%s', disconnecting") - % sess->peer_id % bd.what); - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } - } - else - { - switch (sess->protocol_state) - { - case session::working_state: - P(F("peer %s read failed in working state (error)") + P(F("got OOB from peer %s, disconnecting") % sess->peer_id); - break; - - case session::shutdown_state: - P(F("peer %s read failed in shutdown state " - "(possibly client misreported error)") - % sess->peer_id); - break; - - case session::confirmed_state: - P(F("peer %s read failed in confirmed state (success)") - % sess->peer_id); - break; } - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } -} - - -static void -handle_write_available(Netxx::socket_type fd, - shared_ptr sess, - map > & sessions, - bool & live_p) -{ - if (!sess->write_some()) - { - switch (sess->protocol_state) + else { - case session::working_state: - P(F("peer %s write failed in working state (error)") - % sess->peer_id); - break; + switch (sess->protocol_state) + { + case session::working_state: + P(F("peer %s IO failed in working state (error)") + % sess->peer_id); + break; - case session::shutdown_state: - P(F("peer %s write failed in shutdown state " - "(possibly client misreported error)") - % sess->peer_id); - break; + case session::shutdown_state: + P(F("peer %s IO failed in shutdown state " + "(possibly client misreported error)") + % sess->peer_id); + break; - case session::confirmed_state: - P(F("peer %s write failed in confirmed state (success)") - % sess->peer_id); - break; - } - - drop_session_associated_with_fd(sessions, fd); - live_p = false; - } -} - -static void -process_armed_sessions(map > & sessions, - set & armed_sessions, - transaction_guard & guard) -{ - for (set::const_iterator i = armed_sessions.begin(); - i != armed_sessions.end(); ++i) - { - map >::iterator j; - j = sessions.find(*i); - if (j == sessions.end()) - continue; - else - { - shared_ptr sess = j->second; - if (!sess->process(guard)) - { - P(F("peer %s processing finished, disconnecting") + case session::confirmed_state: + P(F("peer %s IO failed in confirmed state (success)") % sess->peer_id); - drop_session_associated_with_fd(sessions, *i); + break; } } + + drop_session_associated_with_fd(sessions, fd); } } @@ -2946,6 +2938,10 @@ serve_connections(options & opts, I(guard); + int probing = arm_sessions_and_calculate_probe(probe, sessions, + armed_sessions, + *guard); + while (!server_initiated_sync_requests.empty()) { server_initiated_sync_request request @@ -2989,6 +2985,9 @@ serve_connections(options & opts, server, true)); sessions.insert(make_pair(server->get_socketfd(), sess)); + probe.add(*server, sess->which_events()); + ++probing; + L(FL("Opened connection to %s") % sess->peer_id); } catch (Netxx::NetworkException & e) { @@ -2996,9 +2995,8 @@ serve_connections(options & opts, } } - arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard); - - L(FL("i/o probe with %d armed") % armed_sessions.size()); + L(FL("i/o probe with %d armed, %d probing") + % armed_sessions.size() % probing); Netxx::socket_type fd; Netxx::Timeout how_long; if (sessions.empty()) @@ -3040,16 +3038,10 @@ serve_connections(options & opts, { probe.remove(*(i->second->str)); shared_ptr sess = i->second; - bool live_p = true; try { - if (event & Netxx::Probe::ready_read) - handle_read_available(fd, sess, sessions, - armed_sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_write)) - handle_write_available(fd, sess, sessions, live_p); + handle_io(fd, sess, sessions, event); } catch (Netxx::Exception &) { @@ -3057,17 +3049,10 @@ serve_connections(options & opts, % sess->peer_id); drop_session_associated_with_fd(sessions, fd); } - if (live_p && (event & Netxx::Probe::ready_oobd)) - { - P(F("got OOB from peer %s, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } } } } while (fd != -1); - process_armed_sessions(sessions, armed_sessions, *guard); reap_dead_sessions(sessions, timeout_seconds); if (sessions.empty()) @@ -3145,9 +3130,12 @@ serve_single_connection(shared_ptr sess = i->second; - bool live_p = true; - if (event & Netxx::Probe::ready_read) - handle_read_available(fd, sess, sessions, armed_sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_write)) - handle_write_available(fd, sess, sessions, live_p); - - if (live_p && (event & Netxx::Probe::ready_oobd)) - { - P(F("got some OOB data on fd %d (peer %s), disconnecting") - % fd % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } + handle_io(fd, sess, sessions, event); } } - process_armed_sessions(sessions, armed_sessions, guard); reap_dead_sessions(sessions, timeout_seconds); } }