aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/cmixclient.cpp4
-rw-r--r--libcmix-common/receiver.hpp28
-rw-r--r--libcmix-common/sender.hpp31
-rw-r--r--libcmix-common/senderreceiver.cpp9
-rw-r--r--libcmix-common/senderreceiver.hpp46
-rw-r--r--libcmix-network/CMakeLists.txt4
-rw-r--r--libcmix-network/accept.cpp47
-rw-r--r--libcmix-network/accept.hpp4
-rw-r--r--libcmix-network/acceptor.cpp5
-rw-r--r--libcmix-network/acceptor.hpp2
-rw-r--r--libcmix-network/client.cpp91
-rw-r--r--libcmix-network/client.hpp207
-rw-r--r--libcmix-network/connect.cpp14
-rw-r--r--libcmix-network/connect.hpp3
-rw-r--r--libcmix-network/protobufclient.cpp0
-rw-r--r--libcmix-network/protobufclient.hpp58
-rw-r--r--libcmix-network/server.cpp17
-rw-r--r--libcmix-network/server.hpp5
-rw-r--r--node/main.cpp67
-rw-r--r--node/node.cpp41
-rw-r--r--node/node.hpp15
21 files changed, 517 insertions, 181 deletions
diff --git a/client/cmixclient.cpp b/client/cmixclient.cpp
index 5c94750..ccbdeb5 100644
--- a/client/cmixclient.cpp
+++ b/client/cmixclient.cpp
@@ -1,6 +1,8 @@
#include "cmixclient.hpp"
+using namespace boost::asio::ip;
+
void CMixClient::key_exchange(int i) {
BOOST_LOG_TRIVIAL(trace) << "Sending KeyExchange for node: " << i;
@@ -27,7 +29,7 @@ void CMixClient::initialize_connections() {
key_exchange(i);
};
- network_connections.emplace_back(boost::asio::ip::tcp::socket(io_service));
+ network_connections.emplace_back(std::unique_ptr<tcp::socket>(new tcp::socket(io_service)));
network_connections.back().async_connect(network_details[i].host, network_details[i].port, handler);
}
}
diff --git a/libcmix-common/receiver.hpp b/libcmix-common/receiver.hpp
index 8a73e9c..efd3753 100644
--- a/libcmix-common/receiver.hpp
+++ b/libcmix-common/receiver.hpp
@@ -21,6 +21,7 @@ struct Receiver : private ProtobufClient<CMixProtoFunctor>
friend SenderReceiver;
using ProtobufClient::ProtobufClient;
+ using ProtobufClient::operator=;
using ProtobufClient::async_receive;
@@ -32,3 +33,30 @@ struct Receiver : private ProtobufClient<CMixProtoFunctor>
using ProtobufClient::is_open;
};
+
+struct SSLSenderReceiver;
+
+/*!
+ * \brief The Receiver struct is a shim around ProtobufClient and only exposes things needed
+ * for receiving messages.
+ */
+struct SSLReceiver : private SSLProtobufClient<CMixProtoFunctor>
+{
+ /*!
+ * \brief friend decleration allowing Receivers to be upgraded to SenderReceivers.
+ */
+ friend SSLSenderReceiver;
+
+ using SSLProtobufClient::SSLProtobufClient;
+ using SSLProtobufClient::operator=;
+
+ using SSLProtobufClient::async_receive;
+
+ using SSLProtobufClient::async_connect;
+
+ using SSLProtobufClient::close;
+
+ using SSLProtobufClient::on_done;
+
+ using SSLProtobufClient::is_open;
+};
diff --git a/libcmix-common/sender.hpp b/libcmix-common/sender.hpp
index ea16a81..c722e30 100644
--- a/libcmix-common/sender.hpp
+++ b/libcmix-common/sender.hpp
@@ -25,6 +25,7 @@ struct Sender : private ProtobufClient<CMixProtoFunctor>
friend SenderReceiver;
using ProtobufClient::ProtobufClient;
+ using ProtobufClient::operator=;
using ProtobufClient::async_send;
@@ -36,3 +37,33 @@ struct Sender : private ProtobufClient<CMixProtoFunctor>
using ProtobufClient::is_open;
};
+
+/*!
+ * \brief forward declaration for SenderReceiver to allow for the friend declaration.
+ */
+struct SSLSenderReceiver;
+
+/*!
+ * \brief The Sender struct is a shim around ProtobufClient and only exposes things needed
+ * for sending messages.
+ */
+struct SSLSender : private SSLProtobufClient<CMixProtoFunctor>
+{
+ /*!
+ * The friend declaration that allows for the Sender to be upgraded to a SenderReceiver.
+ */
+ friend SSLSenderReceiver;
+
+ using SSLProtobufClient::SSLProtobufClient;
+ using SSLProtobufClient::operator=;
+
+ using SSLProtobufClient::async_send;
+
+ using SSLProtobufClient::async_connect;
+
+ using SSLProtobufClient::close;
+
+ using SSLProtobufClient::on_done;
+
+ using SSLProtobufClient::is_open;
+};
diff --git a/libcmix-common/senderreceiver.cpp b/libcmix-common/senderreceiver.cpp
index eafa8a5..8b13789 100644
--- a/libcmix-common/senderreceiver.cpp
+++ b/libcmix-common/senderreceiver.cpp
@@ -1,10 +1 @@
-#include "senderreceiver.hpp"
-
-SenderReceiver::SenderReceiver(Receiver&& r)
-: ProtobufClient(std::move(r))
-{}
-
-SenderReceiver::SenderReceiver(Sender&& s)
-: ProtobufClient(std::move(s))
-{}
diff --git a/libcmix-common/senderreceiver.hpp b/libcmix-common/senderreceiver.hpp
index c7fed3a..6654809 100644
--- a/libcmix-common/senderreceiver.hpp
+++ b/libcmix-common/senderreceiver.hpp
@@ -20,15 +20,20 @@ struct SenderReceiver : private ProtobufClient<CMixProtoFunctor>
* \brief SenderReceiver Explicit conversion to SenderReceiver
* \param r The Receiver to upgrade
*/
- explicit SenderReceiver(Receiver&& r);
+ explicit SenderReceiver(Receiver&& r)
+ : ProtobufClient(std::move(r))
+ {}
/*!
* \brief SenderReceiver Explicit conversion to SenderReceiver
* \param s The Sender to upgrade
*/
- explicit SenderReceiver(Sender&& s);
+ explicit SenderReceiver(Sender&& s)
+ : ProtobufClient(std::move(s))
+ {}
using ProtobufClient::ProtobufClient;
+ using ProtobufClient::operator=;
using ProtobufClient::async_receive;
@@ -41,4 +46,39 @@ struct SenderReceiver : private ProtobufClient<CMixProtoFunctor>
using ProtobufClient::on_done;
using ProtobufClient::is_open;
-}; \ No newline at end of file
+};
+
+struct SSLSenderReceiver : private SSLProtobufClient<CMixProtoFunctor>
+{
+ /*!
+ * \brief SenderReceiver Explicit conversion to SenderReceiver
+ * \param r The Receiver to upgrade
+ */
+ explicit SSLSenderReceiver(SSLReceiver&& r)
+ : SSLProtobufClient(std::move(r))
+ {}
+
+ /*!
+ * \brief SenderReceiver Explicit conversion to SenderReceiver
+ * \param s The Sender to upgrade
+ */
+ explicit SSLSenderReceiver(SSLSender&& s)
+ : SSLProtobufClient(std::move(s))
+ {}
+
+ using SSLProtobufClient::SSLProtobufClient;
+ using SSLProtobufClient::operator=;
+
+ using SSLProtobufClient::async_receive;
+
+ using SSLProtobufClient::async_send;
+
+ using SSLProtobufClient::async_connect;
+
+ using SSLProtobufClient::close;
+
+ using SSLProtobufClient::on_done;
+
+ using SSLProtobufClient::is_open;
+};
+
diff --git a/libcmix-network/CMakeLists.txt b/libcmix-network/CMakeLists.txt
index b58cef1..057061c 100644
--- a/libcmix-network/CMakeLists.txt
+++ b/libcmix-network/CMakeLists.txt
@@ -8,8 +8,8 @@ add_library(cmix-network
accept.hpp accept.cpp
connect.hpp connect.cpp
server.hpp server.cpp
- client.hpp client.cpp
- protobufclient.hpp protobufclient.cpp
+ client.hpp
+ protobufclient.hpp
uriparser.hpp uriparser.cpp
)
diff --git a/libcmix-network/accept.cpp b/libcmix-network/accept.cpp
index 3fa4314..1ec26fe 100644
--- a/libcmix-network/accept.cpp
+++ b/libcmix-network/accept.cpp
@@ -7,42 +7,61 @@
using namespace boost::asio::ip;
using namespace boost::asio;
-void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<tcp::socket> socket, boost::system::error_code ec, std::function<void (tcp::socket&&)> f)
+void accept_connection(tcp::acceptor& acceptor, tcp::socket* socket, boost::system::error_code ec, AcceptHandler f)
{
if(!bool(ec))
{
- f(std::move(*socket));
+ f(std::unique_ptr<tcp::socket>(socket));
accept_loop(acceptor, f);
} else {
- std::stringstream ss;
- ss << ec;
- throw std::runtime_error(ss.str());
+ if(ec != boost::system::errc::operation_canceled) {
+ std::stringstream ss;
+ ss << ec.message();
+ delete socket;
+ throw std::runtime_error(ss.str());
+ } else {
+ delete socket;
+ }
}
}
-void accept_loop(tcp::acceptor& acceptor, std::function<void (tcp::socket&&)> f)
+void accept_loop(tcp::acceptor& acceptor, AcceptHandler f)
{
- std::shared_ptr<tcp::socket> new_socket = std::make_shared<tcp::socket>(acceptor.get_io_service());
+ tcp::socket* new_socket = new tcp::socket(acceptor.get_io_service());
acceptor.async_accept(*new_socket, boost::bind(accept_connection, boost::ref(acceptor), new_socket, placeholders::error, f));
}
-void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, std::shared_ptr<ssl::stream<tcp::socket>> socket, boost::system::error_code ec, std::function<void (ssl::stream<tcp::socket>&&)> f)
+void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, ssl::stream<tcp::socket>* socket, boost::system::error_code ec, SSLAcceptHandler f)
{
if(!bool(ec))
{
- f(std::move(*socket));
+ socket->async_handshake(boost::asio::ssl::stream_base::server, [&acceptor, ctx, socket, f](boost::system::error_code const& ec) {
+ if(!bool(ec)) {
+ f(std::unique_ptr<ssl::stream<tcp::socket>>(socket), ctx);
+ } else {
+ std::stringstream ss;
+ ss << ec.message();
+ delete socket;
+ throw std::runtime_error(ss.str());
+ }
+ });
accept_loop(acceptor, ctx, f);
} else {
- std::stringstream ss;
- ss << ec;
- throw std::runtime_error(ss.str());
+ if(ec != boost::system::errc::operation_canceled) {
+ std::stringstream ss;
+ ss << ec;
+ delete socket;
+ throw std::runtime_error(ss.str());
+ } else {
+ delete socket;
+ }
}
}
-void accept_loop(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, std::function<void(ssl::stream<tcp::socket>&& socket)> f) {
+void accept_loop(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, SSLAcceptHandler f) {
- std::shared_ptr<ssl::stream<tcp::socket>> new_socket = std::make_shared<ssl::stream<tcp::socket>>(acceptor.get_io_service(), *ctx);
+ ssl::stream<tcp::socket>* new_socket = new ssl::stream<tcp::socket>(acceptor.get_io_service(), *ctx);
acceptor.async_accept(new_socket->lowest_layer(), boost::bind(accept_connection, boost::ref(acceptor), ctx, new_socket, placeholders::error, f));
}
diff --git a/libcmix-network/accept.hpp b/libcmix-network/accept.hpp
index dde98a3..f7d068a 100644
--- a/libcmix-network/accept.hpp
+++ b/libcmix-network/accept.hpp
@@ -12,11 +12,11 @@
/*!
* \brief AcceptHandler Handy typedef for a function taking an tcp::socket.
*/
-typedef std::function<void(boost::asio::ip::tcp::socket&&)> AcceptHandler;
+typedef std::function<void(std::unique_ptr<boost::asio::ip::tcp::socket>&&)> AcceptHandler;
/*!
* \brief SSLAcceptHandler Handy typedef for a function taking an "SSLSocket"
*/
-typedef std::function<void(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>&&)> SSLAcceptHandler;
+typedef std::function<void(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&&, std::shared_ptr<boost::asio::ssl::context>)> SSLAcceptHandler;
/*!
* \brief accept_loop Keeps accepting connections on the specified acceptor
diff --git a/libcmix-network/acceptor.cpp b/libcmix-network/acceptor.cpp
index 83406db..ee8833a 100644
--- a/libcmix-network/acceptor.cpp
+++ b/libcmix-network/acceptor.cpp
@@ -53,3 +53,8 @@ void Acceptor::start_accepting(std::shared_ptr<ssl::context> ctx, SSLAcceptHandl
{
accept_loop(acceptor, ctx, accept_handler);
}
+
+void Acceptor::close()
+{
+ acceptor.close();
+}
diff --git a/libcmix-network/acceptor.hpp b/libcmix-network/acceptor.hpp
index 6e4de63..0ee9294 100644
--- a/libcmix-network/acceptor.hpp
+++ b/libcmix-network/acceptor.hpp
@@ -74,4 +74,6 @@ public:
*/
void start_accepting(std::shared_ptr<boost::asio::ssl::context> ctx, SSLAcceptHandler accept_handler);
+ void close();
+
};
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp
deleted file mode 100644
index 09ea98c..0000000
--- a/libcmix-network/client.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-#include "client.hpp"
-
-#include "connect.hpp"
-
-#include <iostream>
-
-using namespace boost::asio::ip;
-using namespace boost::system;
-
-Client::Client(tcp::socket &&socket)
-: socket(std::move(socket))
-, buffer(new boost::asio::streambuf())
-, done()
-{}
-
-Client::~Client()
-{
- close();
-}
-
-void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect)
-{
- ::async_connect(socket, next_host, next_port, on_connect);
-}
-
-std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length)
-{
- length = htonl(length);
- std::array<uint8_t, 4> buf;
-
- std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
- return buf;
-}
-
-std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes)
-{
- buffer->commit(read_bytes);
- std::istream is(buffer.get());
- is.unsetf(std::ios::skipws);
-
- return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
-}
-
-void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
-{
- if(!ec) {
- std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
- message_handler(data);
- } else {
- BOOST_LOG_TRIVIAL(error) << ec;
- if(done) {
- done();
- }
- }
-}
-
-void Client::handle_receive_size(Client::MessageHandler message_handler, error_code const& ec, size_t read_bytes)
-{
- using namespace boost::asio::placeholders;
-
- if(!ec) {
- std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
- uint32_t size;
- std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
- size = ntohl(size);
-
- socket.async_receive(
- buffer->prepare(size),
- boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred())
- );
- } else {
- BOOST_LOG_TRIVIAL(error) << ec;
- if(done) {
- done();
- }
- }
-}
-
-void Client::close()
-{
- socket.close();
-}
-
-void Client::on_done(Client::OnDoneFT f) {
- done = f;
-}
-
-bool Client::is_open() const
-{
- return socket.is_open();
-}
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp
index bd66b98..0b98c1c 100644
--- a/libcmix-network/client.hpp
+++ b/libcmix-network/client.hpp
@@ -1,8 +1,13 @@
#pragma once
+#include "connect.hpp"
+
#include "logging.hpp"
+#include <boost/asio/read.hpp>
+#include <boost/asio/write.hpp>
#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
@@ -11,6 +16,8 @@
#include <array>
+#include <iostream>
+
/*!
* \file
*/
@@ -18,8 +25,15 @@
/*!
* \brief The Client class
*/
-class Client {
-public:
+
+struct SSLClient;
+
+template <typename T>
+class BaseClient {
+ friend SSLClient;
+
+ std::unique_ptr<T> socket;
+
/*!
* \brief OnDoneFT Function type of the function being called when this instance is done operating.
*/
@@ -30,42 +44,88 @@ public:
*/
typedef std::function<void(std::vector<uint8_t>)> MessageHandler;
-protected:
/*!
* \brief socket The socket connection this instance handles.
*/
- boost::asio::ip::tcp::socket socket;
-
-private:
- std::unique_ptr<boost::asio::streambuf> buffer;
+ std::unique_ptr<boost::asio::streambuf> receive_buffer;
+ std::unique_ptr<boost::asio::streambuf> send_buffer;
OnDoneFT done;
- std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes);
+ std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes) {
+ receive_buffer->commit(read_bytes);
+ std::istream is(receive_buffer.get());
+ is.unsetf(std::ios::skipws);
+
+ return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
+ }
- void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+ void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) {
+ using namespace boost::asio::placeholders;
+
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ uint32_t size;
+ std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
+ size = ntohl(size);
+
+ boost::asio::async_read(
+ *socket,
+ receive_buffer->prepare(size),
+ [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) {
+ handle_receive_message(message_handler, ec, read_bytes);
+ }
+ );
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ }
- void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+ void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) {
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ message_handler(data);
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ }
+
+ std::array<uint8_t, 4> prepare_length_prefix(uint32_t length) {
+ length = htonl(length);
+ std::array<uint8_t, 4> buf;
+
+ std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
+ return buf;
+ }
- std::array<uint8_t, 4> prepare_length_prefix(uint32_t length);
public:
/*!
* \brief Client
* \param socket An rvalue reference to a socket it will now own and receive from.
*/
- Client(boost::asio::ip::tcp::socket&& socket);
- ~Client();
+ BaseClient(std::unique_ptr<T>&& socket)
+ : socket(std::move(socket))
+ , receive_buffer(new boost::asio::streambuf)
+ , send_buffer(new boost::asio::streambuf)
+ , done()
+ {}
/*!
* \brief Move constructor for Client.
*/
- Client(Client&&) = default;
+ BaseClient(BaseClient&&) = default;
/*!
* \brief Move assignment for Client.
*/
- Client& operator=(Client&&) = default;
+ BaseClient& operator=(BaseClient&&) = default;
/*!
* \brief async_connect Asynchronously connects to next_host:port and calls on_connect
@@ -73,8 +133,10 @@ public:
* \param next_port The port to connect to
* \param on_connect The callback to call on a succes.
*/
- void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect);
-
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) {
+ ::async_connect(*socket, next_host, next_port, on_connect);
+ }
+
/*!
* \brief send sends the string prefixed with it's length over the socket.
* \param message The string to be sent.
@@ -84,20 +146,25 @@ public:
void async_send(std::string message, F on_sent) {
auto length_buffer = prepare_length_prefix(message.size());
- boost::array<boost::asio::const_buffer, 2> package = {
- boost::asio::buffer(length_buffer.data(), length_buffer.size()),
- boost::asio::buffer(message)
- };
+ std::ostream os(send_buffer.get());
+ os.unsetf(std::ios::skipws);
+
+ os.write(reinterpret_cast<char*>(length_buffer.data()), length_buffer.size());
+ os.write(message.data(), message.size());
auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) {
if(ec) {
- BOOST_LOG_TRIVIAL(fatal) << ec;
+ BOOST_LOG_TRIVIAL(fatal) << ec.message();
throw std::runtime_error("unable to send message");
}
on_sent();
};
- socket.async_send(package, 0, handler);
+ boost::asio::async_write(
+ *socket,
+ *send_buffer,
+ handler
+ );
}
/*!
@@ -107,28 +174,110 @@ public:
void async_receive(MessageHandler message_handler) {
using namespace boost::asio::placeholders;
- socket.async_receive(
- buffer->prepare(4),
- boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
+ boost::asio::async_read(
+ *socket,
+ receive_buffer->prepare(4),
+ [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) {
+ handle_receive_size(message_handler, ec, read_bytes);
+ }
);
}
/*!
* \brief close Closes the underlying socket.
*/
- void close();
+ void close() {
+ socket->close();
+ }
/*!
* \brief on_done sets the done callback.
* \param f The new done callback function.
*/
- void on_done(OnDoneFT f);
+ void on_done(OnDoneFT f) {
+ done = f;
+ }
/*!
* \brief is_open
* \return returns true if the underlying socket is opened.
*/
- bool is_open() const;
+ bool is_open() const {
+ return socket->is_open();
+ }
+};
+
+struct Client : private BaseClient<boost::asio::ip::tcp::socket> {
+
+ using BaseClient::BaseClient;
+ using BaseClient::operator=;
+
+ using BaseClient::async_receive;
+
+ using BaseClient::async_send;
+ using BaseClient::async_connect;
+ using BaseClient::close;
+
+ using BaseClient::on_done;
+
+ using BaseClient::is_open;
+
+};
+
+struct SSLClient : private BaseClient<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> {
+
+ using BaseClient::BaseClient;
+ using BaseClient::operator=;
+
+ /*!
+ * \brief async_connect Asynchronously connects to next_host:port and calls on_connect
+ * \param next_host The host to connect to
+ * \param next_port The port to connect to
+ * \param on_connect The callback to call on a succes.
+ */
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) {
+
+ auto new_on_connect = [this, next_host, on_connect](){
+
+ socket->set_verify_mode(boost::asio::ssl::verify_peer);
+ socket->set_verify_callback(boost::asio::ssl::rfc2818_verification(next_host));
+
+ socket->async_handshake(boost::asio::ssl::stream_base::client, [this, on_connect](boost::system::error_code const& ec) {
+ if(!ec) {
+ on_connect();
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec.message();
+ if(done) {
+ done();
+ }
+ }
+ });
+ };
+
+ ::async_connect(socket->lowest_layer(), next_host, next_port, new_on_connect);
+ }
+
+ using BaseClient::async_send;
+ using BaseClient::async_receive;
+
+ /*!
+ * \brief close Closes the underlying socket.
+ */
+ void close() {
+ socket->lowest_layer().close();
+ }
+
+ using BaseClient::on_done;
+
+ /*!
+ * \brief is_open
+ * \return returns true if the underlying socket is opened.
+ */
+ bool is_open() const {
+ return socket->lowest_layer().is_open();
+ }
};
+
+
diff --git a/libcmix-network/connect.cpp b/libcmix-network/connect.cpp
index 01e72de..9df9b56 100644
--- a/libcmix-network/connect.cpp
+++ b/libcmix-network/connect.cpp
@@ -4,8 +4,10 @@
#include <boost/asio/ip/basic_resolver.hpp>
#include <boost/asio/ip/address.hpp>
+#include <boost/asio/ssl.hpp>
using namespace boost::asio::ip;
+using namespace boost::asio;
using boost::asio::io_service;
boost::asio::ip::tcp::socket connect(std::string host, std::string port, io_service& io_service) {
@@ -50,7 +52,7 @@ struct IterationInfo {
{}
};
-void async_connect_iteration(tcp::socket& socket, std::shared_ptr<IterationInfo> info, std::function<void()> on_connect) {
+void async_connect_iteration(basic_socket<tcp, stream_socket_service<tcp>>& socket, std::shared_ptr<IterationInfo> info, std::function<void()> on_connect) {
if(info->it == boost::asio::ip::tcp::resolver::iterator()) {
throw std::runtime_error("None of the supplied endpoints responded");
@@ -91,16 +93,16 @@ void async_connect_iteration(tcp::socket& socket, std::shared_ptr<IterationInfo>
socket.async_connect(*(info->it), handler);
}
-void async_connect(tcp::socket& socket, std::string host, std::string next_port, std::function<void()> on_connect)
+void async_connect(basic_socket<tcp, stream_socket_service<tcp>>& socket, std::string host, std::string port, std::function<void()> on_connect)
{
boost::asio::ip::basic_resolver<tcp> resolver(socket.get_io_service());
- boost::asio::ip::basic_resolver_query<tcp> query(host, next_port);
+ boost::asio::ip::basic_resolver_query<tcp> query(host, port);
- BOOST_LOG_TRIVIAL(trace) << "connecting to: \"" << host << ":" << next_port << "\"";
+ BOOST_LOG_TRIVIAL(trace) << "connecting to: \"" << host << ":" << port << "\"";
auto it = resolver.resolve(query);
std::shared_ptr<IterationInfo> info = std::make_shared<IterationInfo>(0, it, socket.get_io_service());
-
+
async_connect_iteration(socket, info, on_connect);
-}
+} \ No newline at end of file
diff --git a/libcmix-network/connect.hpp b/libcmix-network/connect.hpp
index 0a58115..f58c3e1 100644
--- a/libcmix-network/connect.hpp
+++ b/libcmix-network/connect.hpp
@@ -2,6 +2,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/io_service.hpp>
+#include <boost/asio/ssl.hpp>
/*!
* \file
@@ -23,4 +24,4 @@ boost::asio::ip::tcp::socket connect(std::string host, std::string port, boost::
* \param next_port The port
* \param on_connect The function to call when the connect has succeeded.
*/
-void async_connect(boost::asio::ip::tcp::socket& socket, std::string host, std::string next_port, std::function<void()> on_connect);
+void async_connect(boost::asio::basic_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp>>& socket, std::string host, std::string port, std::function<void()> on_connect); \ No newline at end of file
diff --git a/libcmix-network/protobufclient.cpp b/libcmix-network/protobufclient.cpp
deleted file mode 100644
index e69de29..0000000
--- a/libcmix-network/protobufclient.cpp
+++ /dev/null
diff --git a/libcmix-network/protobufclient.hpp b/libcmix-network/protobufclient.hpp
index 247b71f..df84152 100644
--- a/libcmix-network/protobufclient.hpp
+++ b/libcmix-network/protobufclient.hpp
@@ -4,8 +4,13 @@
#include "logging.hpp"
+#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl.hpp>
+
#include <type_traits>
+#include <iostream>
+
/*!
* \file
*/
@@ -15,6 +20,7 @@
* in to a desired wireformat to serialize and send or receive via the Client.
* \tparam T The Protobuf functor to use when transforming messages.
*/
+
template <typename T>
class ProtobufClient : private Client
{
@@ -31,6 +37,7 @@ public:
template<typename V, typename F>
void async_send(V value, F on_sent) {
typename T::proto_type m = T()(value);
+
Client::async_send(m.SerializeAsString(), on_sent);
}
@@ -65,3 +72,54 @@ public:
using Client::on_done;
using Client::is_open;
};
+
+template <typename T>
+class SSLProtobufClient : private SSLClient
+{
+public:
+ using SSLClient::SSLClient;
+ using SSLClient::operator=;
+ using SSLClient::async_connect;
+
+ /*!
+ * \brief an async_send wrapper that transforms a specific message to the container message an sends it
+ * \param value The specific message to send.
+ * \param on_sent The function to call after succesfully sending the message.
+ */
+ template<typename V, typename F>
+ void async_send(V value, F on_sent) {
+ typename T::proto_type m = T()(value);
+ SSLClient::async_send(m.SerializeAsString(), on_sent);
+ }
+
+ /*!
+ * \brief an async_send wrapper, like the above but without the on_sent callback.
+ * \param value The specific message to send.
+ */
+ template<typename V>
+ void async_send(V value) {
+ async_send(value, []{});
+ }
+
+ /*!
+ * \brief An async_receive wrapper that transforms the wireformat to the protobuf
+ * container message type.
+ * \param message_handler The callback to call after receiving a message.
+ */
+ template <typename F>
+ void async_receive(F message_handler) {
+ auto f = [message_handler](std::vector<uint8_t> const& buffer) {
+ typename T::proto_type message;
+ if(!message.ParseFromArray(buffer.data(), buffer.size())) {
+ BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage";
+ throw std::runtime_error("Network communication was disrupted in a unrecoverable way.");
+ }
+ message_handler(message);
+ };
+ SSLClient::async_receive(f);
+ }
+
+ using SSLClient::close;
+ using SSLClient::on_done;
+ using SSLClient::is_open;
+};
diff --git a/libcmix-network/server.cpp b/libcmix-network/server.cpp
index 34e3183..195c5a0 100644
--- a/libcmix-network/server.cpp
+++ b/libcmix-network/server.cpp
@@ -45,10 +45,27 @@ Server::Server(io_service& io_service, const ListenSettings& listen_settings, Ac
Server::Server(io_service& io_service, const ListenSettings& listen_settings, std::shared_ptr<ssl::context> ctx, SSLAcceptHandler accept_handler)
: Server(io_service, listen_settings)
{
+ ctx->set_options(
+ boost::asio::ssl::context::default_workarounds
+ | boost::asio::ssl::context::no_sslv2
+ | boost::asio::ssl::context::single_dh_use
+ );
+
+ ctx->use_certificate_chain_file(listen_settings.cert);
+ ctx->use_private_key_file(listen_settings.key, boost::asio::ssl::context::pem);
+ ctx->use_tmp_dh_file(listen_settings.dhparam);
+ ctx->set_default_verify_paths();
+
if(v4_acceptor.is_open()) {
v4_acceptor.start_accepting(ctx, accept_handler);
}
if(v6_acceptor.is_open()) {
v6_acceptor.start_accepting(ctx, accept_handler);
}
+}
+
+void Server::close()
+{
+ v4_acceptor.close();
+ v6_acceptor.close();
} \ No newline at end of file
diff --git a/libcmix-network/server.hpp b/libcmix-network/server.hpp
index fad7c71..6976f04 100644
--- a/libcmix-network/server.hpp
+++ b/libcmix-network/server.hpp
@@ -19,6 +19,9 @@ struct ListenSettings {
std::string ipv6_inaddr; ///< Listen on this ipv6 address
uint16_t port; ///< Listen on this port.
bool use_ssl; ///< Should we use ssl
+ std::string cert; ///< The cert to use in pem format.
+ std::string key; ///< The corresponding key in pem format.
+ std::string dhparam; ///< The diffie helman parameters.
};
/*!
@@ -50,4 +53,6 @@ public:
*/
Server(boost::asio::io_service& io_service, ListenSettings const& listen_settings, std::shared_ptr<boost::asio::ssl::context> ctx, SSLAcceptHandler accept_handler);
+ void close();
+
};
diff --git a/node/main.cpp b/node/main.cpp
index 527e3c5..0c81312 100644
--- a/node/main.cpp
+++ b/node/main.cpp
@@ -4,6 +4,7 @@
#include "logging.hpp"
#include <boost/program_options.hpp>
+#include <boost/filesystem/operations.hpp>
#include <iostream>
@@ -18,8 +19,12 @@ int main(int argc, char* argv[]) {
("interface4,4", po::value<std::string>()->default_value("0.0.0.0"), "Set the ipv4 address to listen on.")
("enable_v6", po::value<bool>()->default_value(true), "Enable/disable ipv6 accept support.")
("interface6,6", po::value<std::string>()->default_value("::"), "Set the ipv6 address to listen on.")
- ("next_node,n", po::value<std::string>(), "The address of the next node in the network")
+ ("next_node,n", po::value<std::string>(), "The address of the next node in the network.")
("first,f", "This is the first node and will be the communication point for the clients.")
+ ("cert,c", po::value<std::string>(), "The cert file to use (in pem format).")
+ ("key,k", po::value<std::string>(), "The key file (in pem format).")
+ ("dhparam,d", po::value<std::string>(), "The dhparam file.")
+ ("certdir", po::value<std::string>(), "Directory containing trusted certificates.")
;
po::variables_map vm;
@@ -37,12 +42,54 @@ int main(int argc, char* argv[]) {
std::string if6 = vm["interface6"].as<std::string>();
uint16_t port = vm["port"].as<uint16_t>();
- init_logging(boost::log::trivial::severity_level::trace, "node_" + std::to_string(port));
+ std::string cert;
+ if(vm.count("cert")) {
+ std::string filename = vm["cert"].as<std::string>();
+ if(boost::filesystem::exists(filename)) {
+ cert = filename;
+ } else {
+ std::cerr << "cert file: \"" << filename << "\" does not exist";
+ return -1;
+ }
+ } else {
+ std::cerr << "supplying a certificate is required" << std::endl;
+ return -1;
+ }
- BOOST_LOG_TRIVIAL(info) << "Started node";
+ std::string key;
+ if(vm.count("key")) {
+ std::string filename = vm["key"].as<std::string>();
+ if(boost::filesystem::exists(filename)) {
+ key = filename;
+ } else {
+ std::cerr << "key file: \"" << filename << "\" does not exist";
+ return -1;
+ }
+ } else {
+ std::cerr << "supplying a key file is required" << std::endl;
+ return -1;
+ }
+
+ std::string dhparam;
+ if(vm.count("dhparam")) {
+ std::string filename = vm["dhparam"].as<std::string>();
+ if(boost::filesystem::exists(filename)) {
+ dhparam = filename;
+ } else {
+ std::cerr << "dhparam file: \"" << filename << "\" does not exist";
+ return -1;
+ }
+ } else {
+ std::cerr << "supplying a dhparam file is required" << std::endl;
+ return -1;
+ }
+
+ ListenSettings lsettings{en4, if4, en6, if6, port, true, cert, key, dhparam};
- ListenSettings lsettings{en4, if4, en6, if6, port};
+ init_logging(boost::log::trivial::severity_level::trace, "node_" + std::to_string(port));
+
+ BOOST_LOG_TRIVIAL(info) << "Started node";
bool is_first = bool(vm.count("first"));
std::string next_node;
@@ -52,10 +99,20 @@ int main(int argc, char* argv[]) {
std::cerr << "next_node option is required." << std::endl;
return -1;
}
+ std::string certdir;
+ if(vm.count("certdir")) {
+ std::string filename = vm["certdir"].as<std::string>();
+ if(boost::filesystem::is_directory(filename)) {
+ certdir = filename;
+ } else {
+ std::cerr << "cert dir: \"" << filename << "\" is not a directory";
+ return -1;
+ }
+ }
Uri uri = parse_uri(next_node);
- NodeNetworkSettings nsettings{is_first, uri.host, uri.port};
+ NodeNetworkSettings nsettings{is_first, uri.host, uri.port, certdir};
Node node(lsettings, nsettings);
node.run();
diff --git a/node/node.cpp b/node/node.cpp
index 04ed4f1..2dfbba8 100644
--- a/node/node.cpp
+++ b/node/node.cpp
@@ -11,12 +11,13 @@ using namespace boost::asio::ip;
Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings)
: io_service()
-, server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));})
+, ctx(std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23))
+, server(io_service, listen_settings, ctx, [this](std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx){accept_handler(std::move(socket), ctx);})
, clients()
, data()
, network_settings(network_settings)
-, prev_node(Receiver(tcp::socket(io_service)))
-, next_node(Sender(tcp::socket(io_service)))
+, prev_node(SSLReceiver(std::unique_ptr<boost::asio::ssl::stream<tcp::socket>>(new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(io_service, *ctx))))
+, next_node(SSLSender(std::unique_ptr<boost::asio::ssl::stream<tcp::socket>>(new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(io_service, *ctx))))
, api(get_implementation())
, keypair(api.create_key_pair())
, network_key()
@@ -24,11 +25,9 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
- auto on_connect = [this, network_settings](){
- next_node.async_send(cmix_proto::ImANode());
- };
-
- next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect);
+ if(network_settings.is_first) {
+ connect_to_next_node();
+ }
}
Node::~Node() {
@@ -39,9 +38,9 @@ void Node::run() {
io_service.run();
}
-void Node::accept_handler(boost::asio::ip::tcp::socket&& socket)
+void Node::accept_handler(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx)
{
- std::list<Receiver>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket));
+ Purgatory::iterator it = purgatory.emplace(purgatory.end(), std::move(socket));
purgatory.back().on_done(
[this, it]() {
purgatory.erase(it);
@@ -53,6 +52,20 @@ void Node::accept_handler(boost::asio::ip::tcp::socket&& socket)
});
}
+void Node::connect_to_next_node()
+{
+ if(!network_settings.certdir.empty()) {
+ ctx->add_verify_path(network_settings.certdir);
+ }
+
+ auto on_connect = [this](){
+ BOOST_LOG_TRIVIAL(trace) << "Connected to next_node";
+ next_node.async_send(cmix_proto::ImANode());
+ };
+
+ next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect);
+}
+
void Node::start_initialisation() {
cmix_proto::Initialization init;
init.set_public_share(keypair.pub, keypair.pub_len);
@@ -105,6 +118,8 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret)
if(network_settings.is_first) {
start_precomputation();
+ } else {
+ next_node.async_send(secret);
}
}
@@ -173,11 +188,13 @@ void Node::handle_client_message(ClientConnections::key_type handle, cmix_proto:
void Node::handle_imanode(Purgatory::iterator handle) {
handle->on_done([]{});
- prev_node = Receiver(std::move(*handle));
+ prev_node = SSLReceiver(std::move(*handle));
purgatory.erase(handle);
if(network_settings.is_first) {
start_initialisation();
+ } else {
+ connect_to_next_node();
}
prev_node.async_receive([this](cmix_proto::CMixMessage message){
@@ -187,7 +204,7 @@ void Node::handle_imanode(Purgatory::iterator handle) {
void Node::handle_imaclient(Purgatory::iterator handle, cmix_proto::ImAClient c) {
std::string client_id = c.id();
- clients.emplace(c.id(), SenderReceiver(std::move(*handle)));
+ clients.emplace(c.id(), SSLSenderReceiver(std::move(*handle)));
clients.at(c.id()).on_done([this, client_id]{
clients.erase(client_id);
});
diff --git a/node/node.hpp b/node/node.hpp
index ac379ca..9d59687 100644
--- a/node/node.hpp
+++ b/node/node.hpp
@@ -24,6 +24,7 @@ struct NodeNetworkSettings {
bool is_first; ///< Are we the first node in the network.
std::string next_host; ///< Next nodes host.
std::string next_port; ///< Next nodes port.
+ std::string certdir; ///< Directory containing trusted certificate authorities.
};
/*!
@@ -36,29 +37,31 @@ class Node
};
boost::asio::io_service io_service;
+ std::shared_ptr<boost::asio::ssl::context> ctx;
Server server;
- typedef std::list<Receiver> Purgatory;
+ typedef std::list<SSLReceiver> Purgatory;
Purgatory purgatory;
- typedef std::map<std::string, SenderReceiver> ClientConnections;
+ typedef std::map<std::string, SSLSenderReceiver> ClientConnections;
ClientConnections clients;
typedef std::map<std::string, CMixClientData> ClientData;
ClientData data;
NodeNetworkSettings network_settings;
- Receiver prev_node;
- Sender next_node;
+ SSLReceiver prev_node;
+ SSLSender next_node;
Api api;
KeyPair keypair;
std::vector<uint8_t> network_key;
bool shutting_down;
-
- void accept_handler(boost::asio::ip::tcp::socket&& socket);
+ void accept_handler(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx);
+
+ void connect_to_next_node();
void start_precomputation();
void start_initialisation();