Expose more pipelining controls on HTTP code

- used for both implementations, restrict default pipeline depth to
  5 instead of 32 which was perhaps a little ambitious for some
  servers.
This commit is contained in:
James Turner
2016-03-01 12:44:22 +00:00
parent 8009a33b26
commit 49146f41e3
4 changed files with 125 additions and 48 deletions

View File

@@ -68,7 +68,6 @@ namespace HTTP
extern const int DEFAULT_HTTP_PORT = 80;
const char* CONTENT_TYPE_URL_ENCODED = "application/x-www-form-urlencoded";
const unsigned int MAX_INFLIGHT_REQUESTS = 4;
class Connection;
typedef std::multimap<std::string, Connection*> ConnectionDict;
@@ -89,7 +88,10 @@ public:
curl_multi_setopt(curlMulti, CURLMOPT_PIPELINING, 1 /* aka CURLPIPE_HTTP1 */);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxConnections);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH,
(long) MAX_INFLIGHT_REQUESTS);
(long) maxPipelineDepth);
curl_multi_setopt(curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS,
(long) maxHostConnections);
}
#else
@@ -103,11 +105,11 @@ public:
int proxyPort;
std::string proxyAuth;
unsigned int maxConnections;
unsigned int maxHostConnections;
unsigned int maxPipelineDepth;
RequestList pendingRequests;
SGTimeStamp timeTransferSample;
unsigned int bytesTransferred;
unsigned int lastTransferRate;
@@ -122,7 +124,8 @@ public:
client(pr),
state(STATE_CLOSED),
port(DEFAULT_HTTP_PORT),
connectionId(conId)
_connectionId(conId),
_maxPipelineLength(255)
{
}
@@ -139,7 +142,7 @@ public:
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
state = STATE_GETTING_BODY;
setState(STATE_GETTING_BODY);
responseComplete();
}
@@ -149,11 +152,16 @@ public:
port = p;
}
void setMaxPipelineLength(unsigned int m)
{
_maxPipelineLength = m;
}
// socket-level errors
virtual void handleError(int error)
{
const char* errStr = strerror(error);
SG_LOG(SG_IO, SG_WARN, "HTTP Connection handleError:" << error << " ("
SG_LOG(SG_IO, SG_WARN, _connectionId << " handleError:" << error << " ("
<< errStr << ")");
debugDumpRequests();
@@ -183,7 +191,7 @@ public:
_contentDecoder.reset();
}
state = STATE_SOCKET_ERROR;
setState(STATE_SOCKET_ERROR);
}
void handleTimeout()
@@ -198,12 +206,23 @@ public:
// closing of the connection from the server side when getting the body,
bool canCloseState = (state == STATE_GETTING_BODY);
if (canCloseState && activeRequest) {
// check bodyTransferSize matches how much we actually transferred
if (bodyTransferSize > 0) {
if (_contentDecoder.getTotalReceivedBytes() != bodyTransferSize) {
SG_LOG(SG_IO, SG_WARN, _connectionId << " saw connection close while still receiving bytes for:" << activeRequest->url()
<< "\n\thave:" << _contentDecoder.getTotalReceivedBytes() << " of " << bodyTransferSize);
}
}
// force state here, so responseComplete can avoid closing the
// socket again
state = STATE_CLOSED;
SG_LOG(SG_IO, SG_DEBUG, _connectionId << " saw connection close after getting:" << activeRequest->url());
setState(STATE_CLOSED);
responseComplete();
} else {
if (state == STATE_WAITING_FOR_RESPONSE) {
SG_LOG(SG_IO, SG_DEBUG, _connectionId << ":close while waiting for response, front request is:"
<< sentRequests.front()->url());
assert(!sentRequests.empty());
sentRequests.front()->setFailure(500, "server closed connection unexpectedly");
// no active request, but don't restore the front sent one
@@ -223,7 +242,7 @@ public:
_contentDecoder.reset();
}
state = STATE_CLOSED;
setState(STATE_CLOSED);
}
if (sentRequests.empty()) {
@@ -250,13 +269,14 @@ public:
activeRequest = sentRequests.front();
try {
SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " saw start of response for " << activeRequest->url());
activeRequest->responseStart(buffer);
} catch (sg_exception& e) {
handleError(EIO);
return;
}
state = STATE_GETTING_HEADERS;
setState(STATE_GETTING_HEADERS);
buffer.clear();
if (activeRequest->responseCode() == 204) {
noMessageBody = true;
@@ -282,18 +302,19 @@ public:
return;
}
if (sentRequests.size() > MAX_INFLIGHT_REQUESTS) {
if (sentRequests.size() >= _maxPipelineLength) {
return;
}
if (state == STATE_CLOSED) {
if (!connectToHost()) {
setState(STATE_SOCKET_ERROR);
return;
}
SG_LOG(SG_IO, SG_DEBUG, "connection " << _connectionId << " connected.");
setTerminator("\r\n");
state = STATE_IDLE;
setState(STATE_IDLE);
}
Request_ptr r = queuedRequests.front();
@@ -373,12 +394,12 @@ public:
}
}
SG_LOG(SG_IO, SG_DEBUG, "con:" << connectionId << " did start request:" << r->url());
SG_LOG(SG_IO, SG_DEBUG, "con:" << _connectionId << " did send request:" << r->url());
// successfully sent, remove from queue, and maybe send the next
queuedRequests.pop_front();
sentRequests.push_back(r);
if (state == STATE_IDLE) {
state = STATE_WAITING_FOR_RESPONSE;
setState(STATE_WAITING_FOR_RESPONSE);
}
// pipelining, let's maybe send the next request right away
@@ -420,7 +441,7 @@ public:
case STATE_GETTING_CHUNKED_BYTES:
setTerminator("\r\n");
state = STATE_GETTING_CHUNKED;
setState(STATE_GETTING_CHUNKED);
buffer.clear();
break;
@@ -466,7 +487,7 @@ public:
bool shouldStartNext() const
{
return !queuedRequests.empty() && (sentRequests.size() < MAX_INFLIGHT_REQUESTS);
return !queuedRequests.empty() && (sentRequests.size() < _maxPipelineLength);
}
bool isActive() const
@@ -474,9 +495,14 @@ public:
return !queuedRequests.empty() || !sentRequests.empty();
}
std::string connectionId() const
{
return _connectionId;
}
void debugDumpRequests() const
{
SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << connectionId
SG_LOG(SG_IO, SG_DEBUG, "requests for:" << host << ":" << port << " (conId=" << _connectionId
<< "; state=" << state << ")");
if (activeRequest) {
SG_LOG(SG_IO, SG_DEBUG, "\tactive:" << activeRequest->url());
@@ -493,6 +519,27 @@ public:
}
}
private:
enum ConnectionState {
STATE_IDLE = 0,
STATE_WAITING_FOR_RESPONSE,
STATE_GETTING_HEADERS,
STATE_GETTING_BODY,
STATE_GETTING_CHUNKED,
STATE_GETTING_CHUNKED_BYTES,
STATE_GETTING_TRAILER,
STATE_SOCKET_ERROR,
STATE_CLOSED ///< connection should be closed now
};
void setState(ConnectionState newState)
{
if (state == newState) {
return;
}
state = newState;
}
bool connectToHost()
{
SG_LOG(SG_IO, SG_DEBUG, "HTTP connecting to " << host << ":" << port);
@@ -579,11 +626,11 @@ private:
buffer.clear();
if (chunkSize == 0) { // trailer start
state = STATE_GETTING_TRAILER;
setState(STATE_GETTING_TRAILER);
return;
}
state = STATE_GETTING_CHUNKED_BYTES;
setState(STATE_GETTING_CHUNKED_BYTES);
setByteCount(chunkSize);
}
@@ -605,15 +652,15 @@ private:
_contentDecoder.initWithRequest(activeRequest);
if (chunkedTransfer) {
state = STATE_GETTING_CHUNKED;
setState(STATE_GETTING_CHUNKED);
} else if (noMessageBody || (bodyTransferSize == 0)) {
// force the state to GETTING_BODY, to simplify logic in
// responseComplete and handleClose
state = STATE_GETTING_BODY;
setState(STATE_GETTING_BODY);
responseComplete();
} else {
setByteCount(bodyTransferSize); // may be -1, that's fine
state = STATE_GETTING_BODY;
setState(STATE_GETTING_BODY);
}
}
@@ -629,6 +676,7 @@ private:
if ((state == STATE_GETTING_BODY) || (state == STATE_GETTING_TRAILER)) {
if (doClose) {
SG_LOG(SG_IO, SG_DEBUG, _connectionId << " doClose requested");
// this will bring us into handleClose() above, which updates
// state to STATE_CLOSED
close();
@@ -639,7 +687,7 @@ private:
}
if (state != STATE_CLOSED) {
state = sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE;
setState(sentRequests.empty() ? STATE_IDLE : STATE_WAITING_FOR_RESPONSE);
}
// notify request after we change state, so this connection is idle
@@ -650,18 +698,6 @@ private:
setTerminator("\r\n");
}
enum ConnectionState {
STATE_IDLE = 0,
STATE_WAITING_FOR_RESPONSE,
STATE_GETTING_HEADERS,
STATE_GETTING_BODY,
STATE_GETTING_CHUNKED,
STATE_GETTING_CHUNKED_BYTES,
STATE_GETTING_TRAILER,
STATE_SOCKET_ERROR,
STATE_CLOSED ///< connection should be closed now
};
Client* client;
Request_ptr activeRequest;
ConnectionState state;
@@ -677,7 +713,8 @@ private:
RequestList sentRequests;
ContentDecoder _contentDecoder;
std::string connectionId;
std::string _connectionId;
unsigned int _maxPipelineLength;
};
#endif // of !ENABLE_CURL
@@ -686,11 +723,12 @@ Client::Client() :
{
d->proxyPort = 0;
d->maxConnections = 4;
d->maxHostConnections = 4;
d->bytesTransferred = 0;
d->lastTransferRate = 0;
d->timeTransferSample.stamp();
d->totalBytesDownloaded = 0;
d->maxPipelineDepth = 5;
setUserAgent("SimGear-" SG_STRINGIZE(SIMGEAR_VERSION));
#if defined(ENABLE_CURL)
static bool didInitCurlGlobal = false;
@@ -712,16 +750,33 @@ Client::~Client()
void Client::setMaxConnections(unsigned int maxCon)
{
if (maxCon < 1) {
throw sg_range_exception("illegal HTTP::Client::setMaxConnections value");
}
d->maxConnections = maxCon;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) maxCon);
#endif
}
void Client::setMaxHostConnections(unsigned int maxHostCon)
{
d->maxHostConnections = maxHostCon;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_HOST_CONNECTIONS, (long) maxHostCon);
#endif
}
void Client::setMaxPipelineDepth(unsigned int depth)
{
d->maxPipelineDepth = depth;
#if defined(ENABLE_CURL)
curl_multi_setopt(d->curlMulti, CURLMOPT_MAX_PIPELINE_LENGTH, (long) depth);
#else
ConnectionDict::iterator it = d->connections.begin();
for (; it != d->connections.end(); ) {
it->second->setMaxPipelineLength(depth);
}
#endif
}
void Client::update(int waitTimeout)
{
#if defined(ENABLE_CURL)
@@ -938,25 +993,35 @@ void Client::makeRequest(const Request_ptr& r)
}
}
if (!con && atConnectionsLimit) {
bool atHostConnectionsLimit = (count >= d->maxHostConnections);
if (!con && (atConnectionsLimit || atHostConnectionsLimit)) {
// all current connections are busy (active), and we don't
// have free connections to allocate, so let's assign to
// an existing one randomly. Ideally we'd used whichever one will
// complete first but we don't have that info.
int index = rand() % count;
for (it = d->connections.find(connectionId); index > 0; --index) { ; }
for (it = d->connections.find(connectionId); index > 0; --index, ++it) { ; }
con = it->second;
}
// allocate a new connection object
if (!con) {
con = new Connection(this, connectionId);
static int connectionSuffx = 0;
std::stringstream ss;
ss << connectionId << "-" << connectionSuffx++;
SG_LOG(SG_IO, SG_DEBUG, "allocating new connection for ID:" << ss.str());
con = new Connection(this, ss.str());
con->setServer(host, port);
con->setMaxPipelineLength(d->maxPipelineDepth);
d->poller.addChannel(con);
d->connections.insert(d->connections.end(),
ConnectionDict::value_type(connectionId, con));
}
SG_LOG(SG_IO, SG_DEBUG, "queing request for " << r->url() << " on:" << con->connectionId());
con->queueRequest(r);
#endif
}

