diff --git a/simgear/io/HTTPClient.cxx b/simgear/io/HTTPClient.cxx index a860daf3..66c0f578 100644 --- a/simgear/io/HTTPClient.cxx +++ b/simgear/io/HTTPClient.cxx @@ -165,6 +165,14 @@ void Client::setMaxPipelineDepth(unsigned int depth) #endif } +void Client::reset() +{ + curl_multi_cleanup(d->curlMulti); + d.reset(new ClientPrivate); + d->tlsCertificatePath = SGPath::fromEnv("SIMGEAR_TLS_CERT_PATH"); + d->createCurlMulti(); +} + void Client::update(int waitTimeout) { if (d->requests.empty()) { diff --git a/simgear/io/HTTPClient.hxx b/simgear/io/HTTPClient.hxx index ffc5a395..3f579090 100644 --- a/simgear/io/HTTPClient.hxx +++ b/simgear/io/HTTPClient.hxx @@ -47,6 +47,8 @@ public: void update(int waitTimeout = 0); + void reset(); + void makeRequest(const Request_ptr& r); void cancelRequest(const Request_ptr& r, std::string reason = std::string()); diff --git a/simgear/scene/tsync/terrasync.cxx b/simgear/scene/tsync/terrasync.cxx index 841d1fa5..f34d1ab5 100644 --- a/simgear/scene/tsync/terrasync.cxx +++ b/simgear/scene/tsync/terrasync.cxx @@ -155,20 +155,16 @@ public: class SyncSlot { public: - SyncSlot() : - isNewDirectory(false), - busy(false), - pendingKBytes(0) - {} + SyncSlot() = default; SyncItem currentItem; - bool isNewDirectory; - std::queue queue; + bool isNewDirectory = false; + std::deque queue; std::unique_ptr repository; SGTimeStamp stamp; - bool busy; ///< is the slot working or idle - unsigned int pendingKBytes; - unsigned int nextWarnTimeout; + bool busy = false; ///< is the slot working or idle + unsigned int pendingKBytes = 0; + unsigned int nextWarnTimeout = 0; }; static const int SYNC_SLOT_TILES = 0; ///< Terrain and Objects sync @@ -312,7 +308,6 @@ public: _state._allowed_errors = errors; } - void setCachePath(const SGPath& p) {_persistentCachePath = p;} void setCacheHits(unsigned int hits) { std::lock_guard g(_stateLock); @@ -328,6 +323,9 @@ public: } return st; } + + bool isDirActive(const std::string& path) const; + private: void incrementCacheHits() { @@ -341,11 +339,11 @@ private: void runInternal(); void updateSyncSlot(SyncSlot& slot); + void drainWaitingTiles(); + // commond helpers between both internal and external models SyncItem::Status isPathCached(const SyncItem& next) const; - void initCompletedTilesPersistentCache(); - void writeCompletedTilesPersistentCache() const; void updated(SyncItem item, bool isNewDirectory); void fail(SyncItem failedItem); void notFound(SyncItem notFoundItem); @@ -370,7 +368,7 @@ private: string _dnsdn; TerrasyncThreadState _state; - std::mutex _stateLock; + mutable std::mutex _stateLock; }; SGTerraSync::WorkerThread::WorkerThread() : @@ -398,6 +396,18 @@ void SGTerraSync::WorkerThread::stop() SyncItem w(string(), SyncItem::Stop); request(w); join(); + + // clear the sync slots, in case we restart + for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) { + _syncSlots[slot] = {}; + } + + // clear these so if re-init-ing, we check again + _completedTiles.clear(); + _notFoundItems.clear(); + + _http.reset(); + _http.setUserAgent("terrascenery-" SG_STRINGIZE(SIMGEAR_VERSION)); } bool SGTerraSync::WorkerThread::start() @@ -417,12 +427,18 @@ bool SGTerraSync::WorkerThread::start() SGPath path(_local_dir); if (!path.exists()) { - SG_LOG(SG_TERRASYNC,SG_ALERT, - "Cannot start scenery download. Directory '" << _local_dir << - "' does not exist. Set correct directory path or create directory folder."); - _state._fail_count++; - _state._stalled = true; - return false; + const SGPath parentDir = path.dirPath(); + if (parentDir.exists()) { + // attempt to create terraSync dir ourselves + bool ok = path.create_dir(0755); + if (!ok) { + SG_LOG(SG_TERRASYNC, SG_ALERT, + "Cannot start scenery download. Directory '" << _local_dir << "' does not exist. Set correct directory path or create directory folder."); + _state._fail_count++; + _state._stalled = true; + return false; + } + } } path.append("version"); @@ -523,8 +539,6 @@ void SGTerraSync::WorkerThread::run() _running = true; } - initCompletedTilesPersistentCache(); - runInternal(); { @@ -541,7 +555,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) if (slot.stamp.elapsedMSec() > (int)slot.nextWarnTimeout) { SG_LOG(SG_TERRASYNC, SG_INFO, "sync taking a long time:" << slot.currentItem._dir << " taken " << slot.stamp.elapsedMSec()); SG_LOG(SG_TERRASYNC, SG_INFO, "HTTP request count:" << _http.hasActiveRequests()); - slot.nextWarnTimeout += 10000; + slot.nextWarnTimeout += 30 * 1000; } #endif // convert bytes to kbytes here @@ -565,12 +579,13 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) slot.busy = false; slot.repository.reset(); slot.pendingKBytes = 0; + slot.currentItem = {}; } // init and start sync of the next repository if (!slot.queue.empty()) { slot.currentItem = slot.queue.front(); - slot.queue.pop(); + slot.queue.pop_front(); SGPath path(_local_dir); path.append(slot.currentItem._dir); @@ -605,7 +620,7 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot) return; } - slot.nextWarnTimeout = 20000; + slot.nextWarnTimeout = 30 * 1000; slot.stamp.stamp(); slot.busy = true; slot.pendingKBytes = slot.repository->bytesToDownload(); @@ -648,21 +663,7 @@ void SGTerraSync::WorkerThread::runInternal() if (_stop) break; - // drain the waiting tiles queue into the sync slot queues. - while (!waitingTiles.empty()) { - SyncItem next = waitingTiles.pop_front(); - SyncItem::Status cacheStatus = isPathCached(next); - if (cacheStatus != SyncItem::Invalid) { - incrementCacheHits(); - SG_LOG(SG_TERRASYNC, SG_DEBUG, "\nTerraSync Cache hit for: '" << next._dir << "'"); - next._status = cacheStatus; - _freshTiles.push_back(next); - continue; - } - - unsigned int slot = syncSlotForType(next._type); - _syncSlots[slot].queue.push(next); - } + drainWaitingTiles(); bool anySlotBusy = false; unsigned int newPendingCount = 0; @@ -691,7 +692,7 @@ void SGTerraSync::WorkerThread::runInternal() SyncItem::Status SGTerraSync::WorkerThread::isPathCached(const SyncItem& next) const { - TileAgeCache::const_iterator ii = _completedTiles.find( next._dir ); + auto ii = _completedTiles.find(next._dir); if (ii == _completedTiles.end()) { ii = _notFoundItems.find( next._dir ); // Invalid means 'not cached', otherwise we want to return to @@ -735,7 +736,6 @@ void SGTerraSync::WorkerThread::notFound(SyncItem item) item._status = SyncItem::NotFound; _freshTiles.push_back(item); _notFoundItems[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; - writeCompletedTilesPersistentCache(); } void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory) @@ -756,72 +756,57 @@ void SGTerraSync::WorkerThread::updated(SyncItem item, bool isNewDirectory) _freshTiles.push_back(item); _completedTiles[ item._dir ] = now + UpdateInterval::SuccessfulAttempt; } - - writeCompletedTilesPersistentCache(); } -void SGTerraSync::WorkerThread::initCompletedTilesPersistentCache() +void SGTerraSync::WorkerThread::drainWaitingTiles() { - if (!_persistentCachePath.exists()) { - return; - } - - SGPropertyNode_ptr cacheRoot(new SGPropertyNode); - time_t now = time(0); - - try { - readProperties(_persistentCachePath, cacheRoot); - } catch (sg_exception& e) { - SG_LOG(SG_TERRASYNC, SG_INFO, "corrupted persistent cache, discarding " << e.getFormattedMessage()); - return; - } - - for (int i=0; inChildren(); ++i) { - SGPropertyNode* entry = cacheRoot->getChild(i); - bool isNotFound = (strcmp(entry->getName(), "not-found") == 0); - string tileName = entry->getStringValue("path"); - time_t stamp = entry->getIntValue("stamp"); - if (stamp < now) { + // drain the waiting tiles queue into the sync slot queues. + while (!waitingTiles.empty()) { + SyncItem next = waitingTiles.pop_front(); + SyncItem::Status cacheStatus = isPathCached(next); + if (cacheStatus != SyncItem::Invalid) { + incrementCacheHits(); + SG_LOG(SG_TERRASYNC, SG_BULK, "\nTerraSync Cache hit for: '" << next._dir << "'"); + next._status = cacheStatus; + _freshTiles.push_back(next); continue; } - if (isNotFound) { - _completedTiles[tileName] = stamp; - } else { - _notFoundItems[tileName] = stamp; - } + const auto slot = syncSlotForType(next._type); + _syncSlots[slot].queue.push_back(next); } } -void SGTerraSync::WorkerThread::writeCompletedTilesPersistentCache() const +bool SGTerraSync::WorkerThread::isDirActive(const std::string& path) const { - // cache is disabled - if (_persistentCachePath.isNull()) { - return; + // check waiting tiles first. we have to copy it to check safely, + // but since it's normally empty, this is not a big deal. + const auto copyOfWaiting = waitingTiles.copy(); + auto it = std::find_if(copyOfWaiting.begin(), copyOfWaiting.end(), [&path](const SyncItem& i) { + return i._dir == path; + }); + + if (it != copyOfWaiting.end()) { + return true; } - sg_ofstream f(_persistentCachePath, std::ios::trunc); - if (!f.is_open()) { - return; - } + // check each sync slot in turn + std::lock_guard g(_stateLock); + for (unsigned int slot = 0; slot < NUM_SYNC_SLOTS; ++slot) { + const auto& syncSlot = _syncSlots[slot]; + if (syncSlot.currentItem._dir == path) + return true; - SGPropertyNode_ptr cacheRoot(new SGPropertyNode); - TileAgeCache::const_iterator it = _completedTiles.begin(); - for (; it != _completedTiles.end(); ++it) { - SGPropertyNode* entry = cacheRoot->addChild("entry"); - entry->setStringValue("path", it->first); - entry->setIntValue("stamp", it->second); - } + auto it = std::find_if(syncSlot.queue.begin(), syncSlot.queue.end(), [&path](const SyncItem& i) { + return i._dir == path; + }); - it = _notFoundItems.begin(); - for (; it != _notFoundItems.end(); ++it) { - SGPropertyNode* entry = cacheRoot->addChild("not-found"); - entry->setStringValue("path", it->first); - entry->setIntValue("stamp", it->second); - } + if (it != syncSlot.queue.end()) { + return true; + } + } // of sync slots iteration - writeProperties(f, cacheRoot, true /* write_all */); - f.close(); + return false; } /////////////////////////////////////////////////////////////////////////////// @@ -996,12 +981,9 @@ void SGTerraSync::update(double) while (_workerThread->hasNewTiles()) { - SyncItem next = _workerThread->getNewTile(); - - if ((next._type == SyncItem::Tile) || (next._type == SyncItem::AIData)) { - _activeTileDirs.erase(next._dir); - } - } // of freshly synced items + // ensure they are popped + _workerThread->getNewTile(); + } } bool SGTerraSync::isIdle() {return _workerThread->isIdle();} @@ -1046,17 +1028,16 @@ string_list SGTerraSync::getSceneryPathSuffixes() const void SGTerraSync::syncAreaByPath(const std::string& aPath) { - string_list scenerySuffixes = getSceneryPathSuffixes(); - string_list::const_iterator it = scenerySuffixes.begin(); + if (!_workerThread->isRunning()) { + return; + } - for (; it != scenerySuffixes.end(); ++it) - { - std::string dir = *it + "/" + aPath; - if (_activeTileDirs.find(dir) != _activeTileDirs.end()) { + for (const auto& suffix : getSceneryPathSuffixes()) { + const auto dir = suffix + "/" + aPath; + if (_workerThread->isDirActive(dir)) { continue; } - _activeTileDirs.insert(dir); SyncItem w(dir, SyncItem::Tile); _workerThread->request( w ); } @@ -1075,13 +1056,9 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const return false; } - string_list scenerySuffixes = getSceneryPathSuffixes(); - string_list::const_iterator it = scenerySuffixes.begin(); - - for (; it != scenerySuffixes.end(); ++it) - { - string s = *it + "/" + sceneryDir; - if (_activeTileDirs.find(s) != _activeTileDirs.end()) { + for (const auto& suffix : getSceneryPathSuffixes()) { + const auto s = suffix + "/" + sceneryDir; + if (_workerThread->isDirActive(s)) { return true; } } @@ -1091,14 +1068,16 @@ bool SGTerraSync::isTileDirPending(const std::string& sceneryDir) const void SGTerraSync::scheduleDataDir(const std::string& dataDir) { - if (_activeTileDirs.find(dataDir) != _activeTileDirs.end()) { + if (!_workerThread->isRunning()) { + return; + } + + if (_workerThread->isDirActive(dataDir)) { return; } - _activeTileDirs.insert(dataDir); SyncItem w(dataDir, SyncItem::AIData); _workerThread->request( w ); - } bool SGTerraSync::isDataDirPending(const std::string& dataDir) const @@ -1107,7 +1086,7 @@ bool SGTerraSync::isDataDirPending(const std::string& dataDir) const return false; } - return (_activeTileDirs.find(dataDir) != _activeTileDirs.end()); + return _workerThread->isDirActive(dataDir); } void SGTerraSync::reposition() diff --git a/simgear/scene/tsync/terrasync.hxx b/simgear/scene/tsync/terrasync.hxx index 4d4c0e39..fd625668 100644 --- a/simgear/scene/tsync/terrasync.hxx +++ b/simgear/scene/tsync/terrasync.hxx @@ -116,9 +116,6 @@ private: simgear::TiedPropertyList _tiedProperties; BufferedLogCallback* _log; - - typedef std::set string_set; - string_set _activeTileDirs; }; } diff --git a/simgear/threads/SGQueue.hxx b/simgear/threads/SGQueue.hxx index 54836623..2db9efe4 100644 --- a/simgear/threads/SGQueue.hxx +++ b/simgear/threads/SGQueue.hxx @@ -272,30 +272,35 @@ template class SGBlockingDeque { public: + using value_type = T; + using container_type = std::deque; + /** * Create a new SGBlockingDequeue. */ - SGBlockingDeque() {} + SGBlockingDeque() = default; /** * Destroy this dequeue. */ - virtual ~SGBlockingDeque() {} + ~SGBlockingDeque() = default; /** * */ - virtual void clear() { - std::lock_guard g(mutex); - this->queue.clear(); + void clear() + { + std::lock_guard g(mutex); + this->queue.clear(); } /** * */ - virtual bool empty() { - std::lock_guard g(mutex); - return this->queue.empty(); + bool empty() const + { + std::lock_guard g(mutex); + return this->queue.empty(); } /** @@ -303,10 +308,11 @@ public: * * @param item The object to add. */ - virtual void push_front( const T& item ) { - std::lock_guard g(mutex); - this->queue.push_front( item ); - not_empty.signal(); + void push_front(const T& item) + { + std::lock_guard g(mutex); + this->queue.push_front(item); + not_empty.signal(); } /** @@ -314,10 +320,11 @@ public: * * @param item The object to add. */ - virtual void push_back( const T& item ) { - std::lock_guard g(mutex); - this->queue.push_back( item ); - not_empty.signal(); + void push_back(const T& item) + { + std::lock_guard g(mutex); + this->queue.push_back(item); + not_empty.signal(); } /** @@ -326,14 +333,15 @@ public: * * @return The next available object. */ - virtual T front() { - std::lock_guard g(mutex); + T front() const + { + std::lock_guard g(mutex); - assert(this->queue.empty() != true); - //if (queue.empty()) throw ?? + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? - T item = this->queue.front(); - return item; + T item = this->queue.front(); + return item; } /** @@ -342,18 +350,19 @@ public: * * @return The next available object. */ - virtual T pop_front() { - std::lock_guard g(mutex); + T pop_front() + { + std::lock_guard g(mutex); - while (this->queue.empty()) - not_empty.wait(mutex); + while (this->queue.empty()) + not_empty.wait(mutex); - assert(this->queue.empty() != true); - //if (queue.empty()) throw ?? + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? - T item = this->queue.front(); - this->queue.pop_front(); - return item; + T item = this->queue.front(); + this->queue.pop_front(); + return item; } /** @@ -362,18 +371,19 @@ public: * * @return The next available object. */ - virtual T pop_back() { - std::lock_guard g(mutex); + T pop_back() + { + std::lock_guard g(mutex); - while (this->queue.empty()) - not_empty.wait(mutex); + while (this->queue.empty()) + not_empty.wait(mutex); - assert(this->queue.empty() != true); - //if (queue.empty()) throw ?? + assert(this->queue.empty() != true); + //if (queue.empty()) throw ?? - T item = this->queue.back(); - this->queue.pop_back(); - return item; + T item = this->queue.back(); + this->queue.pop_back(); + return item; } /** @@ -381,8 +391,9 @@ public: * * @return Size of queue. */ - virtual size_t size() { - std::lock_guard g(mutex); + size_t size() const + { + std::lock_guard g(mutex); return this->queue.size(); } @@ -391,12 +402,19 @@ public: while (this->queue.empty()) not_empty.wait(mutex); } + + container_type copy() const + { + std::lock_guard g(mutex); + return queue; + } + private: /** * Mutex to serialise access. */ - std::mutex mutex; + mutable std::mutex mutex; /** * Condition to signal when queue not empty. @@ -409,7 +427,7 @@ private: SGBlockingDeque& operator=( const SGBlockingDeque& ); protected: - std::deque queue; + container_type queue; }; #endif // SGQUEUE_HXX_INCLUDED