// Configure the underlying socket:
//  - request send and receive buffers of max_message_size bytes,
//  - enable linger-on-close with the supplied timeout,
//  - enable the TCP no_delay option.
// NOTE(review): the enclosing function's signature is not visible in this view, so the
// types of max_message_size and timeout cannot be confirmed here; Boost.Asio's linger
// option takes its timeout in seconds - verify the caller passes seconds.
// These set_option() overloads take no error_code, so a failure is reported by exception.
25 socket_.set_option(boost::asio::socket_base::send_buffer_size(max_message_size));
27 socket_.set_option(boost::asio::socket_base::receive_buffer_size(max_message_size));
28 socket_.set_option(boost::asio::socket_base::linger(
true, timeout));
29 socket_.set_option(boost::asio::ip::tcp::no_delay(
true));
// Member-initialiser fragment of a constructor: takes over the supplied socket by
// moving it into socket_.
// NOTE(review): the constructor's declaration and body are not visible in this view.
41 : socket_(
std::move(socket)) {
// Connect socket_ to the given endpoint. No error_code is passed, so this is the
// throwing connect() overload: a failed connection surfaces as an exception.
// NOTE(review): the enclosing function is not visible in this view.
47 socket_.connect(endpoint);
// Write one message of type MsgT to socket_ while holding a lock on mutex_.
// NOTE(review): the function's name/parameter line is missing from this view; `message`
// is presumably a `MsgT const &` - confirm against the declaration.
51 template<
class MsgT>
inline void
// Receives the result of the non-throwing asio::write() overloads used below.
53 boost::system::error_code io_error;
// Presumably a scoped lock: serialises writers for the remainder of this scope - confirm
// against the definition of write_lock_t.
54 const write_lock_t lk(
mutex_);
// Compile-time selection between statically-sized and variable-length messages.
55 if constexpr (MsgT::has_static_size) {
// View the message object as an array of exactly sizeof(MsgT) bytes.
56 using raw_buff_t=
char const [
sizeof(MsgT)];
// A statically-sized message must report a length equal to its object size.
57 assert(message.length()==
sizeof(MsgT));
// Send the complete object representation; asio::write() returns once the whole buffer
// is sent or an error occurs. bytes_written is only consumed by the assert below.
58 [[maybe_unused]]
const std::size_t bytes_written=
boost::
asio::write(socket_,
boost::
asio::buffer(
reinterpret_cast<raw_buff_t &>(message)), io_error);
// NOTE(review): the condition guarding this throw is not visible here (presumably
// `if (io_error)`); the reported error is rethrown as boost::system::system_error.
62 throw boost::system::system_error(io_error);
64 assert(bytes_written==
sizeof(MsgT));
// Variable-length path (the `else` line itself is not visible in this view): send only
// the first message.length() bytes starting at the address of the message object.
66 [[maybe_unused]]
const std::size_t bytes_written=
boost::
asio::write(socket_,
boost::
asio::buffer(
reinterpret_cast<
std::uint8_t
const *>(&message), message.length()), io_error);
// NOTE(review): guarding condition not visible; presumably `if (io_error)`.
70 throw boost::system::system_error(io_error);
// Sanity checks: never more bytes than the object holds, and exactly the declared length.
72 assert(bytes_written<=
sizeof(MsgT));
73 assert(bytes_written==message.length());
// Write an entire fixed-size buffer of N elements of type V to socket_ while holding a
// lock on mutex_.
// NOTE(review): the function's name/parameter line is missing from this view; `message`
// is presumably a std::array<V, N> (or similar) - confirm against the declaration.
78 template<
class V,
std::size_t N>
inline void
// Receives the result of the non-throwing asio::write() overload.
80 boost::system::error_code io_error;
// Presumably a scoped lock serialising writers - confirm against write_lock_t.
81 const write_lock_t lk(
mutex_);
// Send the whole container; bytes_written is only consumed by the assert below.
82 [[maybe_unused]]
const std::size_t bytes_written=
boost::
asio::write(socket_,
boost::
asio::buffer(message), io_error);
// NOTE(review): the condition guarding this throw is not visible here (presumably
// `if (io_error)`).
86 throw boost::system::system_error(io_error);
// The full buffer, N elements of sizeof(V) bytes each, must have been written.
88 assert(bytes_written==
sizeof(V)*N);
// Read one message of type MsgT from socket_ directly into the object `dest`.
// Statically-sized messages are read in one go; otherwise the header is read first and
// its declared length determines how many further body bytes are read.
// NOTE(review): the function's name/parameter line is missing from this view; `dest` is
// presumably a `MsgT &`. No lock is taken in the lines visible here.
92 template<
class MsgT>
inline void
// Receives the result of the non-throwing asio::read() overloads used below.
94 boost::system::error_code io_error;
95 if constexpr (MsgT::has_static_size) {
// View dest as a writable array of exactly sizeof(MsgT) bytes.
96 using raw_buff_t=
char [
sizeof(MsgT)];
// Fill the whole object; asio::read() returns once the buffer is full or an error occurs.
97 [[maybe_unused]]
const std::size_t bytes_read=
boost::
asio::read(socket_,
boost::
asio::buffer(
reinterpret_cast<raw_buff_t &>(dest)), io_error);
// NOTE(review): the condition guarding this throw is not visible here (presumably
// `if (io_error)`).
101 throw boost::system::system_error(io_error);
103 assert(bytes_read>0);
104 assert(bytes_read==
sizeof(MsgT));
105 assert(dest.length()<=
sizeof(MsgT));
// Variable-length path (the `else` line itself is not visible in this view).
// Step 1: read just the header bytes into the start of dest.
107 constexpr std::size_t header_t_sz=MsgT::header_t_size;
108 [[maybe_unused]]
const std::size_t bytes_read=
boost::
asio::read(socket_,
boost::
asio::buffer(
reinterpret_cast<
std::uint8_t *>(&dest), header_t_sz), io_error);
// NOTE(review): guarding condition not visible; presumably `if (io_error)`.
112 throw boost::system::system_error(io_error);
114 assert(bytes_read>0);
115 assert(bytes_read==header_t_sz);
// Step 2: reinterpret the start of dest as the header to obtain the total message length.
116 typename MsgT::Header_t
const *hdr=
reinterpret_cast<
typename MsgT::Header_t
const *>(&dest);
117 const std::size_t length=hdr->length();
// Only read a body when the declared length is plausible: at least one header and no
// larger than the MsgT object, so the read below cannot write past the end of dest.
// NOTE(review): what happens for an out-of-range length is not visible in this view.
118 if (length>=header_t_sz && length<=
sizeof(MsgT)) {
119 const std::size_t body_size=length-header_t_sz;
// Step 3: read the remaining body bytes immediately after the header within dest.
120 [[maybe_unused]]
const std::size_t bytes_read_body=
boost::
asio::read(socket_,
boost::
asio::buffer(
reinterpret_cast<
std::uint8_t *>(&dest)+header_t_sz, body_size), io_error);
// NOTE(review): guarding condition not visible; presumably `if (io_error)`.
124 throw boost::system::system_error(io_error);
126 assert(bytes_read_body==body_size);
// Debug-only check that the received message reports itself as valid.
131 assert(dest.is_valid());
// Read from socket_ until the fixed-size buffer `dest` (SrcSz elements of type V) is full.
// NOTE(review): the function's name/parameter line is missing from this view; `dest` is
// presumably a std::array<V, SrcSz> (or similar) - confirm against the declaration.
135 template<
class V,
std::size_t SrcSz>
inline void
// Receives the result of the non-throwing asio::read() overload.
137 boost::system::error_code io_error;
// bytes_read is only consumed by the assert below.
138 [[maybe_unused]]
const std::size_t bytes_read=
boost::
asio::read(socket_,
boost::
asio::buffer(dest), io_error);
// NOTE(review): the condition guarding this throw is not visible here (presumably
// `if (io_error)`).
142 throw boost::system::system_error(io_error);
// The buffer must have been filled completely.
144 assert(bytes_read==
sizeof(V)*SrcSz);
// Read one message, described by MsgDetails, from socket_ into the buffer `buff`
// (N elements of type V): the header is read first, then as many body bytes as the
// header's declared length requires. Returns a bool.
// NOTE(review): the function's name/parameter line and its return statements are missing
// from this view, so the meaning of the returned bool cannot be stated from here.
148 template<
class MsgDetails,
class V,
std::size_t N>
inline bool
150 using msg_details_t=MsgDetails;
// Receives the result of the non-throwing asio::read() overloads used below.
152 boost::system::error_code io_error;
153 assert(socket_.is_open());
// Compile-time checks: the largest message is at least one header long, and the buffer
// (N*sizeof(V) bytes) is big enough to hold the largest message.
154 BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, >=, msg_details_t::header_t_size);
155 BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, <=, N*
sizeof(V));
// Step 1: read exactly header_t_size bytes into the start of buff.
156 [[maybe_unused]]
std::size_t bytes_read=
boost::
asio::read(socket_,
boost::
asio::buffer(buff, msg_details_t::header_t_size), io_error);
// NOTE(review): the condition guarding this throw is not visible here (presumably
// `if (io_error)`).
160 throw boost::system::system_error(io_error);
// NOTE(review): this compares a byte count against the element count N; that equals the
// buffer's byte size only when sizeof(V)==1 - confirm that is the intent.
162 assert(bytes_read<=N);
163 assert(bytes_read<=msg_details_t::max_msg_size);
164 assert(bytes_read==msg_details_t::header_t_size);
// Step 2: reinterpret the start of the buffer as the header to get the total length.
165 typename msg_details_t::Header_t
const *hdr=
reinterpret_cast<
typename msg_details_t::Header_t
const *>(buff.data());
166 const std::size_t length=hdr->length();
// Only read a body when the declared length is plausible: at least one header and no more
// than max_msg_size, which the static assertion above bounds by the buffer's byte size.
167 if (length>=msg_details_t::header_t_size && length<=msg_details_t::max_msg_size) {
168 const std::size_t body_size=length-msg_details_t::header_t_size;
// Step 3: read the body bytes into the buffer directly after the header.
170 bytes_read=
boost::
asio::read(socket_,
boost::
asio::buffer(&*
std::next(buff.begin(), msg_details_t::header_t_size), body_size), io_error);
// NOTE(review): the `if` that this brace closes is not visible in this view (it sits
// between the read above and here); only the fall-through error case is shown.
173 }
else if (io_error) {
// Any other reported error is rethrown as boost::system::system_error.
174 throw boost::system::system_error(io_error);
176 assert(bytes_read<=msg_details_t::max_msg_size);
177 assert(bytes_read==body_size);
// Debug-only check that the received header reports itself as valid.
179 assert(hdr->is_valid());
// Report whether the underlying socket is currently open.
// NOTE(review): the enclosing function's signature is not visible in this view; the gap
// in the embedded numbering suggests this belongs to a separate accessor - confirm.
189 return socket_.is_open();
// Teardown fragment: re-enable linger with a timeout of 1 (Boost.Asio documents this
// value as seconds), then shut down the sending side of the connection.
// NOTE(review): the enclosing function's signature and any following close() call are
// not visible in this view.
// This set_option() overload takes no error_code, so it reports failure by exception.
195 socket_.set_option(boost::asio::socket_base::linger(
true, 1));
// The error_code overload of shutdown() is used, so a failure is captured in ec instead
// of being thrown; what is done with ec afterwards is not visible here.
196 boost::system::error_code ec;
197 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_send, ec);
// Build a textual description containing the socket's native handle.
// NOTE(review): the enclosing function's signature, the head of the stream expression
// and the return statement are not visible in this view.
203 std::ostringstream ss;
// native_handle() is a non-const member in Boost.Asio, hence the const_cast; this
// suggests the enclosing function is const-qualified - confirm against the declaration.
205 <<
"socket_="<<
const_cast<
boost::
asio::
ip::tcp::socket &>(socket_).native_handle();