diff options
| -rw-r--r-- | client/cmixclient.cpp | 4 | ||||
| -rw-r--r-- | libcmix-common/receiver.hpp | 28 | ||||
| -rw-r--r-- | libcmix-common/sender.hpp | 31 | ||||
| -rw-r--r-- | libcmix-common/senderreceiver.cpp | 9 | ||||
| -rw-r--r-- | libcmix-common/senderreceiver.hpp | 46 | ||||
| -rw-r--r-- | libcmix-network/CMakeLists.txt | 4 | ||||
| -rw-r--r-- | libcmix-network/accept.cpp | 47 | ||||
| -rw-r--r-- | libcmix-network/accept.hpp | 4 | ||||
| -rw-r--r-- | libcmix-network/acceptor.cpp | 5 | ||||
| -rw-r--r-- | libcmix-network/acceptor.hpp | 2 | ||||
| -rw-r--r-- | libcmix-network/client.cpp | 91 | ||||
| -rw-r--r-- | libcmix-network/client.hpp | 207 | ||||
| -rw-r--r-- | libcmix-network/connect.cpp | 14 | ||||
| -rw-r--r-- | libcmix-network/connect.hpp | 3 | ||||
| -rw-r--r-- | libcmix-network/protobufclient.cpp | 0 | ||||
| -rw-r--r-- | libcmix-network/protobufclient.hpp | 58 | ||||
| -rw-r--r-- | libcmix-network/server.cpp | 17 | ||||
| -rw-r--r-- | libcmix-network/server.hpp | 5 | ||||
| -rw-r--r-- | node/main.cpp | 67 | ||||
| -rw-r--r-- | node/node.cpp | 41 | ||||
| -rw-r--r-- | node/node.hpp | 15 |
21 files changed, 517 insertions, 181 deletions
diff --git a/client/cmixclient.cpp b/client/cmixclient.cpp index 5c94750..ccbdeb5 100644 --- a/client/cmixclient.cpp +++ b/client/cmixclient.cpp @@ -1,6 +1,8 @@ #include "cmixclient.hpp" +using namespace boost::asio::ip; + void CMixClient::key_exchange(int i) { BOOST_LOG_TRIVIAL(trace) << "Sending KeyExchange for node: " << i; @@ -27,7 +29,7 @@ void CMixClient::initialize_connections() { key_exchange(i); }; - network_connections.emplace_back(boost::asio::ip::tcp::socket(io_service)); + network_connections.emplace_back(std::unique_ptr<tcp::socket>(new tcp::socket(io_service))); network_connections.back().async_connect(network_details[i].host, network_details[i].port, handler); } } diff --git a/libcmix-common/receiver.hpp b/libcmix-common/receiver.hpp index 8a73e9c..efd3753 100644 --- a/libcmix-common/receiver.hpp +++ b/libcmix-common/receiver.hpp @@ -21,6 +21,7 @@ struct Receiver : private ProtobufClient<CMixProtoFunctor> friend SenderReceiver; using ProtobufClient::ProtobufClient; + using ProtobufClient::operator=; using ProtobufClient::async_receive; @@ -32,3 +33,30 @@ struct Receiver : private ProtobufClient<CMixProtoFunctor> using ProtobufClient::is_open; }; + +struct SSLSenderReceiver; + +/*! + * \brief The Receiver struct is a shim around ProtobufClient and only exposes things needed + * for receiving messages. + */ +struct SSLReceiver : private SSLProtobufClient<CMixProtoFunctor> +{ + /*! + * \brief friend decleration allowing Receivers to be upgraded to SenderReceivers. + */ + friend SSLSenderReceiver; + + using SSLProtobufClient::SSLProtobufClient; + using SSLProtobufClient::operator=; + + using SSLProtobufClient::async_receive; + + using SSLProtobufClient::async_connect; + + using SSLProtobufClient::close; + + using SSLProtobufClient::on_done; + + using SSLProtobufClient::is_open; +}; diff --git a/libcmix-common/sender.hpp b/libcmix-common/sender.hpp index ea16a81..c722e30 100644 --- a/libcmix-common/sender.hpp +++ b/libcmix-common/sender.hpp @@ -25,6 +25,7 @@ struct Sender : private ProtobufClient<CMixProtoFunctor> friend SenderReceiver; using ProtobufClient::ProtobufClient; + using ProtobufClient::operator=; using ProtobufClient::async_send; @@ -36,3 +37,33 @@ struct Sender : private ProtobufClient<CMixProtoFunctor> using ProtobufClient::is_open; }; + +/*! + * \brief forward declaration for SenderReceiver to allow for the friend declaration. + */ +struct SSLSenderReceiver; + +/*! + * \brief The Sender struct is a shim around ProtobufClient and only exposes things needed + * for sending messages. + */ +struct SSLSender : private SSLProtobufClient<CMixProtoFunctor> +{ + /*! + * The friend declaration that allows for the Sender to be upgraded to a SenderReceiver. + */ + friend SSLSenderReceiver; + + using SSLProtobufClient::SSLProtobufClient; + using SSLProtobufClient::operator=; + + using SSLProtobufClient::async_send; + + using SSLProtobufClient::async_connect; + + using SSLProtobufClient::close; + + using SSLProtobufClient::on_done; + + using SSLProtobufClient::is_open; +}; diff --git a/libcmix-common/senderreceiver.cpp b/libcmix-common/senderreceiver.cpp index eafa8a5..8b13789 100644 --- a/libcmix-common/senderreceiver.cpp +++ b/libcmix-common/senderreceiver.cpp @@ -1,10 +1 @@ -#include "senderreceiver.hpp" - -SenderReceiver::SenderReceiver(Receiver&& r) -: ProtobufClient(std::move(r)) -{} - -SenderReceiver::SenderReceiver(Sender&& s) -: ProtobufClient(std::move(s)) -{} diff --git a/libcmix-common/senderreceiver.hpp b/libcmix-common/senderreceiver.hpp index c7fed3a..6654809 100644 --- a/libcmix-common/senderreceiver.hpp +++ b/libcmix-common/senderreceiver.hpp @@ -20,15 +20,20 @@ struct SenderReceiver : private ProtobufClient<CMixProtoFunctor> * \brief SenderReceiver Explicit conversion to SenderReceiver * \param r The Receiver to upgrade */ - explicit SenderReceiver(Receiver&& r); + explicit SenderReceiver(Receiver&& r) + : ProtobufClient(std::move(r)) + {} /*! * \brief SenderReceiver Explicit conversion to SenderReceiver * \param s The Sender to upgrade */ - explicit SenderReceiver(Sender&& s); + explicit SenderReceiver(Sender&& s) + : ProtobufClient(std::move(s)) + {} using ProtobufClient::ProtobufClient; + using ProtobufClient::operator=; using ProtobufClient::async_receive; @@ -41,4 +46,39 @@ struct SenderReceiver : private ProtobufClient<CMixProtoFunctor> using ProtobufClient::on_done; using ProtobufClient::is_open; -};
\ No newline at end of file +}; + +struct SSLSenderReceiver : private SSLProtobufClient<CMixProtoFunctor> +{ + /*! + * \brief SenderReceiver Explicit conversion to SenderReceiver + * \param r The Receiver to upgrade + */ + explicit SSLSenderReceiver(SSLReceiver&& r) + : SSLProtobufClient(std::move(r)) + {} + + /*! + * \brief SenderReceiver Explicit conversion to SenderReceiver + * \param s The Sender to upgrade + */ + explicit SSLSenderReceiver(SSLSender&& s) + : SSLProtobufClient(std::move(s)) + {} + + using SSLProtobufClient::SSLProtobufClient; + using SSLProtobufClient::operator=; + + using SSLProtobufClient::async_receive; + + using SSLProtobufClient::async_send; + + using SSLProtobufClient::async_connect; + + using SSLProtobufClient::close; + + using SSLProtobufClient::on_done; + + using SSLProtobufClient::is_open; +}; + diff --git a/libcmix-network/CMakeLists.txt b/libcmix-network/CMakeLists.txt index b58cef1..057061c 100644 --- a/libcmix-network/CMakeLists.txt +++ b/libcmix-network/CMakeLists.txt @@ -8,8 +8,8 @@ add_library(cmix-network accept.hpp accept.cpp connect.hpp connect.cpp server.hpp server.cpp - client.hpp client.cpp - protobufclient.hpp protobufclient.cpp + client.hpp + protobufclient.hpp uriparser.hpp uriparser.cpp ) diff --git a/libcmix-network/accept.cpp b/libcmix-network/accept.cpp index 3fa4314..1ec26fe 100644 --- a/libcmix-network/accept.cpp +++ b/libcmix-network/accept.cpp @@ -7,42 +7,61 @@ using namespace boost::asio::ip; using namespace boost::asio; -void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<tcp::socket> socket, boost::system::error_code ec, std::function<void (tcp::socket&&)> f) +void accept_connection(tcp::acceptor& acceptor, tcp::socket* socket, boost::system::error_code ec, AcceptHandler f) { if(!bool(ec)) { - f(std::move(*socket)); + f(std::unique_ptr<tcp::socket>(socket)); accept_loop(acceptor, f); } else { - std::stringstream ss; - ss << ec; - throw std::runtime_error(ss.str()); + if(ec != boost::system::errc::operation_canceled) { + std::stringstream ss; + ss << ec.message(); + delete socket; + throw std::runtime_error(ss.str()); + } else { + delete socket; + } } } -void accept_loop(tcp::acceptor& acceptor, std::function<void (tcp::socket&&)> f) +void accept_loop(tcp::acceptor& acceptor, AcceptHandler f) { - std::shared_ptr<tcp::socket> new_socket = std::make_shared<tcp::socket>(acceptor.get_io_service()); + tcp::socket* new_socket = new tcp::socket(acceptor.get_io_service()); acceptor.async_accept(*new_socket, boost::bind(accept_connection, boost::ref(acceptor), new_socket, placeholders::error, f)); } -void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, std::shared_ptr<ssl::stream<tcp::socket>> socket, boost::system::error_code ec, std::function<void (ssl::stream<tcp::socket>&&)> f) +void accept_connection(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, ssl::stream<tcp::socket>* socket, boost::system::error_code ec, SSLAcceptHandler f) { if(!bool(ec)) { - f(std::move(*socket)); + socket->async_handshake(boost::asio::ssl::stream_base::server, [&acceptor, ctx, socket, f](boost::system::error_code const& ec) { + if(!bool(ec)) { + f(std::unique_ptr<ssl::stream<tcp::socket>>(socket), ctx); + } else { + std::stringstream ss; + ss << ec.message(); + delete socket; + throw std::runtime_error(ss.str()); + } + }); accept_loop(acceptor, ctx, f); } else { - std::stringstream ss; - ss << ec; - throw std::runtime_error(ss.str()); + if(ec != boost::system::errc::operation_canceled) { + std::stringstream ss; + ss << ec; + delete socket; + throw std::runtime_error(ss.str()); + } else { + delete socket; + } } } -void accept_loop(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, std::function<void(ssl::stream<tcp::socket>&& socket)> f) { +void accept_loop(tcp::acceptor& acceptor, std::shared_ptr<ssl::context> ctx, SSLAcceptHandler f) { - std::shared_ptr<ssl::stream<tcp::socket>> new_socket = std::make_shared<ssl::stream<tcp::socket>>(acceptor.get_io_service(), *ctx); + ssl::stream<tcp::socket>* new_socket = new ssl::stream<tcp::socket>(acceptor.get_io_service(), *ctx); acceptor.async_accept(new_socket->lowest_layer(), boost::bind(accept_connection, boost::ref(acceptor), ctx, new_socket, placeholders::error, f)); } diff --git a/libcmix-network/accept.hpp b/libcmix-network/accept.hpp index dde98a3..f7d068a 100644 --- a/libcmix-network/accept.hpp +++ b/libcmix-network/accept.hpp @@ -12,11 +12,11 @@ /*! * \brief AcceptHandler Handy typedef for a function taking an tcp::socket. */ -typedef std::function<void(boost::asio::ip::tcp::socket&&)> AcceptHandler; +typedef std::function<void(std::unique_ptr<boost::asio::ip::tcp::socket>&&)> AcceptHandler; /*! * \brief SSLAcceptHandler Handy typedef for a function taking an "SSLSocket" */ -typedef std::function<void(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>&&)> SSLAcceptHandler; +typedef std::function<void(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&&, std::shared_ptr<boost::asio::ssl::context>)> SSLAcceptHandler; /*! * \brief accept_loop Keeps accepting connections on the specified acceptor diff --git a/libcmix-network/acceptor.cpp b/libcmix-network/acceptor.cpp index 83406db..ee8833a 100644 --- a/libcmix-network/acceptor.cpp +++ b/libcmix-network/acceptor.cpp @@ -53,3 +53,8 @@ void Acceptor::start_accepting(std::shared_ptr<ssl::context> ctx, SSLAcceptHandl { accept_loop(acceptor, ctx, accept_handler); } + +void Acceptor::close() +{ + acceptor.close(); +} diff --git a/libcmix-network/acceptor.hpp b/libcmix-network/acceptor.hpp index 6e4de63..0ee9294 100644 --- a/libcmix-network/acceptor.hpp +++ b/libcmix-network/acceptor.hpp @@ -74,4 +74,6 @@ public: */ void start_accepting(std::shared_ptr<boost::asio::ssl::context> ctx, SSLAcceptHandler accept_handler); + void close(); + }; diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp deleted file mode 100644 index 09ea98c..0000000 --- a/libcmix-network/client.cpp +++ /dev/null @@ -1,91 +0,0 @@ -#include "client.hpp" - -#include "connect.hpp" - -#include <iostream> - -using namespace boost::asio::ip; -using namespace boost::system; - -Client::Client(tcp::socket &&socket) -: socket(std::move(socket)) -, buffer(new boost::asio::streambuf()) -, done() -{} - -Client::~Client() -{ - close(); -} - -void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect) -{ - ::async_connect(socket, next_host, next_port, on_connect); -} - -std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length) -{ - length = htonl(length); - std::array<uint8_t, 4> buf; - - std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data()); - return buf; -} - -std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes) -{ - buffer->commit(read_bytes); - std::istream is(buffer.get()); - is.unsetf(std::ios::skipws); - - return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {}); -} - -void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes) -{ - if(!ec) { - std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); - message_handler(data); - } else { - BOOST_LOG_TRIVIAL(error) << ec; - if(done) { - done(); - } - } -} - -void Client::handle_receive_size(Client::MessageHandler message_handler, error_code const& ec, size_t read_bytes) -{ - using namespace boost::asio::placeholders; - - if(!ec) { - std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); - uint32_t size; - std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size)); - size = ntohl(size); - - socket.async_receive( - buffer->prepare(size), - boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred()) - ); - } else { - BOOST_LOG_TRIVIAL(error) << ec; - if(done) { - done(); - } - } -} - -void Client::close() -{ - socket.close(); -} - -void Client::on_done(Client::OnDoneFT f) { - done = f; -} - -bool Client::is_open() const -{ - return socket.is_open(); -} diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp index bd66b98..0b98c1c 100644 --- a/libcmix-network/client.hpp +++ b/libcmix-network/client.hpp @@ -1,8 +1,13 @@ #pragma once +#include "connect.hpp" + #include "logging.hpp" +#include <boost/asio/read.hpp> +#include <boost/asio/write.hpp> #include <boost/asio/ip/tcp.hpp> +#include <boost/asio/ssl.hpp> #include <boost/asio/streambuf.hpp> #include <boost/asio/placeholders.hpp> #include <boost/bind.hpp> @@ -11,6 +16,8 @@ #include <array> +#include <iostream> + /*! * \file */ @@ -18,8 +25,15 @@ /*! * \brief The Client class */ -class Client { -public: + +struct SSLClient; + +template <typename T> +class BaseClient { + friend SSLClient; + + std::unique_ptr<T> socket; + /*! * \brief OnDoneFT Function type of the function being called when this instance is done operating. */ @@ -30,42 +44,88 @@ public: */ typedef std::function<void(std::vector<uint8_t>)> MessageHandler; -protected: /*! * \brief socket The socket connection this instance handles. */ - boost::asio::ip::tcp::socket socket; - -private: - std::unique_ptr<boost::asio::streambuf> buffer; + std::unique_ptr<boost::asio::streambuf> receive_buffer; + std::unique_ptr<boost::asio::streambuf> send_buffer; OnDoneFT done; - std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes); + std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes) { + receive_buffer->commit(read_bytes); + std::istream is(receive_buffer.get()); + is.unsetf(std::ios::skipws); + + return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {}); + } - void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); + void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { + using namespace boost::asio::placeholders; + + if(!ec) { + std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); + uint32_t size; + std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size)); + size = ntohl(size); + + boost::asio::async_read( + *socket, + receive_buffer->prepare(size), + [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { + handle_receive_message(message_handler, ec, read_bytes); + } + ); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + } - void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes); + void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes) { + if(!ec) { + std::vector<uint8_t> data = received_bytes_to_vector(read_bytes); + message_handler(data); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + } + + std::array<uint8_t, 4> prepare_length_prefix(uint32_t length) { + length = htonl(length); + std::array<uint8_t, 4> buf; + + std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data()); + return buf; + } - std::array<uint8_t, 4> prepare_length_prefix(uint32_t length); public: /*! * \brief Client * \param socket An rvalue reference to a socket it will now own and receive from. */ - Client(boost::asio::ip::tcp::socket&& socket); - ~Client(); + BaseClient(std::unique_ptr<T>&& socket) + : socket(std::move(socket)) + , receive_buffer(new boost::asio::streambuf) + , send_buffer(new boost::asio::streambuf) + , done() + {} /*! * \brief Move constructor for Client. */ - Client(Client&&) = default; + BaseClient(BaseClient&&) = default; /*! * \brief Move assignment for Client. */ - Client& operator=(Client&&) = default; + BaseClient& operator=(BaseClient&&) = default; /*! * \brief async_connect Asynchronously connects to next_host:port and calls on_connect @@ -73,8 +133,10 @@ public: * \param next_port The port to connect to * \param on_connect The callback to call on a succes. */ - void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect); - + void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) { + ::async_connect(*socket, next_host, next_port, on_connect); + } + /*! * \brief send sends the string prefixed with it's length over the socket. * \param message The string to be sent. @@ -84,20 +146,25 @@ public: void async_send(std::string message, F on_sent) { auto length_buffer = prepare_length_prefix(message.size()); - boost::array<boost::asio::const_buffer, 2> package = { - boost::asio::buffer(length_buffer.data(), length_buffer.size()), - boost::asio::buffer(message) - }; + std::ostream os(send_buffer.get()); + os.unsetf(std::ios::skipws); + + os.write(reinterpret_cast<char*>(length_buffer.data()), length_buffer.size()); + os.write(message.data(), message.size()); auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) { if(ec) { - BOOST_LOG_TRIVIAL(fatal) << ec; + BOOST_LOG_TRIVIAL(fatal) << ec.message(); throw std::runtime_error("unable to send message"); } on_sent(); }; - socket.async_send(package, 0, handler); + boost::asio::async_write( + *socket, + *send_buffer, + handler + ); } /*! @@ -107,28 +174,110 @@ public: void async_receive(MessageHandler message_handler) { using namespace boost::asio::placeholders; - socket.async_receive( - buffer->prepare(4), - boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred()) + boost::asio::async_read( + *socket, + receive_buffer->prepare(4), + [this, message_handler](boost::system::error_code const& ec, size_t read_bytes) { + handle_receive_size(message_handler, ec, read_bytes); + } ); } /*! * \brief close Closes the underlying socket. */ - void close(); + void close() { + socket->close(); + } /*! * \brief on_done sets the done callback. * \param f The new done callback function. */ - void on_done(OnDoneFT f); + void on_done(OnDoneFT f) { + done = f; + } /*! * \brief is_open * \return returns true if the underlying socket is opened. */ - bool is_open() const; + bool is_open() const { + return socket->is_open(); + } +}; + +struct Client : private BaseClient<boost::asio::ip::tcp::socket> { + + using BaseClient::BaseClient; + using BaseClient::operator=; + + using BaseClient::async_receive; + + using BaseClient::async_send; + using BaseClient::async_connect; + using BaseClient::close; + + using BaseClient::on_done; + + using BaseClient::is_open; + +}; + +struct SSLClient : private BaseClient<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>> { + + using BaseClient::BaseClient; + using BaseClient::operator=; + + /*! + * \brief async_connect Asynchronously connects to next_host:port and calls on_connect + * \param next_host The host to connect to + * \param next_port The port to connect to + * \param on_connect The callback to call on a succes. + */ + void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect) { + + auto new_on_connect = [this, next_host, on_connect](){ + + socket->set_verify_mode(boost::asio::ssl::verify_peer); + socket->set_verify_callback(boost::asio::ssl::rfc2818_verification(next_host)); + + socket->async_handshake(boost::asio::ssl::stream_base::client, [this, on_connect](boost::system::error_code const& ec) { + if(!ec) { + on_connect(); + } else { + BOOST_LOG_TRIVIAL(error) << ec.message(); + if(done) { + done(); + } + } + }); + }; + + ::async_connect(socket->lowest_layer(), next_host, next_port, new_on_connect); + } + + using BaseClient::async_send; + using BaseClient::async_receive; + + /*! + * \brief close Closes the underlying socket. + */ + void close() { + socket->lowest_layer().close(); + } + + using BaseClient::on_done; + + /*! + * \brief is_open + * \return returns true if the underlying socket is opened. + */ + bool is_open() const { + return socket->lowest_layer().is_open(); + } }; + + diff --git a/libcmix-network/connect.cpp b/libcmix-network/connect.cpp index 01e72de..9df9b56 100644 --- a/libcmix-network/connect.cpp +++ b/libcmix-network/connect.cpp @@ -4,8 +4,10 @@ #include <boost/asio/ip/basic_resolver.hpp> #include <boost/asio/ip/address.hpp> +#include <boost/asio/ssl.hpp> using namespace boost::asio::ip; +using namespace boost::asio; using boost::asio::io_service; boost::asio::ip::tcp::socket connect(std::string host, std::string port, io_service& io_service) { @@ -50,7 +52,7 @@ struct IterationInfo { {} }; -void async_connect_iteration(tcp::socket& socket, std::shared_ptr<IterationInfo> info, std::function<void()> on_connect) { +void async_connect_iteration(basic_socket<tcp, stream_socket_service<tcp>>& socket, std::shared_ptr<IterationInfo> info, std::function<void()> on_connect) { if(info->it == boost::asio::ip::tcp::resolver::iterator()) { throw std::runtime_error("None of the supplied endpoints responded"); @@ -91,16 +93,16 @@ void async_connect_iteration(tcp::socket& socket, std::shared_ptr<IterationInfo> socket.async_connect(*(info->it), handler); } -void async_connect(tcp::socket& socket, std::string host, std::string next_port, std::function<void()> on_connect) +void async_connect(basic_socket<tcp, stream_socket_service<tcp>>& socket, std::string host, std::string port, std::function<void()> on_connect) { boost::asio::ip::basic_resolver<tcp> resolver(socket.get_io_service()); - boost::asio::ip::basic_resolver_query<tcp> query(host, next_port); + boost::asio::ip::basic_resolver_query<tcp> query(host, port); - BOOST_LOG_TRIVIAL(trace) << "connecting to: \"" << host << ":" << next_port << "\""; + BOOST_LOG_TRIVIAL(trace) << "connecting to: \"" << host << ":" << port << "\""; auto it = resolver.resolve(query); std::shared_ptr<IterationInfo> info = std::make_shared<IterationInfo>(0, it, socket.get_io_service()); - + async_connect_iteration(socket, info, on_connect); -} +}
\ No newline at end of file diff --git a/libcmix-network/connect.hpp b/libcmix-network/connect.hpp index 0a58115..f58c3e1 100644 --- a/libcmix-network/connect.hpp +++ b/libcmix-network/connect.hpp @@ -2,6 +2,7 @@ #include <boost/asio/ip/tcp.hpp> #include <boost/asio/io_service.hpp> +#include <boost/asio/ssl.hpp> /*! * \file @@ -23,4 +24,4 @@ boost::asio::ip::tcp::socket connect(std::string host, std::string port, boost:: * \param next_port The port * \param on_connect The function to call when the connect has succeeded. */ -void async_connect(boost::asio::ip::tcp::socket& socket, std::string host, std::string next_port, std::function<void()> on_connect); +void async_connect(boost::asio::basic_socket<boost::asio::ip::tcp, boost::asio::stream_socket_service<boost::asio::ip::tcp>>& socket, std::string host, std::string port, std::function<void()> on_connect);
\ No newline at end of file diff --git a/libcmix-network/protobufclient.cpp b/libcmix-network/protobufclient.cpp deleted file mode 100644 index e69de29..0000000 --- a/libcmix-network/protobufclient.cpp +++ /dev/null diff --git a/libcmix-network/protobufclient.hpp b/libcmix-network/protobufclient.hpp index 247b71f..df84152 100644 --- a/libcmix-network/protobufclient.hpp +++ b/libcmix-network/protobufclient.hpp @@ -4,8 +4,13 @@ #include "logging.hpp" +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/ssl.hpp> + #include <type_traits> +#include <iostream> + /*! * \file */ @@ -15,6 +20,7 @@ * in to a desired wireformat to serialize and send or receive via the Client. * \tparam T The Protobuf functor to use when transforming messages. */ + template <typename T> class ProtobufClient : private Client { @@ -31,6 +37,7 @@ public: template<typename V, typename F> void async_send(V value, F on_sent) { typename T::proto_type m = T()(value); + Client::async_send(m.SerializeAsString(), on_sent); } @@ -65,3 +72,54 @@ public: using Client::on_done; using Client::is_open; }; + +template <typename T> +class SSLProtobufClient : private SSLClient +{ +public: + using SSLClient::SSLClient; + using SSLClient::operator=; + using SSLClient::async_connect; + + /*! + * \brief an async_send wrapper that transforms a specific message to the container message an sends it + * \param value The specific message to send. + * \param on_sent The function to call after succesfully sending the message. + */ + template<typename V, typename F> + void async_send(V value, F on_sent) { + typename T::proto_type m = T()(value); + SSLClient::async_send(m.SerializeAsString(), on_sent); + } + + /*! + * \brief an async_send wrapper, like the above but without the on_sent callback. + * \param value The specific message to send. + */ + template<typename V> + void async_send(V value) { + async_send(value, []{}); + } + + /*! + * \brief An async_receive wrapper that transforms the wireformat to the protobuf + * container message type. + * \param message_handler The callback to call after receiving a message. + */ + template <typename F> + void async_receive(F message_handler) { + auto f = [message_handler](std::vector<uint8_t> const& buffer) { + typename T::proto_type message; + if(!message.ParseFromArray(buffer.data(), buffer.size())) { + BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; + throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); + } + message_handler(message); + }; + SSLClient::async_receive(f); + } + + using SSLClient::close; + using SSLClient::on_done; + using SSLClient::is_open; +}; diff --git a/libcmix-network/server.cpp b/libcmix-network/server.cpp index 34e3183..195c5a0 100644 --- a/libcmix-network/server.cpp +++ b/libcmix-network/server.cpp @@ -45,10 +45,27 @@ Server::Server(io_service& io_service, const ListenSettings& listen_settings, Ac Server::Server(io_service& io_service, const ListenSettings& listen_settings, std::shared_ptr<ssl::context> ctx, SSLAcceptHandler accept_handler) : Server(io_service, listen_settings) { + ctx->set_options( + boost::asio::ssl::context::default_workarounds + | boost::asio::ssl::context::no_sslv2 + | boost::asio::ssl::context::single_dh_use + ); + + ctx->use_certificate_chain_file(listen_settings.cert); + ctx->use_private_key_file(listen_settings.key, boost::asio::ssl::context::pem); + ctx->use_tmp_dh_file(listen_settings.dhparam); + ctx->set_default_verify_paths(); + if(v4_acceptor.is_open()) { v4_acceptor.start_accepting(ctx, accept_handler); } if(v6_acceptor.is_open()) { v6_acceptor.start_accepting(ctx, accept_handler); } +} + +void Server::close() +{ + v4_acceptor.close(); + v6_acceptor.close(); }
\ No newline at end of file diff --git a/libcmix-network/server.hpp b/libcmix-network/server.hpp index fad7c71..6976f04 100644 --- a/libcmix-network/server.hpp +++ b/libcmix-network/server.hpp @@ -19,6 +19,9 @@ struct ListenSettings { std::string ipv6_inaddr; ///< Listen on this ipv6 address uint16_t port; ///< Listen on this port. bool use_ssl; ///< Should we use ssl + std::string cert; ///< The cert to use in pem format. + std::string key; ///< The corresponding key in pem format. + std::string dhparam; ///< The diffie helman parameters. }; /*! @@ -50,4 +53,6 @@ public: */ Server(boost::asio::io_service& io_service, ListenSettings const& listen_settings, std::shared_ptr<boost::asio::ssl::context> ctx, SSLAcceptHandler accept_handler); + void close(); + }; diff --git a/node/main.cpp b/node/main.cpp index 527e3c5..0c81312 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -4,6 +4,7 @@ #include "logging.hpp" #include <boost/program_options.hpp> +#include <boost/filesystem/operations.hpp> #include <iostream> @@ -18,8 +19,12 @@ int main(int argc, char* argv[]) { ("interface4,4", po::value<std::string>()->default_value("0.0.0.0"), "Set the ipv4 address to listen on.") ("enable_v6", po::value<bool>()->default_value(true), "Enable/disable ipv6 accept support.") ("interface6,6", po::value<std::string>()->default_value("::"), "Set the ipv6 address to listen on.") - ("next_node,n", po::value<std::string>(), "The address of the next node in the network") + ("next_node,n", po::value<std::string>(), "The address of the next node in the network.") ("first,f", "This is the first node and will be the communication point for the clients.") + ("cert,c", po::value<std::string>(), "The cert file to use (in pem format).") + ("key,k", po::value<std::string>(), "The key file (in pem format).") + ("dhparam,d", po::value<std::string>(), "The dhparam file.") + ("certdir", po::value<std::string>(), "Directory containing trusted certificates.") ; po::variables_map vm; @@ -37,12 +42,54 @@ int main(int argc, char* argv[]) { std::string if6 = vm["interface6"].as<std::string>(); uint16_t port = vm["port"].as<uint16_t>(); - init_logging(boost::log::trivial::severity_level::trace, "node_" + std::to_string(port)); + std::string cert; + if(vm.count("cert")) { + std::string filename = vm["cert"].as<std::string>(); + if(boost::filesystem::exists(filename)) { + cert = filename; + } else { + std::cerr << "cert file: \"" << filename << "\" does not exist"; + return -1; + } + } else { + std::cerr << "supplying a certificate is required" << std::endl; + return -1; + } - BOOST_LOG_TRIVIAL(info) << "Started node"; + std::string key; + if(vm.count("key")) { + std::string filename = vm["key"].as<std::string>(); + if(boost::filesystem::exists(filename)) { + key = filename; + } else { + std::cerr << "key file: \"" << filename << "\" does not exist"; + return -1; + } + } else { + std::cerr << "supplying a key file is required" << std::endl; + return -1; + } + + std::string dhparam; + if(vm.count("dhparam")) { + std::string filename = vm["dhparam"].as<std::string>(); + if(boost::filesystem::exists(filename)) { + dhparam = filename; + } else { + std::cerr << "dhparam file: \"" << filename << "\" does not exist"; + return -1; + } + } else { + std::cerr << "supplying a dhparam file is required" << std::endl; + return -1; + } + + ListenSettings lsettings{en4, if4, en6, if6, port, true, cert, key, dhparam}; - ListenSettings lsettings{en4, if4, en6, if6, port}; + init_logging(boost::log::trivial::severity_level::trace, "node_" + std::to_string(port)); + + BOOST_LOG_TRIVIAL(info) << "Started node"; bool is_first = bool(vm.count("first")); std::string next_node; @@ -52,10 +99,20 @@ int main(int argc, char* argv[]) { std::cerr << "next_node option is required." << std::endl; return -1; } + std::string certdir; + if(vm.count("certdir")) { + std::string filename = vm["certdir"].as<std::string>(); + if(boost::filesystem::is_directory(filename)) { + certdir = filename; + } else { + std::cerr << "cert dir: \"" << filename << "\" is not a directory"; + return -1; + } + } Uri uri = parse_uri(next_node); - NodeNetworkSettings nsettings{is_first, uri.host, uri.port}; + NodeNetworkSettings nsettings{is_first, uri.host, uri.port, certdir}; Node node(lsettings, nsettings); node.run(); diff --git a/node/node.cpp b/node/node.cpp index 04ed4f1..2dfbba8 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -11,12 +11,13 @@ using namespace boost::asio::ip; Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings) : io_service() -, server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));}) +, ctx(std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23)) +, server(io_service, listen_settings, ctx, [this](std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx){accept_handler(std::move(socket), ctx);}) , clients() , data() , network_settings(network_settings) -, prev_node(Receiver(tcp::socket(io_service))) -, next_node(Sender(tcp::socket(io_service))) +, prev_node(SSLReceiver(std::unique_ptr<boost::asio::ssl::stream<tcp::socket>>(new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(io_service, *ctx)))) +, next_node(SSLSender(std::unique_ptr<boost::asio::ssl::stream<tcp::socket>>(new boost::asio::ssl::stream<boost::asio::ip::tcp::socket>(io_service, *ctx)))) , api(get_implementation()) , keypair(api.create_key_pair()) , network_key() @@ -24,11 +25,9 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se { GOOGLE_PROTOBUF_VERIFY_VERSION; - auto on_connect = [this, network_settings](){ - next_node.async_send(cmix_proto::ImANode()); - }; - - next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect); + if(network_settings.is_first) { + connect_to_next_node(); + } } Node::~Node() { @@ -39,9 +38,9 @@ void Node::run() { io_service.run(); } -void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) +void Node::accept_handler(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx) { - std::list<Receiver>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket)); + Purgatory::iterator it = purgatory.emplace(purgatory.end(), std::move(socket)); purgatory.back().on_done( [this, it]() { purgatory.erase(it); @@ -53,6 +52,20 @@ void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) }); } +void Node::connect_to_next_node() +{ + if(!network_settings.certdir.empty()) { + ctx->add_verify_path(network_settings.certdir); + } + + auto on_connect = [this](){ + BOOST_LOG_TRIVIAL(trace) << "Connected to next_node"; + next_node.async_send(cmix_proto::ImANode()); + }; + + next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect); +} + void Node::start_initialisation() { cmix_proto::Initialization init; init.set_public_share(keypair.pub, keypair.pub_len); @@ -105,6 +118,8 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret) if(network_settings.is_first) { start_precomputation(); + } else { + next_node.async_send(secret); } } @@ -173,11 +188,13 @@ void Node::handle_client_message(ClientConnections::key_type handle, cmix_proto: void Node::handle_imanode(Purgatory::iterator handle) { handle->on_done([]{}); - prev_node = Receiver(std::move(*handle)); + prev_node = SSLReceiver(std::move(*handle)); purgatory.erase(handle); if(network_settings.is_first) { start_initialisation(); + } else { + connect_to_next_node(); } prev_node.async_receive([this](cmix_proto::CMixMessage message){ @@ -187,7 +204,7 @@ void Node::handle_imanode(Purgatory::iterator handle) { void Node::handle_imaclient(Purgatory::iterator handle, cmix_proto::ImAClient c) { std::string client_id = c.id(); - clients.emplace(c.id(), SenderReceiver(std::move(*handle))); + clients.emplace(c.id(), SSLSenderReceiver(std::move(*handle))); clients.at(c.id()).on_done([this, client_id]{ clients.erase(client_id); }); diff --git a/node/node.hpp b/node/node.hpp index ac379ca..9d59687 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -24,6 +24,7 @@ struct NodeNetworkSettings { bool is_first; ///< Are we the first node in the network. std::string next_host; ///< Next nodes host. std::string next_port; ///< Next nodes port. + std::string certdir; ///< Directory containing trusted certificate authorities. }; /*! @@ -36,29 +37,31 @@ class Node }; boost::asio::io_service io_service; + std::shared_ptr<boost::asio::ssl::context> ctx; Server server; - typedef std::list<Receiver> Purgatory; + typedef std::list<SSLReceiver> Purgatory; Purgatory purgatory; - typedef std::map<std::string, SenderReceiver> ClientConnections; + typedef std::map<std::string, SSLSenderReceiver> ClientConnections; ClientConnections clients; typedef std::map<std::string, CMixClientData> ClientData; ClientData data; NodeNetworkSettings network_settings; - Receiver prev_node; - Sender next_node; + SSLReceiver prev_node; + SSLSender next_node; Api api; KeyPair keypair; std::vector<uint8_t> network_key; bool shutting_down; - - void accept_handler(boost::asio::ip::tcp::socket&& socket); + void accept_handler(std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx); + + void connect_to_next_node(); void start_precomputation(); void start_initialisation(); |
