aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDennis Brentjes <d.brentjes@gmail.com>2016-12-01 21:42:51 +0100
committerDennis Brentjes <d.brentjes@gmail.com>2016-12-01 21:43:48 +0100
commit16c28db384adbe61034eb8a2267cd6a886ffd72f (patch)
tree426be5a41f5186ba17e909dda90afca5b7921c30
parent463b8ec708db0d2d7405d434e28d0140c94b1d98 (diff)
downloadcmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.gz
cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.bz2
cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.zip
Added the client side code for the statsd in the nodes.
-rw-r--r--libcmix-common/CMakeLists.txt5
-rw-r--r--libcmix-common/cmixprotofunctor.hpp3
-rw-r--r--libcmix-common/performanceclient.cpp29
-rw-r--r--libcmix-common/performanceclient.hpp16
-rw-r--r--libcmix-protobuf/cmix.proto9
-rw-r--r--node/main.cpp20
-rw-r--r--node/node.cpp29
-rw-r--r--node/node.hpp16
-rw-r--r--node/node_node.cpp83
9 files changed, 204 insertions, 6 deletions
diff --git a/libcmix-common/CMakeLists.txt b/libcmix-common/CMakeLists.txt
index 2324622..958be6a 100644
--- a/libcmix-common/CMakeLists.txt
+++ b/libcmix-common/CMakeLists.txt
@@ -1,10 +1,12 @@
+find_package(Boost COMPONENTS timer REQUIRED)
+
add_library(cmix-common
cmixprotofunctor.hpp
receiver.hpp
sender.hpp
senderreceiver.hpp
- performanceclient.hpp
+ performanceclient.hpp performanceclient.cpp
)
set_target_properties(cmix-common PROPERTIES LINKER_LANGUAGE CXX)
@@ -16,4 +18,5 @@ target_include_directories(cmix-common
target_link_libraries(cmix-common
PRIVATE cmix-protobuf
PRIVATE cmix-network
+ PRIVATE Boost::timer
) \ No newline at end of file
diff --git a/libcmix-common/cmixprotofunctor.hpp b/libcmix-common/cmixprotofunctor.hpp
index 0c8a341..17793ec 100644
--- a/libcmix-common/cmixprotofunctor.hpp
+++ b/libcmix-common/cmixprotofunctor.hpp
@@ -68,7 +68,8 @@ struct CMixProtoFunctor {
PrePost, prepost,
RealPre, realpre,
RealMix, realmix,
- Payload, payload
+ Payload, payload,
+ Performance, performance
)
#undef MESSAGE_SETTER_DEFS
#undef MESSAGE_SETTER_DEF_ITERATION
diff --git a/libcmix-common/performanceclient.cpp b/libcmix-common/performanceclient.cpp
new file mode 100644
index 0000000..70953a8
--- /dev/null
+++ b/libcmix-common/performanceclient.cpp
@@ -0,0 +1,29 @@
+#include "performanceclient.hpp"
+
+#include <boost/date_time.hpp>
+
+#include <memory>
+
+PerformanceClient::PerformanceClient(std::string name, boost::asio::io_service& io_service, std::string host, std::string port)
+: node_name(name)
+, sender(std::unique_ptr<boost::asio::ip::tcp::socket>(new boost::asio::ip::tcp::socket(io_service)))
+, timer()
+{
+ timer.start();
+ sender.async_connect(host, port, []{});
+}
+
+void PerformanceClient::send(std::string column)
+{
+ cmix_proto::Performance perf;
+ perf.set_node(node_name);
+ perf.set_column(column);
+
+ auto tp = timer.elapsed();
+
+ perf.set_wall_time(std::to_string(tp.wall));
+ perf.set_user_time(std::to_string(tp.user));
+ perf.set_system_time(std::to_string(tp.system));
+
+ sender.async_send(perf);
+}
diff --git a/libcmix-common/performanceclient.hpp b/libcmix-common/performanceclient.hpp
new file mode 100644
index 0000000..cb9afb1
--- /dev/null
+++ b/libcmix-common/performanceclient.hpp
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "sender.hpp"
+
+#include <boost/timer/timer.hpp>
+
+class PerformanceClient
+{
+ std::string node_name;
+ Sender sender;
+ boost::timer::cpu_timer timer;
+public:
+ PerformanceClient(std::string name, boost::asio::io_service& io_service, std::string host, std::string port);
+
+ void send(std::string column);
+}; \ No newline at end of file
diff --git a/libcmix-protobuf/cmix.proto b/libcmix-protobuf/cmix.proto
index 5cfdd92..2af20bc 100644
--- a/libcmix-protobuf/cmix.proto
+++ b/libcmix-protobuf/cmix.proto
@@ -65,6 +65,14 @@ message Payload {
bytes payload = 1;
}
+message Performance {
+ string wall_time = 1;
+ string user_time = 2;
+ string system_time = 3;
+ string column = 4;
+ string node = 5;
+}
+
message CMixMessage {
oneof contents {
Initialization initialization = 1;
@@ -81,5 +89,6 @@ message CMixMessage {
RealPre realpre = 12;
RealMix realmix = 13;
Payload payload = 14;
+ Performance performance = 15;
}
}
diff --git a/node/main.cpp b/node/main.cpp
index 02838f1..1e76a1f 100644
--- a/node/main.cpp
+++ b/node/main.cpp
@@ -27,6 +27,8 @@ int main(int argc, char* argv[]) {
("key,k", po::value<std::string>(), "The key file (in pem format).")
("dhparam,d", po::value<std::string>(), "The dhparam file.")
("certdir", po::value<std::string>(), "Directory containing trusted certificates.")
+ ("statsd,s", po::value<std::string>(), "The address of the statistics daemon.")
+ ("name,a", po::value<std::string>(), "The name to use in the stats daemon file")
;
po::variables_map vm;
@@ -126,6 +128,22 @@ int main(int argc, char* argv[]) {
NodeNetworkSettings nsettings{is_first, is_last, uri.host, uri.port, certdir, minimum_nr_messages};
- Node node(lsettings, nsettings);
+ bool run_stats = vm.count("statsd");
+
+ Uri statsd_uri;
+ if(run_stats) {
+ statsd_uri = parse_uri(vm["statsd"].as<std::string>());
+ }
+
+ std::string name;
+ if(vm.count("name")) {
+ name = vm["name"].as<std::string>();
+ } else {
+ name = uri.port;
+ }
+
+ PerformanceSettings psettings{run_stats, statsd_uri.host, statsd_uri.port, name};
+
+ Node node(lsettings, nsettings, psettings);
node.run();
}
diff --git a/node/node.cpp b/node/node.cpp
index aadc123..dc0d1c0 100644
--- a/node/node.cpp
+++ b/node/node.cpp
@@ -4,15 +4,19 @@
#include "logging.hpp"
+#include <boost/utility/in_place_factory.hpp>
+
#include <numeric>
using namespace boost::asio::ip;
-Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings)
+Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings)
: io_service()
, timer(io_service)
, ssl_ctx(std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23))
, server(io_service, listen_settings, ssl_ctx, [this](std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx){accept_handler(std::move(socket), ctx);})
+, purgatory()
+, performance(boost::none)
, clients()
, data()
, messages()
@@ -22,6 +26,10 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
, cmix_ctx(initialize_cmix_context(get_implementation()))
, shutting_down(false)
{
+ if(performance_settings.run) {
+ performance = boost::in_place(performance_settings.name, boost::ref(io_service), performance_settings.host, performance_settings.port);
+ }
+
initialize_keypair(&cmix_ctx);
GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -135,6 +143,10 @@ void Node::handle_message(Purgatory::iterator handle, cmix_proto::CMixMessage me
}
void Node::start_precomputation() {
+ if(performance) {
+ performance->send("pre_pre_start");
+ }
+
BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << messages.size() << " clients.";
index_map.clear();
@@ -188,6 +200,10 @@ void Node::start_precomputation() {
exit(-1);
}
+ if(performance) {
+ performance->send("pre_pre_end");
+ }
+
BOOST_LOG_TRIVIAL(trace) << "Sending prepre message: " << prepre.ShortDebugString();
next_node.async_send(prepre);
@@ -203,6 +219,10 @@ void Node::start_precomputation() {
void Node::start_realtime_phase() {
+ if(performance) {
+ performance->send("real_pre_start");
+ }
+
ArenaMessage<cmix_proto::RealPre> arena;
cmix_proto::RealPre& realpre = arena.get();
@@ -244,11 +264,16 @@ void Node::start_realtime_phase() {
keys.data()
);
+ if(performance) {
+ performance->send("real_pre_end");
+ }
+
+ next_node.async_send(realpre);
+
for(auto&& pair : index_map) {
messages.at(pair.first).pop();
}
- next_node.async_send(realpre);
}
void Node::shutdown()
diff --git a/node/node.hpp b/node/node.hpp
index 3233ba2..1aad503 100644
--- a/node/node.hpp
+++ b/node/node.hpp
@@ -4,6 +4,7 @@
#include "receiver.hpp"
#include "senderreceiver.hpp"
#include "sender.hpp"
+#include "performanceclient.hpp"
#include "api.h"
#include "cmix.h"
@@ -12,6 +13,7 @@
#include <google/protobuf/arena.h>
#include <boost/asio/io_service.hpp>
+#include <boost/optional.hpp>
#include <list>
#include <string>
@@ -50,6 +52,16 @@ struct NodeNetworkSettings {
unsigned int minimum_nr_messages; ///< The minimum number of available messages before starting to run a mix;
};
+/*!
+ * \brief Details if and where to send performance data.
+ */
+struct PerformanceSettings {
+ bool run;
+ std::string host;
+ std::string port;
+ std::string name;
+};
+
template <typename T>
struct ArenaMessage {
google::protobuf::Arena arena;
@@ -76,6 +88,8 @@ class Node
typedef std::list<SSLReceiver> Purgatory;
Purgatory purgatory;
+ boost::optional<PerformanceClient> performance;
+
typedef std::map<std::string, SSLSenderReceiver> ClientConnections;
ClientConnections clients;
typedef std::map<std::string, CMixClientData> ClientData;
@@ -128,7 +142,7 @@ public:
* \param listen_settings The listen settings for the accepter.
* \param network_settings The network settings containing if we are first and who is the next node.
*/
- Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings);
+ Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings);
~Node();
/*!
diff --git a/node/node_node.cpp b/node/node_node.cpp
index 27e8042..27104b5 100644
--- a/node/node_node.cpp
+++ b/node/node_node.cpp
@@ -3,6 +3,7 @@
template <typename T>
void fill_precomputation_pre_message(CMixContext& ctx, cmix_proto::PrePre& prepre, T const& rs, T const& ms) {
+
if(start_mix(&ctx, rs.size()) != no_error) {
exit(-1);
}
@@ -190,29 +191,62 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret)
void Node::handle_node_prepre(cmix_proto::PrePre const& pre) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("pre_mix_start");
+ }
+
ArenaMessage<cmix_proto::PreMix> arena;
auto& premix = arena.get();
fill_precomputation_mix_message(cmix_ctx, premix, pre.r_er(), pre.m_er());
next_node.async_send(premix);
+
+ if(performance) {
+ performance->send("pre_mix_end");
+ }
} else {
+ if(performance) {
+ performance->send("pre_pre_start");
+ }
+
ArenaMessage<cmix_proto::PrePre> arena;
auto& prepre = arena.get();
fill_precomputation_pre_message(cmix_ctx, prepre, pre.r_er(), pre.m_er());
next_node.async_send(prepre);
+
+ if(performance) {
+ performance->send("pre_pre_end");
+ }
}
}
void Node::handle_node_premix(cmix_proto::PreMix const& premix) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("pre_post_start");
+ }
+
ArenaMessage<cmix_proto::PrePost> arena;
auto& prepost = arena.get();
fill_precomputation_post_message(cmix_ctx, prepost, premix.r_epirs(), premix.m_epirs());
next_node.async_send(prepost);
+
+ if(performance) {
+ performance->send("pre_post_end");
+ }
+
} else {
+ if(performance) {
+ performance->send("pre_mix_start");
+ }
+
ArenaMessage<cmix_proto::PreMix> arena;
auto& n_premix = arena.get();
fill_precomputation_mix_message(cmix_ctx, n_premix, premix.r_epirs(), premix.m_epirs());
next_node.async_send(n_premix);
+
+ if(performance) {
+ performance->send("pre_mix_end");
+ }
}
}
@@ -220,24 +254,48 @@ void Node::handle_node_prepost(cmix_proto::PrePost const& prepost) {
if(network_settings.is_first) {
start_realtime_phase();
} else {
+ if(performance) {
+ performance->send("pre_post_start");
+ }
+
ArenaMessage<cmix_proto::PrePost> arena;
auto& n_prepost = arena.get();
fill_precomputation_post_message(cmix_ctx, n_prepost, prepost.r_epirs(), prepost.m_epirs());
next_node.async_send(n_prepost);
+
+ if(performance) {
+ performance->send("pre_post_end");
+ }
}
}
void Node::handle_node_realpre(cmix_proto::RealPre const& realpre) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, realmix, realpre.m());
next_node.async_send(realmix);
+
+ if(performance) {
+ performance->send("real_mix_end");
+ }
} else {
+ if(performance) {
+ performance->send("real_pre_start");
+ }
+
ArenaMessage<cmix_proto::RealPre> arena;
auto& n_realpre = arena.get();
fill_realtime_pre_message(cmix_ctx, n_realpre, realpre.h(), realpre.m(), data);
next_node.async_send(n_realpre);
+
+ if(performance) {
+ performance->send("real_pre_end");
+ }
}
}
@@ -245,10 +303,22 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) {
if(network_settings.is_last) {
BOOST_LOG_TRIVIAL(trace) << "Doing the last step:";
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& n_realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m());
+ if(performance) {
+ performance->send("real_mix_end");
+ }
+
+ if(performance) {
+ performance->send("real_post_start");
+ }
+
size_t len = get_group_element_array_size(&cmix_ctx);
std::string str;
str.resize(len);
@@ -280,11 +350,24 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) {
free(dest);
free(payload);
}
+
+ if(performance) {
+ performance->send("real_post_end");
+ }
+
} else {
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& n_realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m());
next_node.async_send(n_realmix);
+
+ if(performance) {
+ performance->send("real_mix_end");
+ }
}
}