# # # patch "netsync.cc" # from [10ad3d3e72257627bfea1b28fe952afc79d772a9] # to [a935a5974284fa7bbf52d47e38dd2f169a1473f9] # # patch "netsync.hh" # from [64567ee872332b85777648ed7b1aa8682cdbf664] # to [8e88ebfef5a5373dc7b3266df3b9d4f96a48d375] # # patch "network.cc" # from [2d2c39ab3475fb0bb10d8d2b82e5f1ab196ee3f3] # to [1281bc4bea2223b142681d4fb25582e5eecf1654] # # patch "refiner.cc" # from [c4bacfb1eadaa252c39136468356585bdeebcc77] # to [77b1704542514b454a44284f5b2983f2ac2c191f] # # patch "refiner.hh" # from [1fffe4f5a38822e83891c4f9cf8676b876eccda9] # to [aa5292c044229db47773869c9b48780d6351f053] # ============================================================ --- netsync.cc 10ad3d3e72257627bfea1b28fe952afc79d772a9 +++ netsync.cc a935a5974284fa7bbf52d47e38dd2f169a1473f9 @@ -276,6 +276,7 @@ class netsync: globish const & our_exclude_pattern; globish_matcher our_matcher; netsync_service & service; // for sending + protocol_voice voice; public: app_state & app; @@ -284,16 +285,24 @@ public: // Interface. public: + netsync(protocol_role role, + globish const & our_include_pattern, + globish const & our_exclude_pattern, + netsync_service & wrapper, + app_state & app); + void begin_service(); void request_service(); - bool can_process() const; + bool can_process(); bool process(transaction_guard & guard); //private: // The incoming dispatcher. bool received(netcmd const & cmd, transaction_guard & guard); + + private: - + void write_netcmd_and_try_flush(netcmd const & cmd); id remote_peer_key_hash; rsa_keypair_id remote_peer_key_name; bool authenticated; @@ -371,12 +380,6 @@ public: void note_rev(revision_id const & rev); void note_cert(hexenc const & c); - netsync(protocol_role role, - globish const & our_include_pattern, - globish const & our_exclude_pattern, - netsync_service & wrapper, - app_state & app); - virtual ~netsync(); private: void rev_written_callback(revision_id rid); @@ -385,8 +388,8 @@ private: id mk_nonce(); - void set_session_key(string const & key); - void set_session_key(rsa_oaep_sha_data const & key_encrypted); + //void set_session_key(string const & key); + //void set_session_key(rsa_oaep_sha_data const & key_encrypted); void setup_client_tickers(); bool done_all_refinements(); @@ -507,7 +510,6 @@ netsync_service::netsync_service(netsync netsync_service::netsync_service(netsync_service const & other) : service(0) { - I(false); } netsync_service const & @@ -516,7 +518,55 @@ netsync_service::operator=(netsync_servi I(false); } +bool +netsync_service::can_send() const +{ + return service::can_send(); +} +void +netsync_service::send(netcmd const & cmd) +{ + service::send(cmd); +} + +service * +netsync_service::copy() +{ + return new netsync_service(*this); +} + +void +netsync_service::begin_service() +{ + impl->begin_service(); +} + +void +netsync_service::request_service() +{ + impl->request_service(); +} + +bool +netsync_service::can_process() +{ + return impl->can_process(); +} + +state +netsync_service::process(transaction_guard & guard) +{ + return impl->process(guard)?state::RUNNING:state::ERROR; +} + +state +netsync_service::received(netcmd const & cmd, transaction_guard & guard) +{ + return impl->received(cmd, guard)?state::RUNNING:state::ERROR; +} + + size_t netsync::session_count = 0; netsync::netsync(protocol_role role, @@ -550,10 +600,10 @@ netsync::netsync(protocol_role role, encountered_error(false), error_code(no_transfer), set_totals(false), - epoch_refiner(epoch_item, voice, *this), - key_refiner(key_item, voice, *this), - cert_refiner(cert_item, voice, *this), - rev_refiner(revision_item, voice, *this), + epoch_refiner(epoch_item, *this), + key_refiner(key_item, *this), + cert_refiner(cert_item, *this), + rev_refiner(revision_item, *this), rev_enumerator(*this, app) { dbw.set_on_revision_written(boost::bind(&netsync::rev_written_callback, @@ -639,6 +689,23 @@ netsync::~netsync() keys_in, keys_out); } +void +netsync::request_service() +{ + //FIXME +} + +// This kicks off the whole cascade starting from "hello". +void +netsync::begin_service() +{ + keypair kp; + if (app.opts.use_transport_auth) + app.keys.get_key_pair(app.opts.signing_key, kp); + queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce()); +} + + bool netsync::process_this_rev(revision_id const & rev) { @@ -1063,7 +1130,7 @@ netsync::queue_anonymous_cmd(protocol_ro cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern, hmac_key_encrypted); write_netcmd_and_try_flush(cmd); - set_session_key(nonce2()); + //set_session_key(nonce2()); } void @@ -1084,7 +1151,7 @@ netsync::queue_auth_cmd(protocol_role ro cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client, nonce1, hmac_key_encrypted, signature); write_netcmd_and_try_flush(cmd); - set_session_key(nonce2()); + //set_session_key(nonce2()); } void @@ -1217,7 +1284,7 @@ netsync::process_hello_cmd(rsa_keypair_i encode_base64(their_key, their_key_encoded); key_hash_code(their_keyname, their_key_encoded, their_key_hash); L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash); - var_key their_key_key(known_servers_domain, var_name(peer_id)); + var_key their_key_key(known_servers_domain, var_name(/*peer_id*/"peer_id")); if (app.db.var_exists(their_key_key)) { var_value expected_key_hash; @@ -1239,9 +1306,10 @@ netsync::process_hello_cmd(rsa_keypair_i } else { + // FIXME P(F("first time connecting to server %s\n" "I'll assume it's really them, but you might want to double-check\n" - "their key's fingerprint: %s") % peer_id % their_key_hash); + "their key's fingerprint: %s") % /*peer_id*/"peer_id" % their_key_hash); app.db.set_var(their_key_key, var_value(their_key_hash())); } if (!app.db.public_key_exists(their_key_hash)) @@ -1308,10 +1376,12 @@ netsync::process_hello_cmd(rsa_keypair_i our_exclude_pattern, mk_nonce(), their_key_encoded); } + // FIXME + /* app.lua.hook_note_netsync_start(session_id, "client", this->role, peer_id, their_keyname, our_include_pattern, our_exclude_pattern); - + */ return true; } @@ -1331,11 +1401,12 @@ netsync::process_anonymous_cmd(protocol_ // so we need to check that the opposite role is allowed for us, // in our this->role field. // - + // FIXME + /* app.lua.hook_note_netsync_start(session_id, "server", their_role, peer_id, rsa_keypair_id(), their_include_pattern, their_exclude_pattern); - + */ // Client must be a sink and server must be a source (anonymous // read-only), unless transport auth is disabled. // @@ -1441,11 +1512,13 @@ netsync::process_auth_cmd(protocol_role if (!app.keys.try_ensure_in_db(their_key_hash)) { this->saved_nonce = id(""); - + // FIXME + /* app.lua.hook_note_netsync_start(session_id, "server", their_role, peer_id, rsa_keypair_id("-unknown-"), their_include_pattern, their_exclude_pattern); + */ error(unknown_key, (F("remote public key hash '%s' is unknown") % their_key_hash).str()); } @@ -1455,11 +1528,12 @@ netsync::process_auth_cmd(protocol_role rsa_keypair_id their_id; base64 their_key; app.db.get_pubkey(their_key_hash, their_id, their_key); - + // FIXME + /* app.lua.hook_note_netsync_start(session_id, "server", their_role, peer_id, their_id, their_include_pattern, their_exclude_pattern); - + */ // Check that they replied with the nonce we asked for. if (!(nonce1 == this->saved_nonce)) { @@ -1962,7 +2036,8 @@ netsync::process_usher_cmd(utf8 const & L(FL("Received greeting from usher: %s") % msg().substr(1)); } netcmd cmdout; - cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern); + // FIXME + cmdout.write_usher_reply_cmd(utf8(/*peer_id*/"peer_id"), our_include_pattern); write_netcmd_and_try_flush(cmdout); L(FL("Sent reply.")); return true; @@ -1994,8 +2069,8 @@ bool } bool -netsync::dispatch_payload(netcmd const & cmd, - transaction_guard & guard) +netsync::received(netcmd const & cmd, + transaction_guard & guard) { switch (cmd.get_cmd_code()) @@ -2047,7 +2122,7 @@ netsync::dispatch_payload(netcmd const & % (role == source_and_sink_role ? _("source and sink") : (role == source_role ? _("source") : _("sink")))); - set_session_key(hmac_key_encrypted); + //set_session_key(hmac_key_encrypted); if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern)) return false; queue_confirm_cmd(); @@ -2079,7 +2154,7 @@ netsync::dispatch_payload(netcmd const & (role == source_role ? _("source") : _("sink"))) % hnonce1); - set_session_key(hmac_key_encrypted); + //set_session_key(hmac_key_encrypted); if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern, client, nonce1, signature)) @@ -2164,16 +2239,6 @@ netsync::dispatch_payload(netcmd const & return false; } -// This kicks off the whole cascade starting from "hello". -void -netsync::begin_service() -{ - keypair kp; - if (app.opts.use_transport_auth) - app.keys.get_key_pair(app.opts.signing_key, kp); - queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce()); -} - bool netsync::can_step() { @@ -2205,7 +2270,7 @@ bool } bool -netsync::can_process() const +netsync::can_process() { return can_step(); } @@ -2216,38 +2281,15 @@ bool netsync::process(transaction_guard return true; try { - if (!armed()) - return true; - maybe_step(); - - if (!input.have_netcmd()) - return true; - - L(FL("processing %d byte input buffer from peer %s") - % input.size() % peer_id); - - netcmd cmd; - input.get_netcmd(cmd); - - size_t sz = cmd.encoded_size(); - bool ret = dispatch_payload(cmd, guard); - - if (input.full()) - W(F("input buffer for peer %s is overfull " - "after netcmd dispatch") % peer_id); - - guard.maybe_checkpoint(sz); - - if (!ret) - L(FL("finishing processing with '%d' packet") - % cmd.get_cmd_code()); - return ret; + return true; } catch (bad_decode & bd) { + /* W(F("protocol error while processing peer %s: '%s'") % peer_id % bd.what); + */ return false; } catch (netsync_error & err) ============================================================ --- netsync.hh 64567ee872332b85777648ed7b1aa8682cdbf664 +++ netsync.hh 8e88ebfef5a5373dc7b3266df3b9d4f96a48d375 @@ -29,7 +29,7 @@ class netsync_service : public service void send(netcmd const & cmd); bool can_send() const; netsync_service(); - netsync_service(netsync_service const & other);//I(false) + netsync_service(netsync_service const & other); netsync_service const & operator=(netsync_service const & other);//I(false) public: enum netsync_op {push, pull, sync}; ============================================================ --- network.cc 2d2c39ab3475fb0bb10d8d2b82e5f1ab196ee3f3 +++ network.cc 1281bc4bea2223b142681d4fb25582e5eecf1654 @@ -1,8 +1,11 @@ #include "netio.hh" #include "network.hh" #include "sanity.hh" #include "uri.hh" +#include +using std::min; + #include using std::make_pair; using std::map; @@ -81,7 +84,19 @@ public: { read_hmac.set_key(key); } - Netxx::signed_size_type read_some_from(shared_ptr str); + Netxx::signed_size_type read_some_from(shared_ptr str) + { + I(!full()); + char tmp[constants::bufsz]; + Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); + + if (count > 0) + { + buffer.append(tmp,count); + } + + return count; + } }; class output_manager @@ -112,8 +127,37 @@ public: { write_hmac.set_key(key); } - void queue_netcmd(netcmd const & cmd); - Netxx::signed_size_type write_some_to(shared_ptr str); + void queue_netcmd(netcmd const & cmd) + { + string buf; + cmd.write(buf, write_hmac); + buffer.push_back(make_pair(buf, 0)); + buffer_size += buf.size(); + } + Netxx::signed_size_type write_some_to(shared_ptr str) + { + I(!buffer.empty()); + string & to_write(buffer.front().first); + size_t & writepos(buffer.front().second); + size_t writelen = to_write.size() - writepos; + Netxx::signed_size_type count = str->write(to_write.data() + writepos, + min(writelen, + constants::bufsz)); + if (count > 0) + { + if ((size_t)count == writelen) + { + buffer_size -= to_write.size(); + buffer.pop_front(); + } + else + { + writepos += count; + } + } + return count; + } + }; class session @@ -139,18 +183,40 @@ public: session(protocol_voice voice, shared_ptr str, utf8 const & addr, - app_state & app); + app_state & app) + : my_voice(voice), input(true), output(true), + app(app), peer_id(addr()), str(str) + { + } - void queue(netcmd const & cmd); + void queue(netcmd const & cmd) + { + output.queue_netcmd(cmd); + } + bool can_send() const + { + return !output.full(); + } + // at this level, process() includes to take from the input queue bool can_process(); state process(transaction_guard & guard); - state read_some(); - state write_some(); + state read_some() + { + input.read_some_from(str); + return state::NONE; + } + state write_some() + { + output.write_some_to(str); + return state::NONE; + } + + Netxx::Probe::ready_type which_events(); }; @@ -247,7 +313,14 @@ service::send(netcmd const & cmd) sess->queue(cmd); } +bool +service::can_send() const +{ + I(sess); + return sess->can_send(); +} + client_session::client_session(utf8 const & address, app_state & app) { ignore_sigpipe(); ============================================================ --- refiner.cc c4bacfb1eadaa252c39136468356585bdeebcc77 +++ refiner.cc 77b1704542514b454a44284f5b2983f2ac2c191f @@ -151,8 +151,8 @@ refiner::note_subtree_shared_with_peer(m collect_items_in_subtree(table, pref, our_node.level+1, peer_items); } -refiner::refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb) - : type(type), voice (voice), cb(cb), +refiner::refiner(netcmd_item_type type, refiner_callbacks & cb) + : type(type), voice_is_set(false), cb(cb), sent_initial_query(false), queries_in_flight(0), calculated_items_to_send(false), @@ -165,6 +165,14 @@ void } void +refiner::set_voice(protocol_voice new_voice) +{ + I(!voice_is_set); + voice = new_voice; + voice_is_set = true; +} + +void refiner::note_item_in_peer(merkle_node const & their_node, size_t slot) { I(slot < constants::merkle_num_slots); ============================================================ --- refiner.hh 1fffe4f5a38822e83891c4f9cf8676b876eccda9 +++ refiner.hh aa5292c044229db47773869c9b48780d6351f053 @@ -51,6 +51,7 @@ refiner { netcmd_item_type type; protocol_voice voice; + bool voice_is_set; refiner_callbacks & cb; bool sent_initial_query; @@ -76,7 +77,8 @@ public: public: - refiner(netcmd_item_type type, protocol_voice voice, refiner_callbacks & cb); + refiner(netcmd_item_type type, refiner_callbacks & cb); + void set_voice(protocol_voice voice); void note_local_item(id const & item); void reindex_local_items(); void begin_refinement();