| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452 | 
							- /*=============================================================================
 
-   Library: CTK
 
-   Copyright (c) German Cancer Research Center,
 
-     Division of Medical and Biological Informatics
 
-   Licensed under the Apache License, Version 2.0 (the "License");
 
-   you may not use this file except in compliance with the License.
 
-   You may obtain a copy of the License at
 
-     http://www.apache.org/licenses/LICENSE-2.0
 
-   Unless required by applicable law or agreed to in writing, software
 
-   distributed under the License is distributed on an "AS IS" BASIS,
 
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
-   See the License for the specific language governing permissions and
 
-   limitations under the License.
 
- =============================================================================*/
 
- #include "ctkEAPooledExecutor_p.h"
 
- #include "ctkEAChannel_p.h"
 
- #include <dispatch/ctkEAInterruptibleThread_p.h>
 
- #include <dispatch/ctkEAInterruptedException_p.h>
 
- #include <limits>
 
- #include <stdexcept>
 
- #include <QDateTime>
 
- #include <QDebug>
 
- const int ctkEAPooledExecutor::DEFAULT_MAXIMUMPOOLSIZE = std::numeric_limits<int>::max();
 
- const int ctkEAPooledExecutor::DEFAULT_MINIMUMPOOLSIZE = 1;
 
- const long ctkEAPooledExecutor::DEFAULT_KEEPALIVETIME = 60 * 1000;
 
- ctkEAPooledExecutor::ctkEAPooledExecutor(ctkEAChannel* channel, int maxPoolSize)
 
-   : maximumPoolSize_(maxPoolSize), minimumPoolSize_(DEFAULT_MINIMUMPOOLSIZE),
 
-     poolSize_(0), keepAliveTime_(DEFAULT_KEEPALIVETIME), shutdown_(false),
 
-     handOff_(channel), blockedExecutionHandler_(0), waitWhenBlocked_(this),
 
-     discardOldestWhenBlocked_(this)
 
- {
 
-   runWhenBlocked();
 
- }
 
- ctkEAPooledExecutor::~ctkEAPooledExecutor()
 
- {
 
-   delete handOff_;
 
-   qDeleteAll(stoppedThreads_);
 
- }
 
- int ctkEAPooledExecutor::getMaximumPoolSize() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return maximumPoolSize_;
 
- }
 
- void ctkEAPooledExecutor::setMaximumPoolSize(int newMaximum)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   if (newMaximum <= 0) throw std::invalid_argument("maximum must be > 0");
 
-   maximumPoolSize_ = newMaximum;
 
- }
 
- int ctkEAPooledExecutor::getMinimumPoolSize() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return minimumPoolSize_;
 
- }
 
- void ctkEAPooledExecutor::setMinimumPoolSize(int newMinimum)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   if (newMinimum < 0) throw std::invalid_argument("minimum must be >= 0");
 
-   minimumPoolSize_ = newMinimum;
 
- }
 
- int ctkEAPooledExecutor::getPoolSize() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return poolSize_;
 
- }
 
- long ctkEAPooledExecutor::getKeepAliveTime() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return keepAliveTime_;
 
- }
 
- void ctkEAPooledExecutor::setKeepAliveTime(long msecs)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   keepAliveTime_ = msecs;
 
- }
 
- ctkEAPooledExecutor::BlockedExecutionHandler* ctkEAPooledExecutor::getBlockedExecutionHandler() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return blockedExecutionHandler_;
 
- }
 
- void ctkEAPooledExecutor::setBlockedExecutionHandler(BlockedExecutionHandler* h)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   blockedExecutionHandler_ = h;
 
- }
 
- int ctkEAPooledExecutor::createThreads(int numberOfThreads)
 
- {
 
-   int ncreated = 0;
 
-   for (int i = 0; i < numberOfThreads; ++i)
 
-   {
 
-     if (poolSize_ < maximumPoolSize_)
 
-     {
 
-       addThread(0);
 
-       ++ncreated;
 
-     }
 
-     else
 
-       break;
 
-   }
 
-   return ncreated;
 
- }
 
- void ctkEAPooledExecutor::interruptAll()
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   foreach (ctkEAInterruptibleThread* t, threads_)
 
-   {
 
-     t->interrupt();
 
-   }
 
- }
 
- void ctkEAPooledExecutor::shutdownNow()
 
- {
 
-   shutdownNow(&discardWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::shutdownNow(BlockedExecutionHandler* handler)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   setBlockedExecutionHandler(handler);
 
-   shutdown_ = true; // don't allow new tasks
 
-   minimumPoolSize_ = maximumPoolSize_ = 0; // don't make new threads
 
-   interruptAll(); // interrupt all existing threads
 
- }
 
- void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks()
 
- {
 
-   shutdownAfterProcessingCurrentlyQueuedTasks(new DiscardWhenBlocked());
 
- }
 
- void ctkEAPooledExecutor::shutdownAfterProcessingCurrentlyQueuedTasks(BlockedExecutionHandler* handler)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   setBlockedExecutionHandler(handler);
 
-   shutdown_ = true;
 
-   if (poolSize_ == 0) // disable new thread construction when idle
 
-     minimumPoolSize_ = maximumPoolSize_ = 0;
 
- }
 
