libjmmcg  release_579_6_g8cffd
A C++ library containing an eclectic mix of useful, advanced components.
dataflow_full_algos_performance.cpp
Go to the documentation of this file.
1 /******************************************************************************
2 ** Copyright © 2002 by J.M.McGuiness, coder@hussar.me.uk
3 **
4 ** This library is free software; you can redistribute it and/or
5 ** modify it under the terms of the GNU Lesser General Public
6 ** License as published by the Free Software Foundation; either
7 ** version 2.1 of the License, or (at your option) any later version.
8 **
9 ** This library is distributed in the hope that it will be useful,
10 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
11 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 ** Lesser General Public License for more details.
13 **
14 ** You should have received a copy of the GNU Lesser General Public
15 ** License along with this library; if not, write to the Free Software
16 ** Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 */
18 
19 #include "stdafx.h"
20 
21 #define BOOST_TEST_MODULE libjmmcg_tests
22 #include <boost/test/included/unit_test.hpp>
23 
24 #include <boost/mpl/list.hpp>
25 
26 #include "core/thread_pool_sequential.hpp"
27 #include "core/thread_pool_workers.hpp"
28 
29 #include <boost/graph/graphviz.hpp>
30 
31 #include <chrono>
32 #include <random>
33 
34 using namespace libjmmcg;
35 using namespace ppd;
36 
37 using timed_results_t=ave_deviation_meter<double>;
38 
39 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1>
40 struct erew_normal_fifo_t {
41  typedef api_lock_traits<platform_api, Mdl> lock_traits;
42  typedef safe_colln<
43  std::vector<long>,
46 
47  typedef pool_aspects<
48  Jn,
50  Mdl,
52  std::less,
53  GSSk/*,
54  basic_statistics,
55  control_flow_graph*/
57 
58  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
59 
60  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
61 };
62 
63 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1>
64 struct erew_normal_lifo_t {
65  typedef api_lock_traits<platform_api, Mdl> lock_traits;
66  typedef safe_colln<
67  std::vector<long>,
70 
71  typedef pool_aspects<
72  Jn,
74  Mdl,
76  std::less,
77  GSSk/*,
78  basic_statistics,
79  control_flow_graph*/
81 
82  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
83 
84  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
85 };
86 
87 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1>
88 struct erew_priority_queue_t {
89  typedef api_lock_traits<platform_api, Mdl> lock_traits;
90  typedef safe_colln<
91  std::vector<long>,
94 
95  typedef pool_aspects<
96  Jn,
98  Mdl,
100  std::less,
101  GSSk/*,
102  basic_statistics,
103  control_flow_graph*/
105 
106  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
107 
108  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
109 };
110 
111 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1UL>
112 struct crew_normal_fifo_t {
113  typedef api_lock_traits<platform_api, Mdl> lock_traits;
114  typedef safe_colln<
115  std::vector<long>,
116  typename lock::rw::locker<lock_traits>,
119 
120  typedef pool_aspects<
121  Jn,
122  platform_api,
123  Mdl,
125  std::less,
126  GSSk/*,
127  basic_statistics,
128  control_flow_graph*/
130 
131  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
132 
133  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
134 };
135 
136 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1UL>
137 struct crew_normal_lifo_t {
138  typedef api_lock_traits<platform_api, Mdl> lock_traits;
139  typedef safe_colln<
140  std::vector<long>,
141  typename lock::rw::locker<lock_traits>,
144 
145  typedef pool_aspects<
146  Jn,
147  platform_api,
148  Mdl,
150  std::less,
151  GSSk/*,
152  basic_statistics,
153  control_flow_graph*/
155 
156  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
157 
158  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
159 };
160 
161 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1UL>
163  typedef api_lock_traits<platform_api, Mdl> lock_traits;
164  typedef safe_colln<
165  std::vector<long>,
166  typename lock::rw::locker<lock_traits>,
169 
170  typedef pool_aspects<
171  Jn,
172  platform_api,
173  Mdl,
175  std::less,
176  GSSk/*,
177  basic_statistics,
178  control_flow_graph*/
180 
181  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
182 
183  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
184 };
185 
186 template<class Db, pool_traits::size_mode_t Sz, generic_traits::return_data Jn, class Mdl, unsigned int PoolSize=0, unsigned long GSSk=1>
187 struct crew_priority_queue_t {
188  typedef api_lock_traits<platform_api, Mdl> lock_traits;
189  typedef safe_colln<
190  std::vector<long>,
191  typename lock::rw::locker<lock_traits>,
194 
195  typedef pool_aspects<
196  Jn,
197  platform_api,
198  Mdl,
200  std::less,
201  GSSk/*,
202  basic_statistics,
203  control_flow_graph*/
205 
206  typedef thread_pool<Db, Sz, thread_pool_traits> pool_type;
207 
208  static constexpr typename pool_type::pool_type::size_type pool_size=PoolSize;
209 };
210 
211 /**
212  \todo Does not seem to be perf tested...?
213 */
214 typedef boost::mpl::list<
215  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::sequential, generic_traits::return_data::joinable, sequential_mode>,
216  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 1>,
217 // erew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 2>,
218  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 2>,
219  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 4>,
220  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 8>,
221  crew_normal_fifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 12>
222 > finite_fifo_test_types;
223 
224 typedef boost::mpl::list<
225  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::sequential, generic_traits::return_data::joinable, sequential_mode>,
226  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 1>,
227  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 2>,
228  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 4>,
229  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 8>,
230  crew_normal_lifo_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 12>//,
231 // crew_normal_lifo_lockfree_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::pool_owns_queue>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 12>
232 // crew_normal_lifo_lockfree_t<pool_traits::work_distribution_mode_t::worker_threads_get_work<pool_traits::work_distribution_mode_t::queue_model_t::thread_owns_queue<pool_traits::work_distribution_mode_t::queue_model_t::stealing_mode_t::random>>, pool_traits::size_mode_t::fixed_size, generic_traits::return_data::joinable, heavyweight_threading, 1>
233 > finite_lifo_test_types;
234 
236  auto const &bt=api_threading_traits<platform_api, sequential_mode>::gen_backtrace();
237  std::cerr<<bt.begin()<<std::endl;
238 }
239 
240 namespace __cxxabiv1 {
241  extern "C" void
244  std::terminate();
245  }
246 }
247 
248 // TODO Need to test: pool_traits::size_mode_t::tracks_to_max
249 
250 BOOST_AUTO_TEST_SUITE(thread_pool_tests)
251 
252 BOOST_AUTO_TEST_SUITE(joinable_dataflow)
253 
254 BOOST_AUTO_TEST_SUITE(finite)
255 
256 BOOST_AUTO_TEST_SUITE(n_elements)
257 
258 /**
259  \test <a href="./examples/dataflow_full_algos_performance_accumulate.svg">Graph</a> of scalability of parallel accumulate.
260  ===================================
261  For 2<<10 items. Note that we observe anti-scaling, as there are simply not enough items in the collection to offset the costs of threading.
262  -# Results:
263  -# Build 1405:
264  - Pool=0, items=1024, time taken: [0, 1.35999e-09 ~(+/-2.8e+02%), 3.1e-05], samples=100001, total=0.00014 sec.
265  - Pool=1, items=1024, time taken: [1e-06, 1.01439e-06 ~(+/-10%), 2.5e-05], samples=5002, total=0.0051 sec.
266  - Pool=2, items=1024, time taken: [2.9e-05, 9.26648e-05 ~(+/-10%), 0.00098], samples=13838, total=1.3 sec.
267  - Pool=4, items=1024, time taken: [0.000136, 0.00021595 ~(+/-9.8%), 0.0003], samples=60, total=0.013 sec.
268  - Pool=8, items=1024, time taken: [0.000272, 0.000470489 ~(+/-10%), 0.0008], samples=47, total=0.022 sec.
269  - Pool=12, items=1024, time taken: [0.000419, 0.000632359 ~(+/-9.8%), 0.0015], samples=64, total=0.04 sec.
270  -# Build 1469:
271  - Pool=0, items=1024, time taken: [0, 2.851e-10 ~(+/-2.2e+02%), 4e-05], samples=10000001, total=0.0029 sec.
272  - Pool=1, items=1024, time taken: [1e-06, 1.00901e-06 ~(+/-10%), 4.6e-05], samples=488923, total=0.49 sec.
273  - Pool=2, items=1024, time taken: [8e-06, 0.000108706 ~(+/-12%), 0.0032], samples=10000001, total=1.1e+03 sec.
274  - Pool=4, items=1024, time taken: [7.8e-05, 0.000269517 ~(+/-10%), 0.00054], samples=675, total=0.18 sec.
275  - Pool=8, items=1024, time taken: [0.000282, 0.000469618 ~(+/-10%), 0.0017], samples=304, total=0.14 sec.
276  - Pool=12, items=1024, time taken: [0.000321, 0.000540948 ~(+/-10%), 0.00077], samples=192, total=0.1 sec.
277 */
278 BOOST_AUTO_TEST_CASE_TEMPLATE(accumulate, T, finite_lifo_test_types) {
279  typedef typename T::vtr_colln_t vtr_colln_t;
280  typedef typename T::pool_type pool_type;
281 
282 #ifdef JMMCG_PERFORMANCE_TESTS
283  const unsigned long test_size=2<<10;
284  const unsigned long num_reps=10000000;
285 #else
286  const unsigned long test_size=2<<3;
287  const unsigned long num_reps=2;
288 #endif
289  std::set_terminate(&gen_bt_terminate);
290  pool_type pool(T::pool_size);
291  std::cout<<"Pool="<<pool.pool_size()<<std::flush;
292  vtr_colln_t v;
293  v.reserve(test_size);
294  std::mt19937_64 gen(42);
295  std::uniform_int_distribution<typename vtr_colln_t::value_type> distribution(0, std::numeric_limits<typename vtr_colln_t::value_type>::max()/(test_size+1));
296  std::generate_n(std::back_inserter(v.colln()), test_size, std::bind(distribution, gen));
297  v.sync_size();
298  const typename vtr_colln_t::value_type res_chk=std::accumulate(v.colln().begin(), v.colln().end(), typename vtr_colln_t::value_type());
299  std::cout<<", items="<<v.size()<<std::flush;
300  const std::pair<timed_results_t, bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
301  10.0,
302  num_reps,
303  [&pool, res_chk, &v]() {
304  typedef typename pool_type::joinable joinable;
305 
306  typename vtr_colln_t::value_type res;
307  const auto t1=std::chrono::high_resolution_clock::now();
308  auto const &context=pool<<joinable()<<pool.accumulate(v, typename vtr_colln_t::value_type());
309  res=*context;
310  const auto t2=std::chrono::high_resolution_clock::now();
311  BOOST_CHECK_EQUAL(res, res_chk);
312  return timed_results_t::value_type(std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count())/1000000;
313  }
314  ));
315  std::cout<<", time taken: "<<timed_results.first<<" sec."<<std::endl;
316  std::cout<<pool.statistics()<<std::endl;
317  std::ofstream ofs("accumulate.dot");
318  pool.cfg().write_graphviz(ofs);
319 #ifdef JMMCG_PERFORMANCE_TESTS
320 // BOOST_CHECK(!timed_results.second);
321 #endif
322 }
323 
324 /**
325  \test <a href="./examples/dataflow_full_algos_performance_merge.svg">Graph</a> of scalability of parallel merge.
326  ===================================
327  For 2<<25 items.
328  -# Results:
329  -# Build 1655:
330  - Pool=0, items=1024, time taken=
331  - Pool=1, items=1024, time taken=
332  - Pool=2, items=1024, time taken=
333  - Pool=4, items=1024, time taken=
334  - Pool=8, items=1024, time taken=
335  - Pool=12, items=1024, time taken=
336 */
337 BOOST_AUTO_TEST_CASE_TEMPLATE(merge, T, finite_lifo_test_types) {
338  typedef typename T::vtr_colln_t vtr_colln_t;
339  typedef typename T::pool_type pool_type;
340 
341 #ifdef JMMCG_PERFORMANCE_TESTS
342  const unsigned long test_size=2<<25;
343  const unsigned long num_reps=2000;
344 #else
345  const unsigned long test_size=2<<3;
346  const unsigned long num_reps=2;
347 #endif
348  pool_type pool(T::pool_size);
349  std::cout<<"Pool="<<pool.pool_size()<<std::flush;
350  vtr_colln_t v, v1;
351  v.reserve(test_size);
352  v1.reserve(test_size);
353  std::mt19937_64 gen(42);
354  std::uniform_int_distribution<typename vtr_colln_t::value_type> distribution(0, std::numeric_limits<typename vtr_colln_t::value_type>::max()/test_size);
355  std::generate_n(std::back_inserter(v.colln()), test_size, std::bind(distribution, gen));
356  v.sync_size();
357  std::sort(v.colln().begin(), v.colln().end());
358  std::generate_n(std::back_inserter(v1.colln()), test_size, std::bind(distribution, gen));
359  v1.sync_size();
360  std::sort(v1.colln().begin(), v1.colln().end());
361  vtr_colln_t v_out;
362  // This causes the sequential checks to fail.
363  v_out.colln().reserve(v.size()+v1.size());
364  v_out.resize_noinit_nolk(v.size()+v1.size());
365  vtr_colln_t v_chk;
366  v_chk.colln().reserve(v.size()+v1.size());
367  std::merge(v.colln().begin(), v.colln().end(), v1.colln().begin(), v1.colln().end(), std::back_inserter(v_chk.colln()));
368  v_chk.sync_size();
369  std::cout<<", items="<<(v.size()+v1.size())<<std::flush;
370  const std::pair<timed_results_t, bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
371  10.0,
372  num_reps,
373  [&pool, &v_chk, &v, &v1, &v_out]() {
374  typedef typename pool_type::joinable joinable;
375 
376  const auto t1=std::chrono::high_resolution_clock::now();
377  auto const &context=pool<<joinable()<<pool.merge(v, v1, v_out);
378  *context;
379  const auto t2=std::chrono::high_resolution_clock::now();
380  BOOST_CHECK_EQUAL(v_out.size(), v_chk.size());
381  BOOST_CHECK(v_out==v_chk);
382  return timed_results_t::value_type(std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count())/1000000;
383  }
384  ));
385  std::cout<<", time taken: "<<timed_results.first<<" sec."<<std::endl;
386  std::cout<<pool.statistics()<<std::endl;
387  std::ofstream ofs("merge.dot");
388  pool.cfg().write_graphviz(ofs);
389 #ifdef JMMCG_PERFORMANCE_TESTS
390  BOOST_CHECK(!timed_results.second);
391 #endif
392 }
393 
394 /**
395  \test <a href="./examples/dataflow_full_algos_performance_sort.svg">Graph</a> of scalability of parallel sort.
396  ===================================
397  For 2<<25 items.
398  -# Results:
399  -# Build 1655:
400  - Pool=0, items=1024, time taken=
401  - Pool=1, items=1024, time taken=
402  - Pool=2, items=1024, time taken=
403  - Pool=4, items=1024, time taken=
404  - Pool=8, items=1024, time taken=
405  - Pool=12, items=1024, time taken=
406 */
407 BOOST_AUTO_TEST_CASE_TEMPLATE(sort_unsorted, T, finite_lifo_test_types) {
408  typedef typename T::vtr_colln_t vtr_colln_t;
409  typedef typename T::pool_type pool_type;
410 
411 #ifdef JMMCG_PERFORMANCE_TESTS
412  const unsigned long test_size=2<<25;
413  const unsigned long num_reps=2000;
414 #else
415  const unsigned long test_size=2<<3;
416  const unsigned long num_reps=2;
417 #endif
418  pool_type pool(T::pool_size);
419  std::cout<<"Pool="<<pool.pool_size()<<std::flush;
420  vtr_colln_t v;
421  v.reserve(test_size);
422  std::mt19937_64 gen(42);
423  std::uniform_int_distribution<typename vtr_colln_t::value_type> distribution(0, std::numeric_limits<typename vtr_colln_t::value_type>::max()/test_size);
424  std::generate_n(std::back_inserter(v.colln()), test_size, std::bind(distribution, gen));
425  vtr_colln_t v_chk(v);
426  v.sync_size();
427  v_chk.sync_size();
428  std::sort(v_chk.colln().begin(), v_chk.colln().end());
429  std::cout<<", items="<<v.size()<<std::flush;
430  const std::pair<timed_results_t, bool> timed_results(compute_average_deviation<timed_results_t::value_type>(
431  10.0,
432  num_reps,
433  [&pool, &v_chk, &v]() {
434  typedef typename pool_type::joinable joinable;
435 
436  const auto t1=std::chrono::high_resolution_clock::now();
437  auto const &context=pool<<joinable()<<pool.sort(v);
438  *context;
439  const auto t2=std::chrono::high_resolution_clock::now();
440  BOOST_CHECK(v==v_chk);
441  return timed_results_t::value_type(std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count())/1000000;
442  }
443  ));
444  std::cout<<", time taken: "<<timed_results.first<<" sec."<<std::endl;
445  std::cout<<pool.statistics()<<std::endl;
446  std::ofstream ofs("sort.dot");
447  pool.cfg().write_graphviz(ofs);
448 #ifdef JMMCG_PERFORMANCE_TESTS
449  BOOST_CHECK(!timed_results.second);
450 #endif
451 }
452 
453 BOOST_AUTO_TEST_SUITE_END()
454 
455 BOOST_AUTO_TEST_SUITE_END()
456 
457 BOOST_AUTO_TEST_SUITE_END()
458 
459 BOOST_AUTO_TEST_SUITE_END()