|
@@ -379,7 +379,9 @@ ctkEAPooledExecutor::DiscardOldestWhenBlocked::DiscardOldestWhenBlocked(ctkEAPoo
|
|
|
|
|
|
bool ctkEAPooledExecutor::DiscardOldestWhenBlocked::blockedAction(ctkEARunnable* command)
|
|
|
{
|
|
|
- pe->handOff_->poll(0);
|
|
|
+ ctkEARunnable* tmp = pe->handOff_->poll(0);
|
|
|
+ if (tmp && tmp->autoDelete() && !--tmp->ref) delete tmp;
|
|
|
+
|
|
|
if (!pe->handOff_->offer(command, 0))
|
|
|
{
|
|
|
const bool autoDelete = command->autoDelete();
|
|
@@ -393,7 +395,7 @@ void ctkEAPooledExecutor::addThread(ctkEARunnable* command)
|
|
|
{
|
|
|
Worker* worker = new Worker(this, command);
|
|
|
++worker->ref;
|
|
|
- ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);
|
|
|
+ ctkEAInterruptibleThread* thread = getThreadFactory()->newThread(worker);
|
|
|
threads_.insert(worker, thread);
|
|
|
++poolSize_;
|
|
|
|
|
@@ -426,8 +428,17 @@ void ctkEAPooledExecutor::workerDone(Worker* w)
|
|
|
try
|
|
|
{
|
|
|
ctkEARunnable* r = handOff_->poll(0);
|
|
|
- if (r != 0 && !shutdown_) // just consume task if shut down
|
|
|
- addThread(r);
|
|
|
+ if (r != 0)
|
|
|
+ {
|
|
|
+ if(shutdown_) // just consume task if shut down
|
|
|
+ {
|
|
|
+ if (r->autoDelete() && !r->ref) delete r;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ addThread(r);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
catch(const ctkEAInterruptedException& ) {
|
|
|
return;
|