2 from contextlib
import closing
3 from pathlib2
import Path
4 import pytest
# type: ignore
@pytest.fixture
def sock_path(monkeypatch, tmpdir):
    """Provide the livestatus socket path inside a temporary OMD root.

    Sets ``OMD_ROOT`` to ``tmpdir`` through ``monkeypatch`` and creates the
    ``tmp/run`` directory below it. The socket file itself is not created.

    Returns:
        The ``Path`` ``<tmpdir>/tmp/run/live``.
    """
    omd_root = Path("%s" % tmpdir)
    socket_file = omd_root / "tmp" / "run" / "live"
    monkeypatch.setenv("OMD_ROOT", "%s" % omd_root)
    # Will be fixed with pylint >2.0
    socket_file.parent.mkdir(parents=True)  # pylint: disable=no-member
    # Tests requesting this fixture format the value into URLs and bind to it.
    return socket_file
19 @pytest.mark
.parametrize("query_part", [
def test_lqencode(query_part):
    """lqencode() must keep the argument's type and produce ``"xyzabc"``."""
    encoded = livestatus.lqencode(query_part)
    expected_type = type(query_part)

    assert isinstance(encoded, expected_type)
    assert encoded == "xyzabc"
def test_livestatus_local_connection_omd_root_not_set(monkeypatch, tmpdir):
    """LocalConnection() must raise a config error when ``OMD_ROOT`` is unset."""
    # Drop any inherited value so the outcome does not depend on the
    # environment the test run was started from.
    monkeypatch.delenv("OMD_ROOT", raising=False)

    with pytest.raises(livestatus.MKLivestatusConfigError, match="OMD_ROOT is not set"):
        livestatus.LocalConnection()
def test_livestatus_local_connection_no_socket(sock_path):
    """LocalConnection() must raise a socket error naming the socket path.

    Nothing in this test creates or binds a socket at ``sock_path``.
    """
    with pytest.raises(
            livestatus.MKLivestatusSocketError,
            match="Cannot connect to 'unix:%s'" % sock_path):
        livestatus.LocalConnection()
def test_livestatus_local_connection_not_listening(sock_path):
    """LocalConnection() must fail when the socket is bound but not listening."""
    # closing() releases the socket even when an assertion fails.
    with closing(socket.socket(socket.AF_UNIX)) as sock:
        # Bind only, deliberately without listen().
        sock.bind("%s" % sock_path)  # pylint: disable=no-member

        with pytest.raises(
                livestatus.MKLivestatusSocketError,
                match="Cannot connect to 'unix:%s'" % sock_path):
            livestatus.LocalConnection()
def test_livestatus_local_connection(sock_path):
    """LocalConnection() must yield a SingleSiteConnection for a listening socket."""
    # closing() releases the socket even when an assertion fails.
    with closing(socket.socket(socket.AF_UNIX)) as sock:
        sock.bind("%s" % sock_path)  # pylint: disable=no-member
        # The socket has to listen; a socket that is only bound is the
        # failure case exercised by the "not listening" test.
        sock.listen(1)  # pylint: disable=no-member

        live = livestatus.LocalConnection()
        assert isinstance(live, livestatus.SingleSiteConnection)
def test_livestatus_ipv4_connection():
    """Create a SingleSiteConnection for a listening IPv4 TCP socket on 127.0.0.1."""
    listener = socket.socket(socket.AF_INET)
    with closing(listener) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # pylint: disable=no-member

        # Bind to port 0 and read the actual port back from the socket.
        loopback = ("127.0.0.1", 0)
        sock.bind(loopback)  # pylint: disable=no-member
        port = sock.getsockname()[1]  # pylint: disable=no-member

        sock.listen(1)  # pylint: disable=no-member

        live = livestatus.SingleSiteConnection("tcp:127.0.0.1:%d" % port)
def test_livestatus_ipv6_connection():
    """Create a SingleSiteConnection for a listening IPv6 TCP socket on ::1.

    The test is skipped when ``::1`` cannot be bound (happened in a docker
    container with IPv6 disabled). Any other bind error is re-raised.
    """
    import errno  # local import: only this test needs the symbolic constant

    with closing(socket.socket(socket.AF_INET6)) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # pylint: disable=no-member

        try:
            sock.bind(("::1", 0))  # pylint: disable=no-member
        except socket.error as e:
            # Skip this test in case ::1 can not be bound to
            # (happened in docker container with IPv6 disabled)
            if e.errno == errno.EADDRNOTAVAIL:  # Cannot assign requested address
                pytest.skip("Unable to bind to ::1 (%s)" % e)
            # Do not carry on with an unbound socket after any other error.
            raise

        port = sock.getsockname()[1]  # pylint: disable=no-member

        sock.listen(1)  # pylint: disable=no-member

        live = livestatus.SingleSiteConnection("tcp6:::1:%d" % port)
@pytest.mark.parametrize("socket_url,result", [
    ("unix:/omd/sites/heute/tmp/run/live", (socket.AF_UNIX, "/omd/sites/heute/tmp/run/live")),
    ("unix:/omd/sites/heute/tmp/run/li:ve", (socket.AF_UNIX, "/omd/sites/heute/tmp/run/li:ve")),
    ("tcp:127.0.0.1:1234", (socket.AF_INET, ("127.0.0.1", 1234))),
    ("tcp:126.0.0.1:abc", None),
    ("tcp6:::1:1234", (socket.AF_INET6, ("::1", 1234))),
    ("tcp6:::1:abc", None),
])
def test_single_site_connection_socketurl(socket_url, result, monkeypatch):
    """Check how SingleSiteConnection parses a socket URL.

    ``result`` is the expected ``(family, address)`` tuple. ``None`` marks a
    URL that must be rejected with a config error.
    """
    live = livestatus.SingleSiteConnection(socket_url)

    # Rejected URLs: only the error is checked, there is no value to compare.
    if result is None:
        with pytest.raises(livestatus.MKLivestatusConfigError, match="Invalid livestatus"):
            live._parse_socket_url(socket_url)
        return

    assert live._parse_socket_url(socket_url) == result