diff options
| -rw-r--r-- | CMakeLists.txt | 1 | ||||
| -rw-r--r-- | client/CMakeLists.txt | 29 | ||||
| -rw-r--r-- | client/cmixclient.cpp | 42 | ||||
| -rw-r--r-- | client/cmixclient.hpp | 33 | ||||
| -rw-r--r-- | client/main.cpp | 51 | ||||
| -rw-r--r-- | client/node.cpp | 19 | ||||
| -rw-r--r-- | client/node.hpp | 65 | ||||
| -rw-r--r-- | libcmix-protobuf/cmix.proto | 5 | ||||
| -rw-r--r-- | liblog/logging.cpp | 4 | ||||
| -rw-r--r-- | liblog/logging.hpp | 2 | ||||
| -rw-r--r-- | network-handler/networkhandler.cpp | 4 | ||||
| -rw-r--r-- | node/main.cpp | 2 | ||||
| -rw-r--r-- | node/nextnode.hpp | 1 | ||||
| -rw-r--r-- | node/node.cpp | 73 | ||||
| -rw-r--r-- | node/node.hpp | 6 |
15 files changed, 307 insertions, 30 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 7e285f6..92b61d9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,5 +33,6 @@ add_subdirectory(libcmix-protobuf) add_subdirectory(network-handler) add_subdirectory(node) +add_subdirectory(client) add_subdirectory(scratchpad) diff --git a/client/CMakeLists.txt b/client/CMakeLists.txt new file mode 100644 index 0000000..e04bf6a --- /dev/null +++ b/client/CMakeLists.txt @@ -0,0 +1,29 @@ +find_package(Boost COMPONENTS system program_options REQUIRED) + +add_executable(client + main.cpp + cmixclient.hpp cmixclient.cpp + node.hpp node.cpp +) + +if(WIN32) + target_compile_options(client + PRIVATE "-std=gnu++11" + ) +else(WIN32) + target_compile_options(client + PRIVATE "-std=c++11" + ) +endif(WIN32) + + +target_link_libraries(client + PRIVATE Boost::boost + PRIVATE Boost::program_options + PRIVATE Boost::system + PRIVATE log + PRIVATE cmix + PRIVATE cmix-bignum + PRIVATE cmix-network + PRIVATE cmix-protobuf +) diff --git a/client/cmixclient.cpp b/client/cmixclient.cpp new file mode 100644 index 0000000..e3f5ac4 --- /dev/null +++ b/client/cmixclient.cpp @@ -0,0 +1,42 @@ + +#include "cmixclient.hpp" + +void CMixClient::initialized() { + BOOST_LOG_TRIVIAL(trace) << "Client connections were made"; + for(auto&& connection : network_connections) { + cmix_proto::Bye bye; + connection.send(bye); + } + io_service.stop(); +} + +void CMixClient::initialize_connections() { + network_connections.reserve(network_details.size()); + + int i = network_details.size(); + auto handler = [this, i]() mutable { + cmix_proto::ImAClient imaclient; + network_connections.at(network_details.size() - i).send(imaclient); + + if(--i == 0) { + initialized(); + } + }; + + for(auto&& details : network_details) { + network_connections.emplace_back(boost::asio::ip::tcp::socket(io_service)); + network_connections.back().async_connect(details.host, details.port, handler); + } +} + +CMixClient::CMixClient(std::vector<NodeDetails> details) +: io_service() +, network_details(details) +, network_connections() +{ + initialize_connections(); +} + +void CMixClient::run() { + io_service.run(); +} diff --git a/client/cmixclient.hpp b/client/cmixclient.hpp new file mode 100644 index 0000000..10438d1 --- /dev/null +++ b/client/cmixclient.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "node.hpp" + +#include "logging.hpp" +#include "client.hpp" +#include "connect.hpp" +#include "cmix.pb.h" + +#include <string> +#include <vector> + +struct NodeDetails { + std::string host; + std::string port; +}; + +class CMixClient { + + boost::asio::io_service io_service; + + std::vector<NodeDetails> network_details; + std::vector<Node> network_connections; + + void initialized(); + + void initialize_connections(); + +public: + CMixClient(std::vector<NodeDetails> details); + + void run(); +};
\ No newline at end of file diff --git a/client/main.cpp b/client/main.cpp new file mode 100644 index 0000000..fb05171 --- /dev/null +++ b/client/main.cpp @@ -0,0 +1,51 @@ + +#include "cmixclient.hpp" + +#include "uriparser.hpp" +#include "logging.hpp" + +#include <boost/program_options.hpp> + +#include <vector> +#include <iostream> + +int main(int argc, char* argv[]) { + namespace po = boost::program_options; + + init_logging(boost::log::trivial::severity_level::trace, "client"); + + BOOST_LOG_TRIVIAL(info) << "Started node"; + + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message.") + ("network,n", po::value<std::vector<std::string>>()->multitoken(), "The addresses of the network nodes in order") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + std::cout << desc << "\n"; + return 0; + } + + std::vector<std::string> network; + if(vm.count("network")) { + network = vm["network"].as<std::vector<std::string>>(); + } else { + std::cerr << "network option is required." << std::endl; + return -1; + } + + std::vector<NodeDetails> node_details; + for(auto&& node : network) { + Uri uri = parse_uri(node); + + node_details.push_back({uri.host, uri.port}); + } + + CMixClient cmix_client(node_details); + cmix_client.run(); +} diff --git a/client/node.cpp b/client/node.cpp new file mode 100644 index 0000000..8fd1dd8 --- /dev/null +++ b/client/node.cpp @@ -0,0 +1,19 @@ +#include "node.hpp" + +#include "logging.hpp" + +using namespace boost::asio::ip; + +Node::Node(tcp::socket&& socket) +: client(std::move(socket)) +{} + +void Node::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect) +{ + client.async_connect(next_host, next_port, on_connect); +} + +void Node::close() +{ + client.close(); +} diff --git a/client/node.hpp b/client/node.hpp new file mode 100644 index 0000000..3719223 --- /dev/null +++ b/client/node.hpp @@ -0,0 +1,65 @@ +#pragma once + +#include "client.hpp" + +#include "cmix.pb.h" + +#include <boost/asio/ip/tcp.hpp> + +/*! + * \file + */ + +/*! + * MESSAGE_SETTER is a boilerplate macro that generates a setter function for our CMix + * protobuf messages, This because there are seperate functions for each to type to use. + * And there seems no way to solve this using templates. + */ +#define MESSAGE_SETTER(TYPE, NAME) \ +inline void message_setter(cmix_proto::CMixMessage& m, cmix_proto::TYPE const& v) { \ + *m.mutable_##NAME() = v; \ +} \ + +MESSAGE_SETTER(ImAClient, imaclient) +MESSAGE_SETTER(Bye, bye) + +#undef MESSAGE_SETTER + +/*! + * \brief The Node class represents a node in the network. + */ +class Node +{ + Client client; +public: + /*! + * \brief Node + * \param socket an rvalue reference to the socket it takes ownership and uses to communicate with the next node in the network. + */ + Node(boost::asio::ip::tcp::socket&& socket); + + /*! + * \brief send + * \param v The CMixMessage type we try to send and first have to wrap in a CMixMessage. + */ + template <typename T> + void send(T v) { + cmix_proto::CMixMessage m; + message_setter(m, v); + client.send(m.SerializeAsString()); + } + + /*! + * \brief async_connect + * \param next_host The host of the next node. + * \param next_port The port of the next node. + * \param on_connect The callback to call when the connect was succesfull. + */ + void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect); + + /*! + * \brief close This function closes the underlying socket connection. + */ + void close(); +}; + diff --git a/libcmix-protobuf/cmix.proto b/libcmix-protobuf/cmix.proto index 6e653f1..8be9b1f 100644 --- a/libcmix-protobuf/cmix.proto +++ b/libcmix-protobuf/cmix.proto @@ -12,10 +12,15 @@ message ImAClient { } +message Bye { + +} + message CMixMessage { oneof contents { Initialization initialization = 1; ImANode imanode = 2; ImAClient imaclient = 3; + Bye bye = 4; } } diff --git a/liblog/logging.cpp b/liblog/logging.cpp index 6bbf070..ecc9ae5 100644 --- a/liblog/logging.cpp +++ b/liblog/logging.cpp @@ -9,14 +9,14 @@ #include <boost/log/sources/severity_logger.hpp> #include <boost/log/sources/record_ostream.hpp> -void init_logging(boost::log::trivial::severity_level log_level) +void init_logging(boost::log::trivial::severity_level log_level, std::string file_name) { boost::log::add_common_attributes(); boost::log::register_simple_formatter_factory< boost::log::trivial::severity_level, char >("Severity"); boost::log::add_file_log ( - boost::log::keywords::file_name = "node%N.log", + boost::log::keywords::file_name = file_name + "%N.log", boost::log::keywords::rotation_size = 10 * 1024 * 1024, boost::log::keywords::format = "[%Severity%] (%TimeStamp%): %Message%", boost::log::keywords::auto_flush = true diff --git a/liblog/logging.hpp b/liblog/logging.hpp index b06d176..b95ca64 100644 --- a/liblog/logging.hpp +++ b/liblog/logging.hpp @@ -10,5 +10,5 @@ * \brief init_logging Initializes the logging system to log to file. * \param log_level minimum log level we are interested in. */ -void init_logging(boost::log::trivial::severity_level log_level); +void init_logging(boost::log::trivial::severity_level log_level, std::string file_name = "log"); diff --git a/network-handler/networkhandler.cpp b/network-handler/networkhandler.cpp index 916f0f2..614d270 100644 --- a/network-handler/networkhandler.cpp +++ b/network-handler/networkhandler.cpp @@ -14,9 +14,7 @@ NetworkHandler::NetworkHandler(const ListenSettings& listen_settings) void NetworkHandler::accept_handler(boost::asio::ip::tcp::socket&& socket) { - clients.emplace_back(std::move(socket)); - - auto it = --clients.end(); + std::list<UserClient>::iterator it = clients.emplace(clients.end(), std::move(socket)); clients.back().on_done( [this, it]() { clients.erase(it); diff --git a/node/main.cpp b/node/main.cpp index 85b0ca5..7425f00 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -10,7 +10,7 @@ int main(int argc, char* argv[]) { namespace po = boost::program_options; - init_logging(boost::log::trivial::severity_level::trace); + init_logging(boost::log::trivial::severity_level::trace, "node"); BOOST_LOG_TRIVIAL(info) << "Started node"; diff --git a/node/nextnode.hpp b/node/nextnode.hpp index 862ca5c..c2e064c 100644 --- a/node/nextnode.hpp +++ b/node/nextnode.hpp @@ -22,6 +22,7 @@ inline void message_setter(cmix_proto::CMixMessage& m, cmix_proto::TYPE const& v MESSAGE_SETTER(Initialization, initialization) MESSAGE_SETTER(ImANode, imanode) +MESSAGE_SETTER(Bye, bye) #undef MESSAGE_SETTER diff --git a/node/node.cpp b/node/node.cpp index c02e5e6..1fe0b44 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -51,7 +51,7 @@ void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) ); it->receive([this, it](std::vector<uint8_t> const& message_buffer) { - handle_message(decltype(it)(it), message_buffer); + handle_message(it, message_buffer); }); } @@ -62,6 +62,17 @@ void Node::start_initialisation() { next_node.send(init); } +cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector<uint8_t>& buffer) +{ + cmix_proto::CMixMessage message; + if(!message.ParseFromArray(buffer.data(), buffer.size())) { + BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; + io_service.stop(); + throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); + } + return message; +} + void Node::handle_initialization(const cmix_proto::Initialization& init) { if(network_settings.is_first) { @@ -97,16 +108,9 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) } } -void Node::handle_message(const std::vector<uint8_t>& message_buffer) +void Node::handle_node_message(const std::vector<uint8_t>& message_buffer) { - cmix_proto::CMixMessage message; - if(!message.ParseFromArray(message_buffer.data(), message_buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - clients.clear(); - io_service.stop(); - return; - } - + cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kInitialization: { handle_initialization(message.initialization()); @@ -118,50 +122,75 @@ void Node::handle_message(const std::vector<uint8_t>& message_buffer) } } +void Node::handle_client_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) +{ + cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); + switch(message.contents_case()) { + case cmix_proto::CMixMessage::ContentsCase::kBye: { + handle->close(); + clients.erase(handle); + + if(clients.size() == 0) { + io_service.stop(); + } + + return; + } + default: { + BOOST_LOG_TRIVIAL(error) << "CMixMessage contains unknown contents."; + } + } + handle->close(); + clients.erase(handle); +} + void Node::handle_imanode(std::list<Client>::iterator handle) { handle->on_done([]{}); prev_node = PrevNode(std::move(*handle)); purgatory.erase(handle); prev_node.receive([this](std::vector<uint8_t> const& message_buffer){ - handle_message(message_buffer); + handle_node_message(message_buffer); }); } void Node::handle_imaclient(std::list<Client>::iterator handle) { - clients.emplace_back(std::move(*handle)); - std::list<Client>::iterator it = clients.end()--; + std::list<Client>::iterator it = clients.emplace(clients.end(), std::move(*handle)); it->on_done([this, it]{ clients.erase(it); }); purgatory.erase(handle); + it->receive([this, it](std::vector<uint8_t> buffer) { + handle_client_message(it, buffer); + }); } void Node::handle_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) { cmix_proto::CMixMessage message; - if(!message.ParseFromArray(message_buffer.data(), message_buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - clients.clear(); - io_service.stop(); + try { + message = parse_cmix_message(message_buffer); + } catch(std::runtime_error const& e) { + purgatory.erase(handle); return; } - + switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kImanode: { handle_imanode(handle); - break; + return; } case cmix_proto::CMixMessage::ContentsCase::kImaclient: { handle_imaclient(handle); - break; + return; } default: { BOOST_LOG_TRIVIAL(error) << "CMixMessage contains unknown contents."; } } + + purgatory.erase(handle); } void Node::start_precomputation() { - clients.clear(); - io_service.stop(); + } diff --git a/node/node.hpp b/node/node.hpp index a4e21b3..fa0cb37 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -51,8 +51,12 @@ class Node void start_precomputation(); void start_initialisation(); + cmix_proto::CMixMessage parse_cmix_message(std::vector<uint8_t> const& buffer); + void handle_initialization(cmix_proto::Initialization const& init); - void handle_message(std::vector<uint8_t> const& message_buffer); + void handle_node_message(std::vector<uint8_t> const& message_buffer); + + void handle_client_message(std::list<Client>::iterator handle, std::vector<uint8_t> const& message_buffer); void handle_imanode(std::list<Client>::iterator handle); void handle_imaclient(std::list<Client>::iterator handle); |
