1
1
var Native = require ( 'pg-native' ) ;
2
2
var EventEmitter = require ( 'events' ) . EventEmitter ;
3
3
var util = require ( 'util' ) ;
4
+ var ConnectionParameters = require ( __dirname + '/../connection-parameters' ) ;
4
5
5
6
var NativeQuery = require ( './query' ) ;
6
7
@@ -11,6 +12,16 @@ var Client = module.exports = function(config) {
11
12
}
12
13
this . native = new Native ( ) ;
13
14
this . _queryQueue = [ ] ;
15
+ this . _connected = false ;
16
+
17
+ //keep these on the object for legacy reasons
18
+ //for the time being. TODO: deprecate all this jazz
19
+ var cp = new ConnectionParameters ( config ) ;
20
+ this . user = cp . user ;
21
+ this . password = cp . password ;
22
+ this . database = cp . database ;
23
+ this . host = cp . host ;
24
+ this . port = cp . port ;
14
25
} ;
15
26
16
27
util . inherits ( Client , EventEmitter ) ;
@@ -73,7 +84,11 @@ Client.prototype.query = function(config, values, callback) {
73
84
} ;
74
85
75
86
Client . prototype . end = function ( cb ) {
76
- this . native . end ( cb ) ;
87
+ var self = this ;
88
+ this . native . end ( function ( ) {
89
+ self . emit ( 'end' ) ;
90
+ if ( cb ) cb ( ) ;
91
+ } ) ;
77
92
} ;
78
93
79
94
Client . prototype . _pulseQueryQueue = function ( initialConnection ) {
@@ -99,223 +114,3 @@ Client.prototype._pulseQueryQueue = function(initialConnection) {
99
114
self . _pulseQueryQueue ( ) ;
100
115
} ) ;
101
116
} ;
102
-
103
-
104
- return ;
105
- //require the c++ bindings & export to javascript
106
- var EventEmitter = require ( 'events' ) . EventEmitter ;
107
-
108
- var ConnectionParameters = require ( __dirname + '/../connection-parameters' ) ;
109
- var CopyFromStream = require ( __dirname + '/../copystream' ) . CopyFromStream ;
110
- var CopyToStream = require ( __dirname + '/../copystream' ) . CopyToStream ;
111
- var JsClient = require ( __dirname + '/../client' ) ; // used to import JS escape functions
112
-
113
- var binding ;
114
-
115
- //TODO remove on v1.0.0
116
- try {
117
- //v0.5.x
118
- binding = require ( __dirname + '/../../build/Release/binding.node' ) ;
119
- } catch ( e ) {
120
- //v0.4.x
121
- binding = require ( __dirname + '/../../build/default/binding' ) ;
122
- }
123
-
124
- var Connection = binding . Connection ;
125
- var NativeQuery = require ( __dirname + '/query' ) ;
126
-
127
- for ( var k in EventEmitter . prototype ) {
128
- Connection . prototype [ k ] = EventEmitter . prototype [ k ] ;
129
- }
130
-
131
- var nativeConnect = Connection . prototype . connect ;
132
-
133
- Connection . prototype . connect = function ( cb ) {
134
- var self = this ;
135
- this . connectionParameters . getLibpqConnectionString ( function ( err , conString ) {
136
- if ( err ) {
137
- return cb ? cb ( err ) : self . emit ( 'error' , err ) ;
138
- }
139
- if ( cb ) {
140
- var errCallback ;
141
- var connectCallback = function ( ) {
142
- //remove single-fire connection error callback
143
- self . removeListener ( 'error' , errCallback ) ;
144
- cb ( null ) ;
145
- } ;
146
- errCallback = function ( err ) {
147
- //remove singel-fire connection success callback
148
- self . removeListener ( 'connect' , connectCallback ) ;
149
- cb ( err ) ;
150
- } ;
151
- self . once ( 'connect' , connectCallback ) ;
152
- self . once ( 'error' , errCallback ) ;
153
- }
154
- nativeConnect . call ( self , conString ) ;
155
- } ) ;
156
- } ;
157
-
158
- Connection . prototype . _copy = function ( text , stream ) {
159
- var q = new NativeQuery ( text , function ( error ) {
160
- if ( error ) {
161
- q . stream . error ( error ) ;
162
- } else {
163
- q . stream . close ( ) ;
164
- }
165
- } ) ;
166
- q . stream = stream ;
167
- this . _queryQueue . push ( q ) ;
168
- this . _pulseQueryQueue ( ) ;
169
- return q . stream ;
170
- } ;
171
-
172
- Connection . prototype . copyFrom = function ( text ) {
173
- return this . _copy ( text , new CopyFromStream ( ) ) ;
174
- } ;
175
-
176
- Connection . prototype . copyTo = function ( text ) {
177
- return this . _copy ( text , new CopyToStream ( ) ) ;
178
- } ;
179
-
180
- Connection . prototype . sendCopyFromChunk = function ( chunk ) {
181
- this . _sendCopyFromChunk ( chunk ) ;
182
- } ;
183
-
184
- Connection . prototype . endCopyFrom = function ( msg ) {
185
- this . _endCopyFrom ( msg ) ;
186
- } ;
187
-
188
- // use JS version if native version undefined
189
- // happens when PG version < 9.0.0
190
- if ( ! Connection . prototype . escapeIdentifier ) {
191
- Connection . prototype . escapeIdentifier = JsClient . prototype . escapeIdentifier ;
192
- }
193
- if ( ! Connection . prototype . escapeLiteral ) {
194
- Connection . prototype . escapeLiteral = JsClient . prototype . escapeLiteral ;
195
- }
196
-
197
- Connection . prototype . query = function ( config , values , callback ) {
198
- var query = ( config instanceof NativeQuery ) ? config :
199
- new NativeQuery ( config , values , callback ) ;
200
- this . _queryQueue . push ( query ) ;
201
- this . _pulseQueryQueue ( ) ;
202
- return query ;
203
- } ;
204
-
205
- var nativeCancel = Connection . prototype . cancel ;
206
-
207
- Connection . prototype . cancel = function ( client , query ) {
208
- if ( client . _activeQuery == query ) {
209
- this . connect ( nativeCancel . bind ( client ) ) ;
210
- } else if ( client . _queryQueue . indexOf ( query ) != - 1 ) {
211
- client . _queryQueue . splice ( client . _queryQueue . indexOf ( query ) , 1 ) ;
212
- }
213
- } ;
214
-
215
- Connection . prototype . sendCopyFail = function ( msg ) {
216
- this . endCopyFrom ( msg ) ;
217
- } ;
218
-
219
- var clientBuilder = function ( config ) {
220
- config = config || { } ;
221
- var connection = new Connection ( ) ;
222
- EventEmitter . call ( connection ) ;
223
- connection . _queryQueue = [ ] ;
224
- connection . _namedQueries = { } ;
225
- connection . _activeQuery = null ;
226
- connection . connectionParameters = new ConnectionParameters ( config ) ;
227
- //attach properties to normalize interface with pure js client
228
- connection . user = connection . connectionParameters . user ;
229
- connection . password = connection . connectionParameters . password ;
230
- connection . database = connection . connectionParameters . database ;
231
- connection . host = connection . connectionParameters . host ;
232
- connection . port = connection . connectionParameters . port ;
233
- connection . on ( 'connect' , function ( ) {
234
- connection . _connected = true ;
235
- connection . _pulseQueryQueue ( true ) ;
236
- } ) ;
237
-
238
- connection . on ( '_rowDescription' , function ( rowDescription ) {
239
- connection . _activeQuery . handleRowDescription ( rowDescription ) ;
240
- } ) ;
241
-
242
- //proxy some events to active query
243
- connection . on ( '_row' , function ( row ) {
244
- connection . _activeQuery . handleRow ( row ) ;
245
- } ) ;
246
-
247
- connection . on ( '_cmdStatus' , function ( status ) {
248
- //set this here so we can pass it to the query
249
- //when the query completes
250
- connection . _lastMeta = status ;
251
- } ) ;
252
-
253
- //TODO: emit more native error properties (make it match js error)
254
- connection . on ( '_error' , function ( err ) {
255
- //create Error object from object literal
256
- var error = new Error ( err . message || "Unknown native driver error" ) ;
257
- for ( var key in err ) {
258
- error [ key ] = err [ key ] ;
259
- }
260
-
261
- //give up on trying to wait for named query prepare
262
- this . _namedQuery = false ;
263
- if ( connection . _activeQuery ) {
264
- connection . _activeQuery . handleError ( error ) ;
265
- } else {
266
- connection . emit ( 'error' , error ) ;
267
- }
268
- } ) ;
269
-
270
- connection . on ( '_end' , function ( ) {
271
- process . nextTick ( function ( ) {
272
- if ( connection . _activeQuery ) {
273
- connection . _activeQuery . handleError ( new Error ( "Connection was ended during query" ) ) ;
274
- }
275
- connection . emit ( 'end' ) ;
276
- } ) ;
277
- } ) ;
278
-
279
- connection . on ( '_readyForQuery' , function ( ) {
280
- var error ;
281
- var q = this . _activeQuery ;
282
- //a named query finished being prepared
283
- if ( this . _namedQuery ) {
284
- this . _namedQuery = false ;
285
- this . _sendQueryPrepared ( q . name , q . values || [ ] ) ;
286
- } else {
287
- //try/catch/rethrow to ensure exceptions don't prevent the queryQueue from
288
- //being processed
289
- try {
290
- connection . _activeQuery . handleReadyForQuery ( connection . _lastMeta ) ;
291
- } catch ( e ) {
292
- error = e ;
293
- }
294
- connection . _activeQuery = null ;
295
- connection . _pulseQueryQueue ( ) ;
296
- if ( error ) throw error ;
297
- }
298
- } ) ;
299
- connection . on ( 'copyInResponse' , function ( ) {
300
- //connection is ready to accept chunks
301
- //start to send data from stream
302
- connection . _activeQuery . streamData ( connection ) ;
303
- } ) ;
304
- connection . on ( 'copyOutResponse' , function ( msg ) {
305
- if ( connection . _activeQuery . stream === undefined ) {
306
- connection . _activeQuery . _canceledDueToError = new Error ( 'No destination stream defined' ) ;
307
- ( new clientBuilder ( { port : connection . port , host : connection . host } ) ) . cancel ( connection , connection . _activeQuery ) ;
308
- }
309
- } ) ;
310
- connection . on ( 'copyData' , function ( chunk ) {
311
- //recieve chunk from connection
312
- //move it to stream
313
- connection . _activeQuery . handleCopyFromChunk ( chunk ) ;
314
- } ) ;
315
- return connection ;
316
- } ;
317
-
318
- // expose a Query constructor
319
- clientBuilder . Query = NativeQuery ;
320
-
321
- module . exports = clientBuilder ;
0 commit comments