From 6a9551ebfb15daedce1b248feb9c3495d1f5d06b Mon Sep 17 00:00:00 2001 From: Robert Osfield Date: Tue, 10 Jul 2007 17:36:01 +0000 Subject: [PATCH] Wired up OperationsThread to use the new OperationQueue and thereby support thread pooling where multiple OperationsThreads share a single OperationsQueue --- include/osg/OperationsThread | 47 ++++--- src/osg/OperationsThread.cpp | 257 +++++++++++++++++------------------ 2 files changed, 157 insertions(+), 147 deletions(-) diff --git a/include/osg/OperationsThread b/include/osg/OperationsThread index 2df1a9324..ad4b16b8b 100644 --- a/include/osg/OperationsThread +++ b/include/osg/OperationsThread @@ -30,7 +30,8 @@ class RefBlock: virtual public osg::Referenced, public OpenThreads::Block { public: - RefBlock() {} + RefBlock(): + osg::Referenced(true) {} }; @@ -39,6 +40,7 @@ class Operation : virtual public Referenced { public: Operation(const std::string& name, bool keep): + osg::Referenced(true), _name(name), _keep(keep) {} @@ -75,13 +77,12 @@ class OSG_EXPORT OperationQueue : public Referenced OperationQueue(); - RefBlock* getOperationsBlock() { return _operationsBlock.get(); } - - const RefBlock* getOperationsBlock() const { return _operationsBlock.get(); } - /** Get the next operation from the operation queue. * Return null ref_ptr<> if no operations are left in queue. */ - osg::ref_ptr getNextOperation(); + osg::ref_ptr getNextOperation(bool blockIfEmpty = false); + + /** Return true if the operation queue is empty. */ + bool empty() const { return _operations.empty(); } /** Add operation to end of OperationQueue, this will be * executed by the operation thread once this operation gets to the head of the queue.*/ @@ -95,10 +96,14 @@ class OSG_EXPORT OperationQueue : public Referenced /** Remove all operations from OperationQueue.*/ void removeAllOperations(); - + /** Call release on all operations. */ - void releaseAllOperations(); - + void releaseAllOperations(); + + /** Release operations block that is used to block threads that are waiting on an empty operations queue.*/ + void releaseOperationsBlock(); + + protected: virtual ~OperationQueue(); @@ -124,6 +129,16 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa const Object* getParent() const { return _parent.get(); } + + /** Set the OperationQueue. */ + void setOperationQueue(OperationQueue* opq); + + /** Get the OperationQueue. */ + OperationQueue* getOperationQueue() { return _operationQueue.get(); } + + /** Get the const OperationQueue. */ + const OperationQueue* getOperationQueue() const { return _operationQueue.get(); } + /** Add operation to end of OperationQueue, this will be * executed by the graphics thread once this operation gets to the head of the queue.*/ @@ -137,6 +152,7 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa /** Remove all operations from OperationQueue.*/ void removeAllOperations(); + /** Get the operation currently being run.*/ osg::ref_ptr getCurrentOperation() { return _currentOperation; } @@ -155,16 +171,13 @@ class OSG_EXPORT OperationsThread : public Referenced, public OpenThreads::Threa virtual ~OperationsThread(); - observer_ptr _parent; + observer_ptr _parent; - typedef std::list< ref_ptr > Operations; + bool _done; - bool _done; - - OpenThreads::Mutex _operationsMutex; - osg::ref_ptr _operationsBlock; - Operations _operations; - osg::ref_ptr _currentOperation; + OpenThreads::Mutex _threadMutex; + osg::ref_ptr _operationQueue; + osg::ref_ptr _currentOperation; }; diff --git a/src/osg/OperationsThread.cpp b/src/osg/OperationsThread.cpp index dcfabb616..0b22b1ab3 100644 --- a/src/osg/OperationsThread.cpp +++ b/src/osg/OperationsThread.cpp @@ -44,7 +44,8 @@ struct BlockOperation : public Operation, public Block // OperationsQueue // -OperationQueue::OperationQueue() +OperationQueue::OperationQueue(): + osg::Referenced(true) { _currentOperationIterator = _operations.begin(); _operationsBlock = new RefBlock; @@ -54,9 +55,49 @@ OperationQueue::~OperationQueue() { } -osg::ref_ptr OperationQueue::getNextOperation() +ref_ptr OperationQueue::getNextOperation(bool blockIfEmpty) { - return osg::ref_ptr(); + if (blockIfEmpty && _operations.empty()) + { + _operationsBlock->block(); + } + + OpenThreads::ScopedLock lock(_operationsMutex); + + if (_operations.empty()) return osg::ref_ptr(); + + if (_currentOperationIterator == _operations.end()) + { + // iterator at end of operations so reset to begining. + _currentOperationIterator = _operations.begin(); + } + + ref_ptr currentOperation = *_currentOperationIterator; + + if (!currentOperation->getKeep()) + { + // osg::notify(osg::INFO)<<"removing "<getName()<set(false); + } + } + else + { + // osg::notify(osg::INFO)<<"increment "<<_currentOperation->getName()<getName()==name) itr = _operations.erase(itr); + if ((*itr)->getName()==name) + { + bool needToResetCurrentIterator = (_currentOperationIterator == itr); + + itr = _operations.erase(itr); + + if (needToResetCurrentIterator) _currentOperationIterator = itr; + } else ++itr; } @@ -113,7 +169,11 @@ void OperationQueue::removeAllOperations() osg::notify(osg::INFO)<<"Doing remove all operations"< lock(_operationsMutex); + _operations.clear(); + + // reset current operator. + _currentOperationIterator = _operations.begin(); if (_operations.empty()) { @@ -121,10 +181,15 @@ void OperationQueue::removeAllOperations() } } -void OperationQueue::releaseAllOperations() +void OperationQueue::releaseOperationsBlock() +{ + _operationsBlock->release(); +} + + void OperationQueue::releaseAllOperations() { OpenThreads::ScopedLock lock(_operationsMutex); - + for(Operations::iterator itr = _operations.begin(); itr!=_operations.end(); ++itr) @@ -140,10 +205,11 @@ void OperationQueue::releaseAllOperations() // OperationsThread::OperationsThread(): + osg::Referenced(true), _parent(0), _done(false) { - _operationsBlock = new RefBlock; + _operationQueue = new OperationQueue; } OperationsThread::~OperationsThread() @@ -155,6 +221,12 @@ OperationsThread::~OperationsThread() //osg::notify(osg::NOTICE)<<"Done Destructing graphics thread "< lock(_threadMutex); + _operationQueue = opq; +} + void OperationsThread::setDone(bool done) { if (_done==done) return; @@ -166,7 +238,7 @@ void OperationsThread::setDone(bool done) osg::notify(osg::INFO)<<"set done "< lock(_operationsMutex); + OpenThreads::ScopedLock lock(_threadMutex); if (_currentOperation.valid()) { osg::notify(osg::INFO)<<"releasing "<<_currentOperation.get()<release(); + if (_operationQueue.valid()) _operationQueue->releaseOperationsBlock(); } - } int OperationsThread::cancel() { - osg::notify(osg::INFO)<<"Cancelling OperationsThred "<release(); + OpenThreads::ScopedLock lock(_threadMutex); + + if (_operationQueue.valid()) + { + _operationQueue->releaseOperationsBlock(); + //_operationQueue->releaseAllOperations(); + } + + if (_currentOperation.valid()) _currentOperation->release(); } - // release the frameBlock and _databasePagerThreadBlock incase its holding up thread cancelation. - _operationsBlock->release(); - // then wait for the the thread to stop running. while(isRunning()) { - _operationsBlock->release(); +#if 1 { - OpenThreads::ScopedLock lock(_operationsMutex); + OpenThreads::ScopedLock lock(_threadMutex); - for(Operations::iterator itr = _operations.begin(); - itr != _operations.end(); - ++itr) + if (_operationQueue.valid()) { - (*itr)->release(); + _operationQueue->releaseOperationsBlock(); + // _operationQueue->releaseAllOperations(); } if (_currentOperation.valid()) _currentOperation->release(); } - +#endif // commenting out debug info as it was cashing crash on exit, presumable // due to osg::notify or std::cout destructing earlier than this destructor. osg::notify(osg::INFO)<<" Waiting for OperationsThread to cancel "< lock(_operationsMutex); - - // add the operation to the end of the list - _operations.push_back(operation); - - _operationsBlock->set(true); + OpenThreads::ScopedLock lock(_threadMutex); + if (!_operationQueue) _operationQueue = new OperationQueue; + _operationQueue->add(operation); } void OperationsThread::remove(Operation* operation) { - osg::notify(osg::INFO)<<"Doing remove operation"< lock(_operationsMutex); - - for(Operations::iterator itr = _operations.begin(); - itr!=_operations.end();) - { - if ((*itr)==operation) itr = _operations.erase(itr); - else ++itr; - } + OpenThreads::ScopedLock lock(_threadMutex); + if (_operationQueue.valid()) _operationQueue->remove(operation); } void OperationsThread::remove(const std::string& name) { - osg::notify(osg::INFO)<<"Doing remove named operation"< lock(_operationsMutex); - - // find the remove all operations with specificed name - for(Operations::iterator itr = _operations.begin(); - itr!=_operations.end();) - { - if ((*itr)->getName()==name) itr = _operations.erase(itr); - else ++itr; - } - - if (_operations.empty()) - { - _operationsBlock->set(false); - } + OpenThreads::ScopedLock lock(_threadMutex); + if (_operationQueue.valid()) _operationQueue->remove(name); } void OperationsThread::removeAllOperations() { - osg::notify(osg::INFO)<<"Doing remove all operations"< lock(_operationsMutex); - _operations.clear(); - - if (_operations.empty()) - { - _operationsBlock->set(false); - } + OpenThreads::ScopedLock lock(_threadMutex); + if (_operationQueue.valid()) _operationQueue->removeAllOperations(); } - void OperationsThread::run() { // make the graphics context current. @@ -306,73 +341,35 @@ void OperationsThread::run() bool firstTime = true; - Operations::iterator itr = _operations.begin(); - do { - // osg::notify(osg::NOTICE)<<"In main loop "<block(); - - // exit from loop if _done is set. - if (_done) break; - - itr = _operations.begin(); - } - else - { - if (itr == _operations.end()) itr = _operations.begin(); - } - - // osg::notify(osg::INFO)<<"get op "<<_done<<" "< lock(_operationsMutex); - if (!_operations.empty()) - { - // get the next item - _currentOperation = *itr; - - if (!_currentOperation->getKeep()) - { - // osg::notify(osg::INFO)<<"removing "<<_currentOperation->getName()<set(false); - } - } - else - { - // osg::notify(osg::INFO)<<"increment "<<_currentOperation->getName()< operation; + ref_ptr operationQueue; - if (_currentOperation.valid()) { + OpenThreads::ScopedLock lock(_threadMutex); + operationQueue = _operationQueue; + } + + operation = operationQueue->getNextOperation(true); + + if (_done) break; + + if (operation.valid()) + { + { + OpenThreads::ScopedLock lock(_threadMutex); + _currentOperation = operation; + } + // osg::notify(osg::INFO)<<"Doing op "<<_currentOperation->getName()<<" "< lock(_operationsMutex); + OpenThreads::ScopedLock lock(_threadMutex); _currentOperation = 0; } }