From b78a3eb8456bc4b843f0ec9d3bc4f8dfa2b25558 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 15 Sep 2014 15:01:39 -0400 Subject: [PATCH] Accept stream options in constructors, pass to internal transform streams. Includes tests. --- copy-to.js | 8 ++++---- index.js | 12 ++++++------ test/copy-from.js | 28 +++++++++++++++++++--------- test/copy-to.js | 8 ++++++++ 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/copy-to.js b/copy-to.js index 404406b..baccfa4 100644 --- a/copy-to.js +++ b/copy-to.js @@ -1,12 +1,12 @@ -module.exports = function(txt) { - return new CopyStreamQuery(txt) +module.exports = function(txt, options) { + return new CopyStreamQuery(txt, options) } var Transform = require('stream').Transform var util = require('util') -var CopyStreamQuery = function(text) { - Transform.call(this) +var CopyStreamQuery = function(text, options) { + Transform.call(this, options) this.text = text this._listeners = {} this._copyOutResponse = null diff --git a/index.js b/index.js index a056df3..9bfd4a4 100644 --- a/index.js +++ b/index.js @@ -1,19 +1,19 @@ var CopyToQueryStream = require('./copy-to') module.exports = { - to: function(txt) { - return new CopyToQueryStream(txt) + to: function(txt, options) { + return new CopyToQueryStream(txt, options) }, - from: function (txt) { - return new CopyStreamQuery(txt) + from: function (txt, options) { + return new CopyStreamQuery(txt, options) } } var Transform = require('stream').Transform var util = require('util') -var CopyStreamQuery = function(text) { - Transform.call(this) +var CopyStreamQuery = function(text, options) { + Transform.call(this, options) this.text = text this._listeners = null this._copyOutResponse = null diff --git a/test/copy-from.js b/test/copy-from.js index 1f5bd68..fd5d06b 100644 --- a/test/copy-from.js +++ b/test/copy-from.js @@ -5,20 +5,30 @@ var concat = require('concat-stream') var _ = require('lodash') var pg = require('pg.js') -var testRange = function(top) { - var client = function() { - var client = new pg.Client() - client.connect() - client.query('CREATE TEMP TABLE numbers(num int, bigger_num int)') - return client +var copy = require('../').from + +var client = function() { + var client = new pg.Client() + client.connect() + return client +} + +var testConstruction = function() { + var highWaterMark = 10 + var stream = copy('COPY numbers FROM STDIN', {highWaterMark: 10, objectMode: true}) + for(var i = 0; i < highWaterMark * 1.5; i++) { + stream.write('1\t2\n') } + assert(!stream.write('1\t2\n'), 'Should correctly set highWaterMark.') +} +testConstruction() + +var testRange = function(top) { var fromClient = client() - var copy = require('../').from - + fromClient.query('CREATE TEMP TABLE numbers(num int, bigger_num int)') var txt = 'COPY numbers FROM STDIN' - var stream = fromClient.query(copy(txt)) var rowEmitCount = 0 stream.on('row', function() { diff --git a/test/copy-to.js b/test/copy-to.js index 0545630..8c07347 100644 --- a/test/copy-to.js +++ b/test/copy-to.js @@ -14,6 +14,14 @@ var client = function() { return client } +var testConstruction = function() { + var txt = 'COPY (SELECT * FROM generate_series(0, 10)) TO STDOUT' + var stream = copy(txt, {highWaterMark: 10}) + assert.equal(stream._readableState.highWaterMark, 10, 'Client should have been set with a correct highWaterMark.') +} + +testConstruction() + var testRange = function(top) { var fromClient = client() var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'