Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dc6521b7c1 | ||
|
|
16e2001064 | ||
|
|
5bb93308eb | ||
|
|
e5f864bac0 | ||
|
|
bfc0353d2c | ||
|
|
90012d84d7 | ||
|
|
808c284b55 | ||
|
|
b48283f4aa | ||
|
|
5c03015715 | ||
|
|
18fa1fdeb3 | ||
|
|
824936176c | ||
|
|
2f5b37e0ad |
94
README.md
Normal file
94
README.md
Normal file
@@ -0,0 +1,94 @@
|
||||
## pg-copy-streams
|
||||
|
||||
COPY FROM / COPY TO for node-postgres. Stream from one database to another, and stuff.
|
||||
|
||||
## how? what? huh?
|
||||
|
||||
Did you know the _all powerful_ PostgreSQL supports streaming binary data directly into and out of a table?
|
||||
This means you can take your favorite CSV or TSV or whatever format file and pipe it directly into an existing PostgreSQL table.
|
||||
You can also take a table and pipe it directly to a file, another database, stdout, even to `/dev/null` if you're crazy!
|
||||
|
||||
What this module gives you is a [Readable](http://nodejs.org/api/stream.html#stream_class_stream_readable) or [Writable](http://nodejs.org/api/stream.html#stream_class_stream_writable) stream directly into/out of a table in your database.
|
||||
This mode of interfacing with your table is _very fast_ and _very brittle_. You are responsible for properly encoding and ordering all your columns. If anything is out of place PostgreSQL will send you back an error. The stream works within a transaction so you wont leave things in a 1/2 borked state, but it's still good to be aware of.
|
||||
|
||||
If you're not familiar with the feature (I wasn't either) you can read this for some good helps: http://www.postgresql.org/docs/9.3/static/sql-copy.html
|
||||
|
||||
## examples
|
||||
|
||||
### pipe from a table to stdout
|
||||
|
||||
```js
|
||||
var pg = require('pg');
|
||||
var copyTo = require('pg-copy-streams').to;
|
||||
|
||||
pg.connect(function(err, client, done) {
|
||||
var stream = client.query(copyTo('COPY my_table TO STDOUT'));
|
||||
stream.pipe(process.stdout);
|
||||
stream.on('end', done);
|
||||
stream.on('error', done);
|
||||
});
|
||||
```
|
||||
|
||||
### pipe from a file to table
|
||||
|
||||
```js
|
||||
var fs = require('fs');
|
||||
var pg = require('pg');
|
||||
var copyFrom = require('pg-copy-streams').from;
|
||||
|
||||
pg.connect(function(err, client, done) {
|
||||
var text = 'COPY my_table FROM STDIN';
|
||||
var stream = client.query(copyFrom('COPY my_table FROM STDIN'));
|
||||
var fileStream = fs.createReadStream('some_file.tdv')
|
||||
fileStream.pipe(stream);
|
||||
fileStream.on('end', done);
|
||||
fileStream.on('error', done);
|
||||
});
|
||||
```
|
||||
|
||||
## install
|
||||
|
||||
```sh
|
||||
$ npm install pg-copy-streams
|
||||
```
|
||||
|
||||
## notice
|
||||
|
||||
Before you set out on this magical piping journey, you _really_ should read this: http://www.postgresql.org/docs/9.3/static/sql-copy.html, and you might want to take a look at the [tests](https://github.com/brianc/node-pg-copy-streams/tree/master/test) to get an idea of how things work.
|
||||
|
||||
## contributing
|
||||
|
||||
Instead of adding a bunch more code to the already bloated [node-postgres](https://github.com/brianc/node-postgres) I am trying to make the internals extensible and work on adding edge-case features as 3rd party modules.
|
||||
This is one of those.
|
||||
|
||||
Please, if you have any issues with this, open an issue.
|
||||
|
||||
Better yet, submit a pull request. I _love_ pull requests.
|
||||
|
||||
Generally how I work is if you submit a few pull requests and you're interested I'll make you a contributor and give you full access to everything.
|
||||
|
||||
Since this isn't a module with tons of installs and dependent modules I hope we can work together on this to iterate faster here and make something really useful.
|
||||
|
||||
## license
|
||||
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013 Brian M. Carlson
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
@@ -10,6 +10,7 @@ var CopyStreamQuery = function(text) {
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._copyOutResponse = null
|
||||
this.rowCount = 0
|
||||
}
|
||||
|
||||
util.inherits(CopyStreamQuery, Transform)
|
||||
@@ -78,7 +79,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
var slice = chunk.slice(offset, offset + length)
|
||||
offset += length
|
||||
this.push(slice)
|
||||
this.emit('row')
|
||||
this.rowCount++
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
5
index.js
5
index.js
@@ -17,6 +17,7 @@ var CopyStreamQuery = function(text) {
|
||||
this.text = text
|
||||
this._listeners = null
|
||||
this._copyOutResponse = null
|
||||
this.rowCount = 0
|
||||
}
|
||||
|
||||
util.inherits(CopyStreamQuery, Transform)
|
||||
@@ -39,7 +40,7 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
|
||||
this.push(lenBuffer)
|
||||
this.push(chunk)
|
||||
this.emit('row')
|
||||
this.rowCount++
|
||||
cb()
|
||||
}
|
||||
|
||||
@@ -54,7 +55,7 @@ CopyStreamQuery.prototype.handleError = function(e) {
|
||||
this.emit('error', e)
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.streamData = function(connection) {
|
||||
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
|
||||
this.pipe(connection.stream)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "pg-copy-streams",
|
||||
"version": "0.1.0",
|
||||
"version": "0.2.1",
|
||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
|
||||
@@ -36,7 +36,7 @@ var testRange = function(top) {
|
||||
console.log('found ', res.rows.length, 'rows')
|
||||
countDone()
|
||||
var firstRowDone = gonna('have correct result')
|
||||
assert.equal(rowEmitCount, top, 'should have emitted "row" event ' + top + ' times')
|
||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' ')
|
||||
fromClient.query('SELECT (max(num)) AS num FROM numbers', function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.rows[0].num, top-1)
|
||||
|
||||
@@ -18,10 +18,6 @@ var testRange = function(top) {
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var rowEmitCount = 0
|
||||
stream.on('row', function() {
|
||||
rowEmitCount++
|
||||
})
|
||||
var done = gonna('finish piping out', 1000, function() {
|
||||
fromClient.end()
|
||||
})
|
||||
@@ -30,7 +26,7 @@ var testRange = function(top) {
|
||||
var res = buf.toString('utf8')
|
||||
var expected = _.range(0, top).join('\n') + '\n'
|
||||
assert.equal(res, expected)
|
||||
assert.equal(rowEmitCount, top, 'should have emitted "row" ' + top + ' times but got ' + rowEmitCount)
|
||||
assert.equal(stream.rowCount, top, 'should have rowCount ' + top + ' but got ' + stream.rowCount)
|
||||
done()
|
||||
}))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user