libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
socket_server.cpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2015 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 #include "stdafx.h"
20 
21 #define BOOST_TEST_MODULE libjmmcg_tests
22 #include <boost/test/included/unit_test.hpp>
23 
24 #include <boost/mpl/list.hpp>
25 
26 #include "core/ave_deviation_meter.hpp"
27 #include "core/jthread.hpp"
28 #include "core/latency_timestamps.hpp"
29 #include "core/max_min.hpp"
30 #include "core/socket_client_manager.hpp"
31 
32 using namespace libjmmcg;
33 
// Test-local threading configuration, declared in isimud's exchanges::common namespace
// ahead of the later #include of "core/socket_server.hpp" in this file.
// NOTE(review): this listing jumps over original lines 40 and 45. The tests below read
// exchange_simulator_thread.priority, so a priority member and its initialiser presumably
// sit on those elided lines - confirm against the real file.
34 namespace isimud { namespace ISIMUD_VER_NAMESPACE { namespace exchanges { namespace common {
35 
36 struct thread_traits {
 // Threading traits for the platform API with heavyweight threading selected.
37  using api_threading_traits=ppd::api_threading_traits<ppd::platform_api, ppd::heavyweight_threading>;
 // Per-thread settings; `core` is what the tests feed to processor_mask_type(...).
38  struct thread_info {
39  const unsigned short core{};
41  };
42 
 // Settings used by every server (and client) created in the tests; core is 0.
43  static inline constexpr auto exchange_simulator_thread=thread_info{
44  0,
46  };
47 };
48 
49 } } } }
50 
51 namespace libisimud=isimud::ISIMUD_VER_NAMESPACE;
52 using namespace libisimud;
53 
54 #include "core/socket_server.hpp"
55 
// File-wide aliases and constants shared by all the test cases below.
// api_thread_traits supplies processor_mask_type, used when constructing each server.
56 using api_thread_traits=ppd::thread_params<ppd::platform_api>;
// Result type of the timing runs in the two timed test cases.
57 using timed_results_t=ave_deviation_meter<double>;
// IPv4 loopback address that just_connect builds its endpoint from.
58 const boost::asio::ip::address localhost(boost::asio::ip::address_v4::loopback());
// Port used by every server constructed in the tests and by just_connect's endpoint.
59 const unsigned short unused_port=12347u;
// Passed as the third constructor argument of every client.
60 const unsigned short timeout=5u; // In seconds.
61 
// Message definitions supplied as the MsgsT parameter of the processing rules below.
// Two parallel families exist, static_size and variable_size. Each provides a packed
// Header_t, a `message`, a `heartbeat`, the min/max message sizes and a byte buffer type.
// The families differ in Header_t::has_static_size, and only the variable_size messages
// add their own is_valid() that also checks the stored length.
// NOTE(review): the listing skips original lines 95 and 146, so the enumerator inside each
// heartbeat's `enum : unsigned` is not visible here.
62 struct msgs_t {
63  struct static_size {
 // Common header: the total message length plus a mutable sequence number.
64  struct Header_t {
65  enum : bool {
66  has_static_size=true
67  };
68 
69  const std::size_t length_;
70  std::uint64_t sequence{};
71 
72  explicit constexpr Header_t(std::size_t l) noexcept(true) : length_(l) {}
73 
74  constexpr std::size_t length() const noexcept(true) {
75  return length_;
76  }
77 
 // Defined out of class after msgs_t.
78  constexpr bool is_valid() const noexcept(true);
79  } __attribute__((packed));
 // Payload message: tag byte c==42 and a 123-char array whose first element is 'x'.
80  struct message : Header_t {
81  enum : std::size_t {
82  header_t_size=sizeof(Header_t)
83  };
84 
85  const uint8_t c=42;
86  std::array<char, 123> data{'x'};
87 
 // Stores sizeof(message) as the header length.
88  constexpr message() noexcept(true) : Header_t(sizeof(message)) {}
89  } __attribute__((packed));
 // Heartbeat message: a 64-bit tag c==69 and a one-second heartbeat_interval.
90  struct heartbeat : Header_t {
91  enum : std::size_t {
92  header_t_size=sizeof(Header_t)
93  };
94  enum : unsigned {
96  };
97  static inline constexpr std::chrono::seconds heartbeat_interval{1};
98 
99  const uint64_t c=69;
100 
 // Stores sizeof(heartbeat) as the header length.
101  constexpr heartbeat() noexcept(true) : Header_t(sizeof(heartbeat)) {}
102  } __attribute__((packed));
 // Size bounds over the two message types; the tests pass min/max to each client.
103  enum : std::size_t {
104  min_msg_size=min<std::size_t, sizeof(message), sizeof(heartbeat)>::value,
105  max_msg_size=max<std::size_t, sizeof(message), sizeof(heartbeat)>::value,
106  header_t_size=sizeof(Header_t)
107  };
 // Byte buffer with max_msg_size elements.
108  using msg_buffer_t=std::array<std::uint8_t, max_msg_size>;
109  };
110  struct variable_size {
 // Same layout as the static_size header, but has_static_size is false.
111  struct Header_t {
112  enum : bool {
113  has_static_size=false
114  };
115 
116  const std::size_t length_;
117  std::uint64_t sequence{};
118 
119  explicit constexpr Header_t(std::size_t l) noexcept(true) : length_(l) {}
120 
121  constexpr std::size_t length() const noexcept(true) {
122  return length_;
123  }
124 
 // Defined out of class after msgs_t.
125  constexpr bool is_valid() const noexcept(true);
126  } __attribute__((packed));
127  struct message : Header_t {
128  enum : std::size_t {
129  header_t_size=sizeof(Header_t)
130  };
131 
132  const std::uint8_t c=42;
133  std::array<char, 123> data{'x'};
134 
135  constexpr message() noexcept(true) : Header_t(sizeof(message)) {}
136 
 // Valid when the header is valid and the stored length is exactly sizeof(message).
137  constexpr bool is_valid() const noexcept(true) {
138  return Header_t::is_valid() && length_==sizeof(message);
139  }
140  } __attribute__((packed));
141  struct heartbeat : Header_t {
142  enum : std::size_t {
143  header_t_size=sizeof(Header_t)
144  };
145  enum : unsigned {
147  };
148  static inline constexpr std::chrono::seconds heartbeat_interval{1};
149 
150  const std::uint64_t c=69;
151 
152  constexpr heartbeat() noexcept(true) : Header_t(sizeof(heartbeat)) {}
153 
 // Valid when the header is valid and the stored length is exactly sizeof(heartbeat).
154  constexpr bool is_valid() const noexcept(true) {
155  return Header_t::is_valid() && length_==sizeof(heartbeat);
156  }
157  } __attribute__((packed));
158  enum : std::size_t {
159  min_msg_size=min<std::size_t, sizeof(message), sizeof(heartbeat)>::value,
160  max_msg_size=max<std::size_t, sizeof(message), sizeof(heartbeat)>::value,
161  header_t_size=sizeof(Header_t)
162  };
163  using msg_buffer_t=std::array<std::uint8_t, max_msg_size>;
164  };
165 };
166 
// Out-of-class definition of the static_size header's validity check.
// NOTE(review): the function body (original line 169) is missing from this listing, so the
// actual validity rule cannot be read here - consult the real file.
167 constexpr bool
168 msgs_t::static_size::Header_t::is_valid() const noexcept(true) {
170 }
171 
// Out-of-class definition of the variable_size header's validity check; the derived
// message and heartbeat types call it from their own is_valid().
// NOTE(review): the function body (original line 174) is missing from this listing, so the
// actual validity rule cannot be read here - consult the real file.
172 constexpr bool
173 msgs_t::variable_size::Header_t::is_valid() const noexcept(true) {
175 }
176 
// Function object passed as the last constructor argument of every client in the tests.
// When invoked it builds a TCP endpoint from `localhost` and `unused_port` and passes it
// to the supplied callable `f`; it does nothing else.
177 struct just_connect {
178  template<class Fn>
179  void operator()(Fn const &f) const {
180  const boost::asio::ip::tcp::endpoint endpoint(localhost, unused_port);
181  f(endpoint);
182  }
183 };
184 
// Message counters shared (through std::shared_ptr) between a test body and its
// processing rules. `recv` is incremented by sink and simple_reflect for every processed
// message; `send` is incremented by simple_reflect after it writes a message back.
// NOTE(review): both are plain unsigned long. If process_msg() runs on a server thread
// while the test thread reads the counters, no synchronisation is visible in this file -
// confirm whether the server provides it.
185 struct num_msgs {
186  unsigned long recv{};
187  unsigned long send{};
188 };
189 
// Processing rules that only count incoming messages and never reply.
// SktT is the socket type, MsgsT the message family (msgs_t::static_size or
// msgs_t::variable_size in this file).
// NOTE(review): original line 197 (the enumerator inside heartbeats' enum) is absent from
// this listing.
190 template<class SktT, class MsgsT>
191 struct sink {
192  using socket_t=SktT;
193  using src_msg_details_t=MsgsT;
194  using msg_details_t=src_msg_details_t;
 // Do-nothing heartbeat policy: the constructor ignores the socket and stop() is empty.
195  struct heartbeats {
196  enum : unsigned {
198  };
199  static inline constexpr std::chrono::seconds heartbeat_interval{1};
200 
201  template<class Skt>
202  explicit constexpr heartbeats(Skt const &) noexcept(true) {}
203  static constexpr void stop() noexcept(true) {}
204  };
205 
 // Counters shared with the test that created these rules.
206  std::shared_ptr<num_msgs> msg_ctrs;
207 
208  explicit sink(std::shared_ptr<num_msgs> ctrs)
209  : msg_ctrs(ctrs) {
210  }
211 
 // Views the start of `buff` as a message header, asserts it is valid (the assert is
 // compiled out under NDEBUG, hence [[maybe_unused]]), and bumps the receive counter.
 // Neither socket argument is used. Always returns false; what the caller does with the
 // result is not visible here - presumably "do not stop", confirm in socket_server.hpp.
212  template<class Buff>
213  bool process_msg(Buff &buff, socket_t &, socket_t &) {
214  using hdr_t=typename msg_details_t::Header_t;
215  [[maybe_unused]] hdr_t const &hdr=reinterpret_cast<hdr_t const &>(buff);
216  assert(hdr.is_valid());
217  ++msg_ctrs->recv;
218  return false;
219  }
220 };
221 
// Processing rules that count each incoming message and write it straight back to the
// client socket.
// NOTE(review): this listing drops original line 223, which presumably held the
// `struct simple_reflect {` header (the constructor below and cxns_reflect_types use that
// name), and original line 229 (the enumerator inside heartbeats' enum).
222 template<class SktT, class MsgsT>
224  using socket_t=SktT;
225  using src_msg_details_t=MsgsT;
226  using msg_details_t=src_msg_details_t;
 // Do-nothing heartbeat policy: the constructor ignores the socket and stop() is empty.
