9 from blockfinder
import ipaddr
12 class BaseBlockfinderTest(unittest
.TestCase
):
14 self
.base_test_dir
= tempfile
.mkdtemp()
15 self
.test_dir
= self
.base_test_dir
+ "/test/"
16 self
.database_cache
= blockfinder
.DatabaseCache(self
.test_dir
)
17 self
.downloader_parser
= blockfinder
.DownloaderParser(
18 self
.test_dir
, self
.database_cache
, "Mozilla")
19 self
.lookup
= blockfinder
.Lookup(self
.test_dir
, self
.database_cache
)
20 self
.database_cache
.connect_to_database()
21 self
.database_cache
.set_db_version()
22 shutil
.copy('test_rir_data', self
.test_dir
+ 'test_rir_data')
23 shutil
.copy('test_lir_data.gz', self
.test_dir
+ 'test_lir_data.gz')
24 self
.downloader_parser
.parse_rir_files(['test_rir_data'])
25 self
.downloader_parser
.parse_lir_files(['test_lir_data.gz'])
28 shutil
.rmtree(self
.base_test_dir
, True)
31 class CheckReverseLookup(BaseBlockfinderTest
):
32 def test_rir_ipv4_lookup(self
):
33 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
34 'rir', int(ipaddr
.IPv4Address('175.45.176.100'))), 'KP')
35 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
36 'rir', int(ipaddr
.IPv4Address('193.9.26.0'))), 'HU')
37 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
38 'rir', int(ipaddr
.IPv4Address('193.9.25.1'))), 'PL')
39 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
40 'rir', int(ipaddr
.IPv4Address('193.9.25.255'))), 'PL')
42 def test_rir_asn_lookup(self
):
43 self
.assertEqual(self
.database_cache
.fetch_country_code('asn',
45 self
.assertEqual(self
.database_cache
.fetch_country_code('asn',
48 def test_lir_ipv4_lookup(self
):
49 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
50 'lir', int(ipaddr
.IPv4Address('80.16.151.184'))), 'IT')
51 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
52 'lir', int(ipaddr
.IPv4Address('80.16.151.180'))), 'IT')
53 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv4',
54 'lir', int(ipaddr
.IPv4Address('213.95.6.32'))), 'DE')
56 def test_lir_ipv6_lookup(self
):
57 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
58 'lir', int(ipaddr
.IPv6Address('2001:0658:021A::'))), 'DE')
59 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
60 'lir', int(ipaddr
.IPv6Address('2001:67c:320::'))), 'DE')
61 self
.assertEqual(self
.database_cache
.fetch_country_code('ipv6',
62 'lir', int(ipaddr
.IPv6Address('2001:670:0085::'))), 'FI')
65 class CheckBlockFinder(BaseBlockfinderTest
):
66 def test_ipv4_bf(self
):
67 known_ipv4_assignments
= (
68 ('MM', ['203.81.64.0/19', '203.81.160.0/20']),
69 ('KP', ['175.45.176.0/22']))
70 for cc
, values
in known_ipv4_assignments
:
71 expected
= [(int(ipaddr
.IPv4Network(network_str
).network_address
),
72 int(ipaddr
.IPv4Network(network_str
).broadcast_address
))
73 for network_str
in values
]
74 result
= self
.database_cache
.fetch_assignments('ipv4', cc
)
75 self
.assertEqual(result
, expected
)
77 def test_ipv6_bf(self
):
78 known_ipv6_assignments
= ['2001:200::/35', '2001:200:2000::/35',
79 '2001:200:4000::/34', '2001:200:8000::/33']
80 expected
= [(int(ipaddr
.IPv6Network(network_str
).network_address
),
81 int(ipaddr
.IPv6Network(network_str
).broadcast_address
))
82 for network_str
in known_ipv6_assignments
]
83 result
= self
.database_cache
.fetch_assignments('ipv6', 'JP')
84 self
.assertEqual(result
, expected
)
87 if __name__
== '__main__':
89 for test_class
in [CheckReverseLookup
, CheckBlockFinder
]:
90 test_suite
= unittest
.makeSuite(test_class
)
91 test_runner
= unittest
.TextTestRunner(verbosity
=2)
92 results
= test_runner
.run(test_suite
)
93 failures
+= len(results
.errors
)
94 failures
+= len(results
.failures
)