Wired up OperationsThread to use the new OperationQueue and thereby support

thread pooling where multiple OperationsThreads share a single OperationsQueue
This commit is contained in:
Robert Osfield
2007-07-10 17:36:01 +00:00
parent 44c07b9fad
commit 6a9551ebfb
2 changed files with 157 additions and 147 deletions

View File

@@ -30,7 +30,8 @@ class RefBlock: virtual public osg::Referenced, public OpenThreads::Block
{
public:
RefBlock() {}
RefBlock():
osg::Referenced(true) {}
};
@@ -39,6 +40,7 @@ class Operation : virtual public Referenced
{
public:
Operation(const std::string& name, bool keep):
osg::Referenced(true),
_name(name),
_keep(keep) {}
@@ -75,13 +77,12 @@ class OSG_EXPORT OperationQueue : public Referenced
OperationQueue();
RefBlock* getOperationsBlock() { return _operationsBlock.get(); }
const RefBlock* getOperationsBlock() const { return _operationsBlock.get(); }
/** Get the next operation from the operation queue.
* Return null ref_ptr<> if no operations are left in queue. */
osg::ref_ptr<Operation> getNextOperation();
osg::ref_ptr<Operation> getNextOperation(bool blockIfEmpty = false);
/** Return true if the operation queue is empty. */
bool empty() const { return _operations.empty(); }
/** Add operation to end of OperationQueue, this will be
* executed by the operation thread once this operation gets to the head of the queue.*/
@@ -95,10 +96,14 @@ class OSG_EXPORT OperationQueue : public Referenced
/** Remove all operations from OperationQueue.*/
void removeAllOperations();
/** Call release on all operations. */
void releaseAllOperations();
void releaseAllOperations();
/** Release operations block that is used to block threads that are waiting on an empty operations queue.*/
void releaseOperationsBlock();
protected:
virtual ~OperationQueue();
@@ -124,6 +129,16 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa
const Object* getParent() const { return _parent.get(); }
/** Set the OperationQueue. */
void setOperationQueue(OperationQueue* opq);
/** Get the OperationQueue. */
OperationQueue* getOperationQueue() { return _operationQueue.get(); }
/** Get the const OperationQueue. */
const OperationQueue* getOperationQueue() const { return _operationQueue.get(); }
/** Add operation to end of OperationQueue, this will be
* executed by the graphics thread once this operation gets to the head of the queue.*/
@@ -137,6 +152,7 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa
/** Remove all operations from OperationQueue.*/
void removeAllOperations();
/** Get the operation currently being run.*/
osg::ref_ptr<Operation> getCurrentOperation() { return _currentOperation; }
@@ -155,16 +171,13 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa
virtual ~OperationsThread();
observer_ptr<Object> _parent;
observer_ptr<Object> _parent;
typedef std::list< ref_ptr<Operation> > Operations;
bool _done;
bool _done;
OpenThreads::Mutex _operationsMutex;
osg::ref_ptr<osg::RefBlock> _operationsBlock;
Operations _operations;
osg::ref_ptr<Operation> _currentOperation;
OpenThreads::Mutex _threadMutex;
osg::ref_ptr<OperationQueue> _operationQueue;
osg::ref_ptr<Operation> _currentOperation;
};

View File