227  struct heartbeats {
228  enum : unsigned {
230  };
231  static inline constexpr std::chrono::seconds heartbeat_interval{1};
232 
233  template<class Skt>
234  explicit constexpr heartbeats(Skt const &) noexcept(true) {}
235  static constexpr void stop() noexcept(true) {}
236  };
237 
 // Counters shared with the test that created these rules.
238  std::shared_ptr<num_msgs> msg_ctrs;
239 
240  explicit simple_reflect(std::shared_ptr<num_msgs> ctrs)
241  : msg_ctrs(ctrs) {
242  }
243 
 // Views the start of `buff` as a message header and asserts it is valid (debug builds
 // only). Counts the receipt, writes the whole buffer back on client_skt and counts the
 // send. The second assert checks that every received message has been answered.
 // Always returns false; the meaning of the result is defined by the caller.
244  template<class Buff>
245  bool process_msg(Buff &buff, socket_t &, socket_t &client_skt) {
246  using hdr_t=typename msg_details_t::Header_t;
247  [[maybe_unused]] hdr_t const &hdr=reinterpret_cast<hdr_t const &>(buff);
248  assert(hdr.is_valid());
249  ++msg_ctrs->recv;
250  client_skt.write(buff);
251  ++msg_ctrs->send;
252  assert(msg_ctrs->recv==msg_ctrs->send);
253  return false;
254  }
255 };
256 
// Client-side connection managers for the two socket back ends under test (asio and
// glibc). Both are parameterised on the lock type of the heavyweight-threading critical
// section.
257 using asio_client_mgr_t=socket::asio::client_manager<
258  ppd::api_lock_traits<ppd::platform_api, ppd::heavyweight_threading>::critical_section_type::lock_type
259 >;
260 using glibc_client_mgr_t=socket::glibc::client_manager<
261  ppd::api_lock_traits<ppd::platform_api, ppd::heavyweight_threading>::critical_section_type::lock_type
262 >;
263 
// Type list driving the templated test cases. For a given processing-rules template
// ProcRules it lists four std::pair<client manager, server> combinations:
// {asio, glibc} x {msgs_t::static_size, msgs_t::variable_size}.
// In each pair first_type is the client manager and second_type is a socket::svr built
// from ProcRules and a loopback server_manager that uses ProcRules' heartbeats type.
264 template<template<class, class> class ProcRules>
265 using cxns_types=boost::mpl::list<
 // asio sockets, static-size messages.
