aboutsummaryrefslogtreecommitdiff
path: root/node
diff options
context:
space:
mode:
authorDennis Brentjes <d.brentjes@gmail.com>2016-12-01 21:42:51 +0100
committerDennis Brentjes <d.brentjes@gmail.com>2016-12-01 21:43:48 +0100
commit16c28db384adbe61034eb8a2267cd6a886ffd72f (patch)
tree426be5a41f5186ba17e909dda90afca5b7921c30 /node
parent463b8ec708db0d2d7405d434e28d0140c94b1d98 (diff)
downloadcmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.gz
cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.tar.bz2
cmix-16c28db384adbe61034eb8a2267cd6a886ffd72f.zip
Added the client side code for the statsd in the nodes.
Diffstat (limited to 'node')
-rw-r--r--node/main.cpp20
-rw-r--r--node/node.cpp29
-rw-r--r--node/node.hpp16
-rw-r--r--node/node_node.cpp83
4 files changed, 144 insertions, 4 deletions
diff --git a/node/main.cpp b/node/main.cpp
index 02838f1..1e76a1f 100644
--- a/node/main.cpp
+++ b/node/main.cpp
@@ -27,6 +27,8 @@ int main(int argc, char* argv[]) {
("key,k", po::value<std::string>(), "The key file (in pem format).")
("dhparam,d", po::value<std::string>(), "The dhparam file.")
("certdir", po::value<std::string>(), "Directory containing trusted certificates.")
+ ("statsd,s", po::value<std::string>(), "The address of the statistics daemon.")
+ ("name,a", po::value<std::string>(), "The name to use in the stats daemon file")
;
po::variables_map vm;
@@ -126,6 +128,22 @@ int main(int argc, char* argv[]) {
NodeNetworkSettings nsettings{is_first, is_last, uri.host, uri.port, certdir, minimum_nr_messages};
- Node node(lsettings, nsettings);
+ bool run_stats = vm.count("statsd");
+
+ Uri statsd_uri;
+ if(run_stats) {
+ statsd_uri = parse_uri(vm["statsd"].as<std::string>());
+ }
+
+ std::string name;
+ if(vm.count("name")) {
+ name = vm["name"].as<std::string>();
+ } else {
+ name = uri.port;
+ }
+
+ PerformanceSettings psettings{run_stats, statsd_uri.host, statsd_uri.port, name};
+
+ Node node(lsettings, nsettings, psettings);
node.run();
}
diff --git a/node/node.cpp b/node/node.cpp
index aadc123..dc0d1c0 100644
--- a/node/node.cpp
+++ b/node/node.cpp
@@ -4,15 +4,19 @@
#include "logging.hpp"
+#include <boost/utility/in_place_factory.hpp>
+
#include <numeric>
using namespace boost::asio::ip;
-Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings)
+Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings)
: io_service()
, timer(io_service)
, ssl_ctx(std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23))
, server(io_service, listen_settings, ssl_ctx, [this](std::unique_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>&& socket, std::shared_ptr<boost::asio::ssl::context> ctx){accept_handler(std::move(socket), ctx);})
+, purgatory()
+, performance(boost::none)
, clients()
, data()
, messages()
@@ -22,6 +26,10 @@ Node::Node(ListenSettings const& listen_settings, NodeNetworkSettings network_se
, cmix_ctx(initialize_cmix_context(get_implementation()))
, shutting_down(false)
{
+ if(performance_settings.run) {
+ performance = boost::in_place(performance_settings.name, boost::ref(io_service), performance_settings.host, performance_settings.port);
+ }
+
initialize_keypair(&cmix_ctx);
GOOGLE_PROTOBUF_VERIFY_VERSION;
@@ -135,6 +143,10 @@ void Node::handle_message(Purgatory::iterator handle, cmix_proto::CMixMessage me
}
void Node::start_precomputation() {
+ if(performance) {
+ performance->send("pre_pre_start");
+ }
+
BOOST_LOG_TRIVIAL(trace) << "Starting precomputation for " << messages.size() << " clients.";
index_map.clear();
@@ -188,6 +200,10 @@ void Node::start_precomputation() {
exit(-1);
}
+ if(performance) {
+ performance->send("pre_pre_end");
+ }
+
BOOST_LOG_TRIVIAL(trace) << "Sending prepre message: " << prepre.ShortDebugString();
next_node.async_send(prepre);
@@ -203,6 +219,10 @@ void Node::start_precomputation() {
void Node::start_realtime_phase() {
+ if(performance) {
+ performance->send("real_pre_start");
+ }
+
ArenaMessage<cmix_proto::RealPre> arena;
cmix_proto::RealPre& realpre = arena.get();
@@ -244,11 +264,16 @@ void Node::start_realtime_phase() {
keys.data()
);
+ if(performance) {
+ performance->send("real_pre_end");
+ }
+
+ next_node.async_send(realpre);
+
for(auto&& pair : index_map) {
messages.at(pair.first).pop();
}
- next_node.async_send(realpre);
}
void Node::shutdown()
diff --git a/node/node.hpp b/node/node.hpp
index 3233ba2..1aad503 100644
--- a/node/node.hpp
+++ b/node/node.hpp
@@ -4,6 +4,7 @@
#include "receiver.hpp"
#include "senderreceiver.hpp"
#include "sender.hpp"
+#include "performanceclient.hpp"
#include "api.h"
#include "cmix.h"
@@ -12,6 +13,7 @@
#include <google/protobuf/arena.h>
#include <boost/asio/io_service.hpp>
+#include <boost/optional.hpp>
#include <list>
#include <string>
@@ -50,6 +52,16 @@ struct NodeNetworkSettings {
unsigned int minimum_nr_messages; ///< The minimum number of available messages before starting to run a mix;
};
+/*!
+ * \brief Details if and where to send performance data.
+ */
+struct PerformanceSettings {
+ bool run;
+ std::string host;
+ std::string port;
+ std::string name;
+};
+
template <typename T>
struct ArenaMessage {
google::protobuf::Arena arena;
@@ -76,6 +88,8 @@ class Node
typedef std::list<SSLReceiver> Purgatory;
Purgatory purgatory;
+ boost::optional<PerformanceClient> performance;
+
typedef std::map<std::string, SSLSenderReceiver> ClientConnections;
ClientConnections clients;
typedef std::map<std::string, CMixClientData> ClientData;
@@ -128,7 +142,7 @@ public:
* \param listen_settings The listen settings for the accepter.
* \param network_settings The network settings containing if we are first and who is the next node.
*/
- Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings);
+ Node(ListenSettings const& listen_settings, NodeNetworkSettings network_settings, PerformanceSettings performance_settings);
~Node();
/*!
diff --git a/node/node_node.cpp b/node/node_node.cpp
index 27e8042..27104b5 100644
--- a/node/node_node.cpp
+++ b/node/node_node.cpp
@@ -3,6 +3,7 @@
template <typename T>
void fill_precomputation_pre_message(CMixContext& ctx, cmix_proto::PrePre& prepre, T const& rs, T const& ms) {
+
if(start_mix(&ctx, rs.size()) != no_error) {
exit(-1);
}
@@ -190,29 +191,62 @@ void Node::handle_node_secretkey(cmix_proto::SecretKey const& secret)
void Node::handle_node_prepre(cmix_proto::PrePre const& pre) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("pre_mix_start");
+ }
+
ArenaMessage<cmix_proto::PreMix> arena;
auto& premix = arena.get();
fill_precomputation_mix_message(cmix_ctx, premix, pre.r_er(), pre.m_er());
next_node.async_send(premix);
+
+ if(performance) {
+ performance->send("pre_mix_end");
+ }
} else {
+ if(performance) {
+ performance->send("pre_pre_start");
+ }
+
ArenaMessage<cmix_proto::PrePre> arena;
auto& prepre = arena.get();
fill_precomputation_pre_message(cmix_ctx, prepre, pre.r_er(), pre.m_er());
next_node.async_send(prepre);
+
+ if(performance) {
+ performance->send("pre_pre_end");
+ }
}
}
void Node::handle_node_premix(cmix_proto::PreMix const& premix) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("pre_post_start");
+ }
+
ArenaMessage<cmix_proto::PrePost> arena;
auto& prepost = arena.get();
fill_precomputation_post_message(cmix_ctx, prepost, premix.r_epirs(), premix.m_epirs());
next_node.async_send(prepost);
+
+ if(performance) {
+ performance->send("pre_post_end");
+ }
+
} else {
+ if(performance) {
+ performance->send("pre_mix_start");
+ }
+
ArenaMessage<cmix_proto::PreMix> arena;
auto& n_premix = arena.get();
fill_precomputation_mix_message(cmix_ctx, n_premix, premix.r_epirs(), premix.m_epirs());
next_node.async_send(n_premix);
+
+ if(performance) {
+ performance->send("pre_mix_end");
+ }
}
}
@@ -220,24 +254,48 @@ void Node::handle_node_prepost(cmix_proto::PrePost const& prepost) {
if(network_settings.is_first) {
start_realtime_phase();
} else {
+ if(performance) {
+ performance->send("pre_post_start");
+ }
+
ArenaMessage<cmix_proto::PrePost> arena;
auto& n_prepost = arena.get();
fill_precomputation_post_message(cmix_ctx, n_prepost, prepost.r_epirs(), prepost.m_epirs());
next_node.async_send(n_prepost);
+
+ if(performance) {
+ performance->send("pre_post_end");
+ }
}
}
void Node::handle_node_realpre(cmix_proto::RealPre const& realpre) {
if(network_settings.is_first) {
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, realmix, realpre.m());
next_node.async_send(realmix);
+
+ if(performance) {
+ performance->send("real_mix_end");
+ }
} else {
+ if(performance) {
+ performance->send("real_pre_start");
+ }
+
ArenaMessage<cmix_proto::RealPre> arena;
auto& n_realpre = arena.get();
fill_realtime_pre_message(cmix_ctx, n_realpre, realpre.h(), realpre.m(), data);
next_node.async_send(n_realpre);
+
+ if(performance) {
+ performance->send("real_pre_end");
+ }
}
}
@@ -245,10 +303,22 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) {
if(network_settings.is_last) {
BOOST_LOG_TRIVIAL(trace) << "Doing the last step:";
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& n_realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m());
+ if(performance) {
+ performance->send("real_mix_end");
+ }
+
+ if(performance) {
+ performance->send("real_post_start");
+ }
+
size_t len = get_group_element_array_size(&cmix_ctx);
std::string str;
str.resize(len);
@@ -280,11 +350,24 @@ void Node::handle_node_realmix(cmix_proto::RealMix const& realmix) {
free(dest);
free(payload);
}
+
+ if(performance) {
+ performance->send("real_post_end");
+ }
+
} else {
+ if(performance) {
+ performance->send("real_mix_start");
+ }
+
ArenaMessage<cmix_proto::RealMix> arena;
auto& n_realmix = arena.get();
fill_realtime_mix_message(cmix_ctx, n_realmix, realmix.m());
next_node.async_send(n_realmix);
+
+ if(performance) {
+ performance->send("real_mix_end");
+ }
}
}