libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
socket_server_impl.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2015 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace socket {
20 
/**
	Construct a server from an explicit address/port, with a timeout, and start its I/O thread.

	What the visible code does:
	- copies addr, port_num and proc_ops into the members address, port_number and processor;
	- constructs manager from (at least) address, timeout, priority and mask.get_cpu();
	- constructs io_thread from the result of an immediately-invoked lambda, which first calls
	  manager.start_accept() with a handler that forwards to read_and_process_msgs(), and then
	  returns std::bind(&svr::run, this) as the work for the thread;
	- finally applies cpu_priority, mask and svr_name to io_thread.

	NOTE(review): this listing appears to have lost some of the manager's argument lines (the
	embedded listing numbers jump 30->34 and 36->38, leaving a trailing comma), and
	server_to_client_flow is not used on any visible line - presumably it is consumed on one of
	the missing lines; confirm against the real file.

	NOTE(review): the accept handler captures ts by reference and this by pointer, so both
	presumably have to outlive any handler invocation; the io_thread initialiser also uses
	manager, which assumes manager is declared before io_thread in the class - confirm.

	\param	addr	Address stored in the member address and passed to the manager.
	\param	port_num	Port stored in the member port_number.
	\param	timeout	Passed through to the manager.
	\param	priority	Socket priority, passed through to the manager.
	\param	mask	Processor mask: mask.get_cpu() goes to the manager, the mask itself becomes the io_thread affinity.
	\param	cpu_priority	Kernel priority applied to io_thread.
	\param	proc_ops	Processing rules copied into the member processor.
	\param	ts	Latency-timestamp object handed to read_and_process_msgs() on each accepted connection.
	\param	svr_name	Name given to io_thread.
	\param	server_to_client_flow	Not referenced in the visible lines (see note above).
*/
21 template<class ProcessingRules, class SvrMgr>
22 template<class LatencyTimestamps, class>
23 inline
24 svr<ProcessingRules, SvrMgr>::svr(boost::asio::ip::address const &addr, unsigned short port_num, unsigned short timeout, typename svr_mgr_t::socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, proc_rules_t const &proc_ops, LatencyTimestamps &ts, char const *svr_name, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
25 : base_t(),
26  address(addr),
27  port_number(port_num),
28  processor(proc_ops),
29  manager(
30  address,
34  timeout,
35  priority,
36  mask.get_cpu(),
38  ),
39  io_thread(
40  [this, &ts]() {
41  manager.start_accept(
42  [this, &ts](auto &src_cxn, auto &dest_skt) {
43  return this->read_and_process_msgs(src_cxn, dest_skt, ts);
44  }
45  );
46  // This must come after the accept otherwise starting subsequent servers will fail.
47  return std::bind(&svr::run, this);
48  }()
49  ) {
// The thread object already exists at this point; these calls only adjust its scheduling and name.
50  io_thread.kernel_priority(cpu_priority);
51  io_thread.kernel_affinity(mask);
52  io_thread.set_name(svr_name);
53 }
54 
/**
	Constructor overload taking an explicit address/port but no timeout argument.

	NOTE(review): the initialiser list/body of this overload is not visible in this listing
	(the embedded listing numbers jump 58->60). Presumably it delegates to another constructor;
	confirm against the real file before relying on any behaviour.
*/
55 template<class ProcessingRules, class SvrMgr>
56 template<class LatencyTimestamps, class>
57 inline
58 svr<ProcessingRules, SvrMgr>::svr(boost::asio::ip::address const &addr, unsigned short port_num, socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, proc_rules_t const &proc_ops, LatencyTimestamps &ts, char const *svr_name, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
60 }
61 
/**
	Construct a server from an explicit address/port, with a timeout and a destination socket,
	and start its I/O thread.

	What the visible code does:
	- copies addr, port_num and proc_ops into the members address, port_number and processor;
	- constructs manager from (at least) address, timeout, priority, mask.get_cpu() and dest_skt;
	- constructs io_thread from the result of an immediately-invoked lambda, which first calls
	  manager.start_accept() with a handler that forwards to read_and_process_msgs(), and then
	  returns std::bind(&svr::run, this) as the work for the thread;
	- finally applies cpu_priority, mask and svr_name to io_thread.

	Note that the accept handler's own dest_skt parameter shadows the constructor's dest_skt
	parameter: inside the handler it is whatever the manager passes in.

	NOTE(review): this listing appears to have lost some of the manager's argument lines (the
	embedded listing numbers jump 71->75 and 77->79), and server_to_client_flow is not used on
	any visible line - presumably it is consumed on one of the missing lines; confirm against
	the real file.

	NOTE(review): the accept handler captures ts by reference and this by pointer, so both
	presumably have to outlive any handler invocation - confirm.

	\param	addr	Address stored in the member address and passed to the manager.
	\param	port_num	Port stored in the member port_number.
	\param	timeout	Passed through to the manager.
	\param	priority	Socket priority, passed through to the manager.
	\param	mask	Processor mask: mask.get_cpu() goes to the manager, the mask itself becomes the io_thread affinity.
	\param	cpu_priority	Kernel priority applied to io_thread.
	\param	proc_ops	Processing rules copied into the member processor.
	\param	dest_skt	Destination socket, passed through to the manager.
	\param	ts	Latency-timestamp object handed to read_and_process_msgs() on each accepted connection.
	\param	svr_name	Name given to io_thread.
	\param	server_to_client_flow	Not referenced in the visible lines (see note above).
*/
62 template<class ProcessingRules, class SvrMgr>
63 template<class LatencyTimestamps, class>
64 inline
65 svr<ProcessingRules, SvrMgr>::svr(boost::asio::ip::address const &addr, unsigned short port_num, unsigned short timeout, socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, proc_rules_t const &proc_ops, socket_t &dest_skt, LatencyTimestamps &ts, char const *svr_name, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
66 : base_t(),
67  address(addr),
68  port_number(port_num),
69  processor(proc_ops),
70  manager(
71  address,
75  timeout,
76  priority,
77  mask.get_cpu(),
79  dest_skt
80  ),
81  io_thread(
82  [this, &ts]() {
83  manager.start_accept(
84  [this, &ts](auto &src_cxn, auto &dest_skt) {
85  return this->read_and_process_msgs(src_cxn, dest_skt, ts);
86  }
87  );
88  // This must come after the accept otherwise starting subsequent servers will fail.
89  return std::bind(&svr::run, this);
90  }()
91  ) {
// The thread object already exists at this point; these calls only adjust its scheduling and name.
92  io_thread.kernel_priority(cpu_priority);
93  io_thread.kernel_affinity(mask);
94  io_thread.set_name(svr_name);
95 }
96 
/**
	Constructor overload taking an explicit address/port and a destination socket, but neither
	a timeout nor a server-name argument.

	NOTE(review): the initialiser list/body of this overload is not visible in this listing
	(the embedded listing numbers jump 100->102). Presumably it delegates to another
	constructor; confirm against the real file before relying on any behaviour.
*/
97 template<class ProcessingRules, class SvrMgr>
98 template<class LatencyTimestamps, class>
99 inline
100 svr<ProcessingRules, SvrMgr>::svr(boost::asio::ip::address const &addr, unsigned short port_num, socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, proc_rules_t const &proc_ops, socket_t &dest_skt, LatencyTimestamps &ts, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
102 }
103 
/**
	Constructor overload taking its connection details bundled in a ctor_args object, with no
	destination socket, timeout or server-name argument.

	NOTE(review): the initialiser list/body of this overload is not visible in this listing
	(the embedded listing numbers jump 107->109), so how args is unpacked cannot be seen here.
	Presumably it delegates to another constructor; confirm against the real file.
*/
104 template<class ProcessingRules, class SvrMgr>
105 template<class LatencyTimestamps>
106 inline
107 svr<ProcessingRules, SvrMgr>::svr(ctor_args const &args, socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, LatencyTimestamps &ts, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
109 }
110 
/**
	Constructor overload taking its connection details bundled in a ctor_args object, together
	with a destination socket, a timeout and a server name.

	NOTE(review): the initialiser list/body of this overload is not visible in this listing
	(the embedded listing numbers jump 114->116), so how args is unpacked cannot be seen here.
	Presumably it delegates to another constructor; confirm against the real file.
*/
111 template<class ProcessingRules, class SvrMgr>
112 template<class LatencyTimestamps>
113 inline
114 svr<ProcessingRules, SvrMgr>::svr(ctor_args const &args, socket_t &dest_skt, unsigned short timeout, socket_priority priority, thread_t::thread_traits::api_params_type::processor_mask_type mask, const thread_t::thread_traits::api_params_type::priority_type cpu_priority, LatencyTimestamps &ts, char const *svr_name, server_to_client_flow_t &&server_to_client_flow) noexcept(false)
116 }
117 
118 template<class ProcessingRules, class SvrMgr> inline
119 svr<ProcessingRules, SvrMgr>::~svr() noexcept(true) {
120  stop();
121 }
122 
123 template<class ProcessingRules, class SvrMgr> inline void
124 svr<ProcessingRules, SvrMgr>::stop() noexcept(true) {
125  exit_=true;
126  manager.stop();
127 }
128 
/**
	Command-line entry point: parse the options, run one server instance until it reports a
	signal, and report any exception on std::cerr.

	Options handled: "help", "version", "address" (defaults to the IPv4 loopback address),
	"port" (required) and "processor" (marked required, but also given a default).

	NOTE(review): several lines are missing from this listing - the embedded listing numbers
	skip the line after each of the help/version prints, after each catch handler and at the
	end of the function. Presumably those are the return statements producing the exit code;
	confirm against the real file. As shown here, no return value is visible.

	NOTE(review): the help text says "GPL v2.1" whereas the licence header of this file names
	the GNU Lesser General Public License, version 2.1 - confirm which is intended.

	\param	argc	Argument count, forwarded to boost::program_options::parse_command_line().
	\param	argv	Argument vector; argv[0] is also used to derive the server thread's name.
	\return	Not visible in this listing (see the first note above).
*/
129 template<class ProcessingRules, class SvrMgr> inline int
130 svr<ProcessingRules, SvrMgr>::main(int argc, char const * const *argv) noexcept(true) {
131  using thread_traits=thread_t::thread_traits;
132 
133  try {
134  boost::program_options::options_description desc(
135  "A simple exchange-simulator that listens to an IPADDR:PORT combination. The executable name indicates the message version implemented. For details regarding the properties of the simulator see the documentation that came with the distribution. Copyright © J.M.McGuiness, coder@hussar.me.uk. http://libjmmcg.sf.net/ Distributed under the terms of the GPL v2.1.\n"+exit_codes::to_string()+"Arguments"
136  );
137  desc.add_options()
138  ("help", "Print this help message.")
139  ("version", "Print the build number of this program.")
140  ("address", boost::program_options::value<boost::asio::ip::address>()->default_value(boost::asio::ip::address_v4::loopback()), "IP address (in v4 format; thus the network interface to which it is bound) to which the server should listen.")
141  ("port", boost::program_options::value<unsigned short>()->required(), "An unused port to which the server should listen.")
142  ("processor", boost::program_options::value<unsigned short>()->required()->default_value(libisimud::exchanges::common::thread_traits::exchange_simulator_thread.core), "The core to which the server should be bound.")
143  ;
144  boost::program_options::variables_map vm;
145  boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
// "help" and "version" are tested before notify() is called, i.e. before the point at which
// Boost.Program_options reports missing required options, so they work without "--port".
146  if (vm.count("help")) {
147  std::cout<<desc<<std::endl;
149  }
150  if (vm.count("version")) {
151  std::cout<<"Build: " LIBJMMCG_VERSION_NUMBER ", " LIBJMMCG_BUILD_TYPE<<std::endl;
153  }
154  boost::program_options::notify(vm);
155  thread_traits::set_backtrace_on_signal();
156  proc_rules_t proc_rules;
157  no_latency_timestamps ts(0U);
// The server name is the first 15 characters of argv[0].
// NOTE(review): presumably 15 is chosen to fit a platform thread-name limit, and argv[0] is assumed non-null - confirm.
158  const std::string svr_name(std::string(argv[0]).substr(0, 15));
159  svr sim(
160  vm["address"].as<boost::asio::ip::address>(),
161  vm["port"].as<unsigned short>(),
162  svr::socket_t::socket_priority::low,
163  thread_traits::api_params_type::processor_mask_type(vm["processor"].as<unsigned short>()),
164  thread_traits::api_params_type::priority_type::normal,
165  proc_rules,
166  ts,
167  svr_name.c_str()
168  );
169  std::clog<<sim<<std::endl;
// The calling thread only waits from here on: give it processor mask 0 and idle priority.
170  thread_traits::set_kernel_affinity(
171  thread_traits::get_current_thread(),
172  thread_traits::api_params_type::processor_mask_type(0)
173  );
174  thread_traits::set_kernel_priority(
175  thread_traits::get_current_thread(),
176  thread_traits::api_params_type::priority_type::idle
177  );
// Poll until the server reports a signal, calling sleep(0) between checks.
178  while (!sim.signal_status()) {
179  thread_traits::sleep(0);
180  }
181  std::clog<<sim<<std::endl;
183  } catch (exception_type const &ex) {
184  std::cerr<<"CRT exception. Detail: "<<ex.what()<<std::endl;
186  } catch (std::exception const &ex) {
187  std::cerr<<"STL-derived exception. Details: "<<boost::diagnostic_information(ex)<<std::endl;
189  } catch (...) {
190  std::cerr<<"Unknown exception."<<std::endl;
192  }
194 }
195 
196 template<class ProcessingRules, class SvrMgr> inline std::string
197 svr<ProcessingRules, SvrMgr>::to_string() const noexcept(false) {
198  std::ostringstream ss;
199  ss
200  <<thread_t::thread_traits::demangle_name(typeid(*this))
201  <<",\naddress="<<address
202  <<", port_number: "<<port_number
203  <<", exit="<<static_cast<bool>(exit_)
204  <<", client processing-error: '"<<client_ex<<"'"
205  <<", processor: "<<processor
206  <<", manager: "<<manager;
207  return ss.str();
208 }
209 
210 template<class ProcessingRules, class SvrMgr>
211 template<class LatencyTimestamps>
212 inline bool
213 svr<ProcessingRules, SvrMgr>::read_and_process_msgs(typename svr_mgr_t::session &src_cxn, socket_t &dest_skt, LatencyTimestamps &ts) noexcept(false) {
214  if (LIKELY(!static_cast<bool>(exit_))) {
215  src_cxn.start(); // Should really only do this once a logon message received...
216  while (LIKELY(!static_cast<bool>(exit_))) {
217  assert(src_cxn.socket().is_open());
218  if (UNLIKELY(processor.read_and_process_a_msg(src_cxn.socket(), dest_skt, ts))) {
219  // TODO the glibc wrapper prefers this to always return "true"...
220  return static_cast<bool>(exit_);
221  }
222  }
223  }
224  return static_cast<bool>(exit_);
225 }
226 
227 template<class ProcessingRules, class SvrMgr>
228 template<class LatencyTimestamps>
229 inline bool
230 svr<ProcessingRules, SvrMgr>::read_and_process_msgs(typename svr_mgr_t::session &src_cxn, typename svr_mgr_t::session &dest_cxn, LatencyTimestamps &ts) noexcept(false) {
231  return read_and_process_msgs(src_cxn, dest_cxn.socket(), ts);
232 }
233 
234 template<class ProcessingRules, class SvrMgr> inline void
235 svr<ProcessingRules, SvrMgr>::run() noexcept(true) {
236  try {
237  manager.run();
238  } catch (...) {
239  client_ex=boost::current_exception();
240  }
241 }
242 
243 template<class ProcessingRules, class SvrMgr> inline std::ostream &
244 operator<<(std::ostream &os, svr<ProcessingRules, SvrMgr> const &ec) noexcept(false) {
245  os<<ec.to_string();
246  return os;
247 }
248 
249 } } }