16 Commits

Author SHA1 Message Date
Brian M. Carlson
8f2355e454 0.3.0 2014-09-15 20:56:39 -04:00
Brian M. Carlson
d6eab36b66 Make tests a bit more robusto 2014-09-15 20:56:34 -04:00
Brian M. Carlson
33f6ecc11b Add workflow boilerplate files 2014-09-15 20:49:09 -04:00
Brian C
36572a8b7b Merge pull request #16 from drob/fix-docs
Fixes pipe from a file to table example in README.md.
2014-09-15 20:48:23 -04:00
Brian C
0c5d08edae Merge pull request #19 from drob/transform-opts
Accept stream options in constructors, pass to internal transform streams.
2014-09-15 20:47:38 -04:00
Dan
b78a3eb845 Accept stream options in constructors, pass to internal transform streams.
Includes tests.
2014-09-15 15:01:39 -04:00
Dan
107f007249 Fixes pipe from a file to table example in README.md. 2014-08-10 13:33:33 -07:00
Brian C
25b8d6da5f Update README.md
Providing clarity for #6
2014-05-01 11:07:33 -05:00
Dan Robinson
1c0c8871c1 Bump version 2014-04-07 11:56:28 -07:00
Dan Robinson
beb54334e2 Merge pull request #12 from drob/error-handling
Adds handling for errors after initial response.
2014-04-07 11:55:36 -07:00
Dan Robinson
1db9b3ec3d Adds handling for errors after initial response.
Includes a test.
2014-04-06 22:04:20 -07:00
Brian M. Carlson
1822f399d2 Bump version 2014-04-06 11:09:28 -05:00
Brian C
208861d057 Merge pull request #11 from drob/add-test
Adds a smoke test for copying with BINARY.
2014-04-06 11:06:16 -05:00
Brian C
1957554301 Merge pull request #10 from drob/handler-leak
Fixes another event handler leak.
2014-04-06 11:06:09 -05:00
Dan Robinson
bef3ba5fcd Adds a smoke test for copying with BINARY. 2014-04-06 04:50:54 -07:00
Dan Robinson
591a11c955 Fixes another event handler leak.
It turns out 'error' handlers were leaking as well, although more slowly.
2014-04-06 03:49:25 -07:00
10 changed files with 162 additions and 29 deletions

6
.travis.yml Normal file
View File

@@ -0,0 +1,6 @@
language: node_js
node_js:
- "0.10"
- "0.11"
env:
- PGUSER=postgres

14
Makefile Normal file
View File

@@ -0,0 +1,14 @@
.PHONY: publish-patch test
test:
npm test
patch: test
npm version patch -m "Bump version"
git push origin master --tags
npm publish
minor: test
npm version minor -m "Bump version"
git push origin master --tags
npm publish

View File

