Skip to content

Commit 258b445

Browse files
author
Paweł Guz
committed
Add queue overflow handler in asyncsender.
Signed-off-by: Paweł Guz <pawel.guz@socialwifi.com>
1 parent e00bcdd commit 258b445

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

fluent/asyncsender.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self,
5555
msgpack_kwargs=None,
5656
queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
5757
queue_circular=DEFAULT_QUEUE_CIRCULAR,
58+
queue_overflow_handler=None,
5859
**kwargs):
5960
"""
6061
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
@@ -66,6 +67,10 @@ def __init__(self,
6667
**kwargs)
6768
self._queue_maxsize = queue_maxsize
6869
self._queue_circular = queue_circular
70+
if queue_circular and queue_overflow_handler:
71+
self._queue_overflow_handler = queue_overflow_handler
72+
else:
73+
self._queue_overflow_handler = self._queue_overflow_handler_default
6974

7075
self._thread_guard = threading.Event() # This ensures visibility across all variables
7176
self._closed = False
@@ -109,7 +114,8 @@ def _send(self, bytes_):
109114
if self._queue_circular and self._queue.full():
110115
# discard oldest
111116
try:
112-
self._queue.get(block=False)
117+
discarded_bytes = self._queue.get(block=False)
118+
self._queue_overflow_handler(discarded_bytes)
113119
except Empty: # pragma: no cover
114120
pass
115121
try:
@@ -132,5 +138,8 @@ def _send_loop(self):
132138
finally:
133139
self._close()
134140

141+
def _queue_overflow_handler_default(self, discarded_bytes):
142+
pass
143+
135144
def __exit__(self, exc_type, exc_val, exc_tb):
136145
self.close()

tests/test_asynchandler.py

+52
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import sys
55
import unittest
66

7+
from mock import patch
8+
from unittest import mock
9+
710
import fluent.asynchandler
811
import fluent.handler
912
from tests import mockserver
@@ -309,3 +312,52 @@ def test_simple(self):
309312
eq('userB', el[2]['to'])
310313
self.assertTrue(el[1])
311314
self.assertTrue(isinstance(el[1], int))
315+
316+
317+
class QueueOverflowException(Exception):
318+
pass
319+
320+
321+
def queue_overflow_handler(discarded_bytes):
322+
raise QueueOverflowException(discarded_bytes)
323+
324+
325+
326+
327+
class TestHandlerWithCircularQueueHandler(unittest.TestCase):
328+
Q_SIZE = 1
329+
330+
def setUp(self):
331+
super(TestHandlerWithCircularQueueHandler, self).setUp()
332+
self._server = mockserver.MockRecvServer('localhost')
333+
self._port = self._server.port
334+
335+
def tearDown(self):
336+
self._server.close()
337+
338+
def get_handler_class(self):
339+
# return fluent.handler.FluentHandler
340+
return fluent.asynchandler.FluentHandler
341+
342+
@patch.object(fluent.asynchandler.asyncsender.Queue, 'full', mock.Mock(return_value=True))
343+
def test_simple(self):
344+
handler = self.get_handler_class()('app.follow', port=self._port,
345+
queue_maxsize=self.Q_SIZE,
346+
queue_circular=True,
347+
queue_overflow_handler=queue_overflow_handler)
348+
with handler:
349+
self.assertEqual(handler.sender.queue_circular, True)
350+
self.assertEqual(handler.sender.queue_maxsize, self.Q_SIZE)
351+
352+
logging.basicConfig(level=logging.INFO)
353+
log = logging.getLogger('fluent.test')
354+
handler.setFormatter(fluent.handler.FluentRecordFormatter())
355+
log.addHandler(handler)
356+
357+
log.info({'cnt': 1, 'from': 'userA', 'to': 'userB'})
358+
with self.assertRaises(QueueOverflowException):
359+
log.info({'cnt': 2, 'from': 'userA', 'to': 'userB'})
360+
log.info({'cnt': 3, 'from': 'userA', 'to': 'userB'})
361+
with self.assertRaises(QueueOverflowException):
362+
log.info({'cnt': 4, 'from': 'userA', 'to': 'userB'})
363+

0 commit comments

Comments
 (0)