libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
socket_wrapper_glibc_impl.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2015 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */

#include <stdexcept>

19 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace socket { namespace glibc {
20 
21 namespace basic {
22 
23 template<class LkT>
24 inline void
25 wrapper<LkT>::set_options(std::size_t min_message_size, std::size_t max_message_size, unsigned short timeout, socket_priority priority, std::size_t incoming_cpu) noexcept(false) {
26  ::linger details{
27  1, // Enabled.
28  timeout
29  };
30  JMMCG_SYSCALL_WRAPPER("Unable to set SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_LINGER, &details, sizeof(::linger));
31  unsigned int sock_timeout=timeout;
32  JMMCG_SYSCALL_WRAPPER("Unable to set TCP_USER_TIMEOUT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, TCP_USER_TIMEOUT, &sock_timeout, sizeof(unsigned int));
33  int val=1;
34  JMMCG_SYSCALL_WRAPPER("Unable to set SO_REUSEADDR.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
35  JMMCG_SYSCALL_WRAPPER("Unable to set TCP_NODELAY.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
36  JMMCG_SYSCALL_WRAPPER("Unable to set SO_SNDBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_SNDBUF, &max_message_size, sizeof(max_message_size));
37  JMMCG_SYSCALL_WRAPPER("Unable to set SO_RCVBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_RCVBUF, &max_message_size, sizeof(max_message_size));
38  JMMCG_SYSCALL_WRAPPER("Unable to set SO_RCVLOWAT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_RCVLOWAT, &min_message_size, sizeof(min_message_size));
39  JMMCG_SYSCALL_WRAPPER("Unable to set SO_PRIORITY.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_PRIORITY, &priority, sizeof(priority));
40  JMMCG_SYSCALL_WRAPPER("Unable to set SO_INCOMING_CPU.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_INCOMING_CPU, &incoming_cpu, sizeof(incoming_cpu));
41 }
42 
43 template<class LkT>
44 inline void
45 wrapper<LkT>::close() noexcept(true) {
46  ::linger details{
47  1, // Enabled.
48  1
49  };
50  JMMCG_SYSCALL_WRAPPER("Unable to set SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, SO_LINGER, &details, sizeof(::linger));
51  unsigned int sock_timeout=1;
52  JMMCG_SYSCALL_WRAPPER("Unable to set TCP_USER_TIMEOUT.", _T(LIBJMMCG_VERSION_NUMBER), ::setsockopt, socket_, SOL_SOCKET, TCP_USER_TIMEOUT, &sock_timeout, sizeof(unsigned int));
53  // Perform a TCP half-close: no longer permit sending, but received until the connected client closes. See section 18.5 in W.R.Stevens "TCP/IP Illustrated. The Protocols", Vol 1, 1st Edition.
54  ::shutdown(socket_, SHUT_WR);
55 }
56 
57 template<class LkT>
58 inline void
59 wrapper<LkT>::ignore_sigpipe_for_a_socket_that_closes() noexcept(false) {
60  JMMCG_SYSCALL_WRAPPER("Could not ignore SIGPIPE.", _T(LIBJMMCG_VERSION_NUMBER), ::signal, SIGPIPE, SIG_IGN);
61 }
62 
63 template<class LkT>
64 inline
65 wrapper<LkT>::wrapper(socket_type opened_skt) noexcept(false)
66 : socket_(opened_skt) {
67  assert(socket_);
68  ignore_sigpipe_for_a_socket_that_closes();
69 }
70 
71 template<class LkT>
72 inline
73 wrapper<LkT>::wrapper(type_t type, domain_t domain) noexcept(false)
74 : socket_(JMMCG_SYSCALL_WRAPPER("Unable to create the socket.", _T(LIBJMMCG_VERSION_NUMBER), ::socket, domain, type|SOCK_CLOEXEC, 0)) {
75  ignore_sigpipe_for_a_socket_that_closes();
76 }
77 
78 template<class LkT>
79 inline
80 wrapper<LkT>::~wrapper() noexcept(true) {
81  ::close(socket_);
82 }
83 
84 template<class LkT>
85 template<class MsgT> inline void
86 wrapper<LkT>::write(MsgT const &message) noexcept(true) {
87  const typename thread_traits::cancellability set;
88  assert(message.is_valid());
89  ssize_t bytes_written=0;
90  const write_lock_t lk(mutex_);
91  if constexpr (MsgT::has_static_size) {
92  assert(message.length()==sizeof(MsgT));
93  // Must be done in a loop because in very rare circumstances under high loads, we can get partial writes.
94  while (static_cast<std::size_t>(bytes_written)<sizeof(MsgT)) {
95  assert(sizeof(MsgT)-bytes_written);
96  const int ret_code=::write(socket_, reinterpret_cast<char const *>(&message)+bytes_written, sizeof(MsgT)-bytes_written);
97  if (LIKELY(ret_code>0)) {
98  bytes_written+=ret_code;
99  } else if (ret_code<0) {
100  const int err=errno;
101  BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
102  switch (err) {
103  case EAGAIN:
104  case EINTR:
105  case EISCONN:
106  case ETIMEDOUT:
107  break;
108  case EBADF:
109  case EIO:
110  case EPIPE:
111  case ENETDOWN:
112  case ENETUNREACH:
113  case ENETRESET:
114  case ECONNABORTED:
115  case ECONNRESET:
116  case ENOTCONN:
117  case ESHUTDOWN:
118  return;
119  default:
120  assert(!"Error writing data to the socket.");
121  return;
122  }
123  } else {
124  assert(ret_code==0); // EOF
125  return;
126  }
127  }
128  assert(static_cast<std::size_t>(bytes_written)==sizeof(MsgT));
129  } else {
130  assert(message.length());
131  // Must be done in a loop because in very rare circumstances under high loads, we can get partial writes.
132  while (static_cast<std::size_t>(bytes_written)<message.length()) {
133  assert(message.length()-bytes_written);
134  const int ret_code=::write(socket_, reinterpret_cast<char const *>(&message)+bytes_written, message.length()-bytes_written);
135  if (LIKELY(ret_code>0)) {
136  bytes_written+=ret_code;
137  } else if (ret_code<0) {
138  const int err=errno;
139  BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
140  switch (err) {
141  case EAGAIN:
142  case EINTR:
143  case EISCONN:
144  case ETIMEDOUT:
145  break;
146  case EBADF:
147  case EIO:
148  case EPIPE:
149  case ENETDOWN:
150  case ENETUNREACH:
151  case ENETRESET:
152  case ECONNABORTED:
153  case ECONNRESET:
154  case ENOTCONN:
155  case ESHUTDOWN:
156  return;
157  default:
158  assert(!"Error writing data to the socket.");
159  return;
160  }
161  } else {
162  assert(ret_code==0); // EOF
163  return;
164  }
165  }
166  assert(static_cast<std::size_t>(bytes_written)<=sizeof(MsgT));
167  assert(static_cast<std::size_t>(bytes_written)==message.length());
168  }
169 }
170 
171 template<class LkT>
172 template<class V, std::size_t N> inline void
173 wrapper<LkT>::write(std::array<V, N> const &message) noexcept(true) {
174  const typename thread_traits::cancellability set;
175  ssize_t bytes_written=0;
176  const write_lock_t lk(mutex_);
177  // Must be done in a loop because in very rare circumstances under high loads, we can get partial writes.
178  while (static_cast<std::size_t>(bytes_written)<(sizeof(V)*N)) {
179  assert(sizeof(V)*N-bytes_written);
180  const int ret_code=::write(socket_, reinterpret_cast<char const *>(message.data())+bytes_written, sizeof(V)*N-bytes_written);
181  if (LIKELY(ret_code>0)) {
182  bytes_written+=ret_code;
183  } else if (ret_code<0) {
184  const int err=errno;
185  BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
186  switch (err) {
187  case EAGAIN:
188  case EINTR:
189  case EISCONN:
190  case ETIMEDOUT:
191  break;
192  case EBADF:
193  case EIO:
194  case EPIPE:
195  case ENETDOWN:
196  case ENETUNREACH:
197  case ENETRESET:
198  case ECONNABORTED:
199  case ECONNRESET:
200  case ENOTCONN:
201  case ESHUTDOWN:
202  return;
203  default:
204  assert(!"Error writing data to the socket.");
205  return;
206  }
207  } else {
208  assert(ret_code==0); // EOF
209  return;
210  }
211  }
212  assert(bytes_written>0);
213  assert(static_cast<std::size_t>(bytes_written)==sizeof(V)*N);
214 }
215 
216 template<class LkT>
217 inline bool
218 wrapper<LkT>::read_msg(std::uint8_t *dest, std::size_t msg_size) noexcept(true) {
219  if (msg_size>0) {
220  const typename thread_traits::cancellability set;
221  // Try to avoid repeated calls to the kernel, etc, whilst receiving the message: do it in one lot, hopefully...
222  // Sadly this seems to slow it down by about 7%...
223 // std::size_t bytes_to_read=msg_size-1;
224 // ::setsockopt(socket_, SOL_SOCKET, SO_RCVLOWAT, &bytes_to_read, sizeof(bytes_to_read));
225  std::size_t bytes_read=0;
226  while (UNLIKELY(bytes_read<msg_size)) {
227  const ssize_t ret_code=::read(socket_, dest+bytes_read, msg_size-bytes_read);
228  if (LIKELY(ret_code>0)) {
229  assert((bytes_read+ret_code)<=msg_size);
230  bytes_read+=ret_code;
231  } else if (ret_code<0) {
232  const int err=errno;
233  BOOST_MPL_ASSERT_RELATION(EAGAIN, ==, EWOULDBLOCK);
234  switch (err) {
235  case EAGAIN:
236  case EINTR:
237  case EISCONN:
238  case ETIMEDOUT:
239  break;
240  case EBADF:
241  case EFAULT:
242  case EIO:
243  case ENETDOWN:
244  case ENETUNREACH:
245  case ENETRESET:
246  case ECONNABORTED:
247  case ECONNRESET:
248  case ENOTCONN:
249  case ESHUTDOWN:
250  return true;
251  default:
252  assert(!"Failed to read from socket.");
253  return true;
254  }
255  } else {
256  assert(ret_code==0); // EOF
257  return true;
258  }
259  }
260  assert(bytes_read==msg_size);
261  }
262  return false;
263 }
264 
265 template<class LkT>
266 template<class MsgT> inline bool
267 wrapper<LkT>::read(MsgT &dest) noexcept(true) {
268  if constexpr (MsgT::has_static_size) {
269  assert(dest.length()<=sizeof(MsgT));
270  BOOST_MPL_ASSERT_RELATION(sizeof(MsgT), <=, SSIZE_MAX);
271  const bool read_fail=read_msg(reinterpret_cast<std::uint8_t *>(&dest), sizeof(MsgT));
272  assert(!(read_fail && dest.is_valid()));
273  return read_fail;
274  } else {
275  constexpr std::size_t header_t_sz=MsgT::header_t_size;
276  const bool read_err=read_msg(reinterpret_cast<std::uint8_t *>(&dest), header_t_sz);
277  if (LIKELY(!read_err)) {
278  typename MsgT::Header_t const *hdr=reinterpret_cast<typename MsgT::Header_t const *>(&dest);
279  const std::size_t length=hdr->length();
280  if (length>=header_t_sz && length<=SSIZE_MAX && length<=sizeof(MsgT)) {
281  const std::size_t body_size=length-header_t_sz;
282  if (body_size>0) {
283  const bool read_fail=read_msg(reinterpret_cast<std::uint8_t *>(&dest)+header_t_sz, length-header_t_sz);
284  assert(read_fail || dest.is_valid());
285  return read_fail;
286  } else {
287  assert(hdr->is_valid());
288  return false;
289  }
290  } else {
291  return true;
292  }
293  } else {
294  return true;
295  }
296  }
297 }
298 
299 template<class LkT>
300 template<class V, std::size_t SrcSz> inline bool
301 wrapper<LkT>::read(V (& dest)[SrcSz]) noexcept(true) {
302  BOOST_MPL_ASSERT_RELATION(sizeof(V)*SrcSz, <=, SSIZE_MAX);
303  return read_msg(dest, sizeof(V)*SrcSz);
304 }
305 
306 template<class LkT>
307 template<class MsgDetails, class V, std::size_t N> inline bool
308 wrapper<LkT>::read(std::array<V, N> &buff) noexcept(false) {
309  using msg_details_t=MsgDetails;
310 
311  BOOST_MPL_ASSERT_RELATION(msg_details_t::max_msg_size, >=, msg_details_t::header_t_size);
312 
313  const bool read_err=read_msg(reinterpret_cast<std::uint8_t *>(buff.data()), msg_details_t::header_t_size);
314  if (LIKELY(!read_err)) {
315  typename msg_details_t::Header_t const *hdr=reinterpret_cast<typename msg_details_t::Header_t const *>(buff.data());
316  const std::size_t length=hdr->length();
317  if (length>=msg_details_t::header_t_size && length<=SSIZE_MAX && length<=(N*sizeof(V))) {
318  const std::size_t body_size=length-msg_details_t::header_t_size;
319  if (body_size>0) {
320  const bool read_fail=read_msg(reinterpret_cast<std::uint8_t *>(buff.data())+msg_details_t::header_t_size, length-msg_details_t::header_t_size);
321  assert(read_fail || hdr->is_valid());
322  return read_fail;
323  } else {
324  assert(hdr->is_valid());
325  return false;
326  }
327  } else {
328  return true;
329  }
330  } else {
331  return true;
332  }
333 }
334 
335 template<class LkT>
336 inline bool
337 wrapper<LkT>::is_open() const noexcept(true) {
338  int error_code;
339  socklen_t error_code_size = sizeof(error_code);
340  return ::getsockopt(socket_, SOL_SOCKET, SO_ERROR, &error_code, &error_code_size)==0;
341 }
342 
343 template<class LkT>
344 inline std::string
345 wrapper<LkT>::to_string() const noexcept(false) {
346  std::ostringstream ss;
347  ss
348  <<"socket_="<<socket_;
349  return ss.str();
350 }
351 
352 template<class LkT>
353 inline std::ostream &
354 operator<<(std::ostream &os, wrapper<LkT> const &ec) noexcept(false) {
355  os<<ec.to_string();
356  return os;
357 }
358 
359 }
360 
361 namespace client {
362 
363 template<class LkT>
364 inline
365 wrapper<LkT>::wrapper(socket_type opened_skt) noexcept(false)
366 : base_t(opened_skt) {
367 }
368 
369 template<class LkT>
370 inline
371 wrapper<LkT>::wrapper(type_t type, domain_t domain) noexcept(false)
372 : base_t(type, domain) {
373 }
374 
375 template<class LkT>
376 inline void
377 wrapper<LkT>::connect(char const *addr, uint16_t port) noexcept(false) {
378  ::sockaddr_in addr_in;
379  addr_in.sin_family=AF_INET;
380  ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
381  addr_in.sin_port=htons(port);
382  std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
383  try {
384  JMMCG_SYSCALL_WRAPPER("Unable to connect socket.", _T(LIBJMMCG_VERSION_NUMBER), ::connect, this->socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
385  } catch (...) {
386  this->close();
387  throw;
388  }
389 }
390 
391 }
392 
393 namespace server {
394 
395 template<class LkT>
396 class wrapper<LkT>::recv_msg_ops_t {
397 public:
398  std::function<void(typename socket_t::socket_type /* connected accept_socket */)> fn;
399 
400  template<class RecvProcMsgs> constexpr
401  recv_msg_ops_t(RecvProcMsgs &&f) noexcept(false)
402  : fn(std::move(f)) {
403  }
404 };
405 
406 template<class LkT>
407 inline
408 wrapper<LkT>::wrapper(char const *addr, uint16_t port, std::size_t min_message_size, std::size_t max_message_size, unsigned short timeout, socket_priority priority, std::size_t incoming_cpu, type_t type, domain_t domain) noexcept(false)
409 : base_t(type, domain) {
410  ::sockaddr_in addr_in;
411  addr_in.sin_family=AF_INET;
412  ::inet_pton(AF_INET, addr, &addr_in.sin_addr);
413  addr_in.sin_port=htons(port);
414  std::fill_n(addr_in.sin_zero, sizeof(addr_in.sin_zero), 0);
415  try {
416  JMMCG_SYSCALL_WRAPPER("Unable to bind socket.", _T(LIBJMMCG_VERSION_NUMBER), ::bind, this->socket_, reinterpret_cast<::sockaddr const *>(&addr_in), sizeof(::sockaddr_in));
417  JMMCG_SYSCALL_WRAPPER("Unable to listen to socket.", _T(LIBJMMCG_VERSION_NUMBER), ::listen, this->socket_, max_backlog_connections);
418  } catch (...) {
419  this->close();
420  throw;
421  }
422  base_t::set_options(min_message_size, max_message_size, timeout, priority, incoming_cpu);
423 }
424 
425 template<class LkT>
426 inline bool
427 wrapper<LkT>::stopped() const noexcept(true) {
428  return static_cast<bool>(exited_);
429 }
430 
431 template<class LkT>
432 inline void
433 wrapper<LkT>::stop() noexcept(false) {
434  exit_=true;
435  this->close();
436  if (ex) {
437  std::rethrow_exception(ex);
438  }
439 }
440 
441 template<class LkT>
442 inline
443 wrapper<LkT>::~wrapper() noexcept(false) {
444  stop();
445 }
446 
447 template<class LkT>
448 inline void
449 wrapper<LkT>::run() noexcept(false) {
450  class set_exited {
451  public:
452  explicit constexpr set_exited(std::atomic<bool> &exited) noexcept(true)
453  : exited_(exited) {
454  }
455  ~set_exited() noexcept(true) {
456  exited_=true;
457  }
458 
459  private:
460  std::atomic<bool> &exited_;
461  };
462 
463  const set_exited exited(exited_);
464  try {
465  while (!static_cast<bool>(exit_)) {
466  try {
467  ::pollfd fds={
468  this->socket_,
469  POLLIN|POLLPRI,
470  0
471  };
472  const int poll_ret=JMMCG_SYSCALL_WRAPPER("Unable to accept a client connection, re-trying.", _T(LIBJMMCG_VERSION_NUMBER), ::poll, &fds, 1, 0);
473  if (poll_ret==0) {
474  thread_traits::sleep(0);
475  continue;
476  } else {
477  assert(poll_ret==1);
478  ::sockaddr_in client_addr;
479  ::socklen_t len=sizeof(::sockaddr_in);
480  const int accept_skt=JMMCG_SYSCALL_WRAPPER("Unable to accept a client connection, re-trying.", _T(LIBJMMCG_VERSION_NUMBER), ::accept, this->socket_, reinterpret_cast<::sockaddr *>(&client_addr), &len);
481  assert(len==sizeof(sockaddr_in));
482  assert(client_addr.sin_port!=0);
483  const std::shared_ptr<recv_msg_ops_t> recv_msg_ops_cpy(recv_msg_ops);
484  if (recv_msg_ops_cpy.get()) {
485  recv_msg_ops_cpy->fn(accept_skt);
486  }
487  }
488  } catch (std::exception const &e) {
489  // TODO oops: could overwrite an error... Do we care?
490  ex=std::make_exception_ptr(e);
491  }
492  }
493  } catch (std::exception const &e) {
494  // TODO oops: could overwrite an error... Do we care?
495  ex=std::make_exception_ptr(e);
496  }
497 }
498 
499 template<class LkT>
500 inline void
501 wrapper<LkT>::set_options(base_t &skt) const noexcept(false) {
502  ::linger details{
503  0,
504  0
505  };
506  socklen_t buff_sz=sizeof(details);
507  JMMCG_SYSCALL_WRAPPER("Unable to get SO_LINGER.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_LINGER, &details, &buff_sz);
508  int snd_msg_sz=0;
509  buff_sz=sizeof(snd_msg_sz);
510  JMMCG_SYSCALL_WRAPPER("Unable to get SO_SNDBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_SNDBUF, &snd_msg_sz, &buff_sz);
511  int rcv_msg_sz=0;
512  buff_sz=sizeof(rcv_msg_sz);
513  JMMCG_SYSCALL_WRAPPER("Unable to get SO_RCVBUF.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_RCVBUF, &rcv_msg_sz, &buff_sz);
514  int rcv_msg_lowat=0;
515  buff_sz=sizeof(rcv_msg_lowat);
516  JMMCG_SYSCALL_WRAPPER("Unable to get SO_RCVLOWAT.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_RCVLOWAT, &rcv_msg_lowat, &buff_sz);
517  buff_sz=sizeof(socket_priority);
519  JMMCG_SYSCALL_WRAPPER("Unable to get SO_PRIORITY.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_PRIORITY, &priority, &buff_sz);
520  int incoming_cpu=0;
521  buff_sz=sizeof(incoming_cpu);
522  JMMCG_SYSCALL_WRAPPER("Unable to get SO_INCOMING_CPU.", _T(LIBJMMCG_VERSION_NUMBER), ::getsockopt, this->socket_, SOL_SOCKET, SO_INCOMING_CPU, &incoming_cpu, &buff_sz);
523  skt.set_options(rcv_msg_lowat, std::max(snd_msg_sz, rcv_msg_sz), details.l_linger, priority, incoming_cpu);
524 }
525 
526 template<class LkT>
527 template<class RecvProcMsgs> inline void
528 wrapper<LkT>::async_accept(RecvProcMsgs &&f) noexcept(false) {
529  recv_msg_ops=std::make_shared<recv_msg_ops_t>(std::move(f));
530 }
531 
532 }
533 
534 } } } }