diff options
Diffstat (limited to 'node/node.cpp')
| -rw-r--r-- | node/node.cpp | 82 |
1 files changed, 24 insertions, 58 deletions
diff --git a/node/node.cpp b/node/node.cpp index db18253..1fd83c2 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -13,8 +13,8 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se , server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));}) , clients() , network_settings(network_settings) -, prev_node(Client(tcp::socket(io_service))) -, next_node(tcp::socket(io_service)) +, prev_node(ProtobufClient<Receive>(tcp::socket(io_service))) +, next_node(ProtobufClient<Send>(tcp::socket(io_service))) , api(get_implementation()) , keypair(api.create_key_pair()) , network_pub_key() @@ -22,7 +22,7 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se GOOGLE_PROTOBUF_VERIFY_VERSION; auto on_connect = [this, network_settings](){ - next_node.send(cmix_proto::ImANode()); + next_node.async_send(cmix_proto::ImANode()); if(network_settings.is_first) { start_initialisation(); } @@ -41,17 +41,15 @@ void Node::run() { void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) { - purgatory.emplace_back(std::move(socket)); - - std::list<Client>::iterator it = --purgatory.end(); + std::list<ProtobufClient<Receive>>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket)); purgatory.back().on_done( [this, it]() { purgatory.erase(it); } ); - it->receive([this, it](std::vector<uint8_t> const& message_buffer) { - handle_message(it, message_buffer); + it->receive([this, it](cmix_proto::CMixMessage message) { + handle_message(it, message); }); } @@ -59,18 +57,7 @@ void Node::start_initialisation() { cmix_proto::Initialization init; init.set_public_share(keypair.pub, keypair.pub_len); - next_node.send(init); -} - -cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector<uint8_t>& buffer) -{ - cmix_proto::CMixMessage message; - if(!message.ParseFromArray(buffer.data(), buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - io_service.stop(); - throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); - } - return message; + next_node.async_send(init); } void Node::handle_initialization(const cmix_proto::Initialization& init) @@ -95,7 +82,7 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) cmix_proto::Initialization init; init.set_public_share(new_shared.data, new_shared.len); - next_node.send(init); + next_node.async_send(init); free_bignum(&shared); free_bignum(&mod); @@ -103,9 +90,8 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) } } -void Node::handle_node_message(const std::vector<uint8_t>& message_buffer) +void Node::handle_node_message(cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kInitialization: { handle_initialization(message.initialization()); @@ -119,22 +105,20 @@ void Node::handle_node_message(const std::vector<uint8_t>& message_buffer) BOOST_LOG_TRIVIAL(error) << "handle_node_message: CMixMessage contains unknown contents."; } } - prev_node.receive([this](std::vector<uint8_t> const& buffer) { - handle_node_message(buffer); + prev_node.receive([this](cmix_proto::CMixMessage message) { + handle_node_message(message); }); } -void Node::handle_client_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) +void Node::handle_client_message(std::list<SenderReceiver>::iterator handle, cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); - BOOST_LOG_TRIVIAL(trace) << "case: " << message.contents_case(); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kKeyexchange: { BOOST_LOG_TRIVIAL(trace) << "Deriving shared key"; api.derive_shared_key(keypair, reinterpret_cast<uint8_t const*>(message.keyexchange().public_key().c_str()), true); - handle->receive([this, handle](std::vector<uint8_t> const& buffer){ - handle_client_message(handle, buffer); + handle->receive([this, handle](cmix_proto::CMixMessage message){ + handle_client_message(handle, message); }); return; } @@ -144,17 +128,6 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std:: handle->close(); clients.erase(handle); - if(clients.size() == 0) { - cmix_proto::Bye bye; - next_node.send(bye); - - io_service.post([this]{ - BOOST_LOG_TRIVIAL(trace) << "Shutting down"; - io_service.stop(); - }); - - } - return; } default: { @@ -165,36 +138,29 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std:: clients.erase(handle); } -void Node::handle_imanode(std::list<Client>::iterator handle) { +void Node::handle_imanode(std::list<Receiver>::iterator handle) { handle->on_done([]{}); - prev_node = PrevNode(std::move(*handle)); + prev_node = make_receiver(std::move(*handle)); purgatory.erase(handle); - prev_node.receive([this](std::vector<uint8_t> const& message_buffer){ - handle_node_message(message_buffer); + prev_node.receive([this](cmix_proto::CMixMessage message){ + handle_node_message(message); }); } -void Node::handle_imaclient(std::list<Client>::iterator handle) { - std::list<Client>::iterator it = clients.emplace(clients.end(), std::move(*handle)); +void Node::handle_imaclient(std::list<Receiver>::iterator handle) { + BOOST_LOG_TRIVIAL(trace) << "Handling imaclient"; + std::list<SenderReceiver>::iterator it = clients.emplace(clients.end(), make_sender_receiver(std::move(*handle))); it->on_done([this, it]{ clients.erase(it); }); purgatory.erase(handle); - it->receive([this, it](std::vector<uint8_t> buffer) { - handle_client_message(it, buffer); + it->receive([this, it](cmix_proto::CMixMessage message) { + handle_client_message(it, message); }); } -void Node::handle_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) +void Node::handle_message(std::list<ProtobufClient<Receive>>::iterator handle, cmix_proto::CMixMessage message) { - cmix_proto::CMixMessage message; - try { - message = parse_cmix_message(message_buffer); - } catch(std::runtime_error const& e) { - purgatory.erase(handle); - return; - } - switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kImanode: { handle_imanode(handle); |
