1 #ifndef LIBJMMCG_CORE_THREAD_POOL_WORKERS_HPP
2 #define LIBJMMCG_CORE_THREAD_POOL_WORKERS_HPP
21 #include "private_/fixed_threads_container.hpp"
22 #include "private_/thread_pool_queue_model.hpp"
23 #include "private_/pool_thread.hpp"
/// Alias for S (presumably the type parameter of the enclosing template - TODO confirm against the full header).
32 using statistics_type=S;
/// The nested ave_stats_type of statistics_type; the aggregation functors below take and update values of this type.
33 using result_type=
typename statistics_type::ave_stats_type;
/// Binary functor that folds one element's total vertical work into a running accumulator.
36 struct agg_vert_stats {
/// \param acc	The running accumulator, taken by value.
/// \param v	The element being visited; only v.statistics() is read.
/// \return	Whatever acc.update() returns when given v.statistics().total_vertical_work().total().
39 operator()(result_type acc, V
const &v)
const noexcept(
true) {
40 return acc.update(v.statistics().total_vertical_work().total());
/// Binary functor that folds one element's total horizontal ("hrz") work into a running accumulator.
43 struct agg_hrz_stats {
/// \param acc	The running accumulator, taken by value.
/// \param v	The element being visited; only v.statistics() is read.
/// \return	Whatever acc.update() returns when given v.statistics().total_hrz_work().total().
46 operator()(result_type acc, V
const &v)
const noexcept(
true) {
47 return acc.update(v.statistics().total_hrz_work().total());
55 return std::accumulate(
65 return std::accumulate(
/// The nested ave_stats_type of statistics_type.
77 using result_type=
typename statistics_type::ave_stats_type;
// Base class: private_::thread_pool_queue_model instantiated with
// worker_threads_get_work<pool_owns_queue> as the work-distribution mode and size_mode_t::fixed_size
// as the size mode. The thread container is a private_::fixed_pool_of_threads of
// thread_types::steal<...> threads, parameterised on PTT::os_traits and the distribution mode's queue_model.
101 :
public private_::thread_pool_queue_model<
102 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
103 pool_traits::size_mode_t::fixed_size,
105 private_::fixed_pool_of_threads<
107 pool::private_::thread_types::steal<
109 typename PTT::os_traits,
111 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
/// Shorthand for the base class. It repeats the template arguments of the base-class specifier, so the
/// two lists must be kept in step when either is edited.
116 using base_t=private_::thread_pool_queue_model<
117 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
118 pool_traits::size_mode_t::fixed_size,
120 private_::fixed_pool_of_threads<
122 pool::private_::thread_types::steal<
124 typename PTT::os_traits,
126 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
// Re-export the nested types of base_t under the same names. base_t is a dependent type, so each
// alias needs the typename keyword, and the names are not found by unqualified lookup without them.
130 using pool_traits_type=
typename base_t::pool_traits_type;
131 using os_traits=
typename base_t::os_traits;
132 using thread_traits=
typename base_t::thread_traits;
133 using api_params_type=
typename base_t::api_params_type;
134 using pool_type=
typename base_t::pool_type;
135 using statistics_type=
typename base_t::statistics_type;
136 using work_distribution_mode=
typename base_t::work_distribution_mode;
137 using signalled_work_queue_type=
typename base_t::signalled_work_queue_type;
138 using GSSk_batching_type=
typename base_t::GSSk_batching_type;
// Compile-time check: the inner is_same<model_type, sequential_mode>::type must be std::false_type,
// i.e. PTT::os_traits::thread_traits::model_type must not be sequential_mode.
140 BOOST_MPL_ASSERT((
std::is_same<
typename std::is_same<
typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type,
std::false_type>));
// A pool with zero threads is rejected. The assert traps it when assertions are compiled in;
// when they are compiled out (NDEBUG) the explicit check below throws instead.
154 assert(
this->max_num_threads_in_pool>0);
155 if (!
this->max_num_threads_in_pool) {
156 throw typename base_t::exception_type(
157 _T(
"Cannot have an empty thread pool."),
// The offending argument is reported as its declaration text plus the stringified value.
// NOTE(review): the check tests max_num_threads_in_pool but the value reported is num_threads;
// presumably the former is initialised from the latter - confirm.
162 info::function::argument(
163 _T(
"const typename pool_traits_type::pool_type::size_type max_num_threads"),
164 tostring(num_threads)
188 template<
typename ExecT>
// Try to remove the work item ec.wk_queue_item() from signalled_work_queue. A result that converts
// to true selects erased_successfully; otherwise the state is ignoring_result.
194 ret=(
this->signalled_work_queue.erase(ec.wk_queue_item()) ?
erase_states::erased_successfully :
erase_states::ignoring_result);
// Helper that computes the vertical/horizontal work figures from the threads held in this->pool.
206 using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
// Build a local statistics object from the batch statistics, then add the per-pool vertical and
// horizontal work figures to that local object (batch_details itself is not modified by these calls,
// assuming statistics_type's constructor copies - TODO confirm).
208 statistics_type stats(batch_details.statistics());
209 stats.add_vertical_work(acc_t::vertical_work(
this->pool));
210 stats.add_hrz_work(acc_t::hrz_work(
this->pool));
// Set the exit-requested object to the exit_requested state of the states enumeration that
// pool_traits_type defines for this work-distribution mode's queue_model.
215 this->exit_requested().set(pool_traits_type::
template exit_requested_type<
typename work_distribution_mode::queue_model>::states::exit_requested);
// Clear the pool-owned work queue.
220 this->signalled_work_queue.clear();
/// Batching helper. The member functions below use it for the statistics object (statistics()),
/// for adding work to a batch and for processing a batch item.
225 GSSk_batching_type batch_details;
/// Gives mutable access to the statistics object held by batch_details.
/// Overrides a base-class virtual function and is declared non-throwing.
/// \return	The reference returned by batch_details.statistics().
227 statistics_type &
__fastcall set_statistics()
noexcept(
true)
override FORCE_INLINE {
228 return batch_details.statistics();
/// Hands a work item to batch_details.add_work_to_batch(), together with this->pool and the thread id.
/// Overrides a base-class virtual function and is declared non-throwing.
/// \param tid	Thread id, passed through unchanged.
/// \param wk	The work item. value_type is not a deduced type here, so std::forward<value_type>(wk)
///		is an unconditional cast to an rvalue (equivalent to std::move).
/// \return	The result of batch_details.add_work_to_batch().
231 bool __fastcall add_work_to_batch(
const typename thread_traits::api_params_type::tid_type tid,
typename signalled_work_queue_type::value_type &&wk)
noexcept(
true)
override FORCE_INLINE {
232 return batch_details.add_work_to_batch(
this->pool, tid, std::forward<
typename signalled_work_queue_type::value_type>(wk));
/// Appends a work item to the pool-owned signalled_work_queue and records the addition in the statistics.
/// Overrides a base-class virtual function; declared noexcept(false), so it may throw.
/// \param wk	The work item; it is cast to an rvalue and given to push_back().
238 void __fastcall add_nonjoinable_work(
typename signalled_work_queue_type::value_type &&wk)
noexcept(
false)
override FORCE_INLINE {
// sleep(0) before and after the enqueue - presumably a yield to other threads; TODO confirm the
// semantics of thread_traits::sleep().
240 thread_traits::sleep(0);
241 this->signalled_work_queue.push_back(std::forward<
typename signalled_work_queue_type::value_type>(wk));
243 os_traits::thread_traits::sleep(0);
// Record the queue length observed after the push as a candidate for the maximum queue length.
244 batch_details.statistics().update_max_queue_len(
this->queue_size());
246 batch_details.statistics().added_work();
/// Appends a work item to the pool-owned signalled_work_queue, records the addition in the
/// statistics and returns the work item to the caller.
/// Overrides a base-class virtual function; declared noexcept(false), so it may throw.
/// \param wk	The work item; it is cast to an rvalue and given to push_back().
/// \return	A value_type move-constructed from wk after the enqueue.
252 typename signalled_work_queue_type::value_type
__fastcall add_joinable_work(
typename signalled_work_queue_type::value_type &&wk)
noexcept(
false)
override FORCE_INLINE {
// sleep(0) before and after the enqueue - presumably a yield to other threads; TODO confirm the
// semantics of thread_traits::sleep().
254 thread_traits::sleep(0);
255 this->signalled_work_queue.push_back(std::forward<
typename signalled_work_queue_type::value_type>(wk));
257 os_traits::thread_traits::sleep(0);
// Record the queue length observed after the push as a candidate for the maximum queue length.
258 batch_details.statistics().update_max_queue_len(
this->queue_size());
260 batch_details.statistics().added_work();
// NOTE(review): wk was already passed to push_back() as an rvalue above. If push_back() moves from
// its argument, the value returned here is in a moved-from state - confirm that push_back() copies.
261 return std::move(wk);
/// Delegates to batch_details.process_a_batch_item() with this->pool, the thread id and the exception object.
/// Overrides a base-class virtual function; declared noexcept(false), so it may throw.
/// \param tid	Thread id, passed through unchanged.
/// \param ex	Thread-exception object, passed through by const reference.
/// \return	The result of batch_details.process_a_batch_item().
264 bool __fastcall process_a_batch_item(
const typename thread_traits::api_params_type::tid_type tid,
typename os_traits::thread_exception
const &ex)
noexcept(
false)
override FORCE_INLINE {
265 return batch_details.process_a_batch_item(
this->pool, tid, ex);
// Base class: private_::thread_pool_queue_model instantiated with
// worker_threads_get_work<pool_owns_queue> as the work-distribution mode and size_mode_t::tracks_to_max
// as the size mode. The thread container is still a private_::fixed_pool_of_threads of
// thread_types::steal<...> threads, parameterised on PTT::os_traits and the distribution mode's queue_model.
278 :
public private_::thread_pool_queue_model<
279 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
280 pool_traits::size_mode_t::tracks_to_max,
282 private_::fixed_pool_of_threads<
284 pool::private_::thread_types::steal<
286 typename PTT::os_traits,
288 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
/// Shorthand for the base class. It repeats the template arguments of the base-class specifier, so the
/// two lists must be kept in step when either is edited.
293 using base_t=private_::thread_pool_queue_model<
294 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
295 pool_traits::size_mode_t::tracks_to_max,
297 private_::fixed_pool_of_threads<
299 pool::private_::thread_types::steal<
301 typename PTT::os_traits,
303 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
// Re-export the nested types of base_t under the same names. base_t is a dependent type, so each
// alias needs the typename keyword, and the names are not found by unqualified lookup without them.
307 using pool_traits_type=
typename base_t::pool_traits_type;
308 using os_traits=
typename base_t::os_traits;
309 using thread_traits=
typename base_t::thread_traits;
310 using api_params_type=
typename base_t::api_params_type;
311 using pool_type=
typename base_t::pool_type;
312 using GSSk_batching_type=
typename base_t::GSSk_batching_type;
313 using statistics_type=
typename base_t::statistics_type;
314 using work_distribution_mode=
typename base_t::work_distribution_mode;
315 using signalled_work_queue_type=
typename base_t::signalled_work_queue_type;
// Compile-time check: the inner is_same<model_type, sequential_mode>::type must be std::false_type,
// i.e. PTT::os_traits::thread_traits::model_type must not be sequential_mode.
317 BOOST_MPL_ASSERT((
std::is_same<
typename std::is_same<
typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type,
std::false_type>));
370 template<
typename ExecT_>
// Try to remove the work item ec.wk_queue_item() from signalled_work_queue. A result that converts
// to true selects erased_successfully; otherwise the state is ignoring_result.
376 ret=(
this->signalled_work_queue.erase(ec.wk_queue_item()) ?
erase_states::erased_successfully :
erase_states::ignoring_result);
// Helper that computes the vertical/horizontal work figures from the threads held in this->pool.
388 using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
// Build a local statistics object from the batch statistics, then feed it the per-pool vertical
// and horizontal work figures.
// NOTE(review): this class calls processed_vertical_work()/processed_hrz_work(), whereas the
// fixed_size specialisations in this file call add_vertical_work()/add_hrz_work() at the
// equivalent point - confirm the difference is intended.
390 statistics_type stats(batch_details.statistics());
391 stats.processed_vertical_work(acc_t::vertical_work(
this->pool));
392 stats.processed_hrz_work(acc_t::hrz_work(
this->pool));
// Clear the pool-owned work queue first, then set the exit-requested object to the exit_requested
// state of the states enumeration defined for this work-distribution mode's queue_model.
397 this->signalled_work_queue.clear();
398 this->exit_requested().set(pool_traits_type::
template exit_requested_type<
typename work_distribution_mode::queue_model>::states::exit_requested);
/// Batching helper. The member functions below use it for the statistics object (statistics())
/// and for processing a batch item.
405 GSSk_batching_type batch_details;
/// Gives mutable access to the statistics object held by batch_details. Declared non-throwing.
/// Unlike the same-named member of the fixed_size specialisations, this one is not marked override.
/// \return	The reference returned by batch_details.statistics().
407 statistics_type &
__fastcall set_statistics()
noexcept(
true)
FORCE_INLINE {
408 return batch_details.statistics();
/// Delegates to batch_details.process_a_batch_item() with this->pool, the thread id and the exception object.
/// Carries neither an exception specification nor override.
/// \param tid	Thread id, passed through unchanged.
/// \param exception_thrown_in_thread	Thread-exception object, passed through by const reference.
/// \return	The result of batch_details.process_a_batch_item().
431 bool __fastcall process_a_batch_item(
const typename thread_traits::api_params_type::tid_type tid,
typename os_traits::thread_exception
const &exception_thrown_in_thread)
FORCE_INLINE {
432 return batch_details.process_a_batch_item(
this->pool, tid, exception_thrown_in_thread);
// Base class: private_::thread_pool_queue_model instantiated with
// worker_threads_get_work<thread_owns_queue<stealing_mode_t::random>> as the work-distribution mode
// and size_mode_t::fixed_size as the size mode. The thread container is a
// private_::fixed_pool_of_threads of thread_types::steal<...> threads, parameterised on
// PTT::os_traits and the distribution mode's queue_model.
444 :
public private_::thread_pool_queue_model<
445 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>,
446 pool_traits::size_mode_t::fixed_size,
448 private_::fixed_pool_of_threads<
450 pool::private_::thread_types::steal<
452 typename PTT::os_traits,
454 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::queue_model
/// Shorthand for the base class. It repeats the template arguments of the base-class specifier, so the
/// two lists must be kept in step when either is edited.
459 using base_t=private_::thread_pool_queue_model<
460 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>,
461 pool_traits::size_mode_t::fixed_size,
463 private_::fixed_pool_of_threads<
465 pool::private_::thread_types::steal<
467 typename PTT::os_traits,
469 pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::queue_model
// Re-export the nested types of base_t under the same names. base_t is a dependent type, so each
// alias needs the typename keyword, and the names are not found by unqualified lookup without them.
473 using pool_traits_type=
typename base_t::pool_traits_type;
474 using os_traits=
typename base_t::os_traits;
475 using thread_traits=
typename base_t::thread_traits;
476 using api_params_type=
typename base_t::api_params_type;
477 using pool_type=
typename base_t::pool_type;
478 using statistics_type=
typename base_t::statistics_type;
479 using work_distribution_mode=
typename base_t::work_distribution_mode;
480 using signalled_work_queue_type=
typename base_t::signalled_work_queue_type;
// Compile-time check: the inner is_same<model_type, sequential_mode>::type must be std::false_type,
// i.e. PTT::os_traits::thread_traits::model_type must not be sequential_mode.
482 BOOST_MPL_ASSERT((
std::is_same<
typename std::is_same<
typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type,
std::false_type>));
// A pool with zero threads is rejected. The assert traps it when assertions are compiled in;
// when they are compiled out (NDEBUG) the explicit check below throws instead.
496 assert(
this->max_num_threads_in_pool>0);
497 if (!
this->max_num_threads_in_pool) {
498 throw typename base_t::exception_type(
499 _T(
"Cannot have an empty thread pool."),
// The offending argument is reported as its declaration text plus the stringified value.
// NOTE(review): the check tests max_num_threads_in_pool but the value reported is num_threads;
// presumably the former is initialised from the latter - confirm.
504 info::function::argument(
505 _T(
"const typename pool_traits_type::pool_type::size_type max_num_threads"),
506 tostring(num_threads)
526 template<
typename ExecT>
// Helper that computes the vertical/horizontal work figures from the threads held in this->pool.
536 using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
// Build a local statistics object from the statistics_ member, then add the per-pool vertical and
// horizontal work figures to that local object.
538 statistics_type stats(statistics_);
539 stats.add_vertical_work(acc_t::vertical_work(
this->pool));
540 stats.add_hrz_work(acc_t::hrz_work(
this->pool));
// Set the exit-requested object to the exit_requested state of the states enumeration that
// pool_traits_type defines for this work-distribution mode's queue_model.
545 this->exit_requested().set(pool_traits_type::
template exit_requested_type<
typename work_distribution_mode::queue_model>::states::exit_requested);
// Clear the pool's own main_queue.
550 this->main_queue.clear();
/// Queue held by the pool itself; add_work_to_batch() pushes onto it and process_a_batch_item() delegates to it.
555 signalled_work_queue_type main_queue;
/// Pool-level statistics; updated by the add_*_work() members and used as the starting point when aggregating.
556 statistics_type statistics_;
/// Accessor returning a mutable statistics_type reference.
/// Overrides a base-class virtual function and is declared non-throwing.
558 statistics_type &
__fastcall set_statistics()
noexcept(
true)
override FORCE_INLINE {
/// Pushes the work item onto the front of main_queue.
/// Overrides a base-class virtual function and is declared non-throwing.
/// The thread-id parameter is unnamed, so it is not used.
/// \param wk	The work item. value_type is not a deduced type here, so std::forward<value_type>(wk)
///		is an unconditional cast to an rvalue (equivalent to std::move).
562 bool __fastcall add_work_to_batch(
const typename thread_traits::api_params_type::tid_type,
typename signalled_work_queue_type::value_type &&wk)
noexcept(
true)
override FORCE_INLINE {
563 main_queue.push_front(std::forward<
typename signalled_work_queue_type::value_type>(wk));
/// Pushes the work item onto the front of the object returned by this->pool.first_thread() and
/// records the addition in statistics_.
/// Overrides a base-class virtual function; declared noexcept(false), so it may throw.
/// \param wk	The work item; it is cast to an rvalue and given to push_front().
569 void __fastcall add_nonjoinable_work(
typename signalled_work_queue_type::value_type &&wk)
noexcept(
false)
override FORCE_INLINE {
570 this->pool.first_thread().push_front(std::forward<
typename signalled_work_queue_type::value_type>(wk));
571 statistics_.added_work();
/// Pushes the work item onto the front of the object returned by this->pool.first_thread(),
/// records the addition in statistics_ and returns the work item to the caller.
/// Overrides a base-class virtual function; declared noexcept(false), so it may throw.
/// \param wk	The work item; it is cast to an rvalue and given to push_front().
/// \return	A value_type move-constructed from wk after the enqueue.
577 typename signalled_work_queue_type::value_type
__fastcall add_joinable_work(
typename signalled_work_queue_type::value_type &&wk)
noexcept(
false)
override FORCE_INLINE {
578 this->pool.first_thread().push_front(std::forward<
typename signalled_work_queue_type::value_type>(wk));
579 statistics_.added_work();
// NOTE(review): wk was already passed to push_front() as an rvalue above. If push_front() moves from
// its argument, the value returned here is in a moved-from state - confirm that push_front() copies.
580 return std::move(wk);
/// Delegates to main_queue.process_a_batch_item() with this->pool and the thread id.
/// Declared noexcept(false), so it may throw; not marked override and takes no exception argument.
/// \param tid	Thread id, passed through unchanged.
/// \return	The result of main_queue.process_a_batch_item().
583 bool __fastcall process_a_batch_item(
const typename thread_traits::api_params_type::tid_type tid)
noexcept(
false)
FORCE_INLINE {
584 return main_queue.process_a_batch_item(
this->pool, tid);