Compare commits
5 Commits
v1.1.1
...
PushSpring
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
be7af371d8 | ||
|
|
17697e98d7 | ||
|
|
bd4a87d3a0 | ||
|
|
9d197a91e1 | ||
|
|
e1ce9a3948 |
20
copy-to.js
20
copy-to.js
@@ -4,7 +4,6 @@ module.exports = function(txt, options) {
|
|||||||
|
|
||||||
var Transform = require('stream').Transform
|
var Transform = require('stream').Transform
|
||||||
var util = require('util')
|
var util = require('util')
|
||||||
var code = require('./message-formats')
|
|
||||||
|
|
||||||
var CopyStreamQuery = function(text, options) {
|
var CopyStreamQuery = function(text, options) {
|
||||||
Transform.call(this, options)
|
Transform.call(this, options)
|
||||||
@@ -24,6 +23,13 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
|||||||
connection.stream.pipe(this)
|
connection.stream.pipe(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var code = {
|
||||||
|
E: 69, //Error
|
||||||
|
H: 72, //CopyOutResponse
|
||||||
|
d: 0x64, //CopyData
|
||||||
|
c: 0x63 //CopyDone
|
||||||
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype._detach = function() {
|
CopyStreamQuery.prototype._detach = function() {
|
||||||
this.connection.stream.unpipe(this)
|
this.connection.stream.unpipe(this)
|
||||||
// Unpipe can drop us out of flowing mode
|
// Unpipe can drop us out of flowing mode
|
||||||
@@ -38,13 +44,13 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
}
|
}
|
||||||
if(!this._copyOutResponse) {
|
if(!this._copyOutResponse) {
|
||||||
this._copyOutResponse = true
|
this._copyOutResponse = true
|
||||||
if(chunk[0] == code.ErrorResponse) {
|
if(chunk[0] == code.E) {
|
||||||
this._detach()
|
this._detach()
|
||||||
this.push(null)
|
this.push(null)
|
||||||
return cb();
|
return cb();
|
||||||
}
|
}
|
||||||
if(chunk[0] != code.CopyOutResponse) {
|
if(chunk[0] != code.H) {
|
||||||
this.emit('error', new Error('Expected CopyOutResponse code (H)'))
|
this.emit('error', new Error('Expected copy out response'))
|
||||||
}
|
}
|
||||||
var length = chunk.readUInt32BE(1)
|
var length = chunk.readUInt32BE(1)
|
||||||
offset = 1
|
offset = 1
|
||||||
@@ -53,14 +59,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
|||||||
while((chunk.length - offset) > 5) {
|
while((chunk.length - offset) > 5) {
|
||||||
var messageCode = chunk[offset]
|
var messageCode = chunk[offset]
|
||||||
//complete or error
|
//complete or error
|
||||||
if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) {
|
if(messageCode == code.c || messageCode == code.E) {
|
||||||
this._detach()
|
this._detach()
|
||||||
this.push(null)
|
this.push(null)
|
||||||
return cb();
|
return cb();
|
||||||
}
|
}
|
||||||
//something bad happened
|
//something bad happened
|
||||||
if(messageCode != code.CopyData) {
|
if(messageCode != code.d) {
|
||||||
return this.emit('error', new Error('Expected CopyData code (d)'))
|
return this.emit('error', new Error('expected "d" (copydata message)'))
|
||||||
}
|
}
|
||||||
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
|
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
|
||||||
//can we read the next row?
|
//can we read the next row?
|
||||||
|
|||||||
25
index.js
25
index.js
@@ -10,7 +10,6 @@ module.exports = {
|
|||||||
|
|
||||||
var Transform = require('stream').Transform
|
var Transform = require('stream').Transform
|
||||||
var util = require('util')
|
var util = require('util')
|
||||||
var code = require('./message-formats')
|
|
||||||
|
|
||||||
var CopyStreamQuery = function(text, options) {
|
var CopyStreamQuery = function(text, options) {
|
||||||
Transform.call(this, options)
|
Transform.call(this, options)
|
||||||
@@ -27,21 +26,28 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
|||||||
connection.query(this.text)
|
connection.query(this.text)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var code = {
|
||||||
|
H: 72, //CopyOutResponse
|
||||||
|
d: 0x64, //CopyData
|
||||||
|
c: 0x63 //CopyDone
|
||||||
|
}
|
||||||
|
|
||||||
var copyDataBuffer = Buffer([code.CopyData])
|
var copyDataBuffer = Buffer([code.d])
|
||||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||||
this.push(copyDataBuffer)
|
this.push(copyDataBuffer)
|
||||||
var lenBuffer = Buffer(4)
|
var lenBuffer = Buffer(4)
|
||||||
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
|
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
|
||||||
this.push(lenBuffer)
|
this.push(lenBuffer)
|
||||||
this.push(chunk)
|
this.push(chunk)
|
||||||
|
this.rowCount++
|
||||||
cb()
|
cb()
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype._flush = function(cb) {
|
CopyStreamQuery.prototype._flush = function(cb) {
|
||||||
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4])
|
var finBuffer = Buffer([code.c, 0, 0, 0, 4])
|
||||||
this.push(finBuffer)
|
this.push(finBuffer)
|
||||||
cb()
|
//never call this callback, do not close underlying stream
|
||||||
|
//cb()
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype.handleError = function(e) {
|
CopyStreamQuery.prototype.handleError = function(e) {
|
||||||
@@ -49,17 +55,10 @@ CopyStreamQuery.prototype.handleError = function(e) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
|
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
|
||||||
this.pipe(connection.stream, { end: false })
|
this.pipe(connection.stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
CopyStreamQuery.prototype.handleCommandComplete = function(msg) {
|
CopyStreamQuery.prototype.handleCommandComplete = function() {
|
||||||
// Parse affected row count as in
|
|
||||||
// https://github.com/brianc/node-postgres/blob/35e5567f86774f808c2a8518dd312b8aa3586693/lib/result.js#L37
|
|
||||||
var match = /COPY (\d+)/.exec((msg || {}).text)
|
|
||||||
if (match) {
|
|
||||||
this.rowCount = parseInt(match[1], 10)
|
|
||||||
}
|
|
||||||
|
|
||||||
this.unpipe()
|
this.unpipe()
|
||||||
this.emit('end')
|
this.emit('end')
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,17 +0,0 @@
|
|||||||
/**
|
|
||||||
* The COPY feature uses the following protocol codes.
|
|
||||||
* The codes for the most recent protocol version are documented on
|
|
||||||
* https://www.postgresql.org/docs/current/static/protocol-message-formats.html
|
|
||||||
*
|
|
||||||
* The protocol flow itself is described on
|
|
||||||
* https://www.postgresql.org/docs/current/static/protocol-flow.html
|
|
||||||
*/
|
|
||||||
module.exports = {
|
|
||||||
ErrorResponse: 0x45,
|
|
||||||
CopyInResponse: 0x47,
|
|
||||||
CopyOutResponse: 0x48,
|
|
||||||
CopyBothResponse: 0x57,
|
|
||||||
CopyData: 0x64,
|
|
||||||
CopyDone: 0x63,
|
|
||||||
CopyFail: 0x66
|
|
||||||
}
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "pg-copy-streams",
|
"name": "pg-copy-streams",
|
||||||
"version": "1.1.1",
|
"version": "0.3.0",
|
||||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -30,6 +30,10 @@ var testRange = function(top) {
|
|||||||
|
|
||||||
var txt = 'COPY numbers FROM STDIN'
|
var txt = 'COPY numbers FROM STDIN'
|
||||||
var stream = fromClient.query(copy(txt))
|
var stream = fromClient.query(copy(txt))
|
||||||
|
var rowEmitCount = 0
|
||||||
|
stream.on('row', function() {
|
||||||
|
rowEmitCount++
|
||||||
|
})
|
||||||
for(var i = 0; i < top; i++) {
|
for(var i = 0; i < top; i++) {
|
||||||
stream.write(Buffer('' + i + '\t' + i*10 + '\n'))
|
stream.write(Buffer('' + i + '\t' + i*10 + '\n'))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user