1 # -*- coding: iso-8859-1 -*-
2 # Copyright (C) 2001,2002 Python Software Foundation
3 # csv package unit tests
8 from StringIO
import StringIO
12 from test
import test_support
14 class Test_Csv(unittest
.TestCase
):
16 Test the underlying C csv parser in ways that are not appropriate
17 from the high level interface. Further tests of this nature are done
18 in TestDialectRegistry.
def _test_arg_valid(self, ctor, arg):
    """Check that ``ctor`` rejects malformed constructor arguments.

    ``ctor`` is the factory under test (callers in this file pass
    ``csv.reader`` and ``csv.writer``) and ``arg`` is the first
    positional argument handed to it.  Every call made below has to
    raise the stated exception for the check to pass.
    """
    expect_error = self.assertRaises

    # No arguments at all, then None as the only argument.
    expect_error(TypeError, ctor)
    expect_error(TypeError, ctor, None)

    # An unknown keyword and two unusable delimiter values.
    for bad_kwargs in ({'bad_attr': 0},
                       {'delimiter': 0},
                       {'delimiter': 'XX'}):
        expect_error(TypeError, ctor, arg, **bad_kwargs)

    # The string 'foo' as second positional argument must give csv.Error.
    expect_error(csv.Error, ctor, arg, 'foo')

    # Remaining keyword values that must be refused with TypeError; the
    # last two combine QUOTE_ALL with an empty or None quotechar.
    for bad_kwargs in ({'delimiter': None},
                       {'delimiter': 1},
                       {'quotechar': 1},
                       {'lineterminator': None},
                       {'lineterminator': 1},
                       {'quoting': None},
                       {'quoting': csv.QUOTE_ALL, 'quotechar': ''},
                       {'quoting': csv.QUOTE_ALL, 'quotechar': None}):
        expect_error(TypeError, ctor, arg, **bad_kwargs)
def test_reader_arg_valid(self):
    # Run the shared bad-argument checks against csv.reader, with an
    # empty list as the constructor's first argument.
    ctor, source = csv.reader, []
    self._test_arg_valid(ctor, source)
def test_writer_arg_valid(self):
    # Run the shared bad-argument checks against csv.writer, with a
    # fresh StringIO object as the constructor's first argument.
    sink = StringIO()
    self._test_arg_valid(csv.writer, sink)
