21 template<
class ClientCxnPtr,
class EchgCxn>
22 class exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow
final {
24 template<
class LatencyTimestamps>
25 flow(
std::atomic<
bool> &e, exchg_link_t &exchg_link, LatencyTimestamps ×tamps)
noexcept(
true);
27 ~
flow()
noexcept(
false);
33 void connect_client(client_connection_t client_connection)
noexcept(
true);
38 using thread_t=libjmmcg::
ppd::jthread;
40 std::atomic<
bool> &exit_;
41 boost::exception_ptr client_err{};
42 exchg_link_t &exchg_link_;
43 client_connection_t client_connection_{};
47 thread_t exchg_to_client_thread;
52 template<
class LatencyTimestamps>
53 void process_msgs(LatencyTimestamps ×tamps)
noexcept(
true);
// Stream-insertion operator taking a `flow` by const reference.
// NOTE(review): the return type and the function body are not visible in this
// excerpt (listing lines were lost), so nothing is claimed about them here.
// NOTE(review): the second parameter names `flow` through
// `typename exchange_to_client_processor<...>::flow`, which is a non-deduced
// context; ClientCxnPtr/EchgCxn cannot be deduced from a plain `os<<f` call.
// Confirm how callers are expected to reach this overload.
56 template<
class ClientCxnPtr,
class EchgCxn>
58 operator<<(
std::ostream &os,
typename exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow
const &s)
noexcept(
false) {
// Constructor of exchange_to_client_processor.
// The visible part of the initialiser list constructs `exchange_to_client`
// from the exit flag `e`, the member `exchg_link` and `timestamps`.
// NOTE(review): the initialiser that consumes `exchange_cxn_details`,
// `to_exchg_priority` and `proc_rules` is not visible in this excerpt.
// NOTE(review): `exchange_to_client` is handed `exchg_link`, so `exchg_link`
// presumably has to be declared (and hence initialised) first - verify against
// the class definition.
// NOTE(review): `×tamps` looks like a mis-encoded `&timestamps`; the
// initialiser below refers to the parameter as `timestamps`.
63 template<
class ClientCxnPtr,
class EchgCxn>
64 template<
class LatencyTimestamps>
inline
65 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::
exchange_to_client_processor(
std::atomic<
bool> &e, ctor_args
const &exchange_cxn_details, socket_priority to_exchg_priority, proc_rules_t
const &proc_rules, LatencyTimestamps ×tamps)
67 exchange_to_client(e, exchg_link, timestamps) {
70 template<
class ClientCxnPtr,
class EchgCxn>
inline bool
71 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::
is_logged_on()
const noexcept(
true) {
72 return exchg_link.is_logged_on();
75 template<
class ClientCxnPtr,
class EchgCxn>
inline void
76 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::exchange_to_client_processor::
connect_client(client_connection_t client_connection)
noexcept(
true) {
77 exchange_to_client.connect_client(client_connection);
80 template<
class ClientCxnPtr,
class EchgCxn>
inline typename exchange_to_client_processor<ClientCxnPtr, EchgCxn>::socket_t &
81 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::
socket()
noexcept(
true) {
82 return exchg_link.socket();
85 template<
class ClientCxnPtr,
class EchgCxn>
inline std::string
86 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::
to_string()
const noexcept(
false) {
87 std::ostringstream os;
88 os<<
"exchange connection: "<<exchg_link
89 <<
", exchange-to-client details: "<<exchange_to_client.to_string();
// Stream-insertion operator taking an exchange_to_client_processor by const
// reference.
// NOTE(review): the return type and the function body are not visible in this
// excerpt (listing lines were lost), so nothing is claimed about them here.
93 template<
class ClientCxnPtr,
class EchgCxn>
95 operator<<(
std::ostream &os, exchange_to_client_processor<ClientCxnPtr, EchgCxn>
const &s)
noexcept(
false) {
// Constructor of `flow`.
// Visible behaviour: binds `exchg_link_` to the supplied link, constructs
// `exchg_to_client_thread` from a lambda that calls
// process_msgs<LatencyTimestamps>(timestamps), then calls kernel_affinity()
// with a processor mask built from
// common::thread_traits::exchange_to_client_thread.core and names the thread
// "exchg_to_client".
// NOTE(review): the start of the initialiser list (where the reference member
// `exit_` must be bound) and several closing tokens are not visible in this
// excerpt.
// NOTE(review): `×tamps` looks like a mis-encoded `&timestamps`; the lambda
// body refers to the name `timestamps`.
// NOTE(review): the lambda captures `this` and `timestamps` by reference, so
// both must stay alive for as long as the thread may run it - confirm the
// caller guarantees this for `timestamps`.
// NOTE(review): declared noexcept(true); any exception escaping the
// initialisers or the calls below would end in std::terminate.
100 template<
class ClientCxnPtr,
class EchgCxn>
101 template<
class LatencyTimestamps>
inline
102 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::
flow(
std::atomic<
bool> &e, exchg_link_t &exchg_link, LatencyTimestamps ×tamps)
noexcept(
true)
104 exchg_link_(exchg_link),
105 exchg_to_client_thread(
106 [
this, ×tamps]() {
107 this->process_msgs<LatencyTimestamps>(timestamps);
111 exchg_to_client_thread.kernel_affinity(
112 typename thread_t::thread_traits::api_params_type::processor_mask_type(common::thread_traits::exchange_to_client_thread.core)
114 exchg_to_client_thread
.set_name("exchg_to_client");
// Destructor of `flow`; declared noexcept(false) because it rethrows the
// stored `client_err` through boost::rethrow_exception().
// NOTE(review): the statements between the signature and the rethrow are not
// visible in this excerpt. boost::rethrow_exception() requires a non-null
// exception_ptr, so a guard on `client_err` is presumably among them - verify.
// NOTE(review): a destructor that throws while another exception is already
// unwinding the stack results in std::terminate.
117 template<
class ClientCxnPtr,
class EchgCxn>
inline
118 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::~
flow()
noexcept(
false) {
122 boost::rethrow_exception(client_err);
126 template<
class ClientCxnPtr,
class EchgCxn>
inline void
127 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::
connect_client(client_connection_t client_connection)
noexcept(
true) {
128 client_connection_=client_connection;
129 assert(!client_connection_ || client_connection_->socket().is_open());
// Build a textual description of this flow in a std::ostringstream.
// Visible output fragments: the exchange-to-client core taken from
// common::thread_traits::exchange_to_client_thread, the stored `client_err`
// in single quotes, and - only when `client_connection_` is non-empty - the
// client connection's socket.
// NOTE(review): the left-hand side of the first `<<` (the stream and whatever
// text precedes ", exchange-to-client core=") and the end of the function
// (closing braces and the return statement) are not visible in this excerpt.
132 template<
class ClientCxnPtr,
class EchgCxn>
inline std::string
133 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::
to_string()
const noexcept(
false) {
134 std::ostringstream os;
136 <<
", exchange-to-client core="<<common::thread_traits::exchange_to_client_thread
137 <<
", client processing-error: '"<<client_err<<
"'";
138 if (client_connection_.get()) {
139 os<<
", client link: "<<client_connection_->socket();
// Message loop of the flow: while `exit_` reads false, calls
// exchg_link_.read_and_process_a_msg(client_connection_, timestamps) and
// tests its result (marked UNLIKELY to be true).
// `client_err` is assigned boost::current_exception() at two places.
// NOTE(review): the surrounding try/catch structure, the action taken when
// read_and_process_a_msg() returns true, and the end of the function are not
// visible in this excerpt; the two assignments are presumably inside catch
// handlers - verify.
// NOTE(review): `×tamps` looks like a mis-encoded `&timestamps`; the body
// refers to the parameter as `timestamps`.
144 template<
class ClientCxnPtr,
class EchgCxn>
145 template<
class LatencyTimestamps>
inline void
146 exchange_to_client_processor<ClientCxnPtr, EchgCxn>::flow::process_msgs(LatencyTimestamps ×tamps)
noexcept(
true) {
148 while (
LIKELY(!
static_cast<
bool>(exit_))) {
150 if (
UNLIKELY(exchg_link_.read_and_process_a_msg(client_connection_, timestamps))) {
155 client_err=boost::current_exception();
159 client_err=boost::current_exception();