aboutsummaryrefslogtreecommitdiff
path: root/libcmix-network
diff options
context:
space:
mode:
Diffstat (limited to 'libcmix-network')
-rw-r--r--libcmix-network/client.cpp83
-rw-r--r--libcmix-network/client.hpp30
2 files changed, 95 insertions, 18 deletions
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp
index ce9fd4d..d8e79d7 100644
--- a/libcmix-network/client.cpp
+++ b/libcmix-network/client.cpp
@@ -1,9 +1,13 @@
#include "client.hpp"
+#include "connect.hpp"
+
#include "logging.hpp"
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/array.hpp>
#include <iostream>
@@ -16,17 +20,83 @@ Client::Client(tcp::socket &&socket)
, done()
{}
-void Client::handle_receive(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect)
+{
+ ::async_connect(socket, next_host, next_port, on_connect);
+}
+
+std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length)
+{
+ length = htonl(length);
+ std::array<uint8_t, 4> buf;
+
+ std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
+ return buf;
+}
+
+void Client::send(std::string message) {
+
+ boost::array<boost::asio::const_buffer, 2> package = {
+ boost::asio::buffer(prepare_length_prefix(message.size())),
+ boost::asio::buffer(message)
+ };
+
+ auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) {
+ BOOST_LOG_TRIVIAL(trace) << "sent message of " << bytes_transferred << " bytes";
+ if(ec) {
+ BOOST_LOG_TRIVIAL(fatal) << ec;
+ throw std::runtime_error("unable to send message");
+ }
+ };
+
+ socket.async_send(package, 0, handler);
+
+}
+
+std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes)
{
buffer.commit(read_bytes);
std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
+ return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
+}
- BOOST_LOG_TRIVIAL(trace) << "handling receive";
+void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes)
+{
+ using namespace boost::asio::placeholders;
+
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ uint32_t size;
+ std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
+ size = ntohl(size);
+
+ socket.async_receive(
+ buffer.prepare(size),
+ boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred())
+ );
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec;
+ if(done) {
+ done();
+ }
+ }
+}
+void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+{
+ BOOST_LOG_TRIVIAL(trace) << "handling receive of: " << read_bytes << " bytes";
if(!ec) {
+ buffer.commit(read_bytes);
+ std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
std::vector<uint8_t> data(std::istream_iterator<uint8_t>(is), {});
+ BOOST_LOG_TRIVIAL(trace) << "data.size(): " << data.size();
message_handler(data);
} else {
+ BOOST_LOG_TRIVIAL(error) << ec;
if(done) {
done();
}
@@ -37,11 +107,16 @@ void Client::receive(MessageHandler message_handler) {
using namespace boost::asio::placeholders;
socket.async_receive(
- buffer.prepare(512),
- boost::bind(&Client::handle_receive, this, message_handler, error(), bytes_transferred())
+ buffer.prepare(4),
+ boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
);
}
+void Client::close()
+{
+ socket.close();
+}
+
void Client::on_done(Client::OnDoneFT f) {
done = f;
}
diff --git a/libcmix-network/client.hpp b/libcmix-network/client.hpp
index 981283b..cb81418 100644
--- a/libcmix-network/client.hpp
+++ b/libcmix-network/client.hpp
@@ -31,24 +31,18 @@ protected:
boost::asio::ip::tcp::socket socket;
private:
- /*!
- * \brief buffer Internal private buffer used to receive messages.
- */
boost::asio::streambuf buffer;
- /*!
- * \brief done The done function being called when this is done operating.
- */
OnDoneFT done;
+
+ std::vector<uint8_t> received_bytes_to_vector(size_t read_bytes);
+
+ void handle_receive_size(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
+
+ void handle_receive_message(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
- /*!
- * \brief handle_receive
- * \param message_handler The function to call when a message has been received.
- * \param ec A possible error that occured during receiving.
- * \param read_bytes The number of bytes read.
- */
- void handle_receive(MessageHandler message_handler, boost::system::error_code const& ec, size_t read_bytes);
-
+ std::array<uint8_t, 4> prepare_length_prefix(uint32_t length);
+
public:
/*!
* \brief Client
@@ -56,15 +50,23 @@ public:
*/
Client(boost::asio::ip::tcp::socket&& socket);
+ void async_connect(std::string next_host, std::string next_port, std::function<void()> on_connect);
+
+ void send(std::string message);
+
/*!
* \brief receive
* \param message_handler The function to call when a message has been received.
*/
void receive(MessageHandler message_handler);
+ void close();
+
/*!
* \brief on_done sets the done callback.
* \param f The new done callback function.
*/
void on_done(OnDoneFT f);
+
+
};