diff options
| author | Dennis Brentjes <d.brentjes@gmail.com> | 2016-12-01 21:42:51 +0100 |
|---|---|---|
| committer | Dennis Brentjes <d.brentjes@gmail.com> | 2016-12-01 21:43:48 +0100 |
| commit | 16c28db384adbe61034eb8a2267cd6a886ffd72f (patch) | |
| tree | 426be5a41f5186ba17e909dda90afca5b7921c30 | |
| parent | 463b8ec708db0d2d7405d434e28d0140c94b1d98 (diff) | |
| download | cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.gz cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.bz2 cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.zip | |
Added the client side code for the statsd in the nodes.
| -rw-r--r-- | libcmix-common/CMakeLists.txt | 5 | ||||
| -rw-r--r-- | libcmix-common/cmixprotofunctor.hpp | 3 | ||||
| -rw-r--r-- | libcmix-common/performanceclient.cpp | 29 | ||||
| -rw-r--r-- | libcmix-common/performanceclient.hpp | 16 | ||||
| -rw-r--r-- | libcmix-protobuf/cmix.proto | 9 | ||||
| -rw-r--r-- | node/main.cpp | 20 | ||||
| -rw-r--r-- | node/node.cpp | 29 | ||||
| -rw-r--r-- | node/node.hpp | 16 | ||||
| -rw-r--r-- | node/node_node.cpp | 83 |
9 files changed, 204 insertions, 6 deletions
diff --git a/libcmix-common/CMakeLists.txt b/libcmix-common/CMakeLists.txt index 2324622..958be6a 100644 --- a/libcmix-common/CMakeLists.txt +++ b/libcmix-common/CMakeLists.txt @@ -1,10 +1,12 @@ +find_package(Boost COMPONENTS timer REQUIRED) + add_library(cmix-common cmixprotofunctor.hpp receiver.hpp sender.hpp senderreceiver.hpp - performanceclient.hpp + performanceclient.hpp performanceclient.cpp ) set_target_properties(cmix-common PROPERTIES LINKER_LANGUAGE CXX) @@ -16,4 +18,5 @@ target_include_directories(cmix-common target_link_libraries(cmix-common PRIVATE cmix-protobuf PRIVATE cmix-network + PRIVATE Boost::timer )
\ No newline at end of file diff --git a/libcmix-common/cmixprotofunctor.hpp b/libcmix-common/cmixprotofunctor.hpp index 0c8a341..17793ec 100644 --- a/libcmix-common/cmixprotofunctor.hpp +++ b/libcmix-common/cmixprotofunctor.hpp @@ -68,7 +68,8 @@ struct CMixProtoFunctor { PrePost, prepost, RealPre, realpre, RealMix, realmix, - Payload, payload + Payload, payload, + Performance, performance ) #undef MESSAGE_SETTER_DEFS #undef MESSAGE_SETTER_DEF_ITERATION diff --git a/libcmix-common/performanceclient.cpp b/libcmix-common/performanceclient.cpp new file mode 100644 index 0000000..70953a8 --- /dev/null +++ b/libcmix-common/performanceclient.cpp @@ -0,0 +1,29 @@ +#include "performanceclient.hpp" + +#include <boost/date_time.hpp> + +#include <memory> + +PerformanceClient::PerformanceClient(std::string name, boost::asio::io_service& io_service, std::string host, std::string port) +: node_name(name) +, sender(std::unique_ptr<boost::asio::ip::tcp::socket>(new boost::asio::ip::tcp::socket(io_service))) +, timer() +{ + timer.start(); + sender.async_connect(host, port, []{}); +} + +void PerformanceClient::send(std::string column) +{ + cmix_proto::Performance perf; + perf.set_node(node_name); + perf.set_column(column); + + auto tp = timer.elapsed(); + + perf.set_wall_time(std::to_string(tp.wall)); + perf.set_user_time(std::to_string(tp.user)); + perf.set_system_time(std::to_string(tp.system)); + + sender.async_send(perf); +} diff --git a/libcmix-common/performanceclient.hpp b/libcmix-common/performanceclient.hpp new file mode 100644 index 0000000..cb9afb1 --- /dev/null +++ b/libcmix-common/performanceclient.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include "sender.hpp" + +#include <boost/timer/timer.hpp> + +class PerformanceClient +{ + std::string node_name; + Sender sender; + boost::timer::cpu_timer timer; +public: + PerformanceClient(std::string name, boost::asio::io_service& io_service, std::string host, std::string port); + + void send(std::string column); +};
\ No newline at end of file diff --git a/libcmix-protobuf/cmix.proto b/libcmix-protobuf/cmix.proto index 5cfdd92..2af20bc 100644 --- a/libcmix-protobuf/cmix.proto +++ b/libcmix-protobuf/cmix.proto @@ -65,6 +65,14 @@ message Payload { bytes payload = 1; } +message Performance { + string wall_time = 1; + string user_time = 2; + string system_time = 3; + string column = 4; + string node = 5; +} + message CMixMessage { oneof contents { Initialization initialization = 1; @@ -81,5 +89,6 @@ message CMixMessage { RealPre realpre = 12; RealMix realmix = 13; Payload payload = 14; + Performance performance = 15; } } diff --git a/node/main.cpp b/node/main.cpp index 02838f1..1e76a1f 100644 --- a/node/main.cpp +++ b/node/main.cpp @@ -27,6 +27,8 @@ int main(int argc, char* argv[]) { ("key,k", po::value<std::string>(), "The key file (in pem format).") ("dhparam,d", po::value<std::string>(), "The dhparam file.") ("certdir", po::value<std::string>(), "Directory containing trusted certificates.") + ("statsd,s", po::value<std::string>(), "The address of the statistics daemon.") + ("name,a", po::value<std::string>(), "The name to use in the stats daemon file") ; po::variables_map vm; @@ -126,6 +128,22 @@ int main(int argc, char* argv[]) { NodeNetworkSettings nsettings{is_first, is_last, uri.host, uri.port, certdir, minimum_nr_messages}; - Node node(lsettings, nsettings); + bool run_stats = vm.count("statsd"); + + Uri statsd_uri; + if(run_stats) { + statsd_uri = parse_uri(vm["statsd"].as<std::string>()); + } + + std::string name; + if(vm.count("name")) { + name = vm["name"].as<std::string>(); + } else { + name = uri.port; + } + + PerformanceSettings psettings{run_stats, statsd_uri.host, statsd_uri.port, name}; + + Node node(lsettings, nsettings, psettings); node.run(); } diff --git a/node/node.cpp b/node/node.cpp index aadc123..dc0d1c0 100644 --- a/node/node.cpp +++ b/node/node.cpp @@ -4,15 +4,19 @@ #include "logging.hpp" +#include <boost/utility/in_place_factory.hpp> + #include <numeric> using namespace boost::asio::ip; -Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings) +Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings) : io_service() , timer(io_service) , ssl_ctx(std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23)) , server(io_service, listen_settings, ssl_ctx, [this](std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx){accept_handler(std::move(socket), ctx);}) +, purgatory() +, performance(boost::none) , clients() , data() , messages() @@ -22,6 +26,10 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se , cmix_ctx(initialize_cmix_context(get_implementation())) , shutting_down(false) { + if(performance_settings.run) { + performance = boost::in_place(performance_settings.name, boost::ref(io_service), performance_settings.host, performance_settings.port); + } + initialize_keypair(&cmix_ctx); GOOGLE_PROTOBUF_VERIFY_VERSION; @@ -135,6 +143,10 @@ void Node::handle_message(Purgatory::iterator handle, cmix_proto::CMixMessage me } void Node::start_precomputation() { + if(performance) { + performance->send("pre_pre_start"); + } + BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << messages.size() << " clients."; index_map.clear(); @@ -188,6 +200,10 @@ void Node::start_precomputation() { exit(-1); } + if(performance) { + performance->send("pre_pre_end"); + } + BOOST_LOG_TRIVIAL(trace) << "Sending prepre message: " << prepre.ShortDebugString(); next_node.async_send(prepre); @@ -203,6 +219,10 @@ void Node::start_precomputation() { void Node::start_realtime_phase() { + if(performance) { + performance->send("real_pre_start"); + } + ArenaMessage<cmix_proto::RealPre> arena; cmix_proto::RealPre& realpre = arena.get(); @@ -244,11 +264,16 @@ void Node::start_realtime_phase() { keys.data() ); + if(performance) { + performance->send("real_pre_end"); + } + + next_node.async_send(realpre); + for(auto&& pair : index_map) { messages.at(pair.first).pop(); } - next_node.async_send(realpre); } void Node::shutdown() diff --git a/node/node.hpp b/node/node.hpp index 3233ba2..1aad503 100644 --- a/node/node.hpp +++ b/node/node.hpp @@ -4,6 +4,7 @@ #include "receiver.hpp" #include "senderreceiver.hpp" #include "sender.hpp" +#include "performanceclient.hpp" #include "api.h" #include "cmix.h" @@ -12,6 +13,7 @@ #include <google/protobuf/arena.h> #include <boost/asio/io_service.hpp> +#include <boost/optional.hpp> #include <list> #include <string> @@ -50,6 +52,16 @@ struct NodeNetworkSettings { unsigned int minimum_nr_messages; ///< The minimum number of available messages before starting to run a mix; }; +/*! + * \brief Details if and where to send performance data. + */ +struct PerformanceSettings { + bool run; + std::string host; + std::string port; + std::string name; +}; + template <typename T> struct ArenaMessage { google::protobuf::Arena arena; @@ -76,6 +88,8 @@ class Node typedef std::list<SSLReceiver> Purgatory; Purgatory purgatory; + boost::optional<PerformanceClient> performance; + typedef std::map<std::string, SSLSenderReceiver> ClientConnections; ClientConnections clients; typedef std::map<std::string, CMixClientData> ClientData; @@ -128,7 +142,7 @@ public: * \param listen_settings The listen settings for the accepter. * \param network_settings The network settings containing if we are first and who is the next node. */ - Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings); + Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings); ~Node(); /*! diff --git a/node/node_node.cpp b/node/node_node.cpp index 27e8042..27104b5 100644 --- a/node/node_node.cpp +++ b/node/node_node.cpp @@ -3,6 +3,7 @@ template <typename T> void fill_precomputation_pre_message(CMixContext& ctx, cmix_proto::PrePre& prepre, T const& rs, T const& ms) { + if(start_mix(&ctx, rs.size()) != no_error) { exit(-1); } @@ -190,29 +191,62 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret) void Node::handle_node_prepre(cmix_proto::PrePre const& pre) { if(network_settings.is_first) { + if(performance) { + performance->send("pre_mix_start"); + } + ArenaMessage<cmix_proto::PreMix> arena; auto& premix = arena.get(); fill_precomputation_mix_message(cmix_ctx, premix, pre.r_er(), pre.m_er()); next_node.async_send(premix); + + if(performance) { + performance->send("pre_mix_end"); + } } else { + if(performance) { + performance->send("pre_pre_start"); + } + ArenaMessage<cmix_proto::PrePre> arena; auto& prepre = arena.get(); fill_precomputation_pre_message(cmix_ctx, prepre, pre.r_er(), pre.m_er()); next_node.async_send(prepre); + + if(performance) { + performance->send("pre_pre_end"); + } } } void Node::handle_node_premix(cmix_proto::PreMix const& premix) { if(network_settings.is_first) { + if(performance) { + performance->send("pre_post_start"); + } + ArenaMessage<cmix_proto::PrePost> arena; auto& prepost = arena.get(); fill_precomputation_post_message(cmix_ctx, prepost, premix.r_epirs(), premix.m_epirs()); next_node.async_send(prepost); + + if(performance) { + performance->send("pre_post_end"); + } + } else { + if(performance) { + performance->send("pre_mix_start"); + } + ArenaMessage<cmix_proto::PreMix> arena; auto& n_premix = arena.get(); fill_precomputation_mix_message(cmix_ctx, n_premix, premix.r_epirs(), premix.m_epirs()); next_node.async_send(n_premix); + + if(performance) { + performance->send("pre_mix_end"); + } } } @@ -220,24 +254,48 @@ void Node::handle_node_prepost(cmix_proto::PrePost const& prepost) { if(network_settings.is_first) { start_realtime_phase(); } else { + if(performance) { + performance->send("pre_post_start"); + } + ArenaMessage<cmix_proto::PrePost> arena; auto& n_prepost = arena.get(); fill_precomputation_post_message(cmix_ctx, n_prepost, prepost.r_epirs(), prepost.m_epirs()); next_node.async_send(n_prepost); + + if(performance) { + performance->send("pre_post_end"); + } } } void Node::handle_node_realpre(cmix_proto::RealPre const& realpre) { if(network_settings.is_first) { + if(performance) { + performance->send("real_mix_start"); + } + ArenaMessage<cmix_proto::RealMix> arena; auto& realmix = arena.get(); fill_realtime_mix_message(cmix_ctx, realmix, realpre.m()); next_node.async_send(realmix); + + if(performance) { + performance->send("real_mix_end"); + } } else { + if(performance) { + performance->send("real_pre_start"); + } + ArenaMessage<cmix_proto::RealPre> arena; auto& n_realpre = arena.get(); fill_realtime_pre_message(cmix_ctx, n_realpre, realpre.h(), realpre.m(), data); next_node.async_send(n_realpre); + + if(performance) { + performance->send("real_pre_end"); + } } } @@ -245,10 +303,22 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) { if(network_settings.is_last) { BOOST_LOG_TRIVIAL(trace) << "Doing the last step:"; + if(performance) { + performance->send("real_mix_start"); + } + ArenaMessage<cmix_proto::RealMix> arena; auto& n_realmix = arena.get(); fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m()); + if(performance) { + performance->send("real_mix_end"); + } + + if(performance) { + performance->send("real_post_start"); + } + size_t len = get_group_element_array_size(&cmix_ctx); std::string str; str.resize(len); @@ -280,11 +350,24 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) { free(dest); free(payload); } + + if(performance) { + performance->send("real_post_end"); + } + } else { + if(performance) { + performance->send("real_mix_start"); + } + ArenaMessage<cmix_proto::RealMix> arena; auto& n_realmix = arena.get(); fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m()); next_node.async_send(n_realmix); + + if(performance) { + performance->send("real_mix_end"); + } } } |