266  std::pair<
267  asio_client_mgr_t,
268  socket::svr<
269  ProcRules<asio_client_mgr_t::socket_t, msgs_t::static_size>,
270  socket::server_manager::loopback<
271  typename ProcRules<asio_client_mgr_t::socket_t, msgs_t::static_size>::heartbeats,
272  asio_client_mgr_t::socket_t
273  >
274  >
275  >,
 // glibc sockets, static-size messages.
276  std::pair<
277  glibc_client_mgr_t,
278  socket::svr<
279  ProcRules<glibc_client_mgr_t::socket_t, msgs_t::static_size>,
280  socket::server_manager::loopback<
281  typename ProcRules<glibc_client_mgr_t::socket_t, msgs_t::static_size>::heartbeats,
282  glibc_client_mgr_t::socket_t
283  >
284  >
285  >,
 // asio sockets, variable-size messages.
286  std::pair<
287  asio_client_mgr_t,
288  socket::svr<
289  ProcRules<asio_client_mgr_t::socket_t, msgs_t::variable_size>,
290  socket::server_manager::loopback<
291  typename ProcRules<asio_client_mgr_t::socket_t, msgs_t::variable_size>::heartbeats,
292  asio_client_mgr_t::socket_t
293  >
294  >
295  >,
 // glibc sockets, variable-size messages.
