Compare commits
16 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8f2355e454 | ||
|
|
d6eab36b66 | ||
|
|
33f6ecc11b | ||
|
|
36572a8b7b | ||
|
|
0c5d08edae | ||
|
|
b78a3eb845 | ||
|
|
107f007249 | ||
|
|
25b8d6da5f | ||
|
|
1c0c8871c1 | ||
|
|
beb54334e2 | ||
|
|
1db9b3ec3d | ||
|
|
1822f399d2 | ||
|
|
208861d057 | ||
|
|
1957554301 | ||
|
|
bef3ba5fcd | ||
|
|
591a11c955 |
6
.travis.yml
Normal file
6
.travis.yml
Normal file
@@ -0,0 +1,6 @@
|
||||
language: node_js
|
||||
node_js:
|
||||
- "0.10"
|
||||
- "0.11"
|
||||
env:
|
||||
- PGUSER=postgres
|
||||
14
Makefile
Normal file
14
Makefile
Normal file
@@ -0,0 +1,14 @@
|
||||
.PHONY: publish-patch test
|
||||
|
||||
test:
|
||||
npm test
|
||||
|
||||
patch: test
|
||||
npm version patch -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
|
||||
minor: test
|
||||
npm version minor -m "Bump version"
|
||||
git push origin master --tags
|
||||
npm publish
|
||||
@@ -39,9 +39,8 @@ var copyFrom = require('pg-copy-streams').from;
|
||||
pg.connect(function(err, client, done) {
|
||||
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
|
||||
var fileStream = fs.createReadStream('some_file.tdv')
|
||||
fileStream.pipe(stream);
|
||||
fileStream.on('end', done);
|
||||
fileStream.on('error', done);
|
||||
fileStream.pipe(stream).on('finish', done).on('error', done);
|
||||
});
|
||||
```
|
||||
|
||||
@@ -53,9 +52,11 @@ $ npm install pg-copy-streams
|
||||
|
||||
## notice
|
||||
|
||||
This module __only__ works with the pure JavaScript bindings. If you're using `require('pg').native` please make sure to use normal `require('pg')` or `require('pg.js')` when you're using copy streams.
|
||||
|
||||
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
|
||||
|
||||
## contributing
|
||||
## contributing
|
||||
|
||||
Instead of adding a bunch more code to the already bloated [node-postgres](https://github.com/brianc/node-postgres) I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules.
|
||||
This is one of those.
|
||||
|
||||
20
copy-to.js
20
copy-to.js
@@ -1,12 +1,12 @@
|
||||
module.exports = function(txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
module.exports = function(txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = {}
|
||||
this._copyOutResponse = null
|
||||
@@ -15,7 +15,7 @@ var CopyStreamQuery = function(text) {
|
||||
|
||||
util.inherits(CopyStreamQuery, Transform)
|
||||
|
||||
var eventTypes = ['close', 'data', 'end']
|
||||
var eventTypes = ['close', 'data', 'end', 'error']
|
||||
|
||||
CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
@@ -68,10 +68,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
}
|
||||
while((chunk.length - offset) > 5) {
|
||||
var messageCode = chunk[offset]
|
||||
//complete
|
||||
if(messageCode == code.c) {
|
||||
//complete or error
|
||||
if(messageCode == code.c || messageCode == code.E) {
|
||||
this._detach()
|
||||
this.connection.stream.unshift(chunk.slice(offset + 5))
|
||||
if (messageCode == code.c) {
|
||||
this.connection.stream.unshift(chunk.slice(offset + 5))
|
||||
} else {
|
||||
this.connection.stream.unshift(chunk.slice(offset))
|
||||
}
|
||||
this.push(null)
|
||||
return cb();
|
||||
}
|
||||
|
||||
12
index.js
12
index.js
@@ -1,19 +1,19 @@
|
||||
var CopyToQueryStream = require('./copy-to')
|
||||
|
||||
module.exports = {
|
||||
to: function(txt) {
|
||||
return new CopyToQueryStream(txt)
|
||||
to: function(txt, options) {
|
||||
return new CopyToQueryStream(txt, options)
|
||||
},
|
||||
from: function (txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
from: function (txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._copyOutResponse = null
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "pg-copy-streams",
|
||||
"version": "0.2.2",
|
||||
"version": "0.3.0",
|
||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
|
||||
67
test/binary.js
Normal file
67
test/binary.js
Normal file
@@ -0,0 +1,67 @@
|
||||
var assert = require('assert')
|
||||
var gonna = require('gonna')
|
||||
|
||||
var async = require('async')
|
||||
var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var pg = require('pg.js')
|
||||
|
||||
var from = require('../').from
|
||||
var to = require('../').to
|
||||
|
||||
var testBinaryCopy = function() {
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var fromClient = client()
|
||||
var toClient = client()
|
||||
|
||||
queries = [
|
||||
'DROP TABLE IF EXISTS data',
|
||||
'CREATE TABLE IF NOT EXISTS data (num BIGINT, word TEXT)',
|
||||
'INSERT INTO data (num, word) VALUES (1, \'hello\'), (2, \'other thing\'), (3, \'goodbye\')',
|
||||
'DROP TABLE IF EXISTS data_copy',
|
||||
'CREATE TABLE IF NOT EXISTS data_copy (LIKE data INCLUDING ALL)'
|
||||
]
|
||||
|
||||
async.eachSeries(queries, _.bind(fromClient.query, fromClient), function(err) {
|
||||
assert.ifError(err)
|
||||
|
||||
var fromStream = fromClient.query(to('COPY (SELECT * FROM data) TO STDOUT BINARY'))
|
||||
var toStream = toClient.query(from('COPY data_copy FROM STDIN BINARY'))
|
||||
|
||||
runStream = function(callback) {
|
||||
fromStream.on('error', callback)
|
||||
toStream.on('error', callback)
|
||||
toStream.on('finish', callback)
|
||||
fromStream.pipe(toStream)
|
||||
}
|
||||
runStream(function(err) {
|
||||
assert.ifError(err)
|
||||
|
||||
toClient.query('SELECT * FROM data_copy ORDER BY num', function(err, res){
|
||||
assert.equal(res.rowCount, 3, 'expected 3 rows but got ' + res.rowCount)
|
||||
assert.equal(res.rows[0].num, 1)
|
||||
assert.equal(res.rows[0].word, 'hello')
|
||||
assert.equal(res.rows[1].num, 2)
|
||||
assert.equal(res.rows[1].word, 'other thing')
|
||||
assert.equal(res.rows[2].num, 3)
|
||||
assert.equal(res.rows[2].word, 'goodbye')
|
||||
queries = [
|
||||
'DROP TABLE data',
|
||||
'DROP TABLE data_copy'
|
||||
]
|
||||
async.each(queries, _.bind(fromClient.query, fromClient), function(err) {
|
||||
assert.ifError(err)
|
||||
fromClient.end()
|
||||
toClient.end()
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
testBinaryCopy()
|
||||
@@ -5,20 +5,30 @@ var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var pg = require('pg.js')
|
||||
|
||||
var testRange = function(top) {
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
return client
|
||||
var copy = require('../').from
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var highWaterMark = 10
|
||||
var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true})
|
||||
for(var i = 0; i < highWaterMark * 1.5; i++) {
|
||||
stream.write('1\t2\n')
|
||||
}
|
||||
assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var copy = require('../').from
|
||||
|
||||
fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
|
||||
var txt = 'COPY numbers FROM STDIN'
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var rowEmitCount = 0
|
||||
stream.on('row', function() {
|
||||
@@ -33,7 +43,7 @@ var testRange = function(top) {
|
||||
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
|
||||
console.log('found ', res.rows.length, 'rows')
|
||||
//console.log('found ', res.rows.length, 'rows')
|
||||
countDone()
|
||||
var firstRowDone = gonna('have correct result')
|
||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' ')
|
||||
|
||||
@@ -14,6 +14,14 @@ var client = function() {
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT'
|
||||
var stream = copy(txt, {highWaterMark: 10})
|
||||
assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
@@ -49,11 +57,33 @@ var testLeak = function(rounds) {
|
||||
|
||||
async.timesSeries(rounds, runStream, function(err) {
|
||||
assert.equal(err, null)
|
||||
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
|
||||
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
|
||||
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
|
||||
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
|
||||
assert.equal(fromClient.connection.stream.listeners('error').length, 1)
|
||||
fromClient.end()
|
||||
})
|
||||
}
|
||||
|
||||
testLeak(5)
|
||||
|
||||
var testInternalPostgresError = function() {
|
||||
var fromClient = client()
|
||||
// This attempts to make an array that's too large, and should fail.
|
||||
var txt = "COPY (SELECT asdlfsdf AS e) t) TO STDOUT"
|
||||
|
||||
var runStream = function(callback) {
|
||||
var stream = fromClient.query(copy(txt))
|
||||
stream.on('data', function(data) {
|
||||
// Just throw away the data.
|
||||
})
|
||||
stream.on('error', callback)
|
||||
}
|
||||
|
||||
runStream(function(err) {
|
||||
assert.notEqual(err, null)
|
||||
fromClient.end()
|
||||
})
|
||||
}
|
||||
|
||||
testInternalPostgresError()
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
require('./copy-from')
|
||||
require('./copy-to')
|
||||
require('./binary')
|
||||
|
||||
Reference in New Issue
Block a user