2 # Copyright 2009 Google Inc. Released under the GPL v2
7 from autotest_lib
.mirror
import database
8 from autotest_lib
.client
.common_lib
.test_utils
import mock
class dict_database_unittest(unittest.TestCase):
    """Tests for database.dict_database with all file access mocked out.

    Every test follows the same two phases: first the calls the code under
    test is expected to make are recorded on the mocks (expect_call), then
    the dict_database method is exercised and self.god.check_playback()
    verifies that the recorded calls actually happened.
    """

    # Path handed to dict_database.  No test touches the real filesystem
    # (open, mkstemp, fdopen, rename and unlink are all mocked), so the
    # value only has to be consistent between expectations and calls.
    _path = 'somepath.db'

    # Contents the mocked on-disk database is made to return.
    _db_contents = {
        'file1': database.item('file1', 10, 10000),
        'file2': database.item('file2', 20, 20000),
    }

    def setUp(self):
        """Create the mock god and stub every I/O entry point used below."""
        self.god = mock.mock_god()

        # Stubbed in a fixed order on the modules as seen from `database`,
        # so the tests can record expectations on database.<module>.<name>.
        stubbed = (
            (database.cPickle, 'load'),
            (database.cPickle, 'dump'),
            (database.tempfile, 'mkstemp'),
            (database.os, 'fdopen'),
            (database.os, 'close'),
            (database.os, 'rename'),
            (database.os, 'unlink'),
        )
        for module, function_name in stubbed:
            self.god.stub_function(module, function_name)

        # Stand-ins for the builtin open() and for the file object it returns.
        self._open_mock = self.god.create_mock_function('open')
        self._file_instance = self.god.create_mock_class(file, 'file')

    def tearDown(self):
        """Restore the functions stubbed in setUp so they do not leak."""
        self.god.unstub_all()

    def test_get_dictionary_no_file(self):
        """get_dictionary() yields an empty dict when open() raises IOError."""
        # Expectation: the database file is opened for reading and fails.
        (self._open_mock.expect_call(self._path, 'rb')
            .and_raises(IOError('blah')))

        # Exercise.
        db = database.dict_database(self._path)
        result = db.get_dictionary(_open_func=self._open_mock)
        self.assertEqual(result, {})

        self.god.check_playback()

    def test_get_dictionary(self):
        """get_dictionary() returns what cPickle.load reads from the file."""
        # Expectations: open the file, unpickle from it, then close it.
        (self._open_mock.expect_call(self._path, 'rb')
            .and_return(self._file_instance))
        (database.cPickle.load.expect_call(self._file_instance)
            .and_return(self._db_contents))
        self._file_instance.close.expect_call()

        # Exercise.
        db = database.dict_database(self._path)
        result = db.get_dictionary(_open_func=self._open_mock)
        self.assertEqual(result, self._db_contents)

        self.god.check_playback()

    def _setup_merge_dictionary(self):
        """Record the expectations shared by the merge_dictionary tests.

        Stubs get_dictionary and _aquire_lock on a fresh dict_database and
        records the calls made up to the point where the temporary file has
        been opened for writing.

        Returns:
            A (db, new_files, all_files) tuple: the database under test, the
            entries to merge in, and the expected merged contents
            (_db_contents updated with new_files).
        """
        db = database.dict_database(self._path)
        self.god.stub_function(db, 'get_dictionary')
        self.god.stub_function(db, '_aquire_lock')

        new_files = {
            'file3': database.item('file3', 30, 30000),
            'file4': database.item('file4', 40, 40000),
        }
        # Copy first so the class-level fixture is never mutated.
        all_files = dict(self._db_contents)
        all_files.update(new_files)

        # 3 and 4 are arbitrary descriptor numbers; the tests later expect
        # os.close(3) and os.fdopen(4, 'wb') with exactly these values.
        db._aquire_lock.expect_call().and_return(3)
        db.get_dictionary.expect_call().and_return(self._db_contents)
        (database.tempfile.mkstemp.expect_call(prefix=self._path, dir='')
            .and_return((4, 'tmpfile')))
        database.os.fdopen.expect_call(4, 'wb').and_return(self._file_instance)

        return db, new_files, all_files

    def test_merge_dictionary(self):
        """A successful merge dumps, closes, renames, then closes fd 3."""
        db, new_files, all_files = self._setup_merge_dictionary()

        # Expectations for the success path.
        database.cPickle.dump.expect_call(
            all_files, self._file_instance,
            protocol=database.cPickle.HIGHEST_PROTOCOL)
        self._file_instance.close.expect_call()
        database.os.rename.expect_call('tmpfile', self._path)
        database.os.close.expect_call(3)

        # Exercise.
        db.merge_dictionary(new_files)
        self.god.check_playback()

    def test_merge_dictionary_disk_full(self):
        """A failing dump unlinks the temp file, closes fd 3 and raises."""
        err = Exception('fail')
        db, new_files, all_files = self._setup_merge_dictionary()

        # Expectations for the failure path: both the dump and the close of
        # the temporary file raise, the temp file is removed instead of
        # renamed, and descriptor 3 is still closed.
        database.cPickle.dump.expect_call(
            all_files, self._file_instance,
            protocol=database.cPickle.HIGHEST_PROTOCOL).and_raises(err)
        self._file_instance.close.expect_call().and_raises(err)
        database.os.unlink.expect_call('tmpfile')
        database.os.close.expect_call(3)

        # Exercise.
        self.assertRaises(Exception, db.merge_dictionary, new_files)
        self.god.check_playback()
116 if __name__
== '__main__':