296  std::pair<
297  glibc_client_mgr_t,
298  socket::svr<
299  ProcRules<glibc_client_mgr_t::socket_t, msgs_t::variable_size>,
300  socket::server_manager::loopback<
301  typename ProcRules<glibc_client_mgr_t::socket_t, msgs_t::variable_size>::heartbeats,
302  glibc_client_mgr_t::socket_t
303  >
304  >
305  >
306 >;
307 
// The two concrete type lists used by the test cases: servers that only count messages
// (sink) and servers that echo every message back (simple_reflect).
308 using cxns_sink_types=cxns_types<sink>;
309 using cxns_reflect_types=cxns_types<simple_reflect>;
310 
311 BOOST_AUTO_TEST_SUITE(socket_tests)
312 
313 BOOST_AUTO_TEST_SUITE(server)
314 
// Checks that a reflecting server can be constructed without throwing.
// The server is given a default-constructed address, unused_port, high socket priority,
// the core and priority from exchange_simulator_thread, the processing rules, the
// (disabled) latency timestamps and a name. It is destroyed again inside the checking macro.
315 BOOST_AUTO_TEST_CASE_TEMPLATE(ctor, cxns_t, cxns_reflect_types) {
316  using svr_t=typename cxns_t::second_type;
317 
318  no_latency_timestamps ts(0U);
319  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
320  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
321  BOOST_CHECK_NO_THROW(
322  svr_t svr(
323  boost::asio::ip::address(),
324  unused_port,
325  svr_t::socket_t::socket_priority::high,
326  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
327  exchanges::common::thread_traits::exchange_simulator_thread.priority,
328  proc_rules,
329  ts,
330  "svr" LIBJMMCG_ENQUOTE(__LINE__)
331  )
332  );
333 }
334 
// Starts one reflecting server and checks that one client can be constructed (and
// destroyed) against it without throwing. No messages are exchanged.
// NOTE(review): original line 348 (a server constructor argument) is absent from this
// listing; by analogy with the `ctor` test it is presumably the thread priority.
335 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client, cxns_t, cxns_reflect_types) {
336  using client_t=typename cxns_t::first_type;
337  using svr_t=typename cxns_t::second_type;
338  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
339 
340  no_latency_timestamps ts(0U);
341  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
342  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
343  svr_t svr(
344  boost::asio::ip::address(),
345  unused_port,
346  svr_t::socket_t::socket_priority::high,
347  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
349  proc_rules,
350  ts,
351  "svr" LIBJMMCG_ENQUOTE(__LINE__)
352  );
353  BOOST_CHECK_NO_THROW(
354  client_t skt(
355  msgs_t::min_msg_size,
356  msgs_t::max_msg_size,
357  timeout,
358  client_t::socket_t::socket_priority::high,
359  exchanges::common::thread_traits::exchange_simulator_thread.core,
360  just_connect()
361  )
362  );
363 }
364 
// Starts one reflecting server and checks that two clients can connect one after the
// other: the first client is destroyed before the second is constructed. No messages
// are exchanged.
// NOTE(review): original line 378 (a server constructor argument) is absent from this
// listing.
365 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_two_successive_clients, cxns_t, cxns_reflect_types) {
366  using client_t=typename cxns_t::first_type;
367  using svr_t=typename cxns_t::second_type;
368  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
369 
370  no_latency_timestamps ts(0U);
371  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
372  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
373  svr_t svr(
374  boost::asio::ip::address(),
375  unused_port,
376  svr_t::socket_t::socket_priority::high,
377  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
379  proc_rules,
380  ts,
381  "svr" LIBJMMCG_ENQUOTE(__LINE__)
382  );
 // First connection.
383  {
384  BOOST_CHECK_NO_THROW(
385  client_t skt(
386  msgs_t::min_msg_size,
387  msgs_t::max_msg_size,
388  timeout,
389  client_t::socket_t::socket_priority::high,
390  exchanges::common::thread_traits::exchange_simulator_thread.core,
391  just_connect()
392  )
393  );
394  }
 // Second connection, made after the first client has gone.
395  {
396  BOOST_CHECK_NO_THROW(
397  client_t skt(
398  msgs_t::min_msg_size,
399  msgs_t::max_msg_size,
400  timeout,
401  client_t::socket_t::socket_priority::high,
402  exchanges::common::thread_traits::exchange_simulator_thread.core,
403  just_connect()
404  )
405  );
406  }
407 }
408 
// Starts one reflecting server and checks that two clients can be connected at the same
// time. Both client objects are declared inside the single BOOST_CHECK_NO_THROW
// invocation, so skt1 is still alive while skt2 is constructed. No messages are exchanged.
// NOTE(review): original line 422 (a server constructor argument) is absent from this
// listing.
409 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_two_simultaneous_clients, cxns_t, cxns_reflect_types) {
410  using client_t=typename cxns_t::first_type;
411  using svr_t=typename cxns_t::second_type;
412  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
413 
414  no_latency_timestamps ts(0U);
415  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
416  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
417  svr_t svr(
418  boost::asio::ip::address(),
419  unused_port,
420  svr_t::socket_t::socket_priority::high,
421  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
423  proc_rules,
424  ts,
425  "svr" LIBJMMCG_ENQUOTE(__LINE__)
426  );
427  BOOST_CHECK_NO_THROW(
428  client_t skt1(
429  msgs_t::min_msg_size,
430  msgs_t::max_msg_size,
431  timeout,
432  client_t::socket_t::socket_priority::high,
433  exchanges::common::thread_traits::exchange_simulator_thread.core,
434  just_connect()
435  );
436  client_t skt2(
437  msgs_t::min_msg_size,
438  msgs_t::max_msg_size,
439  timeout,
440  client_t::socket_t::socket_priority::high,
441  exchanges::common::thread_traits::exchange_simulator_thread.core,
442  just_connect()
443  )
444  );
445 }
446 
// Round-trips a single `message` through a reflecting server: the client writes it,
// reads the reply, and the reply's tag byte must equal the one sent. The receive counter
// is checked while server and client are alive, the send counter after both are destroyed.
// NOTE(review): original lines 461 and 471 (one server and one client constructor
// argument) are absent from this listing.
447 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_one_message, cxns_t, cxns_reflect_types) {
448  using client_t=typename cxns_t::first_type;
449  using svr_t=typename cxns_t::second_type;
450  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
451 
452  no_latency_timestamps ts(0U);
453  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
454  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
455  {
456  svr_t svr(
457  boost::asio::ip::address(),
458  unused_port,
459  svr_t::socket_t::socket_priority::high,
460  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
462  proc_rules,
463  ts,
464  "svr" LIBJMMCG_ENQUOTE(__LINE__)
465  );
466  client_t skt(
467  msgs_t::min_msg_size,
468  msgs_t::max_msg_size,
469  timeout,
470  client_t::socket_t::socket_priority::high,
472  just_connect()
473  );
474  typename msgs_t::message msg;
475  BOOST_CHECK_NO_THROW(skt.write(msg));
476  typename msgs_t::message response;
477  BOOST_CHECK_NO_THROW(skt.read(response));
478  BOOST_CHECK_EQUAL(response.c, msg.c);
479  BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
480  }
481  BOOST_CHECK_EQUAL(msg_ctrs->send, 1);
482 }
483 
// Keeps one reflecting server alive while two clients connect in turn. Each client
// round-trips one `message` and is destroyed before the next connects. The shared
// counters accumulate across both connections (recv 1 then 2, send 2 at the end).
// NOTE(review): original lines 498, 509 and 525 (constructor arguments) are absent from
// this listing.
484 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_one_message_reconnect_sequential, cxns_t, cxns_reflect_types) {
485  using client_t=typename cxns_t::first_type;
486  using svr_t=typename cxns_t::second_type;
487  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
488 
489  no_latency_timestamps ts(0U);
490  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
491  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
492  {
493  svr_t svr(
494  boost::asio::ip::address(),
495  unused_port,
496  svr_t::socket_t::socket_priority::high,
497  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
499  proc_rules,
500  ts,
501  "svr" LIBJMMCG_ENQUOTE(__LINE__)
502  );
 // First connection: one message out, one reply back.
503  {
504  client_t skt(
505  msgs_t::min_msg_size,
506  msgs_t::max_msg_size,
507  timeout,
508  client_t::socket_t::socket_priority::high,
510  just_connect()
511  );
512  typename msgs_t::message msg;
513  BOOST_CHECK_NO_THROW(skt.write(msg));
514  typename msgs_t::message response;
515  BOOST_CHECK_NO_THROW(skt.read(response));
516  BOOST_CHECK_EQUAL(response.c, msg.c);
517  BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
518  }
 // Second connection to the same server; the counters carry on from the first.
519  {
520  client_t skt(
521  msgs_t::min_msg_size,
522  msgs_t::max_msg_size,
523  timeout,
524  client_t::socket_t::socket_priority::high,
526  just_connect()
527  );
528  typename msgs_t::message msg;
529  BOOST_CHECK_NO_THROW(skt.write(msg));
530  typename msgs_t::message response;
531  BOOST_CHECK_NO_THROW(skt.read(response));
532  BOOST_CHECK_EQUAL(response.c, msg.c);
533  BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
534  }
535  }
536  BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
537 }
538 
// Over one connection, round-trips a `message` and then a `heartbeat` through a
// reflecting server, checking the tag field of each reply and that both counters reach 2.
// NOTE(review): original lines 553 and 563 (constructor arguments) are absent from this
// listing.
539 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_two_messages, cxns_t, cxns_reflect_types) {
540  using client_t=typename cxns_t::first_type;
541  using svr_t=typename cxns_t::second_type;
542  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
543 
544  no_latency_timestamps ts(0U);
545  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
546  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
547  {
548  svr_t svr(
549  boost::asio::ip::address(),
550  unused_port,
551  svr_t::socket_t::socket_priority::high,
552  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
554  proc_rules,
555  ts,
556  "svr" LIBJMMCG_ENQUOTE(__LINE__)
557  );
558  client_t skt(
559  msgs_t::min_msg_size,
560  msgs_t::max_msg_size,
561  timeout,
562  client_t::socket_t::socket_priority::high,
564  just_connect()
565  );
 // First exchange: the larger `message` type.
566  typename msgs_t::message msg1;
567  BOOST_CHECK_NO_THROW(skt.write(msg1));
568  typename msgs_t::message response1;
569  BOOST_CHECK_NO_THROW(skt.read(response1));
570  BOOST_CHECK_EQUAL(response1.c, msg1.c);
 // Second exchange: the `heartbeat` type on the same connection.
571  typename msgs_t::heartbeat msg2;
572  BOOST_CHECK_NO_THROW(skt.write(msg2));
573  typename msgs_t::heartbeat response2;
574  BOOST_CHECK_NO_THROW(skt.read(response2));
575  BOOST_CHECK_EQUAL(response2.c, msg2.c);
576  BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
577  }
578  BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
579 }
580 
// Creates a server/client pair, round-trips one `message`, and destroys both. It then
// creates a second server on the same port with a new client and round-trips one
// `heartbeat`. The processing rules and counters are shared by both servers, so the
// counts accumulate to 2.
// NOTE(review): original lines 595, 605, 622 and 632 (constructor arguments) are absent
// from this listing.
581 BOOST_AUTO_TEST_CASE_TEMPLATE(two_repeated_servers_and_clients, cxns_t, cxns_reflect_types) {
582  using client_t=typename cxns_t::first_type;
583  using svr_t=typename cxns_t::second_type;
584  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
585 
586  no_latency_timestamps ts(0U);
587  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
588  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
 // First server lifetime.
589  {
590  svr_t svr(
591  boost::asio::ip::address(),
592  unused_port,
593  svr_t::socket_t::socket_priority::high,
594  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
596  proc_rules,
597  ts,
598  "svr" LIBJMMCG_ENQUOTE(__LINE__)
599  );
600  client_t skt(
601  msgs_t::min_msg_size,
602  msgs_t::max_msg_size,
603  timeout,
604  client_t::socket_t::socket_priority::high,
606  just_connect()
607  );
608  typename msgs_t::message msg;
609  BOOST_CHECK_NO_THROW(skt.write(msg));
610  typename msgs_t::message response;
611  BOOST_CHECK_NO_THROW(skt.read(response));
612  BOOST_CHECK_EQUAL(response.c, msg.c);
613  BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
614  }
615  BOOST_CHECK_EQUAL(msg_ctrs->send, 1);
 // Second server lifetime, reusing the same port once the first server is gone.
616  {
617  svr_t svr(
618  boost::asio::ip::address(),
619  unused_port,
620  svr_t::socket_t::socket_priority::high,
621  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
623  proc_rules,
624  ts,
625  "svr" LIBJMMCG_ENQUOTE(__LINE__)
626  );
627  client_t skt(
628  msgs_t::min_msg_size,
629  msgs_t::max_msg_size,
630  timeout,
631  client_t::socket_t::socket_priority::high,
633  just_connect()
634  );
635  typename msgs_t::heartbeat msg;
636  BOOST_CHECK_NO_THROW(skt.write(msg));
637  typename msgs_t::heartbeat response;
638  BOOST_CHECK_NO_THROW(skt.read(response));
639  BOOST_CHECK_EQUAL(response.c, msg.c);
640  BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
641  }
642  BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
643 }
644 
// Times strictly alternating write/read round trips of a `message` through a reflecting
// server. One timed run performs num_loops round trips and yields the mean time per round
// trip in microseconds; compute_average_deviation() drives the runs and the result is
// printed. Loop counts and the timestamp recorder depend on JMMCG_PERFORMANCE_TESTS.
// NOTE(review): the final checks expect 2*num_loops messages, which implies that
// compute_average_deviation() invokes the timed run twice in the non-performance
// configuration - confirm against ave_deviation_meter.hpp.
// NOTE(review): the printed text "in-of-order" looks like a typo for "in-order".
// NOTE(review): original lines 672 and 682 (constructor arguments) are absent from this
// listing.
645 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_msgs_at_a_time, cxns_t, cxns_reflect_types) {
646  using client_t=typename cxns_t::first_type;
647  using svr_t=typename cxns_t::second_type;
648  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
649 
650 #ifdef JMMCG_PERFORMANCE_TESTS
651  const unsigned long num_loops=60000;
652  const unsigned short loops_for_conv=900;
653 #else
654  const unsigned long num_loops=200;
655  const unsigned short loops_for_conv=1;
656 #endif
657  const double perc_conv_estimate=0.1;
658 
659 #ifdef JMMCG_PERFORMANCE_TESTS
660  latency_timestamps ts{2*3*num_loops*loops_for_conv};
661 #else
662  no_latency_timestamps ts{0U};
663 #endif
664  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
665  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
666  {
667  svr_t svr(
668  boost::asio::ip::address(),
669  unused_port,
670  svr_t::socket_t::socket_priority::high,
671  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
673  proc_rules,
674  ts,
675  "svr" LIBJMMCG_ENQUOTE(__LINE__)
676  );
677  client_t skt(
678  msgs_t::min_msg_size,
679  msgs_t::max_msg_size,
680  timeout,
681  client_t::socket_t::socket_priority::high,
683  just_connect()
684  );
685 
 // One timed run: num_loops sequential round trips, returning microseconds per trip.
686  auto send_and_receive_all=[&skt]() {
 // A single round trip: send a message, wait for its echo, compare the tag byte.
687  auto send_and_receive=[&skt]() {
688  const typename msgs_t::message msg1;
689  skt.write(msg1);
690  typename msgs_t::message response1;
691  skt.read(response1);
692  BOOST_CHECK_EQUAL(response1.c, msg1.c);
693  };
694 
695  const std::chrono::high_resolution_clock::time_point start_time=std::chrono::high_resolution_clock::now();
696  for (unsigned long i=0; i<num_loops; ++i) {
697  send_and_receive();
698  }
699  const std::chrono::high_resolution_clock::time_point end_time=std::chrono::high_resolution_clock::now();
700  return timed_results_t::value_type(static_cast<double>(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/num_loops);
701  };
702 
703  const std::pair<timed_results_t, bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
704  perc_conv_estimate,
705  loops_for_conv,
706  std::move(send_and_receive_all)
707  ));
708  std::cout<<svr_t::thread_t::thread_traits::demangle_name(typeid(client_t))<<"\n\tMessage round-trip in-of-order time (microseconds)="<<timed_results.first<<std::endl;
709 #ifdef JMMCG_PERFORMANCE_TESTS
710  BOOST_CHECK(!timed_results.second);
711  ts.write_to_named_csv_file(std::cout, "socket_server_reflect_msgs_at_a_time_performance_latencies");
712 #endif
713  BOOST_CHECK_EQUAL(msg_ctrs->recv, 2*num_loops);
714  }
715  BOOST_CHECK_EQUAL(msg_ctrs->send, 2*num_loops);
716 }
717 
// Sends num_loops heartbeats, each with an increasing sequence number, from a separate
// thread to a counting-only (sink) server, then checks the server counted num_loops.
// NOTE(review): the ppd::jthread lives in its own inner scope; it presumably joins on
// destruction so that all writes finish before the count is checked - confirm in
// core/jthread.hpp. Whether the server has also processed every message by then is not
// visible in this file.
// NOTE(review): original lines 734 and 744 (constructor arguments) are absent from this
// listing.
718 BOOST_AUTO_TEST_CASE_TEMPLATE(sink_one_msg_repeatedly, cxns_t, cxns_sink_types) {
719  using client_t=typename cxns_t::first_type;
720  using svr_t=typename cxns_t::second_type;
721  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
722 
723  const unsigned long num_loops=2000;
724 
725  no_latency_timestamps ts(0U);
726  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
727  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
728  {
729  svr_t svr(
730  boost::asio::ip::address(),
731  unused_port,
732  svr_t::socket_t::socket_priority::high,
733  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
735  proc_rules,
736  ts,
737  "svr" LIBJMMCG_ENQUOTE(__LINE__)
738  );
739  client_t skt(
740  msgs_t::min_msg_size,
741  msgs_t::max_msg_size,
742  timeout,
743  client_t::socket_t::socket_priority::high,
745  just_connect()
746  );
747 
748  auto send=[&skt]() {
 // Thread body: write the same heartbeat num_loops times, updating only its sequence.
749  auto send_batch_of_msgs=[&skt]() {
750  typename msgs_t::heartbeat msg;
751  BOOST_CHECK(msg.is_valid());
752  for (unsigned long i=0; i<num_loops; ++i) {
753  msg.sequence=i;
754  skt.write(msg);
755  }
756  };
757  {
758  ppd::jthread send(std::bind(std::move(send_batch_of_msgs)));
759  }
760  };
761 
762  BOOST_CHECK_NO_THROW(send());
763  BOOST_CHECK_EQUAL(msg_ctrs->recv, num_loops);
764  }
765 }
766 
// Writes to one client socket from three threads at once, against a counting-only (sink)
// server: two threads send `message`s with sequence ranges [0, num_loops) and
// [num_loops, 2*num_loops), and one thread sends heartbeats. The server must count
// 3*num_loops messages in total.
// NOTE(review): the three ppd::jthread objects share an inner scope and presumably join
// on destruction - confirm in core/jthread.hpp.
// NOTE(review): original lines 783 and 793 (constructor arguments) are absent from this
// listing.
767 BOOST_AUTO_TEST_CASE_TEMPLATE(sink_different_msgs_parallel, cxns_t, cxns_sink_types) {
768  using client_t=typename cxns_t::first_type;
769  using svr_t=typename cxns_t::second_type;
770  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
771 
772  const unsigned long num_loops=20000;
773 
774  no_latency_timestamps ts(0U);
775  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
776  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
777  {
778  svr_t svr(
779  boost::asio::ip::address(),
780  unused_port,
781  svr_t::socket_t::socket_priority::high,
782  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
784  proc_rules,
785  ts,
786  "svr" LIBJMMCG_ENQUOTE(__LINE__)
787  );
788  client_t skt(
789  msgs_t::min_msg_size,
790  msgs_t::max_msg_size,
791  timeout,
792  client_t::socket_t::socket_priority::high,
794  just_connect()
795  );
796 
797  auto send=[&skt]() {
 // First message stream: sequence numbers 0 .. num_loops-1.
798  auto send_batch_of_msgs1=[&skt]() {
799  typename msgs_t::message msg;
800  BOOST_CHECK(msg.is_valid());
801  for (unsigned long i=0; i<num_loops; ++i) {
802  msg.sequence=i;
803  skt.write(msg);
804  }
805  };
 // Second message stream: sequence numbers num_loops .. 2*num_loops-1.
806  auto send_batch_of_msgs2=[&skt]() {
807  typename msgs_t::message msg;
808  BOOST_CHECK(msg.is_valid());
809  for (unsigned long i=0; i<num_loops; ++i) {
810  msg.sequence=i+num_loops;
811  skt.write(msg);
812  }
813  };
 // Heartbeat stream: sequence numbers 0 .. num_loops-1.
814  auto send_batch_of_hbs=[&skt]() {
815  typename msgs_t::heartbeat msg;
816  BOOST_CHECK(msg.is_valid());
817  for (unsigned long i=0; i<num_loops; ++i) {
818  msg.sequence=i;
819  skt.write(msg);
820  }
821  };
822  {
823  ppd::jthread send_msg1(std::bind(std::move(send_batch_of_msgs1)));
824  ppd::jthread send_msg2(std::bind(std::move(send_batch_of_msgs2)));
825  ppd::jthread send_hb(std::bind(std::move(send_batch_of_hbs)));
826  }
827  };
828 
829  BOOST_CHECK_NO_THROW(send());
830  BOOST_CHECK_EQUAL(msg_ctrs->recv, 3*num_loops);
831  }
832 }
833 
// Uses one client socket from two threads against a reflecting server: one thread writes
// num_loops `message`s with increasing sequence numbers while the other reads num_loops
// replies. The last reply read must be valid and carry sequence num_loops-1, and both
// counters must equal num_loops.
// NOTE(review): the two ppd::jthread objects share an inner scope and presumably join on
// destruction - confirm in core/jthread.hpp.
// NOTE(review): original lines 850 and 860 (constructor arguments) are absent from this
// listing.
834 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_msgs_parallel, cxns_t, cxns_reflect_types) {
835  using client_t=typename cxns_t::first_type;
836  using svr_t=typename cxns_t::second_type;
837  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
838 
839  static constexpr unsigned long num_loops=2000;
840 
841  no_latency_timestamps ts(0U);
842  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
843  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
844  {
845  svr_t svr(
846  boost::asio::ip::address(),
847  unused_port,
848  svr_t::socket_t::socket_priority::high,
849  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
851  proc_rules,
852  ts,
853  "svr" LIBJMMCG_ENQUOTE(__LINE__)
854  );
855  client_t skt(
856  msgs_t::min_msg_size,
857  msgs_t::max_msg_size,
858  timeout,
859  client_t::socket_t::socket_priority::high,
861  just_connect()
862  );
863 
864  auto send_and_receive=[&skt]() {
 // Writer thread body.
865  auto send_batch_of_msgs=[&skt]() {
866  typename msgs_t::message msg;
867  BOOST_CHECK(msg.is_valid());
868  for (unsigned long i=0; i<num_loops; ++i) {
869  msg.sequence=i;
870  skt.write(msg);
871  }
872  };
 // Reader thread body: only the final reply is inspected after the loop.
873  auto receive_batch_of_msgs=[&skt]() {
874  typename msgs_t::message response;
875  for (unsigned long i=0; i<num_loops; ++i) {
876  skt.read(response);
877  }
878  BOOST_CHECK(response.is_valid());
879  BOOST_CHECK_EQUAL(response.sequence, num_loops-1);
880  };
881  {
882  ppd::jthread send(std::bind(std::move(send_batch_of_msgs)));
883  ppd::jthread receive(std::bind(std::move(receive_batch_of_msgs)));
884  }
885  };
886 
887  BOOST_CHECK_NO_THROW(send_and_receive());
888  BOOST_CHECK_EQUAL(msg_ctrs->recv, num_loops);
889  }
890  BOOST_CHECK_EQUAL(msg_ctrs->send, num_loops);
891 }
892 
// Times a mixed, concurrent workload against a reflecting server. In each run one thread
// writes num_loops `message`s, another writes num_loops heartbeats, and a third reads
// 2*num_loops replies into a raw byte buffer. A run returns the elapsed microseconds
// divided by num_loops. The run is executed once directly and then handed to
// compute_average_deviation(); the result is printed.
// NOTE(review): the final check expects 3*2*num_loops sends, i.e. three runs in total.
// That implies compute_average_deviation() performs two runs in the non-performance
// configuration - confirm against ave_deviation_meter.hpp.
// NOTE(review): original lines 920 and 930 (constructor arguments) are absent from this
// listing.
893 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_different_msgs_parallel, cxns_t, cxns_reflect_types) {
894  using client_t=typename cxns_t::first_type;
895  using svr_t=typename cxns_t::second_type;
896  using msgs_t=typename svr_t::proc_rules_t::msg_details_t;
897 
898 #ifdef JMMCG_PERFORMANCE_TESTS
899  const unsigned long num_loops=1000;
900  const unsigned short loops_for_conv=200;
901 #else
902  const unsigned long num_loops=200;
903  const unsigned short loops_for_conv=1;
904 #endif
905  const double perc_conv_estimate=0.1;
906 
907 #ifdef JMMCG_PERFORMANCE_TESTS
908  latency_timestamps ts{2*3*num_loops*loops_for_conv};
909 #else
910  no_latency_timestamps ts{0U};
911 #endif
912  std::shared_ptr<num_msgs> msg_ctrs(new num_msgs);
913  typename svr_t::proc_rules_t proc_rules(msg_ctrs);
914  {
915  svr_t svr(
916  boost::asio::ip::address(),
917  unused_port,
918  svr_t::socket_t::socket_priority::high,
919  api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
921  proc_rules,
922  ts,
923  "svr" LIBJMMCG_ENQUOTE(__LINE__)
924  );
925  client_t skt(
926  msgs_t::min_msg_size,
927  msgs_t::max_msg_size,
928  timeout,
929  client_t::socket_t::socket_priority::high,
931  just_connect()
932  );
933 
934  auto send_and_receive=[&skt]() {
 // Writer thread body for `message`s.
935  auto send_batch_of_msgs1=[&skt]() {
936  typename msgs_t::message msg;
937  BOOST_CHECK(msg.is_valid());
938  for (unsigned long i=0; i<num_loops; ++i) {
939  msg.sequence=i;
940  skt.write(msg);
941  }
942  };
 // Writer thread body for heartbeats.
943  auto send_batch_of_heartbeats=[&skt]() {
944  typename msgs_t::heartbeat msg;
945  BOOST_CHECK(msg.is_valid());
946  for (unsigned long i=0; i<num_loops; ++i) {
947  msg.sequence=i;
948  skt.write(msg);
949  }
950  };
 // Reader thread body: replies of either type land in a max_msg_size byte buffer that is
 // viewed through a header pointer. Both streams end on sequence num_loops-1, so the
 // final reply is expected to carry that value whichever stream finishes last.
951  auto receive_batch_of_msgs=[&skt]() {
952  typename msgs_t::msg_buffer_t::value_type response[msgs_t::max_msg_size];
953  typename msgs_t::Header_t const *hdr=reinterpret_cast<typename msgs_t::Header_t const *>(response);
954  for (unsigned long i=0; i<2*num_loops; ++i) {
955  skt.read(response);
956  }
957  BOOST_CHECK(hdr->is_valid());
958  BOOST_CHECK_EQUAL(hdr->sequence, num_loops-1);
959  };
960  const std::chrono::high_resolution_clock::time_point start_time=std::chrono::high_resolution_clock::now();
 // NOTE(review): the threads presumably join when this scope closes - confirm in jthread.hpp.
961  {
962  ppd::jthread send(std::bind(std::move(send_batch_of_msgs1)));
963  ppd::jthread send_heartbeats(std::bind(std::move(send_batch_of_heartbeats)));
964  ppd::jthread receive(std::bind(std::move(receive_batch_of_msgs)));
965  }
966  const std::chrono::high_resolution_clock::time_point end_time=std::chrono::high_resolution_clock::now();
967  return timed_results_t::value_type(static_cast<double>(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/num_loops);
968  };
969 
 // One untimed-result run first, to check the basic counts.
970  BOOST_CHECK_NO_THROW(send_and_receive());
971  BOOST_CHECK_EQUAL(msg_ctrs->recv, 2*num_loops);
972 
973  const std::pair<timed_results_t, bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
974  perc_conv_estimate,
975  loops_for_conv,
976  std::move(send_and_receive)
977  ));
978  std::cout<<svr_t::thread_t::thread_traits::demangle_name(typeid(client_t))<<"\n\tMessage round-trip out-of-order time (microseconds)="<<timed_results.first<<std::endl;
979 #ifdef JMMCG_PERFORMANCE_TESTS
980  BOOST_CHECK(!timed_results.second);
981  ts.write_to_named_csv_file(std::cout, "socket_server_reflect_different_msgs_parallel_performance_latencies");
982 #endif
983  }
984  BOOST_CHECK_EQUAL(msg_ctrs->send, 3*2*num_loops);
985 }
986 
987 BOOST_AUTO_TEST_SUITE_END()
988 
989 BOOST_AUTO_TEST_SUITE_END()