#
# Review notes (added during review):
#   * This patch moves the buffered socket I/O (inbuf/outbuf, read_some,
#     write_some, which_events, do_io, mark_recent_io) out of session and
#     into session_base.
#   * session now receives byte counts through the note_bytes_in /
#     note_bytes_out hooks and queues outgoing data via queue_output().
#   * Amended: output_overfull() compares the queued byte total
#     (outbuf_size) rather than outbuf.size(), which is the number of
#     queued strings; the checks it replaces in session::maybe_step and
#     session::arm compared outbuf_size against constants::bufsz * 10.
#     The "to" hash below names the revision as recorded, before this
#     amendment.
#
#
# patch "netsync.cc"
#  from [9b3abe03331c5a761b92cf0537c739a3057caed7]
#    to [a6fa0d8cf1ad02d3e42aa1a84a3951d686c6b88f]
#
============================================================
--- netsync.cc	9b3abe03331c5a761b92cf0537c739a3057caed7
+++ netsync.cc	a6fa0d8cf1ad02d3e42aa1a84a3951d686c6b88f
@@ -314,6 +314,29 @@ class session_base
 
 class session_base
 {
+  bool read_some();
+  bool write_some();
+  void mark_recent_io()
+  {
+    last_io_time = ::time(NULL);
+  }
+protected:
+  virtual void note_bytes_in(int count) { return; }
+  virtual void note_bytes_out(int count) { return; }
+  string_queue inbuf;
+private:
+  deque< pair<string, size_t> > outbuf;
+  size_t outbuf_size; // so we can avoid queueing up too much stuff
+protected:
+  void queue_output(string const & s)
+  {
+    outbuf.push_back(make_pair(s, 0));
+    outbuf_size += s.size();
+  }
+  bool output_overfull() const
+  {
+    return outbuf_size > constants::bufsz * 10; // bytes queued, not entries
+  }
 public:
   string peer_id;
   shared_ptr<Netxx::StreamBase> str;
@@ -331,6 +354,7 @@ public:
 
   session_base(string const & peer_id,
                shared_ptr<Netxx::StreamBase> str) :
+    outbuf_size(0),
     peer_id(peer_id), str(str),
     last_io_time(::time(NULL)),
     protocol_state(working_state),
@@ -339,13 +363,146 @@ public:
 
   virtual ~session_base() { }
   virtual bool arm() = 0;
-  virtual Netxx::Probe::ready_type which_events() = 0;
-  virtual bool do_io(Netxx::Probe::ready_type) = 0;
   virtual bool do_work(transaction_guard & guard) = 0;
 
-  //virtual void begin_service();
+  virtual Netxx::Probe::ready_type which_events();
+  virtual bool do_io(Netxx::Probe::ready_type);
 };
 
