4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
8 # * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
11 # * Redistributions in binary form must reproduce the above copyright
12 # notice, this list of conditions and the following disclaimer in the
13 # documentation and/or other materials provided with the distribution.
15 # * Neither the name of Red Hat nor the names of its contributors may be
16 # used to endorse or promote products derived from this software without
17 # specific prior written permission.
19 # THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 # THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 # PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 # OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 """See tests/test-python.sh."""
44 # Not nice, but there doesn't seem to be a better way of putting this
45 class TestAPI(unittest
.TestCase
):
47 def test_parse_size(self
):
48 self
.assertEqual(nbdkit
.parse_size('511'), 511)
49 self
.assertEqual(nbdkit
.parse_size('7k'), 7*1024)
50 self
.assertEqual(nbdkit
.parse_size('17M'), 17*1024*1024)
52 with self
.assertRaises(TypeError):
55 with self
.assertRaises(ValueError):
56 nbdkit
.parse_size('foo')
59 TestAPI().test_parse_size()
65 cfg
= pickle
.loads(base64
.b64decode(v
.encode()))
def config_complete():
    """Print the repr of ``nbdkit.set_error`` on one line.

    The output has the form ``set_error = <repr>``.
    """
    message = "set_error = %r" % nbdkit.set_error
    print(message)
73 if cfg
.get('create_disk', True):
74 disk
= bytearray(cfg
.get('size', 0))
83 return cfg
.get('size', 0)
87 return cfg
.get('is_rotational', False)
def can_multi_conn(h):
    """Return the ``'can_multi_conn'`` entry of the module-level ``cfg``.

    Falls back to ``False`` when the key is absent.  The handle ``h`` is
    not consulted.
    """
    configured = cfg.get('can_multi_conn', False)
    return configured
95 return cfg
.get('can_write', True)
99 return cfg
.get('can_flush', False)
103 return cfg
.get('can_trim', False)
107 return cfg
.get('can_zero', False)
def can_fast_zero(h):
    """Return the ``'can_fast_zero'`` entry of the module-level ``cfg``.

    Falls back to ``False`` when the key is absent.  The handle ``h`` is
    not consulted.
    """
    configured = cfg.get('can_fast_zero', False)
    return configured
115 fua
= cfg
.get('can_fua', "none")
117 return nbdkit
.FUA_NONE
118 elif fua
== "emulate":
119 return nbdkit
.FUA_EMULATE
120 elif fua
== "native":
121 return nbdkit
.FUA_NATIVE
125 cache
= cfg
.get('can_cache', "none")
127 return nbdkit
.CACHE_NONE
128 elif cache
== "emulate":
129 return nbdkit
.CACHE_EMULATE
130 elif cache
== "native":
131 return nbdkit
.CACHE_NATIVE
135 return cfg
.get('can_extents', False)
138 def pread(h
, buf
, offset
, flags
):
140 end
= offset
+ len(buf
)
141 buf
[:] = h
['disk'][offset
:end
]
def pwrite(h, buf, offset, flags):
    """Copy ``buf`` into ``h['disk']`` starting at ``offset``.

    Before writing, asserts that the presence of ``nbdkit.FLAG_FUA`` in
    ``flags`` equals ``cfg['pwrite_expect_fua']`` (default ``False``), and
    that ``h['disk']`` is not ``None``.
    """
    fua_wanted = cfg.get('pwrite_expect_fua', False)
    assert fua_wanted == bool(flags & nbdkit.FLAG_FUA)

    stop = offset + len(buf)
    disk = h['disk']
    assert disk is not None
    disk[offset:stop] = buf
def trim(h, count, offset, flags):
    """Overwrite ``count`` bytes of ``h['disk']`` at ``offset`` with zeros.

    First asserts that the presence of ``nbdkit.FLAG_FUA`` in ``flags``
    equals ``cfg['trim_expect_fua']`` (default ``False``).  When
    ``h['disk']`` is ``None`` nothing is written.
    """
    fua_wanted = cfg.get('trim_expect_fua', False)
    assert fua_wanted == bool(flags & nbdkit.FLAG_FUA)

    disk = h['disk']
    if disk is None:
        return
    # bytearray(count) is ``count`` zero bytes.
    disk[offset:offset + count] = bytearray(count)
def zero(h, count, offset, flags):
    """Overwrite ``count`` bytes of ``h['disk']`` at ``offset`` with zeros.

    Before writing, three flag bits in ``flags`` are each compared against
    an expectation read from ``cfg`` (each defaulting to ``False``):

    * ``nbdkit.FLAG_FUA``        vs ``'zero_expect_fua'``
    * ``nbdkit.FLAG_MAY_TRIM``   vs ``'zero_expect_may_trim'``
    * ``nbdkit.FLAG_FAST_ZERO``  vs ``'zero_expect_fast_zero'``

    A mismatch fails an assertion.  When ``h['disk']`` is ``None`` nothing
    is written.
    """
    fua_wanted = cfg.get('zero_expect_fua', False)
    assert fua_wanted == bool(flags & nbdkit.FLAG_FUA)

    may_trim_wanted = cfg.get('zero_expect_may_trim', False)
    assert may_trim_wanted == bool(flags & nbdkit.FLAG_MAY_TRIM)

    fast_zero_wanted = cfg.get('zero_expect_fast_zero', False)
    assert fast_zero_wanted == bool(flags & nbdkit.FLAG_FAST_ZERO)

    disk = h['disk']
    if disk is None:
        return
    # bytearray(count) is ``count`` zero bytes.
    disk[offset:offset + count] = bytearray(count)
179 def cache(h
, count
, offset
, flags
):
def extents(h, count, offset, flags):
    """Return the ``'extents'`` entry of the module-level ``cfg``.

    Falls back to an empty list when the key is absent.  None of the
    arguments are consulted.
    """
    configured = cfg.get('extents', [])
    return configured