#include "node.hpp" #include "cmix.h" #include "logging.hpp" #include #include using namespace boost::asio::ip; Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings) : io_service() , timer(io_service) , ssl_ctx(std::make_shared(boost::asio::ssl::context::sslv23)) , server(io_service, listen_settings, ssl_ctx, [this](std::unique_ptr>&& socket, std::shared_ptr ctx){accept_handler(std::move(socket), ctx);}) , purgatory() , performance(boost::none) , clients() , data() , messages() , network_settings(network_settings) , prev_node(SSLReceiver(std::unique_ptr>(new boost::asio::ssl::stream(io_service, *ssl_ctx)))) , next_node(SSLSender(std::unique_ptr>(new boost::asio::ssl::stream(io_service, *ssl_ctx)))) , cmix_ctx(initialize_cmix_context(get_implementation())) , shutting_down(false) { if(performance_settings.run) { performance = boost::in_place(performance_settings.name, boost::ref(io_service), performance_settings.host, performance_settings.port); } initialize_keypair(&cmix_ctx); GOOGLE_PROTOBUF_VERIFY_VERSION; if(network_settings.is_first) { connect_to_next_node(); } } Node::~Node() { for(auto pair : data) { cmix_ctx.api.free_group_element(pair.second.shared_value); pair.second.shared_value = nullptr; } deinitialize(&cmix_ctx); } void Node::run() { io_service.run(); } void Node::accept_handler(std::unique_ptr>&& socket, std::shared_ptr ctx) { Purgatory::iterator it = purgatory.emplace(purgatory.end(), std::move(socket)); purgatory.back().on_done( [this, it]() { purgatory.erase(it); } ); it->async_receive([this, it](cmix_proto::CMixMessage message) { handle_message(it, message); }); } void Node::connect_to_next_node() { if(!network_settings.certdir.empty()) { ssl_ctx->add_verify_path(network_settings.certdir); } auto on_connect = [this](){ BOOST_LOG_TRIVIAL(trace) << "Connected to next_node"; next_node.async_send(cmix_proto::ImANode()); }; next_node.async_connect(network_settings.next_host, network_settings.next_port, on_connect); } void Node::start_initialisation() { cmix_proto::Initialization init; size_t len = get_group_element_array_size(&cmix_ctx); init.mutable_public_share()->resize(len); get_public_key(&cmix_ctx, &(*init.mutable_public_share())[0]); BOOST_LOG_TRIVIAL(trace) << "Sending intialization as first node"; next_node.async_send(init); } void Node::handle_imanode(Purgatory::iterator handle, cmix_proto::ImANode const&) { handle->on_done([]{}); prev_node = std::move(*handle); purgatory.erase(handle); if(network_settings.is_first) { start_initialisation(); } else { connect_to_next_node(); } prev_node.async_receive([this](cmix_proto::CMixMessage message){ handle_node_message(message); }); } void Node::handle_imaclient(Purgatory::iterator handle, cmix_proto::ImAClient const& c) { std::string client_id = c.id(); clients.emplace(c.id(), decltype(clients)::mapped_type(std::move(*handle))); clients.at(c.id()).on_done([this, client_id]{ clients.erase(client_id); }); purgatory.erase(handle); clients.at(c.id()).async_send(cmix_proto::NodeReady()); clients.at(c.id()).async_receive([this, client_id](cmix_proto::CMixMessage message) { handle_client_message(client_id, message); }); } void Node::handle_message(Purgatory::iterator handle, cmix_proto::CMixMessage message) { switch(message.contents_case()) { case cmix_proto::CMixMessage::ContentsCase::kImanode: { BOOST_LOG_TRIVIAL(trace) << "Handling imanode"; handle_imanode(handle, message.imanode()); return; } case cmix_proto::CMixMessage::ContentsCase::kImaclient: { BOOST_LOG_TRIVIAL(trace) << "Handling imaclient"; handle_imaclient(handle, message.imaclient()); return; } default: { BOOST_LOG_TRIVIAL(error) << "handle_message: CMixMessage contains unknown contents."; } } handle->close(); purgatory.erase(handle); } void Node::start_precomputation() { if(performance) { performance->send("pre_pre_start"); } BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << messages.size() << " clients."; index_map.clear(); if(messages.size() < network_settings.minimum_nr_messages) { start_timer_delayed_mix(); return; } if(start_mix(&cmix_ctx, messages.size()) != no_error) { exit(-1); } unsigned int i = 0; for(auto&& pair : messages) { index_map[pair.first] = i++; } if(initialize_mix_randomness(&cmix_ctx) != no_error) { exit(-1); } std::stringstream ss; ss << "permutation:"; for(auto i = 0; i < cmix_ctx.nr_participants; ++i) { ss << " " << cmix_ctx.permutation[i]; } BOOST_LOG_TRIVIAL(trace) << ss.str(); ArenaMessage arena; cmix_proto::PrePre& prepre = arena.get(); size_t len = get_group_element_array_size(&cmix_ctx); std::vector r_er(cmix_ctx.nr_participants, nullptr); std::vector m_er(cmix_ctx.nr_participants, nullptr); for(size_t i = 0; i < cmix_ctx.nr_participants; ++i) { std::string* r = prepre.add_r_er(); r->resize(len); r_er[i] = &(*r)[0]; std::string* m = prepre.add_m_er(); m->resize(len); m_er[i] = &(*m)[0]; } if(encrypt_r( &cmix_ctx, r_er.data(), m_er.data() ) != no_error) { exit(-1); } if(performance) { performance->send("pre_pre_end"); } BOOST_LOG_TRIVIAL(trace) << "Sending prepre message: " << prepre.ShortDebugString(); next_node.async_send(prepre); auto it = messages.cbegin(); while(it != messages.cend()) { if(it->second.empty()) { it = messages.erase(it); } else { ++it; } } } void Node::start_realtime_phase() { if(performance) { performance->send("real_pre_start"); } ArenaMessage arena; cmix_proto::RealPre& realpre = arena.get(); size_t len = get_group_element_array_size(&cmix_ctx); std::vector ms(index_map.size(), nullptr); std::vector msv(index_map.size(), nullptr); std::vector keys(index_map.size(), nullptr); auto it = index_map.begin(); for(size_t i = 0; i < index_map.size(); ++i) { auto handle = it->first; auto index = it->second; std::string* m = realpre.add_m(); m->resize(len); ms[i] = &(*m)[0]; auto& queue = messages.at(handle); if(queue.empty()) { std::string v; v.resize(len); generate_random_message(&cmix_ctx, &v[0]); queue.push(v); } msv[i] = queue.front().data(); keys[i] = data.at(handle).shared_value; realpre.add_h(handle); it++; } swap_k_for_r( &cmix_ctx, ms.data(), msv.data(), keys.data() ); if(performance) { performance->send("real_pre_end"); } next_node.async_send(realpre); for(auto&& pair : index_map) { messages.at(pair.first).pop(); } } void Node::shutdown() { server.close(); for(auto&& pair : clients) { pair.second.async_send(cmix_proto::Bye()); } } bool Node::send_bye(bool got_bye) { if(got_bye) { if(!shutting_down) { next_node.async_send(cmix_proto::Bye()); } return true; } else { next_node.async_send(cmix_proto::Bye()); shutting_down = true; return false; } } void Node::start_timer_delayed_mix() { timer.expires_from_now(boost::posix_time::millisec(500)); timer.async_wait([this](boost::system::error_code const& ec) { if(ec == boost::system::errc::operation_canceled) { return; } if(ec) { throw std::runtime_error(ec.message()); } start_precomputation(); }); }