From b5688d16b0920aeed3d945fd136a51fe31dfbe24 Mon Sep 17 00:00:00 2001 From: Dennis Brentjes Date: Mon, 5 Dec 2016 16:34:37 +0100 Subject: added (untested) statsd for the cmake network. --- statsd/CMakeLists.txt | 16 ++++++++++++++++ statsd/main.cpp | 45 +++++++++++++++++++++++++++++++++++++++++++++ statsd/stats.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ statsd/stats.hpp | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+) create mode 100644 statsd/CMakeLists.txt create mode 100644 statsd/main.cpp create mode 100644 statsd/stats.cpp create mode 100644 statsd/stats.hpp (limited to 'statsd') diff --git a/statsd/CMakeLists.txt b/statsd/CMakeLists.txt new file mode 100644 index 0000000..73dbf68 --- /dev/null +++ b/statsd/CMakeLists.txt @@ -0,0 +1,16 @@ +find_package(Boost COMPONENTS system program_options REQUIRED) + +add_executable(statsd + main.cpp + stats.hpp stats.cpp +) + +target_link_libraries(statsd + PRIVATE Boost::boost + PRIVATE Boost::program_options + PRIVATE Boost::system + PRIVATE cmix-protobuf + PRIVATE cmix-network + PRIVATE cmix-common + PRIVATE log +) \ No newline at end of file diff --git a/statsd/main.cpp b/statsd/main.cpp new file mode 100644 index 0000000..436e8ad --- /dev/null +++ b/statsd/main.cpp @@ -0,0 +1,45 @@ + +#include "stats.hpp" + +#include "logging.hpp" + +#include + +#include + +int main(int argc, char* argv[]) { + namespace po = boost::program_options; + + init_logging(boost::log::trivial::severity_level::trace, "statsd"); + + po::options_description desc("Allowed options"); + desc.add_options() + ("help,h", "produce help message.") + ("port,p", po::value()->default_value(9200), "Set listening port.") + ("enable_v4", po::value()->default_value(true), "Enable/disable ipv4 accept support.") + ("interface4,4", po::value()->default_value("0.0.0.0"), "Set the ipv4 address to listen on.") + ("enable_v6", po::value()->default_value(true), "Enable/disable ipv6 accept support.") + ("interface6,6", po::value()->default_value("::"), "Set the ipv6 address to listen on.") + ; + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + if (vm.count("help")) { + std::cout << desc << "\n"; + return 0; + } + + bool en4 = vm["enable_v4"].as(); + std::string if4 = vm["interface4"].as(); + bool en6 = vm["enable_v6"].as(); + std::string if6 = vm["interface6"].as(); + uint16_t port = vm["port"].as(); + + ListenSettings lsettings{en4, if4, en6, if6, port, false, "", "", ""}; + Stats stats(lsettings); + + stats.run(); + +} diff --git a/statsd/stats.cpp b/statsd/stats.cpp new file mode 100644 index 0000000..9eeb140 --- /dev/null +++ b/statsd/stats.cpp @@ -0,0 +1,49 @@ +#include "stats.hpp" + +void Stats::accept_connection(std::unique_ptr&& socket) { + auto it = connections.emplace(connections.end(), std::move(socket)); + it->on_done([this, it](){ + connections.erase(it); + }); + it->async_receive([it, this](cmix_proto::CMixMessage const& message) { + handle_message(it, message); + }); +} + +void Stats::handle_performance(std::list::iterator it, const cmix_proto::Performance& perf) { + data[perf.node()][perf.column() + "wall_time"].push_back(std::stol(perf.wall_time())); + data[perf.node()][perf.column() + "system_time"].push_back(std::stol(perf.system_time())); + data[perf.node()][perf.column() + "user_time"].push_back(std::stol(perf.user_time())); + + it->async_receive([it, this](cmix_proto::CMixMessage const& message) { + handle_message(it, message); + }); +} + +void Stats::handle_message(std::list::iterator it, cmix_proto::CMixMessage message) { + switch(message.contents_case()) { + case cmix_proto::CMixMessage::ContentsCase::kPerformance: { + BOOST_LOG_TRIVIAL(trace) << "Handling performance"; + handle_performance(it, message.performance()); + break; + } + case cmix_proto::CMixMessage::ContentsCase::kBye: { + BOOST_LOG_TRIVIAL(trace) << "Handling Bye"; + + break; + } + default: { + BOOST_LOG_TRIVIAL(error) << "handle_message: CMixMessage contains unknown contents."; + connections.erase(it); + } + } +} + +Stats::Stats(ListenSettings lsettings) +: io_service() +, server(io_service, lsettings, [this](std::unique_ptr&& socket){accept_connection(std::move(socket));}) +{} + +void Stats::run() { + io_service.run(); +} diff --git a/statsd/stats.hpp b/statsd/stats.hpp new file mode 100644 index 0000000..673022e --- /dev/null +++ b/statsd/stats.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "server.hpp" +#include "receiver.hpp" + +#include "cmix.pb.h" + +#include + +#include + +class Stats { + boost::asio::io_service io_service; + Server server; + std::list connections; + + typedef std::vector Column; + typedef std::map Table; + typedef std::map Tables; + + Tables data; + + void accept_connection(std::unique_ptr&& socket); + + void handle_performance(std::list::iterator it, cmix_proto::Performance const& perf); + + void handle_message(std::list::iterator it, cmix_proto::CMixMessage message); + +public: + Stats(ListenSettings lsettings); + + void run(); +}; \ No newline at end of file -- cgit v1.2.3-70-g09d2