libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
exchange_to_client_processor_impl.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2020 by J.M.McGuiness, isimud@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 namespace isimud { namespace ISIMUD_VER_NAMESPACE { namespace exchanges { namespace common {
20 
21 template<class ClientCxnPtr, class EchgCxn>
22 class exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow final {
23 public:
24  template<class LatencyTimestamps>
25  flow(std::atomic<bool> &e, exchg_link_t &exchg_link, LatencyTimestamps &timestamps) noexcept(true);
26 
27  ~flow() noexcept(false);
28 
29  /// A client has connected so make sure the connection is passed to the exchg_to_client_thread, so it can forward messages as necessary. Also must be used to stop sending messages to that client.
30  /**
31  * When a client is disconnected, this must be done early to ensure that this class does not attempt to delete the contained socket, as that is actually owned by the boost::io_context and if not reset could cause a double-free error.
32  */
33  void connect_client(client_connection_t client_connection) noexcept(true);
34 
35  std::string to_string() const noexcept(false);
36 
37 private:
38  using thread_t=libjmmcg::ppd::jthread;
39 
40  std::atomic<bool> &exit_;
41  boost::exception_ptr client_err{};
42  exchg_link_t &exchg_link_;
43  client_connection_t client_connection_{};
44  /**
45  The thread that spins waiting for messages from the exchange to forward to the connected client.
46  */
47  thread_t exchg_to_client_thread;
48 
49  /**
50  * There may not be a client connected yet, but we still need to respond to heartbeats from the exchange.
51  */
52  template<class LatencyTimestamps>
53  void process_msgs(LatencyTimestamps &timestamps) noexcept(true);
54 };
55 
56 template<class ClientCxnPtr, class EchgCxn>
57 inline std::ostream &
58 operator<<(std::ostream &os, typename exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow const &s) noexcept(false) {
59  os<<s.to_string();
60  return os;
61 }
62 
63 template<class ClientCxnPtr, class EchgCxn>
64 template<class LatencyTimestamps> inline
65 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::exchange_to_client_processor(std::atomic<bool> &e, ctor_args const &exchange_cxn_details, socket_priority to_exchg_priority, proc_rules_t const &proc_rules, LatencyTimestamps &timestamps)
66 : exchg_link(exchange_cxn_details, to_exchg_priority, common::thread_traits::exchange_to_client_thread.core, proc_rules),
67  exchange_to_client(e, exchg_link, timestamps) {
68 }
69 
70 template<class ClientCxnPtr, class EchgCxn> inline bool
71 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::is_logged_on() const noexcept(true) {
72  return exchg_link.is_logged_on();
73 }
74 
75 template<class ClientCxnPtr, class EchgCxn> inline void
76 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::exchange_to_client_processor::connect_client(client_connection_t client_connection) noexcept(true) {
77  exchange_to_client.connect_client(client_connection);
78 }
79 
80 template<class ClientCxnPtr, class EchgCxn> inline typename exchange_to_client_processor<ClientCxnPtr, EchgCxn>::socket_t &
81 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::socket() noexcept(true) {
82  return exchg_link.socket();
83 }
84 
85 template<class ClientCxnPtr, class EchgCxn> inline std::string
86 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::to_string() const noexcept(false) {
87  std::ostringstream os;
88  os<<"exchange connection: "<<exchg_link
89  <<", exchange-to-client details: "<<exchange_to_client.to_string();
90  return os.str();
91 }
92 
93 template<class ClientCxnPtr, class EchgCxn>
94 inline std::ostream &
95 operator<<(std::ostream &os, exchange_to_client_processor<ClientCxnPtr, EchgCxn> const &s) noexcept(false) {
96  os<<s.to_string();
97  return os;
98 }
99 
100 template<class ClientCxnPtr, class EchgCxn>
101 template<class LatencyTimestamps> inline
102 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::flow(std::atomic<bool> &e, exchg_link_t &exchg_link, LatencyTimestamps &timestamps) noexcept(true)
103 : exit_(e),
104  exchg_link_(exchg_link),
105  exchg_to_client_thread(
106  [this, &timestamps]() {
107  this->process_msgs<LatencyTimestamps>(timestamps);
108  }
109  ) {
110  exchg_to_client_thread.kernel_priority(common::thread_traits::exchange_to_client_thread.priority);
111  exchg_to_client_thread.kernel_affinity(
112  typename thread_t::thread_traits::api_params_type::processor_mask_type(common::thread_traits::exchange_to_client_thread.core)
113  );
114  exchg_to_client_thread.set_name("exchg_to_client");
115 }
116 
117 template<class ClientCxnPtr, class EchgCxn> inline
118 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::~flow() noexcept(false) {
119  connect_client(client_connection_t{});
120  exit_=true;
121  if (client_err) {
122  boost::rethrow_exception(client_err);
123  }
124 }
125 
126 template<class ClientCxnPtr, class EchgCxn> inline void
127 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::connect_client(client_connection_t client_connection) noexcept(true) {
128  client_connection_=client_connection;
129  assert(!client_connection_ || client_connection_->socket().is_open());
130 }
131 
132 template<class ClientCxnPtr, class EchgCxn> inline std::string
133 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::to_string() const noexcept(false) {
134  std::ostringstream os;
135  os
136  <<", exchange-to-client core="<<common::thread_traits::exchange_to_client_thread
137  <<", client processing-error: '"<<client_err<<"'";
138  if (client_connection_.get()) {
139  os<<", client link: "<<client_connection_->socket();
140  }
141  return os.str();
142 }
143 
144 template<class ClientCxnPtr, class EchgCxn>
145 template<class LatencyTimestamps> inline void
146 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::process_msgs(LatencyTimestamps &timestamps) noexcept(true) {
147  try {
148  while (LIKELY(!static_cast<bool>(exit_))) {
149  try {
150  if (UNLIKELY(exchg_link_.read_and_process_a_msg(client_connection_, timestamps))) {
151  return;
152  }
153  } catch (...) {
154  // TODO Should really return a msg to the client indicating failed to translate, with details...
155  client_err=boost::current_exception();
156  }
157  }
158  } catch (...) {
159  client_err=boost::current_exception();
160  }
161 }
162 
163 } } } }