5 Commits

Author SHA1 Message Date
Brian M. Carlson
b40918ddb8 Bump version 2014-03-30 00:10:18 -04:00
Brian C
b8946265c3 Merge pull request #8 from drob/handler-leak
Fixes an event handler leak.
2014-03-30 00:06:42 -04:00
Dan Robinson
d649905dbb Fixes an event handler leak.
Includes a test. Also includes async as a devDependency.
2014-03-28 04:19:19 -07:00
Brian C
22da85448e Merge pull request #7 from pensierinmusica/master
Update README.md
2014-03-27 20:43:46 -04:00
Alessandro Zanardi
531f72dcb3 Update README.md
Got rid of duplicate variable
2014-03-27 16:16:33 +01:00
4 changed files with 50 additions and 18 deletions

View File

@@ -37,7 +37,6 @@ var pg = require('pg');
var copyFrom = require('pg-copy-streams').from;
pg.connect(function(err, client, done) {
var text = 'COPY my_table FROM STDIN';
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
var fileStream = fs.createReadStream('some_file.tdv')
fileStream.pipe(stream);

View File

@@ -8,18 +8,23 @@ var util = require('util')
var CopyStreamQuery = function(text) {
Transform.call(this)
this.text = text
this._listeners = null
this._listeners = {}
this._copyOutResponse = null
this.rowCount = 0
}
util.inherits(CopyStreamQuery, Transform)
var eventTypes = ['close', 'data', 'end']
CopyStreamQuery.prototype.submit = function(connection) {
connection.query(this.text)
this.connection = connection
this._listeners = connection.stream.listeners('data')
connection.stream.removeAllListeners('data')
var self = this
eventTypes.forEach(function(type) {
self._listeners[type] = connection.stream.listeners(type)
connection.stream.removeAllListeners(type)
})
connection.stream.pipe(this)
}
@@ -32,10 +37,12 @@ var code = {
CopyStreamQuery.prototype._detach = function() {
this.connection.stream.unpipe()
this.connection.stream.removeAllListeners('data')
var self = this
this._listeners.forEach(function(listener) {
self.connection.stream.on('data', listener)
eventTypes.forEach(function(type) {
self.connection.stream.removeAllListeners(type)
self._listeners[type].forEach(function(listener) {
self.connection.stream.on(type, listener)
})
})
}

View File

@@ -1,6 +1,6 @@
{
"name": "pg-copy-streams",
"version": "0.2.1",
"version": "0.2.2",
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
"main": "index.js",
"scripts": {
@@ -27,6 +27,7 @@
"concat-stream": "~1.1.0",
"gonna": "0.0.0",
"lodash": "~2.2.1",
"heroku-env": "~0.1.1"
"heroku-env": "~0.1.1",
"async": "~0.2.10"
}
}

View File

@@ -1,20 +1,21 @@
var assert = require('assert')
var gonna = require('gonna')
var concat = require('concat-stream')
var _ = require('lodash')
var async = require('async')
var concat = require('concat-stream')
var pg = require('pg.js')
var copy = require('../').to
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var testRange = function(top) {
var client = function() {
var client = new pg.Client()
client.connect()
return client
}
var fromClient = client()
var copy = require('../').to
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
var stream = fromClient.query(copy(txt))
@@ -32,3 +33,27 @@ var testRange = function(top) {
}
testRange(10000)
var testLeak = function(rounds) {
var fromClient = client()
var txt = 'COPY (SELECT 10) TO STDOUT'
var runStream = function(num, callback) {
var stream = fromClient.query(copy(txt))
stream.on('data', function(data) {
// Just throw away the data.
})
stream.on('end', callback)
stream.on('error', callback)
}
async.timesSeries(rounds, runStream, function(err) {
assert.equal(err, null)
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
fromClient.end()
})
}
testLeak(5)