Skip to content

Commit 722296f

Browse files
committed
Initial commit
0 parents  commit 722296f

9 files changed

+318
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules

Makefile

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
SHELL := /bin/sh
2+
.PHONY: test
3+
4+
test:
5+
find test/ -name "*.js" | xargs -n 1 node

README.md

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# pg-query-stream
2+
3+
Receive result rows from [pg](https://github.com/brianc/node-postgres) as a readable (object) stream.
4+
5+
This module __only works with the pure JavaScript client__.
6+
7+
## installation
8+
9+
```bash
10+
$ npm install pg
11+
$ npm install pg-query-stream
12+
```
13+
14+
_requires pg>=2.8.1_
15+
16+
##### - or -
17+
18+
```bash
19+
$ npm install pg.js
20+
$ npm install pg-query-stream
21+
```
22+
23+
_requires pg.js>=2.8.1_
24+
25+
## use
26+
27+
```js
28+
var pg = require('pg')
29+
var QueryStream = require('pg-query-stream')
30+
var JSONStream = require('JSONStream')
31+
32+
//pipe 1,000,000 rows to stdout without blowing up your memory usage
33+
pg.connect(function(err, client, done) {
34+
if(err) throw err;
35+
var query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000])
36+
var stream = client.query(query)
37+
stream.pipe(JSONStream.stringify()).pipe(process.stdout)
38+
})
39+
```
40+
41+
The stream uses a cursor on the server so it efficiently keeps only a low number of rows in memory.
42+
43+
This is especially useful when doing [ETL](http://en.wikipedia.org/wiki/Extract,_transform,_load) on a huge table. Using manual `limit` and `offset` queries to fake out async itteration through your data is cumbersom, and _way way way_ slower than using a cursor.
44+
45+
## contribution
46+
47+
I'm very open to contribution! Open a pull request with your code or idea and we'll talk about it. If it's not way insane we'll merge it in too: isn't open source awesome?
48+
49+
## license
50+
51+
The MIT License (MIT)
52+
53+
Copyright (c) 2013 Brian M. Carlson
54+
55+
Permission is hereby granted, free of charge, to any person obtaining a copy
56+
of this software and associated documentation files (the "Software"), to deal
57+
in the Software without restriction, including without limitation the rights
58+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
59+
copies of the Software, and to permit persons to whom the Software is
60+
furnished to do so, subject to the following conditions:
61+
62+
The above copyright notice and this permission notice shall be included in
63+
all copies or substantial portions of the Software.
64+
65+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
66+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
67+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
68+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
69+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
70+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
71+
THE SOFTWARE.

index.js

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
var assert = require('assert')
2+
var Readable = require('stream').Readable
3+
var Result = require('pg')
4+
5+
var path = require('path')
6+
7+
var pgdir = false
8+
try {
9+
pgdir = path.dirname(require.resolve('pg'))
10+
} catch (e) {
11+
pgdir = path.dirname(require.resolve('pg.js'))
12+
}
13+
if(!pgdir) {
14+
throw new Error("Please install either `pg` or `pg.js` to use this module")
15+
}
16+
var Result = require(path.join(pgdir, 'result'))
17+
var utils = require(path.join(pgdir, 'utils'))
18+
19+
var QueryStream = module.exports = function(text, values, options) {
20+
options = options || {
21+
highWaterMark: 100,
22+
batchSize: 100
23+
}
24+
Readable.call(this, {
25+
objectMode: true,
26+
highWaterMark: 100
27+
})
28+
this.text = text
29+
assert(this.text, 'text cannot be falsy')
30+
this.values = (values || []).map(utils.prepareValue)
31+
this.name = ''
32+
this._result = new Result()
33+
this.batchSize = 100
34+
this._idle = true
35+
}
36+
37+
require('util').inherits(QueryStream, Readable)
38+
39+
QueryStream.prototype._read = function(n) {
40+
this._getRows(n)
41+
}
42+
43+
QueryStream.prototype._getRows = function(count) {
44+
var con = this.connection
45+
if(!this._idle || !this.connection) return;
46+
this._idle = false
47+
con.execute({
48+
portal: '',
49+
rows: count
50+
}, true)
51+
52+
con.flush()
53+
}
54+
55+
QueryStream.prototype.submit = function(con) {
56+
//save reference to connection
57+
this.connection = con
58+
59+
var name = this.name
60+
61+
con.parse({
62+
text: this.text,
63+
name: name,
64+
types: []
65+
}, true)
66+
67+
con.bind({
68+
portal: '',
69+
statement: name,
70+
values: this.values,
71+
binary: false
72+
}, true)
73+
74+
con.describe({
75+
type: 'P',
76+
name: name
77+
}, true)
78+
79+
this._getRows(this.batchSize)
80+
81+
}
82+
83+
QueryStream.prototype.handleRowDescription = function(msg) {
84+
this._result.addFields(msg.fields)
85+
}
86+
87+
QueryStream.prototype.handleDataRow = function(msg) {
88+
var row = this._result.parseRow(msg.fields)
89+
this._more = this.push(row)
90+
}
91+
92+
QueryStream.prototype.handlePortalSuspended = function(msg) {
93+
this._idle = true
94+
if(this._more) {
95+
this._getRows(this.batchSize)
96+
}
97+
}
98+
99+
QueryStream.prototype.handleCommandComplete = function(msg) {
100+
this.connection.sync()
101+
}
102+
103+
QueryStream.prototype.handleReadyForQuery = function() {
104+
this.push(null)
105+
}

package.json

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"name": "pg-query-stream",
3+
"version": "0.0.0",
4+
"description": "Postgres query result returned as readable stream",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "node test"
8+
},
9+
"repository": {
10+
"type": "git",
11+
"url": "git://github.com/brianc/node-pg-query-stream.git"
12+
},
13+
"keywords": [
14+
"postgres",
15+
"pg",
16+
"query",
17+
"stream"
18+
],
19+
"author": "Brian M. Carlson",
20+
"license": "BSD-2-Clause",
21+
"bugs": {
22+
"url": "https://github.com/brianc/node-pg-query-stream/issues"
23+
},
24+
"devDependencies": {
25+
"pg.js": "~2.8.0",
26+
"gonna": "0.0.0",
27+
"lodash": "~2.2.1",
28+
"concat-stream": "~1.0.1",
29+
"through": "~2.3.4",
30+
"stream-tester": "0.0.5",
31+
"stream-spec": "~0.3.5",
32+
"jsonstream": "0.0.1",
33+
"JSONStream": "~0.7.1"
34+
}
35+
}

