Skip to content

Commit c5c3114

Browse files
antonbrianc
anton
authored andcommitted
1. behave correctly if copy to/from is send to db by query method (report error in standart way); 2. bugfix and code review in native copy to implementation
1 parent 868a9d0 commit c5c3114

File tree

5 files changed

+95
-47
lines changed

5 files changed

+95
-47
lines changed

lib/client.js

+6
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,12 @@ p.connect = function(callback) {
108108
con.on('copyInResponse', function(msg) {
109109
self.activeQuery.streamData(self.connection);
110110
});
111+
con.on('copyOutResponse', function(msg) {
112+
if (self.activeQuery.stream === undefined) {
113+
self.activeQuery._canceledDueToError = new Error('No destination stream defined');
114+
(new self.constructor(self.config)).cancel(self, self.activeQuery);
115+
}
116+
});
111117
con.on('copyData', function (msg) {
112118
self.activeQuery.handleCopyFromChunk(msg.chunk);
113119
});

lib/native/index.js

+11-3
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ p.copyTo = function (text) {
7272
p.sendCopyFromChunk = function (chunk) {
7373
this._sendCopyFromChunk(chunk);
7474
};
75-
p.endCopyFrom = function () {
76-
this._endCopyFrom();
75+
p.endCopyFrom = function (msg) {
76+
this._endCopyFrom(msg);
7777
};
7878
p.query = function(config, values, callback) {
7979
var query = (config instanceof NativeQuery) ? config : new NativeQuery(config, values, callback);
@@ -134,7 +134,9 @@ p.resumeDrain = function() {
134134
};
135135
this._drainPaused = 0;
136136
};
137-
137+
p.sendCopyFail = function(msg) {
138+
this.endCopyFrom(msg);
139+
};
138140
var clientBuilder = function(config) {
139141
config = config || {};
140142
var connection = new Connection();
@@ -198,6 +200,12 @@ var clientBuilder = function(config) {
198200
//start to send data from stream
199201
connection._activeQuery.streamData(connection);
200202
});
203+
connection.on('copyOutResponse', function(msg) {
204+
if (connection._activeQuery.stream === undefined) {
205+
connection._activeQuery._canceledDueToError = new Error('No destination stream defined');
206+
(new clientBuilder(connection.config)).cancel(connection, connection._activeQuery);
207+
}
208+
});
201209
connection.on('copyData', function (chunk) {
202210
//recieve chunk from connection
203211
//move it to stream

lib/native/query.js

+13-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ var NativeQuery = function(config, values, callback) {
2626
this.values[i] = utils.prepareValue(this.values[i]);
2727
}
2828
}
29+
this._canceledDueToError = false;
2930
};
3031

3132
util.inherits(NativeQuery, EventEmitter);
@@ -50,6 +51,10 @@ p.handleRow = function(rowData) {
5051
};
5152

5253
p.handleError = function(error) {
54+
if (this._canceledDueToError) {
55+
error = this._canceledDueToError;
56+
this._canceledDueToError = false;
57+
}
5358
if(this.callback) {
5459
this.callback(error);
5560
this.callback = null;
@@ -68,9 +73,15 @@ p.handleReadyForQuery = function(meta) {
6873
this.emit('end', this._result);
6974
};
7075
p.streamData = function (connection) {
71-
this.stream.startStreamingToConnection(connection);
76+
if ( this.stream ) this.stream.startStreamingToConnection(connection);
77+
else connection.sendCopyFail('No source stream defined');
7278
};
7379
p.handleCopyFromChunk = function (chunk) {
74-
this.stream.handleChunk(chunk);
80+
if ( this.stream ) {
81+
this.stream.handleChunk(chunk);
82+
}
83+
//if there are no stream (for example when copy to query was sent by
84+
//query method instead of copyTo) error will be handled
85+
//on copyOutResponse event, so silently ignore this error here
7586
}
7687
module.exports = NativeQuery;

lib/query.js

+11-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ var Query = function(config, values, callback) {
2525
this._fieldConverters = [];
2626
this._result = new Result();
2727
this.isPreparedStatement = false;
28+
this._canceledDueToError = false;
2829
EventEmitter.call(this);
2930
};
3031

@@ -99,6 +100,10 @@ p.handleReadyForQuery = function() {
99100
};
100101

101102
p.handleError = function(err) {
103+
if (this._canceledDueToError) {
104+
err = this._canceledDueToError;
105+
this._canceledDueToError = false;
106+
}
102107
//if callback supplied do not emit error event as uncaught error
103108
//events will bubble up to node process
104109
if(this.callback) {
@@ -174,10 +179,11 @@ p.streamData = function (connection) {
174179
else connection.sendCopyFail('No source stream defined');
175180
};
176181
p.handleCopyFromChunk = function (chunk) {
177-
if ( this.stream ) this.stream.handleChunk(chunk);
178-
else {
179-
// TODO: signal the problem somehow
180-
//this.handleError(new Error('error', 'No destination stream defined'));
181-
}
182+
if ( this.stream ) {
183+
this.stream.handleChunk(chunk);
184+
}
185+
//if there are no stream (for example when copy to query was sent by
186+
//query method instead of copyTo) error will be handled
187+
//on copyOutResponse event, so silently ignore this error here
182188
}
183189
module.exports = Query;

src/binding.cc

+54-37
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,14 @@ class Connection : public ObjectWrap {
248248
bool connecting_;
249249
bool ioInitialized_;
250250
bool copyOutMode_;
251+
bool copyInMode_;
251252
Connection () : ObjectWrap ()
252253
{
253254
connection_ = NULL;
254255
connecting_ = false;
255256
ioInitialized_ = false;
256257
copyOutMode_ = false;
258+
copyInMode_ = false;
257259
TRACE("Initializing ev watchers");
258260
read_watcher_.data = this;
259261
write_watcher_.data = this;
@@ -278,8 +280,13 @@ class Connection : public ObjectWrap {
278280
EndCopyFrom(const Arguments& args) {
279281
HandleScope scope;
280282
Connection *self = ObjectWrap::Unwrap<Connection>(args.This());
283+
char * error_msg = NULL;
284+
if (args[0]->IsString()) {
285+
error_msg = MallocCString(args[0]);
286+
}
281287
//TODO handle errors in some way
282-
self->EndCopyFrom();
288+
self->EndCopyFrom(error_msg);
289+
free(error_msg);
283290
return Undefined();
284291
}
285292

@@ -433,23 +440,19 @@ class Connection : public ObjectWrap {
433440
if (this->copyOutMode_) {
434441
this->HandleCopyOut();
435442
}
436-
if (PQisBusy(connection_) == 0) {
443+
if (!this->copyInMode_ && !this->copyOutMode_ && PQisBusy(connection_) == 0) {
437444
PGresult *result;
438445
bool didHandleResult = false;
439446
while ((result = PQgetResult(connection_))) {
440-
if (PGRES_COPY_IN == PQresultStatus(result)) {
441-
didHandleResult = false;
442-
Emit("copyInResponse");
443-
PQclear(result);
447+
didHandleResult = HandleResult(result);
448+
PQclear(result);
449+
if(!didHandleResult) {
450+
//this means that we are in copy in or copy out mode
451+
//in this situation PQgetResult will return same
452+
//result untill all data will be read (copy out) or
453+
//until data end notification (copy in)
454+
//and because of this, we need to break cycle
444455
break;
445-
} else if (PGRES_COPY_OUT == PQresultStatus(result)) {
446-
PQclear(result);
447-
this->copyOutMode_ = true;
448-
didHandleResult = this->HandleCopyOut();
449-
} else {
450-
HandleResult(result);
451-
didHandleResult = true;
452-
PQclear(result);
453456
}
454457
}
455458
//might have fired from notification
@@ -479,52 +482,65 @@ class Connection : public ObjectWrap {
479482
}
480483
bool HandleCopyOut () {
481484
char * buffer = NULL;
482-
int copied = PQgetCopyData(connection_, &buffer, 1);
483-
if (copied > 0) {
484-
Buffer * chunk = Buffer::New(buffer, copied);
485+
int copied;
486+
Buffer * chunk;
487+
copied = PQgetCopyData(connection_, &buffer, 1);
488+
while (copied > 0) {
489+
chunk = Buffer::New(buffer, copied);
485490
Handle<Value> node_chunk = chunk->handle_;
486491
Emit("copyData", &node_chunk);
487492
PQfreemem(buffer);
488-
//result was not handled copmpletely
489-
return false;
490-
} else if (copied == 0) {
493+
copied = PQgetCopyData(connection_, &buffer, 1);
494+
}
495+
if (copied == 0) {
491496
//wait for next read ready
492497
//result was not handled copmpletely
493498
return false;
494499
} else if (copied == -1) {
495-
PGresult *result;
496-
//result is handled completely
497500
this->copyOutMode_ = false;
498-
if (PQisBusy(connection_) == 0 && (result = PQgetResult(connection_))) {
499-
HandleResult(result);
500-
PQclear(result);
501-
return true;
502-
} else {
503-
return false;
504-
}
501+
return true;
505502
} else if (copied == -2) {
506-
//TODO error handling
507-
//result is handled with error
508-
HandleErrorResult(NULL);
503+
this->copyOutMode_ = false;
509504
return true;
510505
}
511506
}
512-
void HandleResult(PGresult* result)
507+
bool HandleResult(PGresult* result)
513508
{
514509
ExecStatusType status = PQresultStatus(result);
515510
switch(status) {
516511
case PGRES_TUPLES_OK:
517512
{
518513
HandleTuplesResult(result);
519514
EmitCommandMetaData(result);
515+
return true;
520516
}
521517
break;
522518
case PGRES_FATAL_ERROR:
523-
HandleErrorResult(result);
519+
{
520+
HandleErrorResult(result);
521+
return true;
522+
}
524523
break;
525524
case PGRES_COMMAND_OK:
526525
case PGRES_EMPTY_QUERY:
527-
EmitCommandMetaData(result);
526+
{
527+
EmitCommandMetaData(result);
528+
return true;
529+
}
530+
break;
531+
case PGRES_COPY_IN:
532+
{
533+
this->copyInMode_ = true;
534+
Emit("copyInResponse");
535+
return false;
536+
}
537+
break;
538+
case PGRES_COPY_OUT:
539+
{
540+
this->copyOutMode_ = true;
541+
Emit("copyOutResponse");
542+
return this->HandleCopyOut();
543+
}
528544
break;
529545
default:
530546
printf("YOU SHOULD NEVER SEE THIS! PLEASE OPEN AN ISSUE ON GITHUB! Unrecogized query status: %s\n", PQresStatus(status));
@@ -772,8 +788,9 @@ class Connection : public ObjectWrap {
772788
void SendCopyFromChunk(Handle<Object> chunk) {
773789
PQputCopyData(connection_, Buffer::Data(chunk), Buffer::Length(chunk));
774790
}
775-
void EndCopyFrom() {
776-
PQputCopyEnd(connection_, NULL);
791+
void EndCopyFrom(char * error_msg) {
792+
PQputCopyEnd(connection_, error_msg);
793+
this->copyInMode_ = false;
777794
}
778795

779796
};

0 commit comments

Comments
 (0)