Planner: remove backend application
[autotest-zwu.git] / mirror / database_unittest.py
bloba3b6815485dfc666858911080ae804394c0d4627
1 #!/usr/bin/python
2 # Copyright 2009 Google Inc. Released under the GPL v2
4 import unittest
6 import common
7 from autotest_lib.mirror import database
8 from autotest_lib.client.common_lib.test_utils import mock
class dict_database_unittest(unittest.TestCase):
    """Mock-driven tests for database.dict_database.

    Every test follows a record/playback pattern: the calls the code under
    test is expected to make are recorded on the mock "god" first, then the
    database method is invoked, and finally check_playback() is called on the
    god to compare the calls made against the recorded ones.
    """

    # Path handed to every dict_database instance created by the tests.
    _path = 'somepath.db'

    # Dictionary the stubbed loader / get_dictionary hands back as the
    # "existing" database contents.
    _db_contents = {
        'file1': database.item('file1', 10, 10000),
        'file2': database.item('file2', 20, 20000),
        }

    def setUp(self):
        """Stub out the pickle/tempfile/os functions and build file mocks."""
        self.god = mock.mock_god()

        self.god.stub_function(database.cPickle, 'load')
        self.god.stub_function(database.cPickle, 'dump')
        self.god.stub_function(database.tempfile, 'mkstemp')
        self.god.stub_function(database.os, 'fdopen')
        self.god.stub_function(database.os, 'close')
        self.god.stub_function(database.os, 'rename')
        self.god.stub_function(database.os, 'unlink')
        # Stand-in for open(); passed to get_dictionary() through its
        # _open_func keyword instead of being stubbed on a module.
        self._open_mock = self.god.create_mock_function('open')
        # NOTE: 'file' is the Python 2 builtin file type; this module is
        # Python 2 code.
        self._file_instance = self.god.create_mock_class(file, 'file')

    def tearDown(self):
        """Undo every stub installed through the mock god."""
        self.god.unstub_all()

    def test_get_dictionary_no_file(self):
        """get_dictionary() returns {} when opening the file raises IOError."""
        # record
        (self._open_mock.expect_call(self._path, 'rb')
                .and_raises(IOError('blah')))

        # playback
        db = database.dict_database(self._path)
        self.assertEqual(db.get_dictionary(_open_func=self._open_mock), {})

        self.god.check_playback()

    def test_get_dictionary(self):
        """get_dictionary() returns what cPickle.load yields, then closes."""
        # record
        (self._open_mock.expect_call(self._path, 'rb')
                .and_return(self._file_instance))
        (database.cPickle.load.expect_call(self._file_instance)
                .and_return(self._db_contents))
        self._file_instance.close.expect_call()

        # playback
        db = database.dict_database(self._path)
        self.assertEqual(db.get_dictionary(_open_func=self._open_mock),
                         self._db_contents)

        self.god.check_playback()

    def _setup_merge_dictionary(self):
        """Record the call sequence shared by the merge_dictionary tests.

        Stubs get_dictionary and _aquire_lock on a fresh dict_database and
        records, in order: the lock acquisition (returning 3), the read of
        the existing contents, mkstemp (returning fd 4 and 'tmpfile') and
        fdopen of that fd for binary writing.

        Returns a tuple (db, new_files, all_files) where new_files are the
        entries to merge and all_files is _db_contents updated with them.
        """
        # setup
        db = database.dict_database(self._path)
        self.god.stub_function(db, 'get_dictionary')
        # '_aquire_lock' (sic) is the attribute name this test stubs on the
        # database object; the spelling must be kept as is.
        self.god.stub_function(db, '_aquire_lock')

        new_files = {
            'file3': database.item('file3', 30, 30000),
            'file4': database.item('file4', 40, 40000),
            }
        all_files = dict(self._db_contents)
        all_files.update(new_files)

        # record
        db._aquire_lock.expect_call().and_return(3)
        db.get_dictionary.expect_call().and_return(self._db_contents)
        (database.tempfile.mkstemp.expect_call(prefix=self._path, dir='')
                .and_return((4, 'tmpfile')))
        database.os.fdopen.expect_call(4, 'wb').and_return(self._file_instance)

        return db, new_files, all_files

    def test_merge_dictionary(self):
        """Successful merge: dump, close, rename over _path, close fd 3."""
        db, new_files, all_files = self._setup_merge_dictionary()

        database.cPickle.dump.expect_call(all_files, self._file_instance,
                protocol=database.cPickle.HIGHEST_PROTOCOL)
        self._file_instance.close.expect_call()
        database.os.rename.expect_call('tmpfile', self._path)
        database.os.close.expect_call(3)

        # playback
        db.merge_dictionary(new_files)
        self.god.check_playback()

    def test_merge_dictionary_disk_full(self):
        """Failed write: the temp file is unlinked and the error propagates.

        Both cPickle.dump and the file's close() are scripted to raise; the
        recorded sequence then expects os.unlink('tmpfile') (no rename) and
        os.close(3) before merge_dictionary() raises.
        """
        err = Exception('fail')
        db, new_files, all_files = self._setup_merge_dictionary()

        database.cPickle.dump.expect_call(all_files, self._file_instance,
                protocol=database.cPickle.HIGHEST_PROTOCOL).and_raises(err)
        self._file_instance.close.expect_call().and_raises(err)
        database.os.unlink.expect_call('tmpfile')
        database.os.close.expect_call(3)

        # playback
        self.assertRaises(Exception, db.merge_dictionary, new_files)
        self.god.check_playback()
# Run this module's test cases when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()