| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 | /*=============================================================================  Library: CTK  Copyright (c) German Cancer Research Center,    Division of Medical and Biological Informatics  Licensed under the Apache License, Version 2.0 (the "License");  you may not use this file except in compliance with the License.  You may obtain a copy of the License at    http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or agreed to in writing, software  distributed under the License is distributed on an "AS IS" BASIS,  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the License for the specific language governing permissions and  limitations under the License.=============================================================================*/#include "ctkEAPooledExecutor_p.h"// for ctk::msecsTo() - remove after switching to Qt 4.7#include <ctkUtils.h>#include "ctkEAChannel_p.h"#include <dispatch/ctkEAInterruptibleThread_p.h>#include <dispatch/ctkEAInterruptedException_p.h>#include <limits>#include <stdexcept>#include <QDateTime>#include <QDebug>const int ctkEAPooledExecutor::DEFAULT_MAXIMUMPOOLSIZE = std::numeric_limits<int>::max();const int ctkEAPooledExecutor::DEFAULT_MINIMUMPOOLSIZE = 1;const long ctkEAPooledExecutor::DEFAULT_KEEPALIVETIME = 60 * 1000;ctkEAPooledExecutor::ctkEAPooledExecutor(ctkEAChannel* channel, int maxPoolSize)  : maximumPoolSize_(maxPoolSize), minimumPoolSize_(DEFAULT_MINIMUMPOOLSIZE),    poolSize_(0), keepAliveTime_(DEFAULT_KEEPALIVETIME), shutdown_(false),    handOff_(channel), blockedExecutionHandler_(0), waitWhenBlocked_(this),    discardOldestWhenBlocked_(this){  runWhenBlocked();}ctkEAPooledExecutor::~ctkEAPooledExecutor(){  delete handOff_;  qDeleteAll(stoppedThreads_);}int ctkEAPooledExecutor::getMaximumPoolSize() const{  QMutexLocker lock(&mutex);  return maximumPoolSize_;}void ctkEAPooledExecutor::setMaximumPoolSize(int newMaximum){  QMutexLocker lock(&mutex);  if (newMaximum <= 0) throw std::invalid_argument("maximum must be > 0");  maximumPoolSize_ = newMaximum;}int ctkEAPooledExecutor::getMinimumPoolSize() const{  QMutexLocker lock(&mutex);  return minimumPoolSize_;}void ctkEAPooledExecutor::setMinimumPoolSize(int newMinimum){  QMutexLocker lock(&mutex);  if (newMinimum < 0) throw std::invalid_argument("minimum must be >= 0");  minimumPoolSize_ = newMinimum;}int ctkEAPooledExecutor::getPoolSize() const{  QMutexLocker lock(&mutex);  return poolSize_;}long ctkEAPooledExecutor::getKeepAliveTime() const{  QMutexLocker lock(&mutex);  return keepAliveTime_;}void ctkEAPooledExecutor::setKeepAliveTime(long msecs){  QMutexLocker lock(&mutex);  keepAliveTime_ = msecs;}ctkEAPooledExecutor::BlockedExecutionHandler* ctkEAPooledExecutor::getBlockedExecutionHandler() const{  QMutexLocker lock(&mutex);  return blockedExecutionHandler_;}void ctkEAPooledExecutor::setBlockedExecutionHandler(BlockedExecutionHandler* h){  QMutexLocker lock(&mutex);  blockedExecutionHandler_ = h;}int ctkEAPooledExecutor::createThreads(int numberOfThreads){  int ncreated = 0;  for (int i = 0; i < numberOfThreads; ++i)  {    if (poolSize_ < maximumPoolSize_)    {      addThread(0);      ++ncreated;    }    else      break;  }  return ncreated;}void ctkEAPooledExecutor::interruptAll(){  QMutexLocker lock(&mutex);  foreach (ctkEAInterruptibleThread* t, threads_)  {    t->interrupt();  }}void ctkEAPooledExecutor::shutdownNow(){  shutdownNow(&discardWhenBlocked_);}void ctkEAPooledExecutor::shutdownNow(BlockedExecutionHandler* handler){  QMutexLocker lock(&mutex);  setBlockedExecutionHandler(handler);  shutdown_ = true; // don't allow new tasks  minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads  interruptAll(); // interrupt all existing threads}void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks(){  shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());}void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler* handler){  QMutexLocker lock(&mutex);  setBlockedExecutionHandler(handler);  shutdown_ = true;  if (poolSize_ == 0) // disable new thread construction when idle    minimumPoolSize_ = maximumPoolSize_ = 0;}bool ctkEAPooledExecutor::isTerminatedAfterShutdown() const{  QMutexLocker lock(&mutex);  return shutdown_ && poolSize_ == 0;}bool ctkEAPooledExecutor::awaitTerminationAfterShutdown(long maxWaitTime) const{  QMutexLocker lock(&mutex);  QMutexLocker shutdownLock(&shutdownMutex);  if (!shutdown_)    throw std::logic_error("not in shutdown state");  if (poolSize_ == 0)    return true;  qint64 waitTime = static_cast<qint64>(maxWaitTime);  if (waitTime <= 0)    return false;  //TODO Use Qt4.7 API  QDateTime start = QDateTime::currentDateTime();  forever  {    waitCond.wait(&shutdownMutex, waitTime);    if (poolSize_ == 0)      return true;    qint64 currWait = ctk::msecsTo(start, QDateTime::currentDateTime());    waitTime = static_cast<qint64>(maxWaitTime) - currWait;    if (waitTime <= 0)      return false;  }}void ctkEAPooledExecutor::awaitTerminationAfterShutdown() const{  QMutexLocker lock(&mutex);  if (!shutdown_)    throw std::logic_error("not in shutdown state");  while (poolSize_ > 0)  {    lock.unlock();    QMutexLocker shutdownLock(&shutdownMutex);    waitCond.wait(&shutdownMutex);    lock.relock();    // worker is done, wait for possibly not yet finished worker-thread    foreach(QThread* t, stoppedThreads_)    {      t->wait();    }  }}QList<ctkEARunnable*> ctkEAPooledExecutor::drain(){  bool wasInterrupted = false;  QList<ctkEARunnable*> tasks;  forever  {    try    {      ctkEARunnable* x = handOff_->poll(0);      if (x == 0)        break;      else        tasks.push_back(x);    }    catch (const ctkEAInterruptedException& )    {      wasInterrupted = true; // postpone re-interrupt until drained    }  }  if (wasInterrupted)  {    qobject_cast<ctkEAInterruptibleThread*>(QThread::currentThread())->interrupt();  }  return tasks;}void ctkEAPooledExecutor::runWhenBlocked(){  setBlockedExecutionHandler(&runWhenBlocked_);}void ctkEAPooledExecutor::waitWhenBlocked(){  setBlockedExecutionHandler(&waitWhenBlocked_);}void ctkEAPooledExecutor::discardWhenBlocked(){  setBlockedExecutionHandler(&discardWhenBlocked_);}void ctkEAPooledExecutor::abortWhenBlocked(){  setBlockedExecutionHandler(&abortWhenBlocked_);}void ctkEAPooledExecutor::discardOldestWhenBlocked(){  setBlockedExecutionHandler(&discardOldestWhenBlocked_);}void ctkEAPooledExecutor::execute(ctkEARunnable* command){  forever  {    {      QMutexLocker lock(&mutex);      if (!shutdown_)      {        int size = poolSize_;        // Ensure minimum number of threads        if (size < minimumPoolSize_)        {          addThread(command);          return;        }        // Try to give to existing thread        if (handOff_->offer(command, 0))        {          return;        }        // If cannot handoff and still under maximum, create new thread        if (size < maximumPoolSize_)        {          addThread(command);          return;        }      }    }    // Cannot hand off and cannot create -- ask for help    if (getBlockedExecutionHandler()->blockedAction(command))    {      return;    }  }}ctkEAPooledExecutor::Worker::Worker(ctkEAPooledExecutor* pe, ctkEARunnable* firstTask)  : firstTask_(firstTask), pe(pe){  if (firstTask) ++firstTask->ref;}void ctkEAPooledExecutor::Worker::run(){  try  {    ctkEARunnable* task = firstTask_;    firstTask_ = 0;    if (task != 0)    {      const bool autoDelete = task->autoDelete();      task->run();      if (autoDelete && !--task->ref) delete task;    }    while ( (task = pe->getTask()) != 0)    {      const bool autoDelete = task->autoDelete();      task->run();      if (autoDelete && !--task->ref) delete task;    }  }  catch (const ctkEAInterruptedException&)  {    pe->workerDone(this);    return;  }  pe->workerDone(this);}bool ctkEAPooledExecutor::RunWhenBlocked::blockedAction(ctkEARunnable* command){  const bool autoDelete = command->autoDelete();  command->run();  if (autoDelete && !--command->ref) delete command;  return true;}ctkEAPooledExecutor::WaitWhenBlocked::WaitWhenBlocked(ctkEAPooledExecutor* pe)  : pe(pe){}bool ctkEAPooledExecutor::WaitWhenBlocked::blockedAction(ctkEARunnable* command){  {    QMutexLocker lock(&pe->mutex);    if (pe->shutdown_)      return true;  }  pe->handOff_->put(command);  return true;}bool ctkEAPooledExecutor::DiscardWhenBlocked::blockedAction(ctkEARunnable* command){  Q_UNUSED(command)  return true;}bool ctkEAPooledExecutor::AbortWhenBlocked::blockedAction(ctkEARunnable* command){  Q_UNUSED(command)  throw ctkRuntimeException("Pool is blocked");}ctkEAPooledExecutor::DiscardOldestWhenBlocked::DiscardOldestWhenBlocked(ctkEAPooledExecutor* pe)  : pe(pe){}bool ctkEAPooledExecutor::DiscardOldestWhenBlocked::blockedAction(ctkEARunnable* command){  ctkEARunnable* tmp = pe->handOff_->poll(0);  if (tmp && tmp->autoDelete() && !--tmp->ref) delete tmp;  if (!pe->handOff_->offer(command, 0))  {    const bool autoDelete = command->autoDelete();    command->run();    if (autoDelete && !--command->ref) delete command;  }  return true;}void ctkEAPooledExecutor::addThread(ctkEARunnable* command){  Worker* worker = new Worker(this, command);  ++worker->ref;  ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);  threads_.insert(worker, thread);  ++poolSize_;  // do some garbage collection  foreach (ctkEAInterruptibleThread* t, stoppedThreads_)  {    if (t != ctkEAInterruptibleThread::currentThread() && t->isFinished())    {      delete t;      stoppedThreads_.removeAll(t);    }  }  thread->start();}void ctkEAPooledExecutor::workerDone(Worker* w){  QMutexLocker lock(&mutex);  stoppedThreads_ << threads_.take(w);  if (--poolSize_ == 0 && shutdown_)  {    maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads    waitCond.wakeAll(); // notify awaitTerminationAfterShutdown  }  // Create a replacement if needed  if (poolSize_ == 0 || poolSize_ < minimumPoolSize_)  {    try    {      ctkEARunnable* r = handOff_->poll(0);      if (r != 0)      {        if(shutdown_) // just consume task if shut down        {          if (r->autoDelete() && !r->ref) delete r;        }        else        {          addThread(r);        }      }    }    catch(const ctkEAInterruptedException& ) {      return;    }  }}ctkEARunnable* ctkEAPooledExecutor::getTask(){  long waitTime;  {    QMutexLocker lock(&mutex);    if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads      return 0;    waitTime = (shutdown_)? 0 : keepAliveTime_;  }  if (waitTime >= 0)    return handOff_->poll(waitTime);  else    return handOff_->take();}
 |