Skip to content

Commit 31b2b1d

Browse files
committed
All tests passing in isolation
Still have weird race conditions & shutdown/error/resume conditions to tackle
1 parent 2e33d4a commit 31b2b1d

File tree

3 files changed

+185
-143
lines changed

3 files changed

+185
-143
lines changed

index.js

+90-81
Original file line numberDiff line numberDiff line change
@@ -1,116 +1,125 @@
1-
var path = require('path')
2-
3-
var resultPath = path.dirname(require.resolve('pg.js')) + '/lib/result'
4-
var Result = require(resultPath)
5-
var Client = require('pg.js').Client
1+
var Result = require('./result')
62

73
var Cursor = function(text, values) {
84
this.text = text
95
this.values = values
10-
this._connection = null
6+
this.connection = null
7+
this._queue = []
8+
this.state = 'initialized'
9+
this._result = new Result()
10+
this._cb = null
11+
this._rows = null
1112
}
1213

13-
Cursor.prototype._connect = function(cb) {
14-
if(this._connected) return setImmediate(cb);
15-
this._connected = true
16-
var self = this
17-
var client = new Client()
18-
client.connect(function(err) {
19-
if(err) return cb(err);
20-
21-
//remove all listeners from
22-
//client's connection and discard the client
23-
self.connection = client.connection
24-
self.connection.removeAllListeners()
14+
Cursor.prototype.submit = function(connection) {
15+
this.connection = connection
2516

26-
var con = self.connection
17+
var con = connection
18+
var self = this
2719

28-
con.parse({
29-
text: self.text
30-
}, true)
20+
con.parse({
21+
text: this.text
22+
}, true)
3123

32-
con.bind({
33-
values: self.values
34-
}, true)
24+
con.bind({
25+
values: this.values
26+
}, true)
3527

36-
con.describe({
37-
type: 'P',
38-
name: '' //use unamed portal
39-
}, true)
28+
con.describe({
29+
type: 'P',
30+
name: '' //use unamed portal
31+
}, true)
4032

41-
con.flush()
33+
con.flush()
34+
}
4235

43-
var onError = function(err) {
44-
cb(err)
45-
con.end()
46-
}
36+
Cursor.prototype.handleRowDescription = function(msg) {
37+
this._result.addFields(msg.fields)
38+
this.state = 'idle'
39+
if(this._queue.length) {
40+
this._getRows.apply(this, this._queue.shift())
41+
}
42+
}
4743

48-
con.once('error', onError)
44+
Cursor.prototype.handleDataRow = function(msg) {
45+
var row = this._result.parseRow(msg.fields)
46+
this._rows.push(row)
47+
}
4948

50-
con.on('rowDescription', function(msg) {
51-
self.rowDescription = msg
52-
con.removeListener('error', onError)
53-
cb(null, con)
54-
})
49+
Cursor.prototype._sendRows = function() {
50+
this.state = 'idle'
51+
setImmediate(function() {
52+
this._cb(null, this._rows)
53+
this._rows = []
54+
}.bind(this))
55+
}
5556

56-
var onRow = function(msg) {
57-
var row = self.result.parseRow(msg.fields)
58-
self.result.addRow(row)
59-
}
57+
Cursor.prototype.handleCommandComplete = function() {
58+
this._sendRows()
59+
this.state = 'done'
60+
this.connection.sync()
61+
}
6062

61-
con.on('dataRow', onRow)
63+
Cursor.prototype.handlePortalSuspended = function() {
64+
this._sendRows()
65+
}
6266

63-
con.once('readyForQuery', function() {
64-
con.end()
65-
})
67+
Cursor.prototype.handleReadyForQuery = function() {
6668

67-
con.once('commandComplete', function() {
68-
self._complete = true
69-
con.sync()
70-
})
71-
})
7269
}
7370

74-
Cursor.prototype._getRows = function(con, n, cb) {
75-
if(this._done) {
76-
return cb(null, [], false)
71+
Cursor.prototype.handleError = function(msg) {
72+
this.state = 'error'
73+
this._error = msg
74+
//satisfy any waiting callback
75+
if(this._cb) {
76+
this._cb(msg)
7777
}
78-
var msg = {
79-
portal: '',
80-
rows: n
78+
//dispatch error to all waiting callbacks
79+
for(var i = 0; i < this._queue.length; i++) {
80+
this._queue.pop()[1](msg)
8181
}
82-
con.execute(msg, true)
83-
con.flush()
84-
this.result = new Result()
85-
this.result.addFields(this.rowDescription.fields)
86-
87-
var self = this
82+
}
8883

89-
var onComplete = function() {
90-
self._done = true
91-
cb(null, self.result.rows, self.result)
84+
Cursor.prototype._getRows = function(rows, cb) {
85+
console.log('get', rows)
86+
this.state = 'busy'
87+
this._cb = cb
88+
this._rows = []
89+
var msg = {
90+
portal: '',
91+
rows: rows
9292
}
93-
con.once('commandComplete', onComplete)
94-
95-
con.once('portalSuspended', function() {
96-
cb(null, self.result.rows, self.result)
97-
con.removeListener('commandComplete', onComplete)
98-
})
93+
this.connection.execute(msg, true)
94+
this.connection.flush()
9995
}
10096

10197
Cursor.prototype.end = function(cb) {
98+
if(this.statue != 'initialized') {
99+
this.connection.sync()
100+
}
102101
this.connection.end()
103102
this.connection.stream.once('end', cb)
104103
}
105104

106105
Cursor.prototype.read = function(rows, cb) {
106+
console.log('read', rows, this.state)
107107
var self = this
108-
this._connect(function(err) {
109-
if(err) return cb(err);
110-
self._getRows(self.connection, rows, cb)
111-
})
108+
if(this.state == 'idle') {
109+
return this._getRows(rows, cb)
110+
}
111+
if(this.state == 'busy' || this.state == 'initialized') {
112+
return this._queue.push([rows, cb])
113+
}
114+
if(this.state == 'error') {
115+
return cb(this._error)
116+
}
117+
if(this.state == 'done') {
118+
return cb(null, [])
119+
}
120+
else {
121+
throw new Error("Unknown state: " + this.state)
122+
}
112123
}
113124

114-
module.exports = function(query, params) {
115-
return new Cursor(query, params)
116-
}
125+
module.exports = Cursor

result.js

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
var path = require('path')
2+
var pgPath;
3+
//support both pg & pg.js
4+
//this will eventually go away when i break native bindings
5+
//out into their own module
6+
try {
7+
pgPath = path.dirname(require.resolve('pg'))
8+
} catch(e) {
9+
pgPath = path.dirname(require.resolve('pg.js'))
10+
}
11+
12+
module.exports = require(path.join(pgPath, 'lib', 'result.js'))

test/index.js

+83-62
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,107 @@
11
var assert = require('assert')
2-
var pgCursor = require('../')
3-
var gonna = require('gonna')
2+
var Cursor = require('../')
3+
var pg = require('pg.js')
44

55
var text = 'SELECT generate_series as num FROM generate_series(0, 5)'
6-
var values = []
76

8-
it('fetch 6 when asking for 10', function(done) {
9-
var cursor = pgCursor(text)
10-
cursor.read(10, function(err, res) {
11-
assert.ifError(err)
12-
assert.equal(res.length, 6)
13-
done()
14-
})
15-
})
7+
describe('cursor', function() {
168

17-
it('end before reading to end', function(done) {
18-
var cursor = pgCursor(text)
19-
cursor.read(3, function(err, res) {
20-
assert.equal(res.length, 3)
21-
cursor.end(done)
9+
var client;
10+
11+
var pgCursor = function(text, values) {
12+
client.connect()
13+
client.on('drain', client.end.bind(client))
14+
return client.query(new Cursor(text, values || []))
15+
}
16+
17+
before(function() {
18+
client = new pg.Client()
2219
})
23-
})
2420

25-
it('callback with error', function(done) {
26-
var cursor = pgCursor('select asdfasdf')
27-
cursor.read(1, function(err) {
28-
assert(err)
29-
done()
21+
22+
after(function() {
23+
client.end()
3024
})
31-
})
3225

26+
it('fetch 6 when asking for 10', function(done) {
27+
var cursor = pgCursor(text)
28+
cursor.read(10, function(err, res) {
29+
assert.ifError(err)
30+
assert.equal(res.length, 6)
31+
done()
32+
})
33+
})
3334

34-
it('read a partial chunk of data', function(done) {
35-
var cursor = pgCursor(text)
36-
cursor.read(2, function(err, res) {
37-
assert.equal(res.length, 2)
35+
it('end before reading to end', function(done) {
36+
var cursor = pgCursor(text)
3837
cursor.read(3, function(err, res) {
38+
assert.ifError(err)
3939
assert.equal(res.length, 3)
40-
cursor.read(1, function(err, res) {
41-
assert.equal(res.length, 1)
40+
cursor.end(done)
41+
})
42+
})
43+
44+
it('callback with error', function(done) {
45+
var cursor = pgCursor('select asdfasdf')
46+
cursor.read(1, function(err) {
47+
assert(err)
48+
done()
49+
})
50+
})
51+
52+
53+
it('read a partial chunk of data', function(done) {
54+
var cursor = pgCursor(text)
55+
cursor.read(2, function(err, res) {
56+
assert.ifError(err)
57+
assert.equal(res.length, 2)
58+
cursor.read(3, function(err, res) {
59+
assert.equal(res.length, 3)
4260
cursor.read(1, function(err, res) {
43-
assert.ifError(err)
44-
assert.strictEqual(res.length, 0)
45-
done()
61+
assert.equal(res.length, 1)
62+
cursor.read(1, function(err, res) {
63+
assert.ifError(err)
64+
assert.strictEqual(res.length, 0)
65+
done()
66+
})
4667
})
4768
})
4869
})
4970
})
50-
})
5171

52-
it('read return length 0 past the end', function(done) {
53-
var cursor = pgCursor(text)
54-
cursor.read(2, function(err, res) {
55-
cursor.read(100, function(err, res) {
56-
assert.equal(res.length, 4)
72+
it('read return length 0 past the end', function(done) {
73+
var cursor = pgCursor(text)
74+
cursor.read(2, function(err, res) {
5775
cursor.read(100, function(err, res) {
58-
assert.equal(res.length, 0)
59-
done()
76+
assert.equal(res.length, 4)
77+
cursor.read(100, function(err, res) {
78+
assert.equal(res.length, 0)
79+
done()
80+
})
6081
})
6182
})
6283
})
63-
})
6484

65-
it('read huge result', function(done) {
66-
this.timeout(10000)
67-
var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)'
68-
var values = []
69-
cursor = pgCursor(text, values);
70-
var count = 0;
71-
var read = function() {
72-
cursor.read(1000, function(err, rows) {
73-
if(err) return done(err);
74-
if(!rows.length) {
75-
assert.equal(count, 1000001)
76-
return done()
77-
}
78-
count += rows.length;
79-
if(count%100000 == 0) {
80-
//console.log(count)
81-
}
82-
setImmediate(read)
83-
})
84-
}
85-
read()
85+
it('read huge result', function(done) {
86+
this.timeout(10000)
87+
var text = 'SELECT generate_series as num FROM generate_series(0, 1000000)'
88+
var values = []
89+
cursor = pgCursor(text, values);
90+
var count = 0;
91+
var read = function() {
92+
cursor.read(1000, function(err, rows) {
93+
if(err) return done(err);
94+
if(!rows.length) {
95+
assert.equal(count, 1000001)
96+
return done()
97+
}
98+
count += rows.length;
99+
if(count%100000 == 0) {
100+
//console.log(count)
101+
}
102+
setImmediate(read)
103+
})
104+
}
105+
read()
106+
})
86107
})

0 commit comments

Comments
 (0)