aboutsummaryrefslogtreecommitdiff
path: root/libcmix-network
diff options
context:
space:
mode:
authorDennis Brentjes <d.brentjes@gmail.com>2016-09-29 17:23:22 +0200
committerDennis Brentjes <d.brentjes@gmail.com>2016-09-29 17:23:22 +0200
commit37effc3e136c73afd4d6ba37d23a91766795d0f7 (patch)
treee5a1684231df52f48088dd053d57132d390e5a02 /libcmix-network
parentf7f0e8c53bc8231264346ef91e2f533164a25562 (diff)
downloadcmix-37effc3e136c73afd4d6ba37d23a91766795d0f7.tar.gz
cmix-37effc3e136c73afd4d6ba37d23a91766795d0f7.tar.bz2
cmix-37effc3e136c73afd4d6ba37d23a91766795d0f7.zip
This changset triggered a storm of changes.
Added the behaviour for receiving a public_share message from our previous node when not being the first node ourselves. This triggered the whitespace bug below, which sparked the network rewrite. Rewrote network protocol to first send the size of the message it's sending in a 32bit integer in network byte order. Fixed a bug where whitespace in the received buffer would be skipped. leading to broken protobuf messages. NextNode no longers inherits from client but owns one, and some functions needed to be forwarded.
Diffstat (limited to 'libcmix-network')
-rw-r--r--libcmix-network/client.cpp83
-rw-r--r--libcmix-network/client.hpp30
2 files changed, 95 insertions, 18 deletions
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp
index ce9fd4d..d8e79d7 100644
--- a/libcmix-network/client.cpp
+++ b/libcmix-network/client.cpp
@@ -1,9 +1,13 @@
#include "client.hpp"
+#include "connect.hpp"
+
#include "logging.hpp"
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/array.hpp>
#include <iostream>
@@ -16,17 +20,83 @@ Client::Client(tcp::socket &&socket)
, done()
{}
-void Client::handle_receive(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect)
+{
+ ::async_connect(socket, next_host, next_port, on_connect);
+}
+
+std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length)
+{
+ length = htonl(length);
+ std::array<uint8_t, 4> buf;
+
+ std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
+ return buf;
+}
+
+void Client::send(std::string message) {
+
+ boost::array<boost::asio::const_buffer, 2> package = {
+ boost::asio::buffer(prepare_length_prefix(message.size())),
+ boost::asio::buffer(message)
+ };
+
+ auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) {
+ BOOST_LOG_TRIVIAL(trace) << "sent message of " << bytes_transferred << " bytes";
+ if(ec) {
+ BOOST_LOG_TRIVIAL(fatal) << ec;
+ throw std::runtime_error("unable to send message");
+ }
+ };
+
+ socket.async_send(package, 0, handler);
+
+}
+
+std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes)
{
buffer.commit(read_bytes);
std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
+ return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
+}
- BOOST_LOG_TRIVIAL(trace) << "handling receive";
+void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes)
+{
+ using namespace boost::asio::placeholders;
+
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ uint32_t size;
+ std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
+ size = ntohl(size);
+
+ socket.async_receive(
+ buffer.prepare(size),
+ boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred())
+ );
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec;
+ if(done) {
+ done();
+ }
+ }
+}
+void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+{
+ BOOST_LOG_TRIVIAL(trace) << "handling receive of: " << read_bytes << " bytes";
if(!ec) {
+ buffer.commit(read_bytes);
+ std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
std::vector<uint8_t> data(std::istream_iterator<uint8_t>(is), {});
+ BOOST_LOG_TRIVIAL(trace) << "data.size(): " << data.size();
message_handler(data);
} else {
+ BOOST_LOG_TRIVIAL(error) << ec;
if(done) {
done();
}
@@ -37,11 +107,16 @@ void Client::receive(MessageHandler message_handler) {
using namespace boost::asio::placeholders;
socket.async_receive(
- buffer.prepare(512),
- boost::bind(&Client::handle_receive, this, message_handler, error(), bytes_transferred())
+ buffer.prepare(4),
+ boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
);
}
+void Client::close()
+{
+ socket.close();
+}
+
void Client::on_done(Client::OnDoneFT f) {
done = f;
}
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp
index 981283b..cb81418 100644
--- a/libcmix-network/client.hpp
+++ b/libcmix-network/client.hpp
@@ -31,24 +31,18 @@ protected:
boost::asio::ip::tcp::socket socket;
private:
- /*!
- * \brief buffer Internal private buffer used to receive messages.
- */
boost::asio::streambuf buffer;
- /*!
- * \brief done The done function being called when this is done operating.
- */
OnDoneFT done;
+
+ std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes);
+
+ void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+
+ void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
- /*!
- * \brief handle_receive
- * \param message_handler The function to call when a message has been received.
- * \param ec A possible error that occured during receiving.
- * \param read_bytes The number of bytes read.
- */
- void handle_receive(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
-
+ std::array<uint8_t, 4> prepare_length_prefix(uint32_t length);
+
public:
/*!
* \brief Client
@@ -56,15 +50,23 @@ public:
*/
Client(boost::asio::ip::tcp::socket&& socket);
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect);
+
+ void send(std::string message);
+
/*!
* \brief receive
* \param message_handler The function to call when a message has been received.
*/
void receive(MessageHandler message_handler);
+ void close();
+
/*!
* \brief on_done sets the done callback.
* \param f The new done callback function.
*/
void on_done(OnDoneFT f);
+
+
};