ctkEAPooledExecutor.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /*=============================================================================
  2. Library: CTK
  3. Copyright (c) German Cancer Research Center,
  4. Division of Medical and Biological Informatics
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. =============================================================================*/
  15. #include "ctkEAPooledExecutor_p.h"
  16. // for ctk::msecsTo() - remove after switching to Qt 4.7
  17. #include <ctkUtils.h>
  18. #include "ctkEAChannel_p.h"
  19. #include <dispatch/ctkEAInterruptibleThread_p.h>
  20. #include <dispatch/ctkEAInterruptedException_p.h>
  21. #include <limits>
  22. #include <stdexcept>
  23. #include <QDateTime>
  24. #include <QDebug>
  25. const int ctkEAPooledExecutor::DEFAULT_MAXIMUMPOOLSIZE = std::numeric_limits<int>::max();
  26. const int ctkEAPooledExecutor::DEFAULT_MINIMUMPOOLSIZE = 1;
  27. const long ctkEAPooledExecutor::DEFAULT_KEEPALIVETIME = 60 * 1000;
  28. ctkEAPooledExecutor::ctkEAPooledExecutor(ctkEAChannel* channel, int maxPoolSize)
  29. : maximumPoolSize_(maxPoolSize), minimumPoolSize_(DEFAULT_MINIMUMPOOLSIZE),
  30. poolSize_(0), keepAliveTime_(DEFAULT_KEEPALIVETIME), shutdown_(false),
  31. handOff_(channel), blockedExecutionHandler_(0), waitWhenBlocked_(this),
  32. discardOldestWhenBlocked_(this)
  33. {
  34. runWhenBlocked();
  35. }
  36. ctkEAPooledExecutor::~ctkEAPooledExecutor()
  37. {
  38. delete handOff_;
  39. qDeleteAll(stoppedThreads_);
  40. }
  41. int ctkEAPooledExecutor::getMaximumPoolSize() const
  42. {
  43. QMutexLocker lock(&mutex);
  44. return maximumPoolSize_;
  45. }
  46. void ctkEAPooledExecutor::setMaximumPoolSize(int newMaximum)
  47. {
  48. QMutexLocker lock(&mutex);
  49. if (newMaximum <= 0) throw ctkInvalidArgumentException("maximum must be > 0");
  50. maximumPoolSize_ = newMaximum;
  51. }
  52. int ctkEAPooledExecutor::getMinimumPoolSize() const
  53. {
  54. QMutexLocker lock(&mutex);
  55. return minimumPoolSize_;
  56. }
  57. void ctkEAPooledExecutor::setMinimumPoolSize(int newMinimum)
  58. {
  59. QMutexLocker lock(&mutex);
  60. if (newMinimum < 0) throw ctkInvalidArgumentException("minimum must be >= 0");
  61. minimumPoolSize_ = newMinimum;
  62. }
  63. int ctkEAPooledExecutor::getPoolSize() const
  64. {
  65. QMutexLocker lock(&mutex);
  66. return poolSize_;
  67. }
  68. long ctkEAPooledExecutor::getKeepAliveTime() const
  69. {
  70. QMutexLocker lock(&mutex);
  71. return keepAliveTime_;
  72. }
  73. void ctkEAPooledExecutor::setKeepAliveTime(long msecs)
  74. {
  75. QMutexLocker lock(&mutex);
  76. keepAliveTime_ = msecs;
  77. }
  78. ctkEAPooledExecutor::BlockedExecutionHandler* ctkEAPooledExecutor::getBlockedExecutionHandler() const
  79. {
  80. QMutexLocker lock(&mutex);
  81. return blockedExecutionHandler_;
  82. }
  83. void ctkEAPooledExecutor::setBlockedExecutionHandler(BlockedExecutionHandler* h)
  84. {
  85. QMutexLocker lock(&mutex);
  86. blockedExecutionHandler_ = h;
  87. }
  88. int ctkEAPooledExecutor::createThreads(int numberOfThreads)
  89. {
  90. int ncreated = 0;
  91. for (int i = 0; i < numberOfThreads; ++i)
  92. {
  93. if (poolSize_ < maximumPoolSize_)
  94. {
  95. addThread(0);
  96. ++ncreated;
  97. }
  98. else
  99. break;
  100. }
  101. return ncreated;
  102. }
  103. void ctkEAPooledExecutor::interruptAll()
  104. {
  105. QMutexLocker lock(&mutex);
  106. foreach (ctkEAInterruptibleThread* t, threads_)
  107. {
  108. t->interrupt();
  109. }
  110. }
  111. void ctkEAPooledExecutor::shutdownNow()
  112. {
  113. shutdownNow(&discardWhenBlocked_);
  114. }
  115. void ctkEAPooledExecutor::shutdownNow(BlockedExecutionHandler* handler)
  116. {
  117. QMutexLocker lock(&mutex);
  118. setBlockedExecutionHandler(handler);
  119. shutdown_ = true; // don't allow new tasks
  120. minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
  121. interruptAll(); // interrupt all existing threads
  122. }
  123. void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks()
  124. {
  125. shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
  126. }
  127. void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler* handler)
  128. {
  129. QMutexLocker lock(&mutex);
  130. setBlockedExecutionHandler(handler);
  131. shutdown_ = true;
  132. if (poolSize_ == 0) // disable new thread construction when idle
  133. minimumPoolSize_ = maximumPoolSize_ = 0;
  134. }
  135. bool ctkEAPooledExecutor::isTerminatedAfterShutdown() const
  136. {
  137. QMutexLocker lock(&mutex);
  138. return shutdown_ && poolSize_ == 0;
  139. }
  140. bool ctkEAPooledExecutor::awaitTerminationAfterShutdown(long maxWaitTime) const
  141. {
  142. QMutexLocker lock(&mutex);
  143. QMutexLocker shutdownLock(&shutdownMutex);
  144. if (!shutdown_)
  145. throw ctkIllegalStateException("not in shutdown state");
  146. if (poolSize_ == 0)
  147. return true;
  148. qint64 waitTime = static_cast<qint64>(maxWaitTime);
  149. if (waitTime <= 0)
  150. return false;
  151. //TODO Use Qt4.7 API
  152. QDateTime start = QDateTime::currentDateTime();
  153. forever
  154. {
  155. waitCond.wait(&shutdownMutex, waitTime);
  156. if (poolSize_ == 0)
  157. return true;
  158. qint64 currWait = ctk::msecsTo(start, QDateTime::currentDateTime());
  159. waitTime = static_cast<qint64>(maxWaitTime) - currWait;
  160. if (waitTime <= 0)
  161. return false;
  162. }
  163. }
  164. void ctkEAPooledExecutor::awaitTerminationAfterShutdown() const
  165. {
  166. QMutexLocker lock(&mutex);
  167. if (!shutdown_)
  168. throw ctkIllegalStateException("not in shutdown state");
  169. while (poolSize_ > 0)
  170. {
  171. lock.unlock();
  172. QMutexLocker shutdownLock(&shutdownMutex);
  173. waitCond.wait(&shutdownMutex);
  174. lock.relock();
  175. // worker is done, wait for possibly not yet finished worker-thread
  176. foreach(QThread* t, stoppedThreads_)
  177. {
  178. t->wait();
  179. }
  180. }
  181. }
  182. QList<ctkEARunnable*> ctkEAPooledExecutor::drain()
  183. {
  184. bool wasInterrupted = false;
  185. QList<ctkEARunnable*> tasks;
  186. forever
  187. {
  188. try
  189. {
  190. ctkEARunnable* x = handOff_->poll(0);
  191. if (x == 0)
  192. break;
  193. else
  194. tasks.push_back(x);
  195. }
  196. catch (const ctkEAInterruptedException& )
  197. {
  198. wasInterrupted = true; // postpone re-interrupt until drained
  199. }
  200. }
  201. if (wasInterrupted)
  202. {
  203. qobject_cast<ctkEAInterruptibleThread*>(QThread::currentThread())->interrupt();
  204. }
  205. return tasks;
  206. }
  207. void ctkEAPooledExecutor::runWhenBlocked()
  208. {
  209. setBlockedExecutionHandler(&runWhenBlocked_);
  210. }
  211. void ctkEAPooledExecutor::waitWhenBlocked()
  212. {
  213. setBlockedExecutionHandler(&waitWhenBlocked_);
  214. }
  215. void ctkEAPooledExecutor::discardWhenBlocked()
  216. {
  217. setBlockedExecutionHandler(&discardWhenBlocked_);
  218. }
  219. void ctkEAPooledExecutor::abortWhenBlocked()
  220. {
  221. setBlockedExecutionHandler(&abortWhenBlocked_);
  222. }
  223. void ctkEAPooledExecutor::discardOldestWhenBlocked()
  224. {
  225. setBlockedExecutionHandler(&discardOldestWhenBlocked_);
  226. }
  227. void ctkEAPooledExecutor::execute(ctkEARunnable* command)
  228. {
  229. forever
  230. {
  231. {
  232. QMutexLocker lock(&mutex);
  233. if (!shutdown_)
  234. {
  235. int size = poolSize_;
  236. // Ensure minimum number of threads
  237. if (size < minimumPoolSize_)
  238. {
  239. addThread(command);
  240. return;
  241. }
  242. // Try to give to existing thread
  243. if (handOff_->offer(command, 0))
  244. {
  245. return;
  246. }
  247. // If cannot handoff and still under maximum, create new thread
  248. if (size < maximumPoolSize_)
  249. {
  250. addThread(command);
  251. return;
  252. }
  253. }
  254. }
  255. // Cannot hand off and cannot create -- ask for help
  256. if (getBlockedExecutionHandler()->blockedAction(command))
  257. {
  258. return;
  259. }
  260. }
  261. }
  262. ctkEAPooledExecutor::Worker::Worker(ctkEAPooledExecutor* pe, ctkEARunnable* firstTask)
  263. : firstTask_(firstTask), pe(pe)
  264. {
  265. if (firstTask) ++firstTask->ref;
  266. }
  267. void ctkEAPooledExecutor::Worker::run()
  268. {
  269. try
  270. {
  271. ctkEARunnable* task = firstTask_;
  272. firstTask_ = 0;
  273. if (task != 0)
  274. {
  275. const bool autoDelete = task->autoDelete();
  276. task->run();
  277. if (autoDelete && !--task->ref) delete task;
  278. }
  279. while ( (task = pe->getTask()) != 0)
  280. {
  281. const bool autoDelete = task->autoDelete();
  282. task->run();
  283. if (autoDelete && !--task->ref) delete task;
  284. }
  285. }
  286. catch (const ctkEAInterruptedException&)
  287. {
  288. pe->workerDone(this);
  289. return;
  290. }
  291. pe->workerDone(this);
  292. }
  293. bool ctkEAPooledExecutor::RunWhenBlocked::blockedAction(ctkEARunnable* command)
  294. {
  295. const bool autoDelete = command->autoDelete();
  296. command->run();
  297. if (autoDelete && !--command->ref) delete command;
  298. return true;
  299. }
  300. ctkEAPooledExecutor::WaitWhenBlocked::WaitWhenBlocked(ctkEAPooledExecutor* pe)
  301. : pe(pe)
  302. {}
  303. bool ctkEAPooledExecutor::WaitWhenBlocked::blockedAction(ctkEARunnable* command)
  304. {
  305. {
  306. QMutexLocker lock(&pe->mutex);
  307. if (pe->shutdown_)
  308. return true;
  309. }
  310. pe->handOff_->put(command);
  311. return true;
  312. }
  313. bool ctkEAPooledExecutor::DiscardWhenBlocked::blockedAction(ctkEARunnable* command)
  314. {
  315. Q_UNUSED(command)
  316. return true;
  317. }
  318. bool ctkEAPooledExecutor::AbortWhenBlocked::blockedAction(ctkEARunnable* command)
  319. {
  320. Q_UNUSED(command)
  321. throw ctkRuntimeException("Pool is blocked");
  322. }
  323. ctkEAPooledExecutor::DiscardOldestWhenBlocked::DiscardOldestWhenBlocked(ctkEAPooledExecutor* pe)
  324. : pe(pe)
  325. {}
  326. bool ctkEAPooledExecutor::DiscardOldestWhenBlocked::blockedAction(ctkEARunnable* command)
  327. {
  328. ctkEARunnable* tmp = pe->handOff_->poll(0);
  329. if (tmp && tmp->autoDelete() && !--tmp->ref) delete tmp;
  330. if (!pe->handOff_->offer(command, 0))
  331. {
  332. const bool autoDelete = command->autoDelete();
  333. command->run();
  334. if (autoDelete && !--command->ref) delete command;
  335. }
  336. return true;
  337. }
  338. void ctkEAPooledExecutor::addThread(ctkEARunnable* command)
  339. {
  340. Worker* worker = new Worker(this, command);
  341. ++worker->ref;
  342. ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);
  343. threads_.insert(worker, thread);
  344. ++poolSize_;
  345. // do some garbage collection
  346. foreach (ctkEAInterruptibleThread* t, stoppedThreads_)
  347. {
  348. if (t != ctkEAInterruptibleThread::currentThread() && t->isFinished())
  349. {
  350. delete t;
  351. stoppedThreads_.removeAll(t);
  352. }
  353. }
  354. thread->start();
  355. }
  356. void ctkEAPooledExecutor::workerDone(Worker* w)
  357. {
  358. QMutexLocker lock(&mutex);
  359. stoppedThreads_ << threads_.take(w);
  360. if (--poolSize_ == 0 && shutdown_)
  361. {
  362. maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
  363. waitCond.wakeAll(); // notify awaitTerminationAfterShutdown
  364. }
  365. // Create a replacement if needed
  366. if (poolSize_ == 0 || poolSize_ < minimumPoolSize_)
  367. {
  368. try
  369. {
  370. ctkEARunnable* r = handOff_->poll(0);
  371. if (r != 0)
  372. {
  373. if(shutdown_) // just consume task if shut down
  374. {
  375. if (r->autoDelete() && !r->ref) delete r;
  376. }
  377. else
  378. {
  379. addThread(r);
  380. }
  381. }
  382. }
  383. catch(const ctkEAInterruptedException& ) {
  384. return;
  385. }
  386. }
  387. }
  388. ctkEARunnable* ctkEAPooledExecutor::getTask()
  389. {
  390. long waitTime;
  391. {
  392. QMutexLocker lock(&mutex);
  393. if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
  394. return 0;
  395. waitTime = (shutdown_)? 0 : keepAliveTime_;
  396. }
  397. if (waitTime >= 0)
  398. return handOff_->poll(waitTime);
  399. else
  400. return handOff_->take();
  401. }