Merge pull request #48 from kloesing/maxmind
[blockfinder.git] / blockfindertest.py
blobf52b09a5c3ea109ab33c950dc1ce41dceddd9e74
1 #!/usr/bin/python
2 import blockfinder
3 import unittest
4 import os
5 import shutil
6 import tempfile
7 import ipaddr
9 class BaseBlockfinderTest(unittest.TestCase):
10 def setUp(self):
11 self.base_test_dir = tempfile.mkdtemp()
12 self.test_dir = self.base_test_dir + "/test/"
13 self.database_cache = blockfinder.DatabaseCache(self.test_dir)
14 self.downloader_parser = blockfinder.DownloaderParser(
15 self.test_dir, self.database_cache, "Mozilla")
16 self.lookup = blockfinder.Lookup(self.test_dir, self.database_cache)
17 self.database_cache.connect_to_database()
18 self.database_cache.set_db_version()
19 shutil.copy('test_rir_data', self.test_dir + 'test_rir_data')
20 shutil.copy('test_lir_data.gz', self.test_dir + 'test_lir_data.gz')
21 self.downloader_parser.parse_rir_files(['test_rir_data'])
22 self.downloader_parser.parse_lir_files(['test_lir_data.gz'])
24 def tearDown(self):
25 shutil.rmtree(self.base_test_dir, True)
27 class CheckReverseLookup(BaseBlockfinderTest):
28 def test_rir_ipv4_lookup(self):
29 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
30 'rir', int(ipaddr.IPv4Address('175.45.176.100'))), 'KP')
31 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
32 'rir', int(ipaddr.IPv4Address('193.9.26.0'))), 'HU')
33 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
34 'rir', int(ipaddr.IPv4Address('193.9.25.1'))), 'PL')
35 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
36 'rir', int(ipaddr.IPv4Address('193.9.25.255'))), 'PL')
38 def test_rir_asn_lookup(self):
39 self.assertEqual(self.database_cache.fetch_country_code('asn',
40 'rir', 681), 'NZ')
41 self.assertEqual(self.database_cache.fetch_country_code('asn',
42 'rir', 173), 'JP')
44 def test_lir_ipv4_lookup(self):
45 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
46 'lir', int(ipaddr.IPv4Address('80.16.151.184'))), 'IT')
47 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
48 'lir', int(ipaddr.IPv4Address('80.16.151.180'))), 'IT')
49 self.assertEqual(self.database_cache.fetch_country_code('ipv4',
50 'lir', int(ipaddr.IPv4Address('213.95.6.32'))), 'DE')
52 def test_lir_ipv6_lookup(self):
53 self.assertEqual(self.database_cache.fetch_country_code('ipv6',
54 'lir', int(ipaddr.IPv6Address('2001:0658:021A::'))), 'DE')
55 self.assertEqual(self.database_cache.fetch_country_code('ipv6',
56 'lir', int(ipaddr.IPv6Address('2001:67c:320::'))), 'DE')
57 self.assertEqual(self.database_cache.fetch_country_code('ipv6',
58 'lir', int(ipaddr.IPv6Address('2001:670:0085::'))), 'FI')
60 class CheckBlockFinder(BaseBlockfinderTest):
61 def test_ipv4_bf(self):
62 known_ipv4_assignments = (
63 ('MM', ['203.81.64.0/19', '203.81.160.0/20']),
64 ('KP', ['175.45.176.0/22']))
65 for cc, values in known_ipv4_assignments:
66 expected = [(int(ipaddr.IPv4Network(network_str).network), \
67 int(ipaddr.IPv4Network(network_str).broadcast)) \
68 for network_str in values]
69 result = self.database_cache.fetch_assignments('ipv4', cc)
70 self.assertEqual(result, expected)
72 def test_ipv6_bf(self):
73 known_ipv6_assignments = ['2001:200::/35', '2001:200:2000::/35',
74 '2001:200:4000::/34', '2001:200:8000::/33']
75 expected = [(int(ipaddr.IPv6Network(network_str).network), \
76 int(ipaddr.IPv6Network(network_str).broadcast)) \
77 for network_str in known_ipv6_assignments]
78 result = self.database_cache.fetch_assignments('ipv6', 'JP')
79 self.assertEqual(result, expected)
81 if __name__ == '__main__':
82 for test_class in [CheckReverseLookup, CheckBlockFinder]:
83 test_suite = unittest.makeSuite(test_class)
84 test_runner = unittest.TextTestRunner(verbosity=2)
85 test_runner.run(test_suite)