- bool ctkEAPooledExecutor::isTerminatedAfterShutdown() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   return shutdown_ && poolSize_ == 0;
 
- }
 
- bool ctkEAPooledExecutor::awaitTerminationAfterShutdown(long maxWaitTime) const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   QMutexLocker shutdownLock(&shutdownMutex);
 
-   if (!shutdown_)
 
-     throw std::logic_error("not in shutdown state");
 
-   if (poolSize_ == 0)
 
-     return true;
 
-   long waitTime = maxWaitTime;
 
-   if (waitTime <= 0)
 
-     return false;
 
-   //TODO Use Qt4.7 API
 
-   QDateTime start = QDateTime::currentDateTime();
 
-   forever
 
-   {
 
-     waitCond.wait(&shutdownMutex, waitTime);
 
-     if (poolSize_ == 0)
 
-       return true;
 
-     qint64 currWait = start.time().msecsTo(QDateTime::currentDateTime().time());
 
-     waitTime = maxWaitTime - currWait;
 
-     if (waitTime <= 0)
 
-       return false;
 
-   }
 
- }
 
- void ctkEAPooledExecutor::awaitTerminationAfterShutdown() const
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   if (!shutdown_)
 
-     throw std::logic_error("not in shutdown state");
 
-   while (poolSize_ > 0)
 
-   {
 
-     lock.unlock();
 
-     QMutexLocker shutdownLock(&shutdownMutex);
 
-     waitCond.wait(&shutdownMutex);
 
-     lock.relock();
 
-     // worker is done, wait for possibly not yet finished worker-thread
 
-     foreach(QThread* t, stoppedThreads_)
 
-     {
 
-       t->wait();
 
-     }
 
-   }
 
- }
 
- QList<ctkEARunnable*> ctkEAPooledExecutor::drain()
 
- {
 
-   bool wasInterrupted = false;
 
-   QList<ctkEARunnable*> tasks;
 
-   forever
 
-   {
 
-     try
 
-     {
 
-       ctkEARunnable* x = handOff_->poll(0);
 
-       if (x == 0)
 
-         break;
 
-       else
 
-         tasks.push_back(x);
 
-     }
 
-     catch (const ctkEAInterruptedException& )
 
-     {
 
-       wasInterrupted = true; // postpone re-interrupt until drained
 
-     }
 
-   }
 
-   if (wasInterrupted)
 
-   {
 
-     qobject_cast<ctkEAInterruptibleThread*>(QThread::currentThread())->interrupt();
 
-   }
 
-   return tasks;
 
- }
 
- void ctkEAPooledExecutor::runWhenBlocked()
 
