aboutsummaryrefslogtreecommitdiff
path: root/libcmix-network
diff options
context:
space:
mode:
Diffstat (limited to 'libcmix-network')
-rw-r--r--libcmix-network/CMakeLists.txt3
-rw-r--r--libcmix-network/client.cpp36
-rw-r--r--libcmix-network/client.hpp37
-rw-r--r--libcmix-network/nodeclient.cpp22
-rw-r--r--libcmix-network/nodeclient.hpp49
-rw-r--r--libcmix-network/protobufclient.cpp0
-rw-r--r--libcmix-network/protobufclient.hpp148
7 files changed, 185 insertions, 110 deletions
diff --git a/libcmix-network/CMakeLists.txt b/libcmix-network/CMakeLists.txt
index 5891254..c64c8b7 100644
--- a/libcmix-network/CMakeLists.txt
+++ b/libcmix-network/CMakeLists.txt
@@ -9,7 +9,7 @@ add_library(cmix-network
connect.hpp connect.cpp
server.hpp server.cpp
client.hpp client.cpp
- nodeclient.hpp nodeclient.cpp
+ protobufclient.hpp protobufclient.cpp
uriparser.hpp uriparser.cpp
)
@@ -27,6 +27,7 @@ target_link_libraries(cmix-network
PUBLIC ${CMAKE_THREAD_LIBS_INIT}
PRIVATE cmix
PRIVATE log
+ PRIVATE cmix-protobuf
)
if(WIN32)
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp
index f693afa..755989b 100644
--- a/libcmix-network/client.cpp
+++ b/libcmix-network/client.cpp
@@ -2,13 +2,6 @@
#include "connect.hpp"
-#include "logging.hpp"
-
-#include <boost/asio/placeholders.hpp>
-#include <boost/bind.hpp>
-#include <boost/asio/buffer.hpp>
-#include <boost/array.hpp>
-
#include <iostream>
using namespace boost::asio::ip;
@@ -39,26 +32,6 @@ std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length)
return buf;
}
-void Client::send(std::string message) {
-
- auto length_buffer = prepare_length_prefix(message.size());
-
- boost::array<boost::asio::const_buffer, 2> package = {
- boost::asio::buffer(length_buffer.data(), length_buffer.size()),
- boost::asio::buffer(message)
- };
-
- auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) {
- if(ec) {
- BOOST_LOG_TRIVIAL(fatal) << ec;
- throw std::runtime_error("unable to send message");
- }
- };
-
- socket.async_send(package, 0, handler);
-
-}
-
std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes)
{
buffer->commit(read_bytes);
@@ -103,15 +76,6 @@ void Client::handle_receive_size(Client::MessageHandler message_handler, error_c
}
}
-void Client::receive(MessageHandler message_handler) {
- using namespace boost::asio::placeholders;
-
- socket.async_receive(
- buffer->prepare(4),
- boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
- );
-}
-
void Client::close()
{
socket.close();
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp
index dc3787d..7bfea7d 100644
--- a/libcmix-network/client.hpp
+++ b/libcmix-network/client.hpp
@@ -1,7 +1,13 @@
#pragma once
+#include "logging.hpp"
+
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/streambuf.hpp>
+#include <boost/asio/placeholders.hpp>
+#include <boost/bind.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/array.hpp>
#include <array>
@@ -69,17 +75,44 @@ public:
*/
void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect);
+ inline static void foo() {}
+
/*!
* \brief send sends the string prefixed with it's length over the socket.
* \param message The string to be sent.
*/
- void send(std::string message);
+ template <typename F>
+ void async_send(std::string message, F on_sent) {
+ auto length_buffer = prepare_length_prefix(message.size());
+
+ boost::array<boost::asio::const_buffer, 2> package = {
+ boost::asio::buffer(length_buffer.data(), length_buffer.size()),
+ boost::asio::buffer(message)
+ };
+
+ auto handler = [on_sent](boost::system::error_code const& ec, std::size_t bytes_transferred) {
+ if(ec) {
+ BOOST_LOG_TRIVIAL(fatal) << ec;
+ throw std::runtime_error("unable to send message");
+ }
+ on_sent();
+ };
+
+ socket.async_send(package, 0, handler);
+ }
/*!
* \brief receive
* \param message_handler The function to call when a message has been received.
*/
- void receive(MessageHandler message_handler);
+ void receive(MessageHandler message_handler) {
+ using namespace boost::asio::placeholders;
+
+ socket.async_receive(
+ buffer->prepare(4),
+ boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
+ );
+ }
/*!
* \brief close Closes the underlying socket.
diff --git a/libcmix-network/nodeclient.cpp b/libcmix-network/nodeclient.cpp
deleted file mode 100644
index 9b026ba..0000000
--- a/libcmix-network/nodeclient.cpp
+++ /dev/null
@@ -1,22 +0,0 @@
-#include "nodeclient.hpp"
-
-#include <iostream>
-
-NodeClient::NodeClient(boost::asio::ip::tcp::socket &&socket)
-: client(std::move(socket))
-{}
-
-void NodeClient::handle_message(std::vector<uint8_t> message)
-{
- std::cout << std::string(message.begin(), message.end()) << std::endl;
-}
-
-void NodeClient::receive()
-{
- client.receive(std::bind(&NodeClient::handle_message, this, std::placeholders::_1));
-}
-
-void NodeClient::on_done(Client::OnDoneFT done) {
- client.on_done(done);
-}
-
diff --git a/libcmix-network/nodeclient.hpp b/libcmix-network/nodeclient.hpp
deleted file mode 100644
index ca1ee67..0000000
--- a/libcmix-network/nodeclient.hpp
+++ /dev/null
@@ -1,49 +0,0 @@
-#pragma once
-
-#include "client.hpp"
-
-#include <boost/asio/ip/tcp.hpp>
-
-#include <vector>
-
-/*!
- * \file
- */
-
-/*!
- * \brief The NodeClient class
- */
-class NodeClient
-{
- /*!
- * \brief client
- */
- Client client;
-
- /*!
- * \brief handle_message
- * \param message
- */
- void handle_message(std::vector<uint8_t> message);
-
-public:
-
- /*!
- * \brief NodeClient
- * \param socket
- */
- NodeClient(boost::asio::ip::tcp::socket&& socket);
-
- virtual ~NodeClient() = default;
-
- /*!
- * \brief receive
- */
- void receive();
-
- /*!
- * \brief on_done
- * \param done
- */
- void on_done(Client::OnDoneFT done);
-};
diff --git a/libcmix-network/protobufclient.cpp b/libcmix-network/protobufclient.cpp
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/libcmix-network/protobufclient.cpp
diff --git a/libcmix-network/protobufclient.hpp b/libcmix-network/protobufclient.hpp
new file mode 100644
index 0000000..1c97cfa
--- /dev/null
+++ b/libcmix-network/protobufclient.hpp
@@ -0,0 +1,148 @@
+#pragma once
+
+#include "client.hpp"
+
+#include "logging.hpp"
+#include "cmix.pb.h"
+
+#include <type_traits>
+
+struct Send;
+struct Receive;
+struct SendReceive;
+
+template<typename T>
+class ProtobufClient;
+
+typedef ProtobufClient<Receive> Receiver;
+typedef ProtobufClient<Send> Sender;
+typedef ProtobufClient<SendReceive> SenderReceiver;
+
+template<typename T = SendReceive>
+class ProtobufClient
+{
+ Client client;
+
+
+ #define MESSAGE_SETTER(TYPE, NAME) \
+ void message_setter(cmix_proto::CMixMessage& m, cmix_proto::TYPE const& v) { \
+ *m.mutable_##NAME() = v; \
+ } \
+
+ MESSAGE_SETTER(Initialization, initialization)
+ MESSAGE_SETTER(ImANode, imanode)
+ MESSAGE_SETTER(ImAClient, imaclient)
+ MESSAGE_SETTER(Bye, bye)
+ MESSAGE_SETTER(KeyExchange, keyexchange)
+
+ #undef MESSAGE_SETTER
+
+public:
+ /*!
+ * \brief ProtobufClient
+ * \param socket An rvalue reference to a socket it will now own and receive from.
+ */
+ ProtobufClient(boost::asio::ip::tcp::socket&& socket)
+ : client(std::move(socket))
+ {}
+
+ /*!
+ * \brief Move constructor for ProtobufClient.
+ */
+ ProtobufClient(ProtobufClient&& c) = default;
+
+ ProtobufClient(Client&& c)
+ : client(std::move(c))
+ {}
+
+ template<typename Ty>
+ ProtobufClient(ProtobufClient<Ty>&& r)
+ : client(std::move(r.client))
+ {}
+
+ /*!
+ * \brief Move assignment for ProtobufClient.
+ */
+ ProtobufClient& operator=(ProtobufClient&&) = default;
+
+ /*!
+ * \brief async_connect Asynchronously connects to next_host:port and calls on_connect
+ * \param next_host The host to connect to
+ * \param next_port The port to connect to
+ * \param on_connect The callback to call on a succes.
+ */
+ template<typename F>
+ void async_connect(std::string next_host, std::string next_port, F on_connect) {
+ client.async_connect(next_host, next_port, on_connect);
+ }
+
+
+ template<typename V, typename F, typename std::enable_if<(sizeof(V), std::is_same<T, Send>::value || std::is_same<T, SendReceive>::value), void>::type* = nullptr>
+ void async_send(V value, F on_sent) {
+ cmix_proto::CMixMessage m;
+ message_setter(m, value);
+ client.async_send(m.SerializeAsString(), on_sent);
+ }
+
+ /*!
+ * \brief send sends the string prefixed with it's length over the socket asynchronously.
+ * \param message The string to be sent.
+ */
+ template<typename V, typename std::enable_if<(sizeof(V), std::is_same<T, Send>::value || std::is_same<T, SendReceive>::value)>::type* = nullptr>
+ void async_send(V value) {
+ cmix_proto::CMixMessage m;
+ message_setter(m, value);
+ client.async_send(m.SerializeAsString(), []{});
+ }
+
+ /*!
+ * \brief receive
+ * \param message_handler The function to call when a message has been received.
+ */
+ template <typename F, typename std::enable_if<(sizeof(F), std::is_same<T, Receive>::value || std::is_same<T, SendReceive>::value)>::type* = nullptr>
+ void receive(F message_handler) {
+ auto f = [message_handler](std::vector<uint8_t> const& buffer) {
+ cmix_proto::CMixMessage message;
+ if(!message.ParseFromArray(buffer.data(), buffer.size())) {
+ BOOST_LOG_TRIVIAL(error) << "Received something which was not a CMixMessage";
+ throw std::runtime_error("Network communication was disrupted in a unrecoverable way.");
+ }
+ message_handler(message);
+ };
+ client.receive(f);
+ }
+
+ /*!
+ * \brief close Closes the underlying socket.
+ */
+ void close(){
+ client.close();
+ }
+
+ /*!
+ * \brief on_done sets the done callback.
+ * \param f The new done callback function.
+ */
+ template <typename F>
+ void on_done(F f) {
+ client.on_done(f);
+ }
+
+ friend Receiver make_receiver(Receiver&& r);
+
+ friend SenderReceiver make_sender_receiver(SenderReceiver&& r);
+ friend SenderReceiver make_sender_receiver(Receiver&& r);
+};
+
+inline Receiver make_receiver(Receiver&& r) {
+ return Receiver(std::move(r.client));
+}
+
+inline SenderReceiver make_sender_receiver(SenderReceiver&& r) {
+ return SenderReceiver(std::move(r.client));
+}
+
+inline SenderReceiver make_sender_receiver(Receiver&& r)
+{
+ return SenderReceiver(std::move(r.client));
+}