Compare commits

..

16 Commits

Author SHA1 Message Date
Automatic Release Builder
d95b1c0441 new version: 2020.3.3 2020-11-12 11:14:23 +00:00
James Turner
837ba86d57 DNSClient: own requests, and cancel them on timeout
Fixes crashes where a request times-out, but then is completed by
UDN sometime afterwards, with a free-d object. Have the DNS::Client own
requests, and be able to retrieve the udns_query to cancel them, in 
the timeout case.

Fixes a couple of Sentry reports.
2020-11-12 09:39:58 +00:00
Automatic Release Builder
0cb1b463e1 Catalogs: fix ownership of new Catalogs
When doing the initial download of a Catalog, ensure we still keep
an owning ref to it.
2020-11-11 21:18:57 +00:00
Automatic Release Builder
8a772c8edd new version: 2020.3.2 2020-11-04 22:08:28 +00:00
James Turner
03bdad0a10 Try to capture cause of BTG load failures 2020-11-04 11:52:58 +00:00
James Turner
537776e1f8 TerraSync: fix local-file copying
Avoid downloading data corresponding to files shipped in FGData.
2020-11-04 11:22:48 +00:00
James Turner
f34a4a304e Untar: log error details when output create fails
Trying to understand why writing to the output files fails for some
users.

Sentry-Id: FLIGHTGEAR-SS
2020-11-03 21:26:03 +00:00
James Turner
a59c4e2c8b TerraSync: make tarball extraction asynchronous 2020-11-03 17:49:19 +00:00
Julian Smith
46f4967f6e Allow use of old zlib-1.2.3 on OpenBSD.
As of 2020-08-01, OpenBSD's system zlib is 1.2.3 which doesn't
have gzoffset(). However we can get away with this by making
gzfilebuf::approxOffset() always return zero.
2020-11-01 11:12:00 +00:00
James Turner
9305417207 Terrasync: tarball extraction, use larger buffer
Use a 1MB buffer, 2kByte is very 1979 :)
2020-11-01 11:11:31 +00:00
James Turner
11da8b33f9 TerraSync: switch to per-directory hash caching
Avoid a central hash cache becoming enormous, now we use the selective
download scheme for the tile dirs.

Hash name is changed to co-exist with older versions.
2020-11-01 11:11:27 +00:00
James Turner
c7b320eb55 Fix Airports/ initial sync 2020-11-01 11:11:23 +00:00
James Turner
72b2eb0ebf TerraSync: avoid 404s to probe missing tiles
Use the top-level dirIndex files to determine if a 1x1 tile dir
exists, instead of proving the server via a 404. This reduces the
number of requests we make considerably, which is … important.
2020-11-01 11:11:19 +00:00
James Turner
ec3829addb TerraSync: validate local dirs incrementally
Add a process() method to HTTPRepository, and use this to
incrementally validate subdirs after the .dirIndex is received. This 
avoids large pauses of the TerraSync thread, when all of Airports/
is validated at once.
2020-11-01 11:11:15 +00:00
James Turner
96bafef3f3 TerraSync: use an unordered_map for the hash cache
Linear-scan is a bit slow in debug builds, for the large Airports/ tree;
switch to an unordered_map.

Will back-port to the LTS once tested a bit more.
2020-11-01 11:11:12 +00:00
James Turner
3ff3bd0a6c Props: allow flushing the atomic change listener
Trying to narrow down causes of the ‘unregister listeners crashes on
shutdown’ reports.
2020-11-01 11:11:07 +00:00
16 changed files with 702 additions and 334 deletions

View File

@@ -278,7 +278,14 @@ else()
endif()
endif(SIMGEAR_HEADLESS)
find_package(ZLIB 1.2.4 REQUIRED)
if(${CMAKE_SYSTEM_NAME} MATCHES "OpenBSD")
# As of 2020-08-01, OpenBSD's system zlib is slightly old, but it's usable
# with a workaround in simgear/io/iostreams/gzfstream.cxx.
find_package(ZLIB 1.2.3 REQUIRED)
else()
find_package(ZLIB 1.2.4 REQUIRED)
endif()
find_package(CURL REQUIRED)
if (SYSTEM_EXPAT)

View File

@@ -1 +1 @@
2020.3.1
2020.3.3

View File

