Added support for using multiple load threads as a thread pool used by the MasterOperation

This commit is contained in:
Robert Osfield
2007-07-19 12:09:25 +00:00
parent 5ec2969330
commit fb1a34a8c5

View File

@@ -51,6 +51,133 @@
#include <iostream>
typedef std::vector< osg::ref_ptr<osg::GraphicsThread> > GraphicsThreads;
class CountedBlock : public osg::Referenced
{
public:
CountedBlock(unsigned int blockCount);
void completed();
void block();
void reset();
void release();
void setBlockCount(unsigned int blockCount);
protected:
~CountedBlock();
OpenThreads::Mutex _mut;
OpenThreads::Condition _cond;
unsigned int _numberOfBlocks;
unsigned int _blockCount;
};
CountedBlock::CountedBlock(unsigned int numberOfBlocks):
_numberOfBlocks(numberOfBlocks),
_blockCount(0)
{
}
void CountedBlock::completed()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
if (_blockCount>0)
{
--_blockCount;
if (_blockCount==0)
{
// osg::notify(osg::NOTICE)<<"Released"<<std::endl;
_cond.broadcast();
}
}
}
void CountedBlock::block()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
if (_blockCount)
_cond.wait(&_mut);
}
void CountedBlock::release()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
if (_blockCount)
{
_blockCount = 0;
_cond.broadcast();
}
}
void CountedBlock::reset()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> mutlock(_mut);
if (_numberOfBlocks!=_blockCount)
{
if (_numberOfBlocks==0) _cond.broadcast();
_blockCount = _numberOfBlocks;
}
}
void CountedBlock::setBlockCount(unsigned int blockCount)
{
_numberOfBlocks = blockCount;
}
CountedBlock::~CountedBlock()
{
_blockCount = 0;
release();
}
class LoadAndCompileOperation : public osg::Operation
{
public:
LoadAndCompileOperation(const std::string& filename, GraphicsThreads& graphicsThreads, CountedBlock* block):
Operation("Load and compile Operation", false),
_filename(filename),
_graphicsThreads(graphicsThreads),
_block(block) {}
virtual void operator () (osg::Object* object)
{
// osg::notify(osg::NOTICE)<<"LoadAndCompileOperation "<<_filename<<std::endl;
_loadedModel = osgDB::readNodeFile(_filename);
if (_loadedModel.valid() && !_graphicsThreads.empty())
{
osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(_loadedModel.get());
for(GraphicsThreads::iterator gitr = _graphicsThreads.begin();
gitr != _graphicsThreads.end();
++gitr)
{
(*gitr)->add( compileOperation.get() );
// requiresBarrier = true;
}
}
if (_block.valid()) _block->completed();
// osg::notify(osg::NOTICE)<<"done LoadAndCompileOperation "<<_filename<<std::endl;
}
std::string _filename;
GraphicsThreads _graphicsThreads;
osg::ref_ptr<osg::Node> _loadedModel;
osg::ref_ptr<CountedBlock> _block;
};
class MasterOperation : public osg::Operation
{
public:
@@ -66,6 +193,11 @@ public:
{
}
/** Set the OperationQueue that the MasterOperation can use to place tasks like file loading on for other processes to handle.*/
void setOperationQueue(osg::OperationQueue* oq) { _operationQueue = oq; }
osg::OperationQueue* getOperationQueue() { return _operationQueue.get(); }
bool readMasterFile(Files& files) const
{
std::ifstream fin(_filename.c_str());
@@ -170,43 +302,93 @@ public:
}
bool requiresBarrier = false;
for(Files::iterator nitr = newFiles.begin();
nitr != newFiles.end();
++nitr)
if (_operationQueue.valid())
{
osg::ref_ptr<osg::Node> loadedModel = osgDB::readNodeFile(*nitr);
// osg::notify(osg::NOTICE)<<"Using OperationQueue"<<std::endl;
if (loadedModel.get())
_endOfLoadBlock = new CountedBlock(newFiles.size());
typedef std::list< osg::ref_ptr<LoadAndCompileOperation> > LoadAndCompileList;
LoadAndCompileList loadAndCompileList;
for(Files::iterator nitr = newFiles.begin();
nitr != newFiles.end();
++nitr)
{
nodesToAdd[*nitr] = loadedModel;
osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(loadedModel.get());
osg::ref_ptr<LoadAndCompileOperation> loadAndCompile = new LoadAndCompileOperation( *nitr, threads, _endOfLoadBlock.get() );
loadAndCompileList.push_back(loadAndCompile);
_operationQueue->add( loadAndCompile.get() );
}
for(GraphicsThreads::iterator gitr = threads.begin();
gitr != threads.end();
++gitr)
osg::ref_ptr<osg::Operation> operation;
while ((operation=_operationQueue->getNextOperation()).valid())
{
// osg::notify(osg::NOTICE)<<"Local running of operation"<<std::endl;
(*operation)(0);
}
// osg::notify(osg::NOTICE)<<"Waiting for completion of LoadAndCompile operations"<<std::endl;
_endOfLoadBlock->block();
// osg::notify(osg::NOTICE)<<"done ... Waiting for completion of LoadAndCompile operations"<<std::endl;
for(LoadAndCompileList::iterator litr = loadAndCompileList.begin();
litr != loadAndCompileList.end();
++litr)
{
if ((*litr)->_loadedModel.valid())
{
(*gitr)->add( compileOperation.get() );
nodesToAdd[(*litr)->_filename] = (*litr)->_loadedModel;
requiresBarrier = true;
}
}
}
else
{
for(Files::iterator nitr = newFiles.begin();
nitr != newFiles.end();
++nitr)
{
osg::ref_ptr<osg::Node> loadedModel = osgDB::readNodeFile(*nitr);
if (loadedModel.get())
{
nodesToAdd[*nitr] = loadedModel;
osg::ref_ptr<osgUtil::GLObjectsOperation> compileOperation = new osgUtil::GLObjectsOperation(loadedModel.get());
for(GraphicsThreads::iterator gitr = threads.begin();
gitr != threads.end();
++gitr)
{
(*gitr)->add( compileOperation.get() );
requiresBarrier = true;
}
}
}
}
if (requiresBarrier)
{
_barrier = new osg::BarrierOperation(threads.size()+1);
_barrier->setKeep(false);
_endOfCompilebarrier = new osg::BarrierOperation(threads.size()+1);
_endOfCompilebarrier->setKeep(false);
for(GraphicsThreads::iterator gitr = threads.begin();
gitr != threads.end();
++gitr)
{
(*gitr)->add(_barrier.get());
(*gitr)->add(_endOfCompilebarrier.get());
}
// osg::notify(osg::NOTICE)<<"Waiting for Compile to complete"<<std::endl;
// wait for the graphics threads to complete.
_barrier->block();
_endOfCompilebarrier->block();
// osg::notify(osg::NOTICE)<<"done ... Waiting for Compile to complete"<<std::endl;
}
}
@@ -294,8 +476,11 @@ public:
// work even when blocks and barriers are used.
virtual void release()
{
if (_operationQueue.valid()) _operationQueue->removeAllOperations();
_updatesMergedBlock.release();
if (_barrier.valid()) _barrier.release();
if (_endOfCompilebarrier.valid()) _endOfCompilebarrier.release();
if (_endOfLoadBlock.valid()) _endOfLoadBlock.release();
}
@@ -306,7 +491,11 @@ public:
Files _nodesToRemove;
FilenameNodeMap _nodesToAdd;
OpenThreads::Block _updatesMergedBlock;
osg::ref_ptr<osg::BarrierOperation> _barrier;
osg::ref_ptr<osg::BarrierOperation> _endOfCompilebarrier;
osg::ref_ptr<CountedBlock> _endOfLoadBlock;
osg::ref_ptr<osg::OperationQueue> _operationQueue;
};
class FilterHandler : public osgGA::GUIEventHandler
@@ -521,12 +710,8 @@ int main(int argc, char** argv)
double w = 1.0;
double h = 1.0;
bool createBackgroundContextForCompiling = false;
while (arguments.read("--bc")) { createBackgroundContextForCompiling = true; }
bool createBackgroundThreadsForCompiling = false;
while (arguments.read("--bt")) { createBackgroundContextForCompiling = true; createBackgroundThreadsForCompiling = true; }
unsigned int numLoadThreads = 1;
while (arguments.read("--load-threads",numLoadThreads)) { }
osg::ref_ptr<MasterOperation> masterOperation;
std::string masterFilename;
@@ -772,12 +957,31 @@ int main(int argc, char** argv)
// start operation thread if a master file has been used.
osg::ref_ptr<osg::OperationThread> operationThread;
osg::ref_ptr<osg::OperationThread> masterOperationThread;
typedef std::list< osg::ref_ptr<osg::OperationThread> > OperationThreadList;
OperationThreadList generalThreadList;
if (masterOperation.valid())
{
operationThread = new osg::OperationThread;
operationThread->startThread();
operationThread->add(masterOperation.get());
masterOperationThread = new osg::OperationThread;
masterOperationThread->startThread();
masterOperationThread->add(masterOperation.get());
// if (numLoadThreads>0)
{
osg::ref_ptr<osg::OperationQueue> operationQueue = new osg::OperationQueue;
masterOperation->setOperationQueue(operationQueue.get());
for(unsigned int i=0; i<numLoadThreads; ++i)
{
osg::ref_ptr<osg::OperationThread> thread = new osg::OperationThread;
thread->setOperationQueue(operationQueue.get());
thread->startThread();
generalThreadList.push_back(thread);
}
}
}
viewer.setThreadingModel(osgViewer::Viewer::SingleThreaded);