diff options
| -rw-r--r-- | libcmix-network/client.cpp | 39 | ||||
| -rw-r--r-- | libcmix-network/client.hpp | 1 | ||||
| -rw-r--r-- | node/node.cpp | 15 |
3 files changed, 32 insertions, 23 deletions
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp index 0c3c432..7c95234 100644 --- a/libcmix-network/client.cpp +++ b/libcmix-network/client.cpp @@ -20,6 +20,11 @@ Client::Client(tcp::socket &&socket) , done() {} +Client::~Client() +{ + close(); +} + void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect) { ::async_connect(socket, next_host, next_port, on_connect); @@ -36,8 +41,10 @@ std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length) void Client::send(std::string message) { + auto length_buffer = prepare_length_prefix(message.size()); + boost::array<boost::asio::const_buffer, 2> package = { - boost::asio::buffer(prepare_length_prefix(message.size())), + boost::asio::buffer(length_buffer.data(), length_buffer.size()), boost::asio::buffer(message) }; @@ -61,6 +68,19 @@ std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes) return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {}); } +void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes) +{ + if(!ec) { + std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); + message_handler(data); + } else { + BOOST_LOG_TRIVIAL(error) << ec; + if(done) { + done(); + } + } +} + void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes) { using namespace boost::asio::placeholders; @@ -83,23 +103,6 @@ void Client::handle_receive_size(Client::MessageHandler message_handler, const e } } -void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes) -{ - if(!ec) { - buffer.commit(read_bytes); - std::istream is(&buffer); - is.unsetf(std::ios::skipws); - - std::vector<uint8_t> data(std::istream_iterator<uint8_t>(is), {}); - message_handler(data); - } else { - BOOST_LOG_TRIVIAL(error) << ec; - if(done) { - done(); - } - } -} - void Client::receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp index e1c2e14..51dfa6f 100644 --- a/libcmix-network/client.hpp +++ b/libcmix-network/client.hpp @@ -49,6 +49,7 @@ public: * \param socket An rvalue reference to a socket it will now own and receive from. */ Client(boost::asio::ip::tcp::socket&& socket); + ~Client(); /*! * \brief async_connect Asynchronously connects to next_host:port and calls on_connect diff --git a/node/node.cpp b/node/node.cpp index c9f6b14..fb5b875 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -47,6 +47,8 @@ void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) clients.erase(it); } ); + + it->receive(std::bind(&Node::handle_message, this, std::placeholders::_1)); } void Node::start_initialisation() { @@ -54,10 +56,6 @@ void Node::start_initialisation() { init.set_public_share(keypair.pub, keypair.pub_len); next_node.send(init); - - for(auto&& client : clients) { - client.receive(std::bind(&Node::handle_message, this, std::placeholders::_1)); - } } void Node::handle_message(const std::vector<uint8_t>& message_buffer) @@ -65,6 +63,9 @@ void Node::handle_message(const std::vector<uint8_t>& message_buffer) cmix_proto::CMixMessage message; if(!message.ParseFromArray(message_buffer.data(), message_buffer.size())) { BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; + clients.clear(); + io_service.stop(); + return; } switch(message.contents_case()) { @@ -84,7 +85,6 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) std::string share = init.public_share(); network_pub_key = std::vector<uint8_t>(share.begin(), share.end()); - start_precomputation(); } else { Bignum shared = allocate_bignum(init.public_share().size()); @@ -107,9 +107,14 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) free_bignum(&mod); free_bignum(&new_shared); + io_service.post([this]{ + clients.clear(); + io_service.stop(); + }); } } void Node::start_precomputation() { + clients.clear(); io_service.stop(); } |
