1 #ifndef LIBJMMCG_CORE_PRIVATE_THREAD_CLIENT_CONTEXT_HPP
2 #define LIBJMMCG_CORE_PRIVATE_THREAD_CLIENT_CONTEXT_HPP
21 #include "../../core/dynamic_cast.hpp"
22 #include "../../core/non_allocatable.hpp"
23 #include "../../core/thread_api_traits.hpp"
25 #include "../../core/thread_wrapper.hpp"
26 #include "../../core/exception.hpp"
42 template<
class ParAlg>
58 typename TT::api_params_type::priority_type new_priority
73 : thread(thr), orig_pri(
thread_traits::get_kernel_priority(thread)) {
82 const typename thread_traits::api_params_type::handle_type thread;
107 static constexpr unsigned long GSSk=GSSkSz;
136 template<
class UpdStats>
141 work.process_the_work(
std::forward<UpdStats>(update_stats), e_details);
155 #pragma GCC diagnostic push
157 #pragma GCC diagnostic ignored "-Wmissing-braces"
168 #pragma GCC diagnostic pop
176 signalled_work_queue.have_work.add();
178 if (!signalled_work_queue.colln().empty()) {
229 static constexpr unsigned long GSSk=1UL;
247 return !current_work;
254 current_work=signalled_work_queue.pop_front_1_nochk_nosig();
257 template<
class UpdStats>
261 work.process_the_work(
std::forward<UpdStats>(update_stats), e_details);
266 : statistics_(), current_work() {}
358 return this->work_complete().try_lock()==
os_traits::lock_traits::atom_set;
365 this->work_complete().lock();
369 template<
class TPB,
template<
class>
class Del,
template<
class>
class AtCtr>
394 const typename os_traits::lock_traits::critical_section_type::write_lock_type lock(erase_lock, os_traits::lock_traits::infinite_timeout());
395 if (waiting.try_lock()==os_traits::lock_traits::atom_unset) {
431 const typename os_traits::lock_traits::critical_section_type::read_lock_type e_lock(erase_lock, os_traits::lock_traits::infinite_timeout());
432 work_in_queue=(erased.try_lock()==os_traits::lock_traits::atom_unset);
437 return work_in_queue;
441 mutable typename os_traits::lock_traits::critical_section_type erase_lock;
443 mutable typename os_traits::lock_traits::anon_event_type erased;
459 return static_cast<
type>(core_work_.closure().get_results());
463 execute(CW
const &core_work_)
noexcept(
true) {
464 return static_cast<
type>(core_work_.closure().get_results());
490 template<
class ExCxt>
499 return e.get_results();
502 return e.get_results();
505 return &e.get_results();
508 return &e.get_results();
513 template<
class ExCxt>
522 return e.get_results();
525 return e.get_results();
528 return &e.get_results();
531 return &e.get_results();
536 template<
class ExCxt>
545 return e.get_results().get();
548 return e.get_results().get();
551 return e.get_results().get();
554 return e.get_results().get();
563 template<
class CoreWk>
572 template<
class CoreWk>
577 core_work.closure().get_results()=
typename thread_wk_t::closure_t::result_type();
582 template<
class CoreWk>
587 core_work.closure().get_results()=
false;
592 template<
class CoreWk>
597 core_work.closure().get_results()=
typename thread_wk_t::closure_t::result_type(core_work.closure().input().init);
634 typename os_traits::lock_traits::atomic_state_type wk_complete;
635 assert(
dynamic_cast<thread_pool_type *>(&pool));
637 while ((wk_complete=
this->work_complete().try_lock())!=os_traits::lock_traits::atom_set && pool.process_a_batch_item(os_traits::thread_traits::get_current_thread(),
this->core_work().exception_thrown_in_thread()));
638 if (wk_complete!=os_traits::lock_traits::atom_set) {
641 const typename execute_any_work_horizontally::scoped sc(process,
this->work_complete(), wk_complete);
642 if (wk_complete!=os_traits::lock_traits::atom_set) {
644 this->work_complete().lock();
647 this->work_complete().lock();
654 class execute_any_work_horizontally
final :
public wrapper<os_traits::thread_traits::api_params_type::api_type,
typename os_traits::thread_traits::model_type> {
656 typedef wrapper<os_traits::thread_traits::api_params_type::api_type,
typename os_traits::thread_traits::model_type> base_t;
657 typedef typename base_t::lock_traits lock_traits;
658 typedef typename base_t::thread_context_t thread_context_t;
659 using exit_requested_type=
typename pool_traits_type::
template exit_requested_type<
typename thread_pool_type::work_distribution_mode::queue_model>;
660 typedef typename base_t::exception_type exception_type;
666 scoped(execute_any_work_horizontally &e,
typename thread_wk_t::work_complete_t &work_complete,
typename os_traits::lock_traits::atomic_state_type &wk_complete)
noexcept(
false)
FORCE_INLINE
669 os_traits::thread_traits::sleep(0);
670 if ((wk_complete=work_complete.try_lock())!=os_traits::lock_traits::atom_set) {
671 thr.create_running();
679 execute_any_work_horizontally &thr;
686 base_t::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
687 && thread_context_t::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
688 && exit_requested_type::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
689 ? ppd::generic_traits::memory_access_modes::crew_memory_access
690 : ppd::generic_traits::memory_access_modes::erew_memory_access
693 execute_any_work_horizontally(
thread_pool_type &p,
typename os_traits::thread_exception
const &ex_thr,
const typename os_traits::thread_traits::api_params_type::handle_type ancestor_thr_id)
noexcept(
true)
FORCE_INLINE
694 : base_t(), hrz_work(), pool(p), exception_thrown_in_thread(ex_thr), ancestor_thread_id(ancestor_thr_id) {
695 assert(
dynamic_cast<thread_pool_type *>(&pool));
698 ~execute_any_work_horizontally()
noexcept(
false)
FORCE_INLINE {
702 void operator=(execute_any_work_horizontally
const &)=
delete;
703 void operator=(execute_any_work_horizontally &&)=
delete;
705 void create_running()
noexcept(
false) override
FORCE_INLINE {
706 base_t::create_running();
709 this->kernel_affinity(os_traits::thread_traits::get_kernel_affinity(ancestor_thread_id));
711 orig_pri=
this->kernel_priority();
712 this->kernel_priority(os_traits::thread_traits::api_params_type::idle);
713 }
catch (exception_type
const &ex) {
718 void __fastcall request_exit()
const noexcept(
true) override
FORCE_INLINE {
720 base_t::request_exit();
721 }
catch (exception_type
const &ex) {
723 const typename lock_traits::critical_section_type::write_lock_type lock(
this->thread_params_lock, lock_traits::infinite_timeout());
724 this->thread_params.id=0;
725 this->thread_params.state=os_traits::thread_traits::api_params_type::failed_to_cancel;
735 using hrz_work_type=batch_details<1,
typename thread_pool_type::pool_traits_type::
template signalled_work_queue_type<
typename thread_pool_type::work_distribution_mode::queue_model>,
typename thread_pool_type::statistics_type>;
736 using setter_type=setter<
typename os_traits::thread_traits, os_traits::thread_traits::api_params_type::idle>;
738 hrz_work_type hrz_work;
740 typename os_traits::thread_exception
const &exception_thrown_in_thread;
741 const typename os_traits::thread_traits::api_params_type::handle_type ancestor_thread_id;
742 typename thread_pool_type::priority_type orig_pri;
744 bool __fastcall pre_exit()
noexcept(
false) override {
745 if (!base_t::pre_exit()) {
747 const typename os_traits::thread_traits::cancellability set;
749 os_traits::thread_traits::sleep(0);
750 assert(
dynamic_cast<thread_pool_type *>(&pool));
751 const typename exit_requested_type::lock_result_type lkd=pool.exit_requested().lock();
753 if (lkd.first==exit_requested_type::states::new_work_arrived) {
754 assert(
dynamic_cast<thread_pool_type *>(&pool));
756 }
else if (lkd.first==exit_requested_type::states::exit_requested) {
758 assert(
dynamic_cast<thread_pool_type *>(&pool));
759 pool.exit_requested().set(exit_requested_type::states::exit_requested);
765 void __fastcall wait_thread_exit()
noexcept(
false)
FORCE_INLINE {
767 this->exit_requested=
true;
768 base_t::wait_thread_exit();
769 assert(
dynamic_cast<thread_pool_type *>(&pool));
771 pool.set_statistics().add_hrz_work(hrz_work.statistics().total_hrz_work());
772 pool.set_statistics().add_vertical_work(hrz_work.statistics().total_vertical_work());
775 bool __fastcall worker_fn(thread_context_t &)
noexcept(
false) override
FORCE_INLINE {
776 assert(
dynamic_cast<thread_pool_type *>(&pool));
777 hrz_work.refill_batch(pool.queue());
778 const setter_type setter(
this->params().id);
780 hrz_work.process_a_batch_item();
797 mutable execute_any_work_horizontally process;
813 template<
class TPB,
class Wk>
877 assert(
dynamic_cast<thread_wk_t *>(&core_work_));
992 template<
class TPB,
class Wk>
993 class execution_context_stack_type<
pool_traits::
work_distribution_mode_t::
worker_threads_get_work<
typename TPB::
work_distribution_mode::
queue_model>,
generic_traits::return_data::joinable, TPB, Wk> :
public execution_context_stack_type<
pool_traits::
work_distribution_mode_t::
one_thread_distributes<>,
generic_traits::
return_data::
joinable,
TPB,
Wk>,
public horizontal_execution<
generic_traits::
return_data::
joinable,
TPB,
noop_dtor,
TPB::
os_traits::
lock_traits::
template noop_atomic_ctr> {
1046 template<
class DM,
class TPB,
class Wk>
1059 template<
class DM,
generic_traits::
return_data RD,
class TPB,
template<
class,
class,
template<
class>
class,
template<
class>
class>
class CoreWk,
class AlgoWrapT,
class Wk>
1069 template<
class TPB,
template<
class,
class,
template<
class>
class,
template<
class>
class>
class CoreWk,
class AlgoWrapT,
class Wk>
1162 assert(
dynamic_cast<thread_wk_t *>(&core_work_));
1268 template<
class TPB,
template<
class,
class,
template<
class>
class,
template<
class>
class>
class CoreWk,
class AlgoWrapT,
class Wk>
1269 class execution_context_algo_stack_type<
pool_traits::
work_distribution_mode_t::
worker_threads_get_work<
typename TPB::
work_distribution_mode::
queue_model>,
generic_traits::
return_data::
joinable,
TPB,
CoreWk,
AlgoWrapT,
Wk>
final :
public execution_context_algo_stack_type<
pool_traits::
work_distribution_mode_t::
one_thread_distributes<>,
generic_traits::
return_data::
joinable,
TPB,
CoreWk,
AlgoWrapT,
Wk>,
public horizontal_execution<
generic_traits::
return_data::
joinable,
TPB,
noop_dtor,
TPB::
os_traits::
lock_traits::
template noop_atomic_ctr> {
1320 template<
class DM,
class TPB,
template<
class,
class,
template<
class>
class,
template<
class>
class>
class CoreWk,
class AlgoWrapT,
class Wk>
1333 template<
class DM,
generic_traits::
return_data RD,
template<
class,
class,
class,
template<
class>
class,
template<
class>
class>
class AlgCoreWk,
class GenWk,
class Wk,
template<
class>
class Deref=
deref::
noop,
template<
class>
class InitCoreWk=
core_work_result::
noop>
1334 class execution_context_algo_buff_stack_type;
1345 template<
template<
class,
class,
class,
template<
class>
class,
template<
class>
class>
class AlgCoreWk,
class GenWk,
class Wk,
template<
class>
class Deref,
template<
class>
class InitCoreWk>
1448 assert(
dynamic_cast<thread_wk_t *>(&core_work_));
1554 template<
template<
class,
class,
class,
template<
class>
class,
template<
class>
class>
class AlgCoreWk,
class GenWk,
class Wk,
template<
class>
class Deref,
template<
class>
class InitCoreWk>
1555 class execution_context_algo_buff_stack_type<
pool_traits::
work_distribution_mode_t::
worker_threads_get_work<
typename GenWk::
thread_pool_type::
work_distribution_mode::
queue_model>,
generic_traits::
return_data::
joinable,
AlgCoreWk,
GenWk,
Wk,
Deref,
InitCoreWk>
final :
public execution_context_algo_buff_stack_type<
pool_traits::
work_distribution_mode_t::
one_thread_distributes<>,
generic_traits::
return_data::
joinable,
AlgCoreWk,
GenWk,
Wk,
Deref,
InitCoreWk>,
public horizontal_execution<
generic_traits::
return_data::
joinable,
typename GenWk::
thread_pool_type,
noop_dtor,
GenWk::
thread_pool_type::
os_traits::
lock_traits::
template noop_atomic_ctr> {
1607 template<
class DM,
template<
class,
class,
class,
template<
class>
class,
template<
class>
class>
class AlgCoreWk,
class GenWk,
class Wk,
template<
class>
class Deref,
template<
class>
class InitCoreWk>