Skip to content

Commit 5d27cf2

Browse files
committed
Initial commit
0 parents  commit 5d27cf2

File tree

4 files changed

+231
-0
lines changed

4 files changed

+231
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules

index.js

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
var path = require('path')
2+
3+
var resultPath = path.dirname(require.resolve('pg.js')) + '/lib/result'
4+
var Result = require(resultPath)
5+
var Client = require('pg.js').Client
6+
7+
var Cursor = function(text, values) {
8+
this.text = text
9+
this.values = values
10+
this._connection = null
11+
}
12+
13+
Cursor.prototype._connect = function(cb) {
14+
if(this._connected) return setImmediate(cb);
15+
this._connected = true
16+
var self = this
17+
var client = new Client()
18+
client.connect(function(err) {
19+
if(err) return cb(err);
20+
21+
//remove all listeners from
22+
//client's connection and discard the client
23+
self.connection = client.connection
24+
self.connection.removeAllListeners()
25+
26+
var con = self.connection
27+
28+
con.parse({
29+
text: self.text
30+
}, true)
31+
32+
con.bind({
33+
values: self.values
34+
}, true)
35+
36+
con.describe({
37+
type: 'P',
38+
name: '' //use unamed portal
39+
}, true)
40+
41+
con.flush()
42+
43+
var onError = function(err) {
44+
cb(err)
45+
con.end()
46+
}
47+
48+
con.once('error', onError)
49+
50+
con.on('rowDescription', function(msg) {
51+
self.rowDescription = msg
52+
con.removeListener('error', onError)
53+
cb(null, con)
54+
})
55+
56+
var onRow = function(msg) {
57+
var row = self.result.parseRow(msg.fields)
58+
self.result.addRow(row)
59+
}
60+
61+
con.on('dataRow', onRow)
62+
63+
con.once('readyForQuery', function() {
64+
con.end()
65+
})
66+
67+
con.once('commandComplete', function() {
68+
self._complete = true
69+
con.sync()
70+
})
71+
})
72+
}
73+
74+
Cursor.prototype._getRows = function(con, n, cb) {
75+
if(this._done) {
76+
return cb(null, [], false)
77+
}
78+
var msg = {
79+
portal: '',
80+
rows: n
81+
}
82+
con.execute(msg, true)
83+
con.flush()
84+
this.result = new Result()
85+
this.result.addFields(this.rowDescription.fields)
86+
87+
var self = this
88+
89+
var onComplete = function() {
90+
self._done = true
91+
cb(null, self.result.rows, self.result)
92+
}
93+
con.once('commandComplete', onComplete)
94+
95+
con.once('portalSuspended', function() {
96+
cb(null, self.result.rows, self.result)
97+
con.removeListener('commandComplete', onComplete)
98+
})
99+
}
100+
101+
Cursor.prototype.end = function(cb) {
102+
this.connection.end()
103+
this.connection.stream.once('end', cb)
104+
}
105+
106+
Cursor.prototype.read = function(rows, cb) {
107+
var self = this
108+
this._connect(function(err) {
109+
if(err) return cb(err);
110+
self._getRows(self.connection, rows, cb)
111+
})
112+
}
113+
114+
module.exports = function(query, params) {
115+
return new Cursor(query, params)
116+
}

package.json

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"name": "node-pg-cursor",
3+
"version": "0.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"directories": {
7+
"test": "test"
8+
},
9+
"scripts": {
10+
"test": "echo \"Error: no test specified\" && exit 1"
11+
},
12+
"author": "",
13+
"license": "BSD",
14+
"devDependencies": {
15+
"gonna": "0.0.0"
16+
},
17+
"dependencies": {
18+
"pg.js": "~2.7.0"
19+
}
20+
}

test/index.js

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
var assert = require('assert')
2+
var pgCursor = require('../')
3+
var gonna = require('gonna')
4+
5+
var text = 'SELECT generate_series as num FROM generate_series(0, 5)'
6+
var values = []
7+
8+
var test = function(name, fn, timeout) {
9+
timeout = timeout || 1000
10+
var done = gonna(name, timeout, function(err) {
11+
console.log(name)
12+
assert.ifError(err)
13+
})
14+
fn(done)
15+
}
16+
17+
test('fetch 6 when asking for 10', function(done) {
18+
var cursor = pgCursor(text)
19+
cursor.read(10, function(err, res) {
20+
assert.ifError(err)
21+
assert.equal(res.length, 6)
22+
done()
23+
})
24+
})
25+
26+
test('end before reading to end', function(done) {
27+
var cursor = pgCursor(text)
28+
cursor.read(3, function(err, res) {
29+
assert.equal(res.length, 3)
30+
cursor.end(done)
31+
})
32+
})
33+
34+
test('callback with error', function(done) {
35+
var cursor = pgCursor('select asdfasdf')
36+
cursor.read(1, function(err) {
37+
assert(err)
38+
done()
39+
})
40+
})
41+
42+
43+
test('read a partial chunk of data', function(done) {
44+
var cursor = pgCursor(text)
45+
cursor.read(2, function(err, res) {
46+
assert.equal(res.length, 2)
47+
cursor.read(3, function(err, res) {
48+
assert.equal(res.length, 3)
49+
cursor.read(1, function(err, res) {
50+
assert.equal(res.length, 1)
51+
cursor.read(1, function(err, res) {
52+
assert.ifError(err)
53+
assert.strictEqual(res.length, 0)
54+
done()
55+
})
56+
})
57+
})
58+
})
59+
})
60+
61+
test('read return length 0 past the end', function(done) {
62+
var cursor = pgCursor(text)
63+
cursor.read(2, function(err, res) {
64+
cursor.read(100, function(err, res) {
65+
assert.equal(res.length, 4)
66+
cursor.read(100, function(err, res) {
67+
assert.equal(res.length, 0)
68+
done()
69+
})
70+
})
71+
})
72+
})
73+
74+
test('read huge result', function(done) {
75+
var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)'
76+
var values = []
77+
cursor = pgCursor(text, values);
78+
var count = 0;
79+
var more = function() {
80+
cursor.read(1000, function(err, rows) {
81+
if(err) return done(err);
82+
if(!rows.length) {
83+
assert.equal(count, 1000001)
84+
return done()
85+
}
86+
count += rows.length;
87+
if(count%100000 == 0) {
88+
console.log(count)
89+
}
90+
setImmediate(more)
91+
})
92+
}
93+
more()
94+
}, 100000)

0 commit comments

Comments
 (0)