Use a custom timeout in test_support.open_urlresource.
[python.git] / Lib / sqlite3 / test / types.py
bloba275a26e4d9d661daffcdb0f5b03a6712a3ca602
1 #-*- coding: ISO-8859-1 -*-
2 # pysqlite2/test/types.py: tests for type conversion and detection
4 # Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
6 # This file is part of pysqlite.
8 # This software is provided 'as-is', without any express or implied
9 # warranty. In no event will the authors be held liable for any damages
10 # arising from the use of this software.
12 # Permission is granted to anyone to use this software for any purpose,
13 # including commercial applications, and to alter it and redistribute it
14 # freely, subject to the following restrictions:
16 # 1. The origin of this software must not be misrepresented; you must not
17 # claim that you wrote the original software. If you use this software
18 # in a product, an acknowledgment in the product documentation would be
19 # appreciated but is not required.
20 # 2. Altered source versions must be plainly marked as such, and must not be
21 # misrepresented as being the original software.
22 # 3. This notice may not be removed or altered from any source distribution.
24 import datetime
25 import unittest
26 import sqlite3 as sqlite
27 try:
28 import zlib
29 except ImportError:
30 zlib = None
class SqliteTypeTests(unittest.TestCase):
    """Round-trip checks for values stored on a connection without type detection."""

    def setUp(self):
        # Fresh in-memory database holding one table with a column per kind
        # of value exercised below.
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(i integer, s varchar, f number, b blob)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def CheckString(self):
        """A non-ASCII unicode string comes back unchanged."""
        self.cur.execute("insert into test(s) values (?)", (u"Österreich",))
        fetched = self.cur.execute("select s from test").fetchone()
        self.assertEqual(fetched[0], u"Österreich")

    def CheckSmallInt(self):
        """A small integer comes back unchanged."""
        self.cur.execute("insert into test(i) values (?)", (42,))
        fetched = self.cur.execute("select i from test").fetchone()
        self.assertEqual(fetched[0], 42)

    def CheckLargeInt(self):
        """An integer of 2**40 comes back unchanged."""
        big = 2**40
        self.cur.execute("insert into test(i) values (?)", (big,))
        fetched = self.cur.execute("select i from test").fetchone()
        self.assertEqual(fetched[0], big)

    def CheckFloat(self):
        """A float comes back unchanged."""
        number = 3.14
        self.cur.execute("insert into test(f) values (?)", (number,))
        fetched = self.cur.execute("select f from test").fetchone()
        self.assertEqual(fetched[0], number)

    def CheckBlob(self):
        """A buffer stored in the blob column compares equal when read back."""
        payload = buffer("Guglhupf")
        self.cur.execute("insert into test(b) values (?)", (payload,))
        fetched = self.cur.execute("select b from test").fetchone()
        self.assertEqual(fetched[0], payload)

    def CheckUnicodeExecute(self):
        """A unicode SQL statement containing a non-ASCII literal is executed as-is."""
        fetched = self.cur.execute(u"select 'Österreich'").fetchone()
        self.assertEqual(fetched[0], u"Österreich")