- {
 
-   setBlockedExecutionHandler(&runWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::waitWhenBlocked()
 
- {
 
-   setBlockedExecutionHandler(&waitWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::discardWhenBlocked()
 
- {
 
-   setBlockedExecutionHandler(&discardWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::abortWhenBlocked()
 
- {
 
-   setBlockedExecutionHandler(&abortWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::discardOldestWhenBlocked()
 
- {
 
-   setBlockedExecutionHandler(&discardOldestWhenBlocked_);
 
- }
 
- void ctkEAPooledExecutor::execute(ctkEARunnable* command)
 
- {
 
-   forever
 
-   {
 
-     {
 
-       QMutexLocker lock(&mutex);
 
-       if (!shutdown_)
 
-       {
 
-         int size = poolSize_;
 
-         // Ensure minimum number of threads
 
-         if (size < minimumPoolSize_)
 
-         {
 
-           addThread(command);
 
-           return;
 
-         }
 
-         // Try to give to existing thread
 
-         if (handOff_->offer(command, 0))
 
-         {
 
-           return;
 
-         }
 
-         // If cannot handoff and still under maximum, create new thread
 
-         if (size < maximumPoolSize_)
 
-         {
 
-           addThread(command);
 
-           return;
 
-         }
 
-       }
 
-     }
 
-     // Cannot hand off and cannot create -- ask for help
 
-     if (getBlockedExecutionHandler()->blockedAction(command))
 
-     {
 
-       return;
 
-     }
 
-   }
 
- }
 
- ctkEAPooledExecutor::Worker::Worker(ctkEAPooledExecutor* pe, ctkEARunnable* firstTask)
 
-   : firstTask_(firstTask), pe(pe)
 
- {
 
-   if (firstTask) ++firstTask->ref;
 
- }
 
- void ctkEAPooledExecutor::Worker::run()
 
- {
 
-   try
 
-   {
 
-     ctkEARunnable* task = firstTask_;
 
-     firstTask_ = 0;
 
-     if (task != 0)
 
-     {
 
-       const bool autoDelete = task->autoDelete();
 
-       task->run();
 
-       if (autoDelete && !--task->ref) delete task;
 
-     }
 
-     while ( (task = pe->getTask()) != 0)
 
-     {
 
-       const bool autoDelete = task->autoDelete();
 
-       task->run();
 
-       if (autoDelete && !--task->ref) delete task;
 
-     }
 
-   }
 
-   catch (const ctkEAInterruptedException&)
 
-   {
 
-     pe->workerDone(this);
 
-     return;
 
-   }
 
-   pe->workerDone(this);
 
- }
 
- bool ctkEAPooledExecutor::RunWhenBlocked::blockedAction(ctkEARunnable* command)
 
- {
 
-   const bool autoDelete = command->autoDelete();
 
-   command->run();
 
-   if (autoDelete && !--command->ref) delete command;
 
-   return true;
 
- }
 
- ctkEAPooledExecutor::WaitWhenBlocked::WaitWhenBlocked(ctkEAPooledExecutor* pe)
 
-   : pe(pe)
 
- {}
 
- bool ctkEAPooledExecutor::WaitWhenBlocked::blockedAction(ctkEARunnable* command)
 
- {
 
-   {
 
-     QMutexLocker lock(&pe->mutex);
 
-     if (pe->shutdown_)
 
-       return true;
 
-   }
 
-   pe->handOff_->put(command);
 
-   return true;
 
- }
 
- bool ctkEAPooledExecutor::DiscardWhenBlocked::blockedAction(ctkEARunnable* command)
 
- {
 
-   Q_UNUSED(command)
 
-   return true;
 
- }
 
- bool ctkEAPooledExecutor::AbortWhenBlocked::blockedAction(ctkEARunnable* command)
 
- {
 
-   Q_UNUSED(command)
 
-   throw ctkRuntimeException("Pool is blocked");
 
- }
 
- ctkEAPooledExecutor::DiscardOldestWhenBlocked::DiscardOldestWhenBlocked(ctkEAPooledExecutor* pe)
 
-   : pe(pe)
 
- {}
 
- bool ctkEAPooledExecutor::DiscardOldestWhenBlocked::blockedAction(ctkEARunnable* command)
 
- {
 
-   pe->handOff_->poll(0);
 
-   if (!pe->handOff_->offer(command, 0))
 
-   {
 
-     const bool autoDelete = command->autoDelete();
 
-     command->run();
 
-     if (autoDelete && !--command->ref) delete command;
 
-   }
 
-   return true;
 
- }
 
- void ctkEAPooledExecutor::addThread(ctkEARunnable* command)
 
- {
 
-   Worker* worker = new Worker(this, command);
 
-   ++worker->ref;
 
-  ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);
 
-   threads_.insert(worker, thread);
 
-   ++poolSize_;
 
-   // do some garbage collection
 
-   foreach (ctkEAInterruptibleThread* t, stoppedThreads_)
 
-   {
 
-     if (t != ctkEAInterruptibleThread::currentThread() && t->isFinished())
 
-     {
 
-       delete t;
 
-       stoppedThreads_.removeAll(t);
 
-     }
 
-   }
 
-   thread->start();
 
- }
 
- void ctkEAPooledExecutor::workerDone(Worker* w)
 
- {
 
-   QMutexLocker lock(&mutex);
 
-   stoppedThreads_ << threads_.take(w);
 
-   if (--poolSize_ == 0 && shutdown_)
 
-   {
 
-     maximumPoolSize_ = minimumPoolSize_ = 0; // disable new threads
 
-     waitCond.wakeAll(); // notify awaitTerminationAfterShutdown
 
-   }
 
-   // Create a replacement if needed
 
-   if (poolSize_ == 0 || poolSize_ < minimumPoolSize_)
 
-   {
 
-     try
 
-     {
 
-       ctkEARunnable* r = handOff_->poll(0);
 
-       if (r != 0 && !shutdown_) // just consume task if shut down
 
-         addThread(r);
 
-     }
 
-     catch(const ctkEAInterruptedException& ) {
 
-       return;
 
-     }
 
-   }
 
- }
 
- ctkEARunnable* ctkEAPooledExecutor::getTask()
 
- {
 
-   long waitTime;
 
-   {
 
-     QMutexLocker lock(&mutex);
 
-     if (poolSize_ > maximumPoolSize_) // Cause to die if too many threads
 
-       return 0;
 
-     waitTime = (shutdown_)? 0 : keepAliveTime_;
 
-   }
 
-   if (waitTime >= 0)
 
-     return handOff_->poll(waitTime);
 
-   else
 
-     return handOff_->take();
 
- }
 
 
  |