aboutsummaryrefslogtreecommitdiff
path: root/libcmix-network/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'libcmix-network/client.cpp')
-rw-r--r--libcmix-network/client.cpp83
1 files changed, 79 insertions, 4 deletions
diff --git a/libcmix-network/client.cpp b/libcmix-network/client.cpp
index ce9fd4d..d8e79d7 100644
--- a/libcmix-network/client.cpp
+++ b/libcmix-network/client.cpp
@@ -1,9 +1,13 @@
#include "client.hpp"
+#include "connect.hpp"
+
#include "logging.hpp"
#include <boost/asio/placeholders.hpp>
#include <boost/bind.hpp>
+#include <boost/asio/buffer.hpp>
+#include <boost/array.hpp>
#include <iostream>
@@ -16,17 +20,83 @@ Client::Client(tcp::socket &&socket)
, done()
{}
-void Client::handle_receive(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+void Client::async_connect(std::string next_host, std::string next_port, std::function<void ()> on_connect)
+{
+ ::async_connect(socket, next_host, next_port, on_connect);
+}
+
+std::array<uint8_t, 4> Client::prepare_length_prefix(uint32_t length)
+{
+ length = htonl(length);
+ std::array<uint8_t, 4> buf;
+
+ std::copy(reinterpret_cast<uint8_t*>(&length), reinterpret_cast<uint8_t*>(&length) + 4, buf.data());
+ return buf;
+}
+
+void Client::send(std::string message) {
+
+ boost::array<boost::asio::const_buffer, 2> package = {
+ boost::asio::buffer(prepare_length_prefix(message.size())),
+ boost::asio::buffer(message)
+ };
+
+ auto handler = [](boost::system::error_code const& ec, std::size_t bytes_transferred) {
+ BOOST_LOG_TRIVIAL(trace) << "sent message of " << bytes_transferred << " bytes";
+ if(ec) {
+ BOOST_LOG_TRIVIAL(fatal) << ec;
+ throw std::runtime_error("unable to send message");
+ }
+ };
+
+ socket.async_send(package, 0, handler);
+
+}
+
+std::vector<uint8_t> Client::received_bytes_to_vector(size_t read_bytes)
{
buffer.commit(read_bytes);
std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
+ return std::vector<uint8_t>(std::istream_iterator<uint8_t>(is), {});
+}
- BOOST_LOG_TRIVIAL(trace) << "handling receive";
+void Client::handle_receive_size(Client::MessageHandler message_handler, const error_code& ec, size_t read_bytes)
+{
+ using namespace boost::asio::placeholders;
+
+ if(!ec) {
+ std::vector<uint8_t> data = received_bytes_to_vector(read_bytes);
+ uint32_t size;
+ std::copy(data.begin(), data.end(), reinterpret_cast<uint8_t*>(&size));
+ size = ntohl(size);
+
+ socket.async_receive(
+ buffer.prepare(size),
+ boost::bind(&Client::handle_receive_message, this, message_handler, error(), bytes_transferred())
+ );
+ } else {
+ BOOST_LOG_TRIVIAL(error) << ec;
+ if(done) {
+ done();
+ }
+ }
+}
+void Client::handle_receive_message(MessageHandler message_handler, const error_code &ec, size_t read_bytes)
+{
+ BOOST_LOG_TRIVIAL(trace) << "handling receive of: " << read_bytes << " bytes";
if(!ec) {
+ buffer.commit(read_bytes);
+ std::istream is(&buffer);
+ is.unsetf(std::ios::skipws);
+
std::vector<uint8_t> data(std::istream_iterator<uint8_t>(is), {});
+ BOOST_LOG_TRIVIAL(trace) << "data.size(): " << data.size();
message_handler(data);
} else {
+ BOOST_LOG_TRIVIAL(error) << ec;
if(done) {
done();
}
@@ -37,11 +107,16 @@ void Client::receive(MessageHandler message_handler) {
using namespace boost::asio::placeholders;
socket.async_receive(
- buffer.prepare(512),
- boost::bind(&Client::handle_receive, this, message_handler, error(), bytes_transferred())
+ buffer.prepare(4),
+ boost::bind(&Client::handle_receive_size, this, message_handler, error(), bytes_transferred())
);
}
+void Client::close()
+{
+ socket.close();
+}
+
void Client::on_done(Client::OnDoneFT f) {
done = f;
}