#pragma once #include "connect.hpp" #include "logging.hpp" #include #include #include #include #include #include #include #include #include #include #include /*! * \file */ /*! * \brief The Client class */ struct SSLClient; template class BaseClient { friend SSLClient; std::unique_ptr socket; /*! * \brief OnDoneFT Function type of the function being called when this instance is done operating. */ typedef std::function OnDoneFT; /*! * \brief MessageHandler Function type of the function handling incoming messages. */ typedef std::function)> MessageHandler; /*! * \brief socket The socket connection this instance handles. */ std::unique_ptr receive_buffer; std::unique_ptr send_buffer; OnDoneFT done; std::vector received_bytes_to_vector(size_t read_bytes) { receive_buffer->commit(read_bytes); std::istream is(receive_buffer.get()); is.unsetf(std::ios::skipws); return std::vector(std::istream_iterator(is), {}); } void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { using namespace boost::asio::placeholders; if(!ec) { std::vector data = received_bytes_to_vector(read_bytes); uint32_t size; std::copy(data.begin(), data.end(), reinterpret_cast(&size)); size = ntohl(size); boost::asio::async_read( *socket, receive_buffer->prepare(size), [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { handle_receive_message(message_handler, ec, read_bytes); } ); } else { BOOST_LOG_TRIVIAL(error) << ec.message(); if(done) { done(); } } } void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { if(!ec) { std::vector data = received_bytes_to_vector(read_bytes); message_handler(data); } else { BOOST_LOG_TRIVIAL(error) << ec.message(); if(done) { done(); } } } std::array prepare_length_prefix(uint32_t length) { length = htonl(length); std::array buf; std::copy(reinterpret_cast(&length), reinterpret_cast(&length) + 4, buf.data()); return buf; } public: /*! * \brief Client * \param socket An rvalue reference to a socket it will now own and receive from. */ BaseClient(std::unique_ptr&& socket) : socket(std::move(socket)) , receive_buffer(new boost::asio::streambuf) , send_buffer(new boost::asio::streambuf) , done() {} /*! * \brief Move constructor for Client. */ BaseClient(BaseClient&&) = default; /*! * \brief Move assignment for Client. */ BaseClient& operator=(BaseClient&&) = default; /*! * \brief async_connect Asynchronously connects to next_host:port and calls on_connect * \param next_host The host to connect to * \param next_port The port to connect to * \param on_connect The callback to call on a succes. */ void async_connect(std::string next_host, std::string next_port, std::function on_connect) { ::async_connect(*socket, next_host, next_port, on_connect); } /*! * \brief send sends the string prefixed with it's length over the socket. * \param message The string to be sent. * \param on_sent The callback to call when the message has been sent. */ template void async_send(std::string message, F on_sent) { auto length_buffer = prepare_length_prefix(message.size()); std::ostream os(send_buffer.get()); os.unsetf(std::ios::skipws); os.write(reinterpret_cast(length_buffer.data()), length_buffer.size()); os.write(message.data(), message.size()); auto handler = [this, on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) { if(ec) { BOOST_LOG_TRIVIAL(fatal) << ec.message(); throw std::runtime_error("unable to send message"); } send_buffer->consume(bytes_transferred); on_sent(); }; boost::asio::async_write( *socket, send_buffer->data(), handler ); } /*! * \brief async_receive * \param message_handler The function to call when a message has been received. */ void async_receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; boost::asio::async_read( *socket, receive_buffer->prepare(4), [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { handle_receive_size(message_handler, ec, read_bytes); } ); } /*! * \brief close Closes the underlying socket. */ void close() { socket->close(); } /*! * \brief on_done sets the done callback. * \param f The new done callback function. */ void on_done(OnDoneFT f) { done = f; } /*! * \brief is_open * \return returns true if the underlying socket is opened. */ bool is_open() const { return socket->is_open(); } }; struct Client : private BaseClient { using BaseClient::BaseClient; using BaseClient::operator=; using BaseClient::async_receive; using BaseClient::async_send; using BaseClient::async_connect; using BaseClient::close; using BaseClient::on_done; using BaseClient::is_open; }; struct SSLClient : private BaseClient> { using BaseClient::BaseClient; using BaseClient::operator=; /*! * \brief async_connect Asynchronously connects to next_host:port and calls on_connect * \param next_host The host to connect to * \param next_port The port to connect to * \param on_connect The callback to call on a succes. */ void async_connect(std::string next_host, std::string next_port, std::function on_connect) { auto new_on_connect = [this, next_host, on_connect](){ socket->set_verify_mode(boost::asio::ssl::verify_peer); socket->set_verify_callback(boost::asio::ssl::rfc2818_verification(next_host)); socket->async_handshake(boost::asio::ssl::stream_base::client, [this, on_connect](boost::system::error_code const& ec) { if(!ec) { on_connect(); } else { BOOST_LOG_TRIVIAL(error) << ec.message(); if(done) { done(); } } }); }; ::async_connect(socket->lowest_layer(), next_host, next_port, new_on_connect); } using BaseClient::async_send; using BaseClient::async_receive; /*! * \brief close Closes the underlying socket. */ void close() { socket->lowest_layer().cancel(); } using BaseClient::on_done; /*! * \brief is_open * \return returns true if the underlying socket is opened. */ bool is_open() const { return socket->lowest_layer().is_open(); } };