Skip to content

Commit aefe415

Browse files
committed
Added pipeline
1 parent a84ebb3 commit aefe415

File tree

4 files changed

+212
-16
lines changed

4 files changed

+212
-16
lines changed

packages/pg/lib/client.js

+130-9
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class Client extends EventEmitter {
4242
this._connected = false
4343
this._connectionError = false
4444
this._queryable = true
45-
4645
this.connection =
4746
c.connection ||
4847
new Connection({
@@ -52,10 +51,14 @@ class Client extends EventEmitter {
5251
keepAliveInitialDelayMillis: c.keepAliveInitialDelayMillis || 0,
5352
encoding: this.connectionParameters.client_encoding || 'utf8',
5453
})
54+
this.hasExecuted = false;
5555
this.queryQueue = []
5656
this.binary = c.binary || defaults.binary
57+
this.usePipeline = false;
58+
this.preparedQueries = [];
5759
this.processID = null
5860
this.secretKey = null
61+
this.hasQueriesPrepared = false;
5962
this.ssl = this.connectionParameters.ssl || false
6063
// As with Password, make SSL->Key (the private key) non-enumerable.
6164
// It won't show up in stack traces
@@ -311,7 +314,9 @@ class Client extends EventEmitter {
311314
if (activeQuery) {
312315
activeQuery.handleReadyForQuery(this.connection)
313316
}
314-
this._pulseQueryQueue()
317+
if (this.usePipeline == false) {
318+
this._pulseQueryQueue()
319+
}
315320
}
316321

317322
// if we receieve an error event or error message
@@ -478,11 +483,10 @@ class Client extends EventEmitter {
478483

479484
_pulseQueryQueue() {
480485
if (this.readyForQuery === true) {
481-
this.activeQuery = this.queryQueue.shift()
486+
this.activeQuery = this.queryQueue.shift();
482487
if (this.activeQuery) {
483488
this.readyForQuery = false
484489
this.hasExecuted = true
485-
486490
const queryError = this.activeQuery.submit(this.connection)
487491
if (queryError) {
488492
process.nextTick(() => {
@@ -498,6 +502,120 @@ class Client extends EventEmitter {
498502
}
499503
}
500504

505+
async queryPipeline(queries) {
506+
console.log("Entered pipeline");
507+
this.usePipeline = true;
508+
let queriesComputed = [];
509+
let resultList = [];
510+
for (let elem of queries) {
511+
let queryToInsert, result;
512+
[queryToInsert, result] = this.queryForPipeline(elem.config, elem.values);
513+
queriesComputed.push(queryToInsert);
514+
resultList.push(result);
515+
}
516+
for (let i = 0; i < queriesComputed.length; i++) {
517+
console.log(`Processing query ${i}`);
518+
let current = queriesComputed[i];
519+
this.activeQuery = current;
520+
let next;
521+
if (i + 1 < queriesComputed.length) {
522+
next = queriesComputed[i + 1];
523+
}
524+
if (!current.isPreparedSomehow()) {
525+
await current.queryPrepareNew(this.connection);
526+
} else {
527+
console.log("Already prepared");
528+
}
529+
let pList = [];
530+
pList.push(current.submitNew(this.connection));
531+
if (next) {
532+
pList.push(next.queryPrepareNew(this.connection));
533+
}
534+
await Promise.all(pList);
535+
console.log(`Done processing query ${i}`);
536+
}
537+
this.activeQuery = null;
538+
this.emit('drain');
539+
this.usePipeline = false;
540+
return resultList;
541+
}
542+
543+
queryForPipeline(config, values, callback) {
544+
// can take in strings, config object or query object
545+
var query
546+
var result
547+
var readTimeout
548+
var readTimeoutTimer
549+
var queryCallback
550+
551+
if (config === null || config === undefined) {
552+
throw new TypeError('Client was passed a null or undefined query')
553+
} else if (typeof config.submit === 'function') {
554+
readTimeout = config.query_timeout || this.connectionParameters.query_timeout
555+
result = query = config
556+
if (typeof values === 'function') {
557+
query.callback = query.callback || values
558+
}
559+
} else {
560+
readTimeout = this.connectionParameters.query_timeout
561+
query = new Query(config, values, callback)
562+
if (!query.callback) {
563+
result = new this._Promise((resolve, reject) => {
564+
query.callback = (err, res) => (err ? reject(err) : resolve(res))
565+
}).catch(err => {
566+
// replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
567+
// application that created the query
568+
Error.captureStackTrace(err);
569+
throw err;
570+
})
571+
}
572+
}
573+
574+
if (readTimeout) {
575+
queryCallback = query.callback
576+
577+
readTimeoutTimer = setTimeout(() => {
578+
var error = new Error('Query read timeout')
579+
process.nextTick(() => {
580+
query.handleError(error, this.connection)
581+
})
582+
queryCallback(error)
583+
// we already returned an error,
584+
// just do nothing if query completes
585+
query.callback = () => { }
586+
}, readTimeout)
587+
588+
query.callback = (err, res) => {
589+
clearTimeout(readTimeoutTimer)
590+
queryCallback(err, res)
591+
}
592+
}
593+
594+
if (this.binary && !query.binary) {
595+
query.binary = true
596+
}
597+
598+
if (query._result && !query._result._types) {
599+
query._result._types = this._types
600+
}
601+
602+
if (!this._queryable) {
603+
process.nextTick(() => {
604+
query.handleError(new Error('Client has encountered a connection error and is not queryable'), this.connection)
605+
})
606+
return result
607+
}
608+
609+
if (this._ending) {
610+
process.nextTick(() => {
611+
query.handleError(new Error('Client was closed and is not queryable'), this.connection)
612+
})
613+
return result
614+
}
615+
616+
return [query, result]
617+
}
618+
501619
query(config, values, callback) {
502620
// can take in strings, config object or query object
503621
var query
@@ -543,15 +661,16 @@ class Client extends EventEmitter {
543661

544662
// we already returned an error,
545663
// just do nothing if query completes
546-
query.callback = () => {}
664+
query.callback = () => { }
547665

548666
// Remove from queue
549667
var index = this.queryQueue.indexOf(query)
550668
if (index > -1) {
551669
this.queryQueue.splice(index, 1)
552670
}
553-
554-
this._pulseQueryQueue()
671+
if (this.usePipeline == false) {
672+
this._pulseQueryQueue()
673+
}
555674
}, readTimeout)
556675

557676
query.callback = (err, res) => {
@@ -582,8 +701,10 @@ class Client extends EventEmitter {
582701
return result
583702
}
584703

585-
this.queryQueue.push(query)
586-
this._pulseQueryQueue()
704+
this.queryQueue.push(query);
705+
if (this.usePipeline == false) {
706+
this._pulseQueryQueue();
707+
}
587708
return result
588709
}
589710

packages/pg/lib/connection.js

+9-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,15 @@ class Connection extends EventEmitter {
148148
}
149149
return this.stream.write(buffer)
150150
}
151-
151+
writeArray(array) {
152+
if (!this.stream.writable) {
153+
return false
154+
}
155+
for (let buffer_ins of array) {
156+
this.stream.write(buffer_ins);
157+
}
158+
return true;
159+
}
152160
query(text) {
153161
this._send(serialize.query(text))
154162
}

packages/pg/lib/query.js

+63-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const { EventEmitter } = require('events')
4-
4+
const { parse, serialize } = require('pg-protocol')
55
const Result = require('./result')
66
const utils = require('./utils')
77

@@ -21,6 +21,7 @@ class Query extends EventEmitter {
2121
this.portal = config.portal || ''
2222
this.callback = config.callback
2323
this._rowMode = config.rowMode
24+
this.toBeWrittenInBuffer = [];
2425
if (process.domain && config.callback) {
2526
this.callback = process.domain.bind(config.callback)
2627
}
@@ -29,6 +30,7 @@ class Query extends EventEmitter {
2930
// potential for multiple results
3031
this._results = this._result
3132
this.isPreparedStatement = false
33+
this.alreadyPrepared = false;
3234
this._canceledDueToError = false
3335
this._promise = null
3436
}
@@ -134,6 +136,11 @@ class Query extends EventEmitter {
134136
if (this._canceledDueToError) {
135137
return this.handleError(this._canceledDueToError, con)
136138
}
139+
if (this.submitRes) {
140+
this.submitRes();
141+
this.submitP = null;
142+
this.submitRes = null;
143+
}
137144
if (this.callback) {
138145
try {
139146
this.callback(null, this._results)
@@ -163,7 +170,60 @@ class Query extends EventEmitter {
163170
} else {
164171
connection.query(this.text)
165172
}
166-
return null
173+
return null;
174+
}
175+
176+
async queryPrepareNew(connection) {
177+
this.isPreparedStatement = true
178+
this.toBeWrittenInBuffer.push(await serialize.parse({
179+
text: this.text,
180+
name: this.name,
181+
types: this.types,
182+
}));
183+
this.dataParsed = true;
184+
try {
185+
this.toBeWrittenInBuffer.push(serialize.bind({
186+
portal: this.portal,
187+
statement: this.name,
188+
values: this.values,
189+
binary: this.binary,
190+
valueMapper: utils.prepareValue,
191+
}));
192+
} catch (err) {
193+
this.handleError(err, connection)
194+
return
195+
}
196+
197+
this.toBeWrittenInBuffer.push(await serialize.describe({
198+
type: 'P',
199+
name: this.portal || '',
200+
}));
201+
}
202+
203+
isPreparedSomehow(connection) {
204+
return this.hasBeenParsed(connection) || this.dataParsed || this.requiresPreparation() == false;
205+
}
206+
207+
async submitNew(connection) {
208+
let res;
209+
let p = new Promise((resolve) => {
210+
res = resolve;
211+
})
212+
this.submitP = p;
213+
this.submitRes = res;
214+
console.log("Submitting");
215+
if (this.requiresPreparation()) {
216+
console.log("Is of type require preparation");
217+
connection.writeArray(this.toBeWrittenInBuffer);
218+
this._getRows(connection, this.rows)
219+
} else {
220+
console.log("Does not require preparation");
221+
connection.query(this.text);
222+
}
223+
await this.submitP;
224+
console.log("Done submitting");
225+
this.dataParsed = undefined;
226+
this.toBeWrittenInBuffer = [];
167227
}
168228

169229
hasBeenParsed(connection) {
@@ -196,7 +256,7 @@ class Query extends EventEmitter {
196256
this.isPreparedStatement = true
197257

198258
// TODO refactor this poor encapsulation
199-
if (!this.hasBeenParsed(connection)) {
259+
if (!this.hasBeenParsed(connection) && this.alreadyPrepared == false) {
200260
connection.parse({
201261
text: this.text,
202262
name: this.name,

packages/pg/lib/utils.js

+10-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ function escapeElement(elementRepresentation) {
1313
// a different array separator.
1414
function arrayString(val) {
1515
var result = '{'
16-
for (var i = 0; i < val.length; i++) {
16+
var len = val.length;
17+
for (var i = 0; i < len; i++) {
1718
if (i > 0) {
1819
result = result + ','
1920
}
@@ -43,6 +44,12 @@ var prepareValue = function (val, seen) {
4344
if (val instanceof Buffer) {
4445
return val
4546
}
47+
if (typeof val == "string") {
48+
return val;
49+
}
50+
if (typeof val == "number") {
51+
return val.toString();
52+
}
4653
if (ArrayBuffer.isView(val)) {
4754
var buf = Buffer.from(val.buffer, val.byteOffset, val.byteLength)
4855
if (buf.length === val.byteLength) {
@@ -170,8 +177,8 @@ const escapeIdentifier = function (str) {
170177
const escapeLiteral = function (str) {
171178
var hasBackslash = false
172179
var escaped = "'"
173-
174-
for (var i = 0; i < str.length; i++) {
180+
var len = str.length;
181+
for (var i = 0; i < len; i++) {
175182
var c = str[i]
176183
if (c === "'") {
177184
escaped += c + c

0 commit comments

Comments
 (0)