From 7bca48bc5b5e37a3a8b0b23e57b88d069fa50589 Mon Sep 17 00:00:00 2001 From: Dennis Brentjes Date: Wed, 12 Oct 2016 14:26:12 +0200 Subject: Major network rewrite. One generic class has been introduced to handle all connection types. Typedefs provide Sender Receiver and SenderReceiver types, which limit the functionality of the types. As to not accidentally communicate with the wrong node about things. --- node/node.cpp | 82 +++++++++++++++++------------------------------------------ 1 file changed, 24 insertions(+), 58 deletions(-) (limited to 'node/node.cpp') diff --git a/node/node.cpp b/node/node.cpp index db18253..1fd83c2 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -13,8 +13,8 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se , server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));}) , clients() , network_settings(network_settings) -, prev_node(Client(tcp::socket(io_service))) -, next_node(tcp::socket(io_service)) +, prev_node(ProtobufClient(tcp::socket(io_service))) +, next_node(ProtobufClient(tcp::socket(io_service))) , api(get_implementation()) , keypair(api.create_key_pair()) , network_pub_key() @@ -22,7 +22,7 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se GOOGLE_PROTOBUF_VERIFY_VERSION; auto on_connect = [this, network_settings](){ - next_node.send(cmix_proto::ImANode()); + next_node.async_send(cmix_proto::ImANode()); if(network_settings.is_first) { start_initialisation(); } @@ -41,17 +41,15 @@ void Node::run() { void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) { - purgatory.emplace_back(std::move(socket)); - - std::list::iterator it = --purgatory.end(); + std::list>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket)); purgatory.back().on_done( [this, it]() { purgatory.erase(it); } ); - it->receive([this, it](std::vector const& message_buffer) { - handle_message(it, message_buffer); + it->receive([this, it](cmix_proto::CMixMessage message) { + handle_message(it, message); }); } @@ -59,18 +57,7 @@ void Node::start_initialisation() { cmix_proto::Initialization init; init.set_public_share(keypair.pub, keypair.pub_len); - next_node.send(init); -} - -cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector& buffer) -{ - cmix_proto::CMixMessage message; - if(!message.ParseFromArray(buffer.data(), buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - io_service.stop(); - throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); - } - return message; + next_node.async_send(init); } void Node::handle_initialization(const cmix_proto::Initialization& init) @@ -95,7 +82,7 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) cmix_proto::Initialization init; init.set_public_share(new_shared.data, new_shared.len); - next_node.send(init); + next_node.async_send(init); free_bignum(&shared); free_bignum(&mod); @@ -103,9 +90,8 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) } } -void Node::handle_node_message(const std::vector& message_buffer) +void Node::handle_node_message(cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kInitialization: { handle_initialization(message.initialization()); @@ -119,22 +105,20 @@ void Node::handle_node_message(const std::vector& message_buffer) BOOST_LOG_TRIVIAL(error) << "handle_node_message: CMixMessage contains unknown contents."; } } - prev_node.receive([this](std::vector const& buffer) { - handle_node_message(buffer); + prev_node.receive([this](cmix_proto::CMixMessage message) { + handle_node_message(message); }); } -void Node::handle_client_message(std::list::iterator handle, const std::vector& message_buffer) +void Node::handle_client_message(std::list::iterator handle, cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); - BOOST_LOG_TRIVIAL(trace) << "case: " << message.contents_case(); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kKeyexchange: { BOOST_LOG_TRIVIAL(trace) << "Deriving shared key"; api.derive_shared_key(keypair, reinterpret_cast(message.keyexchange().public_key().c_str()), true); - handle->receive([this, handle](std::vector const& buffer){ - handle_client_message(handle, buffer); + handle->receive([this, handle](cmix_proto::CMixMessage message){ + handle_client_message(handle, message); }); return; } @@ -144,17 +128,6 @@ void Node::handle_client_message(std::list::iterator handle, const std:: handle->close(); clients.erase(handle); - if(clients.size() == 0) { - cmix_proto::Bye bye; - next_node.send(bye); - - io_service.post([this]{ - BOOST_LOG_TRIVIAL(trace) << "Shutting down"; - io_service.stop(); - }); - - } - return; } default: { @@ -165,36 +138,29 @@ void Node::handle_client_message(std::list::iterator handle, const std:: clients.erase(handle); } -void Node::handle_imanode(std::list::iterator handle) { +void Node::handle_imanode(std::list::iterator handle) { handle->on_done([]{}); - prev_node = PrevNode(std::move(*handle)); + prev_node = make_receiver(std::move(*handle)); purgatory.erase(handle); - prev_node.receive([this](std::vector const& message_buffer){ - handle_node_message(message_buffer); + prev_node.receive([this](cmix_proto::CMixMessage message){ + handle_node_message(message); }); } -void Node::handle_imaclient(std::list::iterator handle) { - std::list::iterator it = clients.emplace(clients.end(), std::move(*handle)); +void Node::handle_imaclient(std::list::iterator handle) { + BOOST_LOG_TRIVIAL(trace) << "Handling imaclient"; + std::list::iterator it = clients.emplace(clients.end(), make_sender_receiver(std::move(*handle))); it->on_done([this, it]{ clients.erase(it); }); purgatory.erase(handle); - it->receive([this, it](std::vector buffer) { - handle_client_message(it, buffer); + it->receive([this, it](cmix_proto::CMixMessage message) { + handle_client_message(it, message); }); } -void Node::handle_message(std::list::iterator handle, const std::vector& message_buffer) +void Node::handle_message(std::list>::iterator handle, cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message; - try { - message = parse_cmix_message(message_buffer); - } catch(std::runtime_error const& e) { - purgatory.erase(handle); - return; - } - switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kImanode: { handle_imanode(handle); -- cgit v1.2.3-70-g09d2