Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b40918ddb8 | ||
|
|
b8946265c3 | ||
|
|
d649905dbb | ||
|
|
22da85448e | ||
|
|
531f72dcb3 |
@@ -37,7 +37,6 @@ var pg = require('pg');
|
||||
var copyFrom = require('pg-copy-streams').from;
|
||||
|
||||
pg.connect(function(err, client, done) {
|
||||
var text = 'COPY my_table FROM STDIN';
|
||||
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
|
||||
var fileStream = fs.createReadStream('some_file.tdv')
|
||||
fileStream.pipe(stream);
|
||||
|
||||
19
copy-to.js
19
copy-to.js
@@ -8,18 +8,23 @@ var util = require('util')
|
||||
var CopyStreamQuery = function(text) {
|
||||
Transform.call(this)
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._listeners = {}
|
||||
this._copyOutResponse = null
|
||||
this.rowCount = 0
|
||||
}
|
||||
|
||||
util.inherits(CopyStreamQuery, Transform)
|
||||
|
||||
var eventTypes = ['close', 'data', 'end']
|
||||
|
||||
CopyStreamQuery.prototype.submit = function(connection) {
|
||||
connection.query(this.text)
|
||||
this.connection = connection
|
||||
this._listeners = connection.stream.listeners('data')
|
||||
connection.stream.removeAllListeners('data')
|
||||
var self = this
|
||||
eventTypes.forEach(function(type) {
|
||||
self._listeners[type] = connection.stream.listeners(type)
|
||||
connection.stream.removeAllListeners(type)
|
||||
})
|
||||
connection.stream.pipe(this)
|
||||
}
|
||||
|
||||
@@ -32,10 +37,12 @@ var code = {
|
||||
|
||||
CopyStreamQuery.prototype._detach = function() {
|
||||
this.connection.stream.unpipe()
|
||||
this.connection.stream.removeAllListeners('data')
|
||||
var self = this
|
||||
this._listeners.forEach(function(listener) {
|
||||
self.connection.stream.on('data', listener)
|
||||
eventTypes.forEach(function(type) {
|
||||
self.connection.stream.removeAllListeners(type)
|
||||
self._listeners[type].forEach(function(listener) {
|
||||
self.connection.stream.on(type, listener)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "pg-copy-streams",
|
||||
"version": "0.2.1",
|
||||
"version": "0.2.2",
|
||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
@@ -27,6 +27,7 @@
|
||||
"concat-stream": "~1.1.0",
|
||||
"gonna": "0.0.0",
|
||||
"lodash": "~2.2.1",
|
||||
"heroku-env": "~0.1.1"
|
||||
"heroku-env": "~0.1.1",
|
||||
"async": "~0.2.10"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,20 +1,21 @@
|
||||
var assert = require('assert')
|
||||
var gonna = require('gonna')
|
||||
|
||||
var concat = require('concat-stream')
|
||||
var _ = require('lodash')
|
||||
var async = require('async')
|
||||
var concat = require('concat-stream')
|
||||
var pg = require('pg.js')
|
||||
|
||||
var copy = require('../').to
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var testRange = function(top) {
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
client.connect()
|
||||
return client
|
||||
}
|
||||
|
||||
var fromClient = client()
|
||||
var copy = require('../').to
|
||||
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
@@ -32,3 +33,27 @@ var testRange = function(top) {
|
||||
}
|
||||
|
||||
testRange(10000)
|
||||
|
||||
var testLeak = function(rounds) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT 10) TO STDOUT'
|
||||
|
||||
var runStream = function(num, callback) {
|
||||
var stream = fromClient.query(copy(txt))
|
||||
stream.on('data', function(data) {
|
||||
// Just throw away the data.
|
||||
})
|
||||
stream.on('end', callback)
|
||||
stream.on('error', callback)
|
||||
}
|
||||
|
||||
async.timesSeries(rounds, runStream, function(err) {
|
||||
assert.equal(err, null)
|
||||
assert.equal(fromClient.connection.stream.listeners('data').length, 1)
|
||||
assert.equal(fromClient.connection.stream.listeners('end').length, 2)
|
||||
assert.equal(fromClient.connection.stream.listeners('close').length, 0)
|
||||
fromClient.end()
|
||||
})
|
||||
}
|
||||
|
||||
testLeak(5)
|
||||
|
||||
Reference in New Issue
Block a user