aboutsummaryrefslogtreecommitdiff
path: root/node/node.cpp
diff options
context:
space:
mode:
authorDennis Brentjes <d.brentjes@gmail.com>2016-10-12 14:26:12 +0200
committerDennis Brentjes <d.brentjes@gmail.com>2016-10-12 14:26:12 +0200
commit7bca48bc5b5e37a3a8b0b23e57b88d069fa50589 (patch)
tree47cd62512e631a064852015c65bb1965bc72414a /node/node.cpp
parent0fb433690c0ca5f9561fe9e2e973e2cd61b873ba (diff)
downloadcmix-7bca48bc5b5e37a3a8b0b23e57b88d069fa50589.tar.gz
cmix-7bca48bc5b5e37a3a8b0b23e57b88d069fa50589.tar.bz2
cmix-7bca48bc5b5e37a3a8b0b23e57b88d069fa50589.zip
Major network rewrite.
One generic class has been introduced to handle all connection types. Typedefs provide Sender Receiver and SenderReceiver types, which limit the functionality of the types. As to not accidentally communicate with the wrong node about things.
Diffstat (limited to 'node/node.cpp')
-rw-r--r--node/node.cpp82
1 files changed, 24 insertions, 58 deletions
diff --git a/node/node.cpp b/node/node.cpp
index db18253..1fd83c2 100644
--- a/node/node.cpp
+++ b/node/node.cpp
@@ -13,8 +13,8 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
, server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));})
, clients()
, network_settings(network_settings)
-, prev_node(Client(tcp::socket(io_service)))
-, next_node(tcp::socket(io_service))
+, prev_node(ProtobufClient<Receive>(tcp::socket(io_service)))
+, next_node(ProtobufClient<Send>(tcp::socket(io_service)))
, api(get_implementation())
, keypair(api.create_key_pair())
, network_pub_key()
@@ -22,7 +22,7 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
GOOGLE_PROTOBUF_VERIFY_VERSION;
auto on_connect = [this, network_settings](){
- next_node.send(cmix_proto::ImANode());
+ next_node.async_send(cmix_proto::ImANode());
if(network_settings.is_first) {
start_initialisation();
}
@@ -41,17 +41,15 @@ void Node::run() {
void Node::accept_handler(boost::asio::ip::tcp::socket&& socket)
{
- purgatory.emplace_back(std::move(socket));
-
- std::list<Client>::iterator it = --purgatory.end();
+ std::list<ProtobufClient<Receive>>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket));
purgatory.back().on_done(
[this, it]() {
purgatory.erase(it);
}
);
- it->receive([this, it](std::vector<uint8_t> const& message_buffer) {
- handle_message(it, message_buffer);
+ it->receive([this, it](cmix_proto::CMixMessage message) {
+ handle_message(it, message);
});
}
@@ -59,18 +57,7 @@ void Node::start_initialisation() {
cmix_proto::Initialization init;
init.set_public_share(keypair.pub, keypair.pub_len);
- next_node.send(init);
-}
-
-cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector<uint8_t>& buffer)
-{
- cmix_proto::CMixMessage message;
- if(!message.ParseFromArray(buffer.data(), buffer.size())) {
- BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage";
- io_service.stop();
- throw std::runtime_error("Network communication was disrupted in a unrecoverable way.");
- }
- return message;
+ next_node.async_send(init);
}
void Node::handle_initialization(const cmix_proto::Initialization& init)
@@ -95,7 +82,7 @@ void Node::handle_initialization(const cmix_proto::Initialization& init)
cmix_proto::Initialization init;
init.set_public_share(new_shared.data, new_shared.len);
- next_node.send(init);
+ next_node.async_send(init);
free_bignum(&shared);
free_bignum(&mod);
@@ -103,9 +90,8 @@ void Node::handle_initialization(const cmix_proto::Initialization& init)
}
}
-void Node::handle_node_message(const std::vector<uint8_t>& message_buffer)
+void Node::handle_node_message(cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message = parse_cmix_message(message_buffer);
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kInitialization: {
handle_initialization(message.initialization());
@@ -119,22 +105,20 @@ void Node::handle_node_message(const std::vector<uint8_t>& message_buffer)
BOOST_LOG_TRIVIAL(error) << "handle_node_message: CMixMessage contains unknown contents.";
}
}
- prev_node.receive([this](std::vector<uint8_t> const& buffer) {
- handle_node_message(buffer);
+ prev_node.receive([this](cmix_proto::CMixMessage message) {
+ handle_node_message(message);
});
}
-void Node::handle_client_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer)
+void Node::handle_client_message(std::list<SenderReceiver>::iterator handle, cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message = parse_cmix_message(message_buffer);
- BOOST_LOG_TRIVIAL(trace) << "case: " << message.contents_case();
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kKeyexchange: {
BOOST_LOG_TRIVIAL(trace) << "Deriving shared key";
api.derive_shared_key(keypair, reinterpret_cast<uint8_t const*>(message.keyexchange().public_key().c_str()), true);
- handle->receive([this, handle](std::vector<uint8_t> const& buffer){
- handle_client_message(handle, buffer);
+ handle->receive([this, handle](cmix_proto::CMixMessage message){
+ handle_client_message(handle, message);
});
return;
}
@@ -144,17 +128,6 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std::
handle->close();
clients.erase(handle);
- if(clients.size() == 0) {
- cmix_proto::Bye bye;
- next_node.send(bye);
-
- io_service.post([this]{
- BOOST_LOG_TRIVIAL(trace) << "Shutting down";
- io_service.stop();
- });
-
- }
-
return;
}
default: {
@@ -165,36 +138,29 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std::
clients.erase(handle);
}
-void Node::handle_imanode(std::list<Client>::iterator handle) {
+void Node::handle_imanode(std::list<Receiver>::iterator handle) {
handle->on_done([]{});
- prev_node = PrevNode(std::move(*handle));
+ prev_node = make_receiver(std::move(*handle));
purgatory.erase(handle);
- prev_node.receive([this](std::vector<uint8_t> const& message_buffer){
- handle_node_message(message_buffer);
+ prev_node.receive([this](cmix_proto::CMixMessage message){
+ handle_node_message(message);
});
}
-void Node::handle_imaclient(std::list<Client>::iterator handle) {
- std::list<Client>::iterator it = clients.emplace(clients.end(), std::move(*handle));
+void Node::handle_imaclient(std::list<Receiver>::iterator handle) {
+ BOOST_LOG_TRIVIAL(trace) << "Handling imaclient";
+ std::list<SenderReceiver>::iterator it = clients.emplace(clients.end(), make_sender_receiver(std::move(*handle)));
it->on_done([this, it]{
clients.erase(it);
});
purgatory.erase(handle);
- it->receive([this, it](std::vector<uint8_t> buffer) {
- handle_client_message(it, buffer);
+ it->receive([this, it](cmix_proto::CMixMessage message) {
+ handle_client_message(it, message);
});
}
-void Node::handle_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer)
+void Node::handle_message(std::list<ProtobufClient<Receive>>::iterator handle, cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message;
- try {
- message = parse_cmix_message(message_buffer);
- } catch(std::runtime_error const& e) {
- purgatory.erase(handle);
- return;
- }
-
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kImanode: {
handle_imanode(handle);