@@ -61,6 +61,10 @@ public:
struct dns_ctx * ctx;
static size_t instanceCounter;
using RequestVec = std::vector<Request_ptr>;
RequestVec _activeRequests;
};
size_t Client::ClientPrivate::instanceCounter = 0;
@@ -78,6 +82,11 @@ Request::~Request()
{
}
void Request::cancel()
{
_cancelled = true;
}
bool Request::isTimeout() const
{
return (time(NULL) - _start) > _timeout_secs;
@@ -114,18 +123,20 @@ static void dnscbSRV(struct dns_ctx *ctx, struct dns_rr_srv *result, void *data)
{
SRVRequest * r = static_cast<SRVRequest*>(data);
if (result) {
r->cname = result->dnssrv_cname;
r->qname = result->dnssrv_qname;
r->ttl = result->dnssrv_ttl;
for (int i = 0; i < result->dnssrv_nrr; i++) {
SRVRequest::SRV_ptr srv(new SRVRequest::SRV);
r->entries.push_back(srv);
srv->priority = result->dnssrv_srv[i].priority;
srv->weight = result->dnssrv_srv[i].weight;
srv->port = result->dnssrv_srv[i].port;
srv->target = result->dnssrv_srv[i].name;
if (!r->isCancelled()) {
r->cname = result->dnssrv_cname;
r->qname = result->dnssrv_qname;
r->ttl = result->dnssrv_ttl;
for (int i = 0; i < result->dnssrv_nrr; i++) {
SRVRequest::SRV_ptr srv(new SRVRequest::SRV);
r->entries.push_back(srv);
srv->priority = result->dnssrv_srv[i].priority;
srv->weight = result->dnssrv_srv[i].weight;
srv->port = result->dnssrv_srv[i].port;
srv->target = result->dnssrv_srv[i].name;
}
std::sort(r->entries.begin(), r->entries.end(), sortSRV);
}
std::sort( r->entries.begin(), r->entries.end(), sortSRV );
free(result);
}
r->setComplete();
@@ -134,11 +145,16 @@ static void dnscbSRV(struct dns_ctx *ctx, struct dns_rr_srv *result, void *data)
void SRVRequest::submit( Client * client )
{
// if service is defined, pass service and protocol
if (!dns_submit_srv(client->d->ctx, getDn().c_str(), _service.empty() ? NULL : _service.c_str(), _service.empty() ? NULL : _protocol.c_str(), 0, dnscbSRV, this )) {
auto q = dns_submit_srv(client->d->ctx, getDn().c_str(), _service.empty() ? NULL : _service.c_str(),
_service.empty() ? NULL : _protocol.c_str(),
0, dnscbSRV, this);
if (!q) {
SG_LOG(SG_IO, SG_ALERT, "Can't submit dns request for " << getDn());
return;
}
_start = time(NULL);
_query = q;
}
TXTRequest::TXTRequest( const std::string & dn ) :
@@ -151,22 +167,24 @@ static void dnscbTXT(struct dns_ctx *ctx, struct dns_rr_txt *result, void *data)
{
TXTRequest * r = static_cast<TXTRequest*>(data);
if (result) {
r->cname = result->dnstxt_cname;
r->qname = result->dnstxt_qname;
r->ttl = result->dnstxt_ttl;
for (int i = 0; i < result->dnstxt_nrr; i++) {
//TODO: interprete the .len field of dnstxt_txt?
auto rawTxt = reinterpret_cast<char*>(result->dnstxt_txt[i].txt);
if (!rawTxt) {
continue;
}
if (!r->isCancelled()) {
r->cname = result->dnstxt_cname;
r->qname = result->dnstxt_qname;
r->ttl = result->dnstxt_ttl;
for (int i = 0; i < result->dnstxt_nrr; i++) {
//TODO: interprete the .len field of dnstxt_txt?
auto rawTxt = reinterpret_cast<char*>(result->dnstxt_txt[i].txt);
if (!rawTxt) {
continue;
}
const string txt{rawTxt};
r->entries.push_back(txt);
string_list tokens = simgear::strutils::split( txt, "=", 1 );
if( tokens.size() == 2 ) {
r->attributes[tokens[0]] = tokens[1];
}
const string txt{rawTxt};
r->entries.push_back(txt);
string_list tokens = simgear::strutils::split(txt, "=", 1);
if (tokens.size() == 2) {
r->attributes[tokens[0]] = tokens[1];
}
}
}
free(result);
}
@@ -176,11 +194,13 @@ static void dnscbTXT(struct dns_ctx *ctx, struct dns_rr_txt *result, void *data)
void TXTRequest::submit( Client * client )
{
// protocol and service an already encoded in DN so pass in NULL for both
if (!dns_submit_txt(client->d->ctx, getDn().c_str(), DNS_C_IN, 0, dnscbTXT, this )) {
auto q = dns_submit_txt(client->d->ctx, getDn().c_str(), DNS_C_IN, 0, dnscbTXT, this);
if (!q) {
SG_LOG(SG_IO, SG_ALERT, "Can't submit dns request for " << getDn());
return;
}
_start = time(NULL);
_query = q;
}
@@ -195,27 +215,29 @@ static void dnscbNAPTR(struct dns_ctx *ctx, struct dns_rr_naptr *result, void *d
{
NAPTRRequest * r = static_cast<NAPTRRequest*>(data);
if (result) {
r->cname = result->dnsnaptr_cname;
r->qname = result->dnsnaptr_qname;
r->ttl = result->dnsnaptr_ttl;
for (int i = 0; i < result->dnsnaptr_nrr; i++) {
if( !r->qservice.empty() && r->qservice != result->dnsnaptr_naptr[i].service )
continue;
if (!r->isCancelled()) {
r->cname = result->dnsnaptr_cname;
r->qname = result->dnsnaptr_qname;
r->ttl = result->dnsnaptr_ttl;
for (int i = 0; i < result->dnsnaptr_nrr; i++) {
if (!r->qservice.empty() && r->qservice != result->dnsnaptr_naptr[i].service)
continue;
//TODO: case ignore and result flags may have more than one flag
if( !r->qflags.empty() && r->qflags != result->dnsnaptr_naptr[i].flags )
continue;
//TODO: case ignore and result flags may have more than one flag
if (!r->qflags.empty() && r->qflags != result->dnsnaptr_naptr[i].flags)
continue;
NAPTRRequest::NAPTR_ptr naptr(new NAPTRRequest::NAPTR);
r->entries.push_back(naptr);
naptr->order = result->dnsnaptr_naptr[i].order;
naptr->preference = result->dnsnaptr_naptr[i].preference;
naptr->flags = result->dnsnaptr_naptr[i].flags;
naptr->service = result->dnsnaptr_naptr[i].service;
naptr->regexp = result->dnsnaptr_naptr[i].regexp;
naptr->replacement = result->dnsnaptr_naptr[i].replacement;
NAPTRRequest::NAPTR_ptr naptr(new NAPTRRequest::NAPTR);
r->entries.push_back(naptr);
naptr->order = result->dnsnaptr_naptr[i].order;
naptr->preference = result->dnsnaptr_naptr[i].preference;
naptr->flags = result->dnsnaptr_naptr[i].flags;
naptr->service = result->dnsnaptr_naptr[i].service;
naptr->regexp = result->dnsnaptr_naptr[i].regexp;
naptr->replacement = result->dnsnaptr_naptr[i].replacement;
}
std::sort(r->entries.begin(), r->entries.end(), sortNAPTR);
}
std::sort( r->entries.begin(), r->entries.end(), sortNAPTR );
free(result);
}
r->setComplete();
@@ -223,11 +245,13 @@ static void dnscbNAPTR(struct dns_ctx *ctx, struct dns_rr_naptr *result, void *d
void NAPTRRequest::submit( Client * client )
{
if (!dns_submit_naptr(client->d->ctx, getDn().c_str(), 0, dnscbNAPTR, this )) {
auto q = dns_submit_naptr(client->d->ctx, getDn().c_str(), 0, dnscbNAPTR, this);
if (!q) {
SG_LOG(SG_IO, SG_ALERT, "Can't submit dns request for " << getDn());
return;
}
_start = time(NULL);
_query = q;
}
@@ -242,6 +266,7 @@ Client::Client() :
void Client::makeRequest(const Request_ptr& r)
{
d->_activeRequests.push_back(r);
r->submit(this);
}
@@ -252,6 +277,19 @@ void Client::update(int waitTimeout)
return;
dns_ioevent(d->ctx, now);
// drop our owning ref to completed requests,
// and cancel any which timed out
auto it = std::remove_if(d->_activeRequests.begin(), d->_activeRequests.end(),
[this](const Request_ptr& r) {
if (r->isTimeout()) {
dns_cancel(d->ctx, reinterpret_cast<struct dns_query*>(r->_query));
return true;
}
return r->isComplete();
});
d->_activeRequests.erase(it, d->_activeRequests.end());
}
} // of namespace DNS

View File

@@ -40,28 +40,38 @@ namespace DNS
{
class Client;
using UDNSQueryPtr = void*;
class Request : public SGReferenced
{
public:
Request( const std::string & dn );
virtual ~Request();
std::string getDn() const { return _dn; }
const std::string& getDn() const { return _dn; }
int getType() const { return _type; }
bool isComplete() const { return _complete; }
bool isTimeout() const;
void setComplete( bool b = true ) { _complete = b; }
bool isCancelled() const { return _cancelled; }
virtual void submit( Client * client) = 0;
void cancel();
std::string cname;
std::string qname;
unsigned ttl;
protected:
friend class Client;
UDNSQueryPtr _query = nullptr;
std::string _dn;
int _type;
bool _complete;
time_t _timeout_secs;
time_t _start;
bool _cancelled = false;
};
typedef SGSharedPtr<Request> Request_ptr;
@@ -69,7 +79,7 @@ class NAPTRRequest : public Request
{
public:
NAPTRRequest( const std::string & dn );
virtual void submit( Client * client );
void submit(Client* client) override;
struct NAPTR : SGReferenced {
int order;
@@ -92,7 +102,7 @@ class SRVRequest : public Request
public:
SRVRequest( const std::string & dn );
SRVRequest( const std::string & dn, const string & service, const string & protocol );
virtual void submit( Client * client );
void submit(Client* client) override;
struct SRV : SGReferenced {
int priority;
@@ -112,7 +122,7 @@ class TXTRequest : public Request
{
public:
TXTRequest( const std::string & dn );
virtual void submit( Client * client );
void submit(Client* client) override;
typedef std::vector<string> TXT_list;
typedef std::map<std::string,std::string> TXT_Attribute_map;

View File

@@ -81,6 +81,45 @@ std::string innerResultCodeAsString(HTTPRepository::ResultCode code) {
return "Unknown response code";
}
struct HashCacheEntry {
std::string filePath;
time_t modTime;
size_t lengthBytes;
std::string hashHex;
};
using HashCache = std::unordered_map<std::string, HashCacheEntry>;
std::string computeHashForPath(const SGPath& p)
{
if (!p.exists())
return {};
sha1nfo info;
sha1_init(&info);
const int bufSize = 1024 * 1024;
char* buf = static_cast<char*>(malloc(bufSize));
if (!buf) {
sg_io_exception("Couldn't allocate SHA1 computation buffer");
}
size_t readLen;
SGBinaryFile f(p);
if (!f.open(SG_IO_IN)) {
free(buf);
throw sg_io_exception("Couldn't open file for compute hash", p);
}
while ((readLen = f.read(buf, bufSize)) > 0) {
sha1_write(&info, buf, readLen);
}
f.close();
free(buf);
std::string hashBytes((char*)sha1_result(&info), HASH_LENGTH);
return strutils::encodeHex(hashBytes);
}
} // namespace
class HTTPDirectory
@@ -111,6 +150,9 @@ class HTTPDirectory
typedef std::vector<ChildInfo> ChildInfoList;
ChildInfoList children;
mutable HashCache hashes;
mutable bool hashCacheDirty = false;
public:
HTTPDirectory(HTTPRepoPrivate* repo, const std::string& path) :
_repository(repo),
@@ -124,6 +166,8 @@ public:
// already exists on disk
parseDirIndex(children);
std::sort(children.begin(), children.end());
parseHashCache();
} catch (sg_exception& ) {
// parsing cache failed
children.clear();
@@ -149,7 +193,7 @@ public:
{
SGPath fpath(absolutePath());
fpath.append(".dirindex");
_repository->updatedFileContents(fpath, hash);
updatedFileContents(fpath, hash);
children.clear();
parseDirIndex(children);
@@ -177,7 +221,7 @@ public:
char* buf = nullptr;
size_t bufSize = 0;
for (const auto& child : children) {
for (auto &child : children) {
if (child.type != HTTPRepository::FileType)
continue;
@@ -189,30 +233,36 @@ public:
cp.append(child.name);
if (!cp.exists()) {
continue;
}
}
SGBinaryFile src(cp);
SGBinaryFile dst(child.path);
src.open(SG_IO_IN);
dst.open(SG_IO_OUT);
SGBinaryFile src(cp);
SGBinaryFile dst(child.path);
src.open(SG_IO_IN);
dst.open(SG_IO_OUT);
if (bufSize < cp.sizeInBytes()) {
bufSize = cp.sizeInBytes();
free(buf);
buf = (char*) malloc(bufSize);
if (!buf) {
continue;
}
}
if (bufSize < cp.sizeInBytes()) {
bufSize = cp.sizeInBytes();
free(buf);
buf = (char *)malloc(bufSize);
if (!buf) {
continue;
}
}
src.read(buf, cp.sizeInBytes());
dst.write(buf, cp.sizeInBytes());
src.close();
dst.close();
src.read(buf, cp.sizeInBytes());
dst.write(buf, cp.sizeInBytes());
src.close();
dst.close();
}
// reset caching
child.path.set_cached(false);
child.path.set_cached(true);
free(buf);
std::string hash = computeHashForPath(child.path);
updatedFileContents(child.path, hash);
}
free(buf);
}
void updateChildrenBasedOnHash()
@@ -274,7 +324,7 @@ public:
if (c.type == HTTPRepository::DirectoryType) {
// If it's a directory,perform a recursive check.
HTTPDirectory *childDir = childDirectory(c.name);
childDir->updateChildrenBasedOnHash();
_repository->scheduleUpdateOfChildren(childDir);
}
}
} // of repository-defined (well, .dirIndex) children iteration
@@ -372,6 +422,66 @@ public:
return _relativePath;
}
class ArchiveExtractTask {
public:
ArchiveExtractTask(SGPath p, const std::string &relPath)
: relativePath(relPath), file(p), extractor(p.dir()) {
if (!file.open(SG_IO_IN)) {
SG_LOG(SG_TERRASYNC, SG_ALERT,
"Unable to open " << p << " to extract");
return;
}
buffer = (uint8_t *)malloc(bufferSize);
}
ArchiveExtractTask(const ArchiveExtractTask &) = delete;
HTTPRepoPrivate::ProcessResult run(HTTPRepoPrivate *repo) {
size_t rd = file.read((char *)buffer, bufferSize);
extractor.extractBytes(buffer, rd);
if (file.eof()) {
extractor.flush();
file.close();
if (!extractor.isAtEndOfArchive()) {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Corrupt tarball " << relativePath);
repo->failedToUpdateChild(relativePath,
HTTPRepository::REPO_ERROR_IO);
return HTTPRepoPrivate::ProcessFailed;
}
if (extractor.hasError()) {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Error extracting " << relativePath);
repo->failedToUpdateChild(relativePath,
HTTPRepository::REPO_ERROR_IO);
return HTTPRepoPrivate::ProcessFailed;
}
return HTTPRepoPrivate::ProcessDone;
}
return HTTPRepoPrivate::ProcessContinue;
}
~ArchiveExtractTask() { free(buffer); }
private:
// intentionally small so we extract incrementally on Windows
// where Defender throttles many small files, sorry
// if you make this bigger we will be more efficient but stall for
// longer when extracting the Airports_archive
const int bufferSize = 1024 * 64;
std::string relativePath;
uint8_t *buffer = nullptr;
SGBinaryFile file;
ArchiveExtractor extractor;
};
using ArchiveExtractTaskPtr = std::shared_ptr<ArchiveExtractTask>;
void didUpdateFile(const std::string& file, const std::string& hash, size_t sz)
{
// check hash matches what we expected
@@ -387,7 +497,7 @@ public:
_relativePath + "/" + file,
HTTPRepository::REPO_ERROR_CHECKSUM);
} else {
_repository->updatedFileContents(it->path, hash);
updatedFileContents(it->path, hash);
_repository->updatedChildSuccessfully(_relativePath + "/" +
file);
@@ -410,33 +520,22 @@ public:
}
if (pathAvailable) {
// If this is a tarball, then extract it.
SGBinaryFile f(p);
if (! f.open(SG_IO_IN)) SG_LOG(SG_TERRASYNC, SG_ALERT, "Unable to open " << p << " to extract");
// we use a Task helper to extract tarballs incrementally.
// without this, archive extraction blocks here, which
// prevents other repositories downloading / updating.
// Unfortunately due Windows AV (Defender, etc) we cna block
// here for many minutes.
SG_LOG(SG_TERRASYNC, SG_INFO, "Extracting " << absolutePath() << "/" << file << " to " << p.dir());
SGPath extractDir = p.dir();
ArchiveExtractor ex(extractDir);
uint8_t *buf = (uint8_t *)alloca(2048);
while (!f.eof()) {
size_t bufSize = f.read((char *)buf, 2048);
ex.extractBytes(buf, bufSize);
}
ex.flush();
if (! ex.isAtEndOfArchive()) {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Corrupt tarball " << p);
_repository->failedToUpdateChild(
_relativePath, HTTPRepository::REPO_ERROR_IO);
}
if (ex.hasError()) {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Error extracting " << p);
_repository->failedToUpdateChild(
_relativePath, HTTPRepository::REPO_ERROR_IO);
}
// use a lambda to own this shared_ptr; this means when the
// lambda is destroyed, the ArchiveExtraTask will get
// cleaned up.
ArchiveExtractTaskPtr t =
std::make_shared<ArchiveExtractTask>(p, _relativePath);
auto cb = [t](HTTPRepoPrivate *repo) {
return t->run(repo);
};
_repository->addTask(cb);
} else {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Unable to remove old file/directory " << removePath);
} // of pathAvailable
@@ -452,6 +551,50 @@ public:
fpath.append(file);
_repository->failedToUpdateChild(fpath, status);
}
std::string hashForPath(const SGPath& p) const
{
const auto ps = p.utf8Str();
auto it = hashes.find(ps);
if (it != hashes.end()) {
const auto& entry = it->second;
// ensure data on disk hasn't changed.
// we could also use the file type here if we were paranoid
if ((p.sizeInBytes() == entry.lengthBytes) && (p.modTime() == entry.modTime)) {
return entry.hashHex;
}
// entry in the cache, but it's stale so remove and fall through
hashes.erase(it);
}
std::string hash = computeHashForPath(p);
updatedFileContents(p, hash);
return hash;
}
bool isHashCacheDirty() const
{
return hashCacheDirty;
}
void writeHashCache() const
{
if (!hashCacheDirty)
return;
hashCacheDirty = false;
SGPath cachePath = absolutePath() / ".dirhash";
sg_ofstream stream(cachePath, std::ios::out | std::ios::trunc | std::ios::binary);
for (const auto& e : hashes) {
const auto& entry = e.second;
stream << entry.filePath << "*" << entry.modTime << "*"
<< entry.lengthBytes << "*" << entry.hashHex << "\n";
}
stream.close();
}
private:
struct ChildWithName
@@ -575,7 +718,7 @@ private:
ok = _repository->deleteDirectory(fpath, path);
} else {
// remove the hash cache entry
_repository->updatedFileContents(path, std::string());
updatedFileContents(path, std::string());
ok = path.remove();
}
@@ -593,9 +736,80 @@ private:
if (child.type == HTTPRepository::TarballType)
p.concat(
".tgz"); // For tarballs the hash is against the tarball file itself
return _repository->hashForPath(p);
return hashForPath(p);
}
void parseHashCache()
{
hashes.clear();
SGPath cachePath = absolutePath() / ".dirhash";
if (!cachePath.exists()) {
return;
}
sg_ifstream stream(cachePath, std::ios::in);
while (!stream.eof()) {
std::string line;
std::getline(stream, line);
line = simgear::strutils::strip(line);
if (line.empty() || line[0] == '#')
continue;
string_list tokens = simgear::strutils::split(line, "*");
if (tokens.size() < 4) {
SG_LOG(SG_TERRASYNC, SG_WARN, "invalid entry in '" << cachePath << "': '" << line << "' (ignoring line)");
continue;
}
const std::string nameData = simgear::strutils::strip(tokens[0]);
const std::string timeData = simgear::strutils::strip(tokens[1]);
const std::string sizeData = simgear::strutils::strip(tokens[2]);
const std::string hashData = simgear::strutils::strip(tokens[3]);
if (nameData.empty() || timeData.empty() || sizeData.empty() || hashData.empty()) {
SG_LOG(SG_TERRASYNC, SG_WARN, "invalid entry in '" << cachePath << "': '" << line << "' (ignoring line)");
continue;
}
HashCacheEntry entry;
entry.filePath = nameData;
entry.hashHex = hashData;
entry.modTime = strtol(timeData.c_str(), NULL, 10);
entry.lengthBytes = strtol(sizeData.c_str(), NULL, 10);
hashes.insert(std::make_pair(entry.filePath, entry));
}
}
void updatedFileContents(const SGPath& p, const std::string& newHash) const
{
// remove the existing entry
const auto ps = p.utf8Str();
auto it = hashes.find(ps);
if (it != hashes.end()) {
hashes.erase(it);
hashCacheDirty = true;
}
if (newHash.empty()) {
return; // we're done
}
// use a cloned SGPath and reset its caching to force one stat() call
SGPath p2(p);
p2.set_cached(false);
p2.set_cached(true);
HashCacheEntry entry;
entry.filePath = ps;
entry.hashHex = newHash;
entry.modTime = p2.modTime();
entry.lengthBytes = p2.sizeInBytes();
hashes.insert(std::make_pair(ps, entry));
hashCacheDirty = true;
}
HTTPRepoPrivate* _repository;
std::string _relativePath; // in URL and file-system space
};
@@ -606,7 +820,6 @@ HTTPRepository::HTTPRepository(const SGPath& base, HTTP::Client *cl) :
_d->http = cl;
_d->basePath = base;
_d->rootDir.reset(new HTTPDirectory(_d.get(), ""));
_d->parseHashCache();
}
HTTPRepository::~HTTPRepository()
@@ -654,6 +867,38 @@ bool HTTPRepository::isDoingSync() const
return _d->isUpdating;
}
void HTTPRepository::process()
{
int processedCount = 0;
const int maxToProcess = 16;
while (processedCount < maxToProcess) {
if (_d->pendingTasks.empty()) {
break;
}
auto task = _d->pendingTasks.front();
auto result = task(_d.get());
if (result == HTTPRepoPrivate::ProcessContinue) {
// assume we're not complete
return;
}
_d->pendingTasks.pop_front();
++processedCount;
}
_d->checkForComplete();
}
void HTTPRepoPrivate::checkForComplete()
{
if (pendingTasks.empty() && activeRequests.empty() &&
queuedRequests.empty()) {
isUpdating = false;
}
}
size_t HTTPRepository::bytesToDownload() const
{
size_t result = 0;
@@ -868,7 +1113,7 @@ HTTPRepository::failure() const
return;
}
std::string curHash = _directory->repository()->hashForPath(path());
std::string curHash = _directory->hashForPath(path());
if (hash != curHash) {
simgear::Dir d(_directory->absolutePath());
if (!d.exists()) {
@@ -967,6 +1212,7 @@ HTTPRepository::failure() const
http->cancelRequest(*rq, "Repository object deleted");
}
flushHashCaches();
directories.clear(); // wil delete them all
}
@@ -986,154 +1232,6 @@ HTTPRepository::failure() const
return r;
}
class HashEntryWithPath
{
public:
HashEntryWithPath(const SGPath& p) : path(p.utf8Str()) {}
bool operator()(const HTTPRepoPrivate::HashCacheEntry& entry) const
{ return entry.filePath == path; }
private:
std::string path;
};
std::string HTTPRepoPrivate::hashForPath(const SGPath& p)
{
HashCache::iterator it = std::find_if(hashes.begin(), hashes.end(), HashEntryWithPath(p));
if (it != hashes.end()) {
// ensure data on disk hasn't changed.
// we could also use the file type here if we were paranoid
if ((p.sizeInBytes() == it->lengthBytes) && (p.modTime() == it->modTime)) {
return it->hashHex;
}
// entry in the cache, but it's stale so remove and fall through
hashes.erase(it);
}
std::string hash = computeHashForPath(p);
updatedFileContents(p, hash);
return hash;
}
std::string HTTPRepoPrivate::computeHashForPath(const SGPath& p)
{
if (!p.exists())
return {};
sha1nfo info;
sha1_init(&info);
const int bufSize = 1024 * 1024;
char* buf = static_cast<char*>(malloc(bufSize));
if (!buf) {
sg_io_exception("Couldn't allocate SHA1 computation buffer");
}
size_t readLen;
SGBinaryFile f(p);
if (!f.open(SG_IO_IN)) {
free(buf);
throw sg_io_exception("Couldn't open file for compute hash", p);
}
while ((readLen = f.read(buf, bufSize)) > 0) {
sha1_write(&info, buf, readLen);
}
f.close();
free(buf);
std::string hashBytes((char*) sha1_result(&info), HASH_LENGTH);
return strutils::encodeHex(hashBytes);
}
void HTTPRepoPrivate::updatedFileContents(const SGPath& p, const std::string& newHash)
{
// remove the existing entry
auto it = std::find_if(hashes.begin(), hashes.end(), HashEntryWithPath(p));
if (it != hashes.end()) {
hashes.erase(it);
++hashCacheDirty;
}
if (newHash.empty()) {
return; // we're done
}
// use a cloned SGPath and reset its caching to force one stat() call
SGPath p2(p);
p2.set_cached(false);
p2.set_cached(true);
HashCacheEntry entry;
entry.filePath = p.utf8Str();
entry.hashHex = newHash;
entry.modTime = p2.modTime();
entry.lengthBytes = p2.sizeInBytes();
hashes.push_back(entry);
++hashCacheDirty ;
}
void HTTPRepoPrivate::writeHashCache()
{
if (hashCacheDirty == 0) {
return;
}
SGPath cachePath = basePath;
cachePath.append(".hashes");
sg_ofstream stream(cachePath, std::ios::out | std::ios::trunc | std::ios::binary);
HashCache::const_iterator it;
for (it = hashes.begin(); it != hashes.end(); ++it) {
stream << it->filePath << "*" << it->modTime << "*"
<< it->lengthBytes << "*" << it->hashHex << "\n";
}
stream.close();
hashCacheDirty = 0;
}
void HTTPRepoPrivate::parseHashCache()
{
hashes.clear();
SGPath cachePath = basePath;
cachePath.append(".hashes");
if (!cachePath.exists()) {
return;
}
sg_ifstream stream(cachePath, std::ios::in);
while (!stream.eof()) {
std::string line;
std::getline(stream,line);
line = simgear::strutils::strip(line);
if( line.empty() || line[0] == '#' )
continue;
string_list tokens = simgear::strutils::split(line, "*");
if( tokens.size() < 4 ) {
SG_LOG(SG_TERRASYNC, SG_WARN, "invalid entry in '" << cachePath << "': '" << line << "' (ignoring line)");
continue;
}
const std::string nameData = simgear::strutils::strip(tokens[0]);
const std::string timeData = simgear::strutils::strip(tokens[1]);
const std::string sizeData = simgear::strutils::strip(tokens[2]);
const std::string hashData = simgear::strutils::strip(tokens[3]);
if (nameData.empty() || timeData.empty() || sizeData.empty() || hashData.empty() ) {
SG_LOG(SG_TERRASYNC, SG_WARN, "invalid entry in '" << cachePath << "': '" << line << "' (ignoring line)");
continue;
}
HashCacheEntry entry;
entry.filePath = nameData;
entry.hashHex = hashData;
entry.modTime = strtol(timeData.c_str(), NULL, 10);
entry.lengthBytes = strtol(sizeData.c_str(), NULL, 10);
hashes.push_back(entry);
}
}
class DirectoryWithPath
{
public:
@@ -1163,16 +1261,12 @@ HTTPRepository::failure() const
if (it != directories.end()) {
assert((*it)->absolutePath() == absPath);
directories.erase(it);
} else {
// we encounter this code path when deleting an orphaned directory
}
} else {
// we encounter this code path when deleting an orphaned directory
}
Dir dir(absPath);
Dir dir(absPath);
bool result = dir.remove(true);
// update the hash cache too
updatedFileContents(absPath, std::string());
return result;
}
@@ -1208,17 +1302,10 @@ HTTPRepository::failure() const
http->makeRequest(rr);
}
// rate limit how often we write this, since otherwise
// it dominates the time on Windows. 256 seems about right,
// causes a write a few times a minute.
if (hashCacheDirty > 256) {
writeHashCache();
}
if (activeRequests.empty() && queuedRequests.empty()) {
isUpdating = false;
writeHashCache();
if (countDirtyHashCaches() > 32) {
flushHashCaches();
}
checkForComplete();
}
void HTTPRepoPrivate::failedToGetRootIndex(HTTPRepository::ResultCode st)
@@ -1279,4 +1366,38 @@ HTTPRepository::failure() const
failures.end());
}
void HTTPRepoPrivate::scheduleUpdateOfChildren(HTTPDirectory* dir)
{
auto updateChildTask = [dir](const HTTPRepoPrivate *) {
dir->updateChildrenBasedOnHash();
return ProcessDone;
};
addTask(updateChildTask);
}
void HTTPRepoPrivate::addTask(RepoProcessTask task) {
pendingTasks.push_back(task);
}
int HTTPRepoPrivate::countDirtyHashCaches() const
{
int result = rootDir->isHashCacheDirty() ? 1 : 0;
for (const auto& dir : directories) {
if (dir->isHashCacheDirty()) {
++result;
}
}
return result;
}
void HTTPRepoPrivate::flushHashCaches()
{
rootDir->writeHashCache();
for (const auto& dir : directories) {
dir->writeHashCache();
}
}
} // of namespace simgear

