libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
thread_pool_workers.hpp
1 #ifndef LIBJMMCG_CORE_THREAD_POOL_WORKERS_HPP
2 #define LIBJMMCG_CORE_THREAD_POOL_WORKERS_HPP
3 /******************************************************************************
4 ** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
5 **
6 ** This library is free software; you can redistribute it and/or
7 ** modify it under the terms of the GNU Lesser General Public
8 ** License as published by the Free Software Foundation; either
9 ** version 2.1 of the License, or (at your option) any later version.
10 **
11 ** This library is distributed in the hope that it will be useful,
12 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 ** Lesser General Public License for more details.
15 **
16 ** You should have received a copy of the GNU Lesser General Public
17 ** License along with this library; if not, write to the Free Software
18 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 */
20 
21 #include "private_/fixed_threads_container.hpp"
22 #include "private_/thread_pool_queue_model.hpp"
23 #include "private_/pool_thread.hpp"
24 
25 namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd {
26 
27  namespace private_ {
28 
29  template<class S>
30  class wrkr_accumulate_across_threads {
31  public:
32  using statistics_type=S;
33  using result_type=typename statistics_type::ave_stats_type;
34 
35  private:
36  struct agg_vert_stats {
37  template<class V>
38  result_type __fastcall FORCE_INLINE
39  operator()(result_type acc, V const &v) const noexcept(true) {
40  return acc.update(v.statistics().total_vertical_work().total());
41  }
42  };
43  struct agg_hrz_stats {
44  template<class V>
45  result_type __fastcall FORCE_INLINE
46  operator()(result_type acc, V const &v) const noexcept(true) {
47  return acc.update(v.statistics().total_hrz_work().total());
48  }
49  };
50 
51  public:
52  template<class P>
53  static result_type __fastcall FORCE_INLINE
54  vertical_work(P const &pool) noexcept(true) {
55  return std::accumulate(
56  pool.colln().begin(),
57  pool.colln().end(),
58  result_type(),
59  agg_vert_stats()
60  );
61  }
62  template<class P>
63  static result_type __fastcall FORCE_INLINE
64  hrz_work(P const &pool) noexcept(true) {
65  return std::accumulate(
66  pool.colln().begin(),
67  pool.colln().end(),
68  result_type(),
69  agg_hrz_stats()
70  );
71  }
72  };
73  template<class T>
74  class wrkr_accumulate_across_threads<no_statistics<T>> {
75  public:
76  using statistics_type=no_statistics<T>;
77  using result_type=typename statistics_type::ave_stats_type;
78 
79  template<class P>
80  static constexpr result_type __fastcall FORCE_INLINE
81  vertical_work(P const &) noexcept(true) {
82  return result_type();
83  }
84  template<class P>
85  static constexpr result_type __fastcall FORCE_INLINE
86  hrz_work(P const &) noexcept(true) {
87  return result_type();
88  }
89  };
90 
91  }
92 
93  /// This pool has a specified size, and the worker pool_threads steal work from a centrally-held signalled_work_queue.
94  /**
 95  To reduce calls to the global operator new, and to enhance scalability, the parallel algorithms (for_each(), transform(), etc., except merge() and sort()) internally pre-allocate a contiguous buffer of memory for their internal operations. This buffer grows O(p.log(p)) for p processors, with a constant of ~200 bytes, so for millions of processors, megabytes could be allocated per parallel algorithm. This buffer is guaranteed to be released after all mutations on the elements within the collection are complete, which is usually, although not guaranteed to be, before any waiting execution_context constructed by the call to the parallel algorithm is released. For each parallel algorithm (except merge() and sort()) exactly 2 calls to the global operator new are made by the library. For merge() the internal buffer grows as O(p.log(p).log(n)) with O(log(n)) calls to the global operator new, and for sort(), O(p.log(p).log(n)^2) and O(log(n)^2) respectively.
96  */
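/*
 Illustrative arithmetic for the buffer-growth claim above (an editorial sketch, not part of
 the original header; the ~200-byte constant comes from the comment, and a base-2 logarithm
 is assumed since the base is not stated):

     #include <cmath>
     #include <cstddef>

     // Rough size, in bytes, of the contiguous buffer pre-allocated by one parallel
     // algorithm (excluding merge() and sort()) for p processors, per the documented
     // O(p.log(p)) growth with a ~200-byte constant.
     inline std::size_t approx_buffer_bytes(std::size_t const p) noexcept {
         return static_cast<std::size_t>(200.0 * static_cast<double>(p) * std::log2(static_cast<double>(p)));
     }

     // e.g. approx_buffer_bytes(1024) is roughly 200*1024*10, i.e. about 2 MB per parallel
     // algorithm, illustrating how the allocation reaches megabytes as p grows.
*/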
97  template<
98  class PTT
99  >
100  class thread_pool<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, PTT>
101  : public private_::thread_pool_queue_model<
102  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
103  pool_traits::size_mode_t::fixed_size,
104  PTT,
105  private_::fixed_pool_of_threads<
106  PTT,
107  pool::private_::thread_types::steal<
108  PTT::result_traits_,
109  typename PTT::os_traits,
110  PTT,
111  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
112  >
113  >
114  > {
115  public:
116  using base_t=private_::thread_pool_queue_model<
117  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
118  pool_traits::size_mode_t::fixed_size,
119  PTT,
120  private_::fixed_pool_of_threads<
121  PTT,
122  pool::private_::thread_types::steal<
123  PTT::result_traits_,
124  typename PTT::os_traits,
125  PTT,
126  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
127  >
128  >
129  >;
130  using pool_traits_type=typename base_t::pool_traits_type;
131  using os_traits=typename base_t::os_traits;
132  using thread_traits=typename base_t::thread_traits;
133  using api_params_type=typename base_t::api_params_type;
134  using pool_type=typename base_t::pool_type;
135  using statistics_type=typename base_t::statistics_type;
136  using work_distribution_mode=typename base_t::work_distribution_mode;
137  using signalled_work_queue_type=typename base_t::signalled_work_queue_type;
138  using GSSk_batching_type=typename base_t::GSSk_batching_type;
139 
140  BOOST_MPL_ASSERT((std::is_same<typename std::is_same<typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type, std::false_type>));
141 
142  enum class erase_states {
143  failed_to_erase,
144  erased_successfully,
145  ignoring_result
146  };
147 
148  /// Create the thread pool.
149  /**
150  \param num_threads The number of threads in the pool, which must be greater than zero.
151  */
152  explicit __stdcall thread_pool(const typename base_t::pool_type::size_type num_threads) noexcept(false) FORCE_INLINE
153  : base_t(num_threads) {
154  assert(this->max_num_threads_in_pool>0);
155  if (!this->max_num_threads_in_pool) {
156  throw typename base_t::exception_type(
157  _T("Cannot have an empty thread pool."),
158  info::function(
159  __LINE__,
160  __PRETTY_FUNCTION__,
161  typeid(*this),
162  info::function::argument(
163  _T("const typename pool_traits_type::pool_type::size_type max_num_threads"),
164  tostring(num_threads)
165  )
166  ),
167  JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER))
168  );
169  }
170  }
171 
172  /**
173  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
174  */
175  __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
176  exit();
177  }
178 
179  /// Erase the specified, queued work.
180  /**
181  Note that if the work has started processing, it will not be erased.
182 
183  \param ec The execution context of the work to erase.
184  \return The outcome of the erase request, which may be successful, or may fail because the work is already being processed.
185 
186  \see erase_states
187  */
188  template<typename ExecT>
189  erase_states __fastcall FORCE_INLINE
190  erase(ExecT &ec) noexcept(false) {
191  erase_states ret=erase_states::failed_to_erase;
192  if (!ec.erase()) {
193  // i.e. we won't wait forever for a result from work that has been erased. (Although we may discard a calculated result. If we can't erase the work from the execution context, then wherever that work is, allow it to be processed to avoid deadlocking that waiting client.)
194  ret=(this->signalled_work_queue.erase(ec.wk_queue_item()) ? erase_states::erased_successfully : erase_states::ignoring_result);
195  }
196  return ret;
197  }
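/*
 A minimal caller-side sketch of erase() (editorial illustration only: `pool_t` stands for this
 thread_pool instantiation, and `pool`/`context` are a hypothetical pool instance and the
 execution_context returned when the joinable work was submitted):

     switch (pool.erase(context)) {
     case pool_t::erase_states::erased_successfully:
         // The work was still queued and has been removed; it will never run.
         break;
     case pool_t::erase_states::ignoring_result:
         // The work could not be removed from the pool's queue, so it will be processed,
         // but the client will not block on (and may discard) its result.
         break;
     case pool_t::erase_states::failed_to_erase:
         // The work could not be erased from the execution context, most likely because
         // it is already being processed.
         break;
     }
*/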
198 
199  /// Obtain access to any statistics data collected by the operation of the thread_pool.
200  /**
201  Algorithmic complexity when specialised with no_statistics: constant time, otherwise O(pool_size()).
 202  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. The value computed for the statistics_type::total_hrz_work() is guaranteed not to be more than the value that would be obtained if it were computed atomically. Therefore the following holds:
203  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
204  */
205  statistics_type const __fastcall statistics() const noexcept(true) override FORCE_INLINE {
206  using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
207 
208  statistics_type stats(batch_details.statistics());
209  stats.add_vertical_work(acc_t::vertical_work(this->pool));
210  stats.add_hrz_work(acc_t::hrz_work(this->pool));
211  return stats;
212  }
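/*
 Editorial sketch of the invariant documented above, using the accessor names from the comment
 (only meaningful when PTT supplies real statistics rather than no_statistics):

     auto const stats = pool.statistics();
     assert(stats.total_work_added() >= stats.total_vertical_work() + stats.total_hrz_work());
*/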
213 
214  void exit() noexcept(false) {
215  this->exit_requested().set(pool_traits_type::template exit_requested_type<typename work_distribution_mode::queue_model>::states::exit_requested);
216  // The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
217  // The natural object-destruction order causes the threads in the pool to be destroyed too late, so the pool must be emptied now.
218  this->pool.clear();
219  // We must empty the queue after deleting the threads: the tricky way in which the pool_threads steal work from the signalled_work_queue can cause them to crash if the queue is emptied whilst they are stealing work. Basically, the pool_threads steal work atomically with respect to each other, but not with respect to this clear() method.
220  this->signalled_work_queue.clear();
221  }
222 
223  private:
224  /// This is the batch that the main thread will process.
225  GSSk_batching_type batch_details;
226 
227  statistics_type &__fastcall set_statistics() noexcept(true) override FORCE_INLINE {
228  return batch_details.statistics();
229  }
230 
231  bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type tid, typename signalled_work_queue_type::value_type &&wk) noexcept(true) override FORCE_INLINE {
232  return batch_details.add_work_to_batch(this->pool, tid, std::forward<typename signalled_work_queue_type::value_type>(wk));
233  }
234 
235  /**
236  Try to add the new work to this thread's batch, if empty, to avoid locking the main queue in the pool. This is very important: it helps maintain throughput of work, by avoiding having to place work on the shared signalled_work_queue in the thread_pool, which involves locks and signals, as placing the work directly in the pool_thread's batch can be done lock-free.
237  */
238  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) noexcept(false) override FORCE_INLINE {
239 // TODO Stops "overlapped_write_held_by_reads" working: if (!this->add_work_to_batch(os_traits::thread_traits::get_current_thread(), wk)) {
240  thread_traits::sleep(0); // This sleep seems to be vital to ensure that all threads manage to remove work from the queue, ensuring load distribution.
241  this->signalled_work_queue.push_back(std::forward<typename signalled_work_queue_type::value_type>(wk));
242  // Try to allow the pool_threads a chance to process the work before this horizontal thread is created that might compete with the pool_threads for the work.
243  os_traits::thread_traits::sleep(0);
244  batch_details.statistics().update_max_queue_len(this->queue_size());
245 // }
246  batch_details.statistics().added_work();
247  }
248 
249  /**
250  Try to add the new work to this thread's batch, if empty, to avoid locking the main queue in the pool. This is very important: it helps maintain throughput of work, by avoiding having to place work on the shared signalled_work_queue in the thread_pool, which involves locks and signals, as placing the work directly in the pool_thread's batch can be done lock-free.
251  */
252  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) noexcept(false) override FORCE_INLINE {
253 // TODO Stops "overlapped_write_held_by_reads" working: if (!this->add_work_to_batch(os_traits::thread_traits::get_current_thread(), wk)) {
254  thread_traits::sleep(0); // This sleep seems to be vital to ensure that all threads manage to remove work from the queue, ensuring load distribution.
255  this->signalled_work_queue.push_back(std::forward<typename signalled_work_queue_type::value_type>(wk));
256  // Try to allow the pool_threads a chance to process the work before this horizontal thread is created that might compete with the pool_threads for the work.
257  os_traits::thread_traits::sleep(0);
258  batch_details.statistics().update_max_queue_len(this->queue_size());
259 // }
260  batch_details.statistics().added_work();
261  return std::move(wk);
262  }
263 
264  bool __fastcall process_a_batch_item(const typename thread_traits::api_params_type::tid_type tid, typename os_traits::thread_exception const &ex) noexcept(false) override FORCE_INLINE {
265  return batch_details.process_a_batch_item(this->pool, tid, ex);
266  }
267  };
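/*
 A minimal construction sketch for this fixed-size, pool-owns-queue specialisation (editorial
 illustration: `pool_traits_t` stands for whatever thread_pool_traits-style PTT the application
 defines, and the specialisation arguments are assumed to mirror the base-class arguments above):

     using pool_t = thread_pool<
         pool_traits::work_distribution_mode_t::worker_threads_get_work<
             pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue
         >,
         pool_traits::size_mode_t::fixed_size,
         pool_traits_t
     >;

     pool_t pool(4);   // Four worker pool_threads; a size of zero throws base_t::exception_type.
     // ... submit work via the pool's transfer operations; the destructor then calls exit(),
     // which clears the pool of threads and finally the shared signalled_work_queue.
*/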
268 
 269  /// This pool has a specified maximum size, up to which it will grow and from which it will shrink, and the worker pool_threads steal work from a centrally-held signalled_work_queue.
270  /**
271  The internal signalled_work_queue is reasonably efficiently implemented: if there is enough work in the signalled_work_queue, the addition of the input_work to it can occur independently of the removal of the input_work by
272  the worker pool_threads.
273  */
274  template<
275  class PTT
276  >
277  class thread_pool<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::tracks_to_max, PTT>
278  : public private_::thread_pool_queue_model<
279  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
280  pool_traits::size_mode_t::tracks_to_max,
281  PTT,
282  private_::fixed_pool_of_threads<
283  PTT,
284  pool::private_::thread_types::steal<
285  PTT::result_traits_,
286  typename PTT::os_traits,
287  PTT,
288  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
289  >
290  >
291  > {
292  public:
293  using base_t=private_::thread_pool_queue_model<
294  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>,
295  pool_traits::size_mode_t::tracks_to_max,
296  PTT,
297  private_::fixed_pool_of_threads<
298  PTT,
299  pool::private_::thread_types::steal<
300  PTT::result_traits_,
301  typename PTT::os_traits,
302  PTT,
303  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>::queue_model
304  >
305  >
306  >;
307  using pool_traits_type=typename base_t::pool_traits_type;
308  using os_traits=typename base_t::os_traits;
309  using thread_traits=typename base_t::thread_traits;
310  using api_params_type=typename base_t::api_params_type;
311  using pool_type=typename base_t::pool_type;
312  using GSSk_batching_type=typename base_t::GSSk_batching_type;
313  using statistics_type=typename base_t::statistics_type;
314  using work_distribution_mode=typename base_t::work_distribution_mode;
315  using signalled_work_queue_type=typename base_t::signalled_work_queue_type;
316 
317  BOOST_MPL_ASSERT((std::is_same<typename std::is_same<typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type, std::false_type>));
318 
319  enum class erase_states {
320  failed_to_erase,
321  erased_successfully,
322  ignoring_result
323  };
324 
325  /// Create the thread pool.
326  /**
327  \todo JMG: Need to complete this... What's the thread creation policy????
328 
329  \param num_threads The number of threads in the pool, which must be greater than zero.
330  */
331 /*
332  explicit __stdcall thread_pool(const typename pool_traits_type::pool_type::size_type num_threads) noexcept(false) FORCE_INLINE
333  : base_t(num_threads,num_threads) {
334  assert(this->max_num_threads_in_pool>0);
335  if (!this->max_num_threads_in_pool) {
336  throw typename base_t::exception(
337  _T("Cannot have an empty thread pool."),
338  info::function(
339  __LINE__,
340  __PRETTY_FUNCTION__,
341  typeid(*this),
342  info::function::argument(
343  _T("const typename pool_traits_type::pool_type::size_type max_num_threads"),
344  tostring(num_threads)
345  )
346  ),
347  JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER))
348  );
349  }
350  }
351 */
352  thread_pool(thread_pool const &)=delete;
353 
354  /**
355  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
356  */
357  __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
358  exit();
359  }
360 
361  /// Erase the specified, queued work.
362  /**
363  Note that if the work has started processing, it will not be erased.
364 
365  \param ec The execution context of the work to erase.
366  \return The outcome of the erase request, which may be successful, or may fail because the work is already being processed.
367 
368  \see erase_states
369  */
370  template<typename ExecT_>
371  erase_states __fastcall FORCE_INLINE
372  erase(ExecT_ &ec) {
373  erase_states ret=erase_states::failed_to_erase;
374  if (!ec.erase()) {
375  // i.e. we won't wait forever for a result from work that has been erased. (Although we may discard a calculated result. If we can't erase the work from the execution context, then wherever that work is, allow it to be processed to avoid deadlocking that waiting client.)
376  ret=(this->signalled_work_queue.erase(ec.wk_queue_item()) ? erase_states::erased_successfully : erase_states::ignoring_result);
377  }
378  return ret;
379  }
380 
381  /// Obtain access to any statistics data collected by the operation of the thread_pool.
382  /**
383  Algorithmic complexity: O(pool_size())
384  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. The value computed for the statistics_type::total_hrz_work() is guaranteed not to be more than the value that would be obtained if it were computed atomically. Therefore the following holds:
385  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
386  */
387  statistics_type const __fastcall statistics() const noexcept(true) FORCE_INLINE {
388  using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
389 
390  statistics_type stats(batch_details.statistics());
391  stats.processed_vertical_work(acc_t::vertical_work(this->pool));
392  stats.processed_hrz_work(acc_t::hrz_work(this->pool));
393  return stats;
394  }
395 
396  void exit() noexcept(false) {
397  this->signalled_work_queue.clear();
398  this->exit_requested().set(pool_traits_type::template exit_requested_type<typename work_distribution_mode::queue_model>::states::exit_requested);
399  // The natural object-destruction order causes the threads in the pool to be destroyed too late, so the pool must be emptied now.
400  this->pool.clear();
401  }
402 
403  private:
404  /// This is the batch that the main thread will process.
405  GSSk_batching_type batch_details;
406 
407  statistics_type &__fastcall set_statistics() noexcept(true) FORCE_INLINE {
408  return batch_details.statistics();
409  }
410 /*
411  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
412  thread_traits::sleep(0); // This sleep seems to be vital to ensure that all threads manage to remove work from the queue, ensuring load distribution.
413  this->signalled_work_queue.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
414  // Try to allow the pool_threads a chance to process the work before this horizontal thread is created that might compete with the pool_threads for the work.
415  os_traits::thread_traits::sleep(0);
416  batch_details.statistics().added_work();
417 // TODO JMG: Need to make the threads get the work according to our type...
418  }
419 
420  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) FORCE_INLINE {
421  thread_traits::sleep(0); // This sleep seems to be vital to ensure that all threads manage to remove work from the queue, ensuring load distribution.
422  this->signalled_work_queue.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
423  // Try to allow the pool_threads a chance to process the work before this horizontal thread is created that might compete with the pool_threads for the work.
424  os_traits::thread_traits::sleep(0);
425  batch_details.statistics().added_work();
426 // TODO JMG: Need to make the threads get the work according to our type...
427  return this->signalled_work_queue.front();
428  }
429 */
430 
431  bool __fastcall process_a_batch_item(const typename thread_traits::api_params_type::tid_type tid, typename os_traits::thread_exception const &exception_thrown_in_thread) FORCE_INLINE {
432  return batch_details.process_a_batch_item(this->pool, tid, exception_thrown_in_thread);
433  }
434  };
435 
436  /// This pool has a specified size, and each worker pool_thread has its own signalled_work_queue, from which the other pool_threads may steal work.
437  /**
438  To reduce calls to the global operator new, and to enhance scalability, the parallel algorithms (for_each(), transform(), etc., except merge() and sort()) internally pre-allocate a contiguous buffer of memory for their internal operations. This buffer grows O(p.log(p)) for p processors, with a constant of ~200 bytes, so for millions of processors, megabytes could be allocated per parallel algorithm. This buffer is guaranteed to be released after all mutations on the elements within the collection are complete, which is usually, although not guaranteed to be, before any waiting execution_context constructed by the call to the parallel algorithm is released. For each parallel algorithm (except merge() and sort()) exactly 2 calls to the global operator new are made by the library. For merge() the internal buffer grows as O(p.log(p).log(n)) with O(log(n)) calls to the global operator new, and for sort(), O(p.log(p).log(n)^2) and O(log(n)^2) respectively.
439  */
440  template<
441  class PTT
442  >
443  class thread_pool<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>, pool_traits::size_mode_t::fixed_size, PTT>
444  : public private_::thread_pool_queue_model<
445  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>,
446  pool_traits::size_mode_t::fixed_size,
447  PTT,
448  private_::fixed_pool_of_threads<
449  PTT,
450  pool::private_::thread_types::steal<
451  PTT::result_traits_,
452  typename PTT::os_traits,
453  PTT,
454  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::queue_model
455  >
456  >
457  > {
458  public:
459  using base_t=private_::thread_pool_queue_model<
460  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>,
461  pool_traits::size_mode_t::fixed_size,
462  PTT,
463  private_::fixed_pool_of_threads<
464  PTT,
465  pool::private_::thread_types::steal<
466  PTT::result_traits_,
467  typename PTT::os_traits,
468  PTT,
469  pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>::queue_model
470  >
471  >
472  >;
473  using pool_traits_type=typename base_t::pool_traits_type;
474  using os_traits=typename base_t::os_traits;
475  using thread_traits=typename base_t::thread_traits;
476  using api_params_type=typename base_t::api_params_type;
477  using pool_type=typename base_t::pool_type;
478  using statistics_type=typename base_t::statistics_type;
479  using work_distribution_mode=typename base_t::work_distribution_mode;
480  using signalled_work_queue_type=typename base_t::signalled_work_queue_type;
481 
482  BOOST_MPL_ASSERT((std::is_same<typename std::is_same<typename PTT::os_traits::thread_traits::model_type, sequential_mode>::type, std::false_type>));
483 
484  enum class erase_states {
485  failed_to_erase,
486  erased_successfully,
487  ignoring_result
488  };
489 
490  /// Create the thread pool.
491  /**
492  \param num_threads The number of threads in the pool, which must be greater than zero.
493  */
494  explicit __stdcall thread_pool(const typename base_t::pool_type::size_type num_threads) noexcept(false) FORCE_INLINE
495  : base_t(num_threads) {
496  assert(this->max_num_threads_in_pool>0);
497  if (!this->max_num_threads_in_pool) {
498  throw typename base_t::exception_type(
499  _T("Cannot have an empty thread pool."),
500  info::function(
501  __LINE__,
502  __PRETTY_FUNCTION__,
503  typeid(*this),
504  info::function::argument(
505  _T("const typename pool_traits_type::pool_type::size_type max_num_threads"),
506  tostring(num_threads)
507  )
508  ),
509  JMMCG_REVISION_HDR(_T(LIBJMMCG_VERSION_NUMBER))
510  );
511  }
512  }
513  thread_pool(thread_pool const &)=delete;
514 
515  /**
516  The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
517  */
518  __stdcall ~thread_pool() noexcept(false) FORCE_INLINE {
519  exit();
520  }
521 
522  /// Erase the specified, queued work.
523  /**
524  \see erase_states
525  */
526  template<typename ExecT>
527  erase_states __fastcall erase(ExecT &ec) noexcept(false)=delete;
528 
529  /// Obtain access to any statistics data collected by the operation of the thread_pool.
530  /**
531  Algorithmic complexity when specialised with no_statistics: constant time, otherwise O(pool_size()).
532  Note that the value computed for the statistics_type::total_vertical_work() is guaranteed to be accurate. The value computed for the statistics_type::total_hrz_work() is guaranteed not to be more than the value that would be obtained if it were computed atomically. Therefore the following holds:
533  statistics_type::total_work_added()>=statistics_type::total_vertical_work()+statistics_type::total_hrz_work()
534  */
535  statistics_type const __fastcall statistics() const noexcept(true) override FORCE_INLINE {
536  using acc_t=private_::wrkr_accumulate_across_threads<statistics_type>;
537 
538  statistics_type stats(statistics_);
539  stats.add_vertical_work(acc_t::vertical_work(this->pool));
540  stats.add_hrz_work(acc_t::hrz_work(this->pool));
541  return stats;
542  }
543 
544  void exit() noexcept(false) {
545  this->exit_requested().set(pool_traits_type::template exit_requested_type<typename work_distribution_mode::queue_model>::states::exit_requested);
546  // The destruction of the collection of threads is sequential, but the threads themselves can exit in parallel, thus speeding up the clean-up of the pool.
547  // The natural object-destruction order causes the threads in the pool to be destroyed too late, so the pool must be emptied now.
548  this->pool.clear();
549  // We must empty the queue after deleting the threads: the tricky way in which the pool_threads steal work from the signalled_work_queue can cause them to crash if the queue is emptied whilst they are stealing work. Basically, the pool_threads steal work atomically with respect to each other, but not with respect to this clear() method.
550  this->main_queue.clear();
551  }
552 
553  private:
554  /// This is the queue that the main thread will process.
555  signalled_work_queue_type main_queue;
556  statistics_type statistics_;
557 
558  statistics_type &__fastcall set_statistics() noexcept(true) override FORCE_INLINE {
559  return statistics_;
560  }
561 
562  bool __fastcall add_work_to_batch(const typename thread_traits::api_params_type::tid_type, typename signalled_work_queue_type::value_type &&wk) noexcept(true) override FORCE_INLINE {
563  main_queue.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
564  return true;
565  }
566 
567  /**
568  */
569  void __fastcall add_nonjoinable_work(typename signalled_work_queue_type::value_type &&wk) noexcept(false) override FORCE_INLINE {
570  this->pool.first_thread().push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
571  statistics_.added_work();
572  }
573 
574  /**
575  Try to add the new work to this thread's batch, if empty, to avoid locking the main queue in the pool. This is very important: it helps maintain throughput of work, by avoiding having to place work on the shared signalled_work_queue in the thread_pool, which involves locks and signals, as placing the work directly in the pool_thread's batch can be done lock-free.
576  */
577  typename signalled_work_queue_type::value_type __fastcall add_joinable_work(typename signalled_work_queue_type::value_type &&wk) noexcept(false) override FORCE_INLINE {
578  this->pool.first_thread().push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
579  statistics_.added_work();
580  return std::move(wk);
581  }
582 
583  bool __fastcall process_a_batch_item(const typename thread_traits::api_params_type::tid_type tid) noexcept(false) FORCE_INLINE {
584  return main_queue.process_a_batch_item(this->pool, tid);
585  }
586  };
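/*
 Editorial note for this thread-owns-queue, work-stealing specialisation: erase() is explicitly
 deleted above, so queued work cannot be cancelled through the pool once submitted (with
 hypothetical `pool` and `context` as in the earlier sketch):

     // auto outcome = pool.erase(context);   // ill-formed: erase() is "=delete" for this pool type.

 statistics() aggregation across the per-thread queues otherwise follows the same
 wrkr_accumulate_across_threads pattern as the other specialisations.
*/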
587 
588 } } }
589 
590 #endif