View File

@@ -75,6 +75,13 @@ public:
*/
void setMaxConnections(unsigned int maxCons);
void setMaxHostConnections(unsigned int maxHostConns);
/**
* maximum depth to pipeline requests - set to 0 to disable pipelining
*/
void setMaxPipelineDepth(unsigned int depth);
const std::string& userAgent() const;
const std::string& proxyHost() const;

View File

@@ -51,9 +51,9 @@ ContentDecoder::ContentDecoder() :
_output(NULL),
_zlib(NULL),
_input(NULL),
_inputAllocated(0),
_inputSize(0)
_inputAllocated(0)
{
reset();
}
ContentDecoder::~ContentDecoder()
@@ -82,6 +82,7 @@ void ContentDecoder::reset()
_contentDeflate = false;
_needGZipHeader = false;
_inputSize = 0;
_totalReceivedBytes = 0;
}
void ContentDecoder::initWithRequest(Request_ptr req)
@@ -120,6 +121,7 @@ void ContentDecoder::finish()
void ContentDecoder::receivedBytes(const char* n, size_t s)
{
_totalReceivedBytes += s;
if (!_contentDeflate) {
_request->processBodyBytes(n, s);
return;

View File

@@ -49,6 +49,8 @@ public:
void receivedBytes(const char* n, size_t s);
size_t getTotalReceivedBytes() const
{ return _totalReceivedBytes; }
private:
bool consumeGZipHeader();
void runDecoder();
@@ -63,6 +65,7 @@ private:
unsigned char* _input;
size_t _inputAllocated, _inputSize;
bool _contentDeflate, _needGZipHeader;
size_t _totalReceivedBytes;
};
} // of namespace HTTP