24 std::ostringstream ss;
// Stream-insertion operator overload whose right-hand operand is a
// manager<LkT>::session, taken by const reference.
// It is explicitly declared noexcept(false), so callers must expect that it
// may throw.
// NOTE(review): the return type and the body lie outside this excerpt.
31 operator<<(
std::ostream &os,
typename manager<LkT>::session
const &ec)
noexcept(
false) {
// Constructor of manager<LkT>.
//
// Parameters (as declared below):
//   addr                  - address handed to the acceptor in textual form.
//   port_num              - port number handed to the acceptor.
//   min_message_size,
//   max_message_size      - message-size bounds handed to the acceptor.
//   timeout               - timeout value handed to the acceptor.
//   priority              - socket priority handed to the acceptor.
//   incoming_cpu          - handed to the acceptor unchanged.
//   server_to_client_flow - taken by rvalue reference; it is not used in the
//                           part of the initializer list visible here.
// NOTE(review): the remaining member initializers and the constructor body
// are outside this excerpt.
38 manager<LkT>::
manager(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short timeout, socket_priority priority,
std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
39 :
// The address is converted to a string and passed as a C string; the
// temporary std::string lives until the end of this full expression.
acceptor(addr.to_string().c_str(), port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu),
58 acceptor.set_options(skt);
70 std::ostringstream ss;
// Stream-insertion operator overload whose right-hand operand is a
// manager<LkT>, taken by const reference.
// It is explicitly declared noexcept(false), so callers must expect that it
// may throw.
// NOTE(review): the return type and the body lie outside this excerpt.
77 operator<<(
std::ostream &os,
manager<LkT>
const &ec)
noexcept(
false) {
// Nested class loopback<SvrHBs, LkT>::send_heartbeats: a final class that
// publicly derives from manager<LkT>::session and additionally owns a
// heartbeats_t object (see the unique_ptr member at the end).
// NOTE(review): only part of the class is visible in this excerpt; several
// member signatures and the closing brace are not shown.
82 template<
class SvrHBs,
class LkT>
83 class loopback<SvrHBs, LkT>::send_heartbeats
final :
public manager<LkT>::session {
// Shorthand for the session base class.
85 using base_t=
typename manager<LkT>::session;
// Constructor initializer: forwards the accepted socket to the session base.
89 : base_t(accept_socket) {
// Factory: builds a send_heartbeats via std::make_shared from the accepted
// socket and returns it as session::ptr_type. Declared noexcept(false).
96 static typename session::
ptr_type make(
typename socket_t::socket_type accept_socket)
noexcept(
false) {
97 return std::make_shared<send_heartbeats>(accept_socket);
// Creates the heartbeats_t object over this session's socket_; the assert
// requires that none has been created yet.
// NOTE(review): the signature of the enclosing member function is not
// visible in this excerpt.
101 assert(!heatbeating);
102 heatbeating.reset(
new heartbeats_t(
this->socket_));
// Owning pointer to the heartbeats_t object; null until the reset() above runs.
// NOTE(review): "heatbeating" is presumably a misspelling of "heartbeating";
// left unchanged because code outside this excerpt may refer to it.
113 std::unique_ptr<heartbeats_t> heatbeating;
// Constructor of loopback<SvrHBs, LkT>.
//
// Forwards addr, port_num, the message-size bounds, priority, incoming_cpu and
// the moved server_to_client_flow to base_t.
// The fifth parameter (an unsigned short) is deliberately unnamed and
// ignored: the value passed to base_t in its place is
// (heartbeats_t::max_missed_heartbeats*heartbeats_t::heartbeat_interval).count().
// NOTE(review): base_t's constructor signature is not visible here; if that
// parameter is an unsigned short (as in manager's constructor), the count()
// result is implicitly narrowed - confirm the product always fits.
// NOTE(review): the constructor body is outside this excerpt.
116 template<
class SvrHBs,
class LkT>
118 loopback<SvrHBs, LkT>::
loopback(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short, socket_priority priority,
std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
119 : base_t(addr, port_num, min_message_size, max_message_size, (heartbeats_t::max_missed_heartbeats*heartbeats_t::heartbeat_interval).count(), priority, incoming_cpu,
std::move(server_to_client_flow)) {
// Member function template parameterised on RecvProcMsgs, returning void.
// NOTE(review): the qualified name and parameter list are not visible in this
// excerpt; presumably this is loopback's start_accept(proc_fn), given the
// start_accept(proc_fn) call at the end - TODO confirm.
//
// Registers a handler with acceptor.async_accept(). The handler captures the
// this pointer and a copy of proc_fn, and receives the accepted socket.
123 template<
class RecvProcMsgs>
inline void
125 this->acceptor.async_accept(
126 [
this, proc_fn](
typename socket_t::socket_type accept_socket) {
// Wrap the accepted socket in a heartbeat-sending session.
129 typename base_t::session::ptr_type client_connection(send_heartbeats::make(accept_socket));
// NOTE(review): client_connection is dereferenced here before the non-null
// assertion on the following line.
130 this->set_options(client_connection->socket());
131 assert(client_connection.get());
132 assert(client_connection->socket().is_open());
// Hand the live session to server_to_client_flow_ while it is processed...
133 this->server_to_client_flow_(client_connection);
134 exit=client_connection->process(proc_fn);
// ...then hand it an empty pointer once process() has returned.
135 this->server_to_client_flow_(
typename base_t::session::ptr_type{});
// Calls start_accept again with the same proc_fn.
// NOTE(review): the condition (if any) guarding this call is not visible.
138 start_accept(proc_fn);
// Constructor of forwarding<LkT>.
//
// Forwards addr, port_num, the message-size bounds, timeout, priority,
// incoming_cpu and the moved server_to_client_flow to base_t, and initializes
// dest_socket_ from dest.
// NOTE(review): the declaration of dest_socket_ is not visible here; if it is
// a reference member, dest must outlive this object - confirm.
// NOTE(review): the constructor body is outside this excerpt.
146 forwarding<LkT>::
forwarding(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short timeout, socket_priority priority,
std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow, socket_t &dest)
147 : base_t(addr, port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu,
std::move(server_to_client_flow)), dest_socket_(dest) {
// Member function template parameterised on RecvProcMsgs, returning void.
// NOTE(review): the qualified name and parameter list are not visible in this
// excerpt; presumably this is forwarding's start_accept(proc_fn), given the
// start_accept(proc_fn) call at the end - TODO confirm.
//
// Registers a handler with acceptor.async_accept(). The handler captures the
// this pointer and a copy of proc_fn, and receives the accepted socket.
151 template<
class RecvProcMsgs>
inline void
153 this->acceptor.async_accept(
154 [
this, proc_fn](
typename socket_t::socket_type accept_socket) {
// Wrap the accepted socket in a plain session (no heartbeats here).
157 typename base_t::session::ptr_type client_connection(base_t::session::make(accept_socket));
158 assert(client_connection.get());
// Options are applied to the client socket and, on every accepted
// connection, to the destination socket as well.
159 this->set_options(client_connection->socket());
160 this->set_options(dest_socket_);
161 assert(client_connection->socket().is_open());
// Hand the live session to server_to_client_flow_ while it is processed...
162 this->server_to_client_flow_(client_connection);
// process() is given dest_socket_ in addition to proc_fn.
163 exit=client_connection->process(proc_fn, dest_socket_);
// ...then hand it an empty pointer once process() has returned.
164 this->server_to_client_flow_(
typename base_t::session::ptr_type{});
// Calls start_accept again with the same proc_fn.
// NOTE(review): the condition (if any) guarding this call is not visible.
167 start_accept(proc_fn);