Skip to content

Commit a045b8c

Browse files
authored
Merge pull request #155 from mitsos1os/fix-maultiple-tags-queue-flush
Fix packets on multiple tags get corrput and multiple calls of callbacks on error
2 parents 77a356b + 4ba8610 commit a045b8c

File tree

3 files changed

+66
-25
lines changed

3 files changed

+66
-25
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
node_modules
2+
/package-lock.json

lib/sender.js

+30-25
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,9 @@ class FluentSender {
159159
}
160160

161161
const packet = [tag, time, data];
162-
let options = {};
162+
const options = {};
163163
if (this.requireAckResponse) {
164-
options = {
165-
chunk: crypto.randomBytes(16).toString('base64')
166-
};
164+
options.chunk = crypto.randomBytes(16).toString('base64');
167165
packet.push(options);
168166
}
169167
return {
@@ -186,20 +184,25 @@ class FluentSender {
186184
_push(tag, time, data, callback) {
187185
if (this._eventMode === 'Message') {
188186
// Message mode
189-
let item = this._makePacketItem(tag, time, data);
187+
const item = this._makePacketItem(tag, time, data);
190188
item.callback = callback;
191189
this._sendQueue.push(item);
192190
} else {
193191
// PackedForward mode
194192
const eventEntry = this._makeEventEntry(time, data);
195193
this._sendQueueSize += eventEntry.length;
196194
if (this._sendQueue.has(tag)) {
197-
let eventEntryData = this._sendQueue.get(tag);
195+
const eventEntryData = this._sendQueue.get(tag);
198196
eventEntryData.eventEntries.push(eventEntry);
197+
eventEntryData.size += eventEntry.length;
199198
if (callback) eventEntryData.callbacks.push(callback);
200199
} else {
201200
const callbacks = callback ? [callback] : [];
202-
this._sendQueue.set(tag, { eventEntries: [eventEntry], callbacks: callbacks });
201+
this._sendQueue.set(tag, {
202+
eventEntries: [eventEntry],
203+
size: eventEntry.length,
204+
callbacks: callbacks
205+
});
203206
}
204207
}
205208
}
@@ -225,8 +228,8 @@ class FluentSender {
225228
}
226229

227230
_doConnect(callback) {
228-
let addHandlers = () => {
229-
let errorHandler = (err) => {
231+
const addHandlers = () => {
232+
const errorHandler = (err) => {
230233
if (this._socket) {
231234
this._disconnect();
232235
this._handleEvent('error', err);
@@ -260,7 +263,7 @@ class FluentSender {
260263
});
261264
}
262265
} else {
263-
let postConnect = () => {
266+
const postConnect = () => {
264267
if (this.security.clientHostname && this.security.sharedKey !== null) {
265268
this._handshake(callback);
266269
} else {
@@ -362,7 +365,7 @@ class FluentSender {
362365

363366
_doFlushSendQueue(timeoutId) {
364367
if (this._eventMode === 'Message') {
365-
let item = this._sendQueue.shift();
368+
const item = this._sendQueue.shift();
366369
if (item === undefined) {
367370
this._flushingSendQueue = false;
368371
// nothing written;
@@ -374,10 +377,10 @@ class FluentSender {
374377
this._flushingSendQueue = false;
375378
return;
376379
}
377-
let first = this._sendQueue.entries().next().value;
378-
let tag = first[0];
379-
let eventEntryData = first[1];
380-
let entries = Buffer.concat(eventEntryData.eventEntries, this._sendQueueSize);
380+
const first = this._sendQueue.entries().next().value;
381+
const tag = first[0];
382+
const eventEntryData = first[1];
383+
let entries = Buffer.concat(eventEntryData.eventEntries, eventEntryData.size);
381384
let size = entries.length;
382385
this._sendQueue.delete(tag);
383386
if (this._compressed) {
@@ -387,15 +390,16 @@ class FluentSender {
387390
const options = {
388391
chunk: crypto.randomBytes(16).toString('base64'),
389392
size: size,
390-
compressed: this._compressed ? 'gzip' : 'text'
393+
compressed: this._compressed ? 'gzip' : 'text',
394+
eventEntryDataSize: eventEntryData.size
391395
};
392396
const packet = msgpack.encode([tag, entries, options], { codec: codec });
393397
this._doWrite(packet, options, timeoutId, eventEntryData.callbacks);
394398
}
395399
}
396400

397401
_doWrite(packet, options, timeoutId, callbacks) {
398-
const sendQueueSize = this._sendQueueSize;
402+
const sendPacketSize = (options && options.eventEntryDataSize) || this._sendQueueSize;
399403
this._socket.write(packet, () => {
400404
if (this.requireAckResponse) {
401405
this._socket.once('data', (data) => {
@@ -409,11 +413,12 @@ class FluentSender {
409413
callbacks.forEach((callback) => {
410414
this._handleEvent('error', error, callback);
411415
});
416+
} else { // no error on ack
417+
callbacks.forEach((callback) => {
418+
callback && callback();
419+
});
412420
}
413-
this._sendQueueSize -= sendQueueSize;
414-
callbacks.forEach((callback) => {
415-
callback && callback();
416-
});
421+
this._sendQueueSize -= sendPacketSize;
417422
process.nextTick(() => {
418423
this._waitToWrite();
419424
});
@@ -425,7 +430,7 @@ class FluentSender {
425430
});
426431
}, this.ackResponseTimeout);
427432
} else {
428-
this._sendQueueSize -= sendQueueSize;
433+
this._sendQueueSize -= sendPacketSize;
429434
callbacks.forEach((callback) => {
430435
callback && callback();
431436
});
@@ -452,10 +457,10 @@ class FluentSender {
452457
this._status = null;
453458
this.internalLogger.error('Fluentd error', error);
454459
this.internalLogger.info('Fluentd will reconnect after ' + this.reconnectInterval / 1000 + ' seconds');
455-
let timeoutId = setTimeout(() => {
460+
const timeoutId = setTimeout(() => {
456461
this.internalLogger.info('Fluentd is reconnecting...');
457462
this._connect(() => {
458-
this._flushSendQueue()
463+
this._flushSendQueue();
459464
this.internalLogger.info('Fluentd reconnection finished!!');
460465
});
461466
}, this.reconnectInterval);
@@ -562,7 +567,7 @@ class FluentSender {
562567
let dataString = '';
563568
writable._write = (chunk, encoding, callback) => {
564569
const dataArray = chunk.toString(defaultEncoding).split(/\n/);
565-
let next = () => {
570+
const next = () => {
566571
if (dataArray.length) {
567572
dataString += dataArray.shift();
568573
}

test/test.sender.js

+35
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,41 @@ let doTest = (tls) => {
316316
}, 100);
317317
});
318318

319+
it('should send messages with different tags correctly in PackedForward', (done) => {
320+
runServer({}, serverOptions, (server, finish) => {
321+
const s1 = new FluentSender('debug', Object.assign({}, clientOptions, {
322+
port: server.port,
323+
eventMode: 'PackedForward'
324+
}));
325+
const emits = [];
326+
const total = 4;
327+
function emit(messageData) {
328+
emits.push((asyncDone) => {
329+
if (messageData.number === total) { // end
330+
s1.emit(`multi-${messageData.number}`, { text: messageData.text}, asyncDone); // wait for send
331+
} else {
332+
s1.emit(`multi-${messageData.number}`, { text: messageData.text});
333+
asyncDone(); // run immediately do not wait for ack
334+
}
335+
});
336+
}
337+
for (let i = 0; i <= total; i++) {
338+
emit({ number: i, text: `This is text No ${i}` });
339+
}
340+
emits.push(() => {
341+
finish((data) => {
342+
expect(data.length).to.be.equal(5);
343+
data.forEach((element, index) => {
344+
expect(element.tag).to.be.equal(`debug.multi-${index}`);
345+
expect(element.data.text).to.be.equal(`This is text No ${index}`);
346+
});
347+
done();
348+
});
349+
});
350+
async.series(emits);
351+
});
352+
});
353+
319354
[
320355
{
321356
name: 'tag and record',

0 commit comments

Comments
 (0)