Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
399bff7ed7 | ||
|
|
8174e10fb5 | ||
|
|
f29aef3bba | ||
|
|
b2e108571e | ||
|
|
7003f6070f | ||
|
|
ade7ab95a0 | ||
|
|
9ccda04036 | ||
|
|
a5e532f20b | ||
|
|
2a4db2920e | ||
|
|
f155899570 |
87
copy-to.js
87
copy-to.js
@@ -9,7 +9,7 @@ var code = require('./message-formats')
|
||||
var CopyStreamQuery = function(text, options) {
|
||||
Transform.call(this, options)
|
||||
this.text = text
|
||||
this._copyOutResponse = null
|
||||
this._gotCopyOutResponse = false
|
||||
this.rowCount = 0
|
||||
}
|
||||
|
||||
@@ -30,50 +30,69 @@ CopyStreamQuery.prototype._detach = function() {
|
||||
this.connection.stream.resume()
|
||||
}
|
||||
|
||||
|
||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
var offset = 0
|
||||
var Byte1Len = 1;
|
||||
var Int32Len = 4;
|
||||
if(this._remainder && chunk) {
|
||||
chunk = Buffer.concat([this._remainder, chunk])
|
||||
}
|
||||
if(!this._copyOutResponse) {
|
||||
this._copyOutResponse = true
|
||||
if(chunk[0] == code.ErrorResponse) {
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
}
|
||||
if(chunk[0] != code.CopyOutResponse) {
|
||||
this.emit('error', new Error('Expected CopyOutResponse code (H)'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(1)
|
||||
offset = 1
|
||||
offset += length
|
||||
}
|
||||
while((chunk.length - offset) > 5) {
|
||||
|
||||
var length;
|
||||
var messageCode;
|
||||
var needPush = false;
|
||||
|
||||
while((chunk.length - offset) >= (Byte1Len + Int32Len)) {
|
||||
var messageCode = chunk[offset]
|
||||
//complete or error
|
||||
if(messageCode == code.CopyDone || messageCode == code.ErrorResponse) {
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
|
||||
//console.log('PostgreSQL message ' + String.fromCharCode(messageCode))
|
||||
switch(messageCode) {
|
||||
|
||||
// detect COPY start
|
||||
case code.CopyOutResponse:
|
||||
if (!this._gotCopyOutResponse) {
|
||||
this._gotCopyOutResponse = true
|
||||
} else {
|
||||
this.emit('error', new Error('Unexpected CopyOutResponse message (H)'))
|
||||
}
|
||||
break;
|
||||
|
||||
// meaningful row
|
||||
case code.CopyData:
|
||||
needPush = true;
|
||||
break;
|
||||
|
||||
// standard interspersed messages. discard
|
||||
case code.ParameterStatus:
|
||||
case code.NoticeResponse:
|
||||
case code.NotificationResponse:
|
||||
break;
|
||||
|
||||
case code.ErrorResponse:
|
||||
case code.CopyDone:
|
||||
this._detach()
|
||||
this.push(null)
|
||||
return cb();
|
||||
break;
|
||||
default:
|
||||
this.emit('error', new Error('Unexpected PostgreSQL message ' + String.fromCharCode(messageCode)))
|
||||
}
|
||||
//something bad happened
|
||||
if(messageCode != code.CopyData) {
|
||||
return this.emit('error', new Error('Expected CopyData code (d)'))
|
||||
}
|
||||
var length = chunk.readUInt32BE(offset + 1) - 4 //subtract length of UInt32
|
||||
//can we read the next row?
|
||||
if(chunk.length > (offset + length + 5)) {
|
||||
offset += 5
|
||||
var slice = chunk.slice(offset, offset + length)
|
||||
offset += length
|
||||
this.push(slice)
|
||||
this.rowCount++
|
||||
|
||||
length = chunk.readUInt32BE(offset+Byte1Len)
|
||||
if(chunk.length >= (offset + Byte1Len + length)) {
|
||||
offset += Byte1Len + Int32Len
|
||||
if (needPush) {
|
||||
var row = chunk.slice(offset, offset + length - Int32Len)
|
||||
this.rowCount++
|
||||
this.push(row)
|
||||
}
|
||||
offset += (length - Int32Len)
|
||||
} else {
|
||||
// we need more chunks for a complete message
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if(chunk.length - offset) {
|
||||
var slice = chunk.slice(offset)
|
||||
this._remainder = slice
|
||||
|
||||
19
index.js
19
index.js
@@ -30,18 +30,20 @@ CopyStreamQuery.prototype.submit = function(connection) {
|
||||
|
||||
var copyDataBuffer = Buffer([code.CopyData])
|
||||
CopyStreamQuery.prototype._transform = function(chunk, enc, cb) {
|
||||
var Int32Len = 4;
|
||||
this.push(copyDataBuffer)
|
||||
var lenBuffer = Buffer(4)
|
||||
lenBuffer.writeUInt32BE(chunk.length + 4, 0)
|
||||
var lenBuffer = Buffer(Int32Len)
|
||||
lenBuffer.writeUInt32BE(chunk.length + Int32Len, 0)
|
||||
this.push(lenBuffer)
|
||||
this.push(chunk)
|
||||
cb()
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype._flush = function(cb) {
|
||||
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, 4])
|
||||
var Int32Len = 4;
|
||||
var finBuffer = Buffer([code.CopyDone, 0, 0, 0, Int32Len])
|
||||
this.push(finBuffer)
|
||||
cb()
|
||||
this.cb_flush = cb
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleError = function(e) {
|
||||
@@ -60,8 +62,13 @@ CopyStreamQuery.prototype.handleCommandComplete = function(msg) {
|
||||
this.rowCount = parseInt(match[1], 10)
|
||||
}
|
||||
|
||||
this.unpipe()
|
||||
this.emit('end')
|
||||
// we delay the _flush cb so that the 'end' event is
|
||||
// triggered after CommandComplete
|
||||
this.cb_flush()
|
||||
|
||||
// unpipe from connection
|
||||
this.unpipe(this.connection)
|
||||
this.connection = null
|
||||
}
|
||||
|
||||
CopyStreamQuery.prototype.handleReadyForQuery = function() {
|
||||
|
||||
@@ -7,11 +7,19 @@
|
||||
* https://www.postgresql.org/docs/current/static/protocol-flow.html
|
||||
*/
|
||||
module.exports = {
|
||||
ErrorResponse: 0x45,
|
||||
CopyInResponse: 0x47,
|
||||
CopyOutResponse: 0x48,
|
||||
CopyBothResponse: 0x57,
|
||||
CopyData: 0x64,
|
||||
CopyDone: 0x63,
|
||||
CopyFail: 0x66
|
||||
ErrorResponse: 0x45, // E
|
||||
CopyInResponse: 0x47, // G
|
||||
CopyOutResponse: 0x48, // H
|
||||
CopyBothResponse: 0x57, // W
|
||||
CopyDone: 0x63, // c
|
||||
CopyData: 0x64, // d
|
||||
CopyFail: 0x66, // f
|
||||
|
||||
// It is possible for NoticeResponse and ParameterStatus messages to be interspersed between CopyData messages;
|
||||
// frontends must handle these cases, and should be prepared for other asynchronous message types as well
|
||||
// (see Section 50.2.6).
|
||||
// Otherwise, any message type other than CopyData or CopyDone may be treated as terminating copy-out mode.
|
||||
NotificationResponse: 0x41, // A
|
||||
NoticeResponse: 0x4E, // N
|
||||
ParameterStatus: 0x53 // S
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "pg-copy-streams",
|
||||
"version": "1.1.1",
|
||||
"version": "1.2.0",
|
||||
"description": "Low-Level COPY TO and COPY FROM streams for PostgreSQL in JavaScript using",
|
||||
"main": "index.js",
|
||||
"scripts": {
|
||||
|
||||
@@ -39,6 +39,7 @@ var testRange = function(top) {
|
||||
fromClient.query('SELECT COUNT(*) FROM numbers', function(err, res) {
|
||||
assert.ifError(err)
|
||||
assert.equal(res.rows[0].count, top, 'expected ' + top + ' rows but got ' + res.rows[0].count)
|
||||
assert.equal(stream.rowCount, top, 'expected ' + top + ' rows but db count is ' + stream.rowCount)
|
||||
//console.log('found ', res.rows.length, 'rows')
|
||||
countDone()
|
||||
var firstRowDone = gonna('have correct result')
|
||||
@@ -54,3 +55,19 @@ var testRange = function(top) {
|
||||
}
|
||||
|
||||
testRange(1000)
|
||||
|
||||
var testSingleEnd = function() {
|
||||
var fromClient = client()
|
||||
fromClient.query('CREATE TEMP TABLE numbers(num int)')
|
||||
var txt = 'COPY numbers FROM STDIN';
|
||||
var stream = fromClient.query(copy(txt))
|
||||
var count = 0;
|
||||
stream.on('end', function() {
|
||||
count++;
|
||||
assert(count==1, '`end` Event was triggered ' + count + ' times');
|
||||
if (count == 1) fromClient.end();
|
||||
})
|
||||
stream.end(Buffer('1\n'))
|
||||
|
||||
}
|
||||
testSingleEnd()
|
||||
|
||||
@@ -7,6 +7,7 @@ var concat = require('concat-stream')
|
||||
var pg = require('pg')
|
||||
|
||||
var copy = require('../').to
|
||||
var code = require('../message-formats')
|
||||
|
||||
var client = function() {
|
||||
var client = new pg.Client()
|
||||
@@ -22,6 +23,18 @@ var testConstruction = function() {
|
||||
|
||||
testConstruction()
|
||||
|
||||
var testComparators = function() {
|
||||
var copy1 = copy();
|
||||
copy1.pipe(concat(function(buf) {
|
||||
assert(copy1._gotCopyOutResponse, 'should have received CopyOutResponse')
|
||||
assert(!copy1._remainder, 'Message with no additional data (len=Int4Len+0) should not leave a remainder')
|
||||
}))
|
||||
copy1.end(new Buffer([code.CopyOutResponse, 0x00, 0x00, 0x00, 0x04]));
|
||||
|
||||
|
||||
}
|
||||
testComparators();
|
||||
|
||||
var testRange = function(top) {
|
||||
var fromClient = client()
|
||||
var txt = 'COPY (SELECT * from generate_series(0, ' + (top - 1) + ')) TO STDOUT'
|
||||
@@ -73,5 +86,37 @@ var testInternalPostgresError = function() {
|
||||
queryClient.end()
|
||||
})
|
||||
}
|
||||
|
||||
testInternalPostgresError()
|
||||
|
||||
var testNoticeResponse = function() {
|
||||
// we use a special trick to generate a warning
|
||||
// on the copy stream.
|
||||
var queryClient = client()
|
||||
var set = '';
|
||||
set += 'SET SESSION client_min_messages = WARNING;'
|
||||
set += 'SET SESSION standard_conforming_strings = off;'
|
||||
set += 'SET SESSION escape_string_warning = on;'
|
||||
queryClient.query(set, function(err, res) {
|
||||
assert.equal(err, null, 'testNoticeResponse - could not SET parameters')
|
||||
var runStream = function(callback) {
|
||||
var txt = "COPY (SELECT '\\\n') TO STDOUT"
|
||||
var stream = queryClient.query(copy(txt))
|
||||
stream.on('data', function(data) {
|
||||
})
|
||||
stream.on('error', callback)
|
||||
|
||||
// make sure stream is pulled from
|
||||
stream.pipe(concat(callback.bind(null,null)))
|
||||
}
|
||||
|
||||
runStream(function(err) {
|
||||
assert.equal(err, null, err)
|
||||
queryClient.end()
|
||||
})
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
testNoticeResponse();
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user