From Glenn Waldron, "Attached are modifications to ZipArchive to make it safe for mutli-threaded access.

Here's a summary:

* Uses a separate ZIP file handle per thread
* Maintains a single shared (read-only) index, created the first time through
* Stress-tested with the DatabasePager using 24 threads under osgEarth

I also updated the member variables to use OSG's leading-underscore convention."
This commit is contained in:
Robert Osfield
2012-02-29 10:22:18 +00:00
parent 33e7857af9
commit 7bcf5e3b36
2 changed files with 169 additions and 100 deletions

View File

@@ -37,26 +37,35 @@
#endif
ZipArchive::ZipArchive()
: mZipLoaded(false)
, mZipRecord(NULL)
ZipArchive::ZipArchive() :
_zipLoaded( false )
{
}
ZipArchive::~ZipArchive()
{
}
/** close the archive.*/
/** close the archive (on all threads) */
void ZipArchive::close()
{
if(mZipLoaded)
if ( _zipLoaded )
{
CloseZip(mZipRecord);
OpenThreads::ScopedLock<OpenThreads::Mutex> exclusive(_zipMutex);
if ( _zipLoaded )
{
// close the file (on one thread since it's a shared file)
const PerThreadData& data = getDataNoLock();
CloseZip( data._zipHandle );
mZipRecord = NULL;
mZipIndex.clear();
// clear out the file handles
_perThreadData.clear();
// clear out the index.
_zipIndex.clear();
_zipLoaded = false;
}
}
}
@@ -75,9 +84,9 @@ std::string ZipArchive::getMasterFileName() const
std::string ZipArchive::getArchiveFileName() const
{
std::string result;
if(mZipLoaded)
if( _zipLoaded )
{
result = mMainRecord.name;
result = _mainRecord.name;
}
return result;
}
@@ -85,11 +94,11 @@ std::string ZipArchive::getArchiveFileName() const
/** Get the full list of file names available in the archive.*/
bool ZipArchive::getFileNames(osgDB::Archive::FileNameList& fileNameList) const
{
if(mZipLoaded)
if(_zipLoaded)
{
ZipEntryMap::const_iterator iter = mZipIndex.begin();
ZipEntryMap::const_iterator iter = _zipIndex.begin();
for(;iter != mZipIndex.end(); ++iter)
for(;iter != _zipIndex.end(); ++iter)
{
fileNameList.push_back((*iter).first);
}
@@ -102,68 +111,72 @@ bool ZipArchive::getFileNames(osgDB::Archive::FileNameList& fileNameList) const
}
}
bool ZipArchive::open(const std::string& file, ArchiveStatus status, const osgDB::ReaderWriter::Options* options)
{
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string fileName = osgDB::findDataFile( file, options );
if (fileName.empty()) return osgDB::ReaderWriter::ReadResult::FILE_NOT_FOUND;
std::string password = ReadPassword(options);
HZIP hz = OpenZip(fileName.c_str(), password.c_str());
if(hz != NULL)
if ( !_zipLoaded )
{
IndexZipFiles(hz);
return true;
}
else
{
return false;
// exclusive lock when we open for the first time:
OpenThreads::ScopedLock<OpenThreads::Mutex> exclusiveLock( _zipMutex );
if ( !_zipLoaded ) // double-check avoids race condition
{
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
// save the filename + password so other threads can open the file
_filename = osgDB::findDataFile( file, options );
if (_filename.empty()) return osgDB::ReaderWriter::ReadResult::FILE_NOT_FOUND;
_password = ReadPassword(options);
// open the zip file in this thread:
const PerThreadData& data = getDataNoLock();
// establish a shared (read-only) index:
if ( data._zipHandle != NULL )
{
IndexZipFiles( data._zipHandle );
_zipLoaded = true;
}
}
}
return _zipLoaded;
}
bool ZipArchive::open(std::istream& fin, const osgDB::ReaderWriter::Options* options)
{
osgDB::ReaderWriter::ReadResult result = osgDB::ReaderWriter::ReadResult(osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED);
if (fin.fail()) return false;
fin.seekg(0,std::ios_base::end);
unsigned int ulzipFileLength = fin.tellg();
fin.seekg(0,std::ios_base::beg);
// Need to decouple stream content as I can't see any other way to get access to a byte array
// containing the content in the stream. One saving grace here is that we know that the
// stream has already been fully read in, hence no need to concern ourselves with asynchronous
// reads.
char * pMemBuffer = new char [ulzipFileLength];
if (!pMemBuffer) return false;
std::string password = ReadPassword(options);
HZIP hz = NULL;
fin.read(pMemBuffer, ulzipFileLength);
if ((unsigned int)fin.gcount() == ulzipFileLength)
if ( !_zipLoaded )
{
hz = OpenZip(pMemBuffer, ulzipFileLength, password.c_str());
}
delete [] pMemBuffer;
// exclusive lock when we open for the first time:
OpenThreads::ScopedLock<OpenThreads::Mutex> exclusive(_zipMutex);
if(hz != NULL)
{
IndexZipFiles(hz);
return true;
}
else
{
return false;
if ( !_zipLoaded ) // double-check avoids race condition
{
osgDB::ReaderWriter::ReadResult result = osgDB::ReaderWriter::ReadResult(osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED);
if (fin.fail()) return false;
// read the stream into a memory buffer that we'll keep around for any other
// threads that want to open the file
std::stringstream buf;
buf << fin.rdbuf();
_membuffer = buf.str();
_password = ReadPassword(options);
// open on this thread:
const PerThreadData& data = getDataNoLock();
if ( data._zipHandle != NULL )
{
IndexZipFiles( data._zipHandle );
_zipLoaded = true;
}
}
}
return _zipLoaded;
}
osgDB::ReaderWriter::ReadResult ZipArchive::readObject(const std::string& file, const osgDB::ReaderWriter::Options* options) const
@@ -171,7 +184,7 @@ osgDB::ReaderWriter::ReadResult ZipArchive::readObject(const std::string& file,
osgDB::ReaderWriter::ReadResult rresult = osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!mZipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
if (!_zipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
const ZIPENTRY* ze = GetZipEntry(file);
if(ze != NULL)
@@ -204,7 +217,7 @@ osgDB::ReaderWriter::ReadResult ZipArchive::readImage(const std::string& file,co
osgDB::ReaderWriter::ReadResult rresult = osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!mZipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
if (!_zipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
const ZIPENTRY* ze = GetZipEntry(file);
if(ze != NULL)
@@ -237,7 +250,7 @@ osgDB::ReaderWriter::ReadResult ZipArchive::readHeightField(const std::string& f
osgDB::ReaderWriter::ReadResult rresult = osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!mZipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
if (!_zipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
const ZIPENTRY* ze = GetZipEntry(file);
if(ze != NULL)
@@ -270,7 +283,7 @@ osgDB::ReaderWriter::ReadResult ZipArchive::readNode(const std::string& file,con
osgDB::ReaderWriter::ReadResult rresult = osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!mZipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
if (!_zipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
const ZIPENTRY* ze = GetZipEntry(file);
if(ze != NULL)
@@ -304,7 +317,7 @@ osgDB::ReaderWriter::ReadResult ZipArchive::readShader(const std::string& file,c
osgDB::ReaderWriter::ReadResult rresult = osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
std::string ext = osgDB::getLowerCaseFileExtension(file);
if (!mZipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
if (!_zipLoaded || !acceptsExtension(ext)) return osgDB::ReaderWriter::ReadResult::FILE_NOT_HANDLED;
const ZIPENTRY* ze = GetZipEntry(file);
if(ze != NULL)
@@ -365,21 +378,26 @@ osgDB::ReaderWriter* ZipArchive::ReadFromZipEntry(const ZIPENTRY* ze, const osgD
char* ibuf = new (std::nothrow) char[ze->unc_size];
if (ibuf)
{
ZRESULT result = UnzipItem(mZipRecord, ze->index, ibuf, ze->unc_size);
bool unzipSuccesful = CheckZipErrorCode(result);
if(unzipSuccesful)
// fetch the handle for the current thread:
const PerThreadData& data = getData();
if ( data._zipHandle != NULL )
{
buffer.write(ibuf,ze->unc_size);
}
ZRESULT result = UnzipItem(data._zipHandle, ze->index, ibuf, ze->unc_size);
bool unzipSuccesful = CheckZipErrorCode(result);
if(unzipSuccesful)
{
buffer.write(ibuf,ze->unc_size);
}
delete[] ibuf;
delete[] ibuf;
std::string file_ext = osgDB::getFileExtension(ze->name);
std::string file_ext = osgDB::getFileExtension(ze->name);
osgDB::ReaderWriter* rw = osgDB::Registry::instance()->getReaderWriterForExtension(file_ext);
if (rw != NULL)
{
return rw;
osgDB::ReaderWriter* rw = osgDB::Registry::instance()->getReaderWriterForExtension(file_ext);
if (rw != NULL)
{
return rw;
}
}
}
else
@@ -422,12 +440,12 @@ void CleanupFileString(std::string& strFileOrDir)
void ZipArchive::IndexZipFiles(HZIP hz)
{
if(hz != NULL && !mZipLoaded)
if(hz != NULL && !_zipLoaded)
{
mZipRecord = hz;
//mZipRecord = hz;
GetZipItem(hz,-1,&mMainRecord);
int numitems = mMainRecord.index;
GetZipItem(hz, -1, &_mainRecord);
int numitems = _mainRecord.index;
// Now loop through each file in zip
for (int i = 0; i < numitems; i++)
@@ -441,12 +459,9 @@ void ZipArchive::IndexZipFiles(HZIP hz)
if(!name.empty())
{
mZipIndex.insert(ZipEntryMapping(name, ze));
_zipIndex.insert(ZipEntryMapping(name, ze));
}
}
mZipLoaded = true;
}
}
@@ -456,8 +471,8 @@ ZIPENTRY* ZipArchive::GetZipEntry(const std::string& filename)
std::string fileToLoad = filename;
CleanupFileString(fileToLoad);
ZipEntryMap::iterator iter = mZipIndex.find(fileToLoad);
if(iter != mZipIndex.end())
ZipEntryMap::iterator iter = _zipIndex.find(fileToLoad);
if(iter != _zipIndex.end())
{
ze = (*iter).second;
}
@@ -471,8 +486,8 @@ const ZIPENTRY* ZipArchive::GetZipEntry(const std::string& filename) const
std::string fileToLoad = filename;
CleanupFileString(fileToLoad);
ZipEntryMap::const_iterator iter = mZipIndex.find(fileToLoad);
if(iter != mZipIndex.end())
ZipEntryMap::const_iterator iter = _zipIndex.find(fileToLoad);
if(iter != _zipIndex.end())
{
ze = (*iter).second;
}
@@ -506,8 +521,8 @@ osgDB::DirectoryContents ZipArchive::getDirectoryContents(const std::string& dir
{
osgDB::DirectoryContents dirContents;
ZipEntryMap::const_iterator iter = mZipIndex.begin();
ZipEntryMap::const_iterator iterEnd = mZipIndex.end();
ZipEntryMap::const_iterator iter = _zipIndex.begin();
ZipEntryMap::const_iterator iterEnd = _zipIndex.end();
for(; iter != iterEnd; ++iter)
{
@@ -599,3 +614,45 @@ bool ZipArchive::CheckZipErrorCode(ZRESULT result) const
}
}
const ZipArchive::PerThreadData&
ZipArchive::getData() const
{
OpenThreads::ScopedLock<OpenThreads::Mutex> exclusive( const_cast<ZipArchive*>(this)->_zipMutex );
return getDataNoLock();
}
const ZipArchive::PerThreadData&
ZipArchive::getDataNoLock() const
{
// get/create data for the currently running thread:
OpenThreads::Thread* current = OpenThreads::Thread::CurrentThread();
PerThreadDataMap::const_iterator i = _perThreadData.find( current );
if ( i == _perThreadData.end() || i->second._zipHandle == NULL )
{
// cache pattern: cast to const for caching purposes
ZipArchive* ncThis = const_cast<ZipArchive*>(this);
// data does not already exist, so open the ZIP with a handle exclusively for this thread:
PerThreadData& data = ncThis->_perThreadData[current];
if ( !_filename.empty() )
{
data._zipHandle = OpenZip( _filename.c_str(), _password.c_str() );
}
else if ( _membuffer.empty() )
{
data._zipHandle = OpenZip( (void*)_membuffer.c_str(), _membuffer.length(), _password.c_str() );
}
else
{
data._zipHandle = NULL;
}
return data;
}
else
{
return i->second;
}
}

View File

@@ -5,6 +5,7 @@
#include <osgDB/FileUtils>
#include <osgDB/Archive>
#include <OpenThreads/Mutex>
#include "unzip.h"
@@ -74,15 +75,26 @@ class ZipArchive : public osgDB::Archive
private:
bool mZipLoaded;
HZIP mZipRecord;
ZIPENTRY mMainRecord;
typedef std::pair<std::string, ZIPENTRY*> ZipEntryMapping;
typedef std::map<std::string, ZIPENTRY*> ZipEntryMap;
ZipEntryMap mZipIndex;
std::string _filename, _password, _membuffer;
OpenThreads::Mutex _zipMutex;
bool _zipLoaded;
ZipEntryMap _zipIndex;
ZIPENTRY _mainRecord;
struct PerThreadData {
HZIP _zipHandle;
};
typedef std::map<OpenThreads::Thread*, PerThreadData> PerThreadDataMap;
PerThreadDataMap _perThreadData;
const PerThreadData& getData() const;
const PerThreadData& getDataNoLock() const;
};