diff options
Diffstat (limited to 'node')
| -rw-r--r-- | node/main.cpp | 12 | ||||
| -rw-r--r-- | node/node.cpp | 22 | ||||
| -rw-r--r-- | node/node.hpp | 2 | ||||
| -rw-r--r-- | node/node_node.cpp | 43 |
4 files changed, 55 insertions, 24 deletions
diff --git a/node/main.cpp b/node/main.cpp index acfdafe..02838f1 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -22,6 +22,7 @@ int main(int argc, char* argv[]) { ("next_node,n", po::value<std::string>(), "The address of the next node in the network.") ("first,f", "This is the first node and will be the communication point for the clients.") ("last,l", "this is the last node and will be able to send the original message out of the network.") + ("minimum_nr_messages,m", po::value<unsigned int>(), "minimum number of messages that needs to accumulate before starting a mix.") ("cert,c", po::value<std::string>(), "The cert file to use (in pem format).") ("key,k", po::value<std::string>(), "The key file (in pem format).") ("dhparam,d", po::value<std::string>(), "The dhparam file.") @@ -93,6 +94,15 @@ int main(int argc, char* argv[]) { BOOST_LOG_TRIVIAL(info) << "Started node"; bool is_first = bool(vm.count("first")); + unsigned int minimum_nr_messages = 0; + if(is_first) { + if(vm.count("minimum_nr_messages")) { + minimum_nr_messages = vm["minimum_nr_messages"].as<unsigned int>(); + } else { + std::cerr << "Minimum nr of messages is a required parameter for the first node" << std::endl; + return -1; + } + } bool is_last = bool(vm.count("last")); std::string next_node; if(vm.count("next_node")) { @@ -114,7 +124,7 @@ int main(int argc, char* argv[]) { Uri uri = parse_uri(next_node); - NodeNetworkSettings nsettings{is_first, is_last, uri.host, uri.port, certdir}; + NodeNetworkSettings nsettings{is_first, is_last, uri.host, uri.port, certdir, minimum_nr_messages}; Node node(lsettings, nsettings); node.run(); diff --git a/node/node.cpp b/node/node.cpp index 2310bc1..84ea179 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -156,8 +156,14 @@ void Node::handle_message(Purgatory::iterator handle, cmix_proto::CMixMessage me } void Node::start_precomputation() { - BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << clients.size() << " clients."; + BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << messages.size() << " clients."; index_map.clear(); + + if(messages.size() < network_settings.minimum_nr_messages) { + start_timer_delayed_mix(); + return; + } + if(start_mix(&cmix_ctx, messages.size()) != no_error) { exit(-1); } @@ -278,3 +284,17 @@ bool Node::send_bye(bool got_bye) return false; } } + +void Node::start_timer_delayed_mix() +{ + timer.expires_from_now(boost::posix_time::millisec(500)); + timer.async_wait([this](boost::system::error_code const& ec) { + if(ec == boost::system::errc::operation_canceled) { + return; + } + if(ec) { + throw std::runtime_error(ec.message()); + } + start_precomputation(); + }); +} diff --git a/node/node.hpp b/node/node.hpp index db3c66f..5365c9f 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -40,6 +40,7 @@ struct NodeNetworkSettings { std::string next_host; ///< Next nodes host. std::string next_port; ///< Next nodes port. std::string certdir; ///< Directory containing trusted certificate authorities. + unsigned int minimum_nr_messages; ///< The minimum number of available messages before starting to run a mix; }; /*! @@ -85,6 +86,7 @@ class Node void start_realtime_phase(); void shutdown(); bool send_bye(bool got_bye); + void start_timer_delayed_mix(); void handle_node_initialization(cmix_proto::Initialization const& init); void handle_node_secretkey(cmix_proto::SecretKey const& secret); diff --git a/node/node_node.cpp b/node/node_node.cpp index f68b496..00968a0 100644 --- a/node/node_node.cpp +++ b/node/node_node.cpp @@ -205,10 +205,7 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret) std::string share = secret.secret_key(); if(network_settings.is_first) { - timer.expires_from_now(boost::posix_time::seconds(4)); - timer.async_wait([this](boost::system::error_code const& ec) { - start_precomputation(); - }); + start_timer_delayed_mix(); } else { set_network_key(&cmix_ctx, secret.secret_key().data(), secret.secret_key().size()); next_node.async_send(secret); @@ -245,27 +242,16 @@ void Node::handle_node_prepost(cmix_proto::PrePost const& prepost) { } void Node::handle_node_realpre(cmix_proto::RealPre const& realpre) { - auto final = [this](){ - deinitialize(&cmix_ctx); - shutdown(); - }; - if(network_settings.is_first) { cmix_proto::RealMix n_realmix = fill_realtime_mix_message(cmix_ctx, realpre.m()); - next_node.async_send(n_realmix, final); + next_node.async_send(n_realmix); } else { cmix_proto::RealPre n_realpre = fill_realtime_pre_message(cmix_ctx, realpre.h(), realpre.m(), data); next_node.async_send(n_realpre); } } -void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) { - auto final = [this](){ - deinitialize(&cmix_ctx); - shutdown(); - send_bye(false); - }; - +void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) { if(network_settings.is_last) { BOOST_LOG_TRIVIAL(trace) << "Doing the last step:"; @@ -277,18 +263,31 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) { for(int i = 0; i < n_realmix.m_size(); i++) { remove_r_and_s(&cmix_ctx, &str[0], n_realmix.m(i).data(), i); - { std::stringstream ss; for(auto&& c : str) { ss << "\\" << std::setw(3) << std::setfill('0') << std::oct << (unsigned int) c; } BOOST_LOG_TRIVIAL(trace) << ss.str(); - } + } + + char* dest; + size_t dest_len; + + char* payload; + size_t payload_len; + + split_message(&cmix_ctx, &dest, &dest_len, &payload, &payload_len, str.data()); + + std::string dest_s = std::string(dest, dest_len); + + cmix_proto::Payload pay; + pay.set_payload(payload, payload_len); + clients.at(dest_s).async_send(pay); + + free(dest); + free(payload); } - - final(); - } else { cmix_proto::RealMix n_realmix = fill_realtime_mix_message(cmix_ctx, realmix.m()); next_node.async_send(n_realmix); |
