6 from tempfile
import mkdtemp
10 class BlockFinderTestExtras
:
12 self
.base_test_dir
= mkdtemp()
13 self
.test_dir
= self
.base_test_dir
+ "/test/"
14 self
.database_cache
= blockfinder
.DatabaseCache(self
.test_dir
)
15 self
.downloader_parser
= blockfinder
.DownloaderParser(
16 self
.test_dir
, self
.database_cache
, "Mozilla")
17 self
.lookup
= blockfinder
.Lookup(self
.test_dir
, self
.database_cache
)
19 def create_new_test_cache_dir(self
):
20 self
.downloader_parser
.create_blockfinder_cache_dir()
21 self
.database_cache
.connect_to_database()
22 self
.database_cache
.create_sql_database()
24 def load_del_test_data(self
):
25 delegations
= [test_data
.return_sub_apnic_del()]
27 for delegation
in delegations
:
28 for entry
in delegation
:
29 registry
= str(entry
['registry'])
30 if not registry
.isdigit() and str (entry
['cc']) !="*":
31 temp_row
= [entry
['registry'], entry
['cc'], entry
['start'], \
32 entry
['value'], entry
['date'], entry
['status'], entry
['type']]
34 self
.database_cache
.insert_into_sql_database(rows
)
36 def load_lir_test_data(self
):
37 self
.downloader_parser
.update_lir_delegation_cache("https://github.com/downloads/d1b/blockfinder/tiny_lir_data_for_test.gz")
38 self
.database_cache
.create_or_replace_lir_table_in_db()
39 self
.downloader_parser
.extract_info_from_lir_file_and_insert_into_sqlite("tiny_lir_data_for_test")
41 def copy_country_code_txt(self
):
42 shutil
.copy(str(os
.path
.expanduser('~')) + "/.blockfinder/countrycodes.txt", self
.test_dir
+ "countrycodes.txt")
45 shutil
.rmtree(self
.base_test_dir
, True)
47 class BaseBlockfinderTest(unittest
.TestCase
):
48 """ This is the base blockfinder test class and provides
49 a setUp and a tearDown which create and destroy a temporary
50 cache directory and database respectively.
53 self
.extra_block_test_f
= BlockFinderTestExtras()
54 self
.cache_dir
= self
.extra_block_test_f
.test_dir
55 self
.database_cache
= blockfinder
.DatabaseCache(self
.cache_dir
)
56 self
.downloader_parser
= blockfinder
.DownloaderParser(
57 self
.cache_dir
, self
.database_cache
, "Mozilla")
58 self
.lookup
= blockfinder
.Lookup(self
.cache_dir
, self
.database_cache
)
59 self
.extra_block_test_f
.create_new_test_cache_dir()
60 self
.extra_block_test_f
.load_del_test_data()
63 self
.extra_block_test_f
.clean_up()
65 class CheckReverseLookup(BaseBlockfinderTest
):
66 rirValues
= ( ('175.45.176.100', 'KP'),
69 ('193.9.25.255', 'PL'),
71 asnValues
= ( ('681', 'NZ'),
76 self
.extra_block_test_f
.clean_up()
78 def reverse_lookup_cc_matcher(self
, method
, values
):
79 self
.database_cache
.connect_to_database()
80 self
.downloader_parser
.download_country_code_file()
81 for value
, cc
in values
:
82 result
= method(value
)
83 self
.assertEqual(result
, cc
)
85 def test_rir_lookup(self
):
86 method
= self
.database_cache
.rir_lookup
87 self
.reverse_lookup_cc_matcher(method
, self
.rirValues
)
89 def test_asn_lookup(self
):
90 method
= self
.database_cache
.asn_lookup
91 self
.reverse_lookup_cc_matcher(method
, self
.asnValues
)
93 class CheckBlockFinder(BaseBlockfinderTest
):
94 # You can add known blocks to the tuple as a list
95 # they will be looked up and checked
96 known_ipv4_Results
= ( ('mm', ['203.81.160.0/20', '203.81.64.0/19']),
97 ('kp', ['175.45.176.0/22']))
99 def test_ipv4_bf(self
):
100 self
.database_cache
.connect_to_database()
101 for cc
, values
in self
.known_ipv4_Results
:
102 result
= self
.database_cache
.use_sql_database("ipv4", cc
.upper())
103 self
.assertEqual(result
, values
)
104 self
.database_cache
.commit_and_close_database()
105 def test_ipv6_bf(self
):
106 self
.database_cache
.connect_to_database()
107 expected
= ['2001:200:2000::/35', '2001:200:4000::/34', '2001:200:8000::/33', '2001:200::/35']
108 result
= self
.database_cache
.use_sql_database("ipv6", "JP")
109 self
.assertEqual(result
, expected
)
110 self
.database_cache
.commit_and_close_database()
112 def test_lir_fetching_and_use(self
):
113 """ test LIR fetching and use. """
115 self
.database_cache
.connect_to_database()
116 self
.extra_block_test_f
.load_lir_test_data()
117 self
.downloader_parser
.download_country_code_file()
118 self
.assertEqual(self
.database_cache
._rir
_or
_lir
_lookup
_ipv
4("80.16.151.184", "LIR"), "IT")
119 self
.assertEqual(self
.database_cache
._rir
_or
_lir
_lookup
_ipv
4("80.16.151.180", "LIR"), "IT")
120 self
.assertEqual(self
.database_cache
._rir
_or
_lir
_lookup
_ipv
4("213.95.6.32", "LIR"), "DE")
123 self
.assertEqual(self
.database_cache
.rir_or_lir_lookup_ipv6("2001:0658:021A::", "2001%", "LIR"), u
"DE")
124 self
.assertEqual(self
.database_cache
.rir_or_lir_lookup_ipv6("2001:67c:320::", "2001%", "LIR"), u
"DE")
125 self
.assertEqual(self
.database_cache
.rir_or_lir_lookup_ipv6("2001:670:0085::", "2001%", "LIR"), u
"FI")
126 self
.database_cache
.commit_and_close_database()
129 def test_db_version(self
):
130 """ test the handling of the db version information of the database cache. """
131 self
.database_cache
.connect_to_database()
132 self
.assertEqual(self
.database_cache
.get_db_version(), None)
133 self
.database_cache
.set_db_version()
134 self
.assertEqual(self
.database_cache
.get_db_version(), self
.database_cache
.db_version
)
136 if __name__
== '__main__':
137 for test_class
in [CheckReverseLookup
, CheckBlockFinder
]:
138 unittest
.TextTestRunner(verbosity
=2).run(unittest
.makeSuite(test_class
))