11 Commits

Author            SHA1        Date                        Message
Brian M. Carlson  8f2355e454  2014-09-15 20:56:39 -04:00  0.3.0
Brian M. Carlson  d6eab36b66  2014-09-15 20:56:34 -04:00  Make tests a bit more robusto
Brian M. Carlson  33f6ecc11b  2014-09-15 20:49:09 -04:00  Add workflow boilerplate files
Brian C           36572a8b7b  2014-09-15 20:48:23 -04:00  Merge pull request #16 from drob/fix-docs (Fixes pipe from a file to table example in README.md.)
Brian C           0c5d08edae  2014-09-15 20:47:38 -04:00  Merge pull request #19 from drob/transform-opts (Accept stream options in constructors, pass to internal transform streams.)
Dan               b78a3eb845  2014-09-15 15:01:39 -04:00  Accept stream options in constructors, pass to internal transform streams. (Includes tests.)
Dan               107f007249  2014-08-10 13:33:33 -07:00  Fixes pipe from a file to table example in README.md.
Brian C           25b8d6da5f  2014-05-01 11:07:33 -05:00  Update README.md (Providing clarity for #6)
Dan Robinson      1c0c8871c1  2014-04-07 11:56:28 -07:00  Bump version
Dan Robinson      beb54334e2  2014-04-07 11:55:36 -07:00  Merge pull request #12 from drob/error-handling (Adds handling for errors after initial response.)
Dan Robinson      1db9b3ec3d  2014-04-06 22:04:20 -07:00  Adds handling for errors after initial response. (Includes a test.)
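
The functional change in this range is b78a3eb845 (merged via pull request #19): the `to` and `from` constructors now accept a stream options object and pass it to the internal Transform stream. A minimal sketch of what that looks like from the caller's side, assuming a local database and a placeholder table name:

```js
var pg = require('pg.js')
var copy = require('pg-copy-streams')

var client = new pg.Client()
client.connect()

// Standard stream options such as highWaterMark and objectMode are now
// forwarded to the underlying Transform stream; previously they could not be set.
var sql = 'COPY my_table TO STDOUT'
var stream = client.query(copy.to(sql, { highWaterMark: 10 }))
stream.pipe(process.stdout)
```

The tests added in the same commit exercise exactly this path: with a small highWaterMark, `write()` on the `from` stream returns false once the buffer is full, and the `to` stream's readable state reports the configured value.
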
9 changed files with 95 additions and 29 deletions

.travis.yml (new file, +6)

@@ -0,0 +1,6 @@
language: node_js
node_js:
- "0.10"
- "0.11"
env:
- PGUSER=postgres

Makefile (new file, +14)

@@ -0,0 +1,14 @@
.PHONY: publish-patch test
test:
npm test
patch: test
npm version patch -m "Bump version"
git push origin master --tags
npm publish
minor: test
npm version minor -m "Bump version"
git push origin master --tags
npm publish

README.md

@@ -39,9 +39,8 @@ var copyFrom = require('pg-copy-streams').from;
pg.connect(function(err, client, done) {
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
var fileStream = fs.createReadStream('some_file.tdv')
fileStream.pipe(stream);
fileStream.on('end', done);
fileStream.on('error', done);
fileStream.pipe(stream).on('finish', done).on('error', done);
});
```
@@ -53,9 +52,11 @@ $ npm install pg-copy-streams
## notice
This module __only__ works with the pure JavaScript bindings. If you're using `require('pg').native` please make sure to use normal `require('pg')` or `require('pg.js')` when you're using copy streams.
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
## contributing
Instead of adding a bunch more code to the already bloated [node-postgres](https://github.com/brianc/node-postgres) I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules.
This is one of those.
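
The first hunk of this README diff reflects the fix from pull request #16: when piping a file into a COPY FROM stream, completion is signaled by 'finish' on the writable copy stream, not by 'end' on the readable file stream, so the callbacks belong on the result of `pipe()`. A minimal standalone version of that pattern, with the file and table names as placeholders:

```js
var fs = require('fs')
var pg = require('pg.js')
var copyFrom = require('pg-copy-streams').from

pg.connect(function(err, client, done) {
  if (err) return done(err)
  var stream = client.query(copyFrom('COPY my_table FROM STDIN'))
  // 'finish' fires once the COPY stream has flushed all of its data;
  // 'end' would only signal that the file has been read, not that the copy completed.
  fs.createReadStream('some_file.tdv')
    .pipe(stream)
    .on('finish', done)
    .on('error', done)
})
```
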

copy-to.js

@@ -1,12 +1,12 @@
module.exports = function(txt) {
return new CopyStreamQuery(txt)
module.exports = function(txt, options) {
return new CopyStreamQuery(txt, options)
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
this.text = text
this._listeners = {}
this._copyOutResponse = null
@@ -68,10 +68,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
}
while((chunk.length - offset) > 5) {
var messageCode = chunk[offset]
//complete
if(messageCode == code.c) {
//complete or error
if(messageCode == code.c || messageCode == code.E) {
this._detach()
this.connection.stream.unshift(chunk.slice(offset + 5))
if (messageCode == code.c) {
this.connection.stream.unshift(chunk.slice(offset + 5))
} else {
this.connection.stream.unshift(chunk.slice(offset))
}
this.push(null)
return cb();
}
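
This is the change described by 1db9b3ec3d (merged via pull request #12): when an ErrorResponse (`code.E`) arrives after the CopyOutResponse, the stream detaches and unshifts the chunk starting at the error message, so the connection can parse it and the error reaches the caller rather than going unhandled. A minimal sketch of handling such an error, with the failing SQL left as a placeholder:

```js
var pg = require('pg.js')
var copyTo = require('pg-copy-streams').to

var client = new pg.Client()
client.connect()

// Placeholder for any COPY whose inner query errors after copy-out has begun.
var stream = client.query(copyTo('COPY (SELECT some_failing_expression) TO STDOUT'))

stream.on('data', function(chunk) {
  // throw away the data
})

stream.on('error', function(err) {
  // With this change the server error is emitted here as a normal stream error.
  console.error('COPY failed:', err.message)
  client.end()
})
```
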

index.js

@@ -1,19 +1,19 @@
var CopyToQueryStream = require('./copy-to')
module.exports = {
to: function(txt) {
return new CopyToQueryStream(txt)
to: function(txt, options) {
return new CopyToQueryStream(txt, options)
},
from: function (txt) {
return new CopyStreamQuery(txt)
from: function (txt, options) {
return new CopyStreamQuery(txt, options)
}
}
var Transform = require('stream').Transform
var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
this.text = text
this._listeners = null
this._copyOutResponse = null

package.json

@@ -1,6 +1,6 @@
{
"name": "pg-copy-streams",
"version": "0.2.3",
"version": "0.3.0",
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
"main": "index.js",
"scripts": {


@@ -20,9 +20,11 @@ var testBinaryCopy = function() {
var toClient = client()
queries = [
'CREATE TABLE data (num BIGINT, word TEXT)',
'DROP TABLE IF EXISTS data',
'CREATE TABLE IF NOT EXISTS data (num BIGINT, word TEXT)',
'INSERT INTO data (num, word) VALUES (1, \'hello\'), (2, \'other thing\'), (3, \'goodbye\')',
'CREATE TABLE data_copy (LIKE data INCLUDING ALL)'
'DROP TABLE IF EXISTS data_copy',
'CREATE TABLE IF NOT EXISTS data_copy (LIKE data INCLUDING ALL)'
]
async.eachSeries(queries, _.bind(fromClient.query, fromClient), function(err) {

test/copy-from.js

@@ -5,20 +5,30 @@ var concat = require('concat-stream')
var _ = require('lodash')
var pg = require('pg.js')
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
return client
var copy = require('../').from
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var testConstruction = function() {
var highWaterMark = 10
var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true})
for(var i = 0; i < highWaterMark * 1.5; i++) {
stream.write('1\t2\n')
}
assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.')
}
testConstruction()
var testRange = function(top) {
var fromClient = client()
var copy = require('../').from
fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
var txt = 'COPY numbers FROM STDIN'
var stream = fromClient.query(copy(txt))
var rowEmitCount = 0
stream.on('row', function() {
@@ -33,7 +43,7 @@ var testRange = function(top) {
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
assert.ifError(err)
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
console.log('found ', res.rows.length, 'rows')
//console.log('found ', res.rows.length, 'rows')
countDone()
var firstRowDone = gonna('have correct result')
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' ')

test/copy-to.js

@@ -14,6 +14,14 @@ var client = function() {
return client
}
var testConstruction = function() {
var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT'
var stream = copy(txt, {highWaterMark: 10})
assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.')
}
testConstruction()
var testRange = function(top) {
var fromClient = client()
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
@@ -58,3 +66,24 @@ var testLeak = function(rounds) {
}
testLeak(5)
var testInternalPostgresError = function() {
var fromClient = client()
// This attempts to make an array that's too large, and should fail.
var txt = "COPY (SELECT asdlfsdf AS e) t) TO STDOUT"
var runStream = function(callback) {
var stream = fromClient.query(copy(txt))
stream.on('data', function(data) {
// Just throw away the data.
})
stream.on('error', callback)
}
runStream(function(err) {
assert.notEqual(err, null)
fromClient.end()
})
}
testInternalPostgresError()