libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
socket_server_manager_asio_impl.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2015 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace socket { namespace server_manager { namespace asio {
20 
21 template<class LkT>
22 inline std::string
23 manager<LkT>::session::to_string() const noexcept(false) {
24  std::ostringstream ss;
25  ss<<socket_;
26  return ss.str();
27 }
28 
29 template<class LkT>
30 inline std::ostream &
31 operator<<(std::ostream &os, typename manager<LkT>::session const &ec) noexcept(false) {
32  os<<ec.to_string();
33  return os;
34 }
35 
36 template<class LkT>
37 inline
38 manager<LkT>::manager(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t /*min_message_size*/, std::size_t max_message_size, unsigned short timeout, socket_priority /*priority*/, std::size_t /*incoming_cpu*/, server_to_client_flow_t &&server_to_client_flow)
40  server_to_client_flow_(std::move(server_to_client_flow)) {
41 // TODO "protocol not available": acceptor.set_option(boost::asio::socket_base::send_low_watermark(min_message_size));
42  acceptor.set_option(boost::asio::socket_base::send_buffer_size(max_message_size));
43 // TODO "protocol not available": acceptor.set_option(boost::asio::socket_base::receive_low_watermark(min_message_size));
44  acceptor.set_option(boost::asio::socket_base::receive_buffer_size(max_message_size));
45  acceptor.set_option(boost::asio::socket_base::linger(true, timeout));
46  acceptor.set_option(boost::asio::ip::tcp::no_delay(true));
47  acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
48 }
49 
50 template<class LkT>
51 inline void
52 manager<LkT>::run() {
53  io_context.run();
54 }
55 
56 template<class LkT>
57 inline void
58 manager<LkT>::stop() {
59  io_context.stop();
60  while (!io_context.stopped()) {
61  thread_traits::sleep(0);
62  }
63 }
64 
65 template<class LkT>
66 inline void
67 manager<LkT>::set_options(boost::asio::ip::tcp::acceptor &acceptor, socket_t &skt) {
68  boost::asio::socket_base::send_buffer_size send_buffer_size;
69  acceptor.get_option(send_buffer_size);
70  boost::asio::socket_base::receive_buffer_size receive_buffer_size;
71  acceptor.get_option(receive_buffer_size);
72  boost::asio::socket_base::linger linger_details;
73  acceptor.get_option(linger_details);
74  skt.set_options(1, std::max(send_buffer_size.value(), receive_buffer_size.value()), linger_details.timeout(), socket_t::socket_priority::low, 0);
75  boost::asio::ip::tcp::no_delay no_delay;
76  acceptor.get_option(no_delay);
77 }
78 
79 template<class LkT>
80 inline void
81 manager<LkT>::set_options(socket_t &skt) {
82  set_options(acceptor, skt);
83 }
84 
85 template<class LkT>
86 inline std::string
87 manager<LkT>::to_string() const noexcept(false) {
88  std::ostringstream ss;
89 // TODO ss<<acceptor;
90  return ss.str();
91 }
92 
93 template<class LkT>
94 inline std::ostream &
95 operator<<(std::ostream &os, manager<LkT> const &ec) noexcept(false) {
96  os<<ec.to_string();
97  return os;
98 }
99 
100 template<class SvrHBs, class LkT>
101 class loopback<SvrHBs, LkT>::send_heartbeats final : public manager<LkT>::session {
102 public:
103  using base_t=typename manager<LkT>::session;
104  using session=base_t;
105 
106  explicit send_heartbeats(boost::asio::ip::tcp::socket &&socket)
107  : base_t(std::move(socket)) {
108  }
109 
111  this->stop();
112  }
113 
114  static typename session::ptr_type make(boost::asio::ip::tcp::socket &&socket) noexcept(false) {
115  return std::make_shared<send_heartbeats>(std::move(socket));
116  }
117 
118  void start() override {
119  assert(!heatbeating);
120  heatbeating.reset(new heartbeats_t(this->socket_));
121  }
122 
123  void stop() override {
124  if (heatbeating) {
125  heatbeating->stop();
126  }
127  base_t::stop();
128  }
129 
130 private:
131  std::unique_ptr<heartbeats_t> heatbeating;
132 };
133 
134 template<class SvrHBs, class LkT>
135 inline
136 loopback<SvrHBs, LkT>::loopback(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t min_message_size, std::size_t max_message_size, unsigned short, socket_priority priority, std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
137 : base_t(addr, port_num, min_message_size, max_message_size, (heartbeats_t::max_missed_heartbeats*heartbeats_t::heartbeat_interval).count(), priority, incoming_cpu, std::move(server_to_client_flow)) {
138 }
139 
140 template<class SvrHBs, class LkT>
141 template<class RecvProcMsgs> inline void
142 loopback<SvrHBs, LkT>::start_accept(RecvProcMsgs proc_fn) noexcept(false) {
143  this->acceptor.async_accept(
144  [this, proc_fn](boost::system::error_code const &error, boost::asio::ip::tcp::socket &&client_conn) {
145  bool exit=false;
146  if (!error) {
147  typename base_t::session::ptr_type client_connection(send_heartbeats::make(std::move(client_conn)));
148  assert(client_connection.get());
149  this->set_options(client_connection->socket());
150  assert(client_connection->socket().is_open());
151  this->server_to_client_flow_(client_connection);
152  exit=client_connection->process(proc_fn);
153  this->server_to_client_flow_(typename base_t::session::ptr_type{});
154  }
155  if (!exit) {
156  start_accept(proc_fn);
157  }
158  }
159  );
160 }
161 
162 template<class LkT> inline
163 forwarding<LkT>::forwarding(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t min_message_size, std::size_t max_message_size, unsigned short timeout, socket_priority priority, std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow, socket_t &dest)
164 : base_t(addr, port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu, std::move(server_to_client_flow)), dest_socket_(dest) {
165 }
166 
167 template<class LkT>
168 template<class RecvProcMsgs> inline void
169 forwarding<LkT>::start_accept(RecvProcMsgs proc_fn) noexcept(false) {
170  this->acceptor.async_accept(
171  [this, proc_fn](boost::system::error_code const &error, boost::asio::ip::tcp::socket &&client_conn) {
172  bool exit=false;
173  if (!error) {
174  typename base_t::session::ptr_type client_connection(base_t::session::make(std::move(client_conn)));
175  assert(client_connection.get());
176  this->set_options(client_connection->socket());
177  this->set_options(dest_socket_);
178  assert(client_connection->socket().is_open());
179  this->server_to_client_flow_(client_connection);
180  exit=client_connection->process(proc_fn, dest_socket_);
181  this->server_to_client_flow_(typename base_t::session::ptr_type{});
182  }
183  if (!exit) {
184  start_accept(proc_fn);
185  }
186  }
187  );
188 }
189 
190 } } } } }