Parallel sync of items.

When using built-in sync code, separate items into distinct slots.
Slots process items sequentially, but each slot works in parallel (using 
a single shared HTTP engine). This allows tiles to be synced in parallel
with airports/shared models data, greatly increasing responsiveness
to get tiles synced on initial launch.
This commit is contained in:
James Turner
2013-09-25 16:31:10 +01:00
parent 3783ae2234
commit db98c7440e

View File

@@ -63,7 +63,7 @@
#include <simgear/props/props_io.hxx>
#include <simgear/io/HTTPClient.hxx>
#include <simgear/io/SVNRepository.hxx>
#include <simgear/structure/exception.hxx>
static const bool svn_built_in_available = true;
@@ -108,9 +108,9 @@ bool hasWhitespace(string path)
}
///////////////////////////////////////////////////////////////////////////////
// WaitingTile ////////////////////////////////////////////////////////////////
// WaitingSyncItem ////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
class WaitingSyncItem
class WaitingSyncItem
{
public:
enum Type
@@ -121,21 +121,65 @@ public:
SharedModels
};
WaitingSyncItem() :
_dir(),
_type(Stop),
_refreshScenery(false)
{
}
WaitingSyncItem(string dir, Type ty) :
_dir(dir),
_type(ty),
_refreshScenery(false)
{}
bool setRefresh()
void setRefresh()
{ _refreshScenery = true; }
const string _dir;
const Type _type;
string _dir;
Type _type;
bool _refreshScenery;
};
///////////////////////////////////////////////////////////////////////////////
/**
* @brief SyncSlot encapsulates a queue of sync items we will fetch
* serially. Multiple slots exist to sync different types of item in
* parallel.
*/
class SyncSlot
{
public:
WaitingSyncItem currentItem;
bool isNewDirectory;
std::queue<WaitingSyncItem> queue;
std::auto_ptr<SVNRepository> repository;
SGTimeStamp stamp;
};
static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync
static const int SYNC_SLOT_SHARED_DATA = 1; /// shared Models and Airport data
static const int NUM_SYNC_SLOTS = 2;
/**
* @brief translate a sync item type into one of the available slots.
* This provides the scheduling / balancing / prioritising between slots.
*/
static unsigned int syncSlotForType(WaitingSyncItem::Type ty)
{
switch (ty) {
case WaitingSyncItem::Tile: return SYNC_SLOT_TILES;
case WaitingSyncItem::SharedModels:
case WaitingSyncItem::AirportData:
return SYNC_SLOT_SHARED_DATA;
default:
return SYNC_SLOT_SHARED_DATA;
}
}
///////////////////////////////////////////////////////////////////////////////
// SGTerraSync::SvnThread /////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
@@ -178,21 +222,27 @@ public:
volatile int _cache_hits;
private:
virtual void run();
bool syncTree(const char* dir, bool& isNewDirectory);
bool syncTreeExternal(const char* dir);
// external model run and helpers
void runExternal();
void syncPathExternal(const WaitingSyncItem& next);
bool runExternalSyncCommand(const char* dir);
// internal mode run and helpers
void runInternal();
void updateSyncSlot(SyncSlot& slot);
// commond helpers between both internal and external models
bool isPathCached(const WaitingSyncItem& next) const;
void syncPath(const WaitingSyncItem& next);
void initCompletedTilesPersistentCache();
void writeCompletedTilesPersistentCache() const;
void updated(const WaitingSyncItem& item, bool isNewDirectory);
void fail(const WaitingSyncItem& failedItem);
void initCompletedTilesPersistentCache();
void writeCompletedTilesPersistentCache() const;
bool syncTreeInternal(const char* dir);
bool _use_built_in;
HTTP::Client _http;
std::auto_ptr<SVNRepository> _repository;
bool _use_built_in;
HTTP::Client _http;
SyncSlot _syncSlots[NUM_SYNC_SLOTS];
volatile bool _is_dirty;
volatile bool _stop;
@@ -330,66 +380,7 @@ bool SGTerraSync::SvnThread::start()
return true;
}
// sync one directory tree
bool SGTerraSync::SvnThread::syncTree(const char* dir, bool& isNewDirectory)
{
int rc;
SGPath path( _local_dir );
path.append( dir );
isNewDirectory = !path.exists();
if (isNewDirectory)
{
rc = path.create_dir( 0755 );
if (rc)
{
SG_LOG(SG_TERRAIN,SG_ALERT,
"Cannot create directory '" << dir << "', return code = " << rc );
return false;
}
}
if (_use_built_in)
return syncTreeInternal(dir);
else
return syncTreeExternal(dir);
}
bool SGTerraSync::SvnThread::syncTreeInternal(const char* dir)
{
ostringstream command;
command << _svn_server << "/" << dir;
SGPath path(_local_dir);
path.append(dir);
_repository.reset(new SVNRepository(path, &_http));
_repository->setBaseUrl(command.str());
SGTimeStamp st;
st.stamp();
SG_LOG(SG_IO, SG_DEBUG, "terrasync: will sync " << command.str());
_repository->update();
bool result = true;
while (!_stop && _repository->isDoingSync()) {
_http.update(100);
}
if (_repository->failure() == SVNRepository::SVN_ERROR_NOT_FOUND) {
// this is fine, but maybe we should use a different return code
// in the future to higher layers can distuinguish this case
} else if (_repository->failure() != SVNRepository::SVN_NO_ERROR) {
result = false;
} else {
SG_LOG(SG_IO, SG_DEBUG, "sync of " << command.str() << " finished ("
<< st.elapsedMSec() << " msec");
}
_repository.reset();
return result;
}
bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir)
bool SGTerraSync::SvnThread::runExternalSyncCommand(const char* dir)
{
ostringstream buf;
SGPath localPath( _local_dir );
@@ -446,9 +437,22 @@ bool SGTerraSync::SvnThread::syncTreeExternal(const char* dir)
void SGTerraSync::SvnThread::run()
{
_active = true;
initCompletedTilesPersistentCache();
if (_use_built_in) {
runInternal();
} else {
runExternal();
}
_active = false;
_running = false;
_is_dirty = true;
}
void SGTerraSync::SvnThread::runExternal()
{
while (!_stop)
{
WaitingSyncItem next = waitingTiles.pop_front();
@@ -462,7 +466,7 @@ void SGTerraSync::SvnThread::run()
continue;
}
syncPath(next);
syncPathExternal(next);
if ((_allowed_errors >= 0)&&
(_consecutive_errors >= _allowed_errors))
@@ -470,11 +474,114 @@ void SGTerraSync::SvnThread::run()
_stalled = true;
_stop = true;
}
}
} // of thread running loop
}
_active = false;
_running = false;
_is_dirty = true;
void SGTerraSync::SvnThread::syncPathExternal(const WaitingSyncItem& next)
{
_busy = true;
SGPath path( _local_dir );
path.append( next._dir );
bool isNewDirectory = !path.exists();
try {
if (isNewDirectory) {
int rc = path.create_dir( 0755 );
if (rc) {
SG_LOG(SG_TERRAIN,SG_ALERT,
"Cannot create directory '" << path << "', return code = " << rc );
throw sg_exception("Cannot create directory for terrasync", path.str());
}
}
if (!runExternalSyncCommand(next._dir.c_str())) {
throw sg_exception("Running external sync command failed");
}
} catch (sg_exception& e) {
fail(next);
return;
}
updated(next, isNewDirectory);
}
void SGTerraSync::SvnThread::updateSyncSlot(SyncSlot &slot)
{
if (slot.repository.get()) {
if (slot.repository->isDoingSync()) {
return; // easy, still working
}
// check result
SVNRepository::ResultCode res = slot.repository->failure();
if (res == SVNRepository::SVN_ERROR_NOT_FOUND) {
// this is fine, but maybe we should use a different return code
// in the future to higher layers can distinguish this case
} else if (res != SVNRepository::SVN_NO_ERROR) {
fail(slot.currentItem);
} else {
updated(slot.currentItem, slot.isNewDirectory);
SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " finished ("
<< slot.stamp.elapsedMSec() << " msec");
}
// whatever happened, we're done with this repository instance
slot.repository.reset();
}
// init and start sync of the next repository
if (!slot.queue.empty()) {
slot.currentItem = slot.queue.front();
slot.queue.pop();
SGPath path(_local_dir);
path.append(slot.currentItem._dir);
slot.isNewDirectory = !path.exists();
if (slot.isNewDirectory) {
int rc = path.create_dir( 0755 );
if (rc) {
SG_LOG(SG_TERRAIN,SG_ALERT,
"Cannot create directory '" << path << "', return code = " << rc );
fail(slot.currentItem);
return;
}
} // of creating directory step
slot.repository.reset(new SVNRepository(path, &_http));
slot.repository->setBaseUrl(_svn_server + "/" + slot.currentItem._dir);
slot.repository->update();
slot.stamp.stamp();
SG_LOG(SG_IO, SG_DEBUG, "sync of " << slot.repository->baseUrl() << " started");
}
}
void SGTerraSync::SvnThread::runInternal()
{
while (!_stop) {
_http.update(100);
if (_stop)
break;
// drain the waiting tiles queue into the sync slot queues.
while (!waitingTiles.empty()) {
WaitingSyncItem next = waitingTiles.pop_front();
if (isPathCached(next)) {
_cache_hits++;
SG_LOG(SG_TERRAIN, SG_DEBUG,
"Cache hit for: '" << next._dir << "'");
continue;
}
unsigned int slot = syncSlotForType(next._type);
_syncSlots[slot].queue.push(next);
}
// update each sync slot in turn
for (unsigned int slot=0; slot < NUM_SYNC_SLOTS; ++slot) {
updateSyncSlot(_syncSlots[slot]);
}
} // of thread running loop
}
bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const
@@ -495,40 +602,36 @@ bool SGTerraSync::SvnThread::isPathCached(const WaitingSyncItem& next) const
return (ii->second > now);
}
void SGTerraSync::SvnThread::syncPath(const WaitingSyncItem& next)
void SGTerraSync::SvnThread::fail(const WaitingSyncItem& failedItem)
{
bool isNewDirectory = false;
time_t now = time(0);
_busy = true;
if (!syncTree(next._dir.c_str(),isNewDirectory))
{
_consecutive_errors++;
_fail_count++;
_completedTiles[ next._dir ] = now + UpdateInterval::FailedAttempt;
}
else
{
_consecutive_errors = 0;
_success_count++;
SG_LOG(SG_TERRAIN,SG_INFO,
"Successfully synchronized directory '" << next._dir << "'");
if (next._refreshScenery)
_consecutive_errors++;
_fail_count++;
_completedTiles[ failedItem._dir ] = now + UpdateInterval::FailedAttempt;
_busy = false;
}
void SGTerraSync::SvnThread::updated(const WaitingSyncItem& item, bool isNewDirectory)
{
time_t now = time(0);
_consecutive_errors = 0;
_success_count++;
SG_LOG(SG_TERRAIN,SG_INFO,
"Successfully synchronized directory '" << item._dir << "'");
if (item._refreshScenery) {
// updated a tile
_updated_tile_count++;
if (isNewDirectory)
{
// updated a tile
_updated_tile_count++;
if (isNewDirectory)
{
// for now only report new directories to refresh display
// (i.e. only when ocean needs to be replaced with actual data)
_freshTiles.push_back(next);
_is_dirty = true;
}
// for now only report new directories to refresh display
// (i.e. only when ocean needs to be replaced with actual data)
_freshTiles.push_back(item);
_is_dirty = true;
}
_completedTiles[ next._dir ] = now + UpdateInterval::SuccessfulAttempt;
writeCompletedTilesPersistentCache();
}
_completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt;
writeCompletedTilesPersistentCache();
_busy = false;
}