+Netxx::Probe::ready_type
+session_base::which_events()
+{
+  Netxx::Probe::ready_type ret = Netxx::Probe::ready_oobd;
+  if (!outbuf.empty())
+    {
+      L(FL("probing write on %s") % peer_id);
+      ret = ret | Netxx::Probe::ready_write;
+    }
+  // Only ask to read if we're not armed, don't go storing
+  // 128 MB at a time unless we think we need to.
+  if (inbuf.size() < constants::netcmd_maxsz && !arm())
+    {
+      L(FL("probing read on %s") % peer_id);
+      ret = ret | Netxx::Probe::ready_read;
+    }
+  return ret;
+}
+
+bool
+session_base::read_some()
+{
+  I(inbuf.size() < constants::netcmd_maxsz);
+  char tmp[constants::bufsz];
+  Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
+  if (count > 0)
+    {
+      L(FL("read %d bytes from fd %d (peer %s)")
+        % count % str->get_socketfd() % peer_id);
+      if (encountered_error)
+        {
+          L(FL("in error unwind mode, so throwing them into the bit bucket"));
+          return true;
+        }
+      inbuf.append(tmp,count);
+      mark_recent_io();
+      note_bytes_in(count);
+      return true;
+    }
+  else
+    return false;
+}
+
+bool
+session_base::write_some()
+{
+  I(!outbuf.empty());
+  size_t writelen = outbuf.front().first.size() - outbuf.front().second;
+  Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
+                                            min(writelen,
+                                                constants::bufsz));
+  if (count > 0)
+    {
+      if ((size_t)count == writelen)
+        {
+          outbuf_size -= outbuf.front().first.size();
+          outbuf.pop_front();
+        }
+      else
+        {
+          outbuf.front().second += count;
+        }
+      L(FL("wrote %d bytes to fd %d (peer %s)")
+        % count % str->get_socketfd() % peer_id);
+      mark_recent_io();
+      note_bytes_out(count);
+      if (encountered_error && outbuf.empty())
+        {
+          // we've flushed our error message, so it's time to get out.
+          L(FL("finished flushing output queue in error unwind mode, disconnecting"));
+          return false;
+        }
+      return true;
+    }
+  else
+    return false;
+}
+
+bool
+session_base::do_io(Netxx::Probe::ready_type what)
+{
+  bool ok = true;
+  try
+    {
+      if (what & Netxx::Probe::ready_read)
+        {
+          if (!read_some())
+            ok = false;
+        }
+      if (what & Netxx::Probe::ready_write)
+        {
+          if (!write_some())
+            ok = false;
+        }
+
+      if (what & Netxx::Probe::ready_oobd)
+        {
+          P(F("got OOB from peer %s, disconnecting")
+            % peer_id);
+          ok = false;
+        }
+      else if (!ok)
+        {
+          switch (protocol_state)
+            {
+            case working_state:
+              P(F("peer %s IO failed in working state (error)")
+                % peer_id);
+              break;
+
+            case shutdown_state:
+              P(F("peer %s IO failed in shutdown state "
+                  "(possibly client misreported error)")
+                % peer_id);
+              break;
+
+            case confirmed_state:
+              P(F("peer %s IO failed in confirmed state (success)")
+                % peer_id);
+              break;
+            }
+        }
+    }
+  catch (Netxx::Exception & e)
+    {
+      P(F("Network error on peer %s, disconnecting")
+        % peer_id);
+      ok = false;
+    }
+  return ok;
+}
+
+////////////////////////////////////////////////////////////////////////
+
 class
 session:
   public refiner_callbacks,
@@ -365,14 +522,6 @@ session:
   rsa_keypair_id const & signing_key;
   vector<rsa_keypair_id> const & keys_to_push;
 
-  string_queue inbuf;
-  // deque of pair
-  deque< pair<string, size_t> > outbuf;
-  // the total data stored in outbuf - this is
-  // used as a valve to stop too much data
-  // backing up
-  size_t outbuf_size;
-
   netcmd cmd;
   bool armed;
 public:
@@ -468,7 +617,6 @@ private:
 
 private:
   id mk_nonce();
-  void mark_recent_io();
 
   void set_session_key(string const & key);
   void set_session_key(rsa_oaep_sha_data const & key_encrypted);
@@ -486,12 +634,10 @@ public:
   void note_item_sent(netcmd_item_type ty, id const & i);
 
 public:
-  Netxx::Probe::ready_type which_events();
-  bool do_io(Netxx::Probe::ready_type what);
   bool do_work(transaction_guard & guard);
 private:
-  bool read_some();
-  bool write_some();
+  void note_bytes_in(int count);
+  void note_bytes_out(int count);
 
   void error(int errcode, string const & errmsg);
 
@@ -599,8 +745,6 @@ session::session(options & opts,
   use_transport_auth(opts.use_transport_auth),
   signing_key(opts.signing_key),
   keys_to_push(opts.keys_to_push),
-  inbuf(),
-  outbuf_size(0),
   armed(false),
   received_remote_key(false),
   remote_peer_key_name(""),
@@ -833,12 +977,6 @@ void
 }
 
 void
