25 wrapper<LkT>::
set_options(
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short timeout, socket_priority priority,
std::size_t incoming_cpu)
noexcept(
false) {
// Configures socket_ through a sequence of ::setsockopt calls. Every call goes
// through JMMCG_SYSCALL_WRAPPER together with a message naming the option.
// How each parameter is used below:
//   min_message_size - value supplied for SO_RCVLOWAT.
//   max_message_size - value supplied for both SO_SNDBUF and SO_RCVBUF.
//   timeout          - value supplied for TCP_USER_TIMEOUT (tcp(7) documents
//                      that option in milliseconds).
//   priority         - value supplied for SO_PRIORITY.
//   incoming_cpu     - value supplied for SO_INCOMING_CPU.
// NOTE(review): `details` and `val` are declared on lines that are not part of
// this excerpt.
// NOTE(review): socket(7) documents SO_SNDBUF, SO_RCVBUF, SO_RCVLOWAT,
// SO_PRIORITY and SO_INCOMING_CPU as taking an int; passing the address and
// size of a std::size_t presumably relies on little-endian layout - confirm.
30 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_LINGER, &details,
sizeof(::linger));
// Widen the caller's timeout to the unsigned int that TCP_USER_TIMEOUT expects.
31 unsigned int sock_timeout=timeout;
// TCP_USER_TIMEOUT is a TCP-level option, so it has to be set at level
// IPPROTO_TCP. At SOL_SOCKET the same option number would be looked up among
// the socket-level options and a different option would be changed.
32 JMMCG_SYSCALL_WRAPPER(
"Unable to set TCP_USER_TIMEOUT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, IPPROTO_TCP, TCP_USER_TIMEOUT, &sock_timeout,
sizeof(
unsigned int));
34 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_REUSEADDR.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_REUSEADDR, &val,
sizeof(val));
// The same `val` is reused for TCP_NODELAY, this time at level IPPROTO_TCP.
35 JMMCG_SYSCALL_WRAPPER(
"Unable to set TCP_NODELAY.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, IPPROTO_TCP, TCP_NODELAY, &val,
sizeof(val));
// Send and receive buffers are both sized from max_message_size.
36 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_SNDBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_SNDBUF, &max_message_size,
sizeof(max_message_size));
37 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_RCVBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_RCVBUF, &max_message_size,
sizeof(max_message_size));
38 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_RCVLOWAT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_RCVLOWAT, &min_message_size,
sizeof(min_message_size));
39 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_PRIORITY.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_PRIORITY, &priority,
sizeof(priority));
40 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_INCOMING_CPU.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_INCOMING_CPU, &incoming_cpu,
sizeof(incoming_cpu));
// Fragment of a definition whose header is not part of this excerpt: it applies
// SO_LINGER from `details` (declared on a line not shown) and then a
// TCP_USER_TIMEOUT of 1 to socket_.
50 JMMCG_SYSCALL_WRAPPER(
"Unable to set SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, SOL_SOCKET, SO_LINGER, &details,
sizeof(::linger));
51 unsigned int sock_timeout=1;
// TCP_USER_TIMEOUT is a TCP-level option, so it has to be set at level
// IPPROTO_TCP. At SOL_SOCKET the same option number would be looked up among
// the socket-level options and a different option would be changed.
52 JMMCG_SYSCALL_WRAPPER(
"Unable to set TCP_USER_TIMEOUT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt,
socket_, IPPROTO_TCP, TCP_USER_TIMEOUT, &sock_timeout,
sizeof(
unsigned int));
59 wrapper<LkT>::ignore_sigpipe_for_a_socket_that_closes()
noexcept(
false) {
// Installs SIG_IGN as the handler for SIGPIPE via ::signal, routed through
// JMMCG_SYSCALL_WRAPPER.
// NOTE(review): ::signal changes the disposition for the whole process, not for
// one socket, and it reports failure by returning SIG_ERR - confirm that
// JMMCG_SYSCALL_WRAPPER detects that return value.
60 JMMCG_SYSCALL_WRAPPER(
"Could not ignore SIGPIPE.", _T(LIBJMMCG_VERSION_NUMBER), ::signal, SIGPIPE, SIG_IGN);
// The next two lines are call sites taken from other definitions whose
// surrounding lines are not part of this excerpt.
68 ignore_sigpipe_for_a_socket_that_closes();
75 ignore_sigpipe_for_a_socket_that_closes();
// Writes `message` to socket_ with ::write, looping until the full byte count
// has been handed over. Fragment: the function name, its parameter list and
// several body lines are not part of this excerpt.
// Two variants are visible: when MsgT::has_static_size the byte count is
// sizeof(MsgT); otherwise it is message.length().
85 template<
class MsgT>
inline void
87 const typename thread_traits::cancellability set;
88 assert(message.is_valid());
// Running total of the bytes accepted by ::write so far.
89 ssize_t bytes_written=0;
// Constructed from mutex_ - presumably a scoped lock serialising writers.
90 const write_lock_t lk(
mutex_);
91 if constexpr (MsgT::has_static_size) {
92 assert(message.length()==
sizeof(MsgT));
94 while (
static_cast<
std::size_t>(bytes_written)<
sizeof(MsgT)) {
95 assert(
sizeof(MsgT)-bytes_written);
// ::write returns ssize_t; keep that type instead of narrowing to int, which
// also matches how the result of ::read is stored in the read loop.
96 const ssize_t ret_code=::write(socket_,
reinterpret_cast<
char const *>(&message)+bytes_written,
sizeof(MsgT)-bytes_written);
98 bytes_written+=ret_code;
99 }
else if (ret_code<0) {
// Compile-time check that both errno values can be treated as one case.
101 BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
120 assert(!
"Error writing data to the socket.");
128 assert(
static_cast<std::size_t>(bytes_written)==
sizeof(MsgT));
// Variant driven by message.length(): the byte count comes from the message
// itself rather than from sizeof(MsgT).
130 assert(message.length());
132 while (
static_cast<
std::size_t>(bytes_written)<message.length()) {
133 assert(message.length()-bytes_written);
// As above: store the ::write result in its native ssize_t.
134 const ssize_t ret_code=::write(socket_,
reinterpret_cast<
char const *>(&message)+bytes_written, message.length()-bytes_written);
136 bytes_written+=ret_code;
137 }
else if (ret_code<0) {
139 BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
158 assert(!
"Error writing data to the socket.");
// Everything written must fit inside the object and match the declared length.
166 assert(
static_cast<std::size_t>(bytes_written)<=
sizeof(MsgT));
167 assert(
static_cast<std::size_t>(bytes_written)==message.length());
// Writes the sizeof(V)*N bytes starting at message.data() to socket_ with
// ::write, looping until all of them have been handed over. Fragment: the
// function name, its parameter list and several body lines are not part of
// this excerpt.
172 template<
class V,
std::size_t N>
inline void
174 const typename thread_traits::cancellability set;
// Running total of the bytes accepted by ::write so far.
175 ssize_t bytes_written=0;
// Constructed from mutex_ - presumably a scoped lock serialising writers.
176 const write_lock_t lk(
mutex_);
178 while (
static_cast<
std::size_t>(bytes_written)<(
sizeof(V)*N)) {
179 assert(
sizeof(V)*N-bytes_written);
// ::write returns ssize_t; keep that type instead of narrowing to int, which
// also matches how the result of ::read is stored in the read loop.
180 const ssize_t ret_code=::write(socket_,
reinterpret_cast<
char const *>(message.data())+bytes_written,
sizeof(V)*N-bytes_written);
182 bytes_written+=ret_code;
183 }
else if (ret_code<0) {
// Compile-time check that both errno values can be treated as one case.
185 BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
204 assert(!
"Error writing data to the socket.");
212 assert(bytes_written>0);
213 assert(
static_cast<std::size_t>(bytes_written)==
sizeof(V)*N);
// Body fragment of the raw read loop: fills `dest` from socket_ with ::read
// until `msg_size` bytes have arrived. `dest` and `msg_size` come from a
// signature that is not part of this excerpt, as do several body lines.
220 const typename thread_traits::cancellability set;
// Count of bytes received so far; also the offset of the next ::read.
225 std::size_t bytes_read=0;
226 while (
UNLIKELY(bytes_read<msg_size)) {
227 const ssize_t ret_code=::read(socket_, dest+bytes_read, msg_size-bytes_read);
// A single ::read must never deliver more than was asked for.
229 assert((bytes_read+ret_code)<=msg_size);
230 bytes_read+=ret_code;
231 }
else if (ret_code<0) {
// Compile-time check that both errno values can be treated as one case.
233 BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
252 assert(!
"Failed to read from socket.");
260 assert(bytes_read==msg_size);
// Reads one message into `dest` using read_msg. Fragment: the function name,
// its parameter list and several body lines are not part of this excerpt.
// When MsgT::has_static_size, sizeof(MsgT) bytes are read in one go. Otherwise
// MsgT::header_t_size bytes are read first, the header's length() is
// range-checked, and the remaining length-header_t_sz bytes are read after it.
266 template<
class MsgT>
inline bool
268 if constexpr (MsgT::has_static_size) {
269 assert(dest.length()<=
sizeof(MsgT));
270 BOOST_MPL_ASSERT_RELATION(
sizeof(MsgT), <=, SSIZE_MAX);
271 const bool read_fail=
read_msg(reinterpret_cast<
std::uint8_t *>(&dest)
, sizeof(MsgT)
);
// NOTE(review): this assert forbids "failed and valid", whereas the assert
// after the body read below demands "failed or valid" - confirm which
// relationship is intended.
272 assert(!(read_fail && dest.is_valid()));
275 constexpr std::size_t header_t_sz=MsgT::header_t_size;
276 const bool read_err=
read_msg(reinterpret_cast<
std::uint8_t *>(&dest)
, header_t_sz
);
// Reinterpret the start of dest as the header to learn the total length.
278 typename MsgT::Header_t
const *hdr=
reinterpret_cast<
typename MsgT::Header_t
const *>(&dest);
279 const std::size_t length=hdr->length();
// Only continue when the advertised length covers at least the header and
// fits both SSIZE_MAX and the destination object.
280 if (length>=header_t_sz && length<=SSIZE_MAX && length<=
sizeof(MsgT)) {
281 const std::size_t body_size=length-header_t_sz;
283 const bool read_fail=
read_msg(reinterpret_cast<
std::uint8_t *>(&dest)+header_t_sz
, length-header_t_sz
);
284 assert(read_fail || dest.is_valid());
287 assert(hdr->is_valid());
// Fragment of a read overload templated on element type V and count SrcSz;
// the function name, its parameter list and the rest of the body are not part
// of this excerpt.
300 template<
class V,
std::size_t SrcSz>
inline bool
// Compile-time guard: the total byte count must not exceed SSIZE_MAX.
302 BOOST_MPL_ASSERT_RELATION(
sizeof(V)*SrcSz, <=, SSIZE_MAX);
// Reads one length-prefixed message into `buff` using read_msg. Fragment: the
// function name, its parameter list and several body lines are not part of
// this excerpt.
// The first msg_details_t::header_t_size bytes are read, the header's length()
// is range-checked against the buffer capacity N*sizeof(V), and the remaining
// bytes are then read directly after the header.
307 template<
class MsgDetails,
class V,
std::size_t N>
inline bool
309 using msg_details_t=MsgDetails;
// Compile-time guard: a maximum-size message must be able to hold a header.
311 BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, >=, msg_details_t::header_t_size);
313 const bool read_err=
read_msg(reinterpret_cast<
std::uint8_t *>(buff.data())
, msg_details_t::header_t_size
);
// Reinterpret the start of the buffer as the header to learn the total length.
315 typename msg_details_t::Header_t
const *hdr=
reinterpret_cast<
typename msg_details_t::Header_t
const *>(buff.data());
316 const std::size_t length=hdr->length();
// Only continue when the advertised length covers at least the header and
// fits both SSIZE_MAX and the buffer.
317 if (length>=msg_details_t::header_t_size && length<=SSIZE_MAX && length<=(N*
sizeof(V))) {
318 const std::size_t body_size=length-msg_details_t::header_t_size;
320 const bool read_fail=
read_msg(reinterpret_cast<
std::uint8_t *>(buff.data())+msg_details_t::header_t_size
, length-msg_details_t::header_t_size
);
321 assert(read_fail || hdr->is_valid());
324 assert(hdr->is_valid());
// Fragment: queries SO_ERROR on socket_ and returns true when the ::getsockopt
// call itself succeeds (returns 0). `error_code` is declared on a line that is
// not part of this excerpt.
// NOTE(review): the value written to error_code is not inspected by the
// visible return expression - confirm that is intended.
339 socklen_t error_code_size =
sizeof(error_code);
340 return ::getsockopt(
socket_, SOL_SOCKET, SO_ERROR, &error_code, &error_code_size)==0;
// Fragment of a definition not otherwise visible here: a string stream local.
346 std::ostringstream ss;
// Stream-insertion operator for wrapper<LkT>; declared noexcept(false). Its
// return type and body are not part of this excerpt.
354 operator<<(
std::ostream &os,
wrapper<LkT>
const &ec)
noexcept(
false) {
366 : base_t(opened_skt) {
// Above: constructor fragment forwarding `opened_skt` to base_t. Below: a
// second constructor fragment forwarding `type` and `domain` to base_t. Both
// signatures are outside this excerpt.
372 : base_t(type, domain) {
// Build an IPv4 endpoint from `addr` (textual address) and `port`.
378 ::sockaddr_in addr_in;
379 addr_in.sin_family=AF_INET;
// NOTE(review): the result of ::inet_pton is discarded, so an address string
// it rejects would go unnoticed and sin_addr would stay unset - confirm.
380 ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
// sin_port is stored in network byte order.
381 addr_in.sin_port=htons(port);
382 std::fill_n(addr_in.sin_zero,
sizeof(addr_in.sin_zero), 0);
// Connect this->socket_ to the endpoint through JMMCG_SYSCALL_WRAPPER.
384 JMMCG_SYSCALL_WRAPPER(
"Unable to connect socket.", _T(LIBJMMCG_VERSION_NUMBER), ::connect,
this->socket_,
reinterpret_cast<::sockaddr
const *>(&addr_in),
sizeof(::sockaddr_in));
// Callable member taking a socket_t::socket_type by value; the accept loop
// invokes it as recv_msg_ops_cpy->fn(accept_skt).
398 std::function<
void(
typename socket_t::socket_type )>
fn;
// Start of a constexpr template declaration parameterised on RecvProcMsgs; the
// declarator itself is not part of this excerpt.
400 template<
class RecvProcMsgs>
constexpr
408 wrapper<LkT>::
wrapper(
char const *addr, uint16_t port,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short timeout,
socket_priority priority,
std::size_t incoming_cpu, type_t type, domain_t domain)
noexcept(
false)
409 : base_t(type, domain) {
// Constructor: creates the base socket from `type`/`domain`, binds it to the
// IPv4 endpoint addr:port and then forwards the tuning parameters to
// base_t::set_options. Some body lines are not part of this excerpt.
410 ::sockaddr_in addr_in;
411 addr_in.sin_family=AF_INET;
// NOTE(review): the result of ::inet_pton is discarded, so an address string
// it rejects would go unnoticed and sin_addr would stay unset - confirm.
412 ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
// sin_port is stored in network byte order.
413 addr_in.sin_port=htons(port);
414 std::fill_n(addr_in.sin_zero,
sizeof(addr_in.sin_zero), 0);
416 JMMCG_SYSCALL_WRAPPER(
"Unable to bind socket.", _T(LIBJMMCG_VERSION_NUMBER), ::bind,
this->socket_,
reinterpret_cast<::sockaddr
const *>(&addr_in),
sizeof(::sockaddr_in));
// NOTE(review): options are applied after ::bind. If base_t::set_options is
// the routine that sets SO_REUSEADDR, that option would arrive too late to
// influence the bind above - confirm the intended ordering.
422 base_t::set_options(min_message_size, max_message_size, timeout, priority, incoming_cpu);
// Fragment: returns the current value of exited_ converted to bool.
428 return static_cast<
bool>(exited_);
// Fragment: re-raises the exception held in `ex`.
437 std::rethrow_exception(ex);
// Fragments of the helper type set_exited: a constexpr explicit constructor
// taking the flag by reference, a non-throwing destructor and the reference
// member. Their bodies are not part of this excerpt.
452 explicit constexpr set_exited(
std::atomic<
bool> &exited)
noexcept(
true)
455 ~set_exited()
noexcept(
true) {
// Reference to the flag supplied at construction.
460 std::atomic<
bool> &exited_;
// Accept-loop fragment: while exit_ is false, polls the listening socket,
// accepts a client and passes the accepted descriptor to the installed
// recv_msg_ops callback. Exceptions derived from std::exception are captured
// into `ex`. The enclosing function header and several body lines are not
// part of this excerpt.
// Bound to exited_ for the lifetime of this scope - presumably so the flag is
// updated when the loop is left; set_exited's bodies are not visible here.
463 const set_exited exited(exited_);
465 while (!
static_cast<
bool>(exit_)) {
// Non-blocking readiness check: ::poll on one descriptor with a timeout of 0.
472 const int poll_ret=
JMMCG_SYSCALL_WRAPPER(
"Unable to accept a client connection, re-trying.", _T(LIBJMMCG_VERSION_NUMBER), ::poll, &fds, 1, 0);
474 thread_traits::sleep(0);
478 ::sockaddr_in client_addr;
479 ::socklen_t len=
sizeof(::sockaddr_in);
480 const int accept_skt=
JMMCG_SYSCALL_WRAPPER(
"Unable to accept a client connection, re-trying.", _T(LIBJMMCG_VERSION_NUMBER), ::accept,
this->socket_,
reinterpret_cast<::sockaddr *>(&client_addr), &len);
481 assert(len==
sizeof(sockaddr_in));
482 assert(client_addr.sin_port!=0);
// Take a local shared_ptr copy so the callback object stays alive for the call
// even if recv_msg_ops is replaced meanwhile.
483 const std::shared_ptr<recv_msg_ops_t> recv_msg_ops_cpy(recv_msg_ops);
484 if (recv_msg_ops_cpy.get()) {
485 recv_msg_ops_cpy->fn(accept_skt);
488 }
catch (std::exception
const &e) {
// Capture the in-flight exception itself. std::make_exception_ptr(e) would
// copy-construct a plain std::exception from the reference, discarding the
// derived type and its message.
490 ex=std::current_exception();
493 }
catch (
std::exception
const &e) {
// Same as above: preserve the original exception object rather than a slice.
495 ex=
std::current_exception();
// Fragment: reads a set of options back from this->socket_ with ::getsockopt
// and applies them to `skt` through skt.set_options. The enclosing function
// header and the declarations of the receiving variables are not part of this
// excerpt.
// buff_sz is reset to the size of the receiving variable before each call
// because ::getsockopt takes it as an in/out length.
506 socklen_t buff_sz=
sizeof(details);
507 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_LINGER, &details, &buff_sz);
509 buff_sz=
sizeof(snd_msg_sz);
510 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_SNDBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_SNDBUF, &snd_msg_sz, &buff_sz);
512 buff_sz=
sizeof(rcv_msg_sz);
513 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_RCVBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_RCVBUF, &rcv_msg_sz, &buff_sz);
515 buff_sz=
sizeof(rcv_msg_lowat);
516 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_RCVLOWAT.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_RCVLOWAT, &rcv_msg_lowat, &buff_sz);
// NOTE(review): no reset of buff_sz to sizeof(priority) is visible before this
// call; it may sit on a line missing from this excerpt - confirm.
519 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_PRIORITY.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_PRIORITY, &priority, &buff_sz);
521 buff_sz=
sizeof(incoming_cpu);
522 JMMCG_SYSCALL_WRAPPER(
"Unable to get SO_INCOMING_CPU.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt,
this->socket_, SOL_SOCKET, SO_INCOMING_CPU, &incoming_cpu, &buff_sz);
// The larger of the send/receive buffer sizes is used as the single maximum
// message size handed to skt.
// NOTE(review): details.l_linger (the linger interval) is passed in the third
// position, which presumably is set_options' timeout parameter - confirm that
// the two quantities and their units are meant to be interchangeable.
523 skt.set_options(rcv_msg_lowat,
std::max(snd_msg_sz, rcv_msg_sz), details.l_linger, priority, incoming_cpu);
// Fragment: installs a new receive-callback holder by moving `f` into a freshly
// allocated recv_msg_ops_t and assigning the resulting shared_ptr to
// recv_msg_ops. The function name and parameter list are not part of this
// excerpt.
527 template<
class RecvProcMsgs>
inline void
529 recv_msg_ops=
std::make_shared<recv_msg_ops_t>(
std::move(f));