Refactored the way that the RequestQueue's are pruned and highest prioty items taken from them so the operation is

now O(n) rather than O(nlogn) where n is the number of requests.  The refactoring also cleans up the access of the
request lists so that the code is more readable/maintainable.
This commit is contained in:
Robert Osfield
2010-03-24 14:27:00 +00:00
parent 627a4b78c9
commit 1319e270f6
2 changed files with 145 additions and 166 deletions

View File

@@ -404,19 +404,33 @@ class OSGDB_EXPORT DatabasePager : public osg::NodeVisitor::DatabaseRequestHandl
bool isRequestCurrent (int frameNumber) const
{
return frameNumber - _frameNumberLastRequest <= 1;
return _valid && (frameNumber - _frameNumberLastRequest <= 1);
}
};
struct RequestQueue : public osg::Referenced
{
public:
typedef std::vector< osg::ref_ptr<DatabaseRequest> > RequestList;
void sort();
RequestQueue(DatabasePager* pager);
RequestList _requestList;
OpenThreads::Mutex _requestMutex;
void add(DatabaseRequest* databaseRequest);
void takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest);
/// prune all the old requests and then return true if requestList left empty
bool pruneOldRequestsAndCheckIfEmpty();
virtual void updateBlock() {}
void clear();
typedef std::list< osg::ref_ptr<DatabaseRequest> > RequestList;
DatabasePager* _pager;
RequestList _requestList;
OpenThreads::Mutex _requestMutex;
int _frameNumberLastPruned;
protected:
virtual ~RequestQueue();
@@ -434,23 +448,12 @@ class OSGDB_EXPORT DatabasePager : public osg::NodeVisitor::DatabaseRequestHandl
void release() { _block->release(); }
void updateBlock()
{
_block->set((!_requestList.empty() || !_childrenToDeleteList.empty()) &&
!_pager->_databasePagerThreadPaused);
}
virtual void updateBlock();
void clear();
void add(DatabaseRequest* databaseRequest);
void takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest);
osg::ref_ptr<osg::RefBlock> _block;
DatabasePager* _pager;
std::string _name;
int _frameNumberLastPruned;
OpenThreads::Mutex _childrenToDeleteListMutex;
ObjectList _childrenToDeleteList;

View File

