From 37effc3e136c73afd4d6ba37d23a91766795d0f7 Mon Sep 17 00:00:00 2001 From: Dennis Brentjes Date: Thu, 29 Sep 2016 17:23:22 +0200 Subject: This changset triggered a storm of changes. Added the behaviour for receiving a public_share message from our previous node when not being the first node ourselves. This triggered the whitespace bug below, which sparked the network rewrite. Rewrote network protocol to first send the size of the message it's sending in a 32bit integer in network byte order. Fixed a bug where whitespace in the received buffer would be skipped. leading to broken protobuf messages. NextNode no longers inherits from client but owns one, and some functions needed to be forwarded. --- CMakeModules/gmpConfig.cmake | 13 +++++++ libcmix-network/client.cpp | 83 +++++++++++++++++++++++++++++++++++++++++--- libcmix-network/client.hpp | 30 ++++++++-------- node/CMakeLists.txt | 4 +++ node/nextnode.cpp | 20 +++-------- node/nextnode.hpp | 7 ++-- node/node.cpp | 56 +++++++++++++++++++++++++++--- 7 files changed, 172 insertions(+), 41 deletions(-) create mode 100644 CMakeModules/gmpConfig.cmake diff --git a/CMakeModules/gmpConfig.cmake b/CMakeModules/gmpConfig.cmake new file mode 100644 index 0000000..a6e6682 --- /dev/null +++ b/CMakeModules/gmpConfig.cmake @@ -0,0 +1,13 @@ +add_library(gmp UNKNOWN IMPORTED) +find_library(gmp_LIBRARY NAMES "gmp") +set_property(TARGET gmp PROPERTY IMPORTED_LOCATION "${gmp_LIBRARY}") + +add_library(gmpxx UNKNOWN IMPORTED) +find_library(gmpxx_LIBRARY NAMES "gmpxx") +set_property(TARGET gmpxx PROPERTY IMPORTED_LOCATION "${gmpxx_LIBRARY}") + +find_path(gmp_INCLUDE_DIR NAMES "gmp.h") +set_property(TARGET gmp PROPERTY INCLUDE_DIRECTORIES ${gmp_INCLUDE_DIR}) + +find_path(gmpxx_INCLUDE_DIR NAMES "gmpxx.h") +set_property(TARGET gmpxx PROPERTY INCLUDE_DIRECTORIES ${gmpxx_INCLUDE_DIR}) diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp index ce9fd4d..d8e79d7 100644 --- a/libcmix-network/client.cpp +++ b/libcmix-network/client.cpp @@ -1,9 +1,13 @@ #include "client.hpp" +#include "connect.hpp" + #include "logging.hpp" #include #include +#include +#include #include @@ -16,17 +20,83 @@ Client::Client(tcp::socket &&socket) , done() {} -void Client::handle_receive(MessageHandler message_handler, const error_code &ec, size_t read_bytes) +void Client::async_connect(std::string next_host, std::string next_port, std::function on_connect) +{ + ::async_connect(socket, next_host, next_port, on_connect); +} + +std::array Client::prepare_length_prefix(uint32_t length) +{ + length = htonl(length); + std::array buf; + + std::copy(reinterpret_cast(&length), reinterpret_cast(&length) + 4, buf.data()); + return buf; +} + +void Client::send(std::string message) { + + boost::array package = { + boost::asio::buffer(prepare_length_prefix(message.size())), + boost::asio::buffer(message) + }; + + auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) { + BOOST_LOG_TRIVIAL(trace) << "sent message of " << bytes_transferred << " bytes"; + if(ec) { + BOOST_LOG_TRIVIAL(fatal) << ec; + throw std::runtime_error("unable to send message"); + } + }; + + socket.async_send(package, 0, handler); + +} + +std::vector Client::received_bytes_to_vector(size_t read_bytes) { buffer.commit(read_bytes); std::istream is(&buffer); + is.unsetf(std::ios::skipws); + + return std::vector(std::istream_iterator(is), {}); +} - BOOST_LOG_TRIVIAL(trace) << "handling receive"; +void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes) +{ + using namespace boost::asio::placeholders; + + if(!ec) { + std::vector data = received_bytes_to_vector(read_bytes); + uint32_t size; + std::copy(data.begin(), data.end(), reinterpret_cast(&size)); + size = ntohl(size); + + socket.async_receive( + buffer.prepare(size), + boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred()) + ); + } else { + BOOST_LOG_TRIVIAL(error) << ec; + if(done) { + done(); + } + } +} +void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes) +{ + BOOST_LOG_TRIVIAL(trace) << "handling receive of: " << read_bytes << " bytes"; if(!ec) { + buffer.commit(read_bytes); + std::istream is(&buffer); + is.unsetf(std::ios::skipws); + std::vector data(std::istream_iterator(is), {}); + BOOST_LOG_TRIVIAL(trace) << "data.size(): " << data.size(); message_handler(data); } else { + BOOST_LOG_TRIVIAL(error) << ec; if(done) { done(); } @@ -37,11 +107,16 @@ void Client::receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; socket.async_receive( - buffer.prepare(512), - boost::bind(&Client::handle_receive, this, message_handler, error(), bytes_transferred()) + buffer.prepare(4), + boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) ); } +void Client::close() +{ + socket.close(); +} + void Client::on_done(Client::OnDoneFT f) { done = f; } diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp index 981283b..cb81418 100644 --- a/libcmix-network/client.hpp +++ b/libcmix-network/client.hpp @@ -31,24 +31,18 @@ protected: boost::asio::ip::tcp::socket socket; private: - /*! - * \brief buffer Internal private buffer used to receive messages. - */ boost::asio::streambuf buffer; - /*! - * \brief done The done function being called when this is done operating. - */ OnDoneFT done; + + std::vector received_bytes_to_vector(size_t read_bytes); + + void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); + + void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); - /*! - * \brief handle_receive - * \param message_handler The function to call when a message has been received. - * \param ec A possible error that occured during receiving. - * \param read_bytes The number of bytes read. - */ - void handle_receive(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); - + std::array prepare_length_prefix(uint32_t length); + public: /*! * \brief Client @@ -56,15 +50,23 @@ public: */ Client(boost::asio::ip::tcp::socket&& socket); + void async_connect(std::string next_host, std::string next_port, std::function on_connect); + + void send(std::string message); + /*! * \brief receive * \param message_handler The function to call when a message has been received. */ void receive(MessageHandler message_handler); + void close(); + /*! * \brief on_done sets the done callback. * \param f The new done callback function. */ void on_done(OnDoneFT f); + + }; diff --git a/node/CMakeLists.txt b/node/CMakeLists.txt index c3c8e46..adbedb1 100644 --- a/node/CMakeLists.txt +++ b/node/CMakeLists.txt @@ -1,5 +1,7 @@ find_package(Boost COMPONENTS system program_options REQUIRED) +find_package(gmp REQUIRED) + add_executable(node main.cpp node.hpp node.cpp @@ -18,4 +20,6 @@ target_link_libraries(node PRIVATE cmix PRIVATE cmix-network PRIVATE cmix-protobuf + PRIVATE gmpxx + PRIVATE gmp ) diff --git a/node/nextnode.cpp b/node/nextnode.cpp index d38200d..f6c4454 100644 --- a/node/nextnode.cpp +++ b/node/nextnode.cpp @@ -1,34 +1,24 @@ #include "nextnode.hpp" -#include "connect.hpp" - #include "logging.hpp" using namespace boost::asio::ip; NextNode::NextNode(tcp::socket&& socket) -: Client(std::move(socket)) +: client(std::move(socket)) {} void NextNode::send(std::string message) { - auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) { - BOOST_LOG_TRIVIAL(trace) << "sent message"; - if(ec) { - BOOST_LOG_TRIVIAL(fatal) << ec; - throw std::runtime_error("unable to send message"); - } - }; - - socket.async_send(boost::asio::buffer(message), 0, handler); + client.send(message); } -void NextNode::connect(std::string next_host, std::string next_port, std::function on_connect) +void NextNode::async_connect(std::string next_host, std::string next_port, std::function on_connect) { - async_connect(socket, next_host, next_port, on_connect); + client.async_connect(next_host, next_port, on_connect); } void NextNode::close() { - socket.close(); + client.close(); } diff --git a/node/nextnode.hpp b/node/nextnode.hpp index 24b93c4..6550445 100644 --- a/node/nextnode.hpp +++ b/node/nextnode.hpp @@ -11,8 +11,9 @@ /*! * \brief The NextNode class represents the next node in the network, will only be sent to. */ -class NextNode : public Client +class NextNode { + Client client; public: /*! * \brief NextNode @@ -27,12 +28,12 @@ public: void send(std::string message); /*! - * \brief connect + * \brief async_connect * \param next_host The host of the next node. * \param next_port The port of the next node. * \param on_connect The callback to call when the connect was succesfull. */ - void connect(std::string next_host, std::string next_port, std::function on_connect); + void async_connect(std::string next_host, std::string next_port, std::function on_connect); /*! * \brief close This function closes the underlying socket connection. diff --git a/node/node.cpp b/node/node.cpp index 045eb13..92c51ec 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -2,6 +2,10 @@ #include "logging.hpp" +#include "gmpxx.h" + +#include + using namespace boost::asio::ip; Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings) @@ -21,7 +25,7 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se } }; - next_node.connect(network_settings.next_host, network_settings.next_port, on_connect); + next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect); } Node::~Node() { @@ -52,18 +56,60 @@ void Node::start_initialisation() { std::string message; init.SerializeToString(&message); - BOOST_LOG_TRIVIAL(trace) << init.DebugString(); + BOOST_LOG_TRIVIAL(trace) << "init: " << init.DebugString(); + BOOST_LOG_TRIVIAL(trace) << "size: " << message.size(); + BOOST_LOG_TRIVIAL(trace) << "raw: " << message; next_node.send(message); auto f = [this](std::vector bytes) { - network_pub_key = bytes; - if(network_settings.is_first) { + BOOST_LOG_TRIVIAL(trace) << "bytes.size(): " << bytes.size(); + BOOST_LOG_TRIVIAL(trace) << "raw2: " << std::string(bytes.begin(), bytes.end()); + initialization init; + init.ParseFromArray(bytes.data(), bytes.size()); + std::string share = init.public_share(); + + network_pub_key = std::vector(share.begin(), share.end()); + + BOOST_LOG_TRIVIAL(trace) << "The network pub_key: " << init.DebugString(); + start_precomputation(); + } else { + mpz_t shared; + mpz_init(shared); + mpz_import(shared, bytes.size(), -1, 1, 0, 0, bytes.data()); + + mpz_t my_share; + mpz_init(my_share); + mpz_import(my_share, keypair.pub_len, -1, 1, 0, 0, keypair.pub); + + mpz_mul(shared, shared, my_share); + + mpz_t mod; + mpz_init(mod); + mpz_set_ui(mod, 2); + mpz_pow_ui(mod, mod, 255); + mpz_sub_ui(mod, mod, 19); + + mpz_mod(shared, shared, mod); + + std::vector new_shared(keypair.pub_len, '\0'); + size_t size; + mpz_export(new_shared.data(), &size, -1, 1, 0, 0, shared); + + initialization init; + init.set_public_share(new_shared.data(), new_shared.size()); + + std::string message; + init.SerializeToString(&message); + next_node.send(message); + + mpz_clear(shared); + mpz_clear(my_share); + mpz_clear(mod); } }; - BOOST_LOG_TRIVIAL(trace) << "number of clients: " << clients.size(); for(auto&& client : clients) { client.receive(f); } -- cgit v1.2.3-70-g09d2