Skip to content

Add test for IPv6 functionality #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ can also specify remote logger by passing the options.
# for remote fluent
logger = sender.FluentSender('app', host='host', port=24224)

The logger will prefer using IPv4 and fall back to IPv6 by default. Should you wish to prefer
IPv6 and fall back to IPv4, specify `prefer_ipv6` option as `True`.

.. code:: python

# for remote fluent preferring IPv6, falling back to IPv4
logger = sender.FluentSender('app', host='host', port=24224, prefer_ipv6=True)

For sending event, call `emit` method with your event. Following example will send the event to
fluentd, with tag 'app.follow' and the attributes 'from' and 'to'.

Expand Down
31 changes: 28 additions & 3 deletions fluent/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self,
buffer_overflow_handler=None,
nanosecond_precision=False,
msgpack_kwargs=None,
prefer_ipv6=False,
**kwargs):
"""
:param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
Expand All @@ -69,6 +70,8 @@ def __init__(self,
self.msgpack_kwargs = {} if msgpack_kwargs is None else msgpack_kwargs

self.socket = None
self.prefer_ipv6 = prefer_ipv6
self.ip_addr_family = None
self.pendings = None
self.lock = threading.Lock()
self._closed = False
Expand Down Expand Up @@ -120,6 +123,20 @@ def close(self):
self._close()
self.pendings = None

def _find_ip_addr_family(self):
if not self.prefer_ipv6:
try:
socket.getaddrinfo(self.host, None, socket.AF_INET)
return socket.AF_INET
except socket.error:
return socket.AF_INET6
else:
try:
socket.getaddrinfo(self.host, None, socket.AF_INET6)
return socket.AF_INET6
except socket.error:
return socket.AF_INET

def _make_packet(self, label, timestamp, data):
if label:
tag = '.'.join((self.tag, label))
Expand Down Expand Up @@ -180,8 +197,13 @@ def _check_recv_side(self):
self.socket.settimeout(self.timeout)

def _send_data(self, bytes_):
# reconnect if possible
self._reconnect()
try:
# reconnect if possible
self._reconnect()
except Exception as e:
# try once more but redetermine v4/v6 capability
self.ip_addr_family = None
self._reconnect()
# send message
bytes_to_send = len(bytes_)
bytes_sent = 0
Expand All @@ -201,7 +223,10 @@ def _reconnect(self):
sock.settimeout(self.timeout)
sock.connect(self.host[len('unix://'):])
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if not self.ip_addr_family:
self.ip_addr_family = self._find_ip_addr_family()
sock = socket.socket(self.ip_addr_family,
socket.SOCK_STREAM)
sock.settimeout(self.timeout)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to call _is_ipv4_host() in every _make_packet?
It may be able to reuse checked status.

# This might be controversial and may need to be removed
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
Expand Down
8 changes: 4 additions & 4 deletions tests/mockserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@ class MockRecvServer(threading.Thread):
Single threaded server accepts one connection and recv until EOF.
"""

def __init__(self, host='localhost', port=0):
def __init__(self, host='localhost', port=0, inet_family=socket.AF_INET):
super(MockRecvServer, self).__init__()

if host.startswith('unix://'):
self.socket_proto = socket.AF_UNIX
self.socket_type = socket.SOCK_STREAM
self.socket_addr = host[len('unix://'):]
else:
self.socket_proto = socket.AF_INET
self.socket_proto = inet_family
self.socket_type = socket.SOCK_STREAM
self.socket_addr = (host, port)

self._sock = socket.socket(self.socket_proto, self.socket_type)
self._sock.bind(self.socket_addr)
if self.socket_proto == socket.AF_INET:
if self.socket_proto == inet_family:
self.port = self._sock.getsockname()[1]

self._sock.listen(1)
Expand Down Expand Up @@ -76,7 +76,7 @@ def close(self):
pass

try:
conn = socket.socket(socket.AF_INET,
conn = socket.socket(self.socket_proto,
socket.SOCK_STREAM)
try:
conn.connect((self.socket_addr[0], self.port))
Expand Down
98 changes: 98 additions & 0 deletions tests/test_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import unittest
from shutil import rmtree
from tempfile import mkdtemp
from unittest.mock import patch

import msgpack

Expand Down Expand Up @@ -291,6 +292,103 @@ def recv(self, bufsize, flags=0):
finally:
self._sender.socket = old_sock

def test_ipv6_preferred_but_not_avail(self):
real_getaddrinfo = socket.getaddrinfo

def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if family == socket.AF_INET6:
raise socket.gaierror("mock: IPv4 Only")
else:
return real_getaddrinfo(host, port, family, type, proto, flags)
with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo):
sender = fluent.sender.FluentSender(tag='test',
host='localhost',
port=self._server.port,
prefer_ipv6=True)
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})

def test_ipv6_disappeared(self):
sender = fluent.sender.FluentSender(tag='test',
host='127.0.0.1',
port=self._server.port,
prefer_ipv6=True)
# here we cause sender to believe it already determined IPv6, but want
# it to re-test when IPv6 stops working
sender.ip_addr_family = socket.AF_INET6
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})

def test_ipv6_only(self):
# Test if our host supports IPv6 before running this test
try:
socket.gethostbyaddr('::1')
except socket.herror:
self.skipTest("Host does not support IPv6, cannot run this test")

self.tearDown()

real_getaddrinfo = socket.getaddrinfo

def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if family == socket.AF_INET:
raise socket.gaierror("mock: IPv6 Only")
else:
return real_getaddrinfo(host, port, family, type, proto, flags)

self._server = mockserver.MockRecvServer(host='localhost',
inet_family=socket.AF_INET6)


with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo):
sender = fluent.sender.FluentSender(tag='test',
host='localhost',
port=self._server.port)
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})

def test_ipv6_preferred(self):
# Test if our host supports IPv6 before running this test
try:
socket.gethostbyaddr('::1')
except socket.herror:
self.skipTest("Host does not support IPv6, cannot run this test")

self.tearDown()

real_getaddrinfo = socket.getaddrinfo

def _fake_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
if family == socket.AF_INET:
raise socket.gaierror("mock: IPv6 Only")
else:
return real_getaddrinfo(host, port, family, type, proto, flags)

self._server = mockserver.MockRecvServer(host='localhost',
inet_family=socket.AF_INET6)


with patch('socket.getaddrinfo', side_effect=_fake_getaddrinfo):
sender = fluent.sender.FluentSender(tag='test',
host='localhost',
port=self._server.port,
prefer_ipv6=True)
sender.emit('foo', {'bar': 'baz'})
sender._close()
data = self.get_data()
self.assertEqual(len(data), 1)
self.assertEqual(data[0][2], {'bar': 'baz'})


@unittest.skipIf(sys.platform == "win32", "Unix socket not supported")
def test_unix_socket(self):
self.tearDown()
Expand Down