Compare commits
53 Commits
v0.2.3
...
v1.2.0-car
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f1d5cb4a5 | ||
|
|
99d397956c | ||
|
|
c85f7d27b8 | ||
|
|
8b355b6e72 | ||
|
|
d7e5c1383f | ||
|
|
cb2227d159 | ||
|
|
7930d1b8dd | ||
|
|
e94fefe902 | ||
|
|
9293926047 | ||
|
|
fd3cc95573 | ||
|
|
922627daaf | ||
|
|
61bc713e0c | ||
|
|
e15feb199a | ||
|
|
191a4ec16a | ||
|
|
399bff7ed7 | ||
|
|
8174e10fb5 | ||
|
|
f29aef3bba | ||
|
|
b2e108571e | ||
|
|
7003f6070f | ||
|
|
ade7ab95a0 | ||
|
|
9ccda04036 | ||
|
|
a5e532f20b | ||
|
|
2a4db2920e | ||
|
|
f155899570 | ||
|
|
18be125596 | ||
|
|
ae5b344395 | ||
|
|
ee84aba89f | ||
|
|
e0aa7db324 | ||
|
|
0f0ddf7ad4 | ||
|
|
ed57e131e9 | ||
|
|
c4a0e6dd58 | ||
|
|
d5b5c8c569 | ||
|
|
6981ea6ac5 | ||
|
|
d2b9677a02 | ||
|
|
bfee86543f | ||
|
|
21f47220fc | ||
|
|
b1b613125f | ||
|
|
4222053744 | ||
|
|
040ed5f4da | ||
|
|
f9ee1c083a | ||
|
|
dcfffd0670 | ||
|
|
12e4ca33b0 | ||
|
|
8f2355e454 | ||
|
|
d6eab36b66 | ||
|
|
33f6ecc11b | ||
|
|
36572a8b7b | ||
|
|
0c5d08edae | ||
|
|
b78a3eb845 | ||
|
|
107f007249 | ||
|
|
25b8d6da5f | ||
|
|
1c0c8871c1 | ||
|
|
beb54334e2 | ||
|
|
1db9b3ec3d |
17
.travis.yml
Normal file
17
.travis.yml
Normal file
@@ -0,0 +1,17 @@
|
||||
language: node_js
|
||||
node_js:
|
||||
- "0.12"
|
||||
- "4"
|
||||
- "5"
|
||||
- "6"
|
||||
|
||||
addons:
|
||||
postgresql: "9.2"
|
||||
|
||||
services:
|
||||
- postgresql
|
||||
|
||||
before_install:
|
||||
- npm install npm --global
|
||||
env:
|
||||
- PGUSER=postgres PGDATABASE=postgres
|
||||
14
Makefile
Normal file
14
Makefile
Normal file
@@ -0,0 +1,14 @@
|
||||
.PHONY: publish-patch test
|
||||
|
||||
test:
|
||||
npm test
|
||||
|
||||
patch: test
|
||||
npm version patch -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
|
||||
minor: test
|
||||
npm version minor -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
18
NEWS.carto.md
Normal file
18
NEWS.carto.md
Normal file
@@ -0,0 +1,18 @@
|
||||
# CARTO's Changelog
|
||||
|
||||
## v1.2.0-carto.2
|
||||
Released 2018-10-26
|
||||
|
||||
Bug fixes:
|
||||
* Make all modules to use strict mode semantics.
|
||||
|
||||
## v1.2.0-carto.1
|
||||
Released 2018-06-11
|
||||
|
||||
Bug fixes:
|
||||
* Improves performance of COPY TO by sending bigger chunks through low level `push()`. See https://github.com/CartoDB/node-pg-copy-streams/pull/1
|
||||
|
||||
## v1.2.0
|
||||
Released 2016-08-22
|
||||
|
||||
Vanilla version v1.2.0 from upstream repository. See https://github.com/CartoDB/node-pg-copy-streams/releases/tag/v1.2.0
|
||||
21
README.md
21
README.md
@@ -1,5 +1,7 @@
|
||||
## pg-copy-streams
|
||||
|
||||
[](https://travis-ci.org/brianc/node-pg-copy-streams)
|
||||
|
||||
COPY FROM / COPY TO for node-postgres. Stream from one database to another, and stuff.
|
||||
|
||||
## how? what? huh?
|
||||
@@ -38,13 +40,17 @@ var copyFrom = require('pg-copy-streams').from;
|
||||
|
||||
pg.connect(function(err, client, done) {
|
||||
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
|
||||
var fileStream = fs.createReadStream('some_file.tdv')
|
||||
fileStream.pipe(stream);
|
||||
fileStream.on('end', done);
|
||||
var fileStream = fs.createReadStream('some_file.tsv')
|
||||
fileStream.on('error', done);
|
||||
stream.on('error', done);
|
||||
stream.on('end', done);
|
||||
fileStream.pipe(stream);
|
||||
});
|
||||
```
|
||||
|
||||
*Important*: Even if `pg-copy-streams.from` is used as a Writable (via `pipe`), you should not listen for the 'finish' event and expect that the COPY command has already been correctly acknowledged by the database. Internally, a duplex stream is used to pipe the data into the database connection and the COPY command should be considered complete only when the 'end' event is triggered.
|
||||
|
||||
|
||||
## install
|
||||
|
||||
```sh
|
||||
@@ -53,9 +59,14 @@ $ npm install pg-copy-streams
|
||||
|
||||
## notice
|
||||
|
||||
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
|
||||
This module __only__ works with the pure JavaScript bindings. If you're using `require('pg').native` please make sure to use normal `require('pg')` or `require('pg.js')` when you're using copy streams.
|
||||
|
||||
## contributing
|
||||
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/current/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
|
||||
|
||||
Take note of the following warning in the PostgreSQL documentation:
|
||||
> COPY stops operation at the first error. This should not lead to problems in the event of a COPY TO, but the target table will already have received earlier rows in a COPY FROM. These rows will not be visible or accessible, but they still occupy disk space. This might amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke VACUUM to recover the wasted space.
|
||||
|
||||
## contributing
|
||||
|
||||
Instead of adding a bunch more code to the already bloated [node-postgres](https://github.com/brianc/node-postgres) I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules.
|
||||
This is one of those.
|
||||
|
||||
137
copy-to.js
137
copy-to.js
@@ -1,15 +1,17 @@
|
||||
module.exports = function(txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
'use strict';
|
||||
|
||||
module.exports = function(txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
var code = require('./message-formats')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = {}
|
||||
this._copyOutResponse = null
|
||||
this._gotCopyOutResponse = false
|
||||
this.rowCount = 0
|
||||
}
|
||||
|
||||
@@ -20,77 +22,93 @@ var eventTypes = ['close', 'data', 'end', 'error']
|
||||
CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
this.connection = connection
|
||||
var self = this
|
||||
eventTypes.forEach(function(type) {
|
||||
self._listeners[type] = connection.stream.listeners(type)
|
||||
connection.stream.removeAllListeners(type)
|
||||
})
|
||||
this.connection.removeAllListeners('copyData')
|
||||
connection.stream.pipe(this)
|
||||
}
|
||||
|
||||
var code = {
|
||||
E: 69, //Error
|
||||
H: 72, //CopyOutResponse
|
||||
d: 0x64, //CopyData
|
||||
c: 0x63 //CopyDone
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._detach = function() {
|
||||
this.connection.stream.unpipe()
|
||||
var self = this
|
||||
eventTypes.forEach(function(type) {
|
||||
self.connection.stream.removeAllListeners(type)
|
||||
self._listeners[type].forEach(function(listener) {
|
||||
self.connection.stream.on(type, listener)
|
||||
})
|
||||
})
|
||||
this.connection.stream.unpipe(this)
|
||||
// Unpipe can drop us out of flowing mode
|
||||
this.connection.stream.resume()
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
var offset = 0
|
||||
var Byte1Len = 1;
|
||||
var Int32Len = 4;
|
||||
if(this._remainder && chunk) {
|
||||
chunk = Buffer.concat([this._remainder, chunk])
|
||||
}
|
||||
if(!this._copyOutResponse) {
|
||||
this._copyOutResponse = true
|
||||
if(chunk[0] == code.E) {
|
||||
this._detach()
|
||||
this.connection.stream.unshift(chunk)
|
||||
this.push(null)
|
||||
return cb();
|
||||
|
||||
var length;
|
||||
var messageCode;
|
||||
var needPush = false;
|
||||
|
||||
var buffer = Buffer.alloc(chunk.length);
|
||||
var buffer_offset = 0;
|
||||
|
||||
this.pushBufferIfneeded = function() {
|
||||
if (needPush && buffer_offset > 0) {
|
||||
this.push(buffer.slice(0, buffer_offset))
|
||||
buffer_offset = 0;
|
||||
}
|
||||
if(chunk[0] != code.H) {
|
||||
this.emit('error', new Error('Expected copy out response'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(1)
|
||||
offset = 1
|
||||
offset += length
|
||||
}
|
||||
while((chunk.length - offset) > 5) {
|
||||
|
||||
while((chunk.length - offset) >= (Byte1Len + Int32Len)) {
|
||||
var messageCode = chunk[offset]
|
||||
//complete
|
||||
if(messageCode == code.c) {
|
||||
this._detach()
|
||||
this.connection.stream.unshift(chunk.slice(offset + 5))
|
||||
this.push(null)
|
||||
return cb();
|
||||
|
||||
//console.log('PostgreSQL message ' + String.fromCharCode(messageCode))
|
||||
switch(messageCode) {
|
||||
|
||||
// detect COPY start
|
||||
case code.CopyOutResponse:
|
||||
if (!this._gotCopyOutResponse) {
|
||||
this._gotCopyOutResponse = true
|
||||
} else {
|
||||
this.emit('error', new Error('Unexpected CopyOutResponse message (H)'))
|
||||
}
|
||||
break;
|
||||
|
||||
// meaningful row
|
||||
case code.CopyData:
|
||||
needPush = true;
|
||||
break;
|
||||
|
||||
// standard interspersed messages. discard
|
||||
case code.ParameterStatus:
|
||||
case code.NoticeResponse:
|
||||
case code.NotificationResponse:
|
||||
break;
|
||||
|
||||
case code.ErrorResponse:
|
||||
case code.CopyDone:
|
||||
this.pushBufferIfneeded();
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
break;
|
||||
default:
|
||||
this.emit('error', new Error('Unexpected PostgreSQL message ' + String.fromCharCode(messageCode)))
|
||||
}
|
||||
//something bad happened
|
||||
if(messageCode != code.d) {
|
||||
return this.emit('error', new Error('expected "d" (copydata message)'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
|
||||
//can we read the next row?
|
||||
if(chunk.length > (offset + length + 5)) {
|
||||
offset += 5
|
||||
var slice = chunk.slice(offset, offset + length)
|
||||
offset += length
|
||||
this.push(slice)
|
||||
this.rowCount++
|
||||
|
||||
length = chunk.readUInt32BE(offset+Byte1Len)
|
||||
if(chunk.length >= (offset + Byte1Len + length)) {
|
||||
offset += Byte1Len + Int32Len
|
||||
if (needPush) {
|
||||
var row = chunk.slice(offset, offset + length - Int32Len)
|
||||
this.rowCount++
|
||||
row.copy(buffer, buffer_offset);
|
||||
buffer_offset += row.length;
|
||||
}
|
||||
offset += (length - Int32Len)
|
||||
} else {
|
||||
// we need more chunks for a complete message
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
this.pushBufferIfneeded();
|
||||
|
||||
if(chunk.length - offset) {
|
||||
var slice = chunk.slice(offset)
|
||||
this._remainder = slice
|
||||
@@ -104,6 +122,9 @@ CopyStreamQuery.prototype.handleError = function(e) {
|
||||
this.emit('error', e)
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleCopyData = function(chunk) {
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleCommandComplete = function() {
|
||||
}
|
||||
|
||||
|
||||
55
index.js
55
index.js
@@ -1,19 +1,21 @@
|
||||
var CopyToQueryStream = require('./copy-to')
|
||||
'use strict';
|
||||
|
||||
var CopyToQueryStream = require('./copy-to')
|
||||
module.exports = {
|
||||
to: function(txt) {
|
||||
return new CopyToQueryStream(txt)
|
||||
to: function(txt, options) {
|
||||
return new CopyToQueryStream(txt, options)
|
||||
},
|
||||
from: function (txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
from: function (txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
var code = require('./message-formats')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._copyOutResponse = null
|
||||
@@ -27,28 +29,23 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
}
|
||||
|
||||
var code = {
|
||||
H: 72, //CopyOutResponse
|
||||
d: 0x64, //CopyData
|
||||
c: 0x63 //CopyDone
|
||||
}
|
||||
|
||||
var copyDataBuffer = Buffer([code.d])
|
||||
var copyDataBuffer = Buffer([code.CopyData])
|
||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
var Int32Len = 4;
|
||||
this.push(copyDataBuffer)
|
||||
var lenBuffer = Buffer(4)
|
||||
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
|
||||
var lenBuffer = Buffer(Int32Len)
|
||||
lenBuffer.writeUInt32BE(chunk.length + Int32Len, 0)
|
||||
this.push(lenBuffer)
|
||||
this.push(chunk)
|
||||
this.rowCount++
|
||||
cb()
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._flush = function(cb) {
|
||||
var finBuffer = Buffer([code.c, 0, 0, 0, 4])
|
||||
var Int32Len = 4;
|
||||
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, Int32Len])
|
||||
this.push(finBuffer)
|
||||
//never call this callback, do not close underlying stream
|
||||
//cb()
|
||||
this.cb_flush = cb
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleError = function(e) {
|
||||
@@ -56,12 +53,24 @@ CopyStreamQuery.prototype.handleError = function(e) {
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
|
||||
this.pipe(connection.stream)
|
||||
this.pipe(connection.stream, { end: false })
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleCommandComplete = function() {
|
||||
this.unpipe()
|
||||
this.emit('end')
|
||||
CopyStreamQuery.prototype.handleCommandComplete = function(msg) {
|
||||
// Parse affected row count as in
|
||||
// https://github.com/brianc/node-postgres/blob/35e5567f86774f808c2a8518dd312b8aa3586693/lib/result.js#L37
|
||||
var match = /COPY (\d+)/.exec((msg || {}).text)
|
||||
if (match) {
|
||||
this.rowCount = parseInt(match[1], 10)
|
||||
}
|
||||
|
||||
// we delay the _flush cb so that the 'end' event is
|
||||
// triggered after CommandComplete
|
||||
this.cb_flush()
|
||||
|
||||
// unpipe from connection
|
||||
this.unpipe(this.connection)
|
||||
this.connection = null
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleReadyForQuery = function() {
|
||||
|
||||
25
message-formats.js
Normal file
25
message-formats.js
Normal file
@@ -0,0 +1,25 @@
|
||||
/**
|
||||
* The COPY feature uses the following protocol codes.
|
||||
* The codes for the most recent protocol version are documented on
|
||||
* https://www.postgresql.org/docs/current/static/protocol-message-formats.html
|
||||
*
|
||||
* The protocol flow itself is described on
|
||||
* https://www.postgresql.org/docs/current/static/protocol-flow.html
|
||||
*/
|
||||
module.exports = {
|
||||
ErrorResponse: 0x45, // E
|
||||
CopyInResponse: 0x47, // G
|
||||
CopyOutResponse: 0x48, // H
|
||||
CopyBothResponse: 0x57, // W
|
||||
CopyDone: 0x63, // c
|
||||
CopyData: 0x64, // d
|
||||
CopyFail: 0x66, // f
|
||||
|
||||
// It is possible for NoticeResponse and ParameterStatus messages to be interspersed between CopyData messages;
|
||||
// frontends must handle these cases, and should be prepared for other asynchronous message types as well
|
||||
// (see Section 50.2.6).
|
||||
// Otherwise, any message type other than CopyData or CopyDone may be treated as terminating copy-out mode.
|
||||
NotificationResponse: 0x41, // A
|
||||
NoticeResponse: 0x4E, // N
|
||||
ParameterStatus: 0x53 // S
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "pg-copy-streams",
|
||||
"version": "0.2.3",
|
||||
"version": "1.2.0-carto.2",
|
||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
@@ -23,7 +23,7 @@
|
||||
"url": "https://github.com/brianc/node-pg-copy-streams/issues"
|
||||
},
|
||||
"devDependencies": {
|
||||
"pg.js": "~2.8.1",
|
||||
"pg": "~4.4.3",
|
||||
"concat-stream": "~1.1.0",
|
||||
"gonna": "0.0.0",
|
||||
"lodash": "~2.2.1",
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
'use strict';
|
||||
|
||||
var assert = require('assert')
|
||||
var gonna = require('gonna')
|
||||
|
||||
var async = require('async')
|
||||
var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
|
||||
var from = require('../').from
|
||||
var to = require('../').to
|
||||
@@ -19,10 +21,12 @@ var testBinaryCopy = function() {
|
||||
var fromClient = client()
|
||||
var toClient = client()
|
||||
|
||||
queries = [
|
||||
'CREATE TABLE data (num BIGINT, word TEXT)',
|
||||
var queries = [
|
||||
'DROP TABLE IF EXISTS data',
|
||||
'CREATE TABLE IF NOT EXISTS data (num BIGINT, word TEXT)',
|
||||
'INSERT INTO data (num, word) VALUES (1, \'hello\'), (2, \'other thing\'), (3, \'goodbye\')',
|
||||
'CREATE TABLE data_copy (LIKE data INCLUDING ALL)'
|
||||
'DROP TABLE IF EXISTS data_copy',
|
||||
'CREATE TABLE IF NOT EXISTS data_copy (LIKE data INCLUDING ALL)'
|
||||
]
|
||||
|
||||
async.eachSeries(queries, _.bind(fromClient.query, fromClient), function(err) {
|
||||
@@ -31,7 +35,7 @@ var testBinaryCopy = function() {
|
||||
var fromStream = fromClient.query(to('COPY (SELECT * FROM data) TO STDOUT BINARY'))
|
||||
var toStream = toClient.query(from('COPY data_copy FROM STDIN BINARY'))
|
||||
|
||||
runStream = function(callback) {
|
||||
var runStream = function(callback) {
|
||||
fromStream.on('error', callback)
|
||||
toStream.on('error', callback)
|
||||
toStream.on('finish', callback)
|
||||
|
||||
@@ -1,29 +1,37 @@
|
||||
'use strict';
|
||||
|
||||
var assert = require('assert')
|
||||
var gonna = require('gonna')
|
||||
|
||||
var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
|
||||
var copy = require('../').from
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var highWaterMark = 10
|
||||
var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true})
|
||||
for(var i = 0; i < highWaterMark * 1.5; i++) {
|
||||
stream.write('1\t2\n')
|
||||
}
|
||||
assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testRange = function(top) {
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
return client
|
||||
}
|
||||
|
||||
var fromClient = client()
|
||||
var copy = require('../').from
|
||||
|
||||
fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
|
||||
var txt = 'COPY numbers FROM STDIN'
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var rowEmitCount = 0
|
||||
stream.on('row', function() {
|
||||
rowEmitCount++
|
||||
})
|
||||
for(var i = 0; i < top; i++) {
|
||||
stream.write(Buffer('' + i + '\t' + i*10 + '\n'))
|
||||
}
|
||||
@@ -33,7 +41,8 @@ var testRange = function(top) {
|
||||
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
|
||||
console.log('found ', res.rows.length, 'rows')
|
||||
assert.equal(stream.rowCount, top, 'expected ' + top + ' rows but db count is ' + stream.rowCount)
|
||||
//console.log('found ', res.rows.length, 'rows')
|
||||
countDone()
|
||||
var firstRowDone = gonna('have correct result')
|
||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' ')
|
||||
@@ -48,3 +57,19 @@ var testRange = function(top) {
|
||||
}
|
||||
|
||||
testRange(1000)
|
||||
|
||||
var testSingleEnd = function() {
|
||||
var fromClient = client()
|
||||
fromClient.query('CREATE TEMP TABLE numbers(num int)')
|
||||
var txt = 'COPY numbers FROM STDIN';
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var count = 0;
|
||||
stream.on('end', function() {
|
||||
count++;
|
||||
assert(count==1, '`end` Event was triggered ' + count + ' times');
|
||||
if (count == 1) fromClient.end();
|
||||
})
|
||||
stream.end(Buffer('1\n'))
|
||||
|
||||
}
|
||||
testSingleEnd()
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
'use strict';
|
||||
|
||||
var assert = require('assert')
|
||||
var gonna = require('gonna')
|
||||
|
||||
var _ = require('lodash')
|
||||
var async = require('async')
|
||||
var concat = require('concat-stream')
|
||||
var pg = require('pg.js')
|
||||
var pg = require('pg')
|
||||
|
||||
var copy = require('../').to
|
||||
var code = require('../message-formats')
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
@@ -14,9 +17,31 @@ var client = function() {
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT'
|
||||
var stream = copy(txt, {highWaterMark: 10})
|
||||
assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testComparators = function() {
|
||||
var copy1 = copy();
|
||||
copy1.pipe(concat(function(buf) {
|
||||
assert(copy1._gotCopyOutResponse, 'should have received CopyOutResponse')
|
||||
assert(!copy1._remainder, 'Message with no additional data (len=Int4Len+0) should not leave a remainder')
|
||||
}))
|
||||
copy1.end(new Buffer([code.CopyOutResponse, 0x00, 0x00, 0x00, 0x04]));
|
||||
|
||||
|
||||
}
|
||||
testComparators();
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
var res;
|
||||
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var done = gonna('finish piping out', 1000, function() {
|
||||
@@ -24,37 +49,74 @@ var testRange = function(top) {
|
||||
})
|
||||
|
||||
stream.pipe(concat(function(buf) {
|
||||
var res = buf.toString('utf8')
|
||||
res = buf.toString('utf8')
|
||||
}))
|
||||
|
||||
stream.on('end', function() {
|
||||
var expected = _.range(0, top).join('\n') + '\n'
|
||||
assert.equal(res, expected)
|
||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' but got ' + stream.rowCount)
|
||||
done()
|
||||
}))
|
||||
});
|
||||
}
|
||||
|
||||
testRange(10000)
|
||||
|
||||
var testLeak = function(rounds) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT 10) TO STDOUT'
|
||||
var testInternalPostgresError = function() {
|
||||
var cancelClient = client()
|
||||
var queryClient = client()
|
||||
|
||||
var runStream = function(num, callback) {
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var runStream = function(callback) {
|
||||
var txt = "COPY (SELECT pg_sleep(10)) TO STDOUT"
|
||||
var stream = queryClient.query(copy(txt))
|
||||
stream.on('data', function(data) {
|
||||
// Just throw away the data.
|
||||
})
|
||||
stream.on('end', callback)
|
||||
stream.on('error', callback)
|
||||
|
||||
setTimeout(function() {
|
||||
var cancelQuery = "SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE query ~ 'pg_sleep' AND NOT query ~ 'pg_cancel_backend'"
|
||||
cancelClient.query(cancelQuery)
|
||||
}, 50)
|
||||
}
|
||||
|
||||
async.timesSeries(rounds, runStream, function(err) {
|
||||
assert.equal(err, null)
|
||||
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
|
||||
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
|
||||
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
|
||||
assert.equal(fromClient.connection.stream.listeners('error').length, 1)
|
||||
fromClient.end()
|
||||
runStream(function(err) {
|
||||
assert.notEqual(err, null)
|
||||
var expectedMessage = 'canceling statement due to user request'
|
||||
assert.notEqual(err.toString().indexOf(expectedMessage), -1, 'Error message should mention reason for query failure.')
|
||||
cancelClient.end()
|
||||
queryClient.end()
|
||||
})
|
||||
}
|
||||
testInternalPostgresError()
|
||||
|
||||
var testNoticeResponse = function() {
|
||||
// we use a special trick to generate a warning
|
||||
// on the copy stream.
|
||||
var queryClient = client()
|
||||
var set = '';
|
||||
set += 'SET SESSION client_min_messages = WARNING;'
|
||||
set += 'SET SESSION standard_conforming_strings = off;'
|
||||
set += 'SET SESSION escape_string_warning = on;'
|
||||
queryClient.query(set, function(err, res) {
|
||||
assert.equal(err, null, 'testNoticeResponse - could not SET parameters')
|
||||
var runStream = function(callback) {
|
||||
var txt = "COPY (SELECT '\\\n') TO STDOUT"
|
||||
var stream = queryClient.query(copy(txt))
|
||||
stream.on('data', function(data) {
|
||||
})
|
||||
stream.on('error', callback)
|
||||
|
||||
// make sure stream is pulled from
|
||||
stream.pipe(concat(callback.bind(null,null)))
|
||||
}
|
||||
|
||||
runStream(function(err) {
|
||||
assert.equal(err, null, err)
|
||||
queryClient.end()
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
testLeak(5)
|
||||
testNoticeResponse();
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
'use strict';
|
||||
|
||||
require('./copy-from')
|
||||
require('./copy-to')
|
||||
require('./binary')
|
||||
|
||||
Reference in New Issue
Block a user