From 37effc3e136c73afd4d6ba37d23a91766795d0f7 Mon Sep 17 00:00:00 2001 From: Dennis Brentjes Date: Thu, 29 Sep 2016 17:23:22 +0200 Subject: This changset triggered a storm of changes. Added the behaviour for receiving a public_share message from our previous node when not being the first node ourselves. This triggered the whitespace bug below, which sparked the network rewrite. Rewrote network protocol to first send the size of the message it's sending in a 32bit integer in network byte order. Fixed a bug where whitespace in the received buffer would be skipped. leading to broken protobuf messages. NextNode no longers inherits from client but owns one, and some functions needed to be forwarded. --- libcmix-network/client.cpp | 83 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 79 insertions(+), 4 deletions(-) (limited to 'libcmix-network/client.cpp') diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp index ce9fd4d..d8e79d7 100644 --- a/libcmix-network/client.cpp +++ b/libcmix-network/client.cpp @@ -1,9 +1,13 @@ #include "client.hpp" +#include "connect.hpp" + #include "logging.hpp" #include #include +#include +#include #include @@ -16,17 +20,83 @@ Client::Client(tcp::socket &&socket) , done() {} -void Client::handle_receive(MessageHandler message_handler, const error_code &ec, size_t read_bytes) +void Client::async_connect(std::string next_host, std::string next_port, std::function on_connect) +{ + ::async_connect(socket, next_host, next_port, on_connect); +} + +std::array Client::prepare_length_prefix(uint32_t length) +{ + length = htonl(length); + std::array buf; + + std::copy(reinterpret_cast(&length), reinterpret_cast(&length) + 4, buf.data()); + return buf; +} + +void Client::send(std::string message) { + + boost::array package = { + boost::asio::buffer(prepare_length_prefix(message.size())), + boost::asio::buffer(message) + }; + + auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) { + BOOST_LOG_TRIVIAL(trace) << "sent message of " << bytes_transferred << " bytes"; + if(ec) { + BOOST_LOG_TRIVIAL(fatal) << ec; + throw std::runtime_error("unable to send message"); + } + }; + + socket.async_send(package, 0, handler); + +} + +std::vector Client::received_bytes_to_vector(size_t read_bytes) { buffer.commit(read_bytes); std::istream is(&buffer); + is.unsetf(std::ios::skipws); + + return std::vector(std::istream_iterator(is), {}); +} - BOOST_LOG_TRIVIAL(trace) << "handling receive"; +void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes) +{ + using namespace boost::asio::placeholders; + + if(!ec) { + std::vector data = received_bytes_to_vector(read_bytes); + uint32_t size; + std::copy(data.begin(), data.end(), reinterpret_cast(&size)); + size = ntohl(size); + + socket.async_receive( + buffer.prepare(size), + boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred()) + ); + } else { + BOOST_LOG_TRIVIAL(error) << ec; + if(done) { + done(); + } + } +} +void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes) +{ + BOOST_LOG_TRIVIAL(trace) << "handling receive of: " << read_bytes << " bytes"; if(!ec) { + buffer.commit(read_bytes); + std::istream is(&buffer); + is.unsetf(std::ios::skipws); + std::vector data(std::istream_iterator(is), {}); + BOOST_LOG_TRIVIAL(trace) << "data.size(): " << data.size(); message_handler(data); } else { + BOOST_LOG_TRIVIAL(error) << ec; if(done) { done(); } @@ -37,11 +107,16 @@ void Client::receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; socket.async_receive( - buffer.prepare(512), - boost::bind(&Client::handle_receive, this, message_handler, error(), bytes_transferred()) + buffer.prepare(4), + boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) ); } +void Client::close() +{ + socket.close(); +} + void Client::on_done(Client::OnDoneFT f) { done = f; } -- cgit v1.2.3-70-g09d2