Skip to content

Commit d81e981

Browse files
author
calibr
committed
add messageQueueSizeLimit option
1 parent 77a356b commit d81e981

File tree

3 files changed

+28
-0
lines changed

3 files changed

+28
-0
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ Set flush interval in milliseconds. This option has no effect in `Message` mode.
367367
The logger stores emitted events in buffer and flush events for each interval.
368368
Default `100`.
369369
370+
**messageQueueSizeLimit**
371+
372+
Maximum number of messages that can be in queue at the same time. If a new message is received and it overflows the queue then the oldest message will be removed before adding the new item. This option has effect only in `Message` mode. No limit by default.
373+
370374
**security.clientHostname**
371375
372376
Set hostname of this logger. Use this value for hostname based authentication.

lib/sender.js

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class FluentSender {
3737
if (this._eventMode === 'Message') {
3838
this._sendQueue = []; // queue for items waiting for being sent.
3939
this._flushInterval = 0;
40+
this._messageQueueSizeLimit = options.messageQueueSizeLimit || 0;
4041
} else {
4142
this._sendQueue = new Map();
4243
this._flushInterval = options.flushInterval || 100;
@@ -188,6 +189,9 @@ class FluentSender {
188189
// Message mode
189190
let item = this._makePacketItem(tag, time, data);
190191
item.callback = callback;
192+
if (this._messageQueueSizeLimit && this._sendQueue.length === this._messageQueueSizeLimit) {
193+
this._sendQueue.shift();
194+
}
191195
this._sendQueue.push(item);
192196
} else {
193197
// PackedForward mode

test/test.sender.js

+20
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,26 @@ let doTest = (tls) => {
912912
});
913913
});
914914
});
915+
916+
it('should limit messages stored in queue if server is not available', (done) => {
917+
runServer({}, serverOptions, (server, finish) => {
918+
finish(() => {
919+
const s = new FluentSender('debug', Object.assign({}, clientOptions, {
920+
port: server.port,
921+
messageQueueSizeLimit: 3
922+
}));
923+
s.emit('message1', {});
924+
s.emit('message2', {});
925+
s.emit('message3', {});
926+
s.emit('message4', {});
927+
expect(s._sendQueue.length).to.be.equal(3);
928+
expect(s._sendQueue[0].tag).to.be.equal('debug.message2');
929+
expect(s._sendQueue[1].tag).to.be.equal('debug.message3');
930+
expect(s._sendQueue[2].tag).to.be.equal('debug.message4');
931+
done();
932+
});
933+
});
934+
});
915935
};
916936

917937
describe('FluentSender', () => {

0 commit comments

Comments
 (0)