diff options
Diffstat (limited to 'libcmix-network/client.hpp')
| -rw-r--r-- | libcmix-network/client.hpp | 207 |
1 files changed, 178 insertions, 29 deletions
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp index bd66b98..0b98c1c 100644 --- a/libcmix-network/client.hpp +++ b/libcmix-network/client.hpp @@ -1,8 +1,13 @@ #pragma once +#include "connect.hpp" + #include "logging.hpp" +#include <boost/asio/read.hpp> +#include <boost/asio/write.hpp> #include <boost/asio/ip/tcp.hpp> +#include <boost/asio/ssl.hpp> #include <boost/asio/streambuf.hpp> #include <boost/asio/placeholders.hpp> #include <boost/bind.hpp> @@ -11,6 +16,8 @@ #include <array> +#include <iostream> + /*! * \file */ @@ -18,8 +25,15 @@ /*! * \brief The Client class */ -class Client { -public: + +struct SSLClient; + +template <typename T> +class BaseClient { + friend SSLClient; + + std::unique_ptr<T> socket; + /*! * \brief OnDoneFT Function type of the function being called when this instance is done operating. */ @@ -30,42 +44,88 @@ public: */ typedef std::function<void(std::vector<uint8_t>)> MessageHandler; -protected: /*! * \brief socket The socket connection this instance handles. */ - boost::asio::ip::tcp::socket socket; - -private: - std::unique_ptr<boost::asio::streambuf> buffer; + std::unique_ptr<boost::asio::streambuf> receive_buffer; + std::unique_ptr<boost::asio::streambuf> send_buffer; OnDoneFT done; - std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes); + std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes) { + receive_buffer->commit(read_bytes); + std::istream is(receive_buffer.get()); + is.unsetf(std::ios::skipws); + + return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {}); + } - void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); + void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { + using namespace boost::asio::placeholders; + + if(!ec) { + std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); + uint32_t size; + std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size)); + size = ntohl(size); + + boost::asio::async_read( + *socket, + receive_buffer->prepare(size), + [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { + handle_receive_message(message_handler, ec, read_bytes); + } + ); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + } - void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); + void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { + if(!ec) { + std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); + message_handler(data); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + } + + std::array<uint8_t, 4> prepare_length_prefix(uint32_t length) { + length = htonl(length); + std::array<uint8_t, 4> buf; + + std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data()); + return buf; + } - std::array<uint8_t, 4> prepare_length_prefix(uint32_t length); public: /*! * \brief Client * \param socket An rvalue reference to a socket it will now own and receive from. */ - Client(boost::asio::ip::tcp::socket&& socket); - ~Client(); + BaseClient(std::unique_ptr<T>&& socket) + : socket(std::move(socket)) + , receive_buffer(new boost::asio::streambuf) + , send_buffer(new boost::asio::streambuf) + , done() + {} /*! * \brief Move constructor for Client. */ - Client(Client&&) = default; + BaseClient(BaseClient&&) = default; /*! * \brief Move assignment for Client. */ - Client& operator=(Client&&) = default; + BaseClient& operator=(BaseClient&&) = default; /*! * \brief async_connect Asynchronously connects to next_host:port and calls on_connect @@ -73,8 +133,10 @@ public: * \param next_port The port to connect to * \param on_connect The callback to call on a succes. */ - void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect); - + void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) { + ::async_connect(*socket, next_host, next_port, on_connect); + } + /*! * \brief send sends the string prefixed with it's length over the socket. * \param message The string to be sent. @@ -84,20 +146,25 @@ public: void async_send(std::string message, F on_sent) { auto length_buffer = prepare_length_prefix(message.size()); - boost::array<boost::asio::const_buffer, 2> package = { - boost::asio::buffer(length_buffer.data(), length_buffer.size()), - boost::asio::buffer(message) - }; + std::ostream os(send_buffer.get()); + os.unsetf(std::ios::skipws); + + os.write(reinterpret_cast<char*>(length_buffer.data()), length_buffer.size()); + os.write(message.data(), message.size()); auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) { if(ec) { - BOOST_LOG_TRIVIAL(fatal) << ec; + BOOST_LOG_TRIVIAL(fatal) << ec.message(); throw std::runtime_error("unable to send message"); } on_sent(); }; - socket.async_send(package, 0, handler); + boost::asio::async_write( + *socket, + *send_buffer, + handler + ); } /*! @@ -107,28 +174,110 @@ public: void async_receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; - socket.async_receive( - buffer->prepare(4), - boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) + boost::asio::async_read( + *socket, + receive_buffer->prepare(4), + [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { + handle_receive_size(message_handler, ec, read_bytes); + } ); } /*! * \brief close Closes the underlying socket. */ - void close(); + void close() { + socket->close(); + } /*! * \brief on_done sets the done callback. * \param f The new done callback function. */ - void on_done(OnDoneFT f); + void on_done(OnDoneFT f) { + done = f; + } /*! * \brief is_open * \return returns true if the underlying socket is opened. */ - bool is_open() const; + bool is_open() const { + return socket->is_open(); + } +}; + +struct Client : private BaseClient<boost::asio::ip::tcp::socket> { + + using BaseClient::BaseClient; + using BaseClient::operator=; + + using BaseClient::async_receive; + + using BaseClient::async_send; + using BaseClient::async_connect; + using BaseClient::close; + + using BaseClient::on_done; + + using BaseClient::is_open; + +}; + +struct SSLClient : private BaseClient<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> { + + using BaseClient::BaseClient; + using BaseClient::operator=; + + /*! + * \brief async_connect Asynchronously connects to next_host:port and calls on_connect + * \param next_host The host to connect to + * \param next_port The port to connect to + * \param on_connect The callback to call on a succes. + */ + void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) { + + auto new_on_connect = [this, next_host, on_connect](){ + + socket->set_verify_mode(boost::asio::ssl::verify_peer); + socket->set_verify_callback(boost::asio::ssl::rfc2818_verification(next_host)); + + socket->async_handshake(boost::asio::ssl::stream_base::client, [this, on_connect](boost::system::error_code const& ec) { + if(!ec) { + on_connect(); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + }); + }; + + ::async_connect(socket->lowest_layer(), next_host, next_port, new_on_connect); + } + + using BaseClient::async_send; + using BaseClient::async_receive; + + /*! + * \brief close Closes the underlying socket. + */ + void close() { + socket->lowest_layer().close(); + } + + using BaseClient::on_done; + + /*! + * \brief is_open + * \return returns true if the underlying socket is opened. + */ + bool is_open() const { + return socket->lowest_layer().is_open(); + } }; + + |
