libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
thread_client_context.hpp
1 #ifndef LIBJMMCG_CORE_PRIVATE_THREAD_CLIENT_CONTEXT_HPP
2 #define LIBJMMCG_CORE_PRIVATE_THREAD_CLIENT_CONTEXT_HPP
3 /******************************************************************************
4 ** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
5 **
6 ** This library is free software; you can redistribute it and/or
7 ** modify it under the terms of the GNU Lesser General Public
8 ** License as published by the Free Software Foundation; either
9 ** version 2.1 of the License, or (at your option) any later version.
10 **
11 ** This library is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 ** Lesser General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public
17 ** License along with this library; if not, write to the Free Software
18 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20 
21 #include "../../core/dynamic_cast.hpp"
22 #include "../../core/non_allocatable.hpp"
23 #include "../../core/thread_api_traits.hpp"
25 #include "../../core/thread_wrapper.hpp"
26 #include "../../core/exception.hpp"
28 
29 #include <array>
30 #include <functional>
31 #include <memory>
32 
33 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd { namespace private_ {
34 
35  inline constexpr char gen_wk_node_str[]="distribute";
36  inline constexpr char gen_wk_node_root_str[]="distribute_root";
37  inline constexpr char algo_reduction_str[]="algo_reduction";
38 
39  /**
40  Compute the static size of the largest object allocated in the subdivide_n_gen_wk* algorithms, including itself. This is used to compute the size of the memory buffer allocated in algo_thread_wk_buffered into which the objects will be placement new'd.
41  */
42  template<class ParAlg>
44  typedef ParAlg gen_wk_t;
49  typedef typename std::conditional<(sizeof(subdiv_algo_work_t)>sizeof(alg_wrap_work_t)), subdiv_algo_work_t, alg_wrap_work_t>::type type;
50  };
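To make the idiom concrete, here is a minimal, standalone sketch of selecting the larger of two candidate work types at compile time and sizing a placement-new buffer from the result; the names work_a, work_b, largest and buffer_for are illustrative only, not library types:

    #include <cstddef>
    #include <new>
    #include <type_traits>

    struct work_a { int payload[4]; };
    struct work_b { double payload[8]; };

    // Select whichever candidate type is larger, mirroring the std::conditional idiom above.
    template<class A, class B>
    struct largest {
        typedef typename std::conditional<(sizeof(A) > sizeof(B)), A, B>::type type;
    };

    // A buffer large enough (and suitably aligned) to placement-new either candidate into.
    template<class A, class B>
    struct buffer_for {
        alignas(std::max_align_t) unsigned char store[sizeof(typename largest<A, B>::type)];
    };

    int main() {
        buffer_for<work_a, work_b> buf;
        work_b *wk = new (buf.store) work_b{};   // placement-new the larger type into the buffer
        wk->~work_b();                           // placement-new'd objects need an explicit dtor call
        return 0;
    }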
51 
52  /// The internal class that does the priority setting and restoration as RAII.
53  /**
54  \see set_priority_closure, set_priority_work
55  */
56  template<
57  class TT,
58  typename TT::api_params_type::priority_type new_priority ///< The priority at which the wrapped work should run.
59  >
60  class setter final {
61  public:
62  typedef TT thread_traits;
64 
65  void set() noexcept(true) FORCE_INLINE {
66  thread_traits::set_kernel_priority(thread, new_priority);
67  }
68  void reset() noexcept(true) FORCE_INLINE {
69  thread_traits::set_kernel_priority(thread, orig_pri);
70  }
71 
72  explicit __stdcall setter(typename thread_traits::api_params_type::handle_type thr) noexcept(true) FORCE_INLINE
73  : thread(thr), orig_pri(thread_traits::get_kernel_priority(thread)) {
74  set();
75  }
76  setter(setter const &)=delete;
77  __stdcall ~setter() noexcept(true) FORCE_INLINE {
78  reset();
79  }
80 
81  private:
82  const typename thread_traits::api_params_type::handle_type thread; ///< The identifier for the thread that should be manipulated.
83  const priority_type orig_pri; ///< The original priority of the thread, that should be restored.
84  };
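As a minimal, standalone analogue of the RAII save/apply/restore pattern that setter implements, with a plain integer standing in for the thread-traits kernel-priority API (the names fake_priority and priority_guard are hypothetical):

    #include <cassert>

    // Stand-in for the kernel priority of the current thread.
    static int fake_priority = 5;

    class priority_guard {
    public:
        // Save the original priority and apply the new one on construction, as setter::set() does.
        explicit priority_guard(int new_priority) noexcept
            : orig_pri(fake_priority) {
            fake_priority = new_priority;
        }
        priority_guard(priority_guard const &) = delete;
        // Restore the original priority on destruction, as setter::reset() does.
        ~priority_guard() noexcept {
            fake_priority = orig_pri;
        }

    private:
        const int orig_pri;
    };

    int main() {
        {
            priority_guard g(1);        // lower the "priority" for the scope of g
            assert(fake_priority == 1);
        }
        assert(fake_priority == 5);     // restored when g left scope
        return 0;
    }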
85 
86  /// A class to assist in processing the core_work in the GSS(k) batch, ensuring that items within a batch in a thread are fully processed before any wait in that thread is performed, because otherwise we could deadlock.
87  /**
88  Note that this class is used for the implicit batch formed by the main thread of the program that creates the thread_pool.
89 
90  \see execution_context_type, thread_pool, pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_thread, generic_traits::return_data::joinable
91  */
92  template<
93  unsigned long GSSkSz,
94  class WQ,
95  class Stats
96  >
97  class batch_details {
98  public:
99  /// This is a container of GSSk items from the front of the queue to implement the GSS(k) or bakers' scheduling algorithm.
101  /// The statistics to be gathered by the thread_pool, by default none.
102  typedef Stats statistics_type;
105  using eval_shared_del_t=eval_shared_deleter_t<typename signalled_work_queue_type::value_ret_type::value_type>;
106 
107  static constexpr unsigned long GSSk=GSSkSz;
108 
109  /**
110  To assist in allowing compile-time computation of the algorithmic order of the threading model.
111  */
112  static constexpr ppd::generic_traits::memory_access_modes memory_access_mode=eval_shared_del_t::memory_access_mode;
113 
114  /**
115  Make sure that the batch sizes are the same, so that when a batch is popped off the signalled_work_queue, all of the elements are copied, with none being lost, which would otherwise cause closure_base-derived closures not to be processed.
116 
117  \see pool_aspect::GSSk
118  */
119  BOOST_STATIC_ASSERT(signalled_work_queue_type::max_size<=GSSk);
120 
121  /**
122  \return true if the batch is empty, false otherwise.
123  */
124  bool __fastcall batch_empty() const noexcept(true) FORCE_INLINE {
125  return current_work_item==batched_work.end() || !current_work_item->get();
126  }
127 
128  protected:
129  void __fastcall reload_batch_if_empty_nochk_nolk(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
130  if (batch_empty()) {
131  batched_work=signalled_work_queue.pop_front_nolk();
132  // We always have at least one item in the batch.
134  }
135  }
136  template<class UpdStats>
137  void FORCE_INLINE
138  process_the_work(UpdStats &&update_stats, typename cfg_type::edge_annotation_t const e_details) noexcept(false) {
139  eval_shared_del_t work(*current_work_item);
141  work.process_the_work(std::forward<UpdStats>(update_stats), e_details);
142  }
143 
144  public:
145  constexpr __stdcall batch_details() noexcept(true) FORCE_INLINE
147 
148  /// Put the closure_base-derived closure in the batch, if it is empty.
149  /**
150  Note that this function runs with no locks, as it presumes that the caller is the same pool_thread that consumes the work from the batch.
151 
152  \param wk The closure_base-derived closure to attempt to add.
153  \return true if the closure_base-derived closure was added, false otherwise.
154  */
155 #pragma GCC diagnostic push
156 // There's no sensible way to provide a simple specialisation that can remove the warning, which is due to possible use of GSS(k) batching in the queue, so we have to just ignore this warning.
157 #pragma GCC diagnostic ignored "-Wmissing-braces"
158  bool __fastcall add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
159  if (batch_empty()) {
160  batched_work=typename signalled_work_queue_type::value_ret_type{wk};
161  // We always have at least one item in the batch.
163  return true;
164  } else {
165  return false;
166  }
167  }
168 #pragma GCC diagnostic pop
169 
170  void __fastcall refill_batch(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
171  /*
172  1. Need to re-add an event, as we consumed one to get here; otherwise we would incorrectly lead the queue to think it has one fewer work item than it really has.
173  2. Must not appear inside the lock, otherwise we can get deadlocks, because there is more work in the queue than the counter says.
174  3. Re-adding outside the lock means that another (horizontal) thread could steal this work because adding work may trigger another thread, and they race to get here. So add just before we lock to try and reduce this possibility.
175  */
176  signalled_work_queue.have_work.add();
177  const typename signalled_work_queue_type::atomic_t::write_lock_type work_queue_lk(signalled_work_queue.pop_lock(), signalled_work_queue_type::atomic_t::lock_traits::infinite_timeout());
178  if (!signalled_work_queue.colln().empty()) {
179  reload_batch_if_empty_nochk_nolk(signalled_work_queue);
180  }
181  }
182  // TODO Should process only one batch item at a time....
183  bool __fastcall process_a_batch_item() noexcept(false) FORCE_INLINE {
184  while (!batch_empty()) {
185  process_the_work(std::bind(&statistics_type::processed_hrz_work, &statistics_), cfg_type::hrz_edge_annotation);
186  }
187  return current_work_item!=batched_work.end();
188  }
189  /**
190  If the batch_size>1 and the first closure_base-derived closure depends upon a later job to complete, then that sub-tree of dependent closure_base-derived closures will deadlock. This is because this loop will wait for the first closure_base-derived closure to complete, which depends upon the second (or later in the batch) closure_base-derived closure, which will not be executed because the earlier closure_base-derived closure is preventing this loop from continuing.
191 
192  \see process_a_batch_item
193  */
194  void __fastcall process_a_batch(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
195  refill_batch(signalled_work_queue);
196  while (!batch_empty()) {
197  process_the_work(std::bind(&statistics_type::processed_vertical_work, &statistics_), cfg_type::vertical_edge_annotation);
198  }
199  }
200 
201  statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
202  return statistics_;
203  }
204  statistics_type &__fastcall statistics() noexcept(true) FORCE_INLINE {
205  return statistics_;
206  }
207 
208  protected:
212  };
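A rough, standalone sketch of the GSS(k) (bakers') batching idea implemented above: acquire the queue lock once to take up to GSSk closures into a thread-local batch, then drain that batch with no further locking. The queue contents, the value of GSSk and the helper names are illustrative only, not the library's API:

    #include <array>
    #include <deque>
    #include <functional>
    #include <iostream>
    #include <mutex>

    constexpr std::size_t GSSk = 3;   // batch size: one lock acquisition per GSSk items

    std::mutex queue_lock;
    std::deque<std::function<void()>> work_queue;   // stand-in for the shared, signalled work queue

    // Thread-local batch, refilled wholesale from the shared queue.
    std::array<std::function<void()>, GSSk> batch;
    std::size_t batch_size = 0;

    void refill_batch() {
        const std::lock_guard<std::mutex> lk(queue_lock);
        batch_size = 0;
        while (batch_size < GSSk && !work_queue.empty()) {
            batch[batch_size++] = std::move(work_queue.front());
            work_queue.pop_front();
        }
    }

    void process_a_batch() {
        refill_batch();
        // No locks held here: items within the batch are owned by this thread.
        for (std::size_t i = 0; i < batch_size; ++i) {
            batch[i]();
        }
        batch_size = 0;
    }

    int main() {
        for (int i = 0; i < 5; ++i) {
            work_queue.push_back([i] { std::cout << "work " << i << '\n'; });
        }
        process_a_batch();   // processes items 0..2 in one batch
        process_a_batch();   // processes items 3..4
        return 0;
    }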
213  /// Optimisation for when GSS(k)=GSS(1), i.e. no batching.
214  template<
215  class WQ,
216  class Stats
217  >
218  class batch_details<1UL, WQ, Stats> {
219  public:
220  /// This is a container of GSSk items from the front of the queue to implement the GSS(k) or bakers' scheduling algorithm.
222  /// The statistics to be gathered by the thread_pool, by default none.
223  typedef Stats statistics_type;
227  using eval_shared_del_t=eval_shared_deleter_t<typename signalled_work_queue_type::value_type>;
228 
229  static constexpr unsigned long GSSk=1UL;
230 
231  /**
232  To assist in allowing compile-time computation of the algorithmic order of the threading model.
233  */
234  static constexpr ppd::generic_traits::memory_access_modes memory_access_mode=eval_shared_del_t::memory_access_mode;
235 
236  /**
237  Make sure that the batch sizes are the same, so that when a batch is popped off the signalled_work_queue, all of the elements are copied, with none being lost, which would otherwise cause closure_base-derived closures not to be processed. This check is here just because I'm ultra-cautious at the moment: it is not strictly needed, but the horizontal threading test case in dataflow_full fails with GSSk>1 as it is a poor test case.
238 
239  \see pool_aspect::GSSk
240  */
241  BOOST_STATIC_ASSERT(signalled_work_queue_type::max_size<=GSSk);
242 
243  /**
244  \return true if the batch is empty, false otherwise.
245  */
246  bool __fastcall batch_empty() const noexcept(true) FORCE_INLINE {
247  return !current_work;
248  }
249 
250  protected:
251  void __fastcall reload_batch_if_empty_nochk_nosig(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
252  if (batch_empty() && !signalled_work_queue.empty()) {
253  // Make sure that when the batch is popped off the signalled_work_queue, only one element is popped, otherwise the extra would be lost, which would cause the lost closure_base-derived closure not to be processed and execution_context's never to be satisfied.
254  current_work=signalled_work_queue.pop_front_1_nochk_nosig();
255  }
256  }
257  template<class UpdStats>
258  void FORCE_INLINE
259  process_the_work(UpdStats &&update_stats, typename cfg_type::edge_annotation_t const e_details) noexcept(false) {
260  eval_shared_deleter_t<typename signalled_work_queue_type::value_ret_type::value_type> work(current_work);
261  work.process_the_work(std::forward<UpdStats>(update_stats), e_details);
262  }
263 
264  public:
265  constexpr __stdcall batch_details() noexcept(true) FORCE_INLINE
266  : statistics_(), current_work() {}
267 
268  /// Put the closure_base-derived closure in the batch, if it is empty.
269  /**
270  Note that this function runs with no locks, as it presumes that the caller is the same pool_thread that consumes the work from the batch.
271 
272  \param wk The closure_base-derived closure to attempt to add.
273  \return true if the closure_base-derived closure was added, false otherwise.
274  */
275  bool __fastcall add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
276  if (batch_empty()) {
277  current_work=wk;
278  return true;
279  } else {
280  return false;
281  }
282  }
283 
284  void __fastcall refill_batch(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
285  reload_batch_if_empty_nochk_nosig(signalled_work_queue);
286  }
287  bool __fastcall process_a_batch_item() noexcept(false) FORCE_INLINE {
288  while (!batch_empty()) {
289  process_the_work(std::bind(&statistics_type::processed_hrz_work, &statistics_), cfg_type::hrz_edge_annotation);
290  }
291  return false;
292  }
293  void __fastcall process_a_batch(signalled_work_queue_type &signalled_work_queue) noexcept(false) FORCE_INLINE {
294  refill_batch(signalled_work_queue);
295  while (!batch_empty()) {
296  process_the_work(std::bind(&statistics_type::processed_vertical_work, &statistics_), cfg_type::vertical_edge_annotation);
297  }
298  }
299 
300  statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
301  return statistics_;
302  }
303  statistics_type &__fastcall statistics() noexcept(true) FORCE_INLINE {
304  return statistics_;
305  }
306 
307  private:
308  statistics_type statistics_;
309  typename signalled_work_queue_type::value_type current_work;
310  };
311 
312  /// Interface for allowing an execution context to potentially execute work horizontally whilst the execution_context is held, so that we both keep the cores busy and avoid deadlock due to resource starvation from a lack of available threads to process input_work (tasks) from the signalled_work_queue in the thread_pool_type.
313  template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr>
315  public:
316  typedef TPB thread_pool_type;
317  typedef typename thread_pool_type::pool_traits_type pool_traits_type; ///< The pool traits.
323  typedef typename thread_wk_t::work_complete_t work_complete_t; ///< This atomic object is the object that is used to signal to a waiting future that the work has been completed.
325 
326  /**
327  To assist in allowing compile-time computation of the algorithmic order of the threading model.
328  */
337  );
338 
340 
341  protected:
342  constexpr horizontal_execution_itf() noexcept(true) FORCE_INLINE {}
343  /// Can't be used polymorphically - to maintain the concept that this is a stack allocated object.
345 
346  virtual work_complete_t &__fastcall work_complete() noexcept(true)=0;
347  virtual work_complete_t &__fastcall work_complete() const noexcept(true)=0;
348 
349  virtual thread_wk_t const & __fastcall core_work() const noexcept(true)=0;
350 
351  /// Check to see if the work has been completed.
352  /**
353  This is a non-blocking call. Note that if this is used unwisely, race-conditions or deadlocks will occur in the users' code.
354 
355  \return If the work is joinable, returns true if the work has been executed by a thread in the pool, otherwise in all other cases returns false.
356  */
357  bool __fastcall work_done() const noexcept(true) FORCE_INLINE {
358  return this->work_complete().try_lock()==os_traits::lock_traits::atom_set;
359  }
360 
361  /**
362  By default no horizontal execution is performed, we just wait for the closure_base-derived closure to be process()ed.
363  */
364  virtual void __fastcall wait_or_horizontal_thread() const noexcept(false) FORCE_INLINE {
365  this->work_complete().lock();
366  }
367  };
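A condensed, standalone sketch of the completion-signalling protocol behind work_complete(), work_done() and the default wait_or_horizontal_thread(): a non-blocking query and a blocking wait over the same flag. The type work_complete_flag is a hypothetical stand-in for the library's event/atomic types:

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    class work_complete_flag {
    public:
        // Non-blocking query, analogous to work_done()'s try_lock(): never waits.
        bool done() const noexcept {
            const std::lock_guard<std::mutex> lk(m);
            return completed;
        }
        // Blocking wait, analogous to the default wait_or_horizontal_thread().
        void wait() const {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [this] { return completed; });
        }
        // Called by the worker thread once the closure has been process()ed.
        void set() noexcept {
            {
                const std::lock_guard<std::mutex> lk(m);
                completed = true;
            }
            cv.notify_all();
        }

    private:
        mutable std::mutex m;
        mutable std::condition_variable cv;
        bool completed = false;
    };

    int main() {
        work_complete_flag flag;
        std::thread worker([&flag] { flag.set(); });   // stands in for a pool_thread
        if (!flag.done()) {                            // non-blocking check first
            flag.wait();                               // otherwise block until signalled
        }
        worker.join();
        return 0;
    }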
368 
369  template<class TPB, template<class> class Del, template<class> class AtCtr>
371  public:
374  typedef typename base_t::pool_traits_type pool_traits_type; ///< The pool traits.
375  typedef typename base_t::os_traits os_traits;
376  typedef typename base_t::pool_type pool_type;
377  typedef typename base_t::atomic_t atomic_t;
379  typedef typename base_t::thread_wk_t thread_wk_t;
381 
382  /**
383  To assist in allowing compile-time computation of the algorithmic order of the threading model.
384  */
386 
387  /// Erase the thread_wk_t item from the queue in the thread_pool, if it is still in there.
388  /**
389  Note that the queue_size() of the thread_pool is not guaranteed to have been reduced by one until the execution_context has been waited upon, either explicitly (via dereference) or implicitly (via destruction). (Amongst other reasons, this is because a worker-thread might have already removed the item from the queue to mutate it.)
390 
391  \see thread_pool_type::queue_size()
392  */
393  bool __fastcall erase() noexcept(false) FORCE_INLINE {
394  const typename os_traits::lock_traits::critical_section_type::write_lock_type lock(erase_lock, os_traits::lock_traits::infinite_timeout());
395  if (waiting.try_lock()==os_traits::lock_traits::atom_unset) {
396  erased.set();
397  return false;
398  } else {
399  return true;
400  }
401  }
402 
403  /// Ensure that if an execution context is passed to another function, only a constant version may be passed.
404  /**
405  This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread, i.e. only one thread can write to the results, but many can read, because multiple read operations do not require locking with respect to each other. Note that if the results are accessed, then that call will block, as necessary, until the results are written, thus ensuring that all reads follow any writes to the contained data.
406  */
407  virtual const eraseable_execution_context_base * __fastcall operator&() const noexcept(true)=0;
408 
409  protected:
412  }
413 
414  /// Can't be used polymorphically - to maintain the concept that this is a stack allocated object.
416 
417  /// A counted reference to the item of work that has been transferred to the pool for execution.
418  const typename signalled_work_queue_type::value_type & __fastcall
419  wk_queue_item() const noexcept(true)=delete;
420 
421  /// A counted reference to the item of work that has been transferred to the pool for execution.
422  typename signalled_work_queue_type::value_type & __fastcall
423  wk_queue_item() noexcept(true)=delete;
424 
425  /**
426  \return True if the work has not been erased from the queue, false otherwise.
427  */
428  bool __fastcall has_work() const noexcept(true) FORCE_INLINE {
429  bool work_in_queue;
430  {
431  const typename os_traits::lock_traits::critical_section_type::read_lock_type e_lock(erase_lock, os_traits::lock_traits::infinite_timeout());
432  work_in_queue=(erased.try_lock()==os_traits::lock_traits::atom_unset);
433  if (work_in_queue) {
434  waiting.set();
435  }
436  }
437  return work_in_queue;
438  }
439 
440  private:
441  mutable typename os_traits::lock_traits::critical_section_type erase_lock;
442  mutable atomic_t waiting;
443  mutable typename os_traits::lock_traits::anon_event_type erased;
444 
445  template<class ExCxt> friend class call_push_back;
446  };
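The race between erase() and has_work() above can be sketched standalone as two flags guarded by one lock: a consumer claims the work before mutating it, and erasure only succeeds if nothing has claimed it yet. The class name erasable_slot and the flag names are hypothetical:

    #include <cassert>
    #include <mutex>

    class erasable_slot {
    public:
        // Mirrors erase(): returns false when the work was successfully erased
        // (nothing to wait for), true when a consumer already has it and the
        // caller must instead wait upon the result.
        bool erase() {
            const std::lock_guard<std::mutex> lk(m);
            if (!claimed) {
                erased = true;
                return false;
            }
            return true;
        }
        // Mirrors has_work(): returns true and claims the work if it was not erased.
        bool has_work() {
            const std::lock_guard<std::mutex> lk(m);
            if (!erased) {
                claimed = true;
                return true;
            }
            return false;
        }

    private:
        std::mutex m;
        bool claimed = false;
        bool erased = false;
    };

    int main() {
        erasable_slot s;
        assert(s.has_work());   // the consumer claims the work first...
        assert(s.erase());      // ...so a later erase() reports that the work must be waited upon
        return 0;
    }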
447 
448  /// The execution_context may need to avoid dereferencing if the result_type it contains is void.
449  /**
450  An ugly, evil hack: this works around the fact that the standard doesn't allow references to void, only pointers, and the result_type of some of the parallel algorithms is void.
451  */
452  template<class Ret>
454  typedef Ret & type;
455 
456  template<class CW>
457  static constexpr type FORCE_INLINE
458  execute(CW &core_work_) noexcept(true) {
459  return static_cast<type>(core_work_.closure().get_results());
460  }
461  template<class CW>
462  static constexpr type FORCE_INLINE
463  execute(CW const &core_work_) noexcept(true) {
464  return static_cast<type>(core_work_.closure().get_results());
465  }
466  };
467  template<>
468  struct add_ref_if_not_void<void> {
469  typedef void type;
470 
471  template<class CW>
472  static constexpr type FORCE_INLINE
473  execute(CW &) noexcept(true) {
474  }
475  };
476  template<>
477  struct add_ref_if_not_void<void const> {
478  typedef void type;
479 
480  template<class CW>
481  static constexpr type FORCE_INLINE
482  execute(CW &) noexcept(true) {
483  }
484  };
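A standalone illustration of the "reference unless void" workaround described above (simplified, hypothetical names): the trait yields T & for ordinary result types but collapses to void, with a matching no-op accessor, when there is no result to return:

    #include <cassert>
    #include <type_traits>

    // Primary template: ordinary results are handed back by reference.
    template<class Ret>
    struct ref_unless_void {
        typedef Ret &type;
        template<class Holder>
        static type get(Holder &h) noexcept {
            return h.value;
        }
    };
    // Specialisation for void: there is nothing to reference, so return nothing.
    template<>
    struct ref_unless_void<void> {
        typedef void type;
        template<class Holder>
        static type get(Holder &) noexcept {}
    };

    struct int_holder { int value = 42; };
    struct void_holder {};

    int main() {
        int_holder ih;
        ref_unless_void<int>::get(ih) = 7;   // usable as an lvalue reference
        assert(ih.value == 7);
        void_holder vh;
        ref_unless_void<void>::get(vh);      // compiles and does nothing
        static_assert(std::is_same<ref_unless_void<void>::type, void>::value, "collapses to void");
        return 0;
    }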
485 
486  /// The execution_context may need extra dereferencing according to the result_type it contains.
487  namespace deref {
488 
489  /// Just dereference the execution_context as normal.
490  template<class ExCxt>
491  struct noop {
492  typedef ExCxt excution_context;
497 
498  static constexpr const_ref_result_type deref(excution_context const &e) noexcept(false) FORCE_INLINE {
499  return e.get_results();
500  }
501  static constexpr ref_result_type deref(excution_context &e) noexcept(false) FORCE_INLINE {
502  return e.get_results();
503  }
504  static constexpr const_addr_result_type arrow(excution_context const &e) noexcept(false) FORCE_INLINE {
505  return &e.get_results();
506  }
507  static constexpr addr_result_type arrow(excution_context &e) noexcept(true) FORCE_INLINE {
508  return &e.get_results();
509  }
510  };
511 
512  /// Ensure that when the execution_context is dereferenced to obtain the result any extra dereferences are done as necessary, for example if it is a boolean.
513  template<class ExCxt>
514  struct extra {
515  typedef ExCxt excution_context;
520 
521  static constexpr const_ref_result_type __fastcall deref(excution_context const &e) noexcept(false) FORCE_INLINE {
522  return e.get_results();
523  }
524  static constexpr ref_result_type __fastcall deref(excution_context &e) noexcept(false) FORCE_INLINE {
525  return e.get_results();
526  }
527  static constexpr const_addr_result_type __fastcall arrow(excution_context const &e) noexcept(false) FORCE_INLINE {
528  return &e.get_results();
529  }
530  static constexpr addr_result_type __fastcall arrow(excution_context &e) noexcept(false) FORCE_INLINE {
531  return &e.get_results();
532  }
533  };
534 
535  /// Ensure that when the execution_context is dereferenced to obtain the result any extra dereferences are done as necessary, for example if it is a count.
536  template<class ExCxt>
537  struct extra_deref {
538  typedef ExCxt excution_context;
543 
544  static constexpr const_ref_result_type __fastcall deref(excution_context const &e) noexcept(false) FORCE_INLINE {
545  return e.get_results().get();
546  }
547  static constexpr ref_result_type __fastcall deref(excution_context &e) noexcept(false) FORCE_INLINE {
548  return e.get_results().get();
549  }
550  static constexpr const_addr_result_type __fastcall arrow(excution_context const &e) noexcept(false) FORCE_INLINE {
551  return e.get_results().get();
552  }
553  static constexpr addr_result_type __fastcall arrow(excution_context &e) noexcept(false) FORCE_INLINE {
554  return e.get_results().get();
555  }
556  };
557 
558  }
559 
560  namespace core_work_result {
561 
562  /// Don't initialise the result of the execution_context.
563  template<class CoreWk>
564  struct noop {
565  typedef CoreWk thread_wk_t;
566 
567  static constexpr void init(thread_wk_t &) noexcept(true) FORCE_INLINE {
568  }
569  };
570 
571  /// Default initialise the result of the execution_context.
572  template<class CoreWk>
573  struct to_zero {
574  typedef CoreWk thread_wk_t;
575 
576  static void init(thread_wk_t &core_work) noexcept(true) FORCE_INLINE {
577  core_work.closure().get_results()=typename thread_wk_t::closure_t::result_type();
578  }
579  };
580 
581  /// Default initialise the boolean result of the execution_context to false.
582  template<class CoreWk>
583  struct to_false {
584  typedef CoreWk thread_wk_t;
585 
586  static void init(thread_wk_t &core_work) noexcept(true) FORCE_INLINE {
587  core_work.closure().get_results()=false;
588  }
589  };
590 
591  /// Initialise the result of the execution_context with the provided value from the input initialisation value from the core_work.
592  template<class CoreWk>
593  struct to_op {
594  typedef CoreWk thread_wk_t;
595 
596  static void init(thread_wk_t &core_work) noexcept(true) FORCE_INLINE {
597  core_work.closure().get_results()=typename thread_wk_t::closure_t::result_type(core_work.closure().input().init);
598  }
599  };
600 
601  }
602 
603  /// Class that implements the horizontal execution algorithm.
604  /**
605  It waits for work on the main thread_pool::signalled_work_queue at low priority (to try and ensure that the pool_threads are more likely to get the thread_wk_t), takes one item at a time, i.e. no batching, GSS(k) with k=1, and places any resultant work back into the thread_pool::signalled_work_queue, to be available to all threads.
606 
607  Because the PThreads API lacks a wait-for-multiple-objects primitive, one has to create another thread to execute the work horizontally whilst the execution context is held. Then, once the execution context is released, one has to cancel that thread, but be hyper-careful about when one can cancel it, to ensure not only that any work it is processing is completed, but also that any C++ objects it is holding have their dtors correctly run. Yuck. The thread is created in the call to get_results(), and only if the work has not been completed.
608 
609  \see get_results()
610  */
611  template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr>
613  protected:
617  typedef typename base_t::pool_type pool_type;
618  typedef typename base_t::os_traits os_traits;
619  typedef typename base_t::thread_wk_t thread_wk_t;
620 
623  }
624  ~horizontal_execution() noexcept(false) FORCE_INLINE {}
625 
626  /// Wait upon the core_work to complete process(), or process() items from the pool::batch_details.
627  /**
628  When items are directly process()ed from the pool::batch_details whilst waiting for the attached core_work to complete, I term this "horizontal threading", as opposed to "vertical threading", which is when core_work items are process()ed in pool_thread pool::thread_types::steal::process(), possibly being batched.
629  This function will attempt to process core_work from the signalled_work_queue, or if the wk_queue_item() upon which it is waiting has been completed, will return. Batching is carefully done: if there are items in the pool::batch_details, then these will be processed first, but only from the same thread as this one.
630 
631  \see steal::batch_type, pool::batch_details
632  */
633  void __fastcall wait_or_horizontal_thread() const noexcept(false) final override {
634  typename os_traits::lock_traits::atomic_state_type wk_complete;
635  assert(dynamic_cast<thread_pool_type *>(&pool));
636  // Process work in the batch, as we may be waiting for an item in the batch to complete before we are released.
637  while ((wk_complete=this->work_complete().try_lock())!=os_traits::lock_traits::atom_set && pool.process_a_batch_item(os_traits::thread_traits::get_current_thread(), this->core_work().exception_thrown_in_thread()));
638  if (wk_complete!=os_traits::lock_traits::atom_set) {
639  // We need to perform the horizontal threading in a separate thread, because we don't have WaitOnMultipleObjects() in PThreads.
640  try {
641  const typename execute_any_work_horizontally::scoped sc(process, this->work_complete(), wk_complete);
642  if (wk_complete!=os_traits::lock_traits::atom_set) {
643  // Wait for the closure_base-derived closure to be transformed, whilst performing any horizontal closure_base-derived closure transformations.
644  this->work_complete().lock();
645  }
646  } catch (...) {
647  this->work_complete().lock();
648  throw;
649  }
650  }
651  }
652 
653  private:
654  class execute_any_work_horizontally final : public wrapper<os_traits::thread_traits::api_params_type::api_type, typename os_traits::thread_traits::model_type> {
655  public:
656  typedef wrapper<os_traits::thread_traits::api_params_type::api_type, typename os_traits::thread_traits::model_type> base_t;
657  typedef typename base_t::lock_traits lock_traits;
658  typedef typename base_t::thread_context_t thread_context_t;
659  using exit_requested_type=typename pool_traits_type::template exit_requested_type<typename thread_pool_type::work_distribution_mode::queue_model>;
660  typedef typename base_t::exception_type exception_type;
661  /**
662  Don't force the thread waiting upon the just-satisfied execution_context to also immediately synchronise with the horizontally executing thread; just request it to stop processing more work, and delay the forced synchronisation until the execution_context leaves scope, i.e. allow some greater opportunity for overlapping the processing of work.
663  */
664  class scoped final {
665  public:
666  scoped(execute_any_work_horizontally &e, typename thread_wk_t::work_complete_t &work_complete, typename os_traits::lock_traits::atomic_state_type &wk_complete) noexcept(false) FORCE_INLINE
667  : thr(e) {
668  // Try to allow the pool_threads a chance to process the work before this horizontal thread is created that might compete with the pool_threads for the work.
669  os_traits::thread_traits::sleep(0);
670  if ((wk_complete=work_complete.try_lock())!=os_traits::lock_traits::atom_set) {
671  thr.create_running();
672  }
673  }
674  ~scoped() noexcept(true) FORCE_INLINE {
675  thr.request_exit();
676  }
677 
678  private:
679  execute_any_work_horizontally &thr;
680  };
681 
682  /**
683  To assist in allowing compile-time computation of the algorithmic order of the threading model.
684  */
685  static constexpr ppd::generic_traits::memory_access_modes memory_access_mode=(
686  base_t::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
687  && thread_context_t::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
688  && exit_requested_type::memory_access_mode==ppd::generic_traits::memory_access_modes::crew_memory_access
689  ? ppd::generic_traits::memory_access_modes::crew_memory_access
690  : ppd::generic_traits::memory_access_modes::erew_memory_access
691  );
692 
693  execute_any_work_horizontally(thread_pool_type &p, typename os_traits::thread_exception const &ex_thr, const typename os_traits::thread_traits::api_params_type::handle_type ancestor_thr_id) noexcept(true) FORCE_INLINE
694  : base_t(), hrz_work(), pool(p), exception_thrown_in_thread(ex_thr), ancestor_thread_id(ancestor_thr_id) {
695  assert(dynamic_cast<thread_pool_type *>(&pool));
696  }
697 
698  ~execute_any_work_horizontally() noexcept(false) FORCE_INLINE {
699  wait_thread_exit();
700  }
701 
702  void operator=(execute_any_work_horizontally const &)=delete;
703  void operator=(execute_any_work_horizontally &&)=delete;
704 
705  void create_running() noexcept(false) override FORCE_INLINE {
706  base_t::create_running();
707  try {
708  // We want the current horizontal thread to run on the same core as the thread that has been held, waiting for a result, i.e. it was an idle core, now to be used.
709  this->kernel_affinity(os_traits::thread_traits::get_kernel_affinity(ancestor_thread_id));
710  // Ensure that we prefer vertical to horizontal threading, otherwise we can accidentally get convoying of tasks together with (bizarrely) free threads, which seriously ruins scalability.
711  orig_pri=this->kernel_priority();
712  this->kernel_priority(os_traits::thread_traits::api_params_type::idle);
713  } catch (exception_type const &ex) {
714  // Ignore any library errors; it's not a nightmare if the above tweaks don't happen.
715  }
716  }
717 
718  void __fastcall request_exit() const noexcept(true) override FORCE_INLINE {
719  try {
720  base_t::request_exit();
721  } catch (exception_type const &ex) {
722  // Ensure that if we request that a thread should be cancelled, and fail, then we don't later attempt to join on it and potentially lock up in the thread_base::wait_thread_exit() when it might attempt to rejoin with it.
723  const typename lock_traits::critical_section_type::write_lock_type lock(this->thread_params_lock, lock_traits::infinite_timeout());
724  this->thread_params.id=0;
725  this->thread_params.state=os_traits::thread_traits::api_params_type::failed_to_cancel;
726  // Ignore any library errors, as it is telling us nothing that we can do anything about.
727  }
728  }
729 
730  private:
731  /**
732  Prevent excessive core-to-core chatter: accumulate statistics locally, then at the end send them back to the thread_pool. Note that this implies that local gathering can be lock-free.
733  \todo Note that we don't "do" GSS(k) batching here, yet, because it greatly simplifies horizontal threading.
734  */
735  using hrz_work_type=batch_details<1, typename thread_pool_type::pool_traits_type::template signalled_work_queue_type<typename thread_pool_type::work_distribution_mode::queue_model>, typename thread_pool_type::statistics_type>;
736  using setter_type=setter<typename os_traits::thread_traits, os_traits::thread_traits::api_params_type::idle>;
737 
738  hrz_work_type hrz_work;
739  thread_pool_type &pool;
740  typename os_traits::thread_exception const &exception_thrown_in_thread;
741  const typename os_traits::thread_traits::api_params_type::handle_type ancestor_thread_id;
742  typename thread_pool_type::priority_type orig_pri;
743 
744  bool __fastcall pre_exit() noexcept(false) override {
745  if (!base_t::pre_exit()) {
746  // Make sure we carefully control when this thread can be cancelled, to avoid nasty double-exceptions being thrown in client code.
747  const typename os_traits::thread_traits::cancellability set;
748  // We prefer vertical to horizontal threading, so try to prefer the former.
749  os_traits::thread_traits::sleep(0);
750  assert(dynamic_cast<thread_pool_type *>(&pool));
751  const typename exit_requested_type::lock_result_type lkd=pool.exit_requested().lock();
752  // Process work if available, then check for the exit flag (note that the exit flag takes priority in the signalled_work_queue): we don't want random pool_threads to exit, causing resource starvation, because all remaining threads might be waiting for a thread_wk to be processed, and there may be no threads left to do this, thus causing the library to lock up.
753  if (lkd.first==exit_requested_type::states::new_work_arrived) {
754  assert(dynamic_cast<thread_pool_type *>(&pool));
755  return false;
756  } else if (lkd.first==exit_requested_type::states::exit_requested) {
757  // Ensure the rest of the threads in the pool exit.
758  assert(dynamic_cast<thread_pool_type *>(&pool));
759  pool.exit_requested().set(exit_requested_type::states::exit_requested);
760  }
761  }
762  return true;
763  }
764 
765  void __fastcall wait_thread_exit() noexcept(false) FORCE_INLINE {
766  // Cancel the thread, because we have no "WaitForMultipleObjects()" in PThreads.
767  this->exit_requested=true;
768  base_t::wait_thread_exit();
769  assert(dynamic_cast<thread_pool_type *>(&pool));
770  // Note that this may not be locked, so is not guaranteed to be thread-safe, so the result may be incorrect, but it is faster...!
771  pool.set_statistics().add_hrz_work(hrz_work.statistics().total_hrz_work());
772  pool.set_statistics().add_vertical_work(hrz_work.statistics().total_vertical_work());
773  }
774 
775  bool __fastcall worker_fn(thread_context_t &) noexcept(false) override FORCE_INLINE {
776  assert(dynamic_cast<thread_pool_type *>(&pool));
777  hrz_work.refill_batch(pool.queue());
778  const setter_type setter(this->params().id);
779  // Process an item at a time to try and reduce the amount of closure_base-derived closure mutated in this horizontal thread, to give the vertical threads a better stab at the closure_base-derived closure.
780  hrz_work.process_a_batch_item();
781  return false;
782  }
783  };
784 
785  public:
786  /**
787  To assist in allowing compile-time computation of the algorithmic order of the threading model.
788  */
794  );
795 
796  private:
797  mutable execute_any_work_horizontally process;
798 
799  protected:
801  };
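A heavily simplified, standalone sketch of the "wait or steal work horizontally" scheme implemented above: since PThreads offers no wait-for-multiple-objects primitive, a helper thread drains other queued work while the caller blocks on its own result, and is asked to stop once that result arrives. All names here (steal_loop, work_queue, and the use of std::future) are illustrative, not the library's API:

    #include <atomic>
    #include <deque>
    #include <functional>
    #include <future>
    #include <mutex>
    #include <thread>

    std::mutex queue_lock;
    std::deque<std::function<void()>> work_queue;   // other, unrelated work in the pool

    // The "horizontal" thread: drain queued work until asked to stop.
    void steal_loop(std::atomic<bool> const &stop) {
        while (!stop.load()) {
            std::function<void()> wk;
            {
                const std::lock_guard<std::mutex> lk(queue_lock);
                if (!work_queue.empty()) {
                    wk = std::move(work_queue.front());
                    work_queue.pop_front();
                }
            }
            if (wk) {
                wk();                        // process other closures whilst the caller is blocked
            } else {
                std::this_thread::yield();
            }
        }
    }

    int main() {
        for (int i = 0; i < 4; ++i) {
            const std::lock_guard<std::mutex> lk(queue_lock);
            work_queue.push_back([] { /* some other task */ });
        }
        std::promise<int> result;                              // the result this thread is waiting upon
        std::future<int> f = result.get_future();
        std::thread producer([&result] { result.set_value(42); });

        std::atomic<bool> stop{false};
        std::thread horizontal(steal_loop, std::cref(stop));   // steal work whilst we wait
        const int r = f.get();                                 // block on our own result only
        stop.store(true);                                      // then ask the horizontal thread to exit
        horizontal.join();
        producer.join();
        return r == 42 ? 0 : 1;
    }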
802 
803  template<class DM, generic_traits::return_data RD, class TPB, class Wk>
805 
806  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data.
807  /**
808  The execution_context stores the thread_wk_type inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the thread_wk_type on the heap, and also the shared_ptr in it can have a dummy sp_counter_type, noop_atomic_ctr, because the counter doesn't do anything, for a greater saving on atomic operations.
809  No horizontal threading is done in this specialisation, because a master thread distributes the work.
810 
811  \see horizontal_execution
812  */
813  template<class TPB, class Wk>
815  public:
818  typedef typename base_t::pool_traits_type pool_traits_type; ///< The pool traits.
819  typedef typename base_t::os_traits os_traits;
820  typedef typename os_traits::lock_traits lock_traits;
821  typedef typename base_t::pool_type pool_type;
822  typedef typename base_t::atomic_t atomic_t;
824  typedef typename thread_pool_type::template create_direct<Wk> creator_t;
825  typedef typename creator_t::result_type result_type;
828 
829  private:
830  class stack_exec_ctx_helper {
832  using closure_t=typename creator_t::closure_t;
833 
834  public:
835  /**
836  Note how we're pretty damn tricky here, and make use of both the dtor and the counter interfaces, using no-op implementations, because the object is going to be allocated on the stack: we normally manage the memory ourselves. So we not only avoid heap allocation and deallocation, but also the atomic operations that would be required in the counter for the shared_ptr.
837  */
839 
840  /**
841  To assist in allowing compile-time computation of the algorithmic order of the threading model.
842  */
844  };
845 
846  public:
849 
850  /**
851  To assist in allowing compile-time computation of the algorithmic order of the threading model.
852  */
859  );
860 
865  }
866 
867  /**
868  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
869  */
872 
873  /**
874  In case the user didn't specifically call wait(), operator*() or operator->() for some reason, throw any registered exception. (I don't throw the exception in the thread's destructor in the thread pool, as this is too late, and makes evil memory leaks in the thread pool destructor.)
875  */
877  assert(dynamic_cast<thread_wk_t *>(&core_work_));
879  }
880 
881  /// Can't automatically convert to a base-class address automatically - to maintain the concept that this is a stack allocated object.
882  void operator&()=delete;
883  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
884  template<class T> operator T () const=delete;
885  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
886  template<class T> operator T ()=delete;
887  void operator=(execution_context_stack_type const &)=delete;
889 
890  /// A (potentially blocking) access to the results, but only after they are written.
891  /**
892  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations. Throws if the work has been previously erased.
893 
894  \see work_done()
895  */
896  typename add_ref_if_not_void<result_type const>::type __fastcall operator*() const noexcept(false) FORCE_INLINE {
897  return this->get_results();
898  }
899  /// A (potentially blocking) access to the results, but only after they are written.
900  /**
901  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. Throws if the work has been previously erased.
902 
903  \see work_done()
904  */
905  typename add_ref_if_not_void<result_type>::type __fastcall operator*() noexcept(false) FORCE_INLINE {
906  return this->get_results();
907  }
908 
909  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
910  /**
911  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
912 
913  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
914  */
915  result_type const * __fastcall operator->() const noexcept(false) FORCE_INLINE {
916  return &this->get_results();
917  }
918  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
919  /**
920  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
921 
922  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
923  */
924  result_type * __fastcall operator->() noexcept(false) FORCE_INLINE {
925  return &this->get_results();
926  }
927 
928  /// Ensure that if an execution context is passed to another function, only a constant version may be passed.
929  /**
930  This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread, i.e. only one thread can write to the results, but many can read, because multiple read operations do not require locking with respect to each other. Note that if the results are accessed, then that call will block, as necessary, until the results are written, thus ensuring that all reads follow any writes to the contained data.
931  */
932  const execution_context_stack_type * __fastcall operator&() const noexcept(true) override FORCE_INLINE {
933  return this;
934  }
935 
936  /// A counted reference to the item of work that has been transferred to the pool for execution.
937  const typename signalled_work_queue_type::value_type __fastcall
938  wk_queue_item() const noexcept(true) FORCE_INLINE {
940  }
941 
942  /// A counted reference to the item of work that has been transferred to the pool for execution.
943  typename signalled_work_queue_type::value_type __fastcall
944  wk_queue_item() noexcept(true) FORCE_INLINE {
946  }
947 
948  protected:
951 
952  thread_wk_t const & __fastcall core_work() const noexcept(true) final override FORCE_INLINE {
953  return core_work_;
954  }
955  work_complete_t &__fastcall work_complete() noexcept(true) final override FORCE_INLINE {
956  return work_complete_;
957  }
958  work_complete_t &__fastcall work_complete() const noexcept(true) final override FORCE_INLINE {
959  return work_complete_;
960  }
961 
962  typename add_ref_if_not_void<result_type const>::type __fastcall get_results() const noexcept(false) FORCE_INLINE {
963  if (this->has_work()) {
965  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
968  } else {
969  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
970  }
971  }
972 
973  typename add_ref_if_not_void<result_type>::type __fastcall get_results() noexcept(false) FORCE_INLINE {
974  if (this->has_work()) {
976  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
979  } else {
980  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
981  }
982  }
983  };
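To make the dereference semantics above concrete, here is a tiny, standalone analogue (not the library's API) of a joinable transfer whose result is read back by dereferencing the returned context, blocking until the worker has written it; mini_execution_context and its members are hypothetical names:

    #include <future>
    #include <iostream>
    #include <thread>

    // A miniature stand-in for an execution context: dereferencing blocks until the
    // result has been written by the worker, then yields a reference to it.
    template<class Result>
    class mini_execution_context {
    public:
        explicit mini_execution_context(std::future<Result> f)
            : fut(std::move(f)) {}

        Result const &operator*() {
            ensure_result();
            return result;
        }
        Result const *operator->() {
            ensure_result();
            return &result;
        }

    private:
        void ensure_result() {
            if (!have_result) {
                result = fut.get();   // potentially blocking, like get_results()
                have_result = true;
            }
        }
        std::future<Result> fut;
        Result result{};
        bool have_result = false;
    };

    int main() {
        // "Transfer the work to the pool": here just a plain async task.
        mini_execution_context<int> context(std::async(std::launch::async, [] { return 6 * 7; }));
        std::cout << *context << '\n';   // blocks until the result is written, then prints 42
        return 0;
    }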
984 
985  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data, but also allow horizontal threading: if the object being managed has not yet had its result computed, and the queue in the thread_pool is not empty, process an item from that queue in the mean-time.
986  /**
987  This ensures that resource starvation (of threads) cannot occur, because waiting upon a dereference of an execution context no longer blocks that thread: it can process other work in the mean-time, which is important for finite-sized thread_pools. This specialisation performs horizontal threading, and the horizontal thread is also created on the stack, so with cache locality and all, quite good at stealing work.
988  The execution_context stores the thread_wk_type inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the thread_wk_type on the heap, and also the shared_ptr in it can have a dummy sp_counter_type, because the counter doesn't do anything, for a greater saving on atomic operations.
989 
990  \see horizontal_execution
991  */
992  template<class TPB, class Wk>
994  public:
999  typedef typename base_t::os_traits os_traits;
1000  typedef typename base_t::pool_type pool_type;
1001  typedef typename base_t::atomic_t atomic_t;
1003  typedef typename base_t::creator_t creator_t;
1004  typedef typename base_t::result_type result_type;
1005  typedef typename base_t::thread_wk_t thread_wk_t;
1007 
1008  /**
1009  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1010  */
1016  );
1017 
1018  execution_context_stack_type(thread_pool_type &pl, typename thread_wk_t::cfg_details_type::params const &p, typename thread_wk_t::closure_t::argument_type &&wk) noexcept(false) FORCE_INLINE
1019  : base_t(pl, p, std::forward<typename thread_wk_t::closure_t::argument_type>(wk)), base2_t(pl, this->core_work_) {
1020  }
1021 
1022  /**
1023  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
1024  */
1027  ~execution_context_stack_type() noexcept(false) {}
1028 
1029  /// Can't automatically convert to a base-class address automatically - to maintain the concept that this is a stack allocated object.
1030  void operator&()=delete;
1031  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1032  template<class T> operator T () const=delete;
1033  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1034  template<class T> operator T ()=delete;
1037 
1038  private:
1039  template<class ExCxt> friend class call_push_back;
1040  };
1041 
1042  /// Ensure that the compiler emits an error if attempting to non-joinably create an execution context.
1043  /**
1044  This class is not supposed to be constructible; it is just here to allow the code to compile. Non-joinable transfers to a thread pool never create an execution context, and neither do transfers to a non-joinable thread pool.
1045  */
1046  template<class DM, class TPB, class Wk>
1048  public:
1049  typedef TPB thread_pool_type;
1055 
1057  };
1058 
1059  template<class DM, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk>
1061 
1062  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data.
1063  /**
1064  The execution_context stores the thread_wk_t and algo_thread_wk inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the thread_wk_t and algo_thread_wk on the heap, and also the shared_ptr in it can have a dummy sp_counter_type, noop_atomic_ctr, because the counter doesn't do anything, for a greater saving on atomic operations.
1065  No horizontal threading is done in this specialisation, because a master thread distributes the work.
1066 
1067  \see horizontal_execution
1068  */
1069  template<class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk>
1071  public:
1075  typedef typename base_t::os_traits os_traits;
1076  typedef typename base_t::pool_type pool_type;
1077  typedef typename base_t::atomic_t atomic_t;
1079  typedef typename thread_pool_type::template create_direct<Wk> creator_t;
1080  typedef typename creator_t::result_type result_type;
1083 
1084  private:
1086  typedef typename thread_pool_type::template create_direct<alg_wrap_t> alg_wrap_creator_t;
1087 
1089  using closure_t=typename creator_t::closure_t;
1090 
1091  public:
1093 
1094  /**
1095  Note how we're pretty damn tricky here, and make use of both the dtor and the counter interfaces, using no-op implementations, because the object is going to be allocated on the stack: we normally manage the memory ourselves. So we not only avoid heap allocation/deallocation, but also the atomic operations that would be required in the counter for the shared_ptr.
1096  */
1098 
1099  /**
1100  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1101  */
1107  );
1108  };
1109 
1110  void add_work(const typename thread_pool_type::pool_type::size_type clique) noexcept(false) FORCE_INLINE {
1114  typename alg_wrap_t::work_wrap(
1116  core_work_.closure(),
1117  clique
1118  ),
1120  );
1121  leaf_wk.process();
1122  }
1123 
1124  public:
1127 
1128  /**
1129  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1130  */
1138  );
1139 
1142  // Ensure any resizing of the output collection is done whilst the inputs & output collections are locked to avoid the inputs being resized in the mean-time.
1144  add_work(clique);
1145  }
1149  add_work(clique);
1150  }
1151 
1152  /**
1153  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
1154  */
1157 
1158  /**
1159  In case the user didn't specifically call wait(), operator*() or operator->() for some reason, throw any registered exception. (I don't throw the exception in the thread's destructor in the thread pool, as this is too late, and makes evil memory leaks in the thread pool destructor.)
1160  */
1162  assert(dynamic_cast<thread_wk_t *>(&core_work_));
1164  }
1165 
1166  /// Can't automatically convert to a base-class address automatically - to maintain the concept that this is a stack allocated object.
1167  void operator&()=delete;
1168  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1169  template<class T> operator T () const=delete;
1170  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1171  template<class T> operator T ()=delete;
1174 
1175  /// A (potentially blocking) access to the results, but only after they are written.
1176  /**
1177  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations. Throws if the work has been previously erased.
1178 
1179  \see work_done()
1180  */
1181  typename add_ref_if_not_void<result_type const>::type __fastcall operator*() const noexcept(false) FORCE_INLINE {
1182  return this->get_results();
1183  }
1184  /// A (potentially blocking) access to the results, but only after they are written.
1185  /**
1186  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. Throws if the work has been previously erased.
1187 
1188  \see work_done()
1189  */
1190  typename add_ref_if_not_void<result_type>::type __fastcall operator*() noexcept(false) FORCE_INLINE {
1191  return this->get_results();
1192  }
1193 
1194  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
1195  /**
1196  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
1197 
1198  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
1199  */
1200  result_type const * __fastcall operator->() const noexcept(false) FORCE_INLINE {
1201  return &this->get_results();
1202  }
1203  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
1204  /**
1205  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
1206 
1207  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
1208  */
1209  result_type * __fastcall operator->() noexcept(false) FORCE_INLINE {
1210  return &this->get_results();
1211  }
1212 
1213  /// Ensure that if an execution context is passed to another function, only a constant version may be passed.
1214  /**
1215  This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. i.e. only one thread can write to the results, but many can read, because multiple read operations do not require locking with respect to each other. Note that if the results are accessed, then that call will block, as necessary, until the results are written, thus ensuring that all reads follow any writes to the contained data.
1216  */
1217  const execution_context_algo_stack_type * __fastcall operator&() const noexcept(true) override FORCE_INLINE {
1218  return this;
1219  }
1220 
1221  protected:
1224 
1225  thread_wk_t const & __fastcall core_work() const noexcept(true) final override FORCE_INLINE {
1226  return core_work_;
1227  }
1229  return work_complete_;
1230  }
1231  work_complete_t &__fastcall work_complete() const noexcept(true) final override FORCE_INLINE {
1232  return work_complete_;
1233  }
1234 
1235  private:
1236  template<class ExCxt> friend class call_push_back;
1237 
1238  typename add_ref_if_not_void<result_type const>::type __fastcall get_results() const noexcept(false) FORCE_INLINE {
1239  if (this->has_work()) {
1240  this->wait_or_horizontal_thread();
1241  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
1244  } else {
1245  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
1246  }
1247  }
1248 
1249  typename add_ref_if_not_void<result_type>::type __fastcall get_results() noexcept(false) FORCE_INLINE {
1250  if (this->has_work()) {
1251  this->wait_or_horizontal_thread();
1252  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
1255  } else {
1256  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
1257  }
1258  }
1259  };
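// [Illustrative sketch, not part of the library.] A minimal, hypothetical analogue of the
// blocking operator*()/operator->() accessors documented above: the dereference blocks until
// the worker thread has signalled completion, so every read of the result is ordered after the
// single write, and any exception captured during the mutation is re-thrown to the caller.
// All names below (blocking_result, example_blocking_result) are invented for illustration only.
#include <condition_variable>
#include <exception>
#include <mutex>
#include <optional>
#include <thread>

template<class Result>
class blocking_result {
public:
	// Called by the worker thread exactly once: publish the result and signal completion.
	void set(Result r) {
		{
			std::lock_guard<std::mutex> lock(mtx_);
			result_=std::move(r);
		}
		done_.notify_all();
	}
	// Called by the worker thread if the mutation threw: capture the exception instead.
	void set_exception(std::exception_ptr e) {
		{
			std::lock_guard<std::mutex> lock(mtx_);
			error_=e;
		}
		done_.notify_all();
	}
	// Potentially blocking read access, only available after the result has been written.
	Result const &operator*() const {
		std::unique_lock<std::mutex> lock(mtx_);
		done_.wait(lock, [this] { return result_.has_value() || error_; });
		if (error_) {
			std::rethrow_exception(error_);
		}
		return *result_;
	}
	Result const *operator->() const {
		return &**this;
	}

private:
	mutable std::mutex mtx_;
	mutable std::condition_variable done_;
	std::optional<Result> result_;
	std::exception_ptr error_;
};

// Example usage: the main thread dereferences, blocking until the worker has written.
inline int example_blocking_result() {
	blocking_result<int> res;
	std::thread worker([&res] { res.set(42); });
	const int answer=*res;	// Blocks until the worker calls set().
	worker.join();
	return answer;
}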
1260 
1261  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data, but also allow horizontal threading: if the object being managed has not yet had its result computed, and the queue in the thread_pool is not empty, process an item from that queue in the mean-time.
1262  /**
1263  This ensures that resource starvation (of threads) cannot occur, as waiting upon a dereference of an execution context no longer blocks that thread, because that thread can process other work in the mean-time, which is important for finite-sized thread_pools. This specialisation performs horizontal threading, and the horizontal thread is also created on the stack, so, with the benefit of cache locality, it is quite good at stealing work.
1264  The execution_context stores the algo_thread_wk inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the algo_thread_wk on the heap, and the shared_ptr in it can have a dummy sp_counter_type, because the counter doesn't do anything, giving a further saving on atomic operations.
1265 
1266  \see horizontal_execution
1267  */
1268  template<class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk>
1270  public:
1274  typedef typename thread_pool_type::pool_traits_type pool_traits_type; ///< The pool traits.
1279  typedef typename base_t::thread_wk_t thread_wk_t;
1281 
1282  /**
1283  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1284  */
1290  );
1291 
1295  }
1299  }
1300 
1301  /**
1302  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
1303  */
1307  }
1308 
1311 
1312  private:
1313  template<class ExCxt> friend class call_push_back;
1314  };
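// [Illustrative sketch, not part of the library.] A hypothetical, stripped-down illustration of
// the "horizontal threading" idea described above: while a result is still outstanding, the
// waiting thread does not simply block, it pops and executes other items from the shared work
// queue, so a finite set of pool threads cannot starve waiting upon each other. The queue type
// and the wait_or_steal() helper are inventions for this sketch only.
#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

struct work_queue {
	std::mutex mtx;
	std::deque<std::function<void()>> items;

	// Try to remove one item; returns an empty function if the queue is empty.
	std::function<void()> try_pop() {
		std::lock_guard<std::mutex> lock(mtx);
		if (items.empty()) {
			return {};
		}
		std::function<void()> item=std::move(items.front());
		items.pop_front();
		return item;
	}
};

// Instead of blocking until "done" becomes true, keep stealing work from the queue; only yield
// when the queue is empty and the result is still not ready.
inline void wait_or_steal(std::atomic<bool> const &done, work_queue &queue) {
	while (!done.load(std::memory_order_acquire)) {
		if (std::function<void()> item=queue.try_pop()) {
			item();	// Make progress on someone else's work whilst we wait.
		} else {
			std::this_thread::yield();	// Nothing to steal right now; let other threads run.
		}
	}
}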
1315 
1316  /// Ensure that the compiler emits an error if attempting to non-joinably create an execution context.
1317  /**
1318  This class is not supposed to be constructible. It is just here to allow the compiler to compile the code. Non-joinable transfers to a thread pool never create an execution context. And transfers to a non-joinable thread pool also never create an execution context.
1319  */
1320  template<class DM, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk>
1322  public:
1329 
1331  };
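// [Illustrative sketch, not part of the library.] The guard class above exists only so that the
// code compiles; actually constructing an execution context for a non-joinable transfer must be
// a compile-time error. A minimal, hypothetical way to express the same intent is a type whose
// constructors are deleted, so that it can be named in template code but never instantiated.
struct no_context {
	no_context()=delete;	// Non-joinable transfers never yield a context to construct.
	no_context(no_context const &)=delete;
	no_context &operator=(no_context const &)=delete;
};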
1332 
1333  template<class DM, generic_traits::return_data RD, template<class, class, class, template<class> class, template<class> class> class AlgCoreWk, class GenWk, class Wk, template<class> class Deref=deref::noop, template<class> class InitCoreWk=core_work_result::noop>
1334  class execution_context_algo_buff_stack_type;
1335 
1336  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data.
1337  /**
1338  The execution_context stores the thread_wk_t inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the thread_wk_t on the heap, and the shared_ptr in it can have a dummy sp_counter_type, noop_atomic_ctr, because the counter doesn't do anything, giving a further saving on atomic operations.
1339  No horizontal threading is done in this specialisation, because a master thread distributes the work.
1340 
1341  \todo Add an extra template parameter to supply modifiable functionality to operator*() & operator->(), with a no-op default. Then accumulate, count & find may use this class.
1342 
1343  \see horizontal_execution
1344  */
1345  template<template<class, class, class, template<class> class, template<class> class> class AlgCoreWk, class GenWk, class Wk, template<class> class Deref, template<class> class InitCoreWk>
1347  public:
1348  typedef GenWk gen_wk_t;
1352  typedef typename base_t::os_traits os_traits;
1353  typedef typename base_t::pool_type pool_type;
1354  typedef typename base_t::atomic_t atomic_t;
1356  typedef typename thread_pool_type::template create_direct<Wk> creator_t;
1357  typedef typename creator_t::result_type result_type;
1359 
1360  private:
1362  friend dereference_ops;
1363 
1366  using closure_t=typename creator_t::closure_t;
1368 
1369  public:
1370  /**
1371  Note how we're pretty damn tricky here, and make use of both the dtor and the counter interfaces, using no-op implementations, because the object is going to be allocated on the stack and, normally, we manage the memory ourselves. So we save by avoiding heap allocation/deallocation, and also avoid the atomic operations that would be required in the counter for the shared_ptr.
1372  */
1375 
1376  /**
1377  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1378  */
1380  };
1381  typedef typename thread_pool_type::template create_direct<gen_wk_t> gen_wk_creator_t;
1382 
1383  void add_work(thread_pool_type &pool, const typename thread_pool_type::pool_type::size_type cliques, const unsigned short default_num_subranges) noexcept(false) FORCE_INLINE {
1387  typename gen_wk_creator_t::closure_t gen_wk(
1388  gen_wk_t(
1389  pool,
1390  core_work_.closure(),
1394  cliques
1395  ),
1397  );
1399 // TODO Set start & end times of processing op, so that time-order of nodes in DFG can be seen.
1400  gen_wk.process();
1401  }
1402 
1403  public:
1406 
1407  /**
1408  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1409  */
1416  );
1417 
1422  p
1423  ) {
1424  // Ensure any resizing of the output collection is done whilst the inputs & output collections are locked to avoid the inputs being resized in the mean-time.
1427  }
1432  p
1433  ) {
1436  }
1437 
1438  /**
1439  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
1440  */
1443 
1444  /**
1445  In case the user didn't specifically call wait(), operator*() or operator->() for some reason, throw any registered exception. (I don't throw the exception in the thread's destructor in the thread pool, as this is too late, and makes evil memory leaks in the thread pool destructor.)
1446  */
1448  assert(dynamic_cast<thread_wk_t *>(&core_work_));
1450  }
1451 
1452  /// Can't automatically convert to a base-class address - to maintain the concept that this is a stack allocated object.
1453  void operator&()=delete;
1454  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1455  template<class T> operator T () const=delete;
1456  /// Attempt to remove the ability to subvert the safety by incorrectly casting the execution_context.
1457  template<class T> operator T ()=delete;
1460 
1461  /// A (potentially blocking) access to the results, but only after they are written.
1462  /**
1463  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations. Throws if the work has been previously erased.
1464 
1465  \see work_done()
1466  */
1467  typename dereference_ops::const_ref_result_type __fastcall operator*() const noexcept(false) FORCE_INLINE {
1468  return dereference_ops::deref(*this);
1469  }
1470  /// A (potentially blocking) access to the results, but only after they are written.
1471  /**
1472  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. Throws if the work has been previously erased.
1473 
1474  \see work_done()
1475  */
1476  typename dereference_ops::ref_result_type __fastcall operator*() noexcept(false) FORCE_INLINE {
1477  return dereference_ops::deref(*this);
1478  }
1479 
1480  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
1481  /**
1482  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
1483 
1484  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
1485  */
1486  typename dereference_ops::const_addr_result_type __fastcall operator->() const noexcept(false) FORCE_INLINE {
1487  return dereference_ops::arrow(*this);
1488  }
1489  /// A (potentially blocking) access to the results, but only after they are written, or process other work from the signalled_work_queue or batch whilst waiting for the core_work to be processed.
1490  /**
1491  Obtain the results of the mutation of the input work. Note that this is a potentially blocking call: it will return only when the mutation has been signalled as completed. i.e. the work has been transferred, joinably to the pool, then executed, and not erased beforehand. Also note that this may throw an exception of the type specified by any of the exception specifications that may have been used when transferring the work to the pool. This function allows write access to the results (in the current stack-frame, i.e. thread), but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. (This operator has been provided to allow chaining of "operator->()"s by the compiler.) Throws if the work has been previously erased.
1492 
1493  \see work_done(), get_results(), wait_or_horizontal_thread(), pool::batch_details
1494  */
1495  typename dereference_ops::addr_result_type __fastcall operator->() noexcept(false) FORCE_INLINE {
1496  return dereference_ops::arrow(*this);
1497  }
1498 
1499  /// Ensure that if an execution context is passed to another function, only a constant version may be passed.
1500  /**
1501  This function allows read access to the results, but only after the other thread has written, thus implying a sequential ordering of memory operations by the current thread. i.e. only one thread can write to the results, but many can read, because multiple read operations do not require locking with respect to each other. Note that if the results are accessed, then that call will block, as necessary, until the results are written, thus ensuring that all reads follow any writes to the contained data.
1502  */
1503  const execution_context_algo_buff_stack_type * __fastcall operator&() const noexcept(true) override FORCE_INLINE {
1504  return this;
1505  }
1506 
1507  protected:
1510 
1511  thread_wk_t const & __fastcall core_work() const noexcept(true) final override FORCE_INLINE {
1512  return core_work_;
1513  }
1515  return work_complete_;
1516  }
1517  work_complete_t &__fastcall work_complete() const noexcept(true) final override FORCE_INLINE {
1518  return work_complete_;
1519  }
1520 
1521  private:
1522  template<class ExCxt> friend class call_push_back;
1523 
1524  typename add_ref_if_not_void<result_type const>::type __fastcall get_results() const noexcept(false) FORCE_INLINE {
1525  if (this->has_work()) {
1526  this->wait_or_horizontal_thread();
1527  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
1530  } else {
1531  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
1532  }
1533  }
1534 
1535  typename add_ref_if_not_void<result_type>::type __fastcall get_results() noexcept(false) FORCE_INLINE {
1536  if (this->has_work()) {
1537  this->wait_or_horizontal_thread();
1538  // Right - we now check for any uncaught (by the user type, that is) exceptions that were caught by the thread class, and throw it.
1541  } else {
1542  throw exception_type(_T("No results: work previously erased from the pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this)), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
1543  }
1544  }
1545  };
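// [Illustrative sketch, not part of the library.] The comments above describe giving the internal
// shared_ptr a dummy, no-op counter because the wrapped work lives on the stack of the execution
// context and is never shared beyond it. A hypothetical policy-based counter shows the saving:
// the atomic policy pays for lock-prefixed read-modify-write operations, the no-op policy pays
// nothing, yet both satisfy the same interface. The names (atomic_ctr, noop_ctr) are invented.
#include <atomic>
#include <cstddef>

struct atomic_ctr {	// Real reference count: needed when ownership is genuinely shared across threads.
	std::atomic<std::size_t> value{1};
	void add_ref() noexcept { value.fetch_add(1, std::memory_order_relaxed); }
	bool release() noexcept { return value.fetch_sub(1, std::memory_order_acq_rel)==1; }
};

struct noop_ctr {	// Dummy count: the execution context's stack frame owns the work, so do nothing.
	void add_ref() noexcept {}
	bool release() noexcept { return false; }	// Never reports "last owner", so nothing is ever deleted.
};

// The same handle code can be instantiated with either policy; choosing noop_ctr removes all
// atomic operations from the copy and destroy paths.
template<class Ctr>
void copy_and_destroy(Ctr &ctr) {
	ctr.add_ref();	// What copying the internal shared_ptr would do.
	ctr.release();	// What destroying that copy would do.
}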
1546 
1547  /// Enforce a sequential-consistency memory-model on the result data that this object manages, via the accessors to the result data, but also allow horizontal threading: if the object being managed has not yet had its result computed, and the queue in the thread_pool is not empty, process an item from that queue in the mean-time.
1548  /**
1549  This ensures that resource starvation (of threads) cannot occur, as waiting upon a dereference of an execution context no longer blocks that thread, because that thread can process other work in the mean-time, which is important for finite-sized thread_pools. This specialisation performs horizontal threading, and the horizontal thread is also created on the stack, so, with the benefit of cache locality, it is quite good at stealing work.
1550  The execution_context stores the algo_thread_wk inside it, so it is allocated on the stack, not on the heap, which is only useful for classic, joinable, data-flow operations. This optimisation saves allocating the algo_thread_wk on the heap, and the shared_ptr in it can have a dummy sp_counter_type, because the counter doesn't do anything, giving a further saving on atomic operations.
1551 
1552  \see horizontal_execution
1553  */
1554  template<template<class, class, class, template<class> class, template<class> class> class AlgCoreWk, class GenWk, class Wk, template<class> class Deref, template<class> class InitCoreWk>
1556  public:
1560  typedef typename thread_pool_type::pool_traits_type pool_traits_type; ///< The pool traits.
1565  typedef typename base_t::thread_wk_t thread_wk_t;
1567 
1568  /**
1569  To assist in allowing compile-time computation of the algorithmic order of the threading model.
1570  */
1576  );
1577 
1581  }
1585  }
1586 
1587  /**
1588  This needs to be declared, to be standards compliant, but needn't be defined, as cctor elision doesn't require the definition.
1589  */
1592 
1594  }
1595 
1598 
1599  private:
1600  template<class ExCxt> friend class call_push_back;
1601  };
1602 
1603  /// Ensure that the compiler emits an error if attempting to non-joinably create an execution context.
1604  /**
1605  This class is not supposed to be constructible. It is just here to allow the compiler to compile the code. Non-joinable transfers to a thread pool never create an execution context. And transfers to a non-joinable thread pool also never create an execution context.
1606  */
1607  template<class DM, template<class, class, class, template<class> class, template<class> class> class AlgCoreWk, class GenWk, class Wk, template<class> class Deref, template<class> class InitCoreWk>
1609  public:
1616 
1618  };
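// [Illustrative sketch, not part of the library.] The execution contexts above delete the mutable
// operator&() and the conversion operators, and only provide a const-qualified operator&() that
// returns a pointer-to-const, so a context may only ever be handed to other code as a constant.
// A hypothetical stand-alone type showing the same pattern; stack_only, observe() and
// example_stack_only() are invented names for this sketch only.
struct stack_only {
	void operator&()=delete;								// No mutable address-of: cannot be stored or aliased mutably.
	template<class T> operator T() const=delete;	// No sneaky conversions either...
	template<class T> operator T()=delete;				// ...whether const or not.

	stack_only const *operator&() const noexcept {	// Passing it on is allowed, but only as const.
		return this;
	}
};

// Usage: a function receiving the object can only ever see it through a pointer-to-const.
inline void observe(stack_only const *) {}
inline void example_stack_only() {
	stack_only ctx;
	stack_only const &cref=ctx;
	observe(&cref);	// OK: only the const operator&() is viable for const access.
	// observe(&ctx);	// Would not compile: the non-const operator&() is deleted.
}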
1619 
1620 } } } }
1621 
1622 #endif