class DeclTypesTests(unittest.TestCase):
    """Checks conversions selected by declared column types (PARSE_DECLTYPES)."""

    class Foo:
        """Custom value type: adapts to its wrapped value via __conform__ and
        is installed as the "FOO" converter in setUp."""

        def __init__(self, _val):
            self.val = _val

        def __cmp__(self, other):
            # Only equality is meaningful here: 0 for equal wrapped values,
            # 1 otherwise.  Comparing with anything but a Foo is an error.
            if not isinstance(other, DeclTypesTests.Foo):
                raise ValueError
            if self.val == other.val:
                return 0
            else:
                return 1

        def __conform__(self, protocol):
            if protocol is sqlite.PrepareProtocol:
                return self.val
            else:
                return None

        def __str__(self):
            return "<%s>" % self.val

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(i int, s str, f float, b bool, u unicode, foo foo, bin blob, n1 number, n2 number(5))")

        # override float, make them always return the same number
        sqlite.converters["FLOAT"] = lambda x: 47.2

        # and implement two custom ones
        sqlite.converters["BOOL"] = lambda x: bool(int(x))
        sqlite.converters["FOO"] = DeclTypesTests.Foo
        sqlite.converters["WRONG"] = lambda x: "WRONG"
        sqlite.converters["NUMBER"] = float

    def tearDown(self):
        # Remove every converter registered in setUp (including "WRONG") so
        # the module-wide registry does not leak into other test cases.
        del sqlite.converters["FLOAT"]
        del sqlite.converters["BOOL"]
        del sqlite.converters["FOO"]
        del sqlite.converters["WRONG"]
        del sqlite.converters["NUMBER"]
        self.cur.close()
        self.con.close()

    def CheckString(self):
        # default; the "[WRONG]" column-name hint is expected to have no
        # effect because this connection only enables PARSE_DECLTYPES
        self.cur.execute("insert into test(s) values (?)", ("foo",))
        self.cur.execute('select s as "s [WRONG]" from test')
        row = self.cur.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckSmallInt(self):
        # default
        self.cur.execute("insert into test(i) values (?)", (42,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], 42)

    def CheckLargeInt(self):
        # default
        num = 2**40
        self.cur.execute("insert into test(i) values (?)", (num,))
        self.cur.execute("select i from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], num)

    def CheckFloat(self):
        # custom: the FLOAT converter from setUp always yields 47.2
        val = 3.14
        self.cur.execute("insert into test(f) values (?)", (val,))
        self.cur.execute("select f from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], 47.2)

    def CheckBool(self):
        # custom
        self.cur.execute("insert into test(b) values (?)", (False,))
        self.cur.execute("select b from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], False)

        self.cur.execute("delete from test")
        self.cur.execute("insert into test(b) values (?)", (True,))
        self.cur.execute("select b from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], True)

    def CheckUnicode(self):
        # default
        val = u"\xd6sterreich"
        self.cur.execute("insert into test(u) values (?)", (val,))
        self.cur.execute("select u from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckFoo(self):
        val = DeclTypesTests.Foo("bla")
        self.cur.execute("insert into test(foo) values (?)", (val,))
        self.cur.execute("select foo from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckUnsupportedSeq(self):
        """Binding an object of an unsupported type positionally raises InterfaceError."""
        class Bar: pass
        val = Bar()
        with self.assertRaises(sqlite.InterfaceError):
            self.cur.execute("insert into test(f) values (?)", (val,))

    def CheckUnsupportedDict(self):
        """Binding an object of an unsupported type by name raises InterfaceError."""
        class Bar: pass
        val = Bar()
        with self.assertRaises(sqlite.InterfaceError):
            self.cur.execute("insert into test(f) values (:val)", {"val": val})

    def CheckBlob(self):
        # default
        val = buffer("Guglhupf")
        self.cur.execute("insert into test(bin) values (?)", (val,))
        self.cur.execute("select bin from test")
        row = self.cur.fetchone()
        self.assertEqual(row[0], val)

    def CheckNumber1(self):
        self.cur.execute("insert into test(n1) values (5)")
        value = self.cur.execute("select n1 from test").fetchone()[0]
        # if the converter is not used, it's an int instead of a float
        self.assertEqual(type(value), float)

    def CheckNumber2(self):
        """Checks whether converter names are cut off at '(' characters"""
        self.cur.execute("insert into test(n2) values (5)")
        value = self.cur.execute("select n2 from test").fetchone()[0]
        # if the converter is not used, it's an int instead of a float
        self.assertEqual(type(value), float)
class ColNamesTests(unittest.TestCase):
    """Checks conversions selected by "name [type]" column aliases (PARSE_COLNAMES)."""

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(x foo)")

        sqlite.converters["FOO"] = lambda x: "[%s]" % x
        sqlite.converters["BAR"] = lambda x: "<%s>" % x
        sqlite.converters["EXC"] = lambda x: 5/0
        sqlite.converters["B1B1"] = lambda x: "MARKER"

    def tearDown(self):
        del sqlite.converters["FOO"]
        del sqlite.converters["BAR"]
        del sqlite.converters["EXC"]
        del sqlite.converters["B1B1"]
        self.cur.close()
        self.con.close()

    def CheckDeclTypeNotUsed(self):
        """
        Assures that the declared type is not used when PARSE_DECLTYPES
        is not set.
        """
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "xxx")

    def CheckNone(self):
        """A stored None is read back as None."""
        self.cur.execute("insert into test(x) values (?)", (None,))
        self.cur.execute("select x from test")
        val = self.cur.fetchone()[0]
        self.assertIsNone(val)

    def CheckColName(self):
        self.cur.execute("insert into test(x) values (?)", ("xxx",))
        self.cur.execute('select x as "x [bar]" from test')
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "<xxx>")

        # Check if the stripping of colnames works. Everything after the first
        # whitespace should be stripped.
        self.assertEqual(self.cur.description[0][0], "x")

    def CheckCaseInConverterName(self):
        """A lower-case "[b1b1]" alias selects the converter stored as "B1B1"."""
        self.cur.execute("""select 'other' as "x [b1b1]\"""")
        val = self.cur.fetchone()[0]
        self.assertEqual(val, "MARKER")

    def CheckCursorDescriptionNoRow(self):
        """
        cursor.description should at least provide the column name(s), even if
        no row returned.
        """
        self.cur.execute("select * from test where 0 = 1")
        # assertEqual reports both values on failure, unlike the deprecated
        # assert_ alias applied to an == expression.
        self.assertEqual(self.cur.description[0][0], "x")
class ObjectAdaptationTests(unittest.TestCase):
    """Checks that an adapter registered for int is applied to bound parameters."""

    @staticmethod
    def cast(obj):
        """Adapter under test: return *obj* converted to float."""
        return float(obj)

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        # Start from a clean slate for int.  The registry is keyed by
        # (type, protocol) -- the same key tearDown deletes -- so the lookup
        # must use that tuple; a missing entry is fine.
        sqlite.adapters.pop((int, sqlite.PrepareProtocol), None)
        sqlite.register_adapter(int, ObjectAdaptationTests.cast)
        self.cur = self.con.cursor()

    def tearDown(self):
        # Unregister the adapter so it does not affect other test cases.
        del sqlite.adapters[(int, sqlite.PrepareProtocol)]
        self.cur.close()
        self.con.close()

    def CheckCasterIsUsed(self):
        """An int parameter reaches the database as a float."""
        self.cur.execute("select ?", (4,))
        val = self.cur.fetchone()[0]
        self.assertEqual(type(val), float)
@unittest.skipUnless(zlib, "requires zlib")
class BinaryConverterTests(unittest.TestCase):
    """Checks that a converter is handed the raw binary column value."""

    @staticmethod
    def convert(s):
        """Converter under test: return the zlib-decompressed form of *s*."""
        return zlib.decompress(s)

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        sqlite.register_converter("bin", BinaryConverterTests.convert)

    def tearDown(self):
        self.con.close()

    def CheckBinaryInputForConverter(self):
        """Compressed data bound as a buffer is decompressed by the "bin" converter."""
        testdata = "abcdefg" * 10
        compressed = buffer(zlib.compress(testdata))
        cursor = self.con.execute('select ? as "x [bin]"', (compressed,))
        result = cursor.fetchone()[0]
        self.assertEqual(testdata, result)
class DateTimeTests(unittest.TestCase):
    """Checks round-tripping of date and timestamp columns (PARSE_DECLTYPES)."""

    def setUp(self):
        self.con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.cur = self.con.cursor()
        self.cur.execute("create table test(d date, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def CheckSqliteDate(self):
        """A sqlite.Date value is read back equal to what was stored."""
        d = sqlite.Date(2004, 2, 14)
        self.cur.execute("insert into test(d) values (?)", (d,))
        self.cur.execute("select d from test")
        d2 = self.cur.fetchone()[0]
        self.assertEqual(d, d2)

    def CheckSqliteTimestamp(self):
        """A sqlite.Timestamp value is read back equal to what was stored."""
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def CheckSqlTimestamp(self):
        """A value produced by SQL current_timestamp is read back as a datetime."""
        # The date functions are only available in SQLite version 3.1 or later
        if sqlite.sqlite_version_info < (3, 1):
            self.skipTest("requires SQLite 3.1 or later")

        # SQLite's current_timestamp uses UTC time, so the reference value is
        # taken in UTC as well; comparing against local time would fail
        # whenever the local year differs from the UTC year.
        now = datetime.datetime.utcnow()
        self.cur.execute("insert into test(ts) values (current_timestamp)")
        self.cur.execute("select ts from test")
        ts = self.cur.fetchone()[0]
        self.assertEqual(type(ts), datetime.datetime)
        self.assertEqual(ts.year, now.year)

    def CheckDateTimeSubSeconds(self):
        """A timestamp with a microsecond part of 500000 round-trips unchanged."""
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 500000)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)

    def CheckDateTimeSubSecondsFloatingPoint(self):
        """A timestamp with a microsecond part of 510241 round-trips unchanged."""
        ts = sqlite.Timestamp(2004, 2, 14, 7, 15, 0, 510241)
        self.cur.execute("insert into test(ts) values (?)", (ts,))
        self.cur.execute("select ts from test")
        ts2 = self.cur.fetchone()[0]
        self.assertEqual(ts, ts2)
def suite():
    """Return one TestSuite with the "Check"-prefixed tests of every test class here."""
    test_classes = (
        SqliteTypeTests,
        DeclTypesTests,
        ColNamesTests,
        ObjectAdaptationTests,
        BinaryConverterTests,
        DateTimeTests,
    )
    # One sub-suite per class, kept in the order listed above.
    return unittest.TestSuite(
        [unittest.makeSuite(case, "Check") for case in test_classes])
def test():
    """Run the complete suite with a text-mode runner."""
    unittest.TextTestRunner().run(suite())

# Allow running this test module directly as a script.
if __name__ == "__main__":
    test()