aboutsummaryrefslogtreecommitdiff
path: root/node/node.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'node/node.cpp')
-rw-r--r--node/node.cpp82
1 files changed, 24 insertions, 58 deletions
diff --git a/node/node.cpp b/node/node.cpp
index db18253..1fd83c2 100644
--- a/node/node.cpp
+++ b/node/node.cpp
@@ -13,8 +13,8 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
, server(io_service, listen_settings, [this](boost::asio::ip::tcp::socket&& socket){accept_handler(std::move(socket));})
, clients()
, network_settings(network_settings)
-, prev_node(Client(tcp::socket(io_service)))
-, next_node(tcp::socket(io_service))
+, prev_node(ProtobufClient<Receive>(tcp::socket(io_service)))
+, next_node(ProtobufClient<Send>(tcp::socket(io_service)))
, api(get_implementation())
, keypair(api.create_key_pair())
, network_pub_key()
@@ -22,7 +22,7 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
GOOGLE_PROTOBUF_VERIFY_VERSION;
auto on_connect = [this, network_settings](){
- next_node.send(cmix_proto::ImANode());
+ next_node.async_send(cmix_proto::ImANode());
if(network_settings.is_first) {
start_initialisation();
}
@@ -41,17 +41,15 @@ void Node::run() {
void Node::accept_handler(boost::asio::ip::tcp::socket&& socket)
{
- purgatory.emplace_back(std::move(socket));
-
- std::list<Client>::iterator it = --purgatory.end();
+ std::list<ProtobufClient<Receive>>::iterator it = purgatory.emplace(purgatory.end(), std::move(socket));
purgatory.back().on_done(
[this, it]() {
purgatory.erase(it);
}
);
- it->receive([this, it](std::vector<uint8_t> const& message_buffer) {
- handle_message(it, message_buffer);
+ it->receive([this, it](cmix_proto::CMixMessage message) {
+ handle_message(it, message);
});
}
@@ -59,18 +57,7 @@ void Node::start_initialisation() {
cmix_proto::Initialization init;
init.set_public_share(keypair.pub, keypair.pub_len);
- next_node.send(init);
-}
-
-cmix_proto::CMixMessage Node::parse_cmix_message(const std::vector<uint8_t>& buffer)
-{
- cmix_proto::CMixMessage message;
- if(!message.ParseFromArray(buffer.data(), buffer.size())) {
- BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage";
- io_service.stop();
- throw std::runtime_error("Network communication was disrupted in a unrecoverable way.");
- }
- return message;
+ next_node.async_send(init);
}
void Node::handle_initialization(const cmix_proto::Initialization& init)
@@ -95,7 +82,7 @@ void Node::handle_initialization(const cmix_proto::Initialization& init)
cmix_proto::Initialization init;
init.set_public_share(new_shared.data, new_shared.len);
- next_node.send(init);
+ next_node.async_send(init);
free_bignum(&shared);
free_bignum(&mod);
@@ -103,9 +90,8 @@ void Node::handle_initialization(const cmix_proto::Initialization& init)
}
}
-void Node::handle_node_message(const std::vector<uint8_t>& message_buffer)
+void Node::handle_node_message(cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message = parse_cmix_message(message_buffer);
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kInitialization: {
handle_initialization(message.initialization());
@@ -119,22 +105,20 @@ void Node::handle_node_message(const std::vector<uint8_t>& message_buffer)
BOOST_LOG_TRIVIAL(error) << "handle_node_message: CMixMessage contains unknown contents.";
}
}
- prev_node.receive([this](std::vector<uint8_t> const& buffer) {
- handle_node_message(buffer);
+ prev_node.receive([this](cmix_proto::CMixMessage message) {
+ handle_node_message(message);
});
}
-void Node::handle_client_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer)
+void Node::handle_client_message(std::list<SenderReceiver>::iterator handle, cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message = parse_cmix_message(message_buffer);
- BOOST_LOG_TRIVIAL(trace) << "case: " << message.contents_case();
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kKeyexchange: {
BOOST_LOG_TRIVIAL(trace) << "Deriving shared key";
api.derive_shared_key(keypair, reinterpret_cast<uint8_t const*>(message.keyexchange().public_key().c_str()), true);
- handle->receive([this, handle](std::vector<uint8_t> const& buffer){
- handle_client_message(handle, buffer);
+ handle->receive([this, handle](cmix_proto::CMixMessage message){
+ handle_client_message(handle, message);
});
return;
}
@@ -144,17 +128,6 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std::
handle->close();
clients.erase(handle);
- if(clients.size() == 0) {
- cmix_proto::Bye bye;
- next_node.send(bye);
-
- io_service.post([this]{
- BOOST_LOG_TRIVIAL(trace) << "Shutting down";
- io_service.stop();
- });
-
- }
-
return;
}
default: {
@@ -165,36 +138,29 @@ void Node::handle_client_message(std::list<Client>::iterator handle, const std::
clients.erase(handle);
}
-void Node::handle_imanode(std::list<Client>::iterator handle) {
+void Node::handle_imanode(std::list<Receiver>::iterator handle) {
handle->on_done([]{});
- prev_node = PrevNode(std::move(*handle));
+ prev_node = make_receiver(std::move(*handle));
purgatory.erase(handle);
- prev_node.receive([this](std::vector<uint8_t> const& message_buffer){
- handle_node_message(message_buffer);
+ prev_node.receive([this](cmix_proto::CMixMessage message){
+ handle_node_message(message);
});
}
-void Node::handle_imaclient(std::list<Client>::iterator handle) {
- std::list<Client>::iterator it = clients.emplace(clients.end(), std::move(*handle));
+void Node::handle_imaclient(std::list<Receiver>::iterator handle) {
+ BOOST_LOG_TRIVIAL(trace) << "Handling imaclient";
+ std::list<SenderReceiver>::iterator it = clients.emplace(clients.end(), make_sender_receiver(std::move(*handle)));
it->on_done([this, it]{
clients.erase(it);
});
purgatory.erase(handle);
- it->receive([this, it](std::vector<uint8_t> buffer) {
- handle_client_message(it, buffer);
+ it->receive([this, it](cmix_proto::CMixMessage message) {
+ handle_client_message(it, message);
});
}
-void Node::handle_message(std::list<Client>::iterator handle, const std::vector<uint8_t>& message_buffer)
+void Node::handle_message(std::list<ProtobufClient<Receive>>::iterator handle, cmix_proto::CMixMessage message)
{
- cmix_proto::CMixMessage message;
- try {
- message = parse_cmix_message(message_buffer);
- } catch(std::runtime_error const& e) {
- purgatory.erase(handle);
- return;
- }
-
switch(message.contents_case()) {
case cmix_proto::CMixMessage::ContentsCase::kImanode: {
handle_imanode(handle);