44 def _test_default_attrs(self
, ctor
, *args
):
47 self
.assertEqual(obj
.dialect
.delimiter
, ',')
48 self
.assertEqual(obj
.dialect
.doublequote
, True)
49 self
.assertEqual(obj
.dialect
.escapechar
, None)
50 self
.assertEqual(obj
.dialect
.lineterminator
, "\r\n")
51 self
.assertEqual(obj
.dialect
.quotechar
, '"')
52 self
.assertEqual(obj
.dialect
.quoting
, csv
.QUOTE_MINIMAL
)
53 self
.assertEqual(obj
.dialect
.skipinitialspace
, False)
54 self
.assertEqual(obj
.dialect
.strict
, False)
55 # Try deleting or changing attributes (they are read-only)
56 self
.assertRaises(TypeError, delattr, obj
.dialect
, 'delimiter')
57 self
.assertRaises(TypeError, setattr, obj
.dialect
, 'delimiter', ':')
58 self
.assertRaises(AttributeError, delattr, obj
.dialect
, 'quoting')
59 self
.assertRaises(AttributeError, setattr, obj
.dialect
,
def test_reader_attrs(self):
    # Run the shared default-attribute checks on a csv.reader built
    # from an empty list.
    ctor, source = csv.reader, []
    self._test_default_attrs(ctor, source)
def test_writer_attrs(self):
    # Run the shared default-attribute checks on a csv.writer built
    # around a fresh StringIO object.
    sink = StringIO()
    self._test_default_attrs(csv.writer, sink)
68 def _test_kw_attrs(self
, ctor
, *args
):
69 # Now try with alternate options
70 kwargs
= dict(delimiter
=':', doublequote
=False, escapechar
='\\',
71 lineterminator
='\r', quotechar
='*',
72 quoting
=csv
.QUOTE_NONE
, skipinitialspace
=True,
74 obj
= ctor(*args
, **kwargs
)
75 self
.assertEqual(obj
.dialect
.delimiter
, ':')
76 self
.assertEqual(obj
.dialect
.doublequote
, False)
77 self
.assertEqual(obj
.dialect
.escapechar
, '\\')
78 self
.assertEqual(obj
.dialect
.lineterminator
, "\r")
79 self
.assertEqual(obj
.dialect
.quotechar
, '*')
80 self
.assertEqual(obj
.dialect
.quoting
, csv
.QUOTE_NONE
)
81 self
.assertEqual(obj
.dialect
.skipinitialspace
, True)
82 self
.assertEqual(obj
.dialect
.strict
, True)
def test_reader_kw_attrs(self):
    # Run the shared keyword-option checks on csv.reader, with an empty
    # list as its input.
    ctor, source = csv.reader, []
    self._test_kw_attrs(ctor, source)
def test_writer_kw_attrs(self):
    # Run the shared keyword-option checks on csv.writer, with a fresh
    # StringIO object as its output.
    sink = StringIO()
    self._test_kw_attrs(csv.writer, sink)
90 def _test_dialect_attrs(self
, ctor
, *args
):
91 # Now try with dialect-derived options
101 args
= args
+ (dialect
,)
103 self
.assertEqual(obj
.dialect
.delimiter
, '-')
104 self
.assertEqual(obj
.dialect
.doublequote
, False)
105 self
.assertEqual(obj
.dialect
.escapechar
, '^')
106 self
.assertEqual(obj
.dialect
.lineterminator
, "$")
107 self
.assertEqual(obj
.dialect
.quotechar
, '#')
108 self
.assertEqual(obj
.dialect
.quoting
, csv
.QUOTE_ALL
)
109 self
.assertEqual(obj
.dialect
.skipinitialspace
, True)
110 self
.assertEqual(obj
.dialect
.strict
, False)
def test_reader_dialect_attrs(self):
    # Run the shared dialect-derived option checks on csv.reader, with
    # an empty list as its input.
    ctor, source = csv.reader, []
    self._test_dialect_attrs(ctor, source)
def test_writer_dialect_attrs(self):
    # Run the shared dialect-derived option checks on csv.writer, with
    # a fresh StringIO object as its output.
    sink = StringIO()
    self._test_dialect_attrs(csv.writer, sink)
119 def _write_test(self
, fields
, expect
, **kwargs
):
120 fd
, name
= tempfile
.mkstemp()
121 fileobj
= os
.fdopen(fd
, "w+b")
123 writer
= csv
.writer(fileobj
, **kwargs
)
124 writer
.writerow(fields
)
126 self
.assertEqual(fileobj
.read(),
127 expect
+ writer
.dialect
.lineterminator
)
132 def test_write_arg_valid(self
):
133 self
.assertRaises(csv
.Error
, self
._write
_test
, None, '')
134 self
._write
_test
((), '')
135 self
._write
_test
([None], '""')
136 self
.assertRaises(csv
.Error
, self
._write
_test
,
137 [None], None, quoting
= csv
.QUOTE_NONE
)
138 # Check that exceptions are passed up the chain
142 def __getitem__(self
, i
):
145 self
.assertRaises(IOError, self
._write
_test
, BadList(), '')
149 self
.assertRaises(IOError, self
._write
_test
, [BadItem()], '')
def test_write_bigfield(self):
    # This exercises the buffer realloc functionality
    field = 'X' * 50000
    row = [field, field]
    # Expected text: the two 50000-character fields joined by a comma.
    expected = ','.join(row)
    self._write_test(row, expected)
157 def test_write_quoting(self
):
158 self
._write
_test
(['a',1,'p,q'], 'a,1,"p,q"')
159 self
.assertRaises(csv
.Error
,
161 ['a',1,'p,q'], 'a,1,p,q',
162 quoting
= csv
.QUOTE_NONE
)
163 self
._write
_test
(['a',1,'p,q'], 'a,1,"p,q"',
164 quoting
= csv
.QUOTE_MINIMAL
)
165 self
._write
_test
(['a',1,'p,q'], '"a",1,"p,q"',
166 quoting
= csv
.QUOTE_NONNUMERIC
)
167 self
._write
_test
(['a',1,'p,q'], '"a","1","p,q"',
168 quoting
= csv
.QUOTE_ALL
)
169 self
._write
_test
(['a\nb',1], '"a\nb","1"',
170 quoting
= csv
.QUOTE_ALL
)
172 def test_write_escape(self
):
173 self
._write
_test
(['a',1,'p,q'], 'a,1,"p,q"',
175 self
.assertRaises(csv
.Error
,
177 ['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
178 escapechar
=None, doublequote
=False)
179 self
._write
_test
(['a',1,'p,"q"'], 'a,1,"p,\\"q\\""',
180 escapechar
='\\', doublequote
= False)
181 self
._write
_test
(['"'], '""""',
182 escapechar
='\\', quoting
= csv
.QUOTE_MINIMAL
)
183 self
._write
_test
(['"'], '\\"',
184 escapechar
='\\', quoting
= csv
.QUOTE_MINIMAL
,
186 self
._write
_test
(['"'], '\\"',
187 escapechar
='\\', quoting
= csv
.QUOTE_NONE
)
188 self
._write
_test
(['a',1,'p,q'], 'a,1,p\\,q',
189 escapechar
='\\', quoting
= csv
.QUOTE_NONE
)
191 def test_writerows(self
):
193 def write(self
, buf
):
195 writer
= csv
.writer(BrokenFile())
196 self
.assertRaises(IOError, writer
.writerows
, [['a']])
197 fd
, name
= tempfile
.mkstemp()
198 fileobj
= os
.fdopen(fd
, "w+b")
200 writer
= csv
.writer(fileobj
)
201 self
.assertRaises(TypeError, writer
.writerows
, None)
202 writer
.writerows([['a','b'],['c','d']])
204 self
.assertEqual(fileobj
.read(), "a,b\r\nc,d\r\n")
def _read_test(self, input, expect, **kwargs):
    """Parse ``input`` with ``csv.reader`` and compare against ``expect``.

    Any keyword arguments are forwarded to ``csv.reader`` unchanged.
    All rows are collected into a list before the comparison, so an
    error raised while reading propagates to the caller.
    """
    rows = [row for row in csv.reader(input, **kwargs)]
    self.assertEqual(rows, expect)
def test_read_oddinputs(self):
    check = self._read_test
    must_raise = self.assertRaises

    # No input lines at all, then a single empty line.
    check([], [])
    check([''], [[]])
    # Text following a closing quote must raise csv.Error with strict=1.
    must_raise(csv.Error, check, ['"ab"c'], None, strict=1)
    # cannot handle null bytes for the moment
    must_raise(csv.Error, check, ['ab\0c'], None, strict=1)
    # The same quoted input is read as 'abc' with doublequote=0.
    check(['"ab"c'], [['abc']], doublequote=0)
def test_read_eol(self):
    # A line with no terminator or with one trailing terminator parses
    # to the same single row.
    for line in ('a,b', 'a,b\n', 'a,b\r\n', 'a,b\r'):
        self._read_test([line], [['a', 'b']])
    # A terminator in the middle of an input line must raise csv.Error.
    for line in ('a,b\rc,d', 'a,b\nc,d', 'a,b\r\nc,d'):
        self.assertRaises(csv.Error, self._read_test, [line], [])
def test_read_escape(self):
    # (input line, expected fields) pairs, all read with a backslash
    # as escapechar and checked in this order.
    cases = (
        ('a,\\b,c', ['a', 'b', 'c']),
        ('a,b\\,c', ['a', 'b,c']),
        ('a,"b\\,c"', ['a', 'b,c']),
        ('a,"b,\\c"', ['a', 'b,c']),
        ('a,"b,c\\""', ['a', 'b,c"']),
        ('a,"b,c"\\', ['a', 'b,c\\']),
    )
    for line, fields in cases:
        self._read_test([line], [fields], escapechar='\\')
241 def test_read_quoting(self
):
242 self
._read
_test
(['1,",3,",5'], [['1', ',3,', '5']])
243 self
._read
_test
(['1,",3,",5'], [['1', '"', '3', '"', '5']],
244 quotechar
=None, escapechar
='\\')
245 self
._read
_test
(['1,",3,",5'], [['1', '"', '3', '"', '5']],
246 quoting
=csv
.QUOTE_NONE
, escapechar
='\\')
247 # will this fail where locale uses comma for decimals?
248 self
._read
_test
([',3,"5",7.3, 9'], [['', 3, '5', 7.3, 9]],
249 quoting
=csv
.QUOTE_NONNUMERIC
)
250 self
._read
_test
(['"a\nb", 7'], [['a\nb', ' 7']])
251 self
.assertRaises(ValueError, self
._read
_test
,
253 quoting
=csv
.QUOTE_NONNUMERIC
)
255 def test_read_bigfield(self
):
256 # This exercises the buffer realloc functionality and field size
258 limit
= csv
.field_size_limit()
261 bigstring
= 'X' * size
262 bigline
= '%s,%s' % (bigstring
, bigstring
)
263 self
._read
_test
([bigline
], [[bigstring
, bigstring
]])
264 csv
.field_size_limit(size
)
265 self
._read
_test
([bigline
], [[bigstring
, bigstring
]])
266 self
.assertEqual(csv
.field_size_limit(), size
)
267 csv
.field_size_limit(size
-1)
268 self
.assertRaises(csv
.Error
, self
._read
_test
, [bigline
], [])
269 self
.assertRaises(TypeError, csv
.field_size_limit
, None)
270 self
.assertRaises(TypeError, csv
.field_size_limit
, 1, None)
272 csv
.field_size_limit(limit
)
274 def test_read_linenum(self
):
275 for r
in (csv
.reader(['line,1', 'line,2', 'line,3']),
276 csv
.DictReader(['line,1', 'line,2', 'line,3'],
277 fieldnames
=['a', 'b', 'c'])):
278 self
.assertEqual(r
.line_num
, 0)
280 self
.assertEqual(r
.line_num
, 1)
282 self
.assertEqual(r
.line_num
, 2)
284 self
.assertEqual(r
.line_num
, 3)
285 self
.assertRaises(StopIteration, r
.next
)
286 self
.assertEqual(r
.line_num
, 3)
288 def test_roundtrip_quoteed_newlines(self
):
289 fd
, name
= tempfile
.mkstemp()
290 fileobj
= os
.fdopen(fd
, "w+b")
292 writer
= csv
.writer(fileobj
)
293 self
.assertRaises(TypeError, writer
.writerows
, None)
294 rows
= [['a\nb','b'],['c','x\r\nd']]
295 writer
.writerows(rows
)
297 for i
, row
in enumerate(csv
.reader(fileobj
)):
298 self
.assertEqual(row
, rows
[i
])
303 class TestDialectRegistry(unittest
.TestCase
):
304 def test_registry_badargs(self
):
305 self
.assertRaises(TypeError, csv
.list_dialects
, None)
306 self
.assertRaises(TypeError, csv
.get_dialect
)
307 self
.assertRaises(csv
.Error
, csv
.get_dialect
, None)
308 self
.assertRaises(csv
.Error
, csv
.get_dialect
, "nonesuch")
309 self
.assertRaises(TypeError, csv
.unregister_dialect
)
310 self
.assertRaises(csv
.Error
, csv
.unregister_dialect
, None)
311 self
.assertRaises(csv
.Error
, csv
.unregister_dialect
, "nonesuch")
312 self
.assertRaises(TypeError, csv
.register_dialect
, None)
313 self
.assertRaises(TypeError, csv
.register_dialect
, None, None)
314 self
.assertRaises(TypeError, csv
.register_dialect
, "nonesuch", 0, 0)
315 self
.assertRaises(TypeError, csv
.register_dialect
, "nonesuch",
317 self
.assertRaises(TypeError, csv
.register_dialect
, "nonesuch",
319 self
.assertRaises(TypeError, csv
.register_dialect
, [])
321 def test_registry(self
):
322 class myexceltsv(csv
.excel
):
325 expected_dialects
= csv
.list_dialects() + [name
]
326 expected_dialects
.sort()
327 csv
.register_dialect(name
, myexceltsv
)
329 self
.failUnless(csv
.get_dialect(name
).delimiter
, '\t')
330 got_dialects
= csv
.list_dialects()
332 self
.assertEqual(expected_dialects
, got_dialects
)
334 csv
.unregister_dialect(name
)
336 def test_register_kwargs(self
):
338 csv
.register_dialect(name
, delimiter
=';')
340 self
.failUnless(csv
.get_dialect(name
).delimiter
, '\t')
341 self
.failUnless(list(csv
.reader('X;Y;Z', name
)), ['X', 'Y', 'Z'])
343 csv
.unregister_dialect(name
)
345 def test_incomplete_dialect(self
):
346 class myexceltsv(csv
.Dialect
):
348 self
.assertRaises(csv
.Error
, myexceltsv
)
350 def test_space_dialect(self
):
351 class space(csv
.excel
):
353 quoting
= csv
.QUOTE_NONE
356 fd
, name
= tempfile
.mkstemp()
357 fileobj
= os
.fdopen(fd
, "w+b")
359 fileobj
.write("abc def\nc1ccccc1 benzene\n")
361 rdr
= csv
.reader(fileobj
, dialect
=space())
362 self
.assertEqual(rdr
.next(), ["abc", "def"])
363 self
.assertEqual(rdr
.next(), ["c1ccccc1", "benzene"])
368 def test_dialect_apply(self
):
369 class testA(csv
.excel
):
371 class testB(csv
.excel
):
373 class testC(csv
.excel
):
376 csv
.register_dialect('testC', testC
)
378 fd
, name
= tempfile
.mkstemp()
379 fileobj
= os
.fdopen(fd
, "w+b")
381 writer
= csv
.writer(fileobj
)
382 writer
.writerow([1,2,3])
384 self
.assertEqual(fileobj
.read(), "1,2,3\r\n")
389 fd
, name
= tempfile
.mkstemp()
390 fileobj
= os
.fdopen(fd
, "w+b")
392 writer
= csv
.writer(fileobj
, testA
)
393 writer
.writerow([1,2,3])
395 self
.assertEqual(fileobj
.read(), "1\t2\t3\r\n")
400 fd
, name
= tempfile
.mkstemp()
401 fileobj
= os
.fdopen(fd
, "w+b")
403 writer
= csv
.writer(fileobj
, dialect
=testB())
404 writer
.writerow([1,2,3])
406 self
.assertEqual(fileobj
.read(), "1:2:3\r\n")
411 fd
, name
= tempfile
.mkstemp()
412 fileobj
= os
.fdopen(fd
, "w+b")
414 writer
= csv
.writer(fileobj
, dialect
='testC')
415 writer
.writerow([1,2,3])
417 self
.assertEqual(fileobj
.read(), "1|2|3\r\n")
422 fd
, name
= tempfile
.mkstemp()
423 fileobj
= os
.fdopen(fd
, "w+b")
425 writer
= csv
.writer(fileobj
, dialect
=testA
, delimiter
=';')
426 writer
.writerow([1,2,3])
428 self
.assertEqual(fileobj
.read(), "1;2;3\r\n")
434 csv
.unregister_dialect('testC')
436 def test_bad_dialect(self
):
438 self
.assertRaises(TypeError, csv
.reader
, [], bad_attr
= 0)
440 self
.assertRaises(TypeError, csv
.reader
, [], delimiter
= None)
441 self
.assertRaises(TypeError, csv
.reader
, [], quoting
= -1)
442 self
.assertRaises(TypeError, csv
.reader
, [], quoting
= 100)
444 class TestCsvBase(unittest
.TestCase
):
445 def readerAssertEqual(self
, input, expected_result
):
446 fd
, name
= tempfile
.mkstemp()
447 fileobj
= os
.fdopen(fd
, "w+b")
451 reader
= csv
.reader(fileobj
, dialect
= self
.dialect
)
452 fields
= list(reader
)
453 self
.assertEqual(fields
, expected_result
)
458 def writerAssertEqual(self
, input, expected_result
):
459 fd
, name
= tempfile
.mkstemp()
460 fileobj
= os
.fdopen(fd
, "w+b")
462 writer
= csv
.writer(fileobj
, dialect
= self
.dialect
)
463 writer
.writerows(input)
465 self
.assertEqual(fileobj
.read(), expected_result
)
470 class TestDialectExcel(TestCsvBase
):
def test_single(self):
    # Input text, and the rows readerAssertEqual should get back.
    text = 'abc'
    self.readerAssertEqual(text, [['abc']])
def test_simple(self):
    # Five comma-separated values are expected back as one row of five
    # strings.
    text = '1,2,3,4,5'
    rows = [['1', '2', '3', '4', '5']]
    self.readerAssertEqual(text, rows)
def test_blankline(self):
    # Empty input text is expected to produce no rows.
    text = ''
    self.readerAssertEqual(text, [])
def test_empty_fields(self):
    # A lone delimiter is expected to give one row of two empty strings.
    text = ','
    self.readerAssertEqual(text, [['', '']])
def test_singlequoted(self):
    # A pair of quote characters is expected to give one empty field.
    text = '""'
    self.readerAssertEqual(text, [['']])
def test_singlequoted_left_empty(self):
    # Quoted empty field on the left of the delimiter, nothing on the
    # right: two empty fields expected.
    text = '"",'
    self.readerAssertEqual(text, [['', '']])
def test_singlequoted_right_empty(self):
    # Nothing on the left of the delimiter, quoted empty field on the
    # right: two empty fields expected.
    text = ',""'
    self.readerAssertEqual(text, [['', '']])
def test_single_quoted_quote(self):
    # Four quote characters are expected to read as one field holding
    # a single quote character.
    text = '""""'
    self.readerAssertEqual(text, [['"']])
def test_quoted_quotes(self):
    # Six quote characters are expected to read as one field holding
    # two quote characters.
    text = '""""""'
    self.readerAssertEqual(text, [['""']])
def test_inline_quote(self):
    # Quote characters inside an unquoted field are expected back
    # unchanged.
    text = 'a""b'
    self.readerAssertEqual(text, [['a""b']])
def test_inline_quotes(self):
    # Separate quote characters inside an unquoted field are expected
    # back unchanged.
    text = 'a"b"c'
    self.readerAssertEqual(text, [['a"b"c']])
506 def test_quotes_and_more(self
):
507 # Excel would never write a field containing '"a"b', but when
508 # reading one, it will return 'ab'.
509 self
.readerAssertEqual('"a"b', [['ab']])
def test_lone_quote(self):
    # A single quote character inside an unquoted field is expected
    # back unchanged.
    text = 'a"b'
    self.readerAssertEqual(text, [['a"b']])
def test_quote_and_quote(self):
    # Excel would never write a field containing '"a" "b"', but when
    # reading one, it will return 'a "b"'.
    text = '"a" "b"'
    self.readerAssertEqual(text, [['a "b"']])
def test_space_and_quote(self):
    # A leading space before the quoted text: the field is expected
    # back with the space and the quotes intact.
    text = ' "a"'
    self.readerAssertEqual(text, [[' "a"']])
522 def test_quoted(self
):
523 self
.readerAssertEqual('1,2,3,"I think, therefore I am",5,6',
525 'I think, therefore I am',
528 def test_quoted_quote(self
):
529 self
.readerAssertEqual('1,2,3,"""I see,"" said the blind man","as he picked up his hammer and saw"',
531 '"I see," said the blind man',
532 'as he picked up his hammer and saw']])
534 def test_quoted_nl(self
):
537 said the blind man","as he picked up his
540 self
.readerAssertEqual(input,
542 '"I see,"\nsaid the blind man',
543 'as he picked up his\nhammer and saw'],
def test_dubious_quote(self):
    # A quote at the end of an unquoted field stays in that field, and
    # the trailing delimiter adds a final empty field.
    text = '12,12,1",'
    rows = [['12', '12', '1"', '']]
    self.readerAssertEqual(text, rows)
550 self
.writerAssertEqual([], '')
def test_single(self):
    # NOTE(review): a reader test that is also named test_single is
    # defined earlier, apparently in this same class; if so, this
    # definition replaces it and the reader test never runs.  Consider
    # renaming this one (e.g. test_single_writer) - confirm first.
    rows = [['abc']]
    self.writerAssertEqual(rows, 'abc\r\n')
def test_simple(self):
    # NOTE(review): a reader test that is also named test_simple is
    # defined earlier, apparently in this same class; if so, this
    # definition replaces it and the reader test never runs.  Consider
    # renaming this one (e.g. test_simple_writer) - confirm first.
    rows = [[1, 2, 'abc', 3, 4]]
    self.writerAssertEqual(rows, '1,2,abc,3,4\r\n')
def test_quotes(self):
    # A field containing quote characters is expected to be written
    # quoted, with each embedded quote doubled.
    rows = [[1, 2, 'a"bc"', 3, 4]]
    self.writerAssertEqual(rows, '1,2,"a""bc""",3,4\r\n')
def test_quote_fieldsep(self):
    # A field containing the delimiter is expected to be written quoted.
    rows = [['abc,def']]
    self.writerAssertEqual(rows, '"abc,def"\r\n')
def test_newlines(self):
    # A field containing a newline is expected to be written quoted,
    # with the newline kept inside the quotes.
    rows = [[1, 2, 'a\nbc', 3, 4]]
    self.writerAssertEqual(rows, '1,2,"a\nbc",3,4\r\n')
567 class EscapedExcel(csv
.excel
):
568 quoting
= csv
.QUOTE_NONE
class TestEscapedExcel(TestCsvBase):
    # Dialect instance used by the assertion helpers inherited from
    # TestCsvBase.
    dialect = EscapedExcel()

    def test_escape_fieldsep(self):
        # Writing a field that contains the delimiter is expected to
        # produce a backslash before the comma and no quotes.
        rows = [['abc,def']]
        self.writerAssertEqual(rows, 'abc\\,def\r\n')

    def test_read_escape_fieldsep(self):
        # Reading the escaped form is expected to give back the single
        # field with its comma.
        text = 'abc\\,def\r\n'
        self.readerAssertEqual(text, [['abc,def']])
580 class QuotedEscapedExcel(csv
.excel
):
581 quoting
= csv
.QUOTE_NONNUMERIC
class TestQuotedEscapedExcel(TestCsvBase):
    # Dialect instance used by the assertion helpers inherited from
    # TestCsvBase.
    dialect = QuotedEscapedExcel()

    def test_write_escape_fieldsep(self):
        # Writing a field that contains the delimiter is expected to
        # produce the field wrapped in quotes.
        rows = [['abc,def']]
        self.writerAssertEqual(rows, '"abc,def"\r\n')

    def test_read_escape_fieldsep(self):
        # Reading a quoted field with a backslash before the comma is
        # expected to give back the field without the backslash.
        text = '"abc\\,def"\r\n'
        self.readerAssertEqual(text, [['abc,def']])
593 class TestDictFields(unittest
.TestCase
):
594 ### "long" means the row is longer than the number of fieldnames
595 ### "short" means there are fewer elements in the row than fieldnames
596 def test_write_simple_dict(self
):
597 fd
, name
= tempfile
.mkstemp()
598 fileobj
= os
.fdopen(fd
, "w+b")
600 writer
= csv
.DictWriter(fileobj
, fieldnames
= ["f1", "f2", "f3"])
601 writer
.writerow({"f1": 10, "f3": "abc"})
603 self
.assertEqual(fileobj
.read(), "10,,abc\r\n")
608 def test_write_no_fields(self
):
610 self
.assertRaises(TypeError, csv
.DictWriter
, fileobj
)
612 def test_read_dict_fields(self
):
613 fd
, name
= tempfile
.mkstemp()
614 fileobj
= os
.fdopen(fd
, "w+b")
616 fileobj
.write("1,2,abc\r\n")
618 reader
= csv
.DictReader(fileobj
,
619 fieldnames
=["f1", "f2", "f3"])
620 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
625 def test_read_dict_no_fieldnames(self
):
626 fd
, name
= tempfile
.mkstemp()
627 fileobj
= os
.fdopen(fd
, "w+b")
629 fileobj
.write("f1,f2,f3\r\n1,2,abc\r\n")
631 reader
= csv
.DictReader(fileobj
)
632 self
.assertEqual(reader
.fieldnames
, ["f1", "f2", "f3"])
633 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
638 # Two test cases to make sure existing ways of implicitly setting
639 # fieldnames continue to work. Both arise from discussion in issue3436.
640 def test_read_dict_fieldnames_from_file(self
):
641 fd
, name
= tempfile
.mkstemp()
642 f
= os
.fdopen(fd
, "w+b")
644 f
.write("f1,f2,f3\r\n1,2,abc\r\n")
646 reader
= csv
.DictReader(f
, fieldnames
=csv
.reader(f
).next())
647 self
.assertEqual(reader
.fieldnames
, ["f1", "f2", "f3"])
648 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2', "f3": 'abc'})
653 def test_read_dict_fieldnames_chain(self
):
655 fd
, name
= tempfile
.mkstemp()
656 f
= os
.fdopen(fd
, "w+b")
658 f
.write("f1,f2,f3\r\n1,2,abc\r\n")
660 reader
= csv
.DictReader(f
)
662 for row
in itertools
.chain([first
], reader
):
663 self
.assertEqual(reader
.fieldnames
, ["f1", "f2", "f3"])
664 self
.assertEqual(row
, {"f1": '1', "f2": '2', "f3": 'abc'})
669 def test_read_long(self
):
670 fd
, name
= tempfile
.mkstemp()
671 fileobj
= os
.fdopen(fd
, "w+b")
673 fileobj
.write("1,2,abc,4,5,6\r\n")
675 reader
= csv
.DictReader(fileobj
,
676 fieldnames
=["f1", "f2"])
677 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2',
678 None: ["abc", "4", "5", "6"]})
683 def test_read_long_with_rest(self
):
684 fd
, name
= tempfile
.mkstemp()
685 fileobj
= os
.fdopen(fd
, "w+b")
687 fileobj
.write("1,2,abc,4,5,6\r\n")
689 reader
= csv
.DictReader(fileobj
,
690 fieldnames
=["f1", "f2"], restkey
="_rest")
691 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2',
692 "_rest": ["abc", "4", "5", "6"]})
697 def test_read_long_with_rest_no_fieldnames(self
):
698 fd
, name
= tempfile
.mkstemp()
699 fileobj
= os
.fdopen(fd
, "w+b")
701 fileobj
.write("f1,f2\r\n1,2,abc,4,5,6\r\n")
703 reader
= csv
.DictReader(fileobj
, restkey
="_rest")
704 self
.assertEqual(reader
.fieldnames
, ["f1", "f2"])
705 self
.assertEqual(reader
.next(), {"f1": '1', "f2": '2',
706 "_rest": ["abc", "4", "5", "6"]})
711 def test_read_short(self
):
712 fd
, name
= tempfile
.mkstemp()
713 fileobj
= os
.fdopen(fd
, "w+b")
715 fileobj
.write("1,2,abc,4,5,6\r\n1,2,abc\r\n")
717 reader
= csv
.DictReader(fileobj
,
718 fieldnames
="1 2 3 4 5 6".split(),
720 self
.assertEqual(reader
.next(), {"1": '1', "2": '2', "3": 'abc',
721 "4": '4', "5": '5', "6": '6'})
722 self
.assertEqual(reader
.next(), {"1": '1', "2": '2', "3": 'abc',
723 "4": 'DEFAULT', "5": 'DEFAULT',
729 def test_read_multi(self
):
731 '2147483648,43.0e12,17,abc,def\r\n',
732 '147483648,43.0e2,17,abc,def\r\n',
733 '47483648,43.0,170,abc,def\r\n'
736 reader
= csv
.DictReader(sample
,
737 fieldnames
="i1 float i2 s1 s2".split())
738 self
.assertEqual(reader
.next(), {"i1": '2147483648',
744 def test_read_with_blanks(self
):
745 reader
= csv
.DictReader(["1,2,abc,4,5,6\r\n","\r\n",
746 "1,2,abc,4,5,6\r\n"],
747 fieldnames
="1 2 3 4 5 6".split())
748 self
.assertEqual(reader
.next(), {"1": '1', "2": '2', "3": 'abc',
749 "4": '4', "5": '5', "6": '6'})
750 self
.assertEqual(reader
.next(), {"1": '1', "2": '2', "3": 'abc',
751 "4": '4', "5": '5', "6": '6'})
753 def test_read_semi_sep(self
):
754 reader
= csv
.DictReader(["1;2;abc;4;5;6\r\n"],
755 fieldnames
="1 2 3 4 5 6".split(),
757 self
.assertEqual(reader
.next(), {"1": '1', "2": '2', "3": 'abc',
758 "4": '4', "5": '5', "6": '6'})
760 class TestArrayWrites(unittest
.TestCase
):
761 def test_int_write(self
):
763 contents
= [(20-i
) for i
in range(20)]
764 a
= array
.array('i', contents
)
766 fd
, name
= tempfile
.mkstemp()
767 fileobj
= os
.fdopen(fd
, "w+b")
769 writer
= csv
.writer(fileobj
, dialect
="excel")
771 expected
= ",".join([str(i
) for i
in a
])+"\r\n"
773 self
.assertEqual(fileobj
.read(), expected
)
778 def test_double_write(self
):
780 contents
= [(20-i
)*0.1 for i
in range(20)]
781 a
= array
.array('d', contents
)
782 fd
, name
= tempfile
.mkstemp()
783 fileobj
= os
.fdopen(fd
, "w+b")
785 writer
= csv
.writer(fileobj
, dialect
="excel")
787 expected
= ",".join([str(i
) for i
in a
])+"\r\n"
789 self
.assertEqual(fileobj
.read(), expected
)
794 def test_float_write(self
):
796 contents
= [(20-i
)*0.1 for i
in range(20)]
797 a
= array
.array('f', contents
)
798 fd
, name
= tempfile
.mkstemp()
799 fileobj
= os
.fdopen(fd
, "w+b")
801 writer
= csv
.writer(fileobj
, dialect
="excel")
803 expected
= ",".join([str(i
) for i
in a
])+"\r\n"
805 self
.assertEqual(fileobj
.read(), expected
)
810 def test_char_write(self
):
812 a
= array
.array('c', string
.letters
)
813 fd
, name
= tempfile
.mkstemp()
814 fileobj
= os
.fdopen(fd
, "w+b")
816 writer
= csv
.writer(fileobj
, dialect
="excel")
818 expected
= ",".join(a
)+"\r\n"
820 self
.assertEqual(fileobj
.read(), expected
)
825 class TestDialectValidity(unittest
.TestCase
):
826 def test_quoting(self
):
827 class mydialect(csv
.Dialect
):
831 skipinitialspace
= True
832 lineterminator
= '\r\n'
833 quoting
= csv
.QUOTE_NONE
836 mydialect
.quoting
= None
837 self
.assertRaises(csv
.Error
, mydialect
)
839 mydialect
.doublequote
= True
840 mydialect
.quoting
= csv
.QUOTE_ALL
841 mydialect
.quotechar
= '"'
844 mydialect
.quotechar
= "''"
845 self
.assertRaises(csv
.Error
, mydialect
)
847 mydialect
.quotechar
= 4
848 self
.assertRaises(csv
.Error
, mydialect
)
850 def test_delimiter(self
):
851 class mydialect(csv
.Dialect
):
855 skipinitialspace
= True
856 lineterminator
= '\r\n'
857 quoting
= csv
.QUOTE_NONE
860 mydialect
.delimiter
= ":::"
861 self
.assertRaises(csv
.Error
, mydialect
)
863 mydialect
.delimiter
= 4
864 self
.assertRaises(csv
.Error
, mydialect
)
866 def test_lineterminator(self
):
867 class mydialect(csv
.Dialect
):
871 skipinitialspace
= True
872 lineterminator
= '\r\n'
873 quoting
= csv
.QUOTE_NONE
876 mydialect
.lineterminator
= ":::"
879 mydialect
.lineterminator
= 4
880 self
.assertRaises(csv
.Error
, mydialect
)
883 class TestSniffer(unittest
.TestCase
):
885 Harry's, Arlington Heights, IL, 2/1/03, Kimi Hayes
886 Shark City, Glendale Heights, IL, 12/28/02, Prezence
887 Tommy's Place, Blue Island, IL, 12/28/02, Blue Sunday/White Crow
888 Stonecutters Seafood and Chop House, Lemont, IL, 12/19/02, Week Back
891 'Harry''s':'Arlington Heights':'IL':'2/1/03':'Kimi Hayes'
892 'Shark City':'Glendale Heights':'IL':'12/28/02':'Prezence'
893 'Tommy''s Place':'Blue Island':'IL':'12/28/02':'Blue Sunday/White Crow'
894 'Stonecutters Seafood and Chop House':'Lemont':'IL':'12/19/02':'Week Back'
897 "venue","city","state","date","performers"
900 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
901 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
902 05/05/03?05/05/03?05/05/03?05/05/03?05/05/03?05/05/03
906 2147483648;43.0e12;17;abc;def
907 147483648;43.0e2;17;abc;def
908 47483648;43.0;170;abc;def
911 sample5
= "aaa\tbbb\r\nAAA\t\r\nBBB\t\r\n"
912 sample6
= "a|b|c\r\nd|e|f\r\n"
913 sample7
= "'a'|'b'|'c'\r\n'd'|e|f\r\n"
915 def test_has_header(self
):
916 sniffer
= csv
.Sniffer()
917 self
.assertEqual(sniffer
.has_header(self
.sample1
), False)
918 self
.assertEqual(sniffer
.has_header(self
.header
+self
.sample1
), True)
920 def test_sniff(self
):
921 sniffer
= csv
.Sniffer()
922 dialect
= sniffer
.sniff(self
.sample1
)
923 self
.assertEqual(dialect
.delimiter
, ",")
924 self
.assertEqual(dialect
.quotechar
, '"')
925 self
.assertEqual(dialect
.skipinitialspace
, True)
927 dialect
= sniffer
.sniff(self
.sample2
)
928 self
.assertEqual(dialect
.delimiter
, ":")
929 self
.assertEqual(dialect
.quotechar
, "'")
930 self
.assertEqual(dialect
.skipinitialspace
, False)
932 def test_delimiters(self
):
933 sniffer
= csv
.Sniffer()
934 dialect
= sniffer
.sniff(self
.sample3
)
935 # given that all three lines in sample3 are equal,
936 # I think that any character could have been 'guessed' as the
937 # delimiter, depending on dictionary order
938 self
.assert_(dialect
.delimiter
in self
.sample3
)
939 dialect
= sniffer
.sniff(self
.sample3
, delimiters
="?,")
940 self
.assertEqual(dialect
.delimiter
, "?")
941 dialect
= sniffer
.sniff(self
.sample3
, delimiters
="/,")
942 self
.assertEqual(dialect
.delimiter
, "/")
943 dialect
= sniffer
.sniff(self
.sample4
)
944 self
.assertEqual(dialect
.delimiter
, ";")
945 dialect
= sniffer
.sniff(self
.sample5
)
946 self
.assertEqual(dialect
.delimiter
, "\t")
947 dialect
= sniffer
.sniff(self
.sample6
)
948 self
.assertEqual(dialect
.delimiter
, "|")
949 dialect
= sniffer
.sniff(self
.sample7
)
950 self
.assertEqual(dialect
.delimiter
, "|")
951 self
.assertEqual(dialect
.quotechar
, "'")
953 if not hasattr(sys
, "gettotalrefcount"):
954 if test_support
.verbose
: print "*** skipping leakage tests ***"
961 class TestLeaks(unittest
.TestCase
):
962 def test_create_read(self
):
964 lastrc
= sys
.gettotalrefcount()
967 self
.assertEqual(gc
.garbage
, [])
968 rc
= sys
.gettotalrefcount()
969 csv
.reader(["a,b,c\r\n"])
970 csv
.reader(["a,b,c\r\n"])
971 csv
.reader(["a,b,c\r\n"])
974 # if csv.reader() leaks, last delta should be 3 or more
975 self
.assertEqual(delta
< 3, True)
977 def test_create_write(self
):
979 lastrc
= sys
.gettotalrefcount()
983 self
.assertEqual(gc
.garbage
, [])
984 rc
= sys
.gettotalrefcount()
990 # if csv.writer() leaks, last delta should be 3 or more
991 self
.assertEqual(delta
< 3, True)
995 rows
= ["a,b,c\r\n"]*5
996 lastrc
= sys
.gettotalrefcount()
999 self
.assertEqual(gc
.garbage
, [])
1000 rc
= sys
.gettotalrefcount()
1001 rdr
= csv
.reader(rows
)
1006 # if reader leaks during read, delta should be 5 or more
1007 self
.assertEqual(delta
< 5, True)
1009 def test_write(self
):
1013 lastrc
= sys
.gettotalrefcount()
1014 for i
in xrange(20):
1016 self
.assertEqual(gc
.garbage
, [])
1017 rc
= sys
.gettotalrefcount()
1018 writer
= csv
.writer(s
)
1020 writer
.writerow(row
)
1023 # if writer leaks during write, last delta should be 5 or more
1024 self
.assertEqual(delta
< 5, True)
1026 # commented out for now - csv module doesn't yet support Unicode
1027 ## class TestUnicode(unittest.TestCase):
1028 ## def test_unicode_read(self):
1030 ## f = codecs.EncodedFile(StringIO("Martin von Löwis,"
1031 ## "Marc André Lemburg,"
1032 ## "Guido van Rossum,"
1033 ## "François Pinard\r\n"),
1034 ## data_encoding='iso-8859-1')
1035 ## reader = csv.reader(f)
1036 ## self.assertEqual(list(reader), [[u"Martin von Löwis",
1037 ## u"Marc André Lemburg",
1038 ## u"Guido van Rossum",
1039 ## u"François Pinardn"]])
1042 mod
= sys
.modules
[__name__
]
1043 test_support
.run_unittest(
1044 *[getattr(mod
, name
) for name
in dir(mod
) if name
.startswith('Test')]
1047 if __name__
== '__main__':