View File

@@ -62,6 +62,11 @@ public:
virtual bool isDoingSync() const;
/**
@brief call this periodically to progress non-network tasks
*/
void process();
virtual ResultCode failure() const;
virtual size_t bytesToDownload() const;

View File

@@ -19,8 +19,11 @@
#pragma once
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <simgear/io/HTTPClient.hxx>
#include <simgear/misc/sg_path.hxx>
@@ -48,20 +51,11 @@ protected:
size_t _contentSize = 0;
};
typedef SGSharedPtr<HTTPRepoGetRequest> RepoRequestPtr;
using RepoRequestPtr = SGSharedPtr<HTTPRepoGetRequest>;
class HTTPRepoPrivate {
public:
struct HashCacheEntry {
std::string filePath;
time_t modTime;
size_t lengthBytes;
std::string hashHex;
};
typedef std::vector<HashCacheEntry> HashCache;
HashCache hashes;
int hashCacheDirty = 0;
HTTPRepository::FailureVec failures;
int maxPermittedFailures = 16;
@@ -89,18 +83,14 @@ public:
HTTP::Request_ptr updateDir(HTTPDirectory *dir, const std::string &hash,
size_t sz);
std::string hashForPath(const SGPath &p);
void updatedFileContents(const SGPath &p, const std::string &newHash);
void parseHashCache();
std::string computeHashForPath(const SGPath &p);
void writeHashCache();
void failedToGetRootIndex(HTTPRepository::ResultCode st);
void failedToUpdateChild(const SGPath &relativePath,
HTTPRepository::ResultCode fileStatus);
void updatedChildSuccessfully(const SGPath &relativePath);
void checkForComplete();
typedef std::vector<RepoRequestPtr> RequestVector;
RequestVector queuedRequests, activeRequests;
@@ -113,10 +103,24 @@ public:
HTTPDirectory *getOrCreateDirectory(const std::string &path);
bool deleteDirectory(const std::string &relPath, const SGPath &absPath);
typedef std::vector<HTTPDirectory_ptr> DirectoryVector;
DirectoryVector directories;
void scheduleUpdateOfChildren(HTTPDirectory *dir);
SGPath installedCopyPath;
int countDirtyHashCaches() const;
void flushHashCaches();
enum ProcessResult { ProcessContinue, ProcessDone, ProcessFailed };
using RepoProcessTask = std::function<ProcessResult(HTTPRepoPrivate *repo)>;
void addTask(RepoProcessTask task);
std::deque<RepoProcessTask> pendingTasks;
};
} // namespace simgear

