8 from blockfinder
import ipaddr
11 class BaseBlockfinderTest(unittest
.TestCase
):
13 self
.base_test_dir
= tempfile
.mkdtemp()
14 self
.test_dir
= self
.base_test_dir
+ "/test/"
15 self
.database_cache
= blockfinder
.DatabaseCache(self
.test_dir
)
16 self
.downloader_parser
= blockfinder
.DownloaderParser(
17 self
.test_dir
, self
.database_cache
, "Mozilla")
18 self
.lookup
= blockfinder
.Lookup(self
.test_dir
, self
.database_cache
)
19 self
.database_cache
.connect_to_database()
20 self
.database_cache
.set_db_version()
21 shutil
.copy('test_rir_data', self
.test_dir
+ 'test_rir_data')
22 shutil
.copy('test_lir_data.gz', self
.test_dir
+ 'test_lir_data.gz')
23 self
.downloader_parser
.parse_rir_files(['test_rir_data'])
24 self
.downloader_parser
.parse_lir_files(['test_lir_data.gz'])
27 shutil
.rmtree(self
.base_test_dir
, True)
30 class CheckReverseLookup(BaseBlockfinderTest
):
31 def test_rir_ipv4_lookup(self
):
32 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
33 'rir', int(ipaddr
.IPv4Address('175.45.176.100'))), 'KP')
34 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
35 'rir', int(ipaddr
.IPv4Address('193.9.26.0'))), 'HU')
36 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
37 'rir', int(ipaddr
.IPv4Address('193.9.25.1'))), 'PL')
38 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
39 'rir', int(ipaddr
.IPv4Address('193.9.25.255'))), 'PL')
41 def test_rir_asn_lookup(self
):
42 self
.assertEqual(self
.database_cache
.fetch_country_code('asn',
44 self
.assertEqual(self
.database_cache
.fetch_country_code('asn',
47 def test_lir_ipv4_lookup(self
):
48 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
49 'lir', int(ipaddr
.IPv4Address('80.16.151.184'))), 'IT')
50 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
51 'lir', int(ipaddr
.IPv4Address('80.16.151.180'))), 'IT')
52 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
53 'lir', int(ipaddr
.IPv4Address('213.95.6.32'))), 'DE')
55 def test_lir_ipv6_lookup(self
):
56 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
57 'lir', int(ipaddr
.IPv6Address('2001:0658:021A::'))), 'DE')
58 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
59 'lir', int(ipaddr
.IPv6Address('2001:67c:320::'))), 'DE')
60 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
61 'lir', int(ipaddr
.IPv6Address('2001:670:0085::'))), 'FI')
64 class CheckBlockFinder(BaseBlockfinderTest
):
65 def test_ipv4_bf(self
):
66 known_ipv4_assignments
= (
67 ('MM', ['203.81.64.0/19', '203.81.160.0/20']),
68 ('KP', ['175.45.176.0/22']))
69 for cc
, values
in known_ipv4_assignments
:
70 expected
= [(int(ipaddr
.IPv4Network(network_str
).network_address
),
71 int(ipaddr
.IPv4Network(network_str
).broadcast_address
))
72 for network_str
in values
]
73 result
= self
.database_cache
.fetch_assignments('ipv4', cc
)
74 self
.assertEqual(result
, expected
)
76 def test_ipv6_bf(self
):
77 known_ipv6_assignments
= ['2001:200::/35', '2001:200:2000::/35',
78 '2001:200:4000::/34', '2001:200:8000::/33']
79 expected
= [(int(ipaddr
.IPv6Network(network_str
).network_address
),
80 int(ipaddr
.IPv6Network(network_str
).broadcast_address
))
81 for network_str
in known_ipv6_assignments
]
82 result
= self
.database_cache
.fetch_assignments('ipv6', 'JP')
83 self
.assertEqual(result
, expected
)
86 if __name__
== '__main__':
87 for test_class
in [CheckReverseLookup
, CheckBlockFinder
]:
88 test_suite
= unittest
.makeSuite(test_class
)
89 test_runner
= unittest
.TextTestRunner(verbosity
=2)
90 test_runner
.run(test_suite
)