libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
thread_pool_queue_model.hpp
#ifndef LIBJMMCG_CORE_PRIVATE_THREAD_POOL_QUEUE_MODEL_HPP
#define LIBJMMCG_CORE_PRIVATE_THREAD_POOL_QUEUE_MODEL_HPP

/******************************************************************************
** Copyright © 2010 by J.M.McGuiness, coder@hussar.me.uk
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public
** License as published by the Free Software Foundation; either
** version 2.1 of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd { namespace private_ {

/// This is the batch that the main thread will process.
template<
	unsigned long GSSkSz,
	class PTT,
	class Pt,
	class QM
>
class GSSk_batching {
public:
	typedef PTT pool_traits_type;
	typedef typename pool_traits_type::os_traits os_traits;
	typedef Pt pool_type;
	typedef typename os_traits::thread_traits thread_traits;
	using signalled_work_queue_type=typename pool_traits_type::template signalled_work_queue_type<QM>;
	/// Return a container of GSSkSz items from the front of the queue to implement the GSS(k) or bakers' scheduling algorithm.
	using batch_details_type=batch_details<pool_traits_type::GSSk, signalled_work_queue_type, typename remove_shared_ptr<typename pool_type::value_type, api_lock_traits<platform_api, sequential_mode>>::value_type::statistics_type>;
	typedef typename batch_details_type::statistics_type statistics_type;
	static constexpr unsigned long GSSk=batch_details_type::GSSk;

	explicit GSSk_batching(const typename thread_traits::api_params_type::tid_type mtid) noexcept(true) FORCE_INLINE
	: main_tid(mtid) {
	}

	/// Process a closure_base-derived closure item from the batch of a pool_thread.
	/**
		\param pool The thread pool.
		\param tid The thread_id of the pool_thread whose batch should be queried for more work.
		\return true if there is more closure_base-derived closure to process() in the pool_thread's batch, otherwise false.

		\see batch_details::process_a_batch_item()
	*/
	bool __fastcall process_a_batch_item(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename os_traits::thread_exception const &exception_thrown_in_thread) noexcept(false) FORCE_INLINE {
		if (tid==main_tid) {
			return batch.process_a_batch_item();
		} else {
			const typename pool_type::container_type::iterator thread=pool.find(tid);
			if (thread!=pool.end()) {
				assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
				return thread->second->process_a_batch_item(exception_thrown_in_thread);
			} else {
				// We might have a horizontal thread spawned by a horizontal thread, so the ancestor_thread_id will no longer be that of a pool_thread in the thread_pool. This feature is used to flush the batch of a pool_thread of any remaining work, but horizontal threads only have one item in their batch, the active closure_base-derived closure, i.e. no backed-up work, so just report that all work has been done.
				return false;
			}
		}
	}

	/// Put the closure_base-derived closure in the batch, if it is empty.
	/**
		Note that this function runs with no locks, as it presumes that the caller is the same pool_thread that consumes the work from the batch.

		\param pool The thread_pool_base-derived thread pool to which the wk will be transferred.
		\param tid The thread_id of the pool_thread to which the closure_base-derived closure should be added, if possible.
		\param wk The closure_base-derived closure to attempt to add.
		\return true if the closure_base-derived closure was added, false otherwise.

		\see batch_details::add_work_to_batch()
	*/
	bool __fastcall add_work_to_batch(pool_type &pool, const typename thread_traits::api_params_type::tid_type tid, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		if (tid==main_tid) {
			return batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
		} else {
			const auto thread=pool.find(tid);
			if (thread!=pool.end()) {
				assert(dynamic_cast<typename pool_type::container_type::mapped_type::value_type *>(&*thread->second));
				return thread->second->add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
			}
		}
		// We might have a horizontal thread spawned by a horizontal thread, so the ancestor_thread_id will no longer be that of a pool_thread in the thread_pool. But horizontal threads only have one item in their batch, the active closure_base-derived closure, i.e. no backed-up work, so just report that the work couldn't be added.
		return false;
	}

	statistics_type const &__fastcall statistics() const noexcept(true) FORCE_INLINE {
		return batch.statistics();
	}
	statistics_type &__fastcall statistics() noexcept(true) FORCE_INLINE {
		return batch.statistics();
	}

private:
	batch_details_type batch;
	const typename thread_traits::api_params_type::tid_type main_tid;
};
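
/* Illustrative sketch only, not part of the library: the GSS(k) or bakers'-scheduling idea that
   GSSk_batching wraps is to dequeue up to k closures per lock acquisition, amortising the locking
   cost of the shared queue over k items. The names below (simple_queue, take_batch) are
   hypothetical and use only standard-library types, not the library's own queue.

	#include <array>
	#include <deque>
	#include <mutex>
	#include <optional>

	template<unsigned long k, class V>
	class simple_queue {
	public:
		void push(V v) {
			const std::lock_guard<std::mutex> lck(mtx);
			q.push_back(std::move(v));
		}
		// Dequeue up to k items under a single lock acquisition; unused slots stay empty.
		std::array<std::optional<V>, k> take_batch() {
			std::array<std::optional<V>, k> batch;
			const std::lock_guard<std::mutex> lck(mtx);
			for (unsigned long i=0; i<k && !q.empty(); ++i) {
				batch[i]=std::move(q.front());
				q.pop_front();
			}
			return batch;
		}

	private:
		std::mutex mtx;
		std::deque<V> q;
	};
*/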

/// Specifies how the queue(s) within the thread_pool or the pool_threads are implemented.
/**
	\see thread_pool_base
*/
template<
	class DM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model;

/// Implements the case when the signalled_work_queue is contained within the thread_pool & shared by the pool_threads.
/**
	This implies that the cost of executing the input_work is larger than the locking & serialisation cost of having a single queue from which all of the pool_threads compete to take work.

	\see thread_pool_base, signalled_work_queue
*/
template<
	template<class> class QM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt> : public thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue> {
public:
	using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>;
	using base1_t=thread_pool_base<QM<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, Ps, PTT, Pt>;
	using pool_traits_type=typename base1_t::pool_traits_type;
	using os_traits=typename base1_t::os_traits;
	using pool_type=typename base1_t::pool_type;
	using queue_size_type=typename base1_t::queue_size_type;
	using pool_size_type=typename base1_t::pool_size_type;
	using pool_thread_type=typename base1_t::pool_thread_type;
	using exception_type=typename base1_t::exception_type;
	using thread_traits=typename base1_t::thread_traits;
	using api_params_type=typename base1_t::api_params_type;
	using priority_type=typename base1_t::priority_type;
	using work_distribution_mode=typename base1_t::work_distribution_mode;
	using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
	using queue_model=typename base_t::queue_model;

	/**
		To assist in allowing compile-time computation of the algorithmic order of the threading model.
	*/
	static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;

	using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
	/// The type of statistics collected related to the operation of the thread_pool.
	/**
		The general concept behind this type is that the cost of gathering the statistics should be as small as possible, even to the extent of the statistics being inaccurate under-estimations, to ensure that the cost is minimised.

		\see no_statistics
		\see basic_statistics
	*/
	using statistics_type=typename base1_t::statistics_type;
	using cfg_type=typename base1_t::cfg_type;

	/// Returns true if there are no threads in the thread_pool.
	/**
		\return true if there are no threads in the thread_pool.
	*/
	bool __fastcall pool_empty() const noexcept(true) override FORCE_INLINE {
		return pool.empty();
	}
	/// Returns the current number of threads in the thread_pool.
	/**
		\return The current number of threads in the thread_pool.
	*/
	const pool_size_type __fastcall pool_size() const noexcept(true) override FORCE_INLINE final {
		return pool.size();
	}
	/**
		\return true if there is no input_work for the thread_pool to process.
	*/
	bool __fastcall queue_empty() const noexcept(true) override FORCE_INLINE {
		return this->signalled_work_queue.empty();
	}
	/**
		\return The current number of outstanding, unscheduled input_work items to be processed by the thread_pool.
	*/
	const queue_size_type __fastcall queue_size() const noexcept(true) override FORCE_INLINE {
		return this->signalled_work_queue.size();
	}

	void __fastcall queue_clear() noexcept(false) override FORCE_INLINE {
		this->signalled_work_queue.clear();
	}

	/// Return the theoretical minimum time in computations according to section 3.3 & Theorem 3.3 in [1] required to complete the current work with the current number of threads in the pool using a CREW-PRAM and according to section 1.3.2, Theorem 1.2 in [2] for an EREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.
		[2] Casanova, H., Legrand, A., Robert, Y., "Parallel Algorithms", CRC Press, 2008.

		\return The minimum number of computations.

		\todo It would be nice if there were some result for returning this with respect to the memory-access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_time(generic_traits::memory_access_modes mode) const noexcept(true) override FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_time(T const &) const noexcept(true);

	/// Return the theoretical minimum number of processors required to achieve the minimum computation time according to section 3.3 & Theorem 3.3 in [1] required to complete the current work using a CREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.

		\return The minimum number of processors.

		\todo It would be nice if there were some result for returning this with respect to the memory-access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_processors(generic_traits::memory_access_modes mode) const noexcept(true) override FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_processors(T const &) const noexcept(true);
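
/* Illustrative sketch only, not part of the library: under the simplifying assumption of n
   independent, unit-cost work items and p identical pool_threads, the CREW-PRAM style bounds
   referenced above reduce to the expressions below. The real min_time() and min_processors()
   may weight the queued work differently; these helper names are hypothetical.

	// Minimum number of parallel steps achievable with p threads: ceil(n / p).
	inline unsigned long min_time_estimate(unsigned long n, unsigned long p) noexcept {
		return p==0UL ? n : (n+p-1UL)/p;
	}
	// Fewest processors that still achieve the one-step optimum for n independent,
	// unit-cost items: one processor per item.
	inline unsigned long min_processors_estimate(unsigned long n) noexcept {
		return n;
	}
*/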

protected:
	pool_type pool;

// TODO BOOST_MPL_ASSERT((std::is_same<typename base_t::exit_requested_type, typename pool_type::have_work_type::atomic_t>));

	__stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
	: base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_, this->signalled_work_queue) {
	}

	queue_size_type __fastcall
	batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;

	signalled_work_queue_type & __fastcall queue() noexcept(true) override FORCE_INLINE {
		return this->signalled_work_queue;
	}
	signalled_work_queue_type const & __fastcall queue() const noexcept(true) override FORCE_INLINE final {
		return this->signalled_work_queue;
	}
	/**
		\param wk closure_base-derived closure to be process()ed by a pool_thread.
		\return True if the closure_base-derived closure was added to the internal batch_details of the specified pool_thread.
	*/
	virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		return false;
	}

	typename base_t::exit_requested_type &exit_requested() noexcept(true) override FORCE_INLINE {
		return this->exit_requested_;
	}

private:
	template<class TPB> friend class joinable_t;
	template<class TPB> friend class nonjoinable_t;
	template<class TPB> friend class nonjoinable_buff_t;
	template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
	template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
	template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
	template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;
};
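
/* Illustrative sketch only, not part of the library: in this "pool owns the queue" model every
   pool_thread competes to pop work from one shared queue, which pays off when executing a closure
   costs more than the contention on that queue. A hypothetical worker loop, using standard-library
   types rather than the library's signalled_work_queue:

	#include <condition_variable>
	#include <deque>
	#include <functional>
	#include <mutex>

	struct shared_queue_worker {
		std::mutex &mtx;
		std::condition_variable &cv;
		std::deque<std::function<void()>> &work;
		bool &exit_requested;

		void operator()() const {
			for (;;) {
				std::function<void()> item;
				{
					std::unique_lock<std::mutex> lck(mtx);
					cv.wait(lck, [this]{return exit_requested || !work.empty();});
					if (work.empty()) {
						return;	// Exit was requested and no work remains.
					}
					item=std::move(work.front());
					work.pop_front();
				}
				item();	// Execute the closure outside the lock.
			}
		}
	};
*/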

/// Implements the case when there is a signalled_work_queue contained within each pool_thread, and an algorithm is used by the other pool_threads to steal work from a pool_thread's queue.
/**
	This implies that the cost of executing the input_work is similar to the locking & serialisation costs of distributing that input_work.

	\see thread_pool_base, signalled_work_queue
*/
template<
	class SM,
	pool_traits::size_mode_t Ps,
	typename PTT,
	class Pt
>
class thread_pool_queue_model<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<SM>>, Ps, PTT, Pt> : public thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>, protected PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>> {
public:
	using base_t=typename PTT::template thread_pool_queue_details<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>;
	using base1_t=thread_pool_base<pool_traits::work_distribution_mode_t::template worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::template thread_owns_queue<SM>>, Ps, PTT, Pt>;
	using pool_traits_type=typename base1_t::pool_traits_type;
	using os_traits=typename base1_t::os_traits;
	using pool_type=typename base1_t::pool_type;
	using queue_size_type=typename base1_t::queue_size_type;
	using pool_size_type=typename base1_t::pool_size_type;
	using pool_thread_type=typename base1_t::pool_thread_type;
	using exception_type=typename base1_t::exception_type;
	using thread_traits=typename base1_t::thread_traits;
	using api_params_type=typename base1_t::api_params_type;
	using priority_type=typename base1_t::priority_type;
	using work_distribution_mode=typename base1_t::work_distribution_mode;
	using signalled_work_queue_type=typename base1_t::signalled_work_queue_type;
	using queue_model=typename base_t::queue_model;

	/**
		To assist in allowing compile-time computation of the algorithmic order of the threading model.
	*/
	static constexpr generic_traits::memory_access_modes memory_access_mode=base1_t::memory_access_mode;

	/// GSS(k) batching is not supported.
	BOOST_MPL_ASSERT((std::is_same<std::integral_constant<unsigned long, pool_traits_type::GSSk>, std::integral_constant<unsigned long, 1UL>>));

	using GSSk_batching_type=GSSk_batching<pool_traits_type::GSSk, pool_traits_type, pool_type, typename work_distribution_mode::queue_model>;
	/// The type of statistics collected related to the operation of the thread_pool.
	/**
		The general concept behind this type is that the cost of gathering the statistics should be as small as possible, even to the extent of the statistics being inaccurate under-estimations, to ensure that the cost is minimised.

		\see no_statistics
		\see basic_statistics
	*/
	using statistics_type=typename base1_t::statistics_type;
	using cfg_type=typename base1_t::cfg_type;

	/// Returns true if there are no threads in the thread_pool.
	/**
		\return true if there are no threads in the thread_pool.
	*/
	bool __fastcall pool_empty() const noexcept(true) override FORCE_INLINE {
		return pool.empty();
	}
	/// Returns the current number of threads in the thread_pool.
	/**
		\return The current number of threads in the thread_pool.
	*/
	const pool_size_type __fastcall pool_size() const noexcept(true) override FORCE_INLINE {
		return pool.size();
	}

	/// Return the theoretical minimum time in computations according to section 3.3 & Theorem 3.3 in [1] required to complete the current work with the current number of threads in the pool using a CREW-PRAM and according to section 1.3.2, Theorem 1.2 in [2] for an EREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.
		[2] Casanova, H., Legrand, A., Robert, Y., "Parallel Algorithms", CRC Press, 2008.

		\return The minimum number of computations.

		\todo It would be nice if there were some result for returning this with respect to the memory-access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_time(generic_traits::memory_access_modes mode) const noexcept(true) override FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_time(T const &) const noexcept(true);

	/// Return the theoretical minimum number of processors required to achieve the minimum computation time according to section 3.3 & Theorem 3.3 in [1] required to complete the current work using a CREW-PRAM.
	/**
		This allows the user to determine the current computational efficiency of their thread_pool with the supplied thread-safe adapted container, safe_colln, as they can use this to profile their code and adjust the size of the thread_pool for the target architecture.

		[1] Alan Gibbons, Wojciech Rytter, "Efficient Parallel Algorithms", Cambridge University Press, 1989.

		\return The minimum number of processors.

		\todo It would be nice if there were some result for returning this with respect to the memory-access models of the work within the queue (which may be a mix of CREW & EREW memory models) for the current thread_pool.

		\see safe_colln
	*/
	unsigned long __fastcall
	min_processors(generic_traits::memory_access_modes mode) const noexcept(true) override FORCE_INLINE;
	template<class T>
	unsigned long __fastcall FORCE_INLINE
	min_processors(T const &) const noexcept(true);

protected:
	pool_type pool;

	BOOST_MPL_ASSERT((std::is_same<typename base_t::exit_requested_type, typename pool_type::exit_requested_type>));

	__stdcall thread_pool_queue_model(const pool_size_type max_num_threads, const pool_size_type num_threads) noexcept(false) FORCE_INLINE
	: base1_t(max_num_threads), base_t(), pool(num_threads, this->exit_requested_) {
	}

	queue_size_type __fastcall
	batch_size(queue_size_type const sz) const noexcept(true) FORCE_INLINE;

	/**
		\param wk closure_base-derived closure to be process()ed by a pool_thread.
		\return True if the closure_base-derived closure was added to the internal batch_details of the specified pool_thread.
	*/
	virtual bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) FORCE_INLINE {
		return false;
	}

	typename base_t::exit_requested_type &exit_requested() noexcept(true) override FORCE_INLINE {
		return this->exit_requested_;
	}

private:
	template<class TPB> friend class joinable_t;
	template<class TPB> friend class nonjoinable_t;
	template<class TPB> friend class nonjoinable_buff_t;
	template<template<class> class Joinability, class TPB, typename TPB::priority_type Pri> friend class priority_t;
	template<class DM1, generic_traits::return_data RD, class TPB, class Wk> friend class execution_context_stack_type;
	template<class DM1, generic_traits::return_data RD, class TPB, template<class, class, template<class> class, template<class> class> class CoreWk, class AlgoWrapT, class Wk> friend class execution_context_algo_stack_type;
	template<generic_traits::return_data RD, class TPB, template<class> class Del, template<class> class AtCtr> friend class horizontal_execution;

	signalled_work_queue_type & __fastcall queue() noexcept(true) override FORCE_INLINE {}
	signalled_work_queue_type const & __fastcall queue() const noexcept(true) override FORCE_INLINE {}
	/**
		\return true if there is no input_work for the pool_threads to process.
	*/
	bool __fastcall queue_empty() const noexcept(true) override FORCE_INLINE {}
	/**
		\return The approximate number of outstanding, unscheduled input_work items to be processed by the pool_threads.
	*/
	const queue_size_type __fastcall queue_size() const noexcept(true) override FORCE_INLINE {}
	void __fastcall queue_clear() noexcept(true) override FORCE_INLINE {}
};
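
/* Illustrative sketch only, not part of the library: in this "thread owns the queue" model each
   pool_thread has its own queue and idle pool_threads steal from the others, which pays off when
   closures are cheap relative to contending on a single shared queue. A hypothetical stealing
   step, not the library's algorithm:

	#include <cstddef>
	#include <deque>
	#include <functional>
	#include <mutex>
	#include <optional>
	#include <vector>

	struct owned_queue {
		std::mutex mtx;
		std::deque<std::function<void()>> items;
	};

	// Try this thread's own queue first, then scan the other queues for work to steal.
	inline std::optional<std::function<void()>>
	next_item(std::vector<owned_queue> &queues, std::size_t self) {
		for (std::size_t i=0; i<queues.size(); ++i) {
			owned_queue &q=queues[(self+i)%queues.size()];
			const std::lock_guard<std::mutex> lck(q.mtx);
			if (!q.items.empty()) {
				std::function<void()> item=std::move(q.items.front());
				q.items.pop_front();
				return item;
			}
		}
		return std::nullopt;
	}
*/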

} } } }

#endif