ctkEAPooledExecutor_p.h 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723
  1. /*=============================================================================
  2. Library: CTK
  3. Copyright (c) German Cancer Research Center,
  4. Division of Medical and Biological Informatics
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. =============================================================================*/
  15. #ifndef CTKEAPOOLEDEXECUTOR_P_H
  16. #define CTKEAPOOLEDEXECUTOR_P_H
  17. #include "ctkEAThreadFactoryUser_p.h"
  18. #include <QHash>
  19. #include <QRunnable>
  20. #include <QWaitCondition>
  21. struct ctkEAChannel;
  22. class ctkEAInterruptibleThread;
  23. /**
  24. * A tunable, extensible thread pool class. The main supported public
  25. * method is <code>execute(ctkEARunnable* command)</code>, which can be
  26. * called instead of directly creating threads to execute commands.
  27. *
  28. * <p>
  29. * Thread pools can be useful for several, usually intertwined
  30. * reasons:
  31. *
  32. * <ul>
  33. *
  34. * <li> To bound resource use. A limit can be placed on the maximum
  35. * number of simultaneously executing threads.
  36. *
  37. * <li> To manage concurrency levels. A targeted number of threads
  38. * can be allowed to execute simultaneously.
  39. *
  40. * <li> To manage a set of threads performing related tasks.
  41. *
  42. * <li> To minimize overhead, by reusing previously constructed
  43. * Thread objects rather than creating new ones. (Note however
  44. * that pools are hardly ever cure-alls for performance problems
  45. * associated with thread construction, especially in programs that
  46. * themselves internally pool or recycle threads.)
  47. *
  48. * </ul>
  49. *
  50. * These goals introduce a number of policy parameters that are
  51. * encapsulated in this class. All of these parameters have defaults
  52. * and are tunable, either via get/set methods, or, in cases where
  53. * decisions should hold across lifetimes, via methods that can be
  54. * easily overridden in subclasses. The main, most commonly set
  55. * parameters can be established in constructors. Policy choices
  56. * across these dimensions can and do interact. Be careful, and
  57. * please read this documentation completely before using! See also
  58. * the usage examples below.
  59. *
  60. * <dl>
  61. * <dt> Queueing
  62. *
  63. * <dd> By default, this pool uses queueless synchronous channels to
  64. * hand off work to threads. This is a safe, conservative policy
  65. * that avoids lockups when handling sets of requests that might
  66. * have internal dependencies. (In these cases, queuing one task
  67. * could lock up another that would be able to continue if the
  68. * queued task were to run.) If you are sure that this cannot
  69. * happen, then you can instead supply a queue of some sort (for
  70. * example, a BoundedBuffer or LinkedQueue) in the constructor.
  71. * This will cause new commands to be queued in cases where all
  72. * MaximumPoolSize threads are busy. Queues are sometimes
  73. * appropriate when each task is completely independent of others,
  74. * so tasks cannot affect each other's execution. For example, in an
  75. * http server. <p>
  76. *
  77. * When given a choice, this pool always prefers adding a new thread
  78. * rather than queueing if there are currently fewer than the
  79. * current getMinimumPoolSize threads running, but otherwise always
  80. * prefers queuing a request rather than adding a new thread. Thus,
  81. * if you use an unbounded buffer, you will never have more than
  82. * getMinimumPoolSize threads running. (Since the default
  83. * minimumPoolSize is one, you will probably want to explicitly
  84. * setMinimumPoolSize.) <p>
  85. *
  86. * While queuing can be useful in smoothing out transient bursts of
  87. * requests, especially in socket-based services, it is not very
  88. * well behaved when commands continue to arrive on average faster
  89. * than they can be processed. Using bounds for both the queue and
  90. * the pool size, along with run-when-blocked policy is often a
  91. * reasonable response to such possibilities. <p>
  92. *
  93. * Queue sizes and maximum pool sizes can often be traded off for
  94. * each other. Using large queues and small pools minimizes CPU
  95. * usage, OS resources, and context-switching overhead, but can lead
  96. * to artificially low throughput. Especially if tasks frequently
  97. * block (for example if they are I/O bound), the underlying
  98. * OS may be able to schedule time for more threads than you
  99. * otherwise allow. Use of small queues or queueless handoffs
  100. * generally requires larger pool sizes, which keeps CPUs busier but
  101. * may encounter unacceptable scheduling overhead, which also
  102. * decreases throughput. <p>
  103. *
  104. * <dt> Maximum Pool size
  105. *
  106. * <dd> The maximum number of threads to use, when needed. The pool
  107. * does not by default preallocate threads. Instead, a thread is
  108. * created, if necessary and if there are fewer than the maximum,
  109. * only when an <code>execute</code> request arrives. The default
  110. * value is (for all practical purposes) infinite --
  111. * <code>std::numeric_limits<int>::max()</code>, so should be set in the
  112. * constructor or the set method unless you are just using the pool
  113. * to minimize construction overhead. Because task handoffs to idle
  114. * worker threads require synchronization that in turn relies on OS
  115. * scheduling policies to ensure progress, it is possible that a new
  116. * thread will be created even though an existing worker thread has
  117. * just become idle but has not progressed to the point at which it
  118. * can accept a new task. This phenomenon tends to occur when bursts
  119. * of short tasks are executed. <p>
  120. *
  121. * <dt> Minimum Pool size
  122. *
  123. * <dd> The minimum number of threads to use, when needed (default
  124. * 1). When a new request is received, and fewer than the minimum
  125. * number of threads are running, a new thread is always created to
  126. * handle the request even if other worker threads are idly waiting
  127. * for work. Otherwise, a new thread is created only if there are
  128. * fewer than the maximum and the request cannot immediately be
  129. * queued. <p>
  130. *
  131. * <dt> Preallocation
  132. *
  133. * <dd> You can override lazy thread construction policies via
  134. * method createThreads, which establishes a given number of warm
  135. * threads. Be aware that these preallocated threads will time out
  136. * and die (and later be replaced with others if needed) if not used
  137. * within the keep-alive time window. If you use preallocation, you
  138. * probably want to increase the keepalive time. The difference
  139. * between setMinimumPoolSize and createThreads is that
  140. * createThreads immediately establishes threads, while setting the
  141. * minimum pool size waits until requests arrive. <p>
  142. *
  143. * <dt> Keep-alive time
  144. *
  145. * <dd> If the pool maintained references to a fixed set of threads
  146. * in the pool, then it would impede garbage collection of otherwise
  147. * idle threads. This would defeat the resource-management aspects
  148. * of pools. One solution would be to use weak references. However,
  149. * this would impose costly and difficult synchronization issues.
  150. * Instead, threads are simply allowed to terminate and thus be
  151. * destroyed if they have been idle for the given keep-alive time. The
  152. * value of this parameter represents a trade-off between destruction
  153. * and construction time. The default keep-alive value is one minute, which
  154. * means that the time needed to construct and then destroy a thread is
  155. * expended at most once per minute.
  156. * <p>
  157. *
  158. * To establish worker threads permanently, use a <em>negative</em>
  159. * argument to setKeepAliveTime. <p>
  160. *
  161. * <dt> Blocked execution policy
  162. *
  163. * <dd> If the maximum pool size or queue size is bounded, then it
  164. * is possible for incoming <code>execute</code> requests to
  165. * block. There are five supported policies for handling this
  166. * problem, and mechanics (based on the Strategy Object pattern) to
  167. * allow others in subclasses: <p>
  168. *
  169. * <dl>
  170. * <dt> Run (the default)
  171. * <dd> The thread making the <code>execute</code> request
  172. * runs the task itself. This policy helps guard against lockup.
  173. * <dt> Wait
  174. * <dd> Wait until a thread becomes available. This
  175. * policy should, in general, not be used if the minimum number of
  176. * threads is zero, in which case a thread may never become
  177. * available.
  178. * <dt> Abort
  179. * <dd> Throw a ctkRuntimeException
  180. * <dt> Discard
  181. * <dd> Throw away the current request and return.
  182. * <dt> DiscardOldest
  183. * <dd> Throw away the oldest request and return.
  184. * </dl>
  185. *
  186. * Other plausible policies include raising the maximum pool size
  187. * after checking with some other objects that this is OK. <p>
  188. *
  189. * (These cases can never occur if the maximum pool size is unbounded
  190. * or the queue is unbounded. In these cases you instead face
  191. * potential resource exhaustion.) The execute method does not
  192. * throw any checked exceptions in any of these cases since any
  193. * errors associated with them must normally be dealt with via
  194. * handlers or callbacks. (Although in some cases, these might be
  195. * associated with throwing unchecked exceptions.) You may wish to
  196. * add special implementations even if you choose one of the listed
  197. * policies. For example, the supplied Discard policy does not
  198. * inform the caller of the drop. You could add your own version
  199. * that does so. Since choice of policies is normally a system-wide
  200. * decision, selecting a policy affects all calls to
  201. * <code>execute</code>. If for some reason you would instead like
  202. * to make per-call decisions, you could add variant versions of the
  203. * <code>execute</code> method (for example,
  204. * <code>executeIfWouldNotBlock</code>) in subclasses. <p>
  205. *
  206. * <dt> Thread construction parameters
  207. *
  208. * <dd> A settable ctkEAThreadFactory establishes each new thread. By
  209. * default, it merely generates a new instance of class ctkEAInterruptibleThread, but
  210. * can be changed to use a ctkEAInterruptibleThread subclass, to set priorities,
  211. * ThreadLocals, etc. <p>
  212. *
  213. * <dt> Interruption policy
  214. *
  215. * <dd> Worker threads check for interruption after processing each
  216. * command, and terminate upon interruption. Fresh threads will
  217. * replace them if needed. Thus, new tasks will not start out in an
  218. * interrupted state due to an uncleared interruption in a previous
  219. * task. Also, unprocessed commands are never dropped upon
  220. * interruption. It would conceptually suffice simply to clear
  221. * interruption between tasks, but implementation characteristics of
  222. * interruption-based methods are uncertain enough to warrant this
  223. * conservative strategy. It is a good idea to be equally
  224. * conservative in your code for the tasks running within pools.
  225. * <p>
  226. *
  227. * <dt> Shutdown policy
  228. *
  229. * <dd> The interruptAll method interrupts, but does not disable the
  230. * pool. Two different shutdown methods are supported for use when
  231. * you do want to (permanently) stop processing tasks. Method
  232. * shutdownAfterProcessingCurrentlyQueuedTasks waits until all
  233. * current tasks are finished. The shutdownNow method interrupts
  234. * current threads and leaves other queued requests unprocessed.
  235. * <p>
  236. *
  237. * <dt> Handling requests after shutdown
  238. *
  239. * <dd> When the pool is shutdown, new incoming requests are handled
  240. * by the blockedExecutionHandler. By default, the handler is set to
  241. * discard new requests, but this can be set with an optional
  242. * argument to method
  243. * shutdownAfterProcessingCurrentlyQueuedTasks. <p> Also, if you are
  244. * using some form of queuing, you may wish to call method drain()
  245. * to remove (and return) unprocessed commands from the queue after
  246. * shutting down the pool and its clients. If you need to be sure
  247. * these commands are processed, you can then run() each of the
  248. * commands in the list returned by drain().
  249. *
  250. * </dl>
  251. * <p>
  252. *
  253. * <b>Usage examples.</b>
  254. * <p>
  255. *
  256. * Probably the most common use of pools is in statics or singletons
  257. * accessible from a number of classes in a library; for example:
  258. *
  259. * \code
  260. * class MyPool
  261. * {
  262. * public:
  263. * static ctkEAPooledExecutor pool;
  264. * };
  265. *
  266. * // initialize to use a maximum of 8 threads.
  267. * ctkEAPooledExecutor MyPool::pool(8);
  268. * \endcode
  269. *
  270. * Here are some sample variants in initialization:
  271. * <ol>
  272. * <li> Using a bounded buffer of 10 tasks, at least 4 threads (started only
  273. * when needed due to incoming requests), but allowing
  274. * up to 100 threads if the buffer gets full.
  275. * \code
  276. * pool(new ctkEABoundedBuffer(10), 100);
  277. * pool.setMinimumPoolSize(4);
  278. * \endcode
  279. * <li> Same as (1), except pre-start 9 threads, allowing them to
  280. * die if they are not used for five minutes.
  281. * \code
  282. * pool(new ctkEABoundedBuffer(10), 100);
  283. * pool.setMinimumPoolSize(4);
  284. * pool.setKeepAliveTime(1000 * 60 * 5);
  285. * pool.createThreads(9);
  286. * \endcode
  287. * <li> Same as (2) except clients abort if both the buffer is full and
  288. * all 100 threads are busy:
  289. * \code
  290. * pool(new ctkEABoundedBuffer(10), 100);
  291. * pool.setMinimumPoolSize(4);
  292. * pool.setKeepAliveTime(1000 * 60 * 5);
  293. * pool.abortWhenBlocked();
  294. * pool.createThreads(9);
  295. * \endcode
  296. * <li> An unbounded queue serviced by exactly 5 threads:
  297. * \code
  298. * pool(new ctkEALinkedQueue());
  299. * pool.setKeepAliveTime(-1); // live forever
  300. * pool.createThreads(5);
  301. * \endcode
  302. * </ol>
  303. *
  304. * <p>
  305. * <b>Usage notes.</b>
  306. * <p>
  307. *
  308. * Pools do not mesh well with using thread-specific storage. Thread local
  309. * data usually relies on the identity of a
  310. * thread executing a particular task. Pools use the same thread to
  311. * perform different tasks. <p>
  312. *
  313. * If you need a policy not handled by the parameters in this class
  314. * consider writing a subclass. <p>
  315. *
  316. * The design of this class was inspired by:
  317. * http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
  318. **/
  319. class ctkEAPooledExecutor : public ctkEAThreadFactoryUser
  320. {
  321. public:
  322. /**
  323. * The maximum pool size; used if not otherwise specified. Default
  324. * value is essentially infinite (std::numeric_limits<int>::max())
  325. **/
  326. static const int DEFAULT_MAXIMUMPOOLSIZE; // = std::numeric_limits<int>::max()
  327. /**
  328. * The minimum pool size; used if not otherwise specified. Default
  329. * value is 1.
  330. **/
  331. static const int DEFAULT_MINIMUMPOOLSIZE; // = 1
  332. /**
  333. * The maximum time to keep worker threads alive waiting for new
  334. * tasks; used if not otherwise specified. Default value is one
  335. * minute (60000 milliseconds).
  336. **/
  337. static const long DEFAULT_KEEPALIVETIME; // = 60 * 1000
  338. /**
  339. * Class for actions to take when execute() blocks. Uses Strategy
  340. * pattern to represent different actions. You can add more in
  341. * subclasses, and/or create subclasses of these. If so, you will
  342. * also want to add or modify the corresponding methods that set the
  343. * current blockedExecutionHandler_.
  344. **/
  345. struct BlockedExecutionHandler
  346. {
  347. virtual ~BlockedExecutionHandler() {} // virtual so handlers can be destroyed through a base pointer
  348. /**
  349. * Return true if successfully handled, so execute should
  350. * terminate; else return false if execute loop should be retried.
  351. **/
  352. virtual bool blockedAction(ctkEARunnable* command) = 0;
  353. };
  354. protected:
  355. class Worker; // forward declaration; the class is defined further down in this header
  356. mutable QMutex shutdownMutex; // NOTE(review): presumably guards shutdown/termination state - confirm in the .cpp
  357. mutable QWaitCondition waitCond; // NOTE(review): presumably used to wait for termination - confirm in the .cpp
  358. /** The maximum number of threads allowed in pool. **/
  359. int maximumPoolSize_;
  360. /** The minimum number of threads to maintain in pool. **/
  361. int minimumPoolSize_;
  362. /** Current pool size. **/
  363. int poolSize_;
  364. /** The maximum time for an idle thread to wait for new task. **/
  365. long keepAliveTime_;
  366. /**
  367. * Shutdown flag - latches true when a shutdown method is called
  368. * in order to disable queuing/handoffs of new tasks.
  369. **/
  370. bool shutdown_;
  371. /**
  372. * The channel used to hand off the command to a thread in the pool.
  373. **/
  374. ctkEAChannel* handOff_;
  375. /**
  376. * The set of active threads, declared as a map from workers to
  377. * their threads. This is needed by the interruptAll() method. It
  378. * may also be useful in subclasses that need to perform other
  379. * thread management chores.
  380. **/
  381. QHash<Worker*, ctkEAInterruptibleThread*> threads_;
  382. /**
  383. * Keeps a list of stopped threads which will be deleted later.
  384. */
  385. QList<ctkEAInterruptibleThread*> stoppedThreads_;
  386. /** The current handler for unserviceable requests. **/
  387. BlockedExecutionHandler* blockedExecutionHandler_;
  388. public:
  389. /**
  390. * Create a new pool that uses the supplied Channel for queuing, and
  391. * with all default parameter settings except for maximum pool size.
  392. **/
  393. ctkEAPooledExecutor(ctkEAChannel* channel, int maxPoolSize = DEFAULT_MAXIMUMPOOLSIZE);
  394. ~ctkEAPooledExecutor();
  395. /**
  396. * Return the maximum number of threads to simultaneously execute.
  397. * New unqueued requests will be handled according to the current
  398. * blocking policy once this limit is exceeded.
  399. **/
  400. int getMaximumPoolSize() const;
  401. /**
  402. * Set the maximum number of threads to use. Decreasing the pool
  403. * size will not immediately kill existing threads, but they may
  404. * later die when idle.
  405. *
  406. * @throws ctkInvalidArgumentException if less than or equal to zero.
  407. * (It is not considered an error to set the maximum to be less than
  408. * the minimum. However, in this case there are no guarantees
  409. * about behavior.)
  410. **/
  411. void setMaximumPoolSize(int newMaximum);
  412. /**
  413. * Return the minimum number of threads to simultaneously execute.
  414. * (Default value is 1). If fewer than the minimum number are
  415. * running upon reception of a new request, a new thread is started
  416. * to handle this request.
  417. **/
  418. int getMinimumPoolSize() const;
  419. /**
  420. * Set the minimum number of threads to use.
  421. *
  422. * @throws ctkInvalidArgumentException if less than zero. (It is not
  423. * considered an error to set the minimum to be greater than the
  424. * maximum. However, in this case there are no guarantees about
  425. * behavior.)
  426. **/
  427. void setMinimumPoolSize(int newMinimum);
  428. /**
  429. * Return the current number of active threads in the pool. This
  430. * number is just a snapshot, and may change immediately upon
  431. * returning.
  432. **/
  433. int getPoolSize() const;
  434. /**
  435. * Return the number of milliseconds to keep threads alive waiting
  436. * for new commands. A negative value means to wait forever. A zero
  437. * value means not to wait at all.
  438. **/
  439. long getKeepAliveTime() const;
  440. /**
  441. * Set the number of milliseconds to keep threads alive waiting for
  442. * new commands. A negative value means to wait forever. A zero
  443. * value means not to wait at all.
  444. **/
  445. void setKeepAliveTime(long msecs);
  446. /** Get the handler for blocked execution **/
  447. BlockedExecutionHandler* getBlockedExecutionHandler() const;
  448. /** Set the handler for blocked execution **/
  449. void setBlockedExecutionHandler(BlockedExecutionHandler* h);
  450. /**
  451. * Create and start up to numberOfThreads threads in the pool.
  452. * Return the number created. This may be less than the number
  453. * requested if creating more would exceed maximum pool size bound.
  454. **/
  455. int createThreads(int numberOfThreads);
  456. /**
  457. * Interrupt all threads in the pool, causing them all to
  458. * terminate. Assuming that executed tasks do not disable (clear)
  459. * interruptions, each thread will terminate after processing its
  460. * current task. Threads will terminate sooner if the executed tasks
  461. * themselves respond to interrupts.
  462. **/
  463. void interruptAll();
  464. /**
  465. * Interrupt all threads and disable construction of new
  466. * threads. Any tasks entered after this point will be discarded. A
  467. * shut down pool cannot be restarted.
  468. */
  469. void shutdownNow();
  470. /**
  471. * Interrupt all threads and disable construction of new
  472. * threads. Any tasks entered after this point will be handled by
  473. * the given BlockedExecutionHandler. A shut down pool cannot be
  474. * restarted.
  475. */
  476. void shutdownNow(BlockedExecutionHandler* handler);
  477. /**
  478. * Terminate threads after processing all elements currently in
  479. * queue. Any tasks entered after this point will be discarded. A
  480. * shut down pool cannot be restarted.
  481. **/
  482. void shutdownAfterProcessingCurrentlyQueuedTasks();
  483. /**
  484. * Terminate threads after processing all elements currently in
  485. * queue. Any tasks entered after this point will be handled by the
  486. * given BlockedExecutionHandler. A shut down pool cannot be
  487. * restarted.
  488. **/
  489. void shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler* handler);
  490. /**
  491. * Return true if a shutdown method has succeeded in terminating all
  492. * threads.
  493. */
  494. bool isTerminatedAfterShutdown() const;
  495. /**
  496. * Wait for a shutdown pool to fully terminate, or until the timeout
  497. * has expired. This method may only be called <em>after</em>
  498. * invoking shutdownNow or
  499. * shutdownAfterProcessingCurrentlyQueuedTasks.
  500. *
  501. * @param maxWaitTime the maximum time in milliseconds to wait
  502. * @return true if the pool has terminated within the max wait period
  503. * @throws ctkIllegalStateException if shutdown has not been requested
  504. * @throws ctkEAInterruptedException if the current thread has been interrupted in the course of waiting
  505. */
  506. bool awaitTerminationAfterShutdown(long maxWaitTime) const;
  507. /**
  508. * Wait for a shutdown pool to fully terminate. This method may
  509. * only be called <em>after</em> invoking shutdownNow or
  510. * shutdownAfterProcessingCurrentlyQueuedTasks.
  511. *
  512. * @throws ctkIllegalStateException if shutdown has not been requested
  513. * @throws ctkEAInterruptedException if the current thread has been interrupted in the course of waiting
  514. */
  515. void awaitTerminationAfterShutdown() const;
  516. /**
  517. * Remove all unprocessed tasks from pool queue, and return them in
  518. * a QList. This method should be used only when there are
  519. * not any active clients of the pool. Otherwise you face the
  520. * possibility that the method will loop pulling out tasks as
  521. * clients are putting them in. This method can be useful after
  522. * shutting down a pool (via shutdownNow) to determine whether there
  523. * are any pending tasks that were not processed. You can then, for
  524. * example execute all unprocessed commands via code along the lines
  525. * of:
  526. *
  527. * \code
  528. * QList<ctkEARunnable*> tasks = pool.drain();
  529. * foreach (ctkEARunnable* runnable, tasks)
  530. * runnable->run();
  531. * \endcode
  532. **/
  533. QList<ctkEARunnable*> drain();
  534. /**
  535. * Set the policy for blocked execution to be that the current
  536. * thread executes the command if there are no available threads in
  537. * the pool.
  538. **/
  539. void runWhenBlocked();
  540. /**
  541. * Set the policy for blocked execution to be to wait until a thread
  542. * is available, unless the pool has been shut down, in which case
  543. * the action is discarded.
  544. **/
  545. void waitWhenBlocked();
  546. /**
  547. * Set the policy for blocked execution to be to return without
  548. * executing the request.
  549. **/
  550. void discardWhenBlocked();
  551. /**
  552. * Set the policy for blocked execution to be to
  553. * throw a ctkRuntimeException.
  554. **/
  555. void abortWhenBlocked();
  556. /**
  557. * Set the policy for blocked execution to be to discard the oldest
  558. * unhandled request.
  559. **/
  560. void discardOldestWhenBlocked();
  561. /**
  562. * Arrange for the given command to be executed by a thread in this
  563. * pool. The method normally returns when the command has been
  564. * handed off for (possibly later) execution.
  565. **/
  566. void execute(ctkEARunnable* command);
  567. protected:
  568. /**
  569. * Class defining the basic run loop for pooled threads.
  570. **/
  571. class Worker : public ctkEARunnable
  572. {
  573. protected:
  574. ctkEARunnable* firstTask_; // NOTE(review): presumably the initial command given to the constructor - confirm in the .cpp
  575. public:
  576. Worker(ctkEAPooledExecutor* pe, ctkEARunnable* firstTask);
  577. void run();
  578. private:
  579. ctkEAPooledExecutor* pe; // NOTE(review): presumably the owning executor - confirm in the .cpp
  580. };
  581. /** Class defining Run action. **/
  582. struct RunWhenBlocked : public BlockedExecutionHandler
  583. {
  584. bool blockedAction(ctkEARunnable* command);
  585. };
  586. RunWhenBlocked runWhenBlocked_; // Run policy instance held by value in this executor
  587. /** Class defining Wait action. **/
  588. struct WaitWhenBlocked : public BlockedExecutionHandler
  589. {
  590. WaitWhenBlocked(ctkEAPooledExecutor* pe);
  591. bool blockedAction(ctkEARunnable* command);
  592. private:
  593. ctkEAPooledExecutor* pe; // NOTE(review): presumably the owning executor - confirm in the .cpp
  594. };
  595. WaitWhenBlocked waitWhenBlocked_; // Wait policy instance held by value in this executor
  596. /** Class defining Discard action. **/
  597. struct DiscardWhenBlocked : public BlockedExecutionHandler
  598. {
  599. bool blockedAction(ctkEARunnable* command);
  600. };
  601. DiscardWhenBlocked discardWhenBlocked_; // Discard policy instance held by value in this executor
  602. /** Class defining Abort action. **/
  603. struct AbortWhenBlocked : public BlockedExecutionHandler
  604. {
  605. bool blockedAction(ctkEARunnable* command);
  606. };
  607. AbortWhenBlocked abortWhenBlocked_; // Abort policy instance held by value in this executor
  608. /**
  609. * Class defining DiscardOldest action. Under this policy, at most
  610. * one old unhandled task is discarded. If the new task can then be
  611. * handed off, it is. Otherwise, the new task is run in the current
  612. * thread (i.e., RunWhenBlocked is used as a backup policy.)
  613. **/
  614. struct DiscardOldestWhenBlocked : public BlockedExecutionHandler
  615. {
  616. DiscardOldestWhenBlocked(ctkEAPooledExecutor* pe);
  617. bool blockedAction(ctkEARunnable* command);
  618. private:
  619. ctkEAPooledExecutor* pe; // NOTE(review): presumably the owning executor - confirm in the .cpp
  620. };
  621. DiscardOldestWhenBlocked discardOldestWhenBlocked_; // DiscardOldest policy instance held by value in this executor
  622. /**
  623. * Create and start a thread to handle a new command. Call only
  624. * when holding lock.
  625. **/
  626. void addThread(ctkEARunnable* command);
  627. /**
  628. * Cleanup method called upon termination of worker thread.
  629. **/
  630. void workerDone(Worker* w);
  631. /**
  632. * Get a task from the handoff queue, or null if shutting down.
  633. **/
  634. ctkEARunnable* getTask();
  635. };
  636. #endif // CTKEAPOOLEDEXECUTOR_P_H