@@ -281,14 +281,10 @@ struct DatabasePager::SortFileRequestFunctor
//
// RequestQueue
//
void DatabasePager::DatabaseRequest::invalidate()
DatabasePager::RequestQueue::RequestQueue(DatabasePager* pager):
_pager(pager),
_frameNumberLastPruned(-1)
{
OSG_INFO<<" DatabasePager::DatabaseRequest::invalidate()."<<std::endl;
_valid = false;
_groupForAddingLoadedSubgraph = 0;
_loadedModel = 0;
_dataToCompileMap.clear();
_requestQueue = 0;
}
DatabasePager::RequestQueue::~RequestQueue()
@@ -302,25 +298,49 @@ DatabasePager::RequestQueue::~RequestQueue()
}
}
void DatabasePager::RequestQueue::sort()
void DatabasePager::DatabaseRequest::invalidate()
{
std::sort(_requestList.begin(),_requestList.end(),SortFileRequestFunctor());
OSG_INFO<<" DatabasePager::DatabaseRequest::invalidate()."<<std::endl;
_valid = false;
_groupForAddingLoadedSubgraph = 0;
_loadedModel = 0;
_dataToCompileMap.clear();
_requestQueue = 0;
}
bool DatabasePager::RequestQueue::pruneOldRequestsAndCheckIfEmpty()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
if (_frameNumberLastPruned != _pager->_frameNumber)
{
for(RequestQueue::RequestList::iterator citr = _requestList.begin();
citr != _requestList.end();
)
{
if ((*citr)->isRequestCurrent(_pager->_frameNumber))
{
++citr;
}
else
{
(*citr)->invalidate();
citr = _requestList.erase(citr);
OSG_INFO<<"DatabasePager::RequestQueue::pruneOldRequestsAndCheckIfEmpty(): Pruning "<<(*citr)<<std::endl;
}
}
_frameNumberLastPruned = _pager->_frameNumber;
updateBlock();
}
return _requestList.empty();
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// ReadQueue
//
DatabasePager::ReadQueue::ReadQueue(DatabasePager* pager, const std::string& name):
_pager(pager),
_name(name),
_frameNumberLastPruned(-1)
{
_block = new osg::RefBlock;
}
void DatabasePager::ReadQueue::clear()
void DatabasePager::RequestQueue::clear()
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
@@ -333,59 +353,90 @@ void DatabasePager::ReadQueue::clear()
_requestList.clear();
_frameNumberLastPruned = _pager->_frameNumber;
updateBlock();
}
void DatabasePager::ReadQueue::add(DatabasePager::DatabaseRequest* databaseRequest)
void DatabasePager::RequestQueue::add(DatabasePager::DatabaseRequest* databaseRequest)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
_requestList.push_back(databaseRequest);
databaseRequest->_requestQueue = this;
updateBlock();
}
void DatabasePager::ReadQueue::takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest)
void DatabasePager::RequestQueue::takeFirst(osg::ref_ptr<DatabaseRequest>& databaseRequest)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_requestMutex);
if (!_requestList.empty())
{
sort();
DatabasePager::SortFileRequestFunctor highPriority;
if (_frameNumberLastPruned != _pager->_frameNumber)
RequestQueue::RequestList::iterator selected_itr = _requestList.end();
for(RequestQueue::RequestList::iterator citr = _requestList.begin();
citr != _requestList.end();
)
{
// Prune all the old entries.
RequestQueue::RequestList::iterator tooOld
= std::find_if(_requestList.begin(),
_requestList.end(),
refPtrAdapt(std::not1(std::bind2nd(std::mem_fun(&DatabaseRequest::isRequestCurrent),
_pager->_frameNumber))));
if ((*citr)->isRequestCurrent(_pager->_frameNumber))
{
if (selected_itr==_requestList.end() || highPriority(*citr, *selected_itr))
{
selected_itr = citr;
}
// delete the entries
for(RequestQueue::RequestList::iterator citr = tooOld;
citr != _requestList.end();
++citr)
++citr;
}
else
{
(*citr)->invalidate();
citr = _requestList.erase(citr);
OSG_INFO<<"DatabasePager::RequestQueue::takeFirst(): Pruning "<<(*citr)<<std::endl;
}
_requestList.erase(tooOld, _requestList.end());
_frameNumberLastPruned = _pager->_frameNumber;
}
if (!_requestList.empty())
_frameNumberLastPruned = _pager->_frameNumber;
if (selected_itr != _requestList.end())
{
databaseRequest = _requestList.front();
databaseRequest = *selected_itr;
databaseRequest->_requestQueue = 0;
_requestList.erase(_requestList.begin());
_requestList.erase(selected_itr);
OSG_INFO<<" DatabasePager::RequestQueue::takeFirst() Found DatabaseRequest size()="<<_requestList.size()<<std::endl;
}
else
{
OSG_INFO<<" DatabasePager::RequestQueue::takeFirst() No suitable DatabaseRequest found size()="<<_requestList.size()<<std::endl;
}
updateBlock();
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// ReadQueue
//
DatabasePager::ReadQueue::ReadQueue(DatabasePager* pager, const std::string& name):
RequestQueue(pager),
_name(name)
{
_block = new osg::RefBlock;
}
void DatabasePager::ReadQueue::updateBlock()
{
_block->set((!_requestList.empty() || !_childrenToDeleteList.empty()) &&
!_pager->_databasePagerThreadPaused);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//
// DatabaseThread
@@ -711,47 +762,20 @@ void DatabasePager::DatabaseThread::run()
// dataToCompile or dataToMerge lists.
if (loadedObjectsNeedToBeCompiled)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileList->_requestMutex);
databaseRequest->_requestQueue = _pager->_dataToCompileList.get();
_pager->_dataToCompileList->_requestList.push_back(databaseRequest);
_pager->_dataToCompileList->add(databaseRequest.get());
}
else
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToMergeList->_requestMutex);
databaseRequest->_requestQueue = _pager->_dataToMergeList.get();
_pager->_dataToMergeList->_requestList.push_back(databaseRequest);
_pager->_dataToMergeList->add(databaseRequest.get());
}
}
}
// Prepare and prune the to-be-compiled list here in
// the pager thread rather than in the draw or
// graphics context thread(s).
if (loadedObjectsNeedToBeCompiled)
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_pager->_dataToCompileList->_requestMutex);
_pager->_dataToCompileList->sort();
// Prune all the old entries.
RequestQueue::RequestList::iterator tooOld
= std::find_if(_pager->_dataToCompileList->_requestList.begin(),
_pager->_dataToCompileList->_requestList.end(),
refPtrAdapt(std::not1(std::bind2nd(std::mem_fun(&DatabaseRequest::isRequestCurrent),
_pager->_frameNumber))));
// This is the database thread, so just delete
for(RequestQueue::RequestList::iterator citr = tooOld;
citr != _pager->_dataToCompileList->_requestList.end();
++citr)
{
OSG_INFO<<_name<<": pruning from compile list"<<std::endl;
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_pager->_dataToCompileList->_requestList.erase(tooOld, _pager->_dataToCompileList->_requestList.end());
loadedObjectsNeedToBeCompiled = !_pager->_dataToCompileList->_requestList.empty();
loadedObjectsNeedToBeCompiled = ! _pager->_dataToCompileList->pruneOldRequestsAndCheckIfEmpty();
}
if (loadedObjectsNeedToBeCompiled && !_pager->_activeGraphicsContexts.empty())
@@ -971,8 +995,8 @@ DatabasePager::DatabasePager()
_fileRequestQueue = new ReadQueue(this,"fileRequestQueue");
_httpRequestQueue = new ReadQueue(this,"httpRequestQueue");
_dataToCompileList = new RequestQueue;
_dataToMergeList = new RequestQueue;
_dataToCompileList = new RequestQueue(this);
_dataToMergeList = new RequestQueue(this);
setUpThreads(
osg::DisplaySettings::instance()->getNumOfDatabaseThreadsHint(),
@@ -1019,8 +1043,8 @@ DatabasePager::DatabasePager(const DatabasePager& rhs)
_fileRequestQueue = new ReadQueue(this,"fileRequestQueue");
_httpRequestQueue = new ReadQueue(this,"httpRequestQueue");
_dataToCompileList = new RequestQueue;
_dataToMergeList = new RequestQueue;
_dataToCompileList = new RequestQueue(this);
_dataToMergeList = new RequestQueue(this);
for(DatabaseThreadList::const_iterator dt_itr = rhs._databaseThreads.begin();
dt_itr != rhs._databaseThreads.end();
@@ -1173,30 +1197,9 @@ void DatabasePager::clear()
{
_fileRequestQueue->clear();
_httpRequestQueue->clear();
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
for(RequestQueue::RequestList::iterator citr = _dataToCompileList->_requestList.begin();
citr != _dataToCompileList->_requestList.end();
++citr)
{
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_dataToCompileList->_requestList.clear();
}
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
for(RequestQueue::RequestList::iterator citr = _dataToMergeList->_requestList.begin();
citr != _dataToMergeList->_requestList.end();
++citr)
{
(*citr)->_loadedModel = 0;
(*citr)->_requestQueue = 0;
}
_dataToMergeList->_requestList.clear();
}
_dataToCompileList->clear();
_dataToMergeList->clear();
// note, no need to use a mutex as the list is only accessed from the update thread.
_activePagedLODList.clear();
@@ -1281,9 +1284,9 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
if (databaseRequestRef.valid())
{
DatabaseRequest* databaseRequest = dynamic_cast<DatabaseRequest*>(databaseRequestRef.get());
if (databaseRequest && databaseRequest->valid())
if (databaseRequest && !(databaseRequest->valid()))
{
OSG_NOTICE<<"DatabaseRequest has been previously invalidated whilst still attached to scene graph."<<std::endl;
OSG_INFO<<"DatabaseRequest has been previously invalidated whilst still attached to scene graph."<<std::endl;
databaseRequest = 0;
}
@@ -1296,6 +1299,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(requestQueue->_requestMutex);
databaseRequest->_valid = true;
databaseRequest->_frameNumberLastRequest = frameNumber;
databaseRequest->_timestampLastRequest = timestamp;
databaseRequest->_priorityLastRequest = priority;
@@ -1303,6 +1307,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
}
else
{
databaseRequest->_valid = true;
databaseRequest->_frameNumberLastRequest = frameNumber;
databaseRequest->_timestampLastRequest = timestamp;
databaseRequest->_priorityLastRequest = priority;
@@ -1315,6 +1320,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
{
OSG_INFO<<"DatabasePager::requestNodeFile("<<fileName<<") orphaned, resubmitting."<<std::endl;
databaseRequest->_valid = true;
databaseRequest->_frameNumberFirstRequest = frameNumber;
databaseRequest->_timestampFirstRequest = timestamp;
databaseRequest->_priorityFirstRequest = priority;
@@ -1344,6 +1350,7 @@ void DatabasePager::requestNodeFile(const std::string& fileName,osg::Group* grou
databaseRequestRef = databaseRequest.get();
databaseRequest->_valid = true;
databaseRequest->_fileName = fileName;
databaseRequest->_frameNumberFirstRequest = frameNumber;
databaseRequest->_timestampFirstRequest = timestamp;
@@ -1421,13 +1428,10 @@ void DatabasePager::setDatabasePagerThreadPause(bool pause)
bool DatabasePager::requiresUpdateSceneGraph() const
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
if (!_dataToMergeList->_requestList.empty()) return true;
}
return false;
#if 0
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
#endif
return !(_dataToMergeList->_requestList.empty());
}
// #define UPDATE_TIMING 1
@@ -1457,7 +1461,7 @@ void DatabasePager::updateSceneGraph(const osg::FrameStamp& frameStamp)
#ifdef UPDATE_TIMING
double elapsedTime = timer.elapsedTime_m();
if (elapsedTime>0.2)
if (elapsedTime>1.0)
{
OSG_NOTICE<<"DatabasePager::updateSceneGraph() total time = "<<elapsedTime<<"ms"<<std::endl;
OSG_NOTICE<<" timeFor_removeExpiredSubgraphs = "<<timeFor_removeExpiredSubgraphs<<"ms"<<std::endl;
@@ -1834,7 +1838,9 @@ void DatabasePager::registerPagedLODs(osg::Node* subgraph, int frameNumber)
bool DatabasePager::requiresCompileGLObjects() const
{
#if 0
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
#endif
return !_dataToCompileList->_requestList.empty();
}
@@ -1904,20 +1910,13 @@ void DatabasePager::compileGLObjects(osg::State& state, double& availableTime)
double estimatedDrawableDuration = 0.0001;
osg::ref_ptr<DatabaseRequest> databaseRequest;
// get the first compilable entry.
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
// advance to the next entry to compile if one is available.
databaseRequest = _dataToCompileList->_requestList.empty() ? 0 : _dataToCompileList->_requestList.front().get();
};
_dataToCompileList->takeFirst(databaseRequest);
unsigned int numObjectsCompiled = 0;
// while there are valid databaseRequest's in the to compile list and there is
// sufficient time left compile each databaseRequest's stateset and drawables.
while (databaseRequest.valid() && (compileAll || (elapsedTime<availableTime && numObjectsCompiled<_maximumNumOfObjectsToCompilePerFrame)) )
while (databaseRequest.valid() && databaseRequest->valid() && (compileAll || (elapsedTime<availableTime && numObjectsCompiled<_maximumNumOfObjectsToCompilePerFrame)) )
{
DataToCompileMap& dcm = databaseRequest->_dataToCompileMap;
DataToCompile& dtc = dcm[state.getContextID()];
@@ -2027,40 +2026,17 @@ void DatabasePager::compileGLObjects(osg::State& state, double& availableTime)
{
// we've compiled all of the current databaseRequest so we can now pop it off the
// to compile list and place it on the merge list.
// OSG_NOTICE<<"All compiled"<<std::endl;
OSG_INFO<<"All compiled"<<std::endl;
_dataToMergeList->add(databaseRequest.get());
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToCompileList->_requestMutex);
// The request might have been removed from the
// _dataToCompile list by another graphics thread, in
// which case it's already on the merge list, or by
// the pager, which means that the request has
// expired. Also, the compile list might have been
// shuffled by the pager, so the current request is
// not guaranteed to still be at the beginning of the
// list.
RequestQueue::RequestList::iterator requestIter
= std::find(_dataToCompileList->_requestList.begin(), _dataToCompileList->_requestList.end(),
databaseRequest);
if (requestIter != _dataToCompileList->_requestList.end())
{
{
OpenThreads::ScopedLock<OpenThreads::Mutex> lock(_dataToMergeList->_requestMutex);
databaseRequest->_requestQueue = _dataToMergeList.get();
_dataToMergeList->_requestList.push_back(databaseRequest);
}
_dataToCompileList->_requestList.erase(requestIter);
}
if (!_dataToCompileList->_requestList.empty()) databaseRequest = _dataToCompileList->_requestList.front();
else databaseRequest = 0;
databaseRequest = 0;
_dataToCompileList->takeFirst(databaseRequest);
}
else
else
{
// OSG_NOTICE<<"Not all compiled"<<std::endl;
OSG_INFO<<"Not all compiled"<<std::endl;
_dataToCompileList->add(databaseRequest.get());
databaseRequest = 0;
}