ctkEACyclicBarrier_p.h 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. /*=============================================================================
  2. Library: CTK
  3. Copyright (c) German Cancer Research Center,
  4. Division of Medical and Biological Informatics
  5. Licensed under the Apache License, Version 2.0 (the "License");
  6. you may not use this file except in compliance with the License.
  7. You may obtain a copy of the License at
  8. http://www.apache.org/licenses/LICENSE-2.0
  9. Unless required by applicable law or agreed to in writing, software
  10. distributed under the License is distributed on an "AS IS" BASIS,
  11. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. See the License for the specific language governing permissions and
  13. limitations under the License.
  14. =============================================================================*/
  15. #ifndef CTKEACYCLICBARRIER_P_H
  16. #define CTKEACYCLICBARRIER_P_H
  17. #include <QMutex>
  18. #include <QWaitCondition>
  19. class ctkEARunnable;
  20. /**
  21. * A cyclic barrier is a reasonable choice for a barrier in contexts
  22. * involving a fixed sized group of threads that
  23. * must occasionally wait for each other.
  24. * <p>
  25. * ctkEACyclicBarriers use an all-or-none breakage model
  26. * for failed synchronization attempts: If threads
  27. * leave a barrier point prematurely because of timeout
  28. * or interruption, others will also leave abnormally
  29. * (via ctkEABrokenBarrierException), until
  30. * the barrier is <code>restart</code>ed. This is usually
  31. * the simplest and best strategy for sharing knowledge
  32. * about failures among cooperating threads in the most
  33. * common usage contexts of Barriers.
  34. * This implementation has the property that interruptions
  35. * among newly arriving threads can cause as-yet-unresumed
  36. * threads from a previous barrier cycle to return out
  37. * as broken. This transmits breakage
  38. * as early as possible, but with the possible byproduct that
  39. * only some threads returning out of a barrier will realize
  40. * that it is newly broken. (Others will not realize this until a
  41. * future cycle.)
  42. * <p>
  43. * Barriers support an optional ctkEARunnable command
  44. * that is run once per barrier point.
  45. * <p>
  46. * <b>Sample usage</b> Here is a code sketch of
  47. * a barrier in a parallel decomposition design.
  48. * \code
  49. * class Solver
  50. * {
  51. * int N;
  52. * float** data;
  53. * ctkEACyclicBarrier* barrier;
  54. *
  55. * class Worker : public ctkEARunnable
  56. * {
  57. * int myRow;
  58. *
  59. * public:
  60. * Worker(int row) : myRow(row) {}
  61. * void run()
  62. * {
  63. * while (!done())
  64. * {
  65. * processRow(myRow);
  66. *
  67. * try
  68. * {
  69. * barrier->barrier();
  70. * }
  71. * catch (const ctkEAInterruptedException& ex) { return; }
  72. * catch (const ctkEABrokenBarrierException& ex) { return; }
  73. * };
  74. * }
  75. * };
  76. *
  77. * class MyRunnable : public ctkEARunnable
  78. * {
  79. * public: void run() { mergeRows(...); }
  80. * };
  81. *
  82. * public:
  83. * Solver(float** matrix, int rows)
  84. * : N(rows), data(matrix)
  85. * {
  86. * // The row count is passed in: sizeof(matrix) is only the size of a pointer.
  87. * barrier = new ctkEACyclicBarrier(N);
  88. * barrier->setBarrierCommand(new MyRunnable());
  89. * for (int i = 0; i < N; ++i)
  90. * {
  91. * (new ctkEAInterruptibleThread(new Worker(i)))->start();
  92. * }
  93. * waitUntilDone();
  94. * }
  95. * };
  96. * \endcode
  97. *
  98. * The design of this class was inspired by:
  99. * http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
  100. **/
  101. class ctkEACyclicBarrier
  102. {
  103. public:
  104. /**
  105. * Create a ctkEACyclicBarrier for the indicated number of parties.
  106. * and the given command to run at each barrier point.
  107. * @throws std::invalid_argument if parties less than or equal to zero.
  108. **/
  109. ctkEACyclicBarrier(int parties, ctkEARunnable* command = 0);
  110. /**
  111. * Set the command to run at the point at which all threads reach the
  112. * barrier. This command is run exactly once, by the thread
  113. * that trips the barrier. The command is not run if the barrier is
  114. * broken.
  115. * @param command the command to run. If null, no command is run.
  116. * @return the previous command
  117. **/
  118. ctkEARunnable* setBarrierCommand(ctkEARunnable* command);
  119. /**
  120. * Returns true if the barrier has been compromised
  121. * by threads leaving the barrier before a synchronization
  122. * point (normally due to interruption or timeout).
  123. * Barrier methods in implementation classes throw
  124. * ctkEABrokenBarrierException upon detection of breakage.
  125. * Implementations may also support some means
  126. * to clear this status.
  127. **/
  128. bool broken() const;
  129. /**
  130. * Reset to initial state. Clears both the broken status
  131. * and any record of waiting threads, and releases all
  132. * currently waiting threads with indeterminate return status.
  133. * This method is intended only for use in recovery actions
  134. * in which it is somehow known
  135. * that no thread could possibly be relying on the
  136. * the synchronization properties of this barrier.
  137. **/
  138. void restart();
  139. /**
  140. * Return the number of parties that must meet per barrier
  141. * point. The number of parties is always at least 1.
  142. **/
  143. int parties() const;
  144. /**
  145. * Enter barrier and wait for the other parties()-1 threads.
  146. *
  147. * @return the arrival index: the number of other parties
  148. * that were still waiting
  149. * upon entry. This is a unique value from zero to parties()-1.
  150. * If it is zero, then the current
  151. * thread was the last party to hit barrier point
  152. * and so was responsible for releasing the others.
  153. * @throws ctkEABrokenBarrierException if any other thread
  154. * in any previous or current barrier
  155. * since either creation or the last <code>restart</code>
  156. * operation left the barrier
  157. * prematurely due to interruption or time-out. (If so,
  158. * the <code>broken</code> status is also set.)
  159. * Threads that are notified to have been
  160. * interrupted <em>after</em> being released are not considered
  161. * to have broken the barrier.
  162. * In all cases, the interruption
  163. * status of the current thread is preserved, so can be tested
  164. * by checking <code>ctkEAInterruptibleThread::interrupted()</code>.
  165. * @throws ctkEAInterruptedException if this thread was interrupted
  166. * during the barrier, and was the one causing breakage.
  167. * If so, <code>broken</code> status is also set.
  168. **/
  169. int barrier();
  170. /**
  171. * Enter barrier and wait at most msecs for the other parties()-1 threads.
  172. *
  173. * @return if not timed out, the arrival index: the number of other parties
  174. * that were still waiting
  175. * upon entry. This is a unique value from zero to parties()-1.
  176. * If it is zero, then the current
  177. * thread was the last party to hit barrier point
  178. * and so was responsible for releasing the others.
  179. * @throws ctkEABrokenBarrierException if any other thread
  180. * in any previous or current barrier
  181. * since either creation or the last <code>restart</code>
  182. * operation left the barrier
  183. * prematurely due to interruption or time-out. (If so,
  184. * the <code>broken</code> status is also set.)
  185. * Threads that are noticed to have been
  186. * interrupted <em>after</em> being released are not considered
  187. * to have broken the barrier.
  188. * In all cases, the interruption
  189. * status of the current thread is preserved, so can be tested
  190. * by checking <code>ctkEAInterruptibleThread::interrupted()</code>.
  191. * @throws ctkEAInterruptedException if this thread was interrupted
  192. * during the barrier. If so, <code>broken</code> status is also set.
  193. * @throws ctkEATimeoutException if this thread timed out waiting for
  194. * the barrier. If the timeout occured while already in the
  195. * barrier, <code>broken</code> status is also set.
  196. **/
  197. int attemptBarrier(long msecs);
  198. protected:
  199. int doBarrier(bool timed, long msecs);
  200. protected:
  201. const int parties_;
  202. bool broken_;
  203. ctkEARunnable* barrierCommand_;
  204. int count_; // number of parties still waiting
  205. int resets_; // incremented on each release
  206. mutable QMutex mutex;
  207. mutable QMutex monitor;
  208. QWaitCondition waitCond;
  209. };
  210. #endif // CTKEACYCLICBARRIER_P_H