Accept stream options in constructors, pass to internal transform streams.
Includes tests.
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
module.exports = function(txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
module.exports = function(txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = {}
|
||||
this._copyOutResponse = null
|
||||
|
||||
12
index.js
12
index.js
@@ -1,19 +1,19 @@
|
||||
var CopyToQueryStream = require('./copy-to')
|
||||
|
||||
module.exports = {
|
||||
to: function(txt) {
|
||||
return new CopyToQueryStream(txt)
|
||||
to: function(txt, options) {
|
||||
return new CopyToQueryStream(txt, options)
|
||||
},
|
||||
from: function (txt) {
|
||||
return new CopyStreamQuery(txt)
|
||||
from: function (txt, options) {
|
||||
return new CopyStreamQuery(txt, options)
|
||||
}
|
||||
}
|
||||
|
||||
var Transform = require('stream').Transform
|
||||
var util = require('util')
|
||||
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._copyOutResponse = null
|
||||
|
||||
@@ -5,20 +5,30 @@ var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var pg = require('pg.js')
|
||||
|
||||
var testRange = function(top) {
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
return client
|
||||
var copy = require('../').from
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var highWaterMark = 10
|
||||
var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true})
|
||||
for(var i = 0; i < highWaterMark * 1.5; i++) {
|
||||
stream.write('1\t2\n')
|
||||
}
|
||||
assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var copy = require('../').from
|
||||
|
||||
fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)')
|
||||
|
||||
var txt = 'COPY numbers FROM STDIN'
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var rowEmitCount = 0
|
||||
stream.on('row', function() {
|
||||
|
||||
@@ -14,6 +14,14 @@ var client = function() {
|
||||
return client
|
||||
}
|
||||
|
||||
var testConstruction = function() {
|
||||
var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT'
|
||||
var stream = copy(txt, {highWaterMark: 10})
|
||||
assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.')
|
||||
}
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
|
||||
Reference in New Issue
Block a user