from blockfinder import ipaddr, normalize_country_code
class BaseBlockfinderTest(unittest.TestCase):
    """Common fixture for tests that need a populated blockfinder cache.

    Each test gets a fresh temporary directory holding a database cache
    into which the bundled ``test_rir_data`` and ``test_lir_data.gz``
    files have been parsed.
    """

    # NOTE(review): the method header was not visible in the reviewed text;
    # ``setUp`` is inferred from the per-instance fixture statements below.
    def setUp(self):
        """Build the temporary cache directory and load the test data."""
        self.base_test_dir = tempfile.mkdtemp()
        # Register removal right away: unittest skips tearDown when setUp
        # raises, but cleanups registered with addCleanup still run, so the
        # temporary directory is not leaked if a later step fails.
        # The ``True`` argument is rmtree's ignore_errors flag.
        self.addCleanup(shutil.rmtree, self.base_test_dir, True)

        self.test_dir = self.base_test_dir + "/test/"
        self.database_cache = blockfinder.DatabaseCache(self.test_dir)
        self.downloader_parser = blockfinder.DownloaderParser(
            self.test_dir, self.database_cache, "Mozilla")
        self.lookup = blockfinder.Lookup(self.test_dir, self.database_cache)
        self.database_cache.connect_to_database()
        self.database_cache.set_db_version()

        # The data files are copied into the cache directory before parsing.
        shutil.copy('test_rir_data', self.test_dir + 'test_rir_data')
        shutil.copy('test_lir_data.gz', self.test_dir + 'test_lir_data.gz')
        self.downloader_parser.parse_rir_files(['test_rir_data'])
        self.downloader_parser.parse_lir_files(['test_lir_data.gz'])
32 class CheckReverseLookup(BaseBlockfinderTest
):
def test_rir_ipv4_lookup(self):
    """Look up sample IPv4 addresses in the 'rir' data and check the code."""
    method = self.database_cache.fetch_country_code
    # Pairs of (address as integer, expected country code).
    ip_expected_co = (
        (int(ipaddr.IPv4Address('175.45.176.100')), 'KP'),
        (int(ipaddr.IPv4Address('193.9.26.0')), 'HU'),
        (int(ipaddr.IPv4Address('193.9.25.1')), 'PL'),
        (int(ipaddr.IPv4Address('193.9.25.255')), 'PL'),
    )
    for ip, expected_country in ip_expected_co:
        self.assertEqual(method('ipv4', 'rir', ip), expected_country)
45 def test_rir_asn_lookup(self
):
47 self
.database_cache
.fetch_country_code('asn',
50 self
.database_cache
.fetch_country_code('asn',
def test_lir_ipv4_lookup(self):
    """Look up sample IPv4 addresses in the 'lir' data and check the code."""
    method = self.database_cache.fetch_country_code
    # Pairs of (address as integer, expected country code).
    ip_expected_co = (
        (int(ipaddr.IPv4Address('80.16.151.184')), 'IT'),
        (int(ipaddr.IPv4Address('80.16.151.180')), 'IT'),
        (int(ipaddr.IPv4Address('213.95.6.32')), 'DE'),

        # Check capitalization.
        (int(ipaddr.IPv4Address('128.0.0.0')), 'RO'),

        # Check comment-stripping.
        # EU # Country is really world wide
        (int(ipaddr.IPv4Address('159.245.0.0')), 'EU'),

        (int(ipaddr.IPv4Address('85.195.129.0')), 'SE'),
    )
    for ip, expected_country in ip_expected_co:
        self.assertEqual(method('ipv4', 'lir', ip), expected_country)
74 def test_lir_ipv6_lookup(self
):
75 method
= self
.database_cache
.fetch_country_code
80 int(ipaddr
.IPv6Address('2001:0658:021A::'))),
86 int(ipaddr
.IPv6Address('2001:67c:320::'))),
92 int(ipaddr
.IPv6Address('2001:670:0085::'))),
class CheckBlockFinder(BaseBlockfinderTest):
    """Check that assignments fetched per country match known networks."""

    def test_ipv4_bf(self):
        """Compare fetched IPv4 assignments for MM and KP with known ranges."""
        known_ipv4_assignments = (
            ('MM', ['203.81.64.0/19', '203.81.160.0/20']),
            ('KP', ['175.45.176.0/22']))
        for cc, values in known_ipv4_assignments:
            # Build each network once and take both range ends from it.
            networks = [ipaddr.IPv4Network(network_str)
                        for network_str in values]
            expected = [
                (int(network.network_address),
                 int(network.broadcast_address))
                for network in networks]
            result = self.database_cache.fetch_assignments('ipv4', cc)
            self.assertEqual(result, expected)

    def test_ipv6_bf(self):
        """Compare fetched IPv6 assignments for JP with known ranges."""
        known_ipv6_assignments = ['2001:200::/35', '2001:200:2000::/35',
                                  '2001:200:4000::/34', '2001:200:8000::/33']
        # Build each network once and take both range ends from it.
        networks = [ipaddr.IPv6Network(network_str)
                    for network_str in known_ipv6_assignments]
        expected = [
            (int(network.network_address),
             int(network.broadcast_address))
            for network in networks]
        result = self.database_cache.fetch_assignments('ipv6', 'JP')
        self.assertEqual(result, expected)
class NormalizationTest(unittest.TestCase):
    """Checks for ``normalize_country_code``."""

    def test_comment_stripping(self):
        """Text from the first '#' onward is dropped from the code."""
        # https://github.com/ioerror/blockfinder/issues/51
        self.assertEqual(normalize_country_code('EU'), 'EU')
        self.assertEqual(normalize_country_code(
            'EU # Country is really world wide'), 'EU')
        self.assertEqual(normalize_country_code(
            'DE #AT # IT'), 'DE')
        self.assertEqual(normalize_country_code(
            'FR # GF # GP # MQ # RE'), 'FR')

    def test_capitalization(self):
        """Codes come back upper-cased; the empty string stays empty."""
        # https://github.com/ioerror/blockfinder/issues/53
        self.assertEqual(normalize_country_code('ro'), 'RO')
        self.assertEqual(normalize_country_code('RO'), 'RO')
        self.assertEqual(normalize_country_code(''), '')
if __name__ == '__main__':
    # Run each test class as its own suite and count errors plus failures.
    # The counter must exist before the augmented assignments below.
    failures = 0
    # loadTestsFromTestCase replaces unittest.makeSuite, which is deprecated
    # since Python 3.11 and removed in 3.13.
    loader = unittest.TestLoader()
    for test_class in [CheckReverseLookup,
                       CheckBlockFinder, NormalizationTest]:
        test_suite = loader.loadTestsFromTestCase(test_class)
        test_runner = unittest.TextTestRunner(verbosity=2)
        results = test_runner.run(test_suite)
        failures += len(results.errors)
        failures += len(results.failures)
    # NOTE(review): nothing visible here consumes ``failures``; presumably it
    # is passed to an exit call after this block -- confirm.