24 std::ostringstream ss;
31 operator<<(
std::ostream &os,
typename manager<LkT>::session
const &ec)
noexcept(
false) {
38 manager<LkT>::
manager(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t ,
std::size_t max_message_size,
unsigned short timeout, socket_priority ,
std::size_t , server_to_client_flow_t &&server_to_client_flow)
42 acceptor.set_option(boost::asio::socket_base::send_buffer_size(max_message_size));
44 acceptor.set_option(boost::asio::socket_base::receive_buffer_size(max_message_size));
45 acceptor.set_option(boost::asio::socket_base::linger(
true, timeout));
46 acceptor.set_option(boost::asio::ip::tcp::no_delay(
true));
47 acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(
true));
60 while (!io_context.stopped()) {
61 thread_traits::sleep(0);
68 boost::
asio::socket_base::send_buffer_size send_buffer_size;
69 acceptor.get_option(send_buffer_size);
70 boost::
asio::socket_base::receive_buffer_size receive_buffer_size;
71 acceptor.get_option(receive_buffer_size);
72 boost::
asio::socket_base::linger linger_details;
73 acceptor.get_option(linger_details);
74 skt.set_options(1, std::max(send_buffer_size.value(), receive_buffer_size.value()), linger_details.timeout(), socket_t::socket_priority::low, 0);
76 acceptor.get_option(no_delay);
88 std::ostringstream ss;
95 operator<<(
std::ostream &os,
manager<LkT>
const &ec)
noexcept(
false) {
100 template<
class SvrHBs,
class LkT>
104 using session=base_t;
107 : base_t(
std::move(socket)) {
115 return std::make_shared<send_heartbeats>(
std::move(socket));
119 assert(!heatbeating);
120 heatbeating.reset(
new heartbeats_t(
this->socket_));
131 std::unique_ptr<heartbeats_t> heatbeating;
134 template<
class SvrHBs,
class LkT>
136 loopback<SvrHBs, LkT>::
loopback(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short, socket_priority priority,
std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow)
137 : base_t(addr, port_num, min_message_size, max_message_size, (heartbeats_t::max_missed_heartbeats*heartbeats_t::heartbeat_interval).count(), priority, incoming_cpu,
std::move(server_to_client_flow)) {
141 template<
class RecvProcMsgs>
inline void
143 this->acceptor.async_accept(
144 [
this, proc_fn](
boost::system::error_code
const &error,
boost::
asio::
ip::tcp::socket &&client_conn) {
147 typename base_t::session::ptr_type client_connection(send_heartbeats::make(
std::move(client_conn)));
148 assert(client_connection.get());
149 this->set_options(client_connection->socket());
150 assert(client_connection->socket().is_open());
151 this->server_to_client_flow_(client_connection);
152 exit=client_connection->process(proc_fn);
153 this->server_to_client_flow_(
typename base_t::session::ptr_type{});
156 start_accept(proc_fn);
162 template<
class LkT>
inline
163 forwarding<LkT>::
forwarding(
boost::
asio::
ip::address
const &addr,
unsigned short port_num,
std::size_t min_message_size,
std::size_t max_message_size,
unsigned short timeout, socket_priority priority,
std::size_t incoming_cpu, server_to_client_flow_t &&server_to_client_flow, socket_t &dest)
164 : base_t(addr, port_num, min_message_size, max_message_size, timeout, priority, incoming_cpu,
std::move(server_to_client_flow)), dest_socket_(dest) {
168 template<
class RecvProcMsgs>
inline void
170 this->acceptor.async_accept(
171 [
this, proc_fn](
boost::system::error_code
const &error,
boost::
asio::
ip::tcp::socket &&client_conn) {
174 typename base_t::session::ptr_type client_connection(base_t::session::make(
std::move(client_conn)));
175 assert(client_connection.get());
176 this->set_options(client_connection->socket());
177 this->set_options(dest_socket_);
178 assert(client_connection->socket().is_open());
179 this->server_to_client_flow_(client_connection);
180 exit=client_connection->process(proc_fn, dest_socket_);
181 this->server_to_client_flow_(
typename base_t::session::ptr_type{});
184 start_accept(proc_fn);