4 Commits

Author SHA1 Message Date
brianc
af1dc6cda9 Bump version 2016-07-28 16:13:39 -05:00
brianc
18be125596 Bump version 2016-07-26 14:40:01 -05:00
jeromew
ae5b344395 Refactor message format codes handling (#45) 2016-07-26 14:39:48 -05:00
jeromew
ee84aba89f Use end stream option instead of not calling _flush callback (#44) 2016-07-26 14:39:33 -05:00
4 changed files with 30 additions and 24 deletions

View File

@@ -4,6 +4,7 @@ module.exports = function(txt, options) {
var Transform = require('stream').Transform
var util = require('util')
var code = require('./message-formats')
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
@@ -23,13 +24,6 @@ CopyStreamQuery.prototype.submit = function(connection) {
connection.stream.pipe(this)
}
var code = {
E: 69, //Error
H: 72, //CopyOutResponse
d: 0x64, //CopyData
c: 0x63 //CopyDone
}
CopyStreamQuery.prototype._detach = function() {
this.connection.stream.unpipe(this)
// Unpipe can drop us out of flowing mode
@@ -44,13 +38,13 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
}
if(!this._copyOutResponse) {
this._copyOutResponse = true
if(chunk[0] == code.E) {
if(chunk[0] == code.ErrorResponse) {
this._detach()
this.push(null)
return cb();
}
if(chunk[0] != code.H) {
this.emit('error', new Error('Expected copy out response'))
if(chunk[0] != code.CopyOutResponse) {
this.emit('error', new Error('Expected CopyOutResponse code (H)'))
}
var length = chunk.readUInt32BE(1)
offset = 1
@@ -59,14 +53,14 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
while((chunk.length - offset) > 5) {
var messageCode = chunk[offset]
//complete or error
if(messageCode == code.c || messageCode == code.E) {
if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) {
this._detach()
this.push(null)
return cb();
}
//something bad happened
if(messageCode != code.d) {
return this.emit('error', new Error('expected "d" (copydata message)'))
if(messageCode != code.CopyData) {
return this.emit('error', new Error('Expected CopyData code (d)'))
}
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
//can we read the next row?

View File

@@ -10,6 +10,7 @@ module.exports = {
var Transform = require('stream').Transform
var util = require('util')
var code = require('./message-formats')
var CopyStreamQuery = function(text, options) {
Transform.call(this, options)
@@ -26,13 +27,8 @@ CopyStreamQuery.prototype.submit = function(connection) {
connection.query(this.text)
}
var code = {
H: 72, //CopyOutResponse
d: 0x64, //CopyData
c: 0x63 //CopyDone
}
var copyDataBuffer = Buffer([code.d])
var copyDataBuffer = Buffer([code.CopyData])
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
this.push(copyDataBuffer)
var lenBuffer = Buffer(4)
@@ -43,10 +39,9 @@ CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
}
CopyStreamQuery.prototype._flush = function(cb) {
var finBuffer = Buffer([code.c, 0, 0, 0, 4])
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4])
this.push(finBuffer)
//never call this callback, do not close underlying stream
//cb()
cb()
}
CopyStreamQuery.prototype.handleError = function(e) {
@@ -54,7 +49,7 @@ CopyStreamQuery.prototype.handleError = function(e) {
}
CopyStreamQuery.prototype.handleCopyInResponse = function(connection) {
this.pipe(connection.stream)
this.pipe(connection.stream, { end: false })
}
CopyStreamQuery.prototype.handleCommandComplete = function(msg) {

17
message-formats.js Normal file
View File

@@ -0,0 +1,17 @@
/**
* The COPY feature uses the following protocol codes.
* The codes for the most recent protocol version are documented on
* https://www.postgresql.org/docs/current/static/protocol-message-formats.html
*
* The protocol flow itself is described on
* https://www.postgresql.org/docs/current/static/protocol-flow.html
*/
module.exports = {
ErrorResponse: 0x45,
CopyInResponse: 0x47,
CopyOutResponse: 0x48,
CopyBothResponse: 0x57,
CopyData: 0x64,
CopyDone: 0x63,
CopyFail: 0x66
}

View File

@@ -1,6 +1,6 @@
{
"name": "pg-copy-streams",
"version": "1.1.0",
"version": "1.1.2",
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
"main": "index.js",
"scripts": {