View File

@@ -185,6 +185,9 @@ gzfilebuf::setcompressionstrategy( int comp_strategy )
z_off_t
gzfilebuf::approxOffset() {
#ifdef __OpenBSD__
z_off_t res = 0;
#else
z_off_t res = gzoffset(file);
if (res == -1) {
@@ -201,7 +204,7 @@ gzfilebuf::approxOffset() {
SG_LOG( SG_GENERAL, SG_ALERT, errMsg );
throw sg_io_exception(errMsg);
}
#endif
return res;
}

View File

@@ -42,6 +42,7 @@
#include <simgear/bucket/newbucket.hxx>
#include <simgear/misc/sg_path.hxx>
#include <simgear/misc/strutils.hxx>
#include <simgear/math/SGGeometry.hxx>
#include <simgear/structure/exception.hxx>
@@ -563,6 +564,12 @@ bool SGBinObject::read_bin( const SGPath& file ) {
// read headers
unsigned int header;
sgReadUInt( fp, &header );
if (sgReadError()) {
gzclose(fp);
throw sg_io_exception("Unable to read BTG header: " + simgear::strutils::error_string(errno), sg_location(file));
}
if ( ((header & 0xFF000000) >> 24) == 'S' &&
((header & 0x00FF0000) >> 16) == 'G' ) {

View File

@@ -409,6 +409,7 @@ void waitForUpdateComplete(HTTP::Client* cl, HTTPRepository* repo)
cl->update();
testServer.poll();
repo->process();
if (!repo->isDoingSync()) {
return;
}
@@ -423,6 +424,7 @@ void runForTime(HTTP::Client *cl, HTTPRepository *repo, int msec = 15) {
while (start.elapsedMSec() < msec) {
cl->update();
testServer.poll();
repo->process();
SGTimeStamp::sleepForMSec(1);
}
}

View File

@@ -31,6 +31,8 @@
#include <simgear/sg_inlines.h>
#include <simgear/io/sg_file.hxx>
#include <simgear/misc/sg_dir.hxx>
#include <simgear/misc/strutils.hxx>
#include <simgear/io/iostreams/sgstream.hxx>
#include <simgear/debug/logstream.hxx>
#include <simgear/package/unzip.h>
@@ -592,7 +594,7 @@ public:
outFile.open(path, std::ios::binary | std::ios::trunc | std::ios::out);
if (outFile.fail()) {
throw sg_io_exception("failed to open output file for writing", path);
throw sg_io_exception("failed to open output file for writing:" + strutils::error_string(errno), path);
}
while (!eof) {

View File

@@ -722,12 +722,6 @@ void Root::catalogRefreshStatus(CatalogRef aCat, Delegate::StatusCode aReason)
auto catIt = d->catalogs.find(aCat->id());
d->fireRefreshStatus(aCat, aReason);
if (aReason == Delegate::STATUS_IN_PROGRESS) {
d->refreshing.insert(aCat);
} else {
d->refreshing.erase(aCat);
}
if (aCat->isUserEnabled() &&
(aReason == Delegate::STATUS_REFRESHED) &&
(catIt == d->catalogs.end()))
@@ -761,6 +755,17 @@ void Root::catalogRefreshStatus(CatalogRef aCat, Delegate::StatusCode aReason)
}
} // of catalog is disabled
// remove from refreshing /after/ checking for enable / disabled, since for
// new catalogs, the reference in d->refreshing might be our /only/
// reference to the catalog. Once the refresh is done (either failed or
// succeeded) the Catalog will be in either d->catalogs or
// d->disabledCatalogs
if (aReason == Delegate::STATUS_IN_PROGRESS) {
d->refreshing.insert(aCat);
} else {
d->refreshing.erase(aCat);
}
if (d->refreshing.empty()) {
d->fireRefreshStatus(CatalogRef(), Delegate::STATUS_REFRESHED);
d->firePackagesChanged();

View File

@@ -59,6 +59,12 @@ void AtomicChangeListener::fireChangeListeners()
listeners.clear();
}
void AtomicChangeListener::clearPendingChanges()
{
auto& listeners = ListenerListSingleton::instance()->listeners;
listeners.clear();
}
void AtomicChangeListener::valueChangedImplementation()
{
if (!_dirty) {

View File

@@ -52,7 +52,17 @@ public:
bool isDirty() { return _dirty; }
bool isValid() { return _valid; }
virtual void unregister_property(SGPropertyNode* node) override;
static void fireChangeListeners();
/**
* @brief Ensure we've deleted any pending changes.
*
* This is important in shutdown and reset, to avoid holding
* property listeners around after the property tree is destroyed
*/
static void clearPendingChanges();
private:
virtual void valueChangedImplementation() override;
virtual void valuesChanged();

View File

@@ -334,6 +334,10 @@ public:
void runInternal();
void updateSyncSlot(SyncSlot& slot);
void beginSyncAirports(SyncSlot& slot);
void beginSyncTile(SyncSlot& slot);
void beginNormalSync(SyncSlot& slot);
void drainWaitingTiles();
// commond helpers between both internal and external models
@@ -547,6 +551,7 @@ void SGTerraSync::WorkerThread::run()
void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
{
if (slot.repository.get()) {
slot.repository->process();
if (slot.repository->isDoingSync()) {
#if 1
if (slot.stamp.elapsedMSec() > (int)slot.nextWarnTimeout) {
@@ -588,47 +593,14 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
SGPath path(_local_dir);
path.append(slot.currentItem._dir);
slot.isNewDirectory = !path.exists();
if (slot.isNewDirectory) {
int rc = path.create_dir( 0755 );
if (rc) {
SG_LOG(SG_TERRASYNC,SG_ALERT,
"Cannot create directory '" << path << "', return code = " << rc );
fail(slot.currentItem);
return;
}
} // of creating directory step
// optimise initial Airport download
if (slot.isNewDirectory &&
(slot.currentItem._type == SyncItem::AirportData)) {
SG_LOG(SG_TERRASYNC, SG_INFO, "doing Airports download via tarball");
// we want to sync the 'root' TerraSync dir, but not all of it, just
// the Airports_archive.tar.gz file so we use our TerraSync local root
// as the path (since the archive will add Airports/)
slot.repository.reset(new HTTPRepository(_local_dir, &_http));
slot.repository->setBaseUrl(_httpServer + "/");
// filter callback to *only* sync the Airport_archive tarball,
// and ensure no other contents are touched
auto f = [](const HTTPRepository::SyncItem &item) {
if (!item.directory.empty())
return false;
return (item.filename.find("Airports_archive.") == 0);
};
slot.repository->setFilter(f);
const auto type = slot.currentItem._type;
if (type == SyncItem::AirportData) {
beginSyncAirports(slot);
} else if (type == SyncItem::Tile) {
beginSyncTile(slot);
} else {
slot.repository.reset(new HTTPRepository(path, &_http));
slot.repository->setBaseUrl(_httpServer + "/" +
slot.currentItem._dir);
}
if (_installRoot.exists()) {
SGPath p = _installRoot;
p.append(slot.currentItem._dir);
slot.repository->setInstalledCopyPath(p);
beginNormalSync(slot);
}
try {
@@ -651,6 +623,96 @@ void SGTerraSync::WorkerThread::updateSyncSlot(SyncSlot &slot)
}
}
void SGTerraSync::WorkerThread::beginSyncAirports(SyncSlot& slot)
{
if (!slot.isNewDirectory) {
beginNormalSync(slot);
return;
}
SG_LOG(SG_TERRASYNC, SG_INFO, "doing Airports download via tarball");
// we want to sync the 'root' TerraSync dir, but not all of it, just
// the Airports_archive.tar.gz file so we use our TerraSync local root
// as the path (since the archive will add Airports/)
slot.repository.reset(new HTTPRepository(_local_dir, &_http));
slot.repository->setBaseUrl(_httpServer);
// filter callback to *only* sync the Airport_archive tarball,
// and ensure no other contents are touched
auto f = [](const HTTPRepository::SyncItem& item) {
if (!item.directory.empty())
return false;
return (item.filename.find("Airports_archive.") == 0);
};
slot.repository->setFilter(f);
}
void SGTerraSync::WorkerThread::beginSyncTile(SyncSlot& slot)
{
// avoid 404 requests by doing a sync which excludes all paths
// except our tile path. In the case of a missing 1x1 tile, we will
// stop becuase all directories are filtered out, which is what we want
auto comps = strutils::split(slot.currentItem._dir, "/");
if (comps.size() != 3) {
SG_LOG(SG_TERRASYNC, SG_ALERT, "Bad tile path:" << slot.currentItem._dir);
beginNormalSync(slot);
return;
}
const auto tileCategory = comps.front();
const auto tenByTenDir = comps.at(1);
const auto oneByOneDir = comps.at(2);
const auto path = SGPath::fromUtf8(_local_dir) / tileCategory;
slot.repository.reset(new HTTPRepository(path, &_http));
slot.repository->setBaseUrl(_httpServer + "/" + tileCategory);
if (_installRoot.exists()) {
SGPath p = _installRoot / tileCategory;
slot.repository->setInstalledCopyPath(p);
}
const auto dirPrefix = tenByTenDir + "/" + oneByOneDir;
// filter callback to *only* sync the 1x1 dir we want, if it exists
// if doesn't, we'll simply stop, which is what we want
auto f = [tenByTenDir, oneByOneDir, dirPrefix](const HTTPRepository::SyncItem& item) {
// only allow the specific 10x10 and 1x1 dirs we want
if (item.directory.empty()) {
return item.filename == tenByTenDir;
} else if (item.directory == tenByTenDir) {
return item.filename == oneByOneDir;
}
// allow arbitrary children below dirPrefix, including sub-dirs
if (item.directory.find(dirPrefix) == 0) {
return true;
}
SG_LOG(SG_TERRASYNC, SG_ALERT, "Tile sync: saw weird path:" << item.directory << " file " << item.filename);
return false;
};
slot.repository->setFilter(f);
}
void SGTerraSync::WorkerThread::beginNormalSync(SyncSlot& slot)
{
SGPath path(_local_dir);
path.append(slot.currentItem._dir);
slot.repository.reset(new HTTPRepository(path, &_http));
slot.repository->setBaseUrl(_httpServer + "/" + slot.currentItem._dir);
if (_installRoot.exists()) {
SGPath p = _installRoot;
p.append(slot.currentItem._dir);
slot.repository->setInstalledCopyPath(p);
}
}
void SGTerraSync::WorkerThread::runInternal()
{
while (!_stop) {

View File

@@ -0,0 +1,86 @@
#if 0
set bindings for action seperate from defintion ?
- in XML, especially aircraft XML
define actions from command / Nasal
add behaviours from Nasal
define keymapping
- manager of keybindings defined against actions?
- for a toggle or enum, define behaviour
- each key repeat cycles
- alternate key to go the other way (G/shift-G)
release bindings for momentary actions:
button up / key-up
send activate, release
send deactivate, release for 'alternate' action
#endif
void SGAction::setValueExpression()
{
// watch all the properties
}
void SGAction::setValueCondition()
{
//
}
void SGAction::updateProperties()
{
//
_node->setBoolValue("enabled", isEnabled());
switch (_type) {
case Momentary:
case Toggle:
_node->setBoolValue("value", getValue());
break;
case Enumerated:
if (!_valueEnumeration.empty()) {
// map to the string value
_node->setStringValue("value", _valueEnumeration.at(getValue()));
} else {
// set as an integer
_node->setIntValue("value", getValue());
}
}
// set description
}
bool SGAction::isEnabled()
{
if (_enableCondition) {
} else {
return _enabled;
}
updateProperties();
}
int SGAction::getValue()
{
if (type == Enumerated) {
if (_valueExpression) {
// invoke it
}
} else {
if (_valueCondition) {
return _valueCondition.test();
}
}
return _value;
}
// commands enable-action, disable-action