21 #define BOOST_TEST_MODULE libjmmcg_tests
22 #include <boost/test/included/unit_test.hpp>
24 #include <boost/mpl/list.hpp>
26 #include "core/ave_deviation_meter.hpp"
27 #include "core/jthread.hpp"
28 #include "core/latency_timestamps.hpp"
29 #include "core/max_min.hpp"
30 #include "core/socket_client_manager.hpp"
32 using namespace libjmmcg;
37 using api_threading_traits=
ppd::api_threading_traits<
ppd::platform_api,
ppd::heavyweight_threading>;
39 const unsigned short core{};
52 using namespace libisimud;
54 #include "core/socket_server.hpp"
74 constexpr std::size_t
length()
const noexcept(
true) {
78 constexpr bool is_valid()
const noexcept(
true);
79 }
__attribute__((packed));
89 }
__attribute__((packed));
102 }
__attribute__((packed));
104 min_msg_size=min<std::size_t,
sizeof(message),
sizeof(heartbeat)>::value,
105 max_msg_size=max<std::size_t,
sizeof(message),
sizeof(heartbeat)>::value,
126 }
__attribute__((packed));
140 }
__attribute__((packed));
157 }
__attribute__((packed));
159 min_msg_size=min<std::size_t,
sizeof(message),
sizeof(heartbeat)>::value,
160 max_msg_size=max<std::size_t,
sizeof(message),
sizeof(heartbeat)>::value,
180 const boost::
asio::
ip::tcp::endpoint endpoint(localhost, unused_port);
190 template<
class SktT,
class MsgsT>
193 using src_msg_details_t=MsgsT;
194 using msg_details_t=src_msg_details_t;
203 static constexpr void stop()
noexcept(
true) {}
214 using hdr_t=
typename msg_details_t::Header_t;
215 [[maybe_unused]] hdr_t
const &hdr=
reinterpret_cast<hdr_t
const &>(buff);
216 assert(hdr.is_valid());
222 template<
class SktT,
class MsgsT>
225 using src_msg_details_t=MsgsT;
226 using msg_details_t=src_msg_details_t;
235 static constexpr void stop()
noexcept(
true) {}
246 using hdr_t=
typename msg_details_t::Header_t;
247 [[maybe_unused]] hdr_t
const &hdr=
reinterpret_cast<hdr_t
const &>(buff);
248 assert(hdr.is_valid());
250 client_skt.write(buff);
252 assert(msg_ctrs->recv==msg_ctrs->send);
258 ppd::api_lock_traits<
ppd::platform_api,
ppd::heavyweight_threading>::critical_section_type::
lock_type
261 ppd::api_lock_traits<
ppd::platform_api,
ppd::heavyweight_threading>::critical_section_type::
lock_type
264 template<
template<
class,
class>
class ProcRules>
265 using cxns_types=boost::mpl::list<
269 ProcRules<asio_client_mgr_t::socket_t, msgs_t::static_size>,
270 socket::server_manager::loopback<
271 typename ProcRules<asio_client_mgr_t::socket_t, msgs_t::static_size>::heartbeats,
272 asio_client_mgr_t::socket_t
279 ProcRules<glibc_client_mgr_t::socket_t, msgs_t::static_size>,
280 socket::server_manager::loopback<
281 typename ProcRules<glibc_client_mgr_t::socket_t, msgs_t::static_size>::heartbeats,
282 glibc_client_mgr_t::socket_t
289 ProcRules<asio_client_mgr_t::socket_t, msgs_t::variable_size>,
290 socket::server_manager::loopback<
291 typename ProcRules<asio_client_mgr_t::socket_t, msgs_t::variable_size>::heartbeats,
292 asio_client_mgr_t::socket_t
299 ProcRules<glibc_client_mgr_t::socket_t, msgs_t::variable_size>,
300 socket::server_manager::loopback<
301 typename ProcRules<glibc_client_mgr_t::socket_t, msgs_t::variable_size>::heartbeats,
302 glibc_client_mgr_t::socket_t
308 using cxns_sink_types=cxns_types<sink>;
309 using cxns_reflect_types=cxns_types<simple_reflect>;
311 BOOST_AUTO_TEST_SUITE(socket_tests)
313 BOOST_AUTO_TEST_SUITE(server)
315 BOOST_AUTO_TEST_CASE_TEMPLATE(ctor, cxns_t, cxns_reflect_types) {
316 using svr_t=
typename cxns_t::second_type;
318 no_latency_timestamps ts
(0U
);
320 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
321 BOOST_CHECK_NO_THROW(
323 boost::asio::ip::address(),
325 svr_t::socket_t::socket_priority::high,
326 api_thread_traits::processor_mask_type(exchanges::common::thread_traits::exchange_simulator_thread.core),
327 exchanges::common::thread_traits::exchange_simulator_thread.priority,
335 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client, cxns_t, cxns_reflect_types) {
336 using client_t=
typename cxns_t::first_type;
337 using svr_t=
typename cxns_t::second_type;
338 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
340 no_latency_timestamps ts
(0U
);
342 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
346 svr_t::socket_t::socket_priority::high,
353 BOOST_CHECK_NO_THROW(
355 msgs_t::min_msg_size,
356 msgs_t::max_msg_size,
358 client_t::socket_t::socket_priority::high,
359 exchanges::common::thread_traits::exchange_simulator_thread.core,
365 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_two_successive_clients, cxns_t, cxns_reflect_types) {
366 using client_t=
typename cxns_t::first_type;
367 using svr_t=
typename cxns_t::second_type;
368 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
370 no_latency_timestamps ts
(0U
);
372 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
376 svr_t::socket_t::socket_priority::high,
384 BOOST_CHECK_NO_THROW(
386 msgs_t::min_msg_size,
387 msgs_t::max_msg_size,
389 client_t::socket_t::socket_priority::high,
390 exchanges::common::thread_traits::exchange_simulator_thread.core,
396 BOOST_CHECK_NO_THROW(
398 msgs_t::min_msg_size,
399 msgs_t::max_msg_size,
401 client_t::socket_t::socket_priority::high,
402 exchanges::common::thread_traits::exchange_simulator_thread.core,
409 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_two_simultaneous_clients, cxns_t, cxns_reflect_types) {
410 using client_t=
typename cxns_t::first_type;
411 using svr_t=
typename cxns_t::second_type;
412 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
414 no_latency_timestamps ts
(0U
);
416 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
420 svr_t::socket_t::socket_priority::high,
427 BOOST_CHECK_NO_THROW(
429 msgs_t::min_msg_size,
430 msgs_t::max_msg_size,
432 client_t::socket_t::socket_priority::high,
433 exchanges::common::thread_traits::exchange_simulator_thread.core,
437 msgs_t::min_msg_size,
438 msgs_t::max_msg_size,
440 client_t::socket_t::socket_priority::high,
441 exchanges::common::thread_traits::exchange_simulator_thread.core,
447 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_one_message, cxns_t, cxns_reflect_types) {
448 using client_t=
typename cxns_t::first_type;
449 using svr_t=
typename cxns_t::second_type;
450 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
452 no_latency_timestamps ts
(0U
);
454 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
459 svr_t::socket_t::socket_priority::high,
467 msgs_t::min_msg_size,
468 msgs_t::max_msg_size,
470 client_t::socket_t::socket_priority::high,
474 typename msgs_t::message msg;
475 BOOST_CHECK_NO_THROW(skt.write(msg));
476 typename msgs_t::message response;
477 BOOST_CHECK_NO_THROW(skt.read(response));
478 BOOST_CHECK_EQUAL(response.c, msg.c);
479 BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
481 BOOST_CHECK_EQUAL(msg_ctrs->send, 1);
484 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_one_message_reconnect_sequential, cxns_t, cxns_reflect_types) {
485 using client_t=
typename cxns_t::first_type;
486 using svr_t=
typename cxns_t::second_type;
487 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
489 no_latency_timestamps ts
(0U
);
491 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
496 svr_t::socket_t::socket_priority::high,
505 msgs_t::min_msg_size,
506 msgs_t::max_msg_size,
508 client_t::socket_t::socket_priority::high,
512 typename msgs_t::message msg;
513 BOOST_CHECK_NO_THROW(skt.write(msg));
514 typename msgs_t::message response;
515 BOOST_CHECK_NO_THROW(skt.read(response));
516 BOOST_CHECK_EQUAL(response.c, msg.c);
517 BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
521 msgs_t::min_msg_size,
522 msgs_t::max_msg_size,
524 client_t::socket_t::socket_priority::high,
528 typename msgs_t::message msg;
529 BOOST_CHECK_NO_THROW(skt.write(msg));
530 typename msgs_t::message response;
531 BOOST_CHECK_NO_THROW(skt.read(response));
532 BOOST_CHECK_EQUAL(response.c, msg.c);
533 BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
536 BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
539 BOOST_AUTO_TEST_CASE_TEMPLATE(single_server_and_client_two_messages, cxns_t, cxns_reflect_types) {
540 using client_t=
typename cxns_t::first_type;
541 using svr_t=
typename cxns_t::second_type;
542 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
544 no_latency_timestamps ts
(0U
);
546 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
551 svr_t::socket_t::socket_priority::high,
559 msgs_t::min_msg_size,
560 msgs_t::max_msg_size,
562 client_t::socket_t::socket_priority::high,
566 typename msgs_t::message msg1;
567 BOOST_CHECK_NO_THROW(skt.write(msg1));
568 typename msgs_t::message response1;
569 BOOST_CHECK_NO_THROW(skt.read(response1));
570 BOOST_CHECK_EQUAL(response1.c, msg1.c);
571 typename msgs_t::heartbeat msg2;
572 BOOST_CHECK_NO_THROW(skt.write(msg2));
573 typename msgs_t::heartbeat response2;
574 BOOST_CHECK_NO_THROW(skt.read(response2));
575 BOOST_CHECK_EQUAL(response2.c, msg2.c);
576 BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
578 BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
581 BOOST_AUTO_TEST_CASE_TEMPLATE(two_repeated_servers_and_clients, cxns_t, cxns_reflect_types) {
582 using client_t=
typename cxns_t::first_type;
583 using svr_t=
typename cxns_t::second_type;
584 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
586 no_latency_timestamps ts
(0U
);
588 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
593 svr_t::socket_t::socket_priority::high,
601 msgs_t::min_msg_size,
602 msgs_t::max_msg_size,
604 client_t::socket_t::socket_priority::high,
608 typename msgs_t::message msg;
609 BOOST_CHECK_NO_THROW(skt.write(msg));
610 typename msgs_t::message response;
611 BOOST_CHECK_NO_THROW(skt.read(response));
612 BOOST_CHECK_EQUAL(response.c, msg.c);
613 BOOST_CHECK_EQUAL(msg_ctrs->recv, 1);
615 BOOST_CHECK_EQUAL(msg_ctrs->send, 1);
620 svr_t::socket_t::socket_priority::high,
628 msgs_t::min_msg_size,
629 msgs_t::max_msg_size,
631 client_t::socket_t::socket_priority::high,
635 typename msgs_t::heartbeat msg;
636 BOOST_CHECK_NO_THROW(skt.write(msg));
637 typename msgs_t::heartbeat response;
638 BOOST_CHECK_NO_THROW(skt.read(response));
639 BOOST_CHECK_EQUAL(response.c, msg.c);
640 BOOST_CHECK_EQUAL(msg_ctrs->recv, 2);
642 BOOST_CHECK_EQUAL(msg_ctrs->send, 2);
645 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_msgs_at_a_time, cxns_t, cxns_reflect_types) {
646 using client_t=
typename cxns_t::first_type;
647 using svr_t=
typename cxns_t::second_type;
648 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
650 #ifdef JMMCG_PERFORMANCE_TESTS
651 const unsigned long num_loops=60000;
652 const unsigned short loops_for_conv=900;
654 const unsigned long num_loops=200;
655 const unsigned short loops_for_conv=1;
657 const double perc_conv_estimate=0.1;
659 #ifdef JMMCG_PERFORMANCE_TESTS
660 latency_timestamps ts{2*3*num_loops*loops_for_conv};
662 no_latency_timestamps ts
{0U
};
665 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
670 svr_t::socket_t::socket_priority::high,
678 msgs_t::min_msg_size,
679 msgs_t::max_msg_size,
681 client_t::socket_t::socket_priority::high,
686 auto send_and_receive_all=[&skt]() {
687 auto send_and_receive=[&skt]() {
688 const typename msgs_t::message msg1;
690 typename msgs_t::message response1;
692 BOOST_CHECK_EQUAL(response1.c, msg1.c);
695 const std::chrono::high_resolution_clock::time_point start_time=std::chrono::high_resolution_clock::now();
696 for (
unsigned long i=0; i<num_loops; ++i) {
699 const std::chrono::high_resolution_clock::time_point end_time=std::chrono::high_resolution_clock::now();
700 return timed_results_t::value_type(
static_cast<
double>(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/num_loops);
703 const std::pair<timed_results_t,
bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
706 std::move(send_and_receive_all)
708 std::cout<<svr_t::thread_t::thread_traits::demangle_name(
typeid(client_t))<<
"\n\tMessage round-trip in-of-order time (microseconds)="<<timed_results.first<<std::endl;
709 #ifdef JMMCG_PERFORMANCE_TESTS
710 BOOST_CHECK(!timed_results.second);
711 ts.write_to_named_csv_file(std::cout,
"socket_server_reflect_msgs_at_a_time_performance_latencies");
713 BOOST_CHECK_EQUAL(msg_ctrs->recv, 2*num_loops);
715 BOOST_CHECK_EQUAL(msg_ctrs->send, 2*num_loops);
718 BOOST_AUTO_TEST_CASE_TEMPLATE(sink_one_msg_repeatedly, cxns_t, cxns_sink_types) {
719 using client_t=
typename cxns_t::first_type;
720 using svr_t=
typename cxns_t::second_type;
721 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
723 const unsigned long num_loops=2000;
725 no_latency_timestamps ts
(0U
);
727 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
732 svr_t::socket_t::socket_priority::high,
740 msgs_t::min_msg_size,
741 msgs_t::max_msg_size,
743 client_t::socket_t::socket_priority::high,
749 auto send_batch_of_msgs=[&skt]() {
750 typename msgs_t::heartbeat msg;
751 BOOST_CHECK(msg.is_valid());
752 for (
unsigned long i=0; i<num_loops; ++i) {
758 ppd::jthread send(
std::bind(
std::move(send_batch_of_msgs)));
762 BOOST_CHECK_NO_THROW(send());
763 BOOST_CHECK_EQUAL(msg_ctrs->recv, num_loops);
767 BOOST_AUTO_TEST_CASE_TEMPLATE(sink_different_msgs_parallel, cxns_t, cxns_sink_types) {
768 using client_t=
typename cxns_t::first_type;
769 using svr_t=
typename cxns_t::second_type;
770 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
772 const unsigned long num_loops=20000;
774 no_latency_timestamps ts
(0U
);
776 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
781 svr_t::socket_t::socket_priority::high,
789 msgs_t::min_msg_size,
790 msgs_t::max_msg_size,
792 client_t::socket_t::socket_priority::high,
798 auto send_batch_of_msgs1=[&skt]() {
799 typename msgs_t::message msg;
800 BOOST_CHECK(msg.is_valid());
801 for (
unsigned long i=0; i<num_loops; ++i) {
806 auto send_batch_of_msgs2=[&skt]() {
807 typename msgs_t::message msg;
808 BOOST_CHECK(msg.is_valid());
809 for (
unsigned long i=0; i<num_loops; ++i) {
810 msg.sequence=i+num_loops;
814 auto send_batch_of_hbs=[&skt]() {
815 typename msgs_t::heartbeat msg;
816 BOOST_CHECK(msg.is_valid());
817 for (
unsigned long i=0; i<num_loops; ++i) {
823 ppd::jthread send_msg1(
std::bind(
std::move(send_batch_of_msgs1)));
824 ppd::jthread send_msg2(
std::bind(
std::move(send_batch_of_msgs2)));
825 ppd::jthread send_hb(
std::bind(
std::move(send_batch_of_hbs)));
829 BOOST_CHECK_NO_THROW(send());
830 BOOST_CHECK_EQUAL(msg_ctrs->recv, 3*num_loops);
834 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_msgs_parallel, cxns_t, cxns_reflect_types) {
835 using client_t=
typename cxns_t::first_type;
836 using svr_t=
typename cxns_t::second_type;
837 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
839 static constexpr unsigned long num_loops=2000;
841 no_latency_timestamps ts
(0U
);
843 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
848 svr_t::socket_t::socket_priority::high,
856 msgs_t::min_msg_size,
857 msgs_t::max_msg_size,
859 client_t::socket_t::socket_priority::high,
864 auto send_and_receive=[&skt]() {
865 auto send_batch_of_msgs=[&skt]() {
866 typename msgs_t::message msg;
867 BOOST_CHECK(msg.is_valid());
868 for (
unsigned long i=0; i<num_loops; ++i) {
873 auto receive_batch_of_msgs=[&skt]() {
874 typename msgs_t::message response;
875 for (
unsigned long i=0; i<num_loops; ++i) {
878 BOOST_CHECK(response.is_valid());
879 BOOST_CHECK_EQUAL(response.sequence, num_loops-1);
882 ppd::jthread send(
std::bind(
std::move(send_batch_of_msgs)));
883 ppd::jthread receive(
std::bind(
std::move(receive_batch_of_msgs)));
887 BOOST_CHECK_NO_THROW(send_and_receive());
888 BOOST_CHECK_EQUAL(msg_ctrs->recv, num_loops);
890 BOOST_CHECK_EQUAL(msg_ctrs->send, num_loops);
893 BOOST_AUTO_TEST_CASE_TEMPLATE(reflect_different_msgs_parallel, cxns_t, cxns_reflect_types) {
894 using client_t=
typename cxns_t::first_type;
895 using svr_t=
typename cxns_t::second_type;
896 using msgs_t=
typename svr_t::proc_rules_t::msg_details_t;
898 #ifdef JMMCG_PERFORMANCE_TESTS
899 const unsigned long num_loops=1000;
900 const unsigned short loops_for_conv=200;
902 const unsigned long num_loops=200;
903 const unsigned short loops_for_conv=1;
905 const double perc_conv_estimate=0.1;
907 #ifdef JMMCG_PERFORMANCE_TESTS
908 latency_timestamps ts{2*3*num_loops*loops_for_conv};
910 no_latency_timestamps ts
{0U
};
913 typename svr_t::proc_rules_t proc_rules(msg_ctrs);
918 svr_t::socket_t::socket_priority::high,
926 msgs_t::min_msg_size,
927 msgs_t::max_msg_size,
929 client_t::socket_t::socket_priority::high,
934 auto send_and_receive=[&skt]() {
935 auto send_batch_of_msgs1=[&skt]() {
936 typename msgs_t::message msg;
937 BOOST_CHECK(msg.is_valid());
938 for (
unsigned long i=0; i<num_loops; ++i) {
943 auto send_batch_of_heartbeats=[&skt]() {
944 typename msgs_t::heartbeat msg;
945 BOOST_CHECK(msg.is_valid());
946 for (
unsigned long i=0; i<num_loops; ++i) {
951 auto receive_batch_of_msgs=[&skt]() {
952 typename msgs_t::msg_buffer_t::value_type response[msgs_t::max_msg_size];
953 typename msgs_t::Header_t
const *hdr=
reinterpret_cast<
typename msgs_t::Header_t
const *>(response);
954 for (
unsigned long i=0; i<2*num_loops; ++i) {
957 BOOST_CHECK(hdr->is_valid());
958 BOOST_CHECK_EQUAL(hdr->sequence, num_loops-1);
960 const std::chrono::high_resolution_clock::time_point start_time=std::chrono::high_resolution_clock::now();
962 ppd::jthread send(
std::bind(
std::move(send_batch_of_msgs1)));
963 ppd::jthread send_heartbeats(
std::bind(
std::move(send_batch_of_heartbeats)));
964 ppd::jthread receive(
std::bind(
std::move(receive_batch_of_msgs)));
966 const std::chrono::high_resolution_clock::time_point end_time=std::chrono::high_resolution_clock::now();
967 return timed_results_t::value_type(
static_cast<
double>(std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count())/num_loops);
970 BOOST_CHECK_NO_THROW(send_and_receive());
971 BOOST_CHECK_EQUAL(msg_ctrs->recv, 2*num_loops);
973 const std::pair<timed_results_t,
bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
976 std::move(send_and_receive)
978 std::cout<<svr_t::thread_t::thread_traits::demangle_name(
typeid(client_t))<<
"\n\tMessage round-trip out-of-order time (microseconds)="<<timed_results.first<<std::endl;
979 #ifdef JMMCG_PERFORMANCE_TESTS
980 BOOST_CHECK(!timed_results.second);
981 ts.write_to_named_csv_file(std::cout,
"socket_server_reflect_different_msgs_parallel_performance_latencies");
984 BOOST_CHECK_EQUAL(msg_ctrs->send, 3*2*num_loops);
987 BOOST_AUTO_TEST_SUITE_END()
989 BOOST_AUTO_TEST_SUITE_END()