@@ -39,9 +39,8 @@ var copyFrom = require('pg-copy-streams').from;
pg.connect(function(err, client, done) {
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
var fileStream = fs.createReadStream('some_file.tdv')
fileStream.pipe(stream);
fileStream.on('end', done);
fileStream.on('error', done);
fileStream.pipe(stream).on('finish', done).on('error', done);
});
```
@@ -53,9 +52,11 @@ $ npm install pg-copy-streams
## notice
This module __only__ works with the pure JavaScript bindings. If you're using `require('pg').native` please make sure to use normal `require('pg')` or `require('pg.js')` when you're using copy streams.
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
## contributing
## contributing
Instead of adding a bunch more code to the already bloated [node-postgres](https://github.com/brianc/node-postgres) I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules.
This is one of those.

View File

@@ -1,12 +1,12 @@
module.exports = function(txt) {
return new CopyStreamQuery(txt)
module.exports = function(txt, options) {
return new CopyStreamQuery(txt, options)
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
this.text = text
this._listeners = {}
this._copyOutResponse = null
@@ -15,7 +15,7 @@ var CopyStreamQuery = function(text) {
util.inherits(CopyStreamQuery, Transform)
var eventTypes = ['close', 'data', 'end']
var eventTypes = ['close', 'data', 'end', 'error']
CopyStreamQuery.prototype.submit = function(connection) {
connection.query(this.text)
@@ -68,10 +68,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
}
while((chunk.length - offset) > 5) {
var messageCode = chunk[offset]
//complete
if(messageCode == code.c) {
//complete or error
if(messageCode == code.c || messageCode == code.E) {
this._detach()
this.connection.stream.unshift(chunk.slice(offset + 5))
if (messageCode == code.c) {
this.connection.stream.unshift(chunk.slice(offset + 5))
} else {
this.connection.stream.unshift(chunk.slice(offset))
}
this.push(null)
return cb();
}

View File

@@ -1,19 +1,19 @@
var CopyToQueryStream = require('./copy-to')
module.exports = {
to: function(txt) {
return new CopyToQueryStream(txt)
to: function(txt, options) {
return new CopyToQueryStream(txt, options)
},
from: function (txt) {
return new CopyStreamQuery(txt)
from: function (txt, options) {
return new CopyStreamQuery(txt, options)
}
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
this.text = text
this._listeners = null
this._copyOutResponse = null

View File

@@ -1,6 +1,6 @@
{
"name": "pg-copy-streams",
"version": "0.2.2",
"version": "0.3.0",
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
"main": "index.js",
"scripts": {

67
test/binary.js Normal file
View File

@@ -0,0 +1,67 @@
var assert = require('assert')
var gonna = require('gonna')
var async = require('async')
var concat = require('concat-stream')
var _ = require('lodash')
var pg = require('pg.js')
var from = require('../').from
var to = require('../').to
var testBinaryCopy = function() {
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var fromClient = client()
var toClient = client()
queries = [
'DROP TABLE IF EXISTS data',
'CREATE TABLE IF NOT EXISTS data (num BIGINT, word TEXT)',
'INSERT INTO data (num, word) VALUES (1, \'hello\'), (2, \'other thing\'), (3, \'goodbye\')',
'DROP TABLE IF EXISTS data_copy',
'CREATE TABLE IF NOT EXISTS data_copy (LIKE data INCLUDING ALL)'
]
async.eachSeries(queries, _.bind(fromClient.query, fromClient), function(err) {
assert.ifError(err)
var fromStream = fromClient.query(to('COPY (SELECT * FROM data) TO STDOUT BINARY'))
var toStream = toClient.query(from('COPY data_copy FROM STDIN BINARY'))
runStream = function(callback) {
fromStream.on('error', callback)
toStream.on('error', callback)
toStream.on('finish', callback)
fromStream.pipe(toStream)
}
runStream(function(err) {
assert.ifError(err)
toClient.query('SELECT * FROM data_copy ORDER BY num', function(err, res){
assert.equal(res.rowCount, 3, 'expected 3 rows but got ' + res.rowCount)
assert.equal(res.rows[0].num, 1)
assert.equal(res.rows[0].word, 'hello')
assert.equal(res.rows[1].num, 2)
assert.equal(res.rows[1].word, 'other thing')
assert.equal(res.rows[2].num, 3)
assert.equal(res.rows[2].word, 'goodbye')
queries = [
'DROP TABLE data',
'DROP TABLE data_copy'
]
async.each(queries, _.bind(fromClient.query, fromClient), function(err) {
assert.ifError(err)
fromClient.end()
toClient.end()
})
})
})
})
}
testBinaryCopy()

View File

@@ -5,20 +5,30 @@ var concat = require('concat-stream')
var _ = require('lodash')
var pg = require('pg.js')
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
return client
var copy = require('../').from
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var testConstruction = function() {
var highWaterMark = 10
var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true})
for(var i = 0; i < highWaterMark * 1.5; i++) {
stream.write('1\t2\n')
}
assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.')
}
testConstruction()
var testRange = function(top) {
var fromClient = client()
var copy = require('../').from
fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
var txt = 'COPY numbers FROM STDIN'
var stream = fromClient.query(copy(txt))
var rowEmitCount = 0
stream.on('row', function() {
@@ -33,7 +43,7 @@ var testRange = function(top) {
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
assert.ifError(err)
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
console.log('found ', res.rows.length, 'rows')
//console.log('found ', res.rows.length, 'rows')
countDone()
var firstRowDone = gonna('have correct result')
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' ')

View File

@@ -14,6 +14,14 @@ var client = function() {
return client
}
var testConstruction = function() {
var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT'
var stream = copy(txt, {highWaterMark: 10})
assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.')
}
testConstruction()
var testRange = function(top) {
var fromClient = client()
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
@@ -49,11 +57,33 @@ var testLeak = function(rounds) {
async.timesSeries(rounds, runStream, function(err) {
assert.equal(err, null)
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
assert.equal(fromClient.connection.stream.listeners('error').length, 1)
fromClient.end()
})
}
testLeak(5)
var testInternalPostgresError = function() {
var fromClient = client()
// This attempts to make an array that's too large, and should fail.
var txt = "COPY (SELECT asdlfsdf AS e) t) TO STDOUT"
var runStream = function(callback) {
var stream = fromClient.query(copy(txt))
stream.on('data', function(data) {
// Just throw away the data.
})
stream.on('error', callback)
}
runStream(function(err) {
assert.notEqual(err, null)
fromClient.end()
})
}
testInternalPostgresError()

View File

@@ -1,2 +1,3 @@
require('./copy-from')
require('./copy-to')
require('./binary')