ctkEAPooledExecutor.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. /*=============================================================================
  2. Library: CTK
  3. Copyright (c) German Cancer Research Center,
  4. Division of Medical and Biological Informatics
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. =============================================================================*/
  15. #include "ctkEAPooledExecutor_p.h"
  16. #include "ctkEAChannel_p.h"
  17. #include <dispatch/ctkEAInterruptibleThread_p.h>
  18. #include <dispatch/ctkEAInterruptedException_p.h>
  19. #include <limits>
  20. #include <stdexcept>
  21. #include <QDateTime>
  22. #include <QDebug>
  23. const int ctkEAPooledExecutor::DEFAULT_MAXIMUMPOOLSIZE = std::numeric_limits<int>::max();
  24. const int ctkEAPooledExecutor::DEFAULT_MINIMUMPOOLSIZE = 1;
  25. const long ctkEAPooledExecutor::DEFAULT_KEEPALIVETIME = 60 * 1000;
  26. ctkEAPooledExecutor::ctkEAPooledExecutor(ctkEAChannel* channel, int maxPoolSize)
  27. : maximumPoolSize_(maxPoolSize), minimumPoolSize_(DEFAULT_MINIMUMPOOLSIZE),
  28. poolSize_(0), keepAliveTime_(DEFAULT_KEEPALIVETIME), shutdown_(false),
  29. handOff_(channel), blockedExecutionHandler_(0), waitWhenBlocked_(this),
  30. discardOldestWhenBlocked_(this)
  31. {
  32. runWhenBlocked();
  33. }
  34. ctkEAPooledExecutor::~ctkEAPooledExecutor()
  35. {
  36. delete handOff_;
  37. qDeleteAll(stoppedThreads_);
  38. }
  39. int ctkEAPooledExecutor::getMaximumPoolSize() const
  40. {
  41. QMutexLocker lock(&mutex);
  42. return maximumPoolSize_;
  43. }
  44. void ctkEAPooledExecutor::setMaximumPoolSize(int newMaximum)
  45. {
  46. QMutexLocker lock(&mutex);
  47. if (newMaximum <= 0) throw std::invalid_argument("maximum must be > 0");
  48. maximumPoolSize_ = newMaximum;
  49. }
  50. int ctkEAPooledExecutor::getMinimumPoolSize() const
  51. {
  52. QMutexLocker lock(&mutex);
  53. return minimumPoolSize_;
  54. }
  55. void ctkEAPooledExecutor::setMinimumPoolSize(int newMinimum)
  56. {
  57. QMutexLocker lock(&mutex);
  58. if (newMinimum < 0) throw std::invalid_argument("minimum must be >= 0");
  59. minimumPoolSize_ = newMinimum;
  60. }
  61. int ctkEAPooledExecutor::getPoolSize() const
  62. {
  63. QMutexLocker lock(&mutex);
  64. return poolSize_;
  65. }
  66. long ctkEAPooledExecutor::getKeepAliveTime() const
  67. {
  68. QMutexLocker lock(&mutex);
  69. return keepAliveTime_;
  70. }
  71. void ctkEAPooledExecutor::setKeepAliveTime(long msecs)
  72. {
  73. QMutexLocker lock(&mutex);
  74. keepAliveTime_ = msecs;
  75. }
  76. ctkEAPooledExecutor::BlockedExecutionHandler* ctkEAPooledExecutor::getBlockedExecutionHandler() const
  77. {
  78. QMutexLocker lock(&mutex);
  79. return blockedExecutionHandler_;
  80. }
  81. void ctkEAPooledExecutor::setBlockedExecutionHandler(BlockedExecutionHandler* h)
  82. {
  83. QMutexLocker lock(&mutex);
  84. blockedExecutionHandler_ = h;
  85. }
  86. int ctkEAPooledExecutor::createThreads(int numberOfThreads)
  87. {
  88. int ncreated = 0;
  89. for (int i = 0; i < numberOfThreads; ++i)
  90. {
  91. if (poolSize_ < maximumPoolSize_)
  92. {
  93. addThread(0);
  94. ++ncreated;
  95. }
  96. else
  97. break;
  98. }
  99. return ncreated;
  100. }
  101. void ctkEAPooledExecutor::interruptAll()
  102. {
  103. QMutexLocker lock(&mutex);
  104. foreach (ctkEAInterruptibleThread* t, threads_)
  105. {
  106. t->interrupt();
  107. }
  108. }
  109. void ctkEAPooledExecutor::shutdownNow()
  110. {
  111. shutdownNow(&discardWhenBlocked_);
  112. }
  113. void ctkEAPooledExecutor::shutdownNow(BlockedExecutionHandler* handler)
  114. {
  115. QMutexLocker lock(&mutex);
  116. setBlockedExecutionHandler(handler);
  117. shutdown_ = true; // don't allow new tasks
  118. minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
  119. interruptAll(); // interrupt all existing threads
  120. }
  121. void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks()
  122. {
  123. shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  124. }
  125. void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler* handler)
  126. {
  127. QMutexLocker lock(&mutex);
  128. setBlockedExecutionHandler(handler);
  129. shutdown_ = true;
  130. if (poolSize_ == 0) // disable new thread construction when idle
  131. minimumPoolSize_ = maximumPoolSize_ = 0;
  132. }
  133. bool ctkEAPooledExecutor::isTerminatedAfterShutdown() const
  134. {
  135. QMutexLocker lock(&mutex);
  136. return shutdown_ && poolSize_ == 0;
  137. }
  138. bool ctkEAPooledExecutor::awaitTerminationAfterShutdown(long maxWaitTime) const
  139. {
  140. QMutexLocker lock(&mutex);
  141. QMutexLocker shutdownLock(&shutdownMutex);
  142. if (!shutdown_)
  143. throw std::logic_error("not in shutdown state");
  144. if (poolSize_ == 0)
  145. return true;
  146. long waitTime = maxWaitTime;
  147. if (waitTime <= 0)
  148. return false;
  149. //TODO Use Qt4.7 API
  150. QDateTime start = QDateTime::currentDateTime();
  151. forever
  152. {
  153. waitCond.wait(&shutdownMutex, waitTime);
  154. if (poolSize_ == 0)
  155. return true;
  156. qint64 currWait = start.time().msecsTo(QDateTime::currentDateTime().time());
  157. waitTime = maxWaitTime - currWait;
  158. if (waitTime <= 0)
  159. return false;
  160. }
  161. }
  162. void ctkEAPooledExecutor::awaitTerminationAfterShutdown() const
  163. {
  164. QMutexLocker lock(&mutex);
  165. if (!shutdown_)
  166. throw std::logic_error("not in shutdown state");
  167. while (poolSize_ > 0)
  168. {
  169. lock.unlock();
  170. QMutexLocker shutdownLock(&shutdownMutex);
  171. waitCond.wait(&shutdownMutex);
  172. lock.relock();
  173. // worker is done, wait for possibly not yet finished worker-thread
  174. foreach(QThread* t, stoppedThreads_)
  175. {
  176. t->wait();
  177. }
  178. }
  179. }
  180. QList<ctkEARunnable*> ctkEAPooledExecutor::drain()
  181. {
  182. bool wasInterrupted = false;
  183. QList<ctkEARunnable*> tasks;
  184. forever
  185. {
  186. try
  187. {
  188. ctkEARunnable* x = handOff_->poll(0);
  189. if (x == 0)
  190. break;
  191. else
  192. tasks.push_back(x);
  193. }
  194. catch (const ctkEAInterruptedException& )
  195. {
  196. wasInterrupted = true; // postpone re-interrupt until drained
  197. }
  198. }
  199. if (wasInterrupted)
  200. {
  201. qobject_cast<ctkEAInterruptibleThread*>(QThread::currentThread())->interrupt();
  202. }
  203. return tasks;
  204. }
  205. void ctkEAPooledExecutor::runWhenBlocked()
  206. {
  207. setBlockedExecutionHandler(&runWhenBlocked_);
  208. }
  209. void ctkEAPooledExecutor::waitWhenBlocked()
  210. {
  211. setBlockedExecutionHandler(&waitWhenBlocked_);
  212. }
  213. void ctkEAPooledExecutor::discardWhenBlocked()
  214. {
  215. setBlockedExecutionHandler(&discardWhenBlocked_);
  216. }
  217. void ctkEAPooledExecutor::abortWhenBlocked()
  218. {
  219. setBlockedExecutionHandler(&abortWhenBlocked_);
  220. }
  221. void ctkEAPooledExecutor::discardOldestWhenBlocked()
  222. {
  223. setBlockedExecutionHandler(&discardOldestWhenBlocked_);
  224. }
  225. void ctkEAPooledExecutor::execute(ctkEARunnable* command)
  226. {
  227. forever
  228. {
  229. {
  230. QMutexLocker lock(&mutex);
  231. if (!shutdown_)
  232. {
  233. int size = poolSize_;
  234. // Ensure minimum number of threads
  235. if (size < minimumPoolSize_)
  236. {
  237. addThread(command);
  238. return;
  239. }
  240. // Try to give to existing thread
  241. if (handOff_->offer(command, 0))
  242. {
  243. return;
  244. }
  245. // If cannot handoff and still under maximum, create new thread
  246. if (size < maximumPoolSize_)
  247. {
  248. addThread(command);
  249. return;
  250. }
  251. }
  252. }
  253. // Cannot hand off and cannot create -- ask for help
  254. if (getBlockedExecutionHandler()->blockedAction(command))
  255. {
  256. return;
  257. }
  258. }
  259. }
  260. ctkEAPooledExecutor::Worker::Worker(ctkEAPooledExecutor* pe, ctkEARunnable* firstTask)
  261. : firstTask_(firstTask), pe(pe)
  262. {
  263. if (firstTask) ++firstTask->ref;
  264. }
  265. void ctkEAPooledExecutor::Worker::run()
  266. {
  267. try
  268. {
  269. ctkEARunnable* task = firstTask_;
  270. firstTask_ = 0;
  271. if (task != 0)
  272. {
  273. const bool autoDelete = task->autoDelete();
  274. task->run();
  275. if (autoDelete && !--task->ref) delete task;
  276. }
  277. while ( (task = pe->getTask()) != 0)
  278. {
  279. const bool autoDelete = task->autoDelete();
  280. task->run();
  281. if (autoDelete && !--task->ref) delete task;
  282. }
  283. }
  284. catch (const ctkEAInterruptedException&)
  285. {
  286. pe->workerDone(this);
  287. return;
  288. }
  289. pe->workerDone(this);
  290. }
  291. bool ctkEAPooledExecutor::RunWhenBlocked::blockedAction(ctkEARunnable* command)
  292. {
  293. const bool autoDelete = command->autoDelete();
  294. command->run();
  295. if (autoDelete && !--command->ref) delete command;
  296. return true;
  297. }
  298. ctkEAPooledExecutor::WaitWhenBlocked::WaitWhenBlocked(ctkEAPooledExecutor* pe)
  299. : pe(pe)
  300. {}
  301. bool ctkEAPooledExecutor::WaitWhenBlocked::blockedAction(ctkEARunnable* command)
  302. {
  303. {
  304. QMutexLocker lock(&pe->mutex);
  305. if (pe->shutdown_)
  306. return true;
  307. }
  308. pe->handOff_->put(command);
  309. return true;
  310. }
  311. bool ctkEAPooledExecutor::DiscardWhenBlocked::blockedAction(ctkEARunnable* command)
  312. {
  313. Q_UNUSED(command)
  314. return true;
  315. }
  316. bool ctkEAPooledExecutor::AbortWhenBlocked::blockedAction(ctkEARunnable* command)
  317. {
  318. Q_UNUSED(command)
  319. throw ctkRuntimeException("Pool is blocked");
  320. }
  321. ctkEAPooledExecutor::DiscardOldestWhenBlocked::DiscardOldestWhenBlocked(ctkEAPooledExecutor* pe)
  322. : pe(pe)
  323. {}
  324. bool ctkEAPooledExecutor::DiscardOldestWhenBlocked::blockedAction(ctkEARunnable* command)
  325. {
  326. pe->handOff_->poll(0);
  327. if (!pe->handOff_->offer(command, 0))
  328. {
  329. const bool autoDelete = command->autoDelete();
  330. command->run();
  331. if (autoDelete && !--command->ref) delete command;
  332. }
  333. return true;
  334. }
  335. void ctkEAPooledExecutor::addThread(ctkEARunnable* command)
  336. {
  337. Worker* worker = new Worker(this, command);
  338. ++worker->ref;
  339. ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);
  340. threads_.insert(worker, thread);
  341. ++poolSize_;
  342. // do some garbage collection
  343. foreach (ctkEAInterruptibleThread* t, stoppedThreads_)
  344. {
  345. if (t != ctkEAInterruptibleThread::currentThread() && t->isFinished())
  346. {
  347. delete t;
  348. stoppedThreads_.removeAll(t);
  349. }
  350. }
  351. thread->start();
  352. }
  353. void ctkEAPooledExecutor::workerDone(Worker* w)
  354. {
  355. QMutexLocker lock(&mutex);
  356. stoppedThreads_ << threads_.take(w);
  357. if (--poolSize_ == 0 && shutdown_)
  358. {
  359. maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
  360. waitCond.wakeAll(); // notify awaitTerminationAfterShutdown
  361. }
  362. // Create a replacement if needed
  363. if (poolSize_ == 0 || poolSize_ < minimumPoolSize_)
  364. {
  365. try
  366. {
  367. ctkEARunnable* r = handOff_->poll(0);
  368. if (r != 0 && !shutdown_) // just consume task if shut down
  369. addThread(r);
  370. }
  371. catch(const ctkEAInterruptedException& ) {
  372. return;
  373. }
  374. }
  375. }
  376. ctkEARunnable* ctkEAPooledExecutor::getTask()
  377. {
  378. long waitTime;
  379. {
  380. QMutexLocker lock(&mutex);
  381. if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
  382. return 0;
  383. waitTime = (shutdown_)? 0 : keepAliveTime_;
  384. }
  385. if (waitTime >= 0)
  386. return handOff_->poll(waitTime);
  387. else
  388. return handOff_->take();
  389. }