libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
socket_server_manager_glibc_impl.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2015 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace socket { namespace server_manager { namespace glibc {
20 
21 template<class LkT>
22 inline std::string
23 manager<LkT>::session::to_string() const noexcept(false) {
24  std::ostringstream ss;
25  ss<<socket_;
26  return ss.str();
27 }
28 
29 template<class LkT>
30 inline std::ostream &
31 operator<<(std::ostream &os, typename manager<LkT>::session const &ec) noexcept(false) {
32  os<<ec.to_string();
33  return os;
34 }
35 
36 template<class LkT>
37 inline
38 manager<LkT>::manager(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t min_message_size, std::size_t max_message_size, unsigned short timeout, socket_priority priority, std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
39 : acceptor(addr.to_string().c_str(), port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu),
40  server_to_client_flow_(std::move(server_to_client_flow)) {
41 }
42 
43 template<class LkT>
44 inline void
45 manager<LkT>::run() {
46  acceptor.run();
47 }
48 
49 template<class LkT>
50 inline void
51 manager<LkT>::stop() {
52  acceptor.stop();
53 }
54 
55 template<class LkT>
56 inline void
57 manager<LkT>::set_options(acceptor_t &acceptor, socket_t &skt) {
58  acceptor.set_options(skt);
59 }
60 
61 template<class LkT>
62 inline void
63 manager<LkT>::set_options(socket_t &skt) {
64  set_options(acceptor, skt);
65 }
66 
67 template<class LkT>
68 inline std::string
69 manager<LkT>::to_string() const noexcept(false) {
70  std::ostringstream ss;
71  ss<<acceptor;
72  return ss.str();
73 }
74 
75 template<class LkT>
76 inline std::ostream &
77 operator<<(std::ostream &os, manager<LkT> const &ec) noexcept(false) {
78  os<<ec.to_string();
79  return os;
80 }
81 
82 template<class SvrHBs, class LkT>
83 class loopback<SvrHBs, LkT>::send_heartbeats final : public manager<LkT>::session {
84 public:
85  using base_t=typename manager<LkT>::session;
86  using session=base_t;
87 
88  send_heartbeats(typename socket_t::socket_type accept_socket)
89  : base_t(accept_socket) {
90  }
91 
93  this->stop();
94  }
95 
96  static typename session::ptr_type make(typename socket_t::socket_type accept_socket) noexcept(false) {
97  return std::make_shared<send_heartbeats>(accept_socket);
98  }
99 
100  void start() override {
101  assert(!heatbeating);
102  heatbeating.reset(new heartbeats_t(this->socket_));
103  }
104 
105  void stop() override {
106  if (heatbeating) {
107  heatbeating->stop();
108  }
109  base_t::stop();
110  }
111 
112 private:
113  std::unique_ptr<heartbeats_t> heatbeating;
114 };
115 
116 template<class SvrHBs, class LkT>
117 inline
118 loopback<SvrHBs, LkT>::loopback(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t min_message_size, std::size_t max_message_size, unsigned short, socket_priority priority, std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
119 : base_t(addr, port_num, min_message_size, max_message_size, (heartbeats_t::max_missed_heartbeats*heartbeats_t::heartbeat_interval).count(), priority, incoming_cpu, std::move(server_to_client_flow)) {
120 }
121 
122 template<class SvrHBs, class LkT>
123 template<class RecvProcMsgs> inline void
124 loopback<SvrHBs, LkT>::start_accept(RecvProcMsgs proc_fn) noexcept(false) {
125  this->acceptor.async_accept(
126  [this, proc_fn](typename socket_t::socket_type accept_socket) {
127  bool exit=false;
128  {
129  typename base_t::session::ptr_type client_connection(send_heartbeats::make(accept_socket));
130  this->set_options(client_connection->socket());
131  assert(client_connection.get());
132  assert(client_connection->socket().is_open());
133  this->server_to_client_flow_(client_connection);
134  exit=client_connection->process(proc_fn);
135  this->server_to_client_flow_(typename base_t::session::ptr_type{});
136  }
137  if (!exit) {
138  start_accept(proc_fn);
139  }
140  }
141  );
142 }
143 
144 template<class LkT>
145 inline
146 forwarding<LkT>::forwarding(boost::asio::ip::address const &addr, unsigned short port_num, std::size_t min_message_size, std::size_t max_message_size, unsigned short timeout, socket_priority priority, std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow, socket_t &dest)
147 : base_t(addr, port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu, std::move(server_to_client_flow)), dest_socket_(dest) {
148 }
149 
150 template<class LkT>
151 template<class RecvProcMsgs> inline void
152 forwarding<LkT>::start_accept(RecvProcMsgs proc_fn) noexcept(false) {
153  this->acceptor.async_accept(
154  [this, proc_fn](typename socket_t::socket_type accept_socket) {
155  bool exit=false;
156  {
157  typename base_t::session::ptr_type client_connection(base_t::session::make(accept_socket));
158  assert(client_connection.get());
159  this->set_options(client_connection->socket());
160  this->set_options(dest_socket_);
161  assert(client_connection->socket().is_open());
162  this->server_to_client_flow_(client_connection);
163  exit=client_connection->process(proc_fn, dest_socket_);
164  this->server_to_client_flow_(typename base_t::session::ptr_type{});
165  }
166  if (!exit) {
167  start_accept(proc_fn);
168  }
169  }
170  );
171 }
172 
173 } } } } }