5 Commits

Author SHA1 Message Date
Rafa de la Torre
fd3cc95573 Remove unused var buffer_sent 2018-06-11 12:17:39 +02:00
Rafa de la Torre
922627daaf Small refactor 2018-06-11 12:14:28 +02:00
Rafa de la Torre
61bc713e0c Improve performance of COPY TO #56
Under some circumstances, the COPY TO streamming can be CPU-bound,
particularly when PG holds the resultset in memory buffers and the size
of the rows << chunk (64 KB in my linux box).

This commits improves the situation by creating a buffer of `chunk`
size and fitting in as many rows as it can before pushing them. This
results in more balanced read and writes (in terms of size and in bigger
chunks) as well as more frequent calls to the callback, thus freeing the
main loop for other events to be processed, and therefore avoiding
starvation.
2018-06-08 15:04:42 +02:00
jeromew
e15feb199a README.md: copy-from error and vacuum #26 2016-08-23 12:10:25 +02:00
jeromew
191a4ec16a Fix documentation of copy-from completion 2016-08-23 11:32:10 +02:00
2 changed files with 25 additions and 3 deletions

View File

@@ -42,10 +42,15 @@ pg.connect(function(err, client, done) {
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
var fileStream = fs.createReadStream('some_file.tsv')
fileStream.on('error', done);
fileStream.pipe(stream).on('finish', done).on('error', done);
stream.on('error', done);
stream.on('end', done);
fileStream.pipe(stream);
});
```
*Important*: Even if `pg-copy-streams.from` is used as a Writable (via `pipe`), you should not listen for the 'finish' event and expect that the COPY command has already been correctly acknowledged by the database. Internally, a duplex stream is used to pipe the data into the database connection and the COPY command should be considered complete only when the 'end' event is triggered.
## install
```sh
@@ -56,7 +61,10 @@ $ npm install pg-copy-streams
This module __only__ works with the pure JavaScript bindings. If you're using `require('pg').native` please make sure to use normal `require('pg')` or `require('pg.js')` when you're using copy streams.
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/current/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
Take note of the following warning in the PostgreSQL documentation:
> COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM to recover the wasted space.
## contributing

View File

@@ -42,6 +42,16 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
var messageCode;
var needPush = false;
var buffer = Buffer.alloc(chunk.length);
var buffer_offset = 0;
this.pushBufferIfneeded = function() {
if (needPush && buffer_offset > 0) {
this.push(buffer.slice(0, buffer_offset))
buffer_offset = 0;
}
}
while((chunk.length - offset) >= (Byte1Len + Int32Len)) {
var messageCode = chunk[offset]
@@ -70,6 +80,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
case code.ErrorResponse:
case code.CopyDone:
this.pushBufferIfneeded();
this._detach()
this.push(null)
return cb();
@@ -84,7 +95,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
if (needPush) {
var row = chunk.slice(offset, offset + length - Int32Len)
this.rowCount++
this.push(row)
row.copy(buffer, buffer_offset);
buffer_offset += row.length;
}
offset += (length - Int32Len)
} else {
@@ -93,6 +105,8 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
}
}
this.pushBufferIfneeded();
if(chunk.length - offset) {
var slice = chunk.slice(offset)
this._remainder = slice