diff options
Diffstat (limited to 'libcmix-network')
| -rw-r--r-- | libcmix-network/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | libcmix-network/client.cpp | 36 | ||||
| -rw-r--r-- | libcmix-network/client.hpp | 37 | ||||
| -rw-r--r-- | libcmix-network/nodeclient.cpp | 22 | ||||
| -rw-r--r-- | libcmix-network/nodeclient.hpp | 49 | ||||
| -rw-r--r-- | libcmix-network/protobufclient.cpp | 0 | ||||
| -rw-r--r-- | libcmix-network/protobufclient.hpp | 148 |
7 files changed, 185 insertions, 110 deletions
diff --git a/libcmix-network/CMakeLists.txt b/libcmix-network/CMakeLists.txt index 5891254..c64c8b7 100644 --- a/libcmix-network/CMakeLists.txt +++ b/libcmix-network/CMakeLists.txt @@ -9,7 +9,7 @@ add_library(cmix-network connect.hpp connect.cpp server.hpp server.cpp client.hpp client.cpp - nodeclient.hpp nodeclient.cpp + protobufclient.hpp protobufclient.cpp uriparser.hpp uriparser.cpp ) @@ -27,6 +27,7 @@ target_link_libraries(cmix-network PUBLIC ${CMAKE_THREAD_LIBS_INIT} PRIVATE cmix PRIVATE log + PRIVATE cmix-protobuf ) if(WIN32) diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp index f693afa..755989b 100644 --- a/libcmix-network/client.cpp +++ b/libcmix-network/client.cpp @@ -2,13 +2,6 @@ #include "connect.hpp" -#include "logging.hpp" - -#include <boost/asio/placeholders.hpp> -#include <boost/bind.hpp> -#include <boost/asio/buffer.hpp> -#include <boost/array.hpp> - #include <iostream> using namespace boost::asio::ip; @@ -39,26 +32,6 @@ std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length) return buf; } -void Client::send(std::string message) { - - auto length_buffer = prepare_length_prefix(message.size()); - - boost::array<boost::asio::const_buffer, 2> package = { - boost::asio::buffer(length_buffer.data(), length_buffer.size()), - boost::asio::buffer(message) - }; - - auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) { - if(ec) { - BOOST_LOG_TRIVIAL(fatal) << ec; - throw std::runtime_error("unable to send message"); - } - }; - - socket.async_send(package, 0, handler); - -} - std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes) { buffer->commit(read_bytes); @@ -103,15 +76,6 @@ void Client::handle_receive_size(Client::MessageHandler message_handler, error_c } } -void Client::receive(MessageHandler message_handler) { - using namespace boost::asio::placeholders; - - socket.async_receive( - buffer->prepare(4), - boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) - ); -} - void Client::close() { socket.close(); diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp index dc3787d..7bfea7d 100644 --- a/libcmix-network/client.hpp +++ b/libcmix-network/client.hpp @@ -1,7 +1,13 @@ #pragma once +#include "logging.hpp" + #include <boost/asio/ip/tcp.hpp> #include <boost/asio/streambuf.hpp> +#include <boost/asio/placeholders.hpp> +#include <boost/bind.hpp> +#include <boost/asio/buffer.hpp> +#include <boost/array.hpp> #include <array> @@ -69,17 +75,44 @@ public: */ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect); + inline static void foo() {} + /*! * \brief send sends the string prefixed with it's length over the socket. * \param message The string to be sent. */ - void send(std::string message); + template <typename F> + void async_send(std::string message, F on_sent) { + auto length_buffer = prepare_length_prefix(message.size()); + + boost::array<boost::asio::const_buffer, 2> package = { + boost::asio::buffer(length_buffer.data(), length_buffer.size()), + boost::asio::buffer(message) + }; + + auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) { + if(ec) { + BOOST_LOG_TRIVIAL(fatal) << ec; + throw std::runtime_error("unable to send message"); + } + on_sent(); + }; + + socket.async_send(package, 0, handler); + } /*! * \brief receive * \param message_handler The function to call when a message has been received. */ - void receive(MessageHandler message_handler); + void receive(MessageHandler message_handler) { + using namespace boost::asio::placeholders; + + socket.async_receive( + buffer->prepare(4), + boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) + ); + } /*! * \brief close Closes the underlying socket. diff --git a/libcmix-network/nodeclient.cpp b/libcmix-network/nodeclient.cpp deleted file mode 100644 index 9b026ba..0000000 --- a/libcmix-network/nodeclient.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include "nodeclient.hpp" - -#include <iostream> - -NodeClient::NodeClient(boost::asio::ip::tcp::socket &&socket) -: client(std::move(socket)) -{} - -void NodeClient::handle_message(std::vector<uint8_t> message) -{ - std::cout << std::string(message.begin(), message.end()) << std::endl; -} - -void NodeClient::receive() -{ - client.receive(std::bind(&NodeClient::handle_message, this, std::placeholders::_1)); -} - -void NodeClient::on_done(Client::OnDoneFT done) { - client.on_done(done); -} - diff --git a/libcmix-network/nodeclient.hpp b/libcmix-network/nodeclient.hpp deleted file mode 100644 index ca1ee67..0000000 --- a/libcmix-network/nodeclient.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#pragma once - -#include "client.hpp" - -#include <boost/asio/ip/tcp.hpp> - -#include <vector> - -/*! - * \file - */ - -/*! - * \brief The NodeClient class - */ -class NodeClient -{ - /*! - * \brief client - */ - Client client; - - /*! - * \brief handle_message - * \param message - */ - void handle_message(std::vector<uint8_t> message); - -public: - - /*! - * \brief NodeClient - * \param socket - */ - NodeClient(boost::asio::ip::tcp::socket&& socket); - - virtual ~NodeClient() = default; - - /*! - * \brief receive - */ - void receive(); - - /*! - * \brief on_done - * \param done - */ - void on_done(Client::OnDoneFT done); -}; diff --git a/libcmix-network/protobufclient.cpp b/libcmix-network/protobufclient.cpp new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/libcmix-network/protobufclient.cpp diff --git a/libcmix-network/protobufclient.hpp b/libcmix-network/protobufclient.hpp new file mode 100644 index 0000000..1c97cfa --- /dev/null +++ b/libcmix-network/protobufclient.hpp @@ -0,0 +1,148 @@ +#pragma once + +#include "client.hpp" + +#include "logging.hpp" +#include "cmix.pb.h" + +#include <type_traits> + +struct Send; +struct Receive; +struct SendReceive; + +template<typename T> +class ProtobufClient; + +typedef ProtobufClient<Receive> Receiver; +typedef ProtobufClient<Send> Sender; +typedef ProtobufClient<SendReceive> SenderReceiver; + +template<typename T = SendReceive> +class ProtobufClient +{ + Client client; + + + #define MESSAGE_SETTER(TYPE, NAME) \ + void message_setter(cmix_proto::CMixMessage& m, cmix_proto::TYPE const& v) { \ + *m.mutable_##NAME() = v; \ + } \ + + MESSAGE_SETTER(Initialization, initialization) + MESSAGE_SETTER(ImANode, imanode) + MESSAGE_SETTER(ImAClient, imaclient) + MESSAGE_SETTER(Bye, bye) + MESSAGE_SETTER(KeyExchange, keyexchange) + + #undef MESSAGE_SETTER + +public: + /*! + * \brief ProtobufClient + * \param socket An rvalue reference to a socket it will now own and receive from. + */ + ProtobufClient(boost::asio::ip::tcp::socket&& socket) + : client(std::move(socket)) + {} + + /*! + * \brief Move constructor for ProtobufClient. + */ + ProtobufClient(ProtobufClient&& c) = default; + + ProtobufClient(Client&& c) + : client(std::move(c)) + {} + + template<typename Ty> + ProtobufClient(ProtobufClient<Ty>&& r) + : client(std::move(r.client)) + {} + + /*! + * \brief Move assignment for ProtobufClient. + */ + ProtobufClient& operator=(ProtobufClient&&) = default; + + /*! + * \brief async_connect Asynchronously connects to next_host:port and calls on_connect + * \param next_host The host to connect to + * \param next_port The port to connect to + * \param on_connect The callback to call on a succes. + */ + template<typename F> + void async_connect(std::string next_host, std::string next_port, F on_connect) { + client.async_connect(next_host, next_port, on_connect); + } + + + template<typename V, typename F, typename std::enable_if<(sizeof(V), std::is_same<T, Send>::value || std::is_same<T, SendReceive>::value), void>::type* = nullptr> + void async_send(V value, F on_sent) { + cmix_proto::CMixMessage m; + message_setter(m, value); + client.async_send(m.SerializeAsString(), on_sent); + } + + /*! + * \brief send sends the string prefixed with it's length over the socket asynchronously. + * \param message The string to be sent. + */ + template<typename V, typename std::enable_if<(sizeof(V), std::is_same<T, Send>::value || std::is_same<T, SendReceive>::value)>::type* = nullptr> + void async_send(V value) { + cmix_proto::CMixMessage m; + message_setter(m, value); + client.async_send(m.SerializeAsString(), []{}); + } + + /*! + * \brief receive + * \param message_handler The function to call when a message has been received. + */ + template <typename F, typename std::enable_if<(sizeof(F), std::is_same<T, Receive>::value || std::is_same<T, SendReceive>::value)>::type* = nullptr> + void receive(F message_handler) { + auto f = [message_handler](std::vector<uint8_t> const& buffer) { + cmix_proto::CMixMessage message; + if(!message.ParseFromArray(buffer.data(), buffer.size())) { + BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; + throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); + } + message_handler(message); + }; + client.receive(f); + } + + /*! + * \brief close Closes the underlying socket. + */ + void close(){ + client.close(); + } + + /*! + * \brief on_done sets the done callback. + * \param f The new done callback function. + */ + template <typename F> + void on_done(F f) { + client.on_done(f); + } + + friend Receiver make_receiver(Receiver&& r); + + friend SenderReceiver make_sender_receiver(SenderReceiver&& r); + friend SenderReceiver make_sender_receiver(Receiver&& r); +}; + +inline Receiver make_receiver(Receiver&& r) { + return Receiver(std::move(r.client)); +} + +inline SenderReceiver make_sender_receiver(SenderReceiver&& r) { + return SenderReceiver(std::move(r.client)); +} + +inline SenderReceiver make_sender_receiver(Receiver&& r) +{ + return SenderReceiver(std::move(r.client)); +} |
