libjmmcg release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
pool_thread_impl.hpp
/******************************************************************************
** Copyright © 2004 by J.M.McGuiness, coder@hussar.me.uk
**
** This library is free software; you can redistribute it and/or
** modify it under the terms of the GNU Lesser General Public
** License as published by the Free Software Foundation; either
** version 2.1 of the License, or (at your option) any later version.
**
** This library is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
** Lesser General Public License for more details.
**
** You should have received a copy of the GNU Lesser General Public
** License along with this library; if not, write to the Free Software
** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/

namespace jmmcg { namespace LIBJMMCG_VER_NAMESPACE { namespace ppd { namespace pool { namespace private_ { namespace thread_types {

template<class PTT> inline
steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
: base_t(exit_r), batch(work_q) {
}

template<class PTT> inline
~steal() noexcept(false) {
    base_t::wait_thread_exit();
    assert(batch.batch.batch_empty());
}

template<class PTT> inline
bool
add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
    return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
}

template<class PTT> inline
bool
process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
    return batch.batch.process_a_batch_item();
}

template<class PTT> inline
process() noexcept(false) {
    // Serialize access to the queue by all of the other threads in the pool; this means that only one job at a time can be removed from the work queue.
    for (;;) {
        typename exit_requested_type::lock_result_type lkd;
        {
            const typename os_traits::thread_traits::cancellability set;
            lkd=this->exit_requested_.lock();
        }
        if (lkd.second!=lock_traits::atom_set || lkd.first==exit_requested_type::states::exit_requested) {
            // Ensure the rest of the threads in the pool exit.
            this->exit_requested_.set(exit_requested_type::states::exit_requested);
            break;
        } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
            // The counter in signalled_work_queue is one item too low and will need to be re-added, but we do that closer to where we lock the queue to reduce the chance of a horizontal thread slipping in and getting the work.
            batch.batch.process_a_batch(batch.signalled_work_queue);
            assert(batch.batch.batch_empty());
        }
    }
    return thread_traits::api_params_type::no_kernel_thread;
}

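// A minimal sketch, in standard C++ only, of the dispatch pattern that the
// process() loop above implements: block until either work arrives or an exit
// is requested, remove work from the queue whilst holding the lock so that only
// one pool thread at a time can take a job, and latch the exit request so that
// every other pool thread also sees it. Everything below (example_queue,
// submit, request_exit, run_dispatch_loop) is an illustrative assumption, not
// part of libjmmcg; the library's "new_work_arrived" state is modelled here
// simply by the queue becoming non-empty, and <condition_variable>, <deque>,
// <functional> and <mutex> are assumed to have been included.
namespace example {
    struct example_queue {
        std::mutex mtx;
        std::condition_variable cv;
        std::deque<std::function<void()>> work;
        bool exit_requested=false;
    };

    inline void submit(example_queue &q, std::function<void()> job) {
        {
            const std::lock_guard<std::mutex> lk(q.mtx);
            q.work.push_back(std::move(job));
        }
        q.cv.notify_one();
    }

    inline void request_exit(example_queue &q) {
        {
            const std::lock_guard<std::mutex> lk(q.mtx);
            q.exit_requested=true;
        }
        q.cv.notify_all();
    }

    inline void run_dispatch_loop(example_queue &q) {
        for (;;) {
            std::unique_lock<std::mutex> lk(q.mtx);
            // Equivalent of exit_requested_.lock(): sleep until something is signalled.
            q.cv.wait(lk, [&q]{return q.exit_requested || !q.work.empty();});
            if (q.exit_requested) {
                // The flag stays latched, so the other pool threads will also exit.
                return;
            }
            // Only one thread at a time reaches here with the lock held, so only
            // one job at a time is removed from the work queue.
            std::function<void()> job(std::move(q.work.front()));
            q.work.pop_front();
            lk.unlock();
            job();
        }
    }
}
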
template<class PTT> inline
slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
: base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
}

template<class PTT> inline
~slave() noexcept(true) {
    base_t::wait_thread_exit();
}

template<class PTT> inline
typename slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
slave<generic_traits::return_data::nonjoinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::process() noexcept(false) {
    some_work->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
    this->statistics_.processed_vertical_work();
    return thread_traits::api_params_type::no_kernel_thread;
}

template<class PTT> inline
steal(exit_requested_type &exit_r, signalled_work_queue_type &work_q) noexcept(true)
: base_t(exit_r), batch(work_q) {
}

template<class PTT> inline
~steal() noexcept(false) {
    base_t::wait_thread_exit();
    assert(batch.batch.batch_empty());
}

template<class PTT> inline
bool
add_work_to_batch(typename signalled_work_queue_type::value_type &&wk) {
    return batch.batch.add_work_to_batch(std::forward<typename signalled_work_queue_type::value_type>(wk));
}

template<class PTT> inline
bool
process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thr) {
    return batch.batch.process_a_batch_item();
}

template<class PTT> inline
process() noexcept(false) {
    // Serialize access to the queue by all of the other threads in the pool; this means that only one job at a time can be removed from the work queue.
    for (;;) {
        typename exit_requested_type::lock_result_type lkd;
        {
            const typename os_traits::thread_traits::cancellability set;
            lkd=this->exit_requested_.lock();
        }
        assert(lkd.second==exit_requested_type::lock_result_type::second_type::atom_set);
        if (lkd.first==exit_requested_type::states::exit_requested) {
            // Ensure the rest of the threads in the pool exit.
            this->exit_requested_.set(exit_requested_type::states::exit_requested);
            break;
        } else if (lkd.first==exit_requested_type::states::new_work_arrived) {
            // The counter in signalled_work_queue is one item too low and will need to be re-added, but we do that closer to where we lock the queue to reduce the chance of a horizontal thread slipping in and getting the work.
            batch.batch.process_a_batch(batch.signalled_work_queue);
            assert(batch.batch.batch_empty());
        }
    }
    return thread_traits::api_params_type::no_kernel_thread;
}

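// Note: this process() differs from the earlier steal::process() above only in
// how a failed lock on exit_requested_ is handled. The earlier variant treats
// lkd.second!=lock_traits::atom_set as a reason to shut the pool down, whereas
// this one asserts that the lock must always be acquired.
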
template<class PTT> inline
steal(exit_requested_type &exit_r) noexcept(true)
: base_t(exit_r), work() {
}

template<class PTT> inline
steal(steal &&s) noexcept(true)
: base_t(s.exit_requested_), work(std::move(s.work)) {
}

template<class PTT> inline
~steal() noexcept(false) {
    base_t::wait_thread_exit();
}

template<class PTT> inline bool
push_front(typename signalled_work_queue_type::value_type &&wk) {
    work.batch.push_front(std::forward<typename signalled_work_queue_type::value_type>(wk));
    return true;
}

template<class PTT> inline bool
process_a_batch_item(typename os_traits::thread_exception const &exception_thrown_in_thread) {
    // TODO: batch-item processing is not yet implemented for this specialization.
    // Placeholder return so that this bool-returning function does not fall off the end, which would be undefined behaviour.
    return false;
}

template<class PTT> inline
process() noexcept(false) {
    using cfg_type=typename container_type::container_type::value_type::value_type::cfg_type;

    for (;;) {
        // Busy-wait until either some work arrives or the exit flag is set...
        while (work.batch.empty()) {
            const typename exit_requested_type::lock_result_type lkd=this->exit_requested_.try_lock();
            assert(lkd.second==exit_requested_type::lock_result_type::second_type::atom_set);
            if (lkd.first==exit_requested_type::states::exit_requested) {
                // Ensure the rest of the threads in the pool exit.
                this->exit_requested_.set(exit_requested_type::states::exit_requested);
                return thread_traits::api_params_type::no_kernel_thread;
            }
        }
        typename container_type::container_type::value_type current_work(work.batch.pop_front_1_nochk_nosig());
        ppd::private_::eval_shared_deleter_t<typename container_type::container_type::value_type> wk(current_work);
        wk.process_the_work(std::bind(&statistics_type::processed_vertical_work, &work.statistics_), cfg_type::vertical_edge_annotation);
    }
    return thread_traits::api_params_type::no_kernel_thread;
}

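// A minimal sketch, in standard C++ only, of the busy-wait loop used by the
// process() definition above: spin while the thread's own work container is
// empty, polling a shared exit flag, otherwise pop a job from the front and run
// it. The names spin_worker and run_spin_loop are illustrative assumptions, not
// part of libjmmcg, and <atomic>, <deque>, <functional> and <thread> are assumed
// to have been included. Unlike the library's batch container, a plain
// std::deque would need external synchronisation if another thread pushed work
// concurrently; the sketch only shows the shape of the spin loop.
namespace spin_example {
    struct spin_worker {
        std::deque<std::function<void()>> local_work;
        std::atomic<bool> *exit_flag;
    };

    inline void run_spin_loop(spin_worker &w) {
        for (;;) {
            // Busy-wait: poll the exit flag while there is nothing to do.
            while (w.local_work.empty()) {
                if (w.exit_flag->load(std::memory_order_acquire)) {
                    return;
                }
                std::this_thread::yield();  // Slightly kinder to the scheduler than a pure spin.
            }
            std::function<void()> job(std::move(w.local_work.front()));
            w.local_work.pop_front();
            job();
        }
    }
}
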
template<class PTT> inline
slave(exit_requested_type &exit_r, typename signalled_work_queue_type::value_type &&wk) noexcept(true)
: base_t(exit_r), some_work(std::forward<typename signalled_work_queue_type::value_type>(wk)), statistics_() {
}

template<class PTT> inline
~slave() noexcept(true) {
    base_t::wait_thread_exit();
}

template<class PTT> inline
typename slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::thread_traits::api_params_type::states
slave<generic_traits::return_data::joinable, thread_os_traits<generic_traits::api_type::posix_pthreads, heavyweight_threading>, PTT>::process() noexcept(false) {
    // RAII helper: flags the work item as complete when the joinable-processing scope exits, even if process_joinable() throws.
    class set_work_complete {
    public:
        explicit set_work_complete(typename signalled_work_queue_type::value_type::atomic_ptr_t &wc) noexcept(true)
        : work_ptr(wc) {
        }
        ~set_work_complete() noexcept(true) {
            work_ptr->work_complete()->set();
        }

    private:
        typename signalled_work_queue_type::value_type::atomic_ptr_t &work_ptr;
    };

    typename signalled_work_queue_type::value_type::atomic_ptr_t work_ptr(some_work.get());
    if (dynamic_cast<typename signalled_work_queue_type::value_type::no_ref_counting *>(work_ptr.get())) {
        some_work=typename signalled_work_queue_type::value_type();
    }
    if (work_ptr->result_traits()==generic_traits::return_data::joinable) {
        const set_work_complete setter(work_ptr);
        work_ptr->process_joinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
    } else {
        work_ptr->process_nonjoinable(signalled_work_queue_type::value_ret_type::value_type::value_type::cfg_type::vertical_edge_annotation);
    }
    this->statistics_.processed_vertical_work();
    return thread_traits::api_params_type::no_kernel_thread;
}
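
// A minimal sketch, in standard C++ only, of the RAII completion-signalling
// idiom used by set_work_complete above: a guard object whose destructor signals
// "the work is finished", so that a thread waiting to join on the result is
// always released, even if executing the work throws. The names
// completion_guard and run_and_signal are illustrative assumptions, not part of
// libjmmcg, and <future> plus <utility> are assumed to have been included.
namespace raii_example {
    class completion_guard {
    public:
        explicit completion_guard(std::promise<void> &done) noexcept
        : done_(done) {
        }
        ~completion_guard() noexcept {
            // Runs on every exit path, including stack unwinding after a throw.
            try {
                done_.set_value();
            } catch (...) {
                // The promise was already satisfied; nothing further to signal.
            }
        }

    private:
        std::promise<void> &done_;
    };

    template<class Work>
    void run_and_signal(Work &&work, std::promise<void> &done) {
        const completion_guard guard(done);
        std::forward<Work>(work)();  // May throw; the guard still signals completion.
        // A caller blocked in done.get_future().wait() is therefore always woken.
    }
}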

} } } } } }