-session::mark_recent_io()
-{
-  last_io_time = ::time(NULL);
-}
-
-void
 session::set_session_key(string const & key)
 {
   session_key = netsync_session_key(key);
@@ -1071,15 +1209,10 @@ session::write_netcmd_and_try_flush(netc
     {
       string buf;
       cmd.write(buf, write_hmac);
-      outbuf.push_back(make_pair(buf, 0));
-      outbuf_size += buf.size();
+      queue_output(buf);
     }
   else
     L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
-  // FIXME: this helps keep the protocol pipeline full but it seems to
-  // interfere with initial and final sequences. careful with it.
-  // write_some();
-  // read_some();
 }
 
 // This method triggers a special "error unwind" mode to netsync. In this
@@ -1094,80 +1227,7 @@ session::error(int errcode, string const
   throw netsync_error(errmsg);
 }
 
-Netxx::Probe::ready_type
-session::which_events()
-{
-  Netxx::Probe::ready_type ret = Netxx::Probe::ready_oobd;
-  if (!outbuf.empty())
-    {
-      L(FL("probing write on %s") % peer_id);
-      ret = ret | Netxx::Probe::ready_write;
-    }
-  // Only ask to read if we're not armed, don't go storing
-  // 128 MB at a time unless we think we need to.
-  if (inbuf.size() < constants::netcmd_maxsz && !arm())
-    {
-      L(FL("probing read on %s") % peer_id);
-      ret = ret | Netxx::Probe::ready_read;
-    }
-  return ret;
-}
-
 bool
-session::do_io(Netxx::Probe::ready_type what)
-{
-  bool ok = true;
-  try
-    {
-      if (what & Netxx::Probe::ready_read)
-        {
-          if (!read_some())
-            ok = false;
-        }
-      if (what & Netxx::Probe::ready_write)
-        {
-          if (!write_some())
-            ok = false;
-        }
-
-      if (what & Netxx::Probe::ready_oobd)
-        {
-          P(F("got OOB from peer %s, disconnecting")
-            % peer_id);
-          ok = false;
-        }
-      else if (!ok)
-        {
-          switch (protocol_state)
-            {
-            case working_state:
-              P(F("peer %s IO failed in working state (error)")
-                % peer_id);
-              break;
-
-            case shutdown_state:
-              P(F("peer %s IO failed in shutdown state "
-                  "(possibly client misreported error)")
-                % peer_id);
-              break;
-
-            case confirmed_state:
-              P(F("peer %s IO failed in confirmed state (success)")
-                % peer_id);
-              break;
-            }
-        }
-    }
-  catch (Netxx::Exception & e)
-    {
-      P(F("Network error on peer %s, disconnecting")
-        % peer_id);
-      ok = false;
-    }
-  return ok;
-}
-
-bool
 session::do_work(transaction_guard & guard)
 {
   if (process(guard))
@@ -1180,67 +1240,20 @@ session::do_work(transaction_guard & gua
   return false;
 }
 
-bool
-session::read_some()
+void
+session::note_bytes_in(int count)
 {
-  I(inbuf.size() < constants::netcmd_maxsz);
-  char tmp[constants::bufsz];
-  Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
-  if (count > 0)
-    {
-      L(FL("read %d bytes from fd %d (peer %s)")
-        % count % str->get_socketfd() % peer_id);
-      if (encountered_error)
-        {
-          L(FL("in error unwind mode, so throwing them into the bit bucket"));
-          return true;
-        }
-      inbuf.append(tmp,count);
-      mark_recent_io();
-      if (byte_in_ticker.get() != NULL)
-        (*byte_in_ticker) += count;
-      bytes_in += count;
-      return true;
-    }
-  else
-    return false;
+  if (byte_in_ticker.get() != NULL)
+    (*byte_in_ticker) += count;
+  bytes_in += count;
 }
 
-bool
-session::write_some()
+void
+session::note_bytes_out(int count)
 {
-  I(!outbuf.empty());
-  size_t writelen = outbuf.front().first.size() - outbuf.front().second;
-  Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
-                                            min(writelen,
-                                                constants::bufsz));
-  if (count > 0)
-    {
-      if ((size_t)count == writelen)
-        {
-          outbuf_size -= outbuf.front().first.size();
-          outbuf.pop_front();
-        }
-      else
-        {
-          outbuf.front().second += count;
-        }
-      L(FL("wrote %d bytes to fd %d (peer %s)")
-        % count % str->get_socketfd() % peer_id);
-      mark_recent_io();
-      if (byte_out_ticker.get() != NULL)
-        (*byte_out_ticker) += count;
-      bytes_out += count;
-      if (encountered_error && outbuf.empty())
-        {
-          // we've flushed our error message, so it's time to get out.
-          L(FL("finished flushing output queue in error unwind mode, disconnecting"));
-          return false;
-        }
-      return true;
-    }
-  else
-    return false;
+  if (byte_out_ticker.get() != NULL)
+    (*byte_out_ticker) += count;
+  bytes_out += count;
 }
 
 // senders
@@ -2448,7 +2461,7 @@ session::maybe_step()
 {
   while (done_all_refinements()
          && !rev_enumerator.done()
-         && outbuf_size < constants::bufsz * 10)
+         && !output_overfull())
     {
       rev_enumerator.step();
     }
@@ -2473,7 +2486,7 @@ session::arm()
   if (!armed)
     {
       // Don't pack the buffer unnecessarily.
-      if (outbuf_size > constants::bufsz * 10)
+      if (output_overfull())
         return false;
 
       if (cmd.read(inbuf, read_hmac))