diff options
Diffstat (limited to 'node')
| -rw-r--r-- | node/main.cpp | 2 | ||||
| -rw-r--r-- | node/nextnode.hpp | 1 | ||||
| -rw-r--r-- | node/node.cpp | 73 | ||||
| -rw-r--r-- | node/node.hpp | 6 |
4 files changed, 58 insertions, 24 deletions
diff --git a/node/main.cpp b/node/main.cpp index 85b0ca5..7425f00 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -10,7 +10,7 @@ int main(int argc, char* argv[]) { namespace po = boost::program_options; - init_logging(boost::log::trivial::severity_level::trace); + init_logging(boost::log::trivial::severity_level::trace, "node"); BOOST_LOG_TRIVIAL(info) << "Started node"; diff --git a/node/nextnode.hpp b/node/nextnode.hpp index 862ca5c..c2e064c 100644 --- a/node/nextnode.hpp +++ b/node/nextnode.hpp @@ -22,6 +22,7 @@ inline void message_setter(cmix_proto::CMixMessage& m, cmix_proto::TYPE const& v MESSAGE_SETTER(Initialization, initialization) MESSAGE_SETTER(ImANode, imanode) +MESSAGE_SETTER(Bye, bye) #undef MESSAGE_SETTER diff --git a/node/node.cpp b/node/node.cpp index c02e5e6..1fe0b44 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -51,7 +51,7 @@ void Node::accept_handler(boost::asio::ip::tcp::socket&& socket) ); it->receive([this, it](std::vector<uint8_t> const& message_buffer) { - handle_message(decltype(it)(it), message_buffer); + handle_message(it, message_buffer); }); } @@ -62,6 +62,17 @@ void Node::start_initialisation() { next_node.send(init); } +cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector<uint8_t>& buffer) +{ + cmix_proto::CMixMessage message; + if(!message.ParseFromArray(buffer.data(), buffer.size())) { + BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; + io_service.stop(); + throw std::runtime_error("Network communication was disrupted in a unrecoverable way."); + } + return message; +} + void Node::handle_initialization(const cmix_proto::Initialization& init) { if(network_settings.is_first) { @@ -97,16 +108,9 @@ void Node::handle_initialization(const cmix_proto::Initialization& init) } } -void Node::handle_message(const std::vector<uint8_t>& message_buffer) +void Node::handle_node_message(const std::vector<uint8_t>& message_buffer) { - cmix_proto::CMixMessage message; - if(!message.ParseFromArray(message_buffer.data(), message_buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - clients.clear(); - io_service.stop(); - return; - } - + cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kInitialization: { handle_initialization(message.initialization()); @@ -118,50 +122,75 @@ void Node::handle_message(const std::vector<uint8_t>& message_buffer) } } +void Node::handle_client_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) +{ + cmix_proto::CMixMessage message = parse_cmix_message(message_buffer); + switch(message.contents_case()) { + case cmix_proto::CMixMessage::ContentsCase::kBye: { + handle->close(); + clients.erase(handle); + + if(clients.size() == 0) { + io_service.stop(); + } + + return; + } + default: { + BOOST_LOG_TRIVIAL(error) << "CMixMessage contains unknown contents."; + } + } + handle->close(); + clients.erase(handle); +} + void Node::handle_imanode(std::list<Client>::iterator handle) { handle->on_done([]{}); prev_node = PrevNode(std::move(*handle)); purgatory.erase(handle); prev_node.receive([this](std::vector<uint8_t> const& message_buffer){ - handle_message(message_buffer); + handle_node_message(message_buffer); }); } void Node::handle_imaclient(std::list<Client>::iterator handle) { - clients.emplace_back(std::move(*handle)); - std::list<Client>::iterator it = clients.end()--; + std::list<Client>::iterator it = clients.emplace(clients.end(), std::move(*handle)); it->on_done([this, it]{ clients.erase(it); }); purgatory.erase(handle); + it->receive([this, it](std::vector<uint8_t> buffer) { + handle_client_message(it, buffer); + }); } void Node::handle_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer) { cmix_proto::CMixMessage message; - if(!message.ParseFromArray(message_buffer.data(), message_buffer.size())) { - BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage"; - clients.clear(); - io_service.stop(); + try { + message = parse_cmix_message(message_buffer); + } catch(std::runtime_error const& e) { + purgatory.erase(handle); return; } - + switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kImanode: { handle_imanode(handle); - break; + return; } case cmix_proto::CMixMessage::ContentsCase::kImaclient: { handle_imaclient(handle); - break; + return; } default: { BOOST_LOG_TRIVIAL(error) << "CMixMessage contains unknown contents."; } } + + purgatory.erase(handle); } void Node::start_precomputation() { - clients.clear(); - io_service.stop(); + } diff --git a/node/node.hpp b/node/node.hpp index a4e21b3..fa0cb37 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -51,8 +51,12 @@ class Node void start_precomputation(); void start_initialisation(); + cmix_proto::CMixMessage parse_cmix_message(std::vector<uint8_t> const& buffer); + void handle_initialization(cmix_proto::Initialization const& init); - void handle_message(std::vector<uint8_t> const& message_buffer); + void handle_node_message(std::vector<uint8_t> const& message_buffer); + + void handle_client_message(std::list<Client>::iterator handle, std::vector<uint8_t> const& message_buffer); void handle_imanode(std::list<Client>::iterator handle); void handle_imaclient(std::list<Client>::iterator handle); |