@@ -44,7 +44,8 @@ struct BlockOperation : public Operation, public Block
// OperationsQueue
//
OperationQueue::OperationQueue()
OperationQueue::OperationQueue():
osg::Referenced(true)
{
_currentOperationIterator = _operations.begin();
_operationsBlock = new RefBlock;
@@ -54,9 +55,49 @@ OperationQueue::~OperationQueue()
{
}
osg::ref_ptr<Operation> OperationQueue::getNextOperation()
ref_ptr<Operation> OperationQueue::getNextOperation(bool blockIfEmpty)
{
return osg::ref_ptr<Operation>();
if (blockIfEmpty && _operations.empty())
{
_operationsBlock->block();
}
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
if (_operations.empty()) return osg::ref_ptr<Operation>();
if (_currentOperationIterator == _operations.end())
{
// iterator at end of operations so reset to begining.
_currentOperationIterator = _operations.begin();
}
ref_ptr<Operation> currentOperation = *_currentOperationIterator;
if (!currentOperation->getKeep())
{
// osg::notify(osg::INFO)<<"removing "<<currentOperation->getName()<<std::endl;
// remove it from the operations queue
_currentOperationIterator = _operations.erase(_currentOperationIterator);
// osg::notify(osg::INFO)<<"size "<<_operations.size()<<std::endl;
if (_operations.empty())
{
// osg::notify(osg::INFO)<<"setting block "<<_operations.size()<<std::endl;
_operationsBlock->set(false);
}
}
else
{
// osg::notify(osg::INFO)<<"increment "<<_currentOperation->getName()<<std::endl;
// move on to the next operation in the list.
++_currentOperationIterator;
}
return currentOperation;
}
void OperationQueue::add(Operation* operation)
@@ -82,7 +123,15 @@ void OperationQueue::remove(Operation* operation)
for(Operations::iterator itr = _operations.begin();
itr!=_operations.end();)
{
if ((*itr)==operation) itr = _operations.erase(itr);
if ((*itr)==operation)
{
bool needToResetCurrentIterator = (_currentOperationIterator == itr);
itr = _operations.erase(itr);
if (needToResetCurrentIterator) _currentOperationIterator = itr;
}
else ++itr;
}
}
@@ -98,7 +147,14 @@ void OperationQueue::remove(const std::string& name)
for(Operations::iterator itr = _operations.begin();
itr!=_operations.end();)
{
if ((*itr)->getName()==name) itr = _operations.erase(itr);
if ((*itr)->getName()==name)
{
bool needToResetCurrentIterator = (_currentOperationIterator == itr);
itr = _operations.erase(itr);
if (needToResetCurrentIterator) _currentOperationIterator = itr;
}
else ++itr;
}
@@ -113,7 +169,11 @@ void OperationQueue::removeAllOperations()
osg::notify(osg::INFO)<<"Doing remove all operations"<<std::endl;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
_operations.clear();
// reset current operator.
_currentOperationIterator = _operations.begin();
if (_operations.empty())
{
@@ -121,10 +181,15 @@ void OperationQueue::removeAllOperations()
}
}
void OperationQueue::releaseAllOperations()
void OperationQueue::releaseOperationsBlock()
{
_operationsBlock->release();
}
void OperationQueue::releaseAllOperations()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
for(Operations::iterator itr = _operations.begin();
itr!=_operations.end();
++itr)
@@ -140,10 +205,11 @@ void OperationQueue::releaseAllOperations()
//
OperationsThread::OperationsThread():
osg::Referenced(true),
_parent(0),
_done(false)
{
_operationsBlock = new RefBlock;
_operationQueue = new OperationQueue;
}
OperationsThread::~OperationsThread()
@@ -155,6 +221,12 @@ OperationsThread::~OperationsThread()
//osg::notify(osg::NOTICE)<<"Done Destructing graphics thread "<<this<<std::endl;
}
void OperationsThread::setOperationQueue(OperationQueue* opq)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
_operationQueue = opq;
}
void OperationsThread::setDone(bool done)
{
if (_done==done) return;
@@ -166,7 +238,7 @@ void OperationsThread::setDone(bool done)
osg::notify(osg::INFO)<<"set done "<<this<<std::endl;
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (_currentOperation.valid())
{
osg::notify(osg::INFO)<<"releasing "<<_currentOperation.get()<<std::endl;
@@ -174,14 +246,13 @@ void OperationsThread::setDone(bool done)
}
}
_operationsBlock->release();
if (_operationQueue.valid()) _operationQueue->releaseOperationsBlock();
}
}
int OperationsThread::cancel()
{
osg::notify(osg::INFO)<<"Cancelling OperationsThred "<<this<<" isRunning()="<<isRunning()<<std::endl;
osg::notify(osg::INFO)<<"Cancelling OperationsThread "<<this<<" isRunning()="<<isRunning()<<std::endl;
int result = 0;
if( isRunning() )
@@ -191,34 +262,35 @@ int OperationsThread::cancel()
osg::notify(osg::INFO)<<" Doing cancel "<<this<<std::endl;
for(Operations::iterator itr = _operations.begin();
itr != _operations.end();
++itr)
{
(*itr)->release();
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (_operationQueue.valid())
{
_operationQueue->releaseOperationsBlock();
//_operationQueue->releaseAllOperations();
}
if (_currentOperation.valid()) _currentOperation->release();
}
// release the frameBlock and _databasePagerThreadBlock incase its holding up thread cancelation.
_operationsBlock->release();
// then wait for the the thread to stop running.
while(isRunning())
{
_operationsBlock->release();
#if 1
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
for(Operations::iterator itr = _operations.begin();
itr != _operations.end();
++itr)
if (_operationQueue.valid())
{
(*itr)->release();
_operationQueue->releaseOperationsBlock();
// _operationQueue->releaseAllOperations();
}
if (_currentOperation.valid()) _currentOperation->release();
}
#endif
// commenting out debug info as it was cashing crash on exit, presumable
// due to osg::notify or std::cout destructing earlier than this destructor.
osg::notify(osg::INFO)<<" Waiting for OperationsThread to cancel "<<this<<std::endl;
@@ -233,66 +305,29 @@ int OperationsThread::cancel()
void OperationsThread::add(Operation* operation)
{
osg::notify(osg::INFO)<<"Doing add"<<std::endl;
// aquire the lock on the operations queue to prevent anyone else for modifying it at the same time
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
// add the operation to the end of the list
_operations.push_back(operation);
_operationsBlock->set(true);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (!_operationQueue) _operationQueue = new OperationQueue;
_operationQueue->add(operation);
}
void OperationsThread::remove(Operation* operation)
{
osg::notify(osg::INFO)<<"Doing remove operation"<<std::endl;
// aquire the lock on the operations queue to prevent anyone else for modifying it at the same time
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
for(Operations::iterator itr = _operations.begin();
itr!=_operations.end();)
{
if ((*itr)==operation) itr = _operations.erase(itr);
else ++itr;
}
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (_operationQueue.valid()) _operationQueue->remove(operation);
}
void OperationsThread::remove(const std::string& name)
{
osg::notify(osg::INFO)<<"Doing remove named operation"<<std::endl;
// aquire the lock on the operations queue to prevent anyone else for modifying it at the same time
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
// find the remove all operations with specificed name
for(Operations::iterator itr = _operations.begin();
itr!=_operations.end();)
{
if ((*itr)->getName()==name) itr = _operations.erase(itr);
else ++itr;
}
if (_operations.empty())
{
_operationsBlock->set(false);
}
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (_operationQueue.valid()) _operationQueue->remove(name);
}
void OperationsThread::removeAllOperations()
{
osg::notify(osg::INFO)<<"Doing remove all operations"<<std::endl;
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
_operations.clear();
if (_operations.empty())
{
_operationsBlock->set(false);
}
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
if (_operationQueue.valid()) _operationQueue->removeAllOperations();
}
void OperationsThread::run()
{
// make the graphics context current.
@@ -306,73 +341,35 @@ void OperationsThread::run()
bool firstTime = true;
Operations::iterator itr = _operations.begin();
do
{
// osg::notify(osg::NOTICE)<<"In main loop "<<this<<std::endl;
if (_operations.empty())
{
_operationsBlock->block();
// exit from loop if _done is set.
if (_done) break;
itr = _operations.begin();
}
else
{
if (itr == _operations.end()) itr = _operations.begin();
}
// osg::notify(osg::INFO)<<"get op "<<_done<<" "<<this<<std::endl;
// get the front of the file request list.
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
if (!_operations.empty())
{
// get the next item
_currentOperation = *itr;
if (!_currentOperation->getKeep())
{
// osg::notify(osg::INFO)<<"removing "<<_currentOperation->getName()<<std::endl;
// remove it from the opeations queue
itr = _operations.erase(itr);
// osg::notify(osg::INFO)<<"size "<<_operations.size()<<std::endl;
if (_operations.empty())
{
// osg::notify(osg::INFO)<<"setting block "<<_operations.size()<<std::endl;
_operationsBlock->set(false);
}
}
else
{
// osg::notify(osg::INFO)<<"increment "<<_currentOperation->getName()<<std::endl;
// move on to the next operation in the list.
++itr;
}
}
}
// osg::notify(osg::NOTICE)<<"In thread loop "<<this<<std::endl;
ref_ptr<Operation> operation;
ref_ptr<OperationQueue> operationQueue;
if (_currentOperation.valid())
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
operationQueue = _operationQueue;
}
operation = operationQueue->getNextOperation(true);
if (_done) break;
if (operation.valid())
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
_currentOperation = operation;
}
// osg::notify(osg::INFO)<<"Doing op "<<_currentOperation->getName()<<" "<<this<<std::endl;
// call the graphics operation.
(*_currentOperation)(_parent.get());
(*operation)(_parent.get());
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_operationsMutex);
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_threadMutex);
_currentOperation = 0;
}
}