From 61bc713e0cd60ae1ef18c035787f75d9abbf7fc1 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Fri, 8 Jun 2018 12:49:22 +0200 Subject: [PATCH 1/4] Improve performance of COPY TO #56 Under some circumstances, the COPY TO streamming can be CPU-bound, particularly when PG holds the resultset in memory buffers and the size of the rows << chunk (64 KB in my linux box). This commits improves the situation by creating a buffer of `chunk` size and fitting in as many rows as it can before pushing them. This results in more balanced read and writes (in terms of size and in bigger chunks) as well as more frequent calls to the callback, thus freeing the main loop for other events to be processed, and therefore avoiding starvation. --- copy-to.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/copy-to.js b/copy-to.js index a45ee6e..4063abb 100644 --- a/copy-to.js +++ b/copy-to.js @@ -42,6 +42,10 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { var messageCode; var needPush = false; + var buffer = Buffer.alloc(chunk.length); + var buffer_offset = 0; + var buffer_sent = false; + while((chunk.length - offset) >= (Byte1Len + Int32Len)) { var messageCode = chunk[offset] @@ -70,6 +74,11 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { case code.ErrorResponse: case code.CopyDone: + if(needPush && !buffer_sent && buffer_offset > 0) { + this.push(buffer.slice(0, buffer_offset)) + buffer_sent = true; + buffer_offset = 0; + } this._detach() this.push(null) return cb(); @@ -84,7 +93,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { if (needPush) { var row = chunk.slice(offset, offset + length - Int32Len) this.rowCount++ - this.push(row) + row.copy(buffer, buffer_offset); + buffer_offset += row.length; } offset += (length - Int32Len) } else { @@ -93,6 +103,12 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { } } + if(needPush && !buffer_sent && buffer_offset > 0) { + this.push(buffer.slice(0, buffer_offset)) + buffer_sent = true; + buffer_offset = 0; + } + if(chunk.length - offset) { var slice = chunk.slice(offset) this._remainder = slice From 922627daaf38937d4ffa85bf5f817f957b2f2fdc Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 11 Jun 2018 12:14:28 +0200 Subject: [PATCH 2/4] Small refactor --- copy-to.js | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/copy-to.js b/copy-to.js index 4063abb..2334459 100644 --- a/copy-to.js +++ b/copy-to.js @@ -46,6 +46,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { var buffer_offset = 0; var buffer_sent = false; + this.pushBufferIfneeded = function() { + if(needPush && !buffer_sent && buffer_offset > 0) { + this.push(buffer.slice(0, buffer_offset)) + buffer_sent = true; + buffer_offset = 0; + } + } + while((chunk.length - offset) >= (Byte1Len + Int32Len)) { var messageCode = chunk[offset] @@ -74,11 +82,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { case code.ErrorResponse: case code.CopyDone: - if(needPush && !buffer_sent && buffer_offset > 0) { - this.push(buffer.slice(0, buffer_offset)) - buffer_sent = true; - buffer_offset = 0; - } + this.pushBufferIfneeded(); this._detach() this.push(null) return cb(); @@ -103,11 +107,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { } } - if(needPush && !buffer_sent && buffer_offset > 0) { - this.push(buffer.slice(0, buffer_offset)) - buffer_sent = true; - buffer_offset = 0; - } + this.pushBufferIfneeded(); if(chunk.length - offset) { var slice = chunk.slice(offset) From fd3cc955736fe01c061f1bbabb10c09b71466a91 Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 11 Jun 2018 12:17:39 +0200 Subject: [PATCH 3/4] Remove unused var buffer_sent --- copy-to.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/copy-to.js b/copy-to.js index 2334459..ad66b45 100644 --- a/copy-to.js +++ b/copy-to.js @@ -44,12 +44,10 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) { var buffer = Buffer.alloc(chunk.length); var buffer_offset = 0; - var buffer_sent = false; this.pushBufferIfneeded = function() { - if(needPush && !buffer_sent && buffer_offset > 0) { + if (needPush && buffer_offset > 0) { this.push(buffer.slice(0, buffer_offset)) - buffer_sent = true; buffer_offset = 0; } } From 7930d1b8dd29b37d932403c89ebe1bad50595d2d Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Mon, 11 Jun 2018 13:27:44 +0200 Subject: [PATCH 4/4] Add entry to changelog --- NEWS.carto.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/NEWS.carto.md b/NEWS.carto.md index 095fb9f..d5fae03 100644 --- a/NEWS.carto.md +++ b/NEWS.carto.md @@ -1,5 +1,11 @@ # CARTO's Changelog +## v1.2.0-carto.1 +Released 2018-mm-dd + +Bug fixes: + * Improves performance of COPY TO by sending bigger chunks through low level `push()`. See https://github.com/CartoDB/node-pg-copy-streams/pull/1 + ## v1.2.0 Released 2016-08-22