Seg3D  2.4
Seg3D is a free volume segmentation and processing tool developed by the NIH Center for Integrative Biomedical Computing at the University of Utah Scientific Computing and Imaging (SCI) Institute.
the_thread_pool.hxx
1 /*
2  For more information, please see: http://software.sci.utah.edu
3 
4  The MIT License
5 
6  Copyright (c) 2016 Scientific Computing and Imaging Institute,
7  University of Utah.
8 
9 
10  Permission is hereby granted, free of charge, to any person obtaining a
11  copy of this software and associated documentation files (the "Software"),
12  to deal in the Software without restriction, including without limitation
13  the rights to use, copy, modify, merge, publish, distribute, sublicense,
14  and/or sell copies of the Software, and to permit persons to whom the
15  Software is furnished to do so, subject to the following conditions:
16 
17  The above copyright notice and this permission notice shall be included
18  in all copies or substantial portions of the Software.
19 
20  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
26  DEALINGS IN THE SOFTWARE.
27 */
28 
29 // File : the_thread_pool.hxx
30 // Author : Pavel A. Koshevoy
31 // Created : Wed Feb 21 08:30:00 MST 2007
32 // Copyright : (C) 2004-2008 University of Utah
33 // Description : A thread pool class.
34 
35 #ifndef THE_THREAD_POOL_HXX_
36 #define THE_THREAD_POOL_HXX_
37 
38 // system includes:
39 #include <list>
40 #include <assert.h>
41 
42 // local includes:
43 #include <Core/ITKCommon/ThreadUtils/the_thread_interface.hxx>
44 #include <Core/ITKCommon/ThreadUtils/the_transaction.hxx>
45 
46 // forward declarations:
48 class the_thread_pool_t;
49 
50 
51 //----------------------------------------------------------------
52 // transaction_wrapper_t
53 //
55 {
56 public:
57  the_transaction_wrapper_t(const unsigned int & num_parts,
58  the_transaction_t * transaction);
60 
61  static void notify_cb(void * data,
63  the_transaction_t::state_t s);
64 
65  bool notify(the_transaction_t * t,
66  the_transaction_t::state_t s);
67 
68 private:
71 
72  the_mutex_interface_t * mutex_;
73 
74  the_transaction_t::notify_cb_t cb_;
75  void * cb_data_;
76 
77  const unsigned int num_parts_;
78  unsigned int notified_[the_transaction_t::DONE_E + 1];
79 };
80 
81 
82 //----------------------------------------------------------------
83 // the_thread_pool_data_t
84 //
86 {
88  parent_(NULL),
89  thread_(NULL),
90  id_(~0u)
91  {}
92 
94  { thread_->delete_this(); }
95 
96  the_thread_pool_t * parent_;
97  the_thread_interface_t * thread_;
98  unsigned int id_;
99 };
100 
101 
102 //----------------------------------------------------------------
103 // the_thread_pool_t
104 //
106 {
107  friend class the_thread_interface_t;
108 
109 public:
110  the_thread_pool_t(unsigned int num_threads);
111  virtual ~the_thread_pool_t();
112 
113  // start the threads:
114  void start();
115 
116  // this controls whether the thread will voluntarily terminate
117  // once it runs out of transactions:
118  void set_idle_sleep_duration(bool enable, unsigned int microseconds = 10000);
119 
120  // schedule a transaction:
121  // NOTE: when multithreaded is true, the transaction will be scheduled
122  // N times, where N is the number of threads in the pool.
123  // This means the transaction will be executed by N threads, so
124  // the transaction has to support concurrent execution internally.
125  virtual void push_front(the_transaction_t * transaction,
126  bool multithreaded = false);
127 
128  virtual void push_back(the_transaction_t * transaction,
129  bool multithreaded = false);
130 
131  virtual void push_back(std::list<the_transaction_t *> & schedule,
132  bool multithreaded = false);
133 
134  // split the work among the threads:
135  void pre_distribute_work();
136 
137  // check whether the thread pool has any work left:
138  bool has_work() const;
139 
140  // schedule a transaction and start the thread:
141  virtual void start(the_transaction_t * transaction,
142  bool multithreaded = false);
143 
144  // abort the current transaction and clear pending transactions;
145  // transactionFinished will be emitted for the aborted transaction
146  // and the discarded pending transactions:
147  void stop();
148 
149  // wait for all threads to finish:
150  void wait();
151 
152  // clear all pending transactions, do not abort the current transaction:
153  void flush();
154 
155  // this will call terminate_all for the terminators in this thread,
156  // but it will not stop the thread, so that new transactions may
157  // be scheduled while the old transactions are being terminated:
158  void terminate_transactions();
159 
160  // terminate the current transactions and schedule a new transaction:
161  void stop_and_go(the_transaction_t * transaction,
162  bool multithreaded = false);
163  void stop_and_go(std::list<the_transaction_t *> & schedule,
164  bool multithreaded = false);
165 
166  // flush the current transactions and schedule a new transaction:
167  void flush_and_go(the_transaction_t * transaction,
168  bool multithreaded = false);
169  void flush_and_go(std::list<the_transaction_t *> & schedule,
170  bool multithreaded = false);
171 
172  // virtual: default transaction communication handlers:
173  void handle(the_transaction_t * transaction, the_transaction_t::state_t s);
174  void blab(const char * message) const;
175 
176  // access control:
177  inline the_mutex_interface_t * mutex()
178  { return mutex_; }
179 
180  inline const unsigned int & pool_size() const
181  { return pool_size_; }
182 
183 private:
184  // intentionally disabled:
186  the_thread_pool_t & operator = (const the_thread_pool_t &);
187 
188 protected:
189  // thread callback handler:
190  virtual void handle_thread(the_thread_pool_data_t * data);
191 
192  // helpers:
193  inline the_thread_interface_t * thread(unsigned int id) const
194  {
195  assert(id < pool_size_);
196  return pool_[id].thread_;
197  }
198 
199  void no_lock_flush();
200  void no_lock_terminate_transactions();
201  void no_lock_push_front(the_transaction_t * transaction, bool multithreaded);
202  void no_lock_push_back(the_transaction_t * transaction, bool multithreaded);
203  void no_lock_push_back(std::list<the_transaction_t *> & schedule, bool mt);
204 
205  // thread synchronization control:
206  the_mutex_interface_t * mutex_;
207 
208  // the thread pool:
209  the_thread_pool_data_t * pool_;
210  unsigned int pool_size_;
211 
212  // the working threads:
213  std::list<unsigned int> busy_;
214 
215  // the waiting threads:
216  std::list<unsigned int> idle_;
217 
218  // scheduled transactions:
219  std::list<the_transaction_t *> transactions_;
220 };
221 
222 
223 #endif // THE_THREAD_POOL_HXX_
Definition: the_thread_pool.hxx:85
Definition: the_transaction.hxx:55
Definition: the_transaction.hxx:181
Definition: the_mutex_interface.hxx:42
Definition: the_thread_pool.hxx:105
Definition: the_thread_pool.hxx:54
Definition: the_thread_interface.hxx:57