1 #ifndef LIBJMMCG_CORE_PRIVATE_THREAD_POOL_HPP
2 #define LIBJMMCG_CORE_PRIVATE_THREAD_POOL_HPP
23 #include "../../core/thread_pool_aspects.hpp"
// Traits type for the pool; aliases P (presumably a template parameter of the enclosing class - confirm).
86 using pool_traits_type=P;
// Work-distribution policy; aliases DM (presumably a template parameter of the enclosing class - confirm).
89 using work_distribution_mode=DM;
// Work-queue type: the traits' nested signalled_work_queue_type template,
// instantiated with the distribution mode's queue_model.
90 using signalled_work_queue_type=
typename pool_traits_type::
template signalled_work_queue_type<
typename work_distribution_mode::queue_model>;
// Size type exposed by the work queue.
91 using queue_size_type=
typename signalled_work_queue_type::size_type;
// Statistics type: the traits' nested statistics_type template,
// instantiated with the same queue_model as the work queue.
94 using statistics_type=
typename pool_traits_type::
template statistics_type<
typename work_distribution_mode::queue_model>;
// Default for FnType: the type of the expression taking the address of the
// `process` member of InpWk, after any reference qualifier on InpWk is removed.
131 class FnType=
decltype(&
std::remove_reference<InpWk>::type::process)
// Template header parameterised on the input-work type InpWk.
// NOTE(review): the declaration this header introduces is not visible in this excerpt.
158 template<
class InpWk>
// Notify the pool's statistics object that work has been added...
171 pool.statistics_.added_work();
// ...and then call its processed_vertical_work() hook.
174 pool.statistics_.processed_vertical_work();
// Constructor initialiser list: initialises member pred from pr and member colln from c.
// NOTE(review): the owning class name is not visible here (presumably the count_if functor - confirm).
276 : pred(pr), colln(c) {
// Binder expression: std::equal_to over the collection's value_type, with a
// value-initialised value_type bound as the first argument and the second
// argument left open as placeholder _1.
// NOTE(review): the context using this expression is not visible here - confirm.
302 std::bind(
std::equal_to<
typename Colln::value_type>(),
typename Colln::value_type(),
std::placeholders::_1)
// Second occurrence of the same binder expression (same arguments as above).
309 std::bind(
std::equal_to<
typename Colln::value_type>(),
typename Colln::value_type(),
std::placeholders::_1)
// Constructor initialiser list: passes the collection c to base_t together with
// an operation_type constructed from std::equal_to, the sought value v and
// placeholder _1.
317 : base_t(c,
typename base_t::operation_type(
std::equal_to<
typename Colln::value_type>(), v,
std::placeholders::_1)) {
// Constructor initialiser list: initialises member pred from pr and member colln from c.
// NOTE(review): the owning class name is not visible here (presumably the find_if functor - confirm).
337 : pred(pr), colln(c) {
// Binder expression: std::equal_to over the collection's value_type, with a
// value-initialised value_type bound as the first argument and the second
// argument left open as placeholder _1.
// NOTE(review): the context using this expression is not visible here - confirm.
363 std::bind(
std::equal_to<
typename Colln::value_type>(),
typename Colln::value_type(),
std::placeholders::_1)
// Second occurrence of the same binder expression (same arguments as above).
369 std::bind(
std::equal_to<
typename Colln::value_type>(),
typename Colln::value_type(),
std::placeholders::_1)
// Constructor initialiser list: passes the collection c to base_t together with
// an operation_type constructed from std::equal_to, the sought value v and
// placeholder _1.
375 :
base_t(c,
typename base_t::operation_type(
std::equal_to<
typename Colln::value_type>(), v,
std::placeholders::_1)) {
// Constructor initialiser list: initialises init_val from v, binop from op and colln from c.
// NOTE(review): the owning class name is not visible here (presumably the accumulate functor - confirm).
396 : init_val(v), binop(op), colln(c) {
// Comparator template parameter; defaults to std::less over the collection's value_type.
429 class Comp=
std::less<
typename Colln::value_type>
// Constructor initialiser list: initialises colln from c and compare from comp.
438 : colln(c), compare(comp) {
// Initialiser list taking colln and compare from the corresponding members of another instance m.
441 : colln(m.colln), compare(m.compare) {
// Iterator to the greatest element of the underlying container as ordered by
// `compare`; std::max_element returns the end iterator for an empty range.
452 const typename Colln::container_type::const_iterator i(
std::max_element(colln.colln().begin(), colln.colln().end(), compare));
// Comparator template parameter; defaults to std::less over the collection's value_type.
463 class Comp=
std::less<
typename Colln::value_type>
// Constructor initialiser list: initialises colln from c and compare from comp.
472 : colln(c), compare(comp) {
// Initialiser list taking colln and compare from the corresponding members of another instance m.
475 : colln(m.colln), compare(m.compare) {
// Iterator to the smallest element of the underlying container as ordered by
// `compare`; std::min_element returns the end iterator for an empty range.
486 const typename Colln::container_type::const_iterator i(
std::min_element(colln.colln().begin(), colln.colln().end(), compare));
// Constructor initialiser list: initialises pool_size_ from sz and value-initialises statistics_.
528 : pool_size_(sz), statistics_() {
// Leading specifiers of a static constexpr member function returning unsigned long
// with the __fastcall calling-convention macro; its name and body are not visible here.
562 static constexpr unsigned long __fastcall
// A second member function declared with the same specifiers; name and body likewise not visible.
573 static constexpr unsigned long __fastcall
// Report the size of the underlying container to the pool statistics.
597 statistics_.update_colln_stats(c.colln().size());
// Apply fn to every element of the underlying container with std::for_each.
598 std::for_each(c.colln().begin(), c.colln().end(), fn);
// Return a reduction_t built from a default-constructed operation_type.
599 return reduction_t(
typename reduction_t::operation_type());
// Result type: parallel_algorithm wrapping the count_if_t functor for this collection and predicate.
615 typedef parallel_algorithm<
count_if_t<Colln, Pred> > reduction_t;
// Report the size of the underlying container to the pool statistics.
616 statistics_.update_colln_stats(c.colln().size());
// Return a reduction_t whose operation is constructed from the collection c and predicate p.
617 return reduction_t(
typename reduction_t::operation_type(c, p));
// count: builds the operation for counting occurrences of v in c.
//   c - the collection to examine.
//   v - the value to compare elements against.
// Returns a reduction_t whose operation is constructed from (c, v).
// Const member function; it still updates the statistics_ member.
632 count(Colln &c,
typename Colln::value_type
const &v)
const {
// Result type: parallel_algorithm wrapping the count_t functor for this collection.
633 typedef parallel_algorithm<
count_t<Colln> > reduction_t;
// Report the size of the underlying container to the pool statistics.
634 statistics_.update_colln_stats(c.colln().size());
635 return reduction_t(
typename reduction_t::operation_type(c, v));
// find_if: builds the operation for searching c with predicate p.
//   c - the collection to examine (not modified through this reference).
//   p - the predicate handed to the find_if_t functor.
// Returns a reduction_t whose operation is constructed from (c, p).
651 find_if(Colln
const &c, Pred
const &p)
const {
// Result type: parallel_algorithm wrapping the find_if_t functor for this collection and predicate.
652 typedef parallel_algorithm<
find_if_t<Colln, Pred> > reduction_t;
// Report the size of the underlying container to the pool statistics.
653 statistics_.update_colln_stats(c.colln().size());
654 return reduction_t(
typename reduction_t::operation_type(c, p));
// find: builds the operation for searching c for the value v.
//   c - the collection to examine (not modified through this reference).
//   v - the value handed to the find_t functor.
// Returns a reduction_t whose operation is constructed from (c, v).
669 find(Colln
const &c,
typename Colln::value_type
const &v)
const {
// Result type: parallel_algorithm wrapping the find_t functor for this collection.
670 typedef parallel_algorithm<
find_t<Colln> > reduction_t;
// Report the size of the underlying container to the pool statistics.
671 statistics_.update_colln_stats(c.colln().size());
672 return reduction_t(
typename reduction_t::operation_type(c, v));
// transform (unary form): takes an input collection, an output collection and a unary operation.
// NOTE(review): the statements that apply `op` are not visible in this excerpt.
690 transform(CollnIn
const &in, CollnOut &out, UniOp
const &op)
const {
// Report the size of the input's underlying container to the pool statistics.
693 statistics_.update_colln_stats(in.colln().size());
// Resize the output to the input's size (by its name, without initialising elements or locking - confirm).
694 out.resize_noinit_nolk(in.size());
// Return a reduction_t built from a default-constructed operation_type.
701 return reduction_t(
typename reduction_t::operation_type());
// transform (binary form): takes two input collections, an output collection and a binary operation.
// NOTE(review): the statements that apply `op` are not visible in this excerpt.
721 transform(CollnIn1
const &in1, CollnIn2
const &in2, CollnOut &out, BinOp
const &op)
const {
// Report the size of the first input's underlying container to the pool statistics.
724 statistics_.update_colln_stats(in1.colln().size());
// Resize the output to the first input's size.
// NOTE(review): in2's size is not consulted in the visible lines - confirm it must be at least in1's.
725 out.resize_noinit_nolk(in1.size());
// Return a reduction_t built from a default-constructed operation_type.
733 return reduction_t(
typename reduction_t::operation_type());
// copy: takes an input collection and an output collection.
// NOTE(review): the statements that move the elements across are not visible in this excerpt.
748 copy(CollnIn
const &in, CollnOut &out)
const {
// Report the size of the input's underlying container to the pool statistics.
751 statistics_.update_colln_stats(in.colln().size());
// Resize the output to the input's size (by its name, without initialising elements or locking - confirm).
752 out.resize_noinit_nolk(in.size());
// Return a reduction_t built from a default-constructed operation_type.
758 return reduction_t(
typename reduction_t::operation_type());
// accumulate: builds the operation that folds c with binop, starting from v.
//   c     - the collection to fold (not modified through this reference).
//   v     - the initial value, of type BinOp::result_type.
//   binop - the binary operation handed to the operation_type.
// Returns a reduction_t whose operation is constructed from (c, v, binop).
775 accumulate(Colln
const &c,
typename BinOp::result_type
const &v, BinOp
const &binop)
const {
// Report the size of the underlying container to the pool statistics.
777 statistics_.update_colln_stats(c.colln().size());
778 return reduction_t(
typename reduction_t::operation_type(c, v, binop));
// NOTE(review): the signature owning these statements is not visible here
// (presumably an accumulate overload without an explicit binary operation - confirm).
// Report the size of the underlying container to the pool statistics.
796 statistics_.update_colln_stats(c.colln().size());
// Return a reduction_t whose operation is constructed from the collection c and the value v.
797 return reduction_t(
typename reduction_t::operation_type(c, v));
// fill_n: resizes c to sz elements and sets every element to v.
//   c  - the collection to overwrite.
//   sz - the new number of elements.
//   v  - the value assigned to each element.
811 fill_n(Colln &c,
typename Colln::size_type sz,
typename Colln::value_type
const &v)
const {
// Resize first (by its name, without initialising elements or locking - confirm)...
814 c.resize_noinit_nolk(sz);
// ...so the statistics see the post-resize size of the underlying container.
815 statistics_.update_colln_stats(c.colln().size());
// Assign v to every element of the underlying container with std::fill.
816 std::fill(c.colln().begin(), c.colln().end(), v);
// Return a reduction_t built from a default-constructed operation_type.
817 return reduction_t(
typename reduction_t::operation_type());
// fill: sets every existing element of c to v; the collection's size is not changed here.
//   c - the collection to overwrite.
//   v - the value assigned to each element.
830 fill(Colln &c,
typename Colln::value_type
const &v)
const {
// Report the size of the underlying container to the pool statistics.
833 statistics_.update_colln_stats(c.colln().size());
// Assign v to every element of the underlying container with std::fill.
834 std::fill(c.colln().begin(), c.colln().end(), v);
// Return a reduction_t built from a default-constructed operation_type.
835 return reduction_t(
typename reduction_t::operation_type());
// Report the size of the underlying container to the pool statistics.
850 statistics_.update_colln_stats(c.colln().size());
// Reverse the order of the elements of the underlying container in place with std::reverse.
851 std::reverse(c.colln().begin(), c.colln().end());
// Return a reduction_t built from a default-constructed operation_type.
852 return reduction_t(
typename reduction_t::operation_type());
// Result type: parallel_algorithm wrapping the max_element_t functor for this collection and comparator.
869 typedef parallel_algorithm<
max_element_t<Colln, Comp>> reduction_t;
// Report the size of the underlying container to the pool statistics.
870 statistics_.update_colln_stats(c.colln().size());
// Return a reduction_t whose operation is constructed from the collection c and comparator comp.
871 return reduction_t(
typename reduction_t::operation_type(c, comp));
// Convenience form: delegates to the comparator-taking max_element with std::less over the value_type.
886 return max_element(c,
std::less<
typename Colln::value_type>());
// Result type: parallel_algorithm wrapping the min_element_t functor for this collection and comparator.
903 typedef parallel_algorithm<
min_element_t<Colln, Comp>> reduction_t;
// Report the size of the underlying container to the pool statistics.
904 statistics_.update_colln_stats(c.colln().size());
// Return a reduction_t whose operation is constructed from the collection c and comparator comp.
905 return reduction_t(
typename reduction_t::operation_type(c, comp));
// Convenience form: delegates to the comparator-taking min_element with std::less over the value_type.
920 return min_element(c,
std::less<
typename Colln::value_type>());
// sort: takes the collection to order and the comparator to order it by.
// NOTE(review): the statements that perform the sort are not visible in this excerpt.
936 sort(Colln &c, Compare
const &comp)
const {
// Report the size of the underlying container to the pool statistics.
939 statistics_.update_colln_stats(c.colln().size());
// Return a reduction_t built from a default-constructed operation_type.
945 return reduction_t(
typename reduction_t::operation_type());
// Convenience form: delegates to the comparator-taking sort with std::less over the value_type.
960 return sort(c,
std::less<
typename Colln::value_type>());
// merge: takes two input collections, an output collection and a comparator.
// NOTE(review): the call that performs the merge is only partly visible in this excerpt.
980 merge(CollnIn1
const &in1, CollnIn2
const &in2, CollnOut &out, Compare
const &comp)
const {
// Report the combined size of both inputs to the pool statistics.
983 statistics_.update_colln_stats(in1.colln().size()+in2.colln().size());
// Reserve room for the combined element count up front in the output's underlying container.
984 out.colln().reserve(in1.colln().size()+in2.colln().size());
// Results are appended to the output's underlying container through a back_inserter.
990 std::back_inserter(out.colln()),
// Debug checks: the output holds exactly as many elements as the two inputs combined,
// both for the underlying containers and for the wrappers' own size().
994 assert((in1.colln().size()+in2.colln().size())==out.colln().size());
995 assert((in1.size()+in2.size())==out.size());
// Return a reduction_t built from a default-constructed operation_type.
996 return reduction_t(
typename reduction_t::operation_type());
// merge (default ordering): delegates to the comparator-taking merge, using
// std::less over the first input collection's value_type.
1014 merge(CollnIn1
const &in1, CollnIn2
const &in2, CollnOut &out)
const {
1015 return merge(in1, in2, out,
std::less<
typename CollnIn1::value_type>());
// Return an execution_context_stack over work_type, constructed from this pool,
// a default-constructed params object from the traits' thread_wk_elem_type
// configuration, and a work_type built from the forwarded argument a, the
// operation op and this pool.
// NOTE(review): the owning function's signature is not visible here (presumably a unary-function helper - confirm).
1027 return execution_context_stack<work_type>(*
this,
typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params(), work_type(
std::forward<ArgT>(a), op, *
this));
// binary_fun: wraps a binary operation and its two operands as work for this pool.
//   lhs, rhs - the operands, perfectly forwarded into the work item.
//   op       - the binary operation; defaults to a default-constructed BinFn.
// Returns an execution_context_stack over work_type, constructed from this pool,
// a default-constructed params object and the work item.
1037 binary_fun(LHSArg &&lhs, RHSArg &&rhs, BinFn
const &op=BinFn()) {
1040 return execution_context_stack<work_type>(*
this,
typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params(), work_type(
std::forward<LHSArg>(lhs),
std::forward<RHSArg>(rhs), op, *
this));
// Delegates to binary_fun with both operands of type T and std::logical_and<bool>
// as the operation (default-constructed, since no op argument is passed).
// NOTE(review): the owning function's signature is not visible here.
1049 return this->binary_fun<T, T,
std::logical_and<
bool>>(
std::forward<T>(lhs),
std::forward<T>(rhs));
// Same delegation, with std::logical_or<bool> as the operation.
1058 return this->binary_fun<T, T,
std::logical_or<
bool>>(
std::forward<T>(lhs),
std::forward<T>(rhs));
// Befriend every specialisation of the priority_t class template (parameterised on a
// joinability template, a thread-pool base type TPB and a priority value of
// TPB::priority_type), giving it access to this class's non-public members.
1120 template<
template<
class>
class Joinability,
class TPB,
typename TPB::priority_type Pri>
friend class priority_t;
// make_arg: builds the argument object for the execution-context type ExecCtx.
//   async_wk - the queued work item, taken by rvalue reference.
// Returns ExecCtx::chk_argument_type, obtained by calling ExecCtx's static
// make_arg template instantiated with ExecCtx::result_type.
// NOTE(review): the remaining arguments of that call are not visible in this excerpt.
1125 template<
class ExecCtx>
1126 typename ExecCtx::chk_argument_type
__fastcall FORCE_INLINE
1127 make_arg(
typename signalled_work_queue_type::value_type &&async_wk) {
1128 return ExecCtx::
template make_arg<
typename ExecCtx::result_type>(
// Pass the work item on as an rvalue.
1129 std::forward<
typename signalled_work_queue_type::value_type>(async_wk),
// Pool statistics; declared mutable so that const member functions can update it.
1135 mutable statistics_type statistics_;
// add_nonjoinable_work: accepts a work item and processes it directly in this call.
//   wk - the work item, taken by rvalue reference.
1138 void __fastcall add_nonjoinable_work(
typename signalled_work_queue_type::value_type &&wk)
FORCE_INLINE {
// Notify the statistics that work has been added.
1139 statistics_.added_work();
// Process the work item here, passing the sequential edge annotation from cfg_type.
1140 wk->process_nonjoinable(
cfg_type::sequential_edge_annotation);
// Notify the statistics via processed_vertical_work() once processing has returned.
1141 statistics_.processed_vertical_work();
// add_joinable_work: accepts a work item, processes it directly in this call and
// returns a value of the queue's value_type.
//   wk - the work item, taken by rvalue reference.
// NOTE(review): the return statement is not visible in this excerpt.
1143 typename signalled_work_queue_type::value_type
__fastcall add_joinable_work(
typename signalled_work_queue_type::value_type &&wk)
FORCE_INLINE {
// Notify the statistics that work has been added.
1144 statistics_.added_work();
// NOTE(review): the joinable path calls process_nonjoinable(), the same call
// as add_nonjoinable_work - confirm this is intentional.
1145 wk->process_nonjoinable(
cfg_type::sequential_edge_annotation);
// Notify the statistics via processed_vertical_work() once processing has returned.
1146 statistics_.processed_vertical_work();