test/concat.js

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
var pg = require('pg')
2+
var assert = require('assert')
3+
var gonna = require('gonna')
4+
var _ = require('lodash')
5+
var concat = require('concat-stream')
6+
var through = require('through')
7+
8+
var QueryStream = require('../')
9+
10+
var client = new pg.Client()
11+
12+
var connected = gonna('connect', 100, function() {
13+
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
14+
var query = client.query(stream)
15+
query.pipe(through(function(row) {
16+
this.push(row.num)
17+
})).pipe(concat(function(result) {
18+
var total = result.reduce(function(prev, cur) {
19+
return prev + cur
20+
})
21+
assert.equal(total, 20100)
22+
}))
23+
stream.on('end', client.end.bind(client))
24+
})
25+
26+
client.connect(connected)

test/fast-reader.js

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
var pg = require('pg')
2+
var assert = require('assert')
3+
var gonna = require('gonna')
4+
var _ = require('lodash')
5+
6+
var QueryStream = require('../')
7+
8+
var client = new pg.Client()
9+
10+
var connected = gonna('connect', 100, function() {
11+
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
12+
var query = client.query(stream)
13+
var result = []
14+
stream.on('readable', function() {
15+
var res = stream.read()
16+
assert(res, 'should not return null on evented reader')
17+
result.push(res.num)
18+
})
19+
stream.on('end', client.end.bind(client))
20+
stream.on('end', function() {
21+
var total = result.reduce(function(prev, cur) {
22+
return prev + cur
23+
})
24+
assert.equal(total, 20100)
25+
})
26+
assert.strictEqual(query.read(2), null)
27+
})
28+
29+
client.connect(connected)

test/pauses.js

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
var pg = require('pg')
2+
var assert = require('assert')
3+
var gonna = require('gonna')
4+
var _ = require('lodash')
5+
var concat = require('concat-stream')
6+
var through = require('through')
7+
var tester = require('stream-tester')
8+
var JSONStream = require('JSONStream')
9+
var stream = require('stream')
10+
11+
var QueryStream = require('../')
12+
13+
var client = new pg.Client()
14+
15+
var connected = gonna('connect', 100, function() {
16+
var stream = new QueryStream('SELECT * FROM generate_series(0, $1) num', [200], {chunkSize: 2, highWaterMark: 2})
17+
var query = client.query(stream)
18+
var pauser = tester.createPauseStream(0.1, 100)
19+
query.pipe(JSONStream.stringify()).pipe(concat(function(json) {
20+
JSON.parse(json)
21+
client.end()
22+
}))
23+
})
24+
25+
client.connect(connected)

test/stream-tester.js

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
var pg = require('pg')
2+
var assert = require('assert')
3+
var gonna = require('gonna')
4+
var tester = require('stream-tester')
5+
6+
var QueryStream = require('../')
7+
8+
var client = new pg.Client()
9+
10+
var connected = gonna('connect', 100, function() {
11+
var stream = new QueryStream('SELECT * FROM generate_series(0, 200) num', [])
12+
var spec = require('stream-spec')
13+
var query = client.query(stream)
14+
spec(query)
15+
.readable()
16+
.pausable({strict: true})
17+
.validateOnExit()
18+
stream.on('end', client.end.bind(client))
19+
})
20+
21+
client.connect(connected)

0 commit comments

Comments
 (0)