aboutsummaryrefslogtreecommitdiff
path: root/libcmix-network/client.hpp
diff options
context:
space:
mode:
authorDennis Brentjes <d.brentjes@gmail.com>2016-10-21 14:01:26 +0200
committerDennis Brentjes <d.brentjes@gmail.com>2016-10-21 18:15:46 +0200
commit510ce3bec7915a790fbf75ace5521e437d9d416a (patch)
tree7b9286875652b677a110287d11d024f85879cc7a /libcmix-network/client.hpp
parent640e0ad7a762d0473581c2114c2c945961bea80f (diff)
downloadcmix-510ce3bec7915a790fbf75ace5521e437d9d416a.tar.gz
cmix-510ce3bec7915a790fbf75ace5521e437d9d416a.tar.bz2
cmix-510ce3bec7915a790fbf75ace5521e437d9d416a.zip
Adds SSL connections between nodes.
Diffstat (limited to 'libcmix-network/client.hpp')
-rw-r--r--libcmix-network/client.hpp207
1 files changed, 178 insertions, 29 deletions
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp
index bd66b98..0b98c1c 100644
--- a/libcmix-network/client.hpp
+++ b/libcmix-network/client.hpp
@@ -1,8 +1,13 @@
#pragma once
+#include "connect.hpp"
+
#include "logging.hpp"
+#include <boost/asio/read.hpp>
+#include <boost/asio/write.hpp>
#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
@@ -11,6 +16,8 @@
#include <array>
+#include <iostream>
+
/*!
* \file
*/
@@ -18,8 +25,15 @@
/*!
* \brief The Client class
*/
-class Client {
-public:
+
+struct SSLClient;
+
+template <typename T>
+class BaseClient {
+ friend SSLClient;
+
+ std::unique_ptr<T> socket;
+
/*!
* \brief OnDoneFT Function type of the function being called when this instance is done operating.
*/
@@ -30,42 +44,88 @@ public:
*/
typedef std::function<void(std::vector<uint8_t>)> MessageHandler;
-protected:
/*!
* \brief socket The socket connection this instance handles.
*/
- boost::asio::ip::tcp::socket socket;
-
-private:
- std::unique_ptr<boost::asio::streambuf> buffer;
+ std::unique_ptr<boost::asio::streambuf> receive_buffer;
+ std::unique_ptr<boost::asio::streambuf> send_buffer;
OnDoneFT done;
- std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes);
+ std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes) {
+ receive_buffer->commit(read_bytes);
+ std::istream is(receive_buffer.get());
+ is.unsetf(std::ios::skipws);
+
+ return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
+ }
- void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+ void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) {
+ using namespace boost::asio::placeholders;
+
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ uint32_t size;
+ std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
+ size = ntohl(size);
+
+ boost::asio::async_read(
+ *socket,
+ receive_buffer->prepare(size),
+ [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) {
+ handle_receive_message(message_handler, ec, read_bytes);
+ }
+ );
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ }
- void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+ void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) {
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ message_handler(data);
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ }
+
+ std::array<uint8_t, 4> prepare_length_prefix(uint32_t length) {
+ length = htonl(length);
+ std::array<uint8_t, 4> buf;
+
+ std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
+ return buf;
+ }
- std::array<uint8_t, 4> prepare_length_prefix(uint32_t length);
public:
/*!
* \brief Client
* \param socket An rvalue reference to a socket it will now own and receive from.
*/
- Client(boost::asio::ip::tcp::socket&& socket);
- ~Client();
+ BaseClient(std::unique_ptr<T>&& socket)
+ : socket(std::move(socket))
+ , receive_buffer(new boost::asio::streambuf)
+ , send_buffer(new boost::asio::streambuf)
+ , done()
+ {}
/*!
* \brief Move constructor for Client.
*/
- Client(Client&&) = default;
+ BaseClient(BaseClient&&) = default;
/*!
* \brief Move assignment for Client.
*/
- Client& operator=(Client&&) = default;
+ BaseClient& operator=(BaseClient&&) = default;
/*!
* \brief async_connect Asynchronously connects to next_host:port and calls on_connect
@@ -73,8 +133,10 @@ public:
* \param next_port The port to connect to
* \param on_connect The callback to call on a succes.
*/
- void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect);
-
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) {
+ ::async_connect(*socket, next_host, next_port, on_connect);
+ }
+
/*!
* \brief send sends the string prefixed with it's length over the socket.
* \param message The string to be sent.
@@ -84,20 +146,25 @@ public:
void async_send(std::string message, F on_sent) {
auto length_buffer = prepare_length_prefix(message.size());
- boost::array<boost::asio::const_buffer, 2> package = {
- boost::asio::buffer(length_buffer.data(), length_buffer.size()),
- boost::asio::buffer(message)
- };
+ std::ostream os(send_buffer.get());
+ os.unsetf(std::ios::skipws);
+
+ os.write(reinterpret_cast<char*>(length_buffer.data()), length_buffer.size());
+ os.write(message.data(), message.size());
auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) {
if(ec) {
- BOOST_LOG_TRIVIAL(fatal) << ec;
+ BOOST_LOG_TRIVIAL(fatal) << ec.message();
throw std::runtime_error("unable to send message");
}
on_sent();
};
- socket.async_send(package, 0, handler);
+ boost::asio::async_write(
+ *socket,
+ *send_buffer,
+ handler
+ );
}
/*!
@@ -107,28 +174,110 @@ public:
void async_receive(MessageHandler message_handler) {
using namespace boost::asio::placeholders;
- socket.async_receive(
- buffer->prepare(4),
- boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
+ boost::asio::async_read(
+ *socket,
+ receive_buffer->prepare(4),
+ [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) {
+ handle_receive_size(message_handler, ec, read_bytes);
+ }
);
}
/*!
* \brief close Closes the underlying socket.
*/
- void close();
+ void close() {
+ socket->close();
+ }
/*!
* \brief on_done sets the done callback.
* \param f The new done callback function.
*/
- void on_done(OnDoneFT f);
+ void on_done(OnDoneFT f) {
+ done = f;
+ }
/*!
* \brief is_open
* \return returns true if the underlying socket is opened.
*/
- bool is_open() const;
+ bool is_open() const {
+ return socket->is_open();
+ }
+};
+
+struct Client : private BaseClient<boost::asio::ip::tcp::socket> {
+
+ using BaseClient::BaseClient;
+ using BaseClient::operator=;
+
+ using BaseClient::async_receive;
+
+ using BaseClient::async_send;
+ using BaseClient::async_connect;
+ using BaseClient::close;
+
+ using BaseClient::on_done;
+
+ using BaseClient::is_open;
+
+};
+
+struct SSLClient : private BaseClient<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> {
+
+ using BaseClient::BaseClient;
+ using BaseClient::operator=;
+
+ /*!
+ * \brief async_connect Asynchronously connects to next_host:port and calls on_connect
+ * \param next_host The host to connect to
+ * \param next_port The port to connect to
+ * \param on_connect The callback to call on a succes.
+ */
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) {
+
+ auto new_on_connect = [this, next_host, on_connect](){
+
+ socket->set_verify_mode(boost::asio::ssl::verify_peer);
+ socket->set_verify_callback(boost::asio::ssl::rfc2818_verification(next_host));
+
+ socket->async_handshake(boost::asio::ssl::stream_base::client, [this, on_connect](boost::system::error_code const& ec) {
+ if(!ec) {
+ on_connect();
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ });
+ };
+
+ ::async_connect(socket->lowest_layer(), next_host, next_port, new_on_connect);
+ }
+
+ using BaseClient::async_send;
+ using BaseClient::async_receive;
+
+ /*!
+ * \brief close Closes the underlying socket.
+ */
+ void close() {
+ socket->lowest_layer().close();
+ }
+
+ using BaseClient::on_done;
+
+ /*!
+ * \brief is_open
+ * \return returns true if the underlying socket is opened.
+ */
+ bool is_open() const {
+ return socket->lowest_layer().is_open();
+ }
};
+
+