libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
thread_pool.hpp
1 #ifndef LIBJMMCG_CORE_PRIVATE_THREAD_POOL_HPP
2 #define LIBJMMCG_CORE_PRIVATE_THREAD_POOL_HPP
3 
4 /******************************************************************************
5 ** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
6 **
7 ** This library is free software; you can redistribute it and/or
8 ** modify it under the terms of the GNU Lesser General Public
9 ** License as published by the Free Software Foundation; either
10 ** version 2.1 of the License, or (at your option) any later version.
11 **
12 ** This library is distributed in the hope that it will be useful,
13 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 ** Lesser General Public License for more details.
16 **
17 ** You should have received a copy of the GNU Lesser General Public
18 ** License along with this library; if not, write to the Free Software
19 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
20 */
21 
23 #include "../../core/thread_pool_aspects.hpp"
24 
25 #include <numeric>
26 
27 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd { namespace private_ {
28 
29  /// A hack to avoid using the full execution_context: a cheaper, simpler type for sequential use.
30  namespace void_chooser {
31  template<class Elem>
32  class type {
33  public:
34  typedef Elem element_type;
35 
36  constexpr type() noexcept(true) FORCE_INLINE {}
37  type(element_type const &v) noexcept(true) FORCE_INLINE
38  : value(v) {}
39  element_type const & val() const noexcept(true) FORCE_INLINE {
40  return value;
41  }
42  element_type & val() noexcept(true) FORCE_INLINE {
43  return value;
44  }
45 
46  template<class InpWk>
47  void FORCE_INLINE
48  process(InpWk &wk) {
49  wk.process(value);
50  }
51 
52  private:
53  element_type value;
54  };
55  template<>
56  struct type<void> {
57  typedef void element_type;
58 
59  constexpr type() noexcept(true) FORCE_INLINE {}
60  static element_type
61  val() noexcept(true) FORCE_INLINE {
62  }
63 
64  template<class InpWk>
65  static void FORCE_INLINE
66  process(InpWk &wk) {
67  wk.process();
68  }
69  };
70  }
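 /*
  Illustration of the dispatch that void_chooser performs (a sketch only: the
  two work types below are hypothetical and not part of the library):

    struct int_work { void process(int &v) {v=42;} };  // non-void result: receives the stored value
    struct no_result_work { void process() {} };       // void result: called with no argument

    void_chooser::type<int> with_result;
    int_work iw;
    with_result.process(iw);      // calls iw.process(with_result.val())
    int const r=with_result.val();

    void_chooser::type<void> without_result;
    no_result_work nw;
    without_result.process(nw);   // calls nw.process()
 */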
71 
72  /// A specialisation for sequential "threading", i.e. everything runs on the main thread.
73  /**
74  So that the user can make use of the thread library and automatically switch between real threading and sequential mode without changing their code, apart from some simple typedefs.
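 
 A minimal sketch of that switch (the alias names below are illustrative only; the real template arguments are supplied by the library's pool-traits machinery):
 \code
 #ifdef RUN_SEQUENTIALLY
 typedef sequential_pool_t pool_t;   // this specialisation: all work runs on the caller's thread
 #else
 typedef real_thread_pool_t pool_t;  // a genuine thread_pool built from equivalent traits
 #endif
 
 pool_t pool(2);
 // The calling code is identical in either case, e.g.
 //   auto const &context=pool<<joinable()<<some_closure;
 //   *context;  // in sequential mode this simply reads the already-computed result
 \endcode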
75  */
76  template<
77  class DM,
79  typename P
80  >
82  private:
83  /// An opaque hack for the thread-type in the "pool".
84  class fake_thread;
85  public:
86  using pool_traits_type=P;
87  /// A hack to give us a size_type member-typedef.
89  using work_distribution_mode=DM;
90  using signalled_work_queue_type=typename pool_traits_type::template signalled_work_queue_type<typename work_distribution_mode::queue_model>;
91  using queue_size_type=typename signalled_work_queue_type::size_type;
92  typedef typename pool_type::size_type pool_size_type;
93  /// Just stub statistics - to support the interface. No actual statistics are collected.
94  using statistics_type=typename pool_traits_type::template statistics_type<typename work_distribution_mode::queue_model>;
95  /// The type of the control-flow graph that will be generated at run-time, if supported.
96  /**
97  \see cfg(), dummy_control_flow_graph, control_flow_graph
98  */
99  typedef typename pool_traits_type::cfg_type cfg_type;
100  /// A useful typedef to easily get to the various OS traits.
101  typedef typename pool_traits_type::os_traits os_traits;
103  /// A useful typedef to easily get to the various OS specific thread-traits.
105  /// A useful typedef to easily get to the various API details.
107  /// A useful typedef to easily get to the various priorities.
109 
111 
112  /// A useful typedef to easily get to the nonjoinable grammar element.
113  /**
114  \see nonjoinable_t
115  */
117  /// A useful typedef to easily get to the joinable grammar element.
118  /**
119  \see joinable_t
120  */
122  /// A useful typedef to easily get to the nonjoinable_buff grammar element.
123  /**
124  \see nonjoinable_buff_t
125  */
127  template<priority_type Pri> struct priority {};
128  /// Used by the library to implicitly generate a closure from the InpWk type.
129  template<
130  typename InpWk, ///< The closure_base-derived closure type. The result_type is inferred from the process(result_type) or process() member-functions declared in the Wk type. Note that the process() member-function must not be overloaded, or this will not work; also, it must use the __fastcall calling-convention on those platforms that support it.
131  class FnType=decltype(&std::remove_reference<InpWk>::type::process) ///< The default mutator function is called process, but you could provide an alternative member-function name if desired, as long as the signature is correct.
132 // TODO FnType FnPtr=&InpWk::process
133  >
137  using closure_t=typename base_t::closure_t;
139  /// This is a useful typedef to get at the execution_context.
140  /**
141  The execution_context is created by joinably transferring work into the pool. It has various uses, but is primarily used to atomically and synchronously wait on the result of the work in the closure_base-derived closure object, as specified by the thread_wk_t object transferred into the pool. It can also pass back specified exceptions that may be thrown by the work, asynchronously test whether the work has completed, and remove the work from the pool if it has not yet been started.
142 
143  \see execution_context_type_stack
144  \see joinable
145  \see closure_base
146  */
148  };
149  /// This is a useful typedef to get at the execution_context.
150  /**
151  The execution_context is created by joinably transferring work into the pool. It has various uses, but is primarily used to atomically and synchronously wait on the result of the work in the closure_base-derived closure object, as specified by the thread_wk_t object transferred into the pool. It can also pass back specified exceptions that may be thrown by the work, asynchronously test whether the work has completed, and remove the work from the pool if it has not yet been started.
152 
153  \see create_direct
154  \see execution_context_stack_type
155  \see joinable
156  \see closure_base
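 
 A usage sketch (some_closure stands for any suitable closure_base-derived work item; the transfer idiom is the operator<<(joinable_t) member defined further down):
 \code
 auto const &context=pool<<joinable()<<some_closure;
 // Dereferencing waits for completion (trivially so in this sequential
 // specialisation) and yields the closure's result; an exception registered
 // by the work would be re-thrown here.
 auto const &result=*context;
 \endcode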
157  */
158  template<class InpWk>
160  public:
162 
163  private:
164  typedef void_chooser::type<result_type> element_type;
165 
166  public:
167  constexpr execution_context_stack() noexcept(true) FORCE_INLINE {}
168  template<class Wk> FORCE_INLINE
169  execution_context_stack(sequential_pool &pool, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &, Wk &&wk)
170  : res() {
171  pool.statistics_.added_work();
172  Wk work(wk);
173  res.process(work);
174  pool.statistics_.processed_vertical_work();
175  }
176 
177  typename add_ref_if_not_void<result_type const>::type operator*() const noexcept(true) FORCE_INLINE {
178  return res.val();
179  }
180  typename add_ref_if_not_void<result_type const>::type operator*() noexcept(true) FORCE_INLINE {
181  return res.val();
182  }
183  result_type const * operator->() const noexcept(true) FORCE_INLINE {
184  return &res.val();
185  }
186  result_type * operator->() noexcept(true) FORCE_INLINE {
187  return &res.val();
188  }
189 
190  execution_context_stack const &operator&() const noexcept(true) FORCE_INLINE {
191  return *this;
192  }
193 
194  void operator&()=delete;
195 
196  private:
197  element_type res;
198  };
199  struct void_work {
200  typedef void result_type;
201  constexpr void process() noexcept(true) FORCE_INLINE {}
202  };
204  template<class Res>
206  public:
207  typedef Res result_type;
208 
209  private:
210  typedef void_chooser::type<result_type> element_type;
211 
212  public:
213  execution_context_algo_stack(element_type const &wk) noexcept(true) FORCE_INLINE
214  : res(wk) {
215  }
216 
217  typename add_ref_if_not_void<result_type const>::type operator*() const noexcept(true) FORCE_INLINE {
218  return res.val();
219  }
220  typename add_ref_if_not_void<result_type const>::type operator*() noexcept(true) FORCE_INLINE {
221  return res.val();
222  }
223  result_type const * operator->() const noexcept(true) FORCE_INLINE {
224  return &res.val();
225  }
226  result_type * operator->() noexcept(true) FORCE_INLINE {
227  return &res.val();
228  }
229 
230  execution_context_algo_stack const &operator&() const noexcept(true) FORCE_INLINE {
231  return *this;
232  }
233 
234  void operator&()=delete;
235 
236  private:
237  element_type res;
238  };
240 
241  /// A modifier to allow joinably transferring the work to the pool.
242  struct algo_hack_t {
244 
245  static constexpr execution_context
246  process(const typename sequential_pool::pool_type::size_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) noexcept(true) FORCE_INLINE {
247  return execution_context();
248  }
249  };
250  /// A modifier to allow joinably transferring the work to the pool.
253 
254  static constexpr execution_context
255  process(const typename sequential_pool::pool_type::size_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) noexcept(true) FORCE_INLINE {
256  return execution_context();
257  }
258  };
259 
260  /// A modifier to allow joinably transferring the work to the pool.
261  template<
262  class Colln,
263  typename Pred ///< The predicate to be used to find the value to be counted.
264  >
265  class count_if_t {
266  public:
267  typedef Pred operation_type;
268  typedef long num_elems_ct_t;
270 
271  /**
272  \param c The adapted collection to iterate over.
273  \param pr The predicate to compare against in the collection.
274  */
275  __stdcall count_if_t(Colln const &c, operation_type const &pr) noexcept(true) FORCE_INLINE
276  : pred(pr), colln(c) {
277  }
278 
279  /// Joinably transfer the predicate to the pool.
280  /**
281  \return An execution_context for obtaining the number of matching items in the collection.
282 
283  \see execution_context_stack
284  */
285  execution_context __fastcall
286  process(cliques::element_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) const FORCE_INLINE {
287  return execution_context(num_elems_ct_t(std::count_if(colln.colln().begin(), colln.colln().end(), pred)));
288  }
289 
290  private:
291  operation_type const pred;
292  Colln const &colln;
293  };
294 
295  /// A modifier to allow joinably transferring the work to the pool.
296  template<
297  class Colln
298  >
299  class count_t : public count_if_t<
300  Colln,
301  decltype(
302  std::bind(std::equal_to<typename Colln::value_type>(), typename Colln::value_type(), std::placeholders::_1)
303  )
304  > {
305  private:
306  typedef count_if_t<
307  Colln,
308  decltype(
309  std::bind(std::equal_to<typename Colln::value_type>(), typename Colln::value_type(), std::placeholders::_1)
310  )
311  > base_t;
312 
313  public:
314  typedef typename base_t::execution_context execution_context;
315 
316  __stdcall count_t(Colln const &c, typename Colln::value_type const &v) noexcept(true) FORCE_INLINE
317  : base_t(c, typename base_t::operation_type(std::equal_to<typename Colln::value_type>(), v, std::placeholders::_1)) {
318  }
319  };
320 
321  /// A modifier to allow joinably transferring the work to the pool.
322  template<
323  class Colln, ///< The collection to search.
324  typename Pred ///< The predicate to be used to find the value to be counted.
325  >
326  class find_if_t {
327  public:
328  typedef Pred operation_type;
329  typedef bool found_t;
331 
332  /**
333  \param c The adapted collection to search.
334  \param pr The predicate to compare against in the collection.
335  */
336  __stdcall find_if_t(Colln const &c, operation_type const &pr) noexcept(true) FORCE_INLINE
337  : pred(pr), colln(c) {
338  }
339 
340  /// Joinably transfer the predicate to the pool.
341  /**
342  \return An execution_context for obtaining whether a matching item was found in the collection.
343 
344  \see execution_context_stack
345  */
346  execution_context __fastcall
347  process(cliques::element_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) const FORCE_INLINE {
348  return execution_context(found_t(std::find_if(colln.colln().begin(), colln.colln().end(), pred)!=colln.colln().end()));
349  }
350 
351  private:
352  operation_type const pred;
353  Colln const &colln;
354  };
355 
356  /// A modifier to allow joinably transferring the work to the pool.
357  template<
358  class Colln
359  >
360  struct find_t : public find_if_t<
361  Colln,
362  decltype(
363  std::bind(std::equal_to<typename Colln::value_type>(), typename Colln::value_type(), std::placeholders::_1)
364  )
365  > {
366  typedef find_if_t<
367  Colln,
368  decltype(
369  std::bind(std::equal_to<typename Colln::value_type>(), typename Colln::value_type(), std::placeholders::_1)
370  )
371  > base_t;
373 
374  __stdcall find_t(Colln const &c, typename Colln::value_type const &v) noexcept(true) FORCE_INLINE
375  : base_t(c, typename base_t::operation_type(std::equal_to<typename Colln::value_type>(), v, std::placeholders::_1)) {
376  }
377  };
378 
379  /// A modifier to allow joinably transferring the work to the pool.
380  template<
381  class Colln, ///< The collection to accumulate over.
382  typename BinOp ///< The BinOp to be used to accumulate the result.
383  >
385  public:
386  typedef BinOp operation_type;
389 
390  /**
391  \param c The adapted collection to iterate over.
392  \param v The value with which the accumulate operation should be initialised.
393  \param op The binary operation to use to accumulate the result.
394  */
395  accumulate_op_processor(Colln const &c, accumulated_res_t const &v, operation_type const &op) noexcept(true) FORCE_INLINE
396  : init_val(v), binop(op), colln(c) {
397  }
398  /// Joinably transfer the predicate to the pool.
399  /**
400  \return An execution_context for obtaining the accumulated result over the collection.
401 
402  \see execution_context_stack
403  */
404  execution_context __fastcall
405  process(cliques::element_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) const FORCE_INLINE {
406  return execution_context(accumulated_res_t(std::accumulate(colln.colln().begin(), colln.colln().end(), init_val, binop)));
407  }
408 
409  private:
410  accumulated_res_t const init_val;
411  operation_type const binop;
412  Colln const &colln;
413  };
414  /// A modifier to allow joinably transferring the work to the pool.
415  template<
416  class Colln, ///< The collection to accumulate over.
417  class V
418  >
419  struct accumulate_processor : public accumulate_op_processor<Colln, std::plus<V>> {
420  typedef accumulate_op_processor<Colln, std::plus<V>> base_t;
421 
422  accumulate_processor(Colln const &colln, V const &v) noexcept(true) FORCE_INLINE
423  : base_t(colln, v, typename base_t::operation_type()) {
424  }
425  };
426  /// A modifier to allow joinably transferring the work to the pool.
427  template<
428  class Colln, ///< The collection to search.
429  class Comp=std::less<typename Colln::value_type> ///< The comparator to use to compare the items.
430  >
432  public:
433  typedef Comp operation_type;
434  typedef typename Colln::value_type result_type;
436 
437  __stdcall max_element_t(Colln const &c, Comp const &comp) noexcept(true) FORCE_INLINE
438  : colln(c), compare(comp) {
439  }
440  max_element_t(max_element_t const &m) noexcept(true) FORCE_INLINE
441  : colln(m.colln), compare(m.compare) {
442  }
443 
444  /// Joinably transfer the predicate to the pool.
445  /**
446  \return An execution_context for obtaining the largest value in the collection.
447 
448  \see execution_context
449  */
450  execution_context __fastcall
451  process(cliques::element_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) const FORCE_INLINE {
452  const typename Colln::container_type::const_iterator i(std::max_element(colln.colln().begin(), colln.colln().end(), compare));
453  return execution_context(result_type(i!=colln.colln().end() ? *i : std::numeric_limits<typename Colln::value_type>::min()));
454  }
455 
456  private:
457  Colln const &colln;
458  Comp const &compare;
459  };
460  /// A modifier to allow joinably transferring the work to the pool.
461  template<
462  class Colln, ///< The collection to search.
463  class Comp=std::less<typename Colln::value_type> ///< The comparator to use to compare the items.
464  >
466  public:
467  typedef Comp operation_type;
468  typedef typename Colln::value_type result_type;
470 
471  __stdcall min_element_t(Colln const &c, Comp const &comp) noexcept(true) FORCE_INLINE
472  : colln(c), compare(comp) {
473  }
474  min_element_t(min_element_t const &m) noexcept(true) FORCE_INLINE
475  : colln(m.colln), compare(m.compare) {
476  }
477 
478  /// Joinably transfer the predicate to the pool.
479  /**
480  \return An execution_context for obtaining the smallest value in the collection.
481 
482  \see execution_context
483  */
484  execution_context __fastcall
485  process(cliques::element_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) const FORCE_INLINE {
486  const typename Colln::container_type::const_iterator i(std::min_element(colln.colln().begin(), colln.colln().end(), compare));
487  return execution_context(result_type(i!=colln.colln().end() ? *i : std::numeric_limits<typename Colln::value_type>::max()));
488  }
489 
490  private:
491  Colln const &colln;
492  Comp const &compare;
493  };
494 
495  /// A modifier to allow joinably transferring the work to the pool.
496  template<
497  typename CollnIn1,
498  typename CollnIn2,
499  typename CollnOut,
500  typename Compare
501  >
502  struct merge_t {
504 
505  static constexpr execution_context __fastcall
506  process(const typename sequential_pool::pool_type::size_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) noexcept(true) FORCE_INLINE {
507  return execution_context();
508  }
509  };
510  /// A modifier to allow joinably transferring the work to the pool.
511  template<
512  class Colln,
513  typename Compare
514  >
515  struct sort_t {
517 
518  static constexpr execution_context __fastcall
519  process(const typename sequential_pool::pool_type::size_type, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params const &) noexcept(true) FORCE_INLINE {
520  return execution_context();
521  }
522  };
523 
524  constexpr __stdcall sequential_pool() noexcept(true) FORCE_INLINE
525  : pool_size_() {
526  }
527  explicit __stdcall sequential_pool(pool_size_type sz) noexcept(true) FORCE_INLINE
528  : pool_size_(sz), statistics_() {
529  }
531 
533  }
534 
535  /// A stub just for compatibility. Always returns "true".
536  static constexpr bool __fastcall pool_empty() noexcept(true) FORCE_INLINE {
537  return true;
538  }
539  /// A stub just for compatibility. Returns the size with which the pool was constructed.
540  constexpr pool_size_type __fastcall pool_size() const noexcept(true) FORCE_INLINE {
541  return pool_size_;
542  }
543  /// A stub just for compatibility. Always returns "true".
544  static constexpr bool __fastcall queue_empty() noexcept(true) FORCE_INLINE {
545  return true;
546  }
547  /// A stub just for compatibility. Always returns "0".
548  static constexpr pool_size_type __fastcall queue_size() noexcept(true) FORCE_INLINE {
549  return 0;
550  }
551 
552  /// A stub just for compatibility. Does nothing.
553  static void __fastcall queue_clear() noexcept(true) FORCE_INLINE {
554  }
555 
556  /// A stub just for compatibility.
557  statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
558  return statistics_;
559  }
560 
561  /// Return the theoretical minimum time in computations required to complete the current work.
562  static constexpr unsigned long __fastcall
 563  min_time() noexcept(true) FORCE_INLINE {
 564  return 0;
565  }
566  template<class T>
567  static constexpr unsigned long __fastcall FORCE_INLINE
568  min_time(T) noexcept(true) {
569  return 0;
570  }
571 
572  /// Return the theoretical minimum number of processors required to achieve the minimum computation time required to complete the current work.
573  static constexpr unsigned long __fastcall
 574  min_processors() noexcept(true) FORCE_INLINE {
 575  return 0;
576  }
577  template<class T>
578  static constexpr unsigned long __fastcall FORCE_INLINE
579  min_processors(T) noexcept(true) {
580  return 0;
581  }
582 
583  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
584  /**
585  \param c A collection.
586  \param fn A unary_function-type that need not be thread-safe nor support reentrancy, and should not have side-effects; in other respects it is the same as for std::for_each().
 587  \return An atomic object that may be waited upon to determine when all of the applications of fn are complete.
588 
589  \see std::for_each
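 
 A sketch of a call (vec_t stands for an assumed, adapted collection exposing the colln() interface these algorithms expect; the joinable-transfer idiom comes from the wider library, not this file):
 \code
 vec_t data;
 // ... populate data ...
 long sum=0;
 auto const &context=pool<<joinable()<<pool.for_each(data, [&sum](int const &i){sum+=i;});
 *context;   // wait for completion (immediate in this sequential specialisation)
 \endcode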
590  */
591  template<
592  class Colln,
593  class Fn
594  > parallel_algorithm<algo_hack_stack_t> __fastcall FORCE_INLINE
595  for_each(Colln const &c, Fn const &fn) const {
596  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
597  statistics_.update_colln_stats(c.colln().size());
598  std::for_each(c.colln().begin(), c.colln().end(), fn);
599  return reduction_t(typename reduction_t::operation_type());
600  }
601 
602  /**
603  \param c A collection.
604  \param p The predicate to use to count the matching values.
605  \return An execution_context that may be waited upon to determine when all of the applications of p are complete, and obtain the count.
606 
607  \see std::count_if
608  \see execution_context
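 
 A sketch of a call (vec_t is an assumed, adapted collection; the joinable-transfer idiom comes from the wider library):
 \code
 vec_t data;
 // ... populate data ...
 auto const &context=pool<<joinable()<<pool.count_if(data, [](int i){return i>0;});
 auto const num_positive=*context;   // waits (trivially here) and yields the count
 \endcode
 count() below follows the same pattern for counting occurrences of a specific value.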
609  */
610  template<
611  class Colln,
612  typename Pred
613  > parallel_algorithm<count_if_t<Colln, Pred> > __fastcall FORCE_INLINE
614  count_if(Colln &c, Pred const &p) const {
615  typedef parallel_algorithm<count_if_t<Colln, Pred> > reduction_t;
616  statistics_.update_colln_stats(c.colln().size());
617  return reduction_t(typename reduction_t::operation_type(c, p));
618  }
619 
620  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
621  /**
622  \param c A collection.
623  \param v The value to find and be counted.
624  \return An execution_context that may be waited upon, and obtain the count.
625 
626  \see std::count
627  \see execution_context
628  */
629  template<
630  class Colln
631  > parallel_algorithm<count_t<Colln> > __fastcall FORCE_INLINE
632  count(Colln &c, typename Colln::value_type const &v) const {
633  typedef parallel_algorithm<count_t<Colln> > reduction_t;
634  statistics_.update_colln_stats(c.colln().size());
635  return reduction_t(typename reduction_t::operation_type(c, v));
636  }
637 
638  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
639  /**
640  \param c A collection.
641  \param p The predicate to use to find the matching value.
642  \return An execution_context that may be waited upon to determine when all of the applications of p are complete, and obtain a boolean indicating if the item was found.
643 
644  \see std::find_if
645  \see execution_context
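 
 A sketch of a call (vec_t is an assumed, adapted collection; the joinable-transfer idiom comes from the wider library):
 \code
 vec_t data;
 // ... populate data ...
 auto const &context=pool<<joinable()<<pool.find_if(data, [](int i){return i==42;});
 bool const found=*context;   // true if any element matched the predicate
 \endcode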
646  */
647  template<
648  class Colln,
649  class Pred
650  > parallel_algorithm<find_if_t<Colln, Pred> > __fastcall FORCE_INLINE
651  find_if(Colln const &c, Pred const &p) const {
652  typedef parallel_algorithm<find_if_t<Colln, Pred> > reduction_t;
653  statistics_.update_colln_stats(c.colln().size());
654  return reduction_t(typename reduction_t::operation_type(c, p));
655  }
656 
657  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
658  /**
659  \param c A collection.
660  \param v The value to be found.
661  \return An execution_context that may be waited upon, and obtain a boolean indicating if the item was found.
662 
663  \see std::find
664  \see execution_context
665  */
666  template<
667  class Colln
668  > parallel_algorithm<find_t<Colln> > __fastcall FORCE_INLINE
669  find(Colln const &c, typename Colln::value_type const &v) const {
670  typedef parallel_algorithm<find_t<Colln> > reduction_t;
671  statistics_.update_colln_stats(c.colln().size());
672  return reduction_t(typename reduction_t::operation_type(c, v));
673  }
674 
675  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
676  /**
677  \param in A collection.
678  \param out Another collection.
679  \param op The unary operator to apply to each element of in, with the output placed into out.
680  \return An execution_context that may be waited upon to determine when all of the applications of op are complete.
681 
682  \see std::transform
683  \see execution_context
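 
 A sketch of a call (in_t and out_t are assumed, adapted collections; out is resized to match in before the element-wise application):
 \code
 in_t input;
 out_t output;
 // ... populate input ...
 auto const &context=pool<<joinable()<<pool.transform(input, output, [](int i){return i*2;});
 *context;   // wait for completion (immediate in this sequential specialisation)
 \endcode
 The binary overload below follows the same pattern with two input collections and a binary operator.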
684  */
685  template<
686  typename CollnIn,
687  typename CollnOut,
688  class UniOp
689  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
690  transform(CollnIn const &in, CollnOut &out, UniOp const &op) const {
691  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
692 
693  statistics_.update_colln_stats(in.colln().size());
694  out.resize_noinit_nolk(in.size());
695  std::transform(
696  in.colln().begin(),
697  in.colln().end(),
698  out.colln().begin(),
699  op
700  );
701  return reduction_t(typename reduction_t::operation_type());
702  }
703 
704  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
705  /**
706  \param in1 The first collection.
707  \param in2 The second collection.
708  \param out Another collection.
709  \param op The binary operator to apply to each pair of elements from in1 and in2, with the output placed into out.
710  \return An execution_context that may be waited upon to determine when all of the applications of op are complete.
711 
712  \see std::transform
713  \see execution_context
714  */
715  template<
716  typename CollnIn1,
717  typename CollnIn2,
718  typename CollnOut,
719  class BinOp
720  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
721  transform(CollnIn1 const &in1, CollnIn2 const &in2, CollnOut &out, BinOp const &op) const {
722  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
723 
724  statistics_.update_colln_stats(in1.colln().size());
725  out.resize_noinit_nolk(in1.size());
726  std::transform(
727  in1.colln().begin(),
728  in1.colln().end(),
729  in2.colln().begin(),
730  out.colln().begin(),
731  op
732  );
733  return reduction_t(typename reduction_t::operation_type());
734  }
735 
736  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
737  /**
738  \param in A collection.
739  \param out Another collection.
740  \return An execution_context that may be waited upon to determine when the copy is complete.
741 
742  \see std::copy
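 
 A sketch of a call (in_t and out_t are assumed, adapted collections):
 \code
 in_t input;
 out_t output;
 // ... populate input ...
 auto const &context=pool<<joinable()<<pool.copy(input, output);
 *context;   // output now holds a copy of input
 \endcode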
743  */
744  template<
745  typename CollnIn,
746  typename CollnOut
747  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
748  copy(CollnIn const &in, CollnOut &out) const {
749  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
750 
751  statistics_.update_colln_stats(in.colln().size());
752  out.resize_noinit_nolk(in.size());
753  std::copy(
754  in.colln().begin(),
755  in.colln().end(),
756  out.colln().begin()
757  );
758  return reduction_t(typename reduction_t::operation_type());
759  }
760 
761  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
762  /**
763  \param c A collection.
764  \param v The initial value.
765  \param binop The binary operation to use.
766  \return An execution_context that may be waited upon to determine when all of the applications of op are complete, and obtain the result.
767 
768  \see std::accumulate
769  \see execution_context
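 
 A sketch of a call (vec_t is an assumed, adapted collection; the initial value's type follows the binary operation's result_type):
 \code
 vec_t data;
 // ... populate data ...
 auto const &context=pool<<joinable()<<pool.accumulate(data, 0, std::plus<int>());
 int const total=*context;   // waits (trivially here) and yields the sum
 \endcode
 The two-argument overload below defaults the operation to std::plus.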
770  */
771  template<
772  class Colln,
773  typename BinOp
774  > parallel_algorithm<accumulate_op_processor<Colln, BinOp> > __fastcall FORCE_INLINE
775  accumulate(Colln const &c, typename BinOp::result_type const &v, BinOp const &binop) const {
776  typedef parallel_algorithm<accumulate_op_processor<Colln, BinOp> > reduction_t;
777  statistics_.update_colln_stats(c.colln().size());
778  return reduction_t(typename reduction_t::operation_type(c, v, binop));
779  }
780 
781  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
782  /**
783  \param c A collection.
784  \param v The initial value.
785  \return An execution_context that may be waited upon to determine when all of the applications of op are complete, and obtain the result.
786 
787  \see std::accumulate
788  \see execution_context
789  */
790  template<
791  class Colln,
792  class V
793  > parallel_algorithm<accumulate_processor<Colln, V>> __fastcall FORCE_INLINE
794  accumulate(Colln const &c, V const &v) const {
795  typedef parallel_algorithm<accumulate_processor<Colln, V>> reduction_t;
796  statistics_.update_colln_stats(c.colln().size());
797  return reduction_t(typename reduction_t::operation_type(c, v));
798  }
799 
800  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
801  /**
802  \param c A collection to be filled.
803  \param sz The number of items to place into the collection.
804  \param v The value used to copy into the collection.
805 
806  \see std::fill_n
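 
 A sketch of a call (vec_t is an assumed, adapted collection that supports the resizing this member performs):
 \code
 vec_t data;
 auto const &context=pool<<joinable()<<pool.fill_n(data, 1024, 42);
 *context;   // data now holds 1024 copies of 42
 \endcode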
807  */
808  template<
809  class Colln
810  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
811  fill_n(Colln &c, typename Colln::size_type sz, typename Colln::value_type const &v) const {
812  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
813 
814  c.resize_noinit_nolk(sz);
815  statistics_.update_colln_stats(c.colln().size());
816  std::fill(c.colln().begin(), c.colln().end(), v);
817  return reduction_t(typename reduction_t::operation_type());
818  }
819 
820  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
821  /**
822  \param c A collection to be filled.
823  \param v The value used to copy into the collection.
824 
825  \see std::fill
826  */
827  template<
828  class Colln
829  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
830  fill(Colln &c, typename Colln::value_type const &v) const {
831  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
832 
833  statistics_.update_colln_stats(c.colln().size());
834  std::fill(c.colln().begin(), c.colln().end(), v);
835  return reduction_t(typename reduction_t::operation_type());
836  }
837 
838  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
839  /**
840  \param c The collection to be reversed.
841 
842  \see std::reverse
843  */
844  template<
845  typename Colln
846  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
847  reverse(Colln &c) const {
848  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
849 
850  statistics_.update_colln_stats(c.colln().size());
851  std::reverse(c.colln().begin(), c.colln().end());
852  return reduction_t(typename reduction_t::operation_type());
853  }
854 
855  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
856  /**
857  \param c A collection.
858  \param comp The comparator to use to compare the items.
859  \return An execution_context that may be waited upon to obtain the result, which is the largest value in the collection, not an iterator to it.
860 
861  \see std::max_element
862  \see execution_context
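 
 A sketch of a call (vec_t is an assumed, adapted collection of int; note that the result is the value itself, or std::numeric_limits<int>::min() if the collection is empty):
 \code
 vec_t data;
 // ... populate data ...
 auto const &context=pool<<joinable()<<pool.max_element(data, std::less<int>());
 int const largest=*context;
 \endcode
 min_element() below is the mirror image, yielding the smallest value (or std::numeric_limits<int>::max() when empty).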
863  */
864  template<
865  class Colln,
866  class Comp
867  > parallel_algorithm<max_element_t<Colln, Comp>> __fastcall FORCE_INLINE
868  max_element(Colln const &c, Comp const &comp) const {
869  typedef parallel_algorithm<max_element_t<Colln, Comp>> reduction_t;
870  statistics_.update_colln_stats(c.colln().size());
871  return reduction_t(typename reduction_t::operation_type(c, comp));
872  }
873 
874  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
875  /**
876  \param c A collection.
877  \return An execution_context that may be waited upon to obtain the result, which is the largest value in the collection, not an iterator to it.
878 
879  \see std::max_element
880  \see execution_context
881  */
882  template<
883  typename Colln
884  > parallel_algorithm<max_element_t<Colln, std::less<typename Colln::value_type>>> __fastcall FORCE_INLINE
885  max_element(Colln const &c) const {
886  return max_element(c, std::less<typename Colln::value_type>());
887  }
888 
889  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
890  /**
891  \param c A collection.
892  \param comp The comparator to use to compare the items.
893  \return An execution_context that may be waited upon to obtain the result, which is the smallest value in the collection, not an iterator to it.
894 
895  \see std::min_element
896  \see execution_context
897  */
898  template<
899  class Colln,
900  class Comp
901  > parallel_algorithm<min_element_t<Colln, Comp>> __fastcall FORCE_INLINE
902  min_element(Colln const &c, Comp const &comp) const {
903  typedef parallel_algorithm<min_element_t<Colln, Comp>> reduction_t;
904  statistics_.update_colln_stats(c.colln().size());
905  return reduction_t(typename reduction_t::operation_type(c, comp));
906  }
907 
908  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
909  /**
910  \param c A collection.
911  \return An execution_context that may be waited upon to obtain the result, which is the smallest value in the collection, not an iterator to it.
912 
913  \see std::min_element
914  \see execution_context
915  */
916  template<
917  typename Colln
918  > parallel_algorithm<min_element_t<Colln, std::less<typename Colln::value_type>>> __fastcall FORCE_INLINE
919  min_element(Colln const &c) const {
920  return min_element(c, std::less<typename Colln::value_type>());
921  }
922 
923  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
924  /**
925  \param c The collection to be sorted.
926  \param comp The comparison operator.
927  \return An execution_context that may be waited upon to determine when the sort is complete.
928 
929  \see std::sort()
930  \see execution_context
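 
 A sketch of a call (vec_t is an assumed, adapted collection):
 \code
 vec_t data;
 // ... populate data ...
 auto const &context=pool<<joinable()<<pool.sort(data, std::greater<int>());
 *context;   // data is now sorted in descending order
 \endcode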
931  */
932  template<
933  typename Colln,
934  class Compare
935  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
936  sort(Colln &c, Compare const &comp) const {
937  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
938 
939  statistics_.update_colln_stats(c.colln().size());
940  std::sort(
941  c.colln().begin(),
942  c.colln().end(),
943  comp
944  );
945  return reduction_t(typename reduction_t::operation_type());
946  }
947 
948  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
949  /**
950  \param c The collection to be sorted.
951  \return An execution_context that may be waited upon to determine when the sort is complete.
952 
953  \see std::sort()
954  \see execution_context
955  */
956  template<
957  typename Colln
958  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
959  sort(Colln &c) const {
960  return sort(c, std::less<typename Colln::value_type>());
961  }
962 
963  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
964  /**
965  \param in1 The first collection.
966  \param in2 The second collection.
967  \param out Another collection.
968  \param comp The comparison operator.
969  \return An execution_context that may be waited upon to determine when the merge is complete.
970 
971  \see std::merge()
972  \see execution_context
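 
 A sketch of a call (the collection types are assumed, adapted collections; both inputs must already be sorted with respect to comp):
 \code
 in_t first, second;   // both already sorted ascending
 out_t merged;
 auto const &context=pool<<joinable()<<pool.merge(first, second, merged, std::less<int>());
 *context;   // merged now contains every element of both inputs, in order
 \endcode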
973  */
974  template<
975  typename CollnIn1,
976  typename CollnIn2,
977  typename CollnOut,
978  class Compare
979  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
980  merge(CollnIn1 const &in1, CollnIn2 const &in2, CollnOut &out, Compare const &comp) const {
981  typedef parallel_algorithm<algo_hack_stack_t> reduction_t;
982 
983  statistics_.update_colln_stats(in1.colln().size()+in2.colln().size());
984  out.colln().reserve(in1.colln().size()+in2.colln().size());
985  std::merge(
986  in1.colln().begin(),
987  in1.colln().end(),
988  in2.colln().begin(),
989  in2.colln().end(),
990  std::back_inserter(out.colln()),
991  comp
992  );
993  out.sync_size();
994  assert((in1.colln().size()+in2.colln().size())==out.colln().size());
995  assert((in1.size()+in2.size())==out.size());
996  return reduction_t(typename reduction_t::operation_type());
997  }
998 
999  /// This just forwards to the STL algorithm of the same name, for compatibility with the parallel version.
1000  /**
1001  \param in1 The first collection.
1002  \param in2 The second collection.
1003  \param out Another collection.
1004  \return An execution_context that may be waited upon to determine when the merge is complete.
1005 
1006  \see std::merge()
1007  \see execution_context
1008  */
1009  template<
1010  typename CollnIn1,
1011  typename CollnIn2,
1012  typename CollnOut
1013  > parallel_algorithm<algo_hack_stack_t> FORCE_INLINE
1014  merge(CollnIn1 const &in1, CollnIn2 const &in2, CollnOut &out) const {
1015  return merge(in1, in2, out, std::less<typename CollnIn1::value_type>());
1016  }
1017 
1018  /// Apply a unary function object to the argument via the pool interface, for compatibility with the parallel version.
1019  template<
1020  class ArgT,
1021  class UniFn
1022  >
1024  unary_fun(ArgT &&a, UniFn const &op=UniFn()) {
1025  typedef private_::unary_fun_work_type<ArgT, UniFn, sequential_pool> work_type;
1026 
1027  return execution_context_stack<work_type>(*this, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params(), work_type(std::forward<ArgT>(a), op, *this));
1028  }
1029 
1030  /// Apply a binary function object to the arguments via the pool interface, for compatibility with the parallel version.
1031  template<
1032  class LHSArg,
1033  class RHSArg,
1034  class BinFn
1035  >
1037  binary_fun(LHSArg &&lhs, RHSArg &&rhs, BinFn const &op=BinFn()) {
1038  typedef private_::binary_fun_work_type<LHSArg, RHSArg, BinFn, sequential_pool> work_type;
1039 
1040  return execution_context_stack<work_type>(*this, typename pool_traits_type::thread_wk_elem_type::cfg_details_type::params(), work_type(std::forward<LHSArg>(lhs), std::forward<RHSArg>(rhs), op, *this));
1041  }
1042 
1043  /// Apply std::logical_and to the arguments via the pool interface, for compatibility with the parallel version.
1044  template<
1045  class T
1046  >
1048  logical_and(T &&lhs, T &&rhs) {
1049  return this->binary_fun<T, T, std::logical_and<bool>>(std::forward<T>(lhs), std::forward<T>(rhs));
1050  }
1051 
1052  /// Apply std::logical_or to the arguments via the pool interface, for compatibility with the parallel version.
1053  template<
1054  class T
1055  >
1057  logical_or(T &&lhs, T &&rhs) {
1058  return this->binary_fun<T, T, std::logical_or<bool>>(std::forward<T>(lhs), std::forward<T>(rhs));
1059  }
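 /*
  A usage sketch for the functional helpers above (illustrative only; the
  literal arguments and the assumption that the result types follow the
  supplied function objects are editorial, not the library's documentation):

    auto const &and_ctx=pool.logical_and(true, false);
    bool const both=*and_ctx;    // std::logical_and<bool> applied to the arguments

    auto const &sum_ctx=pool.binary_fun<int, int, std::plus<int>>(1, 2);
    int const three=*sum_ctx;    // any binary function object may be supplied
 */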
1060 
1061  /// Non-joinably transfer the closure_base-derived closure into the thread_pool.
1062  /**
1063  \todo JMG: Hubert Matthews suggested that potentially expression templates could be used here to concatenate the thread_wk_t's that are transferred into the pool; also as an implementation of back_batching, i.e. GSS(k) scheduling.
1064 
1065  \see nonjoinable
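 
 A usage sketch (some_closure stands for any suitable closure_base-derived work item):
 \code
 pool<<nonjoinable()<<some_closure;   // fire-and-forget: no execution_context is returned
 \endcode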
1066  */
1067  nonjoinable __fastcall
1068  operator<<(nonjoinable &&nj) noexcept(true) FORCE_INLINE {
1069  return nonjoinable(std::forward<nonjoinable>(nj), *this);
1070  }
1071 
1072  /// Non-joinably transfer the closure_base-derived closure into the thread_pool.
1073  /**
1074  \param njb Initialised with a suitable buffer for allocating the internal work items into.
1075 
1076  \todo JMG: Hubert Matthews suggested that potentially expression templates could be used here to concatenate the thread_wk_t's that are transferred into the pool; also as an implementation of back_batching, i.e. GSS(k) scheduling.
1077 
1078  \see nonjoinable_buff, algo_thread_wk_buffered
1079  */
1080  nonjoinable_buff __fastcall
1081  operator<<(nonjoinable_buff &&njb) noexcept(true) FORCE_INLINE {
1082  return nonjoinable_buff(std::forward<nonjoinable_buff>(njb), *this);
1083  }
1084 
1085  /// Joinably transfer the closure_base-derived closure into the thread_pool.
1086  /**
1087  \todo JMG: Hubert Matthews suggested that potentially expression templates could be used here to concatenate the thread_wk_t's that are transferred into the pool; also as an implementation of back_batching, i.e. GSS(k) scheduling.
1088 
1089  \see joinable_t
1090  */
1092  operator<<(joinable_t<sequential_pool> &&j) noexcept(true) FORCE_INLINE {
1093  return joinable_t<sequential_pool>(std::forward<joinable_t<sequential_pool>>(j), *this);
1094  }
1095 
1096  /**
1097  \todo Implement using the advice given in "Standard C++ IOStreams and Locales" by A.Langer & K.Kreft, page 170.
1098  */
1099  friend inline tostream &__fastcall
1100  operator<<(tostream &os, sequential_pool const &t) FORCE_INLINE {
1101  os
1102  <<_T("Pool=0x")<<&t
1103  <<_T(", type: ")<<sequential_pool::thread_traits::demangle_name(typeid(t));
1104  return os;
1105  }
1106 
1107  /// Access the control-flow graph, if supported.
1108  cfg_type & cfg() noexcept(true) FORCE_INLINE {
1109  return cfg_;
1110  }
1111  /// Access the control-flow graph, if supported.
1112  cfg_type const & cfg() const noexcept(true) FORCE_INLINE {
1113  return cfg_;
1114  }
1115 
1116  private:
1117  template<class TPB> friend class joinable_t;
1118  template<class TPB> friend class nonjoinable_t;
1119  template<class TPB> friend class nonjoinable_buff_t;
1120  template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
1121  template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
1122  template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
1123  template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;
1124 
1125  template<class ExecCtx>
1126  typename ExecCtx::chk_argument_type __fastcall FORCE_INLINE
1127  make_arg(typename signalled_work_queue_type::value_type &&async_wk) {
1128  return ExecCtx::template make_arg<typename ExecCtx::result_type>(
1129  std::forward<typename signalled_work_queue_type::value_type>(async_wk),
1130  this
1131  );
1132  }
1133 
1134  const pool_size_type pool_size_;
1135  mutable statistics_type statistics_;
1136  cfg_type cfg_;
1137 
1138  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
1139  statistics_.added_work();
1140  wk->process_nonjoinable(cfg_type::sequential_edge_annotation);
1141  statistics_.processed_vertical_work();
1142  }
1143  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
1144  statistics_.added_work();
1145  wk->process_nonjoinable(cfg_type::sequential_edge_annotation);
1146  statistics_.processed_vertical_work();
1147  return wk;
1148  }
1149  };
1150 
1151 }
1152 
1153 } } }
1154 
1155 #endif