libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
thread_pool_master.hpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 #include "private_/fixed_threads_container.hpp"
20 #include "private_/thread_pool_queue_model.hpp"
21 #include "private_/pool_thread.hpp"
22 
23 #include <boost/ptr_container/ptr_vector.hpp>
24 
25 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd {
26 
27  namespace private_ {
28 
// Statistics-gathering helper (general case): sums the per-thread work
// counters across an entire thread pool.
// NOTE(review): Doxygen extract - the class-head line and the result_type
// declaration (orig. lines 30 & 33) are elided here; confirm against the
// full header.
29 template<class S>
31 public:
32 typedef S statistics_type;
34 
35 template<class PTT>
36 static result_type __fastcall FORCE_INLINE
// Sum total_vertical_work() over every thread in the pool, holding the
// pool's read (pop) lock for the whole traversal so the collection cannot
// change underneath us.
37 vertical_work(PTT const &pool) noexcept(true) {
38 const typename PTT::read_lock_type lk(pool.pop_lock(), PTT::atomic_t::lock_traits::infinite_timeout());
39 return std::accumulate(
40 pool.colln().begin(),
41 pool.colln().end(),
42 result_type(),
43 [](result_type acc, typename PTT::value_type const &v) {
44 return acc.update(v->statistics().total_vertical_work().total());
45 }
46 );
47 }
48 template<class PTT>
49 static result_type __fastcall FORCE_INLINE
// As vertical_work(), but accumulates total_hrz_work() instead.
50 hrz_work(PTT const &pool) noexcept(true) {
51 const typename PTT::read_lock_type lk(pool.pop_lock(), PTT::atomic_t::lock_traits::infinite_timeout());
52 return std::accumulate(
53 pool.colln().begin(),
54 pool.colln().end(),
55 result_type(),
56 [](result_type acc, typename PTT::value_type const &v) {
57 return acc.update(v->statistics().total_hrz_work().total());
58 }
59 );
60 }
61 };
// Specialisation for when statistics collection is disabled (presumably the
// no_statistics policy - see the statistics() documentation below): both
// queries become constexpr no-ops returning a default result_type, so the
// compiler can fold them away entirely.
// NOTE(review): Doxygen extract - the class-head and result_type lines
// (orig. 63, 65-66) are elided here.
62 template<class T>
64 public:
67 
68 template<class PTT>
69 static constexpr result_type FORCE_INLINE
70 vertical_work(PTT const &) noexcept(true) {
71 return result_type();
72 }
73 template<class PTT>
74 static constexpr result_type FORCE_INLINE
75 hrz_work(PTT const &) noexcept(true) {
76 return result_type();
77 }
78 };
79 
80  }
81 
82  /// This pool has an unlimited size, and uses a master to distribute the work to the worker threads.
83  /**
84  i.e. it presumes an unlimited resource of threads. These threads exit after completing their work, but may
85  be re-used if there is enough work being submitted to the pool, fast enough.
86  Memory is dynamically allocated in the operation of the parallel algorithms, so a quality parallel memory-allocator, such as Hoard or HeapLayers, is recommended if scalability is limited by excessive calls to the global operator new.
87  */
88  template<
89  class PTT
90  >
95  PTT,
96  safe_colln<
97  std::list<
98  shared_ptr<
101  typename PTT::os_traits,
102  PTT
103  >,
105  >
106  >,
108  >
109  > {
110  public:
114  PTT,
115  safe_colln<
116  std::list<
117  shared_ptr<
120  typename PTT::os_traits,
121  PTT
122  >,
124  >
125  >,
127  >
128  >;
130  using os_traits=typename base_t::os_traits;
133  using pool_type=typename base_t::pool_type;
137 
138  private:
// Spawn a worker thread for the single item of work wk, then record it in
// the pool so it can be tracked (and later purged once idle).
// NOTE(review): the construction of wk_thread (orig. lines 140-141) is
// elided in this extract - confirm against the full header.
139 void __fastcall create_worker(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
142 this->pool.push_back(std::move(wk_thread));
143 }
144 
145  public:
// Default-construct the unlimited, master-distributed pool. May throw
// (noexcept(false)), e.g. on failure to create internal resources.
// NOTE(review): the mem-initialiser list / body (orig. line 147) is elided
// in this extract.
146 explicit __stdcall thread_pool() noexcept(false) FORCE_INLINE
148 }
149 
150  /**
151  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
152  */
153 __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
// All clean-up is delegated to exit(): stop the purge pool, drain the work
// queue and destroy the worker threads.
154 exit();
155 }
156 
157  /// Obtain access to any statistics data collected by the operation of the thread_pool.
158  /**
159  Algorithmic complexity when specialised with no_statistics: constant time, otherwise O(pool_size()).
160  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. This type of thread_pool does not perform horizontal threading, because it spawns a thread per closure_base-derived closure, so can never suffer deadlock through resource starvation, therefore the value of statistics_type::total_hrz_work() is zero. Therefore, because of the vagaries of multi-threading the following holds:
161  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
162  */
163 statistics_type const __fastcall statistics() const noexcept(true) override FORCE_INLINE {
// Returns a snapshot by value, so callers never hold a reference into the
// pool's mutable state.
// NOTE(review): the lines that build 'stats' (orig. 164, 166-168) are
// elided in this extract; only the copy-out remains visible.
169 return stats;
170 }
171 
172 void exit() noexcept(false) {
// Shut down in strict order: first stop the purge pool, then discard any
// queued work, reset the "work available" event, and finally destroy the
// worker threads themselves. Do not reorder these statements.
173 purge_pool.exit();
174 this->signalled_work_queue.clear();
175 have_work.reset();
// NOTE(review): orig. line 176 is elided in this extract.
177 // The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
178 // The natural object-destruction order causes the threads in the pool to be destroyed too late, so the pool must be emptied now.
179 this->pool.clear();
180 }
181 
182  private:
184 
185  typedef pool_aspects<
191  typedef thread_pool<
195  > purge_pool_t;
196 
// Closure executed on the dedicated purge pool: pops finished worker
// threads from the front of the main pool, shrinking it back down when
// work has dried up. Always leaves at least one thread in the pool.
// NOTE(review): Doxygen extract - several lines (orig. 203, 211, 215, 217,
// 219, 228, 239, 242) are elided here, including the lock acquisition,
// the iterator initialisation and the finished() body.
197 class purge_work : protected non_assignable {
198 public:
199 constexpr __stdcall purge_work(typename base_t::pool_type &p, statistics_type &s) noexcept(true) FORCE_INLINE
200 : pool(p), statistics_(s) {
201 }
202 __stdcall purge_work(const purge_work &p) noexcept(true) FORCE_INLINE
204 }
205 
206 void __fastcall process() {
207 typedef typename base_t::pool_type::container_type::iterator pool_iter_t;
208 typedef typename base_t::pool_type::value_type wk_thr_t;
209 // Hand off my time-slice, to allow another thread to complete its work.
210 // Maybe that will be a thread from our pool.
212 // Just make sure we don't purge too much...
213 while (pool.size()>1) {
214 // It is important to let this go out of scope here, not inside the lock, otherwise we'll deadlock e.g. when trying to add closure_base-derived closure elsewhere.
216 {
218 if (pool.colln().size()>1) {
220 if (finished(*i)) {
221 wk_thr=std::move(*i);
222 pool.colln().pop_front();
223 }
224 }
225 }
226 pool.sync_size();
227 if (wk_thr.get().get()) {
229 }
230 }
231 }
232 
// All purge_work items have the same priority: ordering is constant.
233 bool __fastcall operator<(purge_work const &) const noexcept(true) FORCE_INLINE {
234 return true;
235 }
236 
237 private:
238 typename base_t::pool_type &pool;
240 
// Predicate: has the given pooled thread completed its work?
// NOTE(review): body (orig. line 242) elided in this extract.
241 static bool __fastcall finished(const typename base_t::pool_type::value_type &thread) noexcept(true) FORCE_INLINE {
243 }
244 };
245 
246  mutable typename os_traits::lock_traits::anon_event_type have_work;
249 
// Mutable access to the statistics object, for the base implementation to
// record work-added/work-done events.
250 statistics_type &__fastcall set_statistics() noexcept(true) override FORCE_INLINE {
251 return statistics_;
252 }
253 
254  void __fastcall purge_idle_workers() FORCE_INLINE {
255  if (purge_pool.queue_empty() && !this->pool_empty()) {
256  // Need to iterate through the list of workers purging the idle ones, to prevent there getting too many of them. Also do this in a separate thread!
257  // TODO there is a race condition with the pool exiting and this accessing a dangling reference....
258  purge_pool<<typename purge_pool_t::nonjoinable()<<purge_work(this->pool, this->statistics_);
259  }
260  }
261 
267  }
268 
271  return std::move(wk);
272  }
273  };
274 
275  /// This pool has a limited, specified size, and uses a master to distribute the work to the worker threads.
276  template<
277  class PTT
278  >
283  PTT,
285  PTT,
288  typename PTT::os_traits,
289  PTT
290  >
291  >
292  > {
293  public:
297  PTT,
299  PTT,
300  boost::ptr_vector<
303  typename PTT::os_traits,
304  PTT
305  >
306  >
307  >
308  >;
310  using os_traits=typename base_t::os_traits;
313  using pool_type=typename base_t::pool_type;
317 
318  enum class erase_states {
322  };
323 
324  /// Create the thread pool.
325  /**
326  \param num_threads The number of threads in the pool, which must be greater than zero.
327  */
328  explicit __stdcall thread_pool(const typename pool_traits_type::pool_type::size_type num_threads) noexcept(false) FORCE_INLINE
329  : base_t(num_threads, 0) {
330  assert(this->max_num_threads_in_pool>0);
331  if (!this->max_num_threads_in_pool) {
332  throw typename base_t::exception(_T("Cannot have an empty thread pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this), info::function::argument(_T("const typename pool_traits_type::pool_type::size_type max_num_threads"), tostring(num_threads))), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
333  }
334  }
335  thread_pool(thread_pool const &)=delete;
336 
337  /**
338  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
339  */
340 __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
// All clean-up is delegated to exit(): signal shutdown, drain the work
// queue and destroy the worker threads.
341 exit();
342 }
343 
344  /// Erase the specified, queued work.
345  /**
346  Note that if the work has started processing, it will not be erased.
347 
348  \param ec The execution context of the work to erase.
349  \return The outcome of the erase request, which may be successful, or failed because the work may be being processed.
350 
351  \see erase_states
352  */
353 template<typename ExecT_>
354 erase_states __fastcall erase(ExecT_ &ec) {
// NOTE(review): the declaration of 'ret' and the failure-path statement
// (orig. lines 355 & 358) are elided in this extract.
356 if (!ec.erase()) {
357 // i.e. we won't wait forever for a result from work that has been erased. (Although we may discard a calculated result. If we can't erase the work from the execution context, then wherever that work is, allow it to be processed to avoid deadlocking that waiting client.)
359 }
// If erasing emptied the queue there is no work left, so drop the
// "work available" signal.
360 if (this->work_queue.empty()) {
361 have_work.clear();
362 }
363 return ret;
364 }
365 
366  /// Obtain access to any statistics data collected by the operation of the thread_pool.
367  /**
368  Algorithmic complexity when specialised with no_statistics: constant time, otherwise O(pool_size()).
369  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. NOTE(review): the remainder of this paragraph appears to have been copied from the unlimited, thread-per-closure pool above; for this fixed-size pool, verify the claims that no horizontal threading occurs and that resource-starvation deadlock is impossible. Because of the vagaries of multi-threading the following holds:
370  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
371  */
372 statistics_type const __fastcall statistics() const noexcept(true) FORCE_INLINE {
// Returns a snapshot by value, so callers never hold a reference into the
// pool's mutable state.
// NOTE(review): the lines that build 'stats' (orig. 373, 375-377) are
// elided in this extract; only the copy-out remains visible.
378 return stats;
379 }
380 
381 void exit() noexcept(false) {
// Shut down in strict order: drop the "work available" signal, discard any
// queued work, raise the exit event so the workers stop, then destroy the
// worker threads. Do not reorder these statements.
382 have_work.clear();
383 this->work_queue.clear();
384 this->exit_requested().set();
385 // The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
386 this->pool.clear();
387 }
388 
389  private:
390  mutable typename os_traits::lock_traits::anon_event_type have_work;
392 
// Mutable access to the statistics object, for the base implementation to
// record work-added/work-done events.
393 statistics_type &__fastcall set_statistics() noexcept(true) FORCE_INLINE {
394 return statistics_;
395 }
396 
397 /* TODO JMG: implement this!
398  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
399  thread_traits::sleep(0);
400  this->work_queue.push_front(wk);
401 // TODO JMG: Need to make the threads get the work according to our type...
402  this->have_work.add();
403  statistics_.added_work();
404  }
405 
406  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
407  thread_traits::sleep(0);
408  this->work_queue.push_front(wk);
409 // TODO JMG: Need to make the threads get the work according to our type...
410  this->have_work.add();
411  statistics_.added_work();
412  return this->work_queue.front();
413  }
414 */
415  };
416 
417  /// This pool has a maximum specified size to which it will grow and reduce from, and uses a master to distribute the work to the worker threads.
418  /**
419  \todo JMG: Need to complete this...
420  */
421  template<
422  class PTT
423  >
428  PTT,
429  safe_colln<
430  std::list<
431  shared_ptr<
434  typename PTT::os_traits,
435  PTT
436  >,
438  >
439  >,
441  >
442  >,
443  protected non_assignable {
444  public:
448  PTT,
449  safe_colln<
450  std::list<
451  shared_ptr<
454  typename PTT::os_traits,
455  PTT
456  >,
458  >
459  >,
461  >
462  >;
464  using os_traits=typename base_t::os_traits;
467  using pool_type=typename base_t::pool_type;
471 
472  enum class erase_states {
476  };
477 
478  /// Create the thread pool.
479  /**
480  \todo JMG: Need to complete this... What's the thread creation policy????
481 
482  \param num_threads The maximum number of threads in the pool, which must be greater than zero.
483  */
484 /*
485  explicit __stdcall thread_pool(const typename pool_traits_type::pool_type::size_type num_threads) noexcept(false) FORCE_INLINE
486  : base_t(num_threads, 0) {
487  assert(this->max_num_threads_in_pool>0);
488  if (!this->max_num_threads_in_pool) {
489  throw typename base_t::exception(_T("Cannot have an empty thread pool."), info::function(__LINE__, __PRETTY_FUNCTION__, typeid(*this), info::function::argument(_T("const typename pool_traits_type::pool_type::size_type max_num_threads"), tostring(num_threads))), JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER)));
490  }
491  }
492 */
493 
494  /**
495  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
496  */
497 __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
// All clean-up is delegated to exit(): signal shutdown, drain the work
// queue and destroy the worker threads.
498 exit();
499 }
500 
501  /// Erase the specified, queued work.
502  /**
503  Note that if the work has started processing, it will not be erased.
504 
505  \param ec The execution context of the work to erase.
506  \return The outcome of the erase request, which may be successful, or failed because the work may be being processed.
507 
508  \see erase_states
509  */
510 template<typename ExecT_>
511 erase_states __fastcall FORCE_INLINE
// NOTE(review): the continuation of the function head (name/parameters)
// and the 'ret'/failure-path lines (orig. 512-513 & 516) are elided in
// this extract.
514 if (!ec.erase()) {
515 // i.e. we won't wait forever for a result from work that has been erased. (Although we may discard a calculated result. If we can't erase the work from the execution context, then wherever that work is, allow it to be processed to avoid deadlocking that waiting client.)
517 }
// If erasing emptied the queue there is no work left, so drop the
// "work available" signal.
518 if (this->work_queue.empty()) {
519 have_work.clear();
520 }
521 return ret;
522 }
523 
524  /// Obtain access to any statistics data collected by the operation of the thread_pool.
525  /**
526  Algorithmic complexity when specialised with no_statistics: constant time, otherwise O(pool_size()).
527  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. This type of thread_pool does not perform horizontal threading, because it spawns a thread per closure_base-derived closure, so can never suffer deadlock through resource starvation, therefore the value of statistics_type::total_hrz_work() is zero. Therefore, because of the vagaries of multi-threading the following holds:
528  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
529  */
530 statistics_type const __fastcall statistics() const noexcept(true) FORCE_INLINE {
// Returns a snapshot by value, so callers never hold a reference into the
// pool's mutable state.
// NOTE(review): the lines that build 'stats' (orig. 531, 533-535) are
// elided in this extract; only the copy-out remains visible.
536 return stats;
537 }
538 
539 void exit() noexcept(false) {
// Shut down in strict order: drop the "work available" signal, discard any
// queued work, raise the exit event so the workers stop, then destroy the
// worker threads. Do not reorder these statements.
540 have_work.clear();
541 this->work_queue.clear();
542 this->exit_requested().set();
543 // The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
544 this->pool.clear();
545 }
546 
547  private:
548  mutable typename os_traits::lock_traits::anon_event_type have_work;
550 
// Mutable access to the statistics object, for the base implementation to
// record work-added/work-done events.
551 statistics_type &__fastcall set_statistics() noexcept(true) FORCE_INLINE {
552 return statistics_;
553 }
554 /*
555  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
556  this->work_queue.push_front(wk);
557 // TODO JMG: Need to make the threads get the work according to our type...
558  this->have_work.add();
559  statistics_.added_work();
560  }
561 
562  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
563  this->work_queue.push_front(wk);
564 // TODO JMG: Need to make the threads get the work according to our type...
565  this->have_work.add();
566  statistics_.added_work();
567  return this->work_queue.front();
568  }
569 */
570  };
571 
572 } } }