1 #! -*- coding: utf-8 -*-
6 from StringIO
import StringIO
8 from django
.core
.files
import temp
as tempfile
9 from django
.core
.files
.uploadedfile
import SimpleUploadedFile
10 from django
.test
import TestCase
, client
11 from django
.utils
import simplejson
12 from django
.utils
.hashcompat
import sha_constructor
13 from django
.http
.multipartparser
import MultiPartParser
15 from models
import FileModel
, temp_storage
, UPLOAD_TO
# File name mixing ASCII, Chinese characters and an accented Latin letter.
# test_unicode_file_name() UTF-8-encodes it to build the path of the file it uploads.
18 UNICODE_FILENAME
= u
'test-0123456789_中文_Orléans.jpg'
20 class FileUploadTests(TestCase
):
# POST a form whose 'file_field' entry is this test module itself (opened via
# __file__) to /file_uploads/upload/ and expect HTTP 200.
# NOTE(review): embedded source lines 22-23 and 25 are absent from this
# listing, so the statement that binds post_data is not visible -- restore it
# before running.
21 def test_simple_upload(self
):
24 'file_field': open(__file__
),
26 response
= self
.client
.post('/file_uploads/upload/', post_data
)
27 self
.assertEqual(response
.status_code
, 200)
# POST two large temporary files, plus a '<key>_hash' digest for every posted
# value, to /file_uploads/verify/ and expect HTTP 200.
# NOTE(review): embedded source lines 34-35, 38-45 and 47 are absent from this
# listing; the post_data mapping and the `try:` that pairs with the
# `except AttributeError` below are therefore not visible -- restore them.
29 def test_large_upload(self
):
30 tdir
= tempfile
.gettempdir()
# First payload: 2 ** 21 bytes (2 MiB) of 'a'.
32 file1
= tempfile
.NamedTemporaryFile(suffix
=".file1", dir=tdir
)
33 file1
.write('a' * (2 ** 21))
# Second payload: 10 * 2 ** 20 bytes (10 MiB) of 'a'.
36 file2
= tempfile
.NamedTemporaryFile(suffix
=".file2", dir=tdir
)
37 file2
.write('a' * (10 * 2 ** 20))
# keys() returns a list snapshot in Python 2, so adding the '_hash' entries
# while looping does not disturb the iteration.
46 for key
in post_data
.keys():
# File-like values: digest the full content, then rewind to the start.
48 post_data
[key
+ '_hash'] = sha_constructor(post_data
[key
].read()).hexdigest()
49 post_data
[key
].seek(0)
50 except AttributeError:
# Values without read() (presumably plain strings -- confirm) are digested directly.
51 post_data
[key
+ '_hash'] = sha_constructor(post_data
[key
]).hexdigest()
53 response
= self
.client
.post('/file_uploads/verify/', post_data
)
55 self
.assertEqual(response
.status_code
, 200)
# Upload a file whose name contains non-ASCII characters to
# /file_uploads/unicode_name/ and expect HTTP 200.
# NOTE(review): embedded source lines 63-65, 67-68 and 70-76 are absent from
# this listing; the post_data binding and anything that runs between the POST
# and the final assertion (e.g. cleanup of the created file) are not visible.
57 def test_unicode_file_name(self
):
58 tdir
= tempfile
.gettempdir()
60 # This file contains Chinese characters and an accented char in the name.
# The unicode name is UTF-8 encoded before being joined to the temp directory.
61 file1
= open(os
.path
.join(tdir
, UNICODE_FILENAME
.encode('utf-8')), 'w+b')
# 2 ** 10 bytes (1 KiB) of 'b' as the file content.
62 file1
.write('b' * (2 ** 10))
66 'file_unicode': file1
,
69 response
= self
.client
.post('/file_uploads/unicode_name/', post_data
)
77 self
.assertEqual(response
.status_code
, 200)
# NOTE(review): several embedded source lines (86, 98-100, 102, 106-109,
# 111-113, 115, 121) are absent from this listing, so the statements that bind
# scary_file_names, the initial payload list and the request dict `r` are not
# visible -- restore them before running.
79 def test_dangerous_file_names(self
):
80 """Uploaded file names should be sanitized before ever reaching the view."""
81 # This test simulates possible directory traversal attacks by a
82 # malicious uploader. We have to do some monkey business here to construct
83 # a malicious payload with an invalid file name (containing os.sep or
84 # os.pardir). This is similar to what an attacker would need to do when
85 # trying such an attack.
87 "/tmp/hax0rd.txt", # Absolute path, *nix-style.
88 "C:\\Windows\\hax0rd.txt", # Absolute path, win-style.
89 "C:/Windows/hax0rd.txt", # Absolute path, broken-style.
90 "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way.
91 "/tmp\\hax0rd.txt", # Absolute path, broken by mixing.
92 "subdir/hax0rd.txt", # Descendant path, *nix-style.
93 "subdir\\hax0rd.txt", # Descendant path, win-style.
94 "sub/dir\\hax0rd.txt", # Descendant path, mixed.
95 "../../hax0rd.txt", # Relative path, *nix-style.
96 "..\\..\\hax0rd.txt", # Relative path, win-style.
97 "../..\\hax0rd.txt" # Relative path, mixed.
# One multipart part per dangerous name; the form field is called "file<i>".
101 for i
, name
in enumerate(scary_file_names
):
103 '--' + client
.BOUNDARY
,
104 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i
, name
),
105 'Content-Type: application/octet-stream',
# Closing boundary of the multipart body.
110 '--' + client
.BOUNDARY
+ '--',
# Join the collected lines with CRLF to form the raw request body.
114 payload
= "\r\n".join(payload
)
# Hand-built request environment, passed to the test client as keyword arguments.
116 'CONTENT_LENGTH': len(payload
),
117 'CONTENT_TYPE': client
.MULTIPART_CONTENT
,
118 'PATH_INFO': "/file_uploads/echo/",
119 'REQUEST_METHOD': 'POST',
120 'wsgi.input': client
.FakePayload(payload
),
122 response
= self
.client
.request(**r
)
124 # The filenames should have been sanitized by the time they got to the view.
125 recieved
= simplejson
.loads(response
.content
)
# Every submitted name must come back as the bare base name.
126 for i
, name
in enumerate(scary_file_names
):
127 got
= recieved
["file%s" % i
]
128 self
.assertEqual(got
, "hax0rd.txt")
# NOTE(review): embedded source lines 137-138, 140-142 and 148 are absent from
# this listing, so the part body, the end of the join([...]) call and the
# statement that binds the request dict `r` are not visible -- restore them.
130 def test_filename_overflow(self
):
131 """File names over 256 characters (dangerous on some platforms) get fixed up."""
# 500 'f' characters plus ".txt": a 504-character file name.
132 name
= "%s.txt" % ("f"*500)
133 payload
= "\r\n".join([
134 '--' + client
.BOUNDARY
,
135 'Content-Disposition: form-data; name="file"; filename="%s"' % name
,
136 'Content-Type: application/octet-stream',
139 '--' + client
.BOUNDARY
+ '--',
# Hand-built request environment, passed to the test client as keyword arguments.
143 'CONTENT_LENGTH': len(payload
),
144 'CONTENT_TYPE': client
.MULTIPART_CONTENT
,
145 'PATH_INFO': "/file_uploads/echo/",
146 'REQUEST_METHOD': 'POST',
147 'wsgi.input': client
.FakePayload(payload
),
# The response body is JSON; its 'file' entry is the name the view received.
149 got
= simplejson
.loads(self
.client
.request(**r
).content
)
150 self
.assert_(len(got
['file']) < 256, "Got a long file name (%s characters)." % len(got
['file']))
# POST a small and a big file to /file_uploads/quota/ and check, from the JSON
# response, that only the small one is reported under key 'f'.
# NOTE(review): embedded source lines 156-157 and 161-162 are absent from this
# listing; whether the files are rewound after writing is not visible.
152 def test_custom_upload_handler(self
):
153 # A small file: 2 ** 21 bytes (2 MiB), under the 5M quota.
154 smallfile
= tempfile
.NamedTemporaryFile()
155 smallfile
.write('a' * (2 ** 21))
158 # A big file: 10 * 2 ** 20 bytes (10 MiB), over the quota.
159 bigfile
= tempfile
.NamedTemporaryFile()
160 bigfile
.write('a' * (10 * 2 ** 20))
163 # Small file posting should work.
164 response
= self
.client
.post('/file_uploads/quota/', {'f': smallfile
})
165 got
= simplejson
.loads(response
.content
)
166 self
.assert_('f' in got
)
168 # Large files don't go through.
169 response
= self
.client
.post("/file_uploads/quota/", {'f': bigfile
})
170 got
= simplejson
.loads(response
.content
)
171 self
.assert_('f' not in got
)
# Exercises the /file_uploads/quota/broken/ URL with a 2 MiB temporary file.
# NOTE(review): embedded source lines 176-177, 179-181 and 183-185 are absent
# from this listing; the call that uses the URL below and the assertion about
# the AttributeError are not visible -- restore them before running.
173 def test_broken_custom_upload_handler(self
):
174 f
= tempfile
.NamedTemporaryFile()
# 2 ** 21 bytes (2 MiB) of 'a'.
175 f
.write('a' * (2 ** 21))
178 # AttributeError: You cannot alter upload handlers after the upload has been processed.
182 '/file_uploads/quota/broken/',
# POST files to /file_uploads/getlist_count/ with two files sharing the field
# name 'file2', and check the per-field counts in the JSON response.
# NOTE(review): embedded source lines 189-190, 193-194, 197-198, 200-205 and
# 207 are absent from this listing; the 'file1' entry and the end of the
# post() call are not visible -- restore them before running.
186 def test_fileupload_getlist(self
):
# 2 ** 23 bytes (8 MiB) of 'a'.
187 file1
= tempfile
.NamedTemporaryFile()
188 file1
.write('a' * (2 ** 23))
# 2 * 2 ** 18 bytes (512 KiB) of 'a'.
191 file2
= tempfile
.NamedTemporaryFile()
192 file2
.write('a' * (2 * 2 ** 18))
# 5 * 2 ** 20 bytes (5 MiB) of 'a'.
195 file2a
= tempfile
.NamedTemporaryFile()
196 file2a
.write('a' * (5 * 2 ** 20))
199 response
= self
.client
.post('/file_uploads/getlist_count/', {
206 'file2': (file2
, file2a
)
208 got
= simplejson
.loads(response
.content
)
# One file is expected under 'file1' and two under 'file2'.
210 self
.assertEqual(got
.get('file1'), 1)
211 self
.assertEqual(got
.get('file2'), 2)
# NOTE(review): embedded source lines 214, 219, 224-228, 230, 235, 238-239,
# 242-243, 246 and 249 are absent from this listing. The docstring delimiters,
# the rest of the handler override, the post_data opener and the `try:` lines
# that pair with the `except` clauses below are therefore not visible.
213 def test_file_error_blocking(self
):
215 The server should not block when there are upload errors (bug #8622).
216 This can happen if something -- i.e. an exception handler -- tries to
217 access POST while handling an error in parsing POST. This shouldn't
218 cause an infinite loop!
220 class POSTAccessingHandler(client
.ClientHandler
):
221 """A handler that'll access POST during an exception."""
222 def handle_uncaught_exception(self
, request
, resolver
, exc_info
):
# Delegate to the stock handler first and keep its result.
223 ret
= super(POSTAccessingHandler
, self
).handle_uncaught_exception(request
, resolver
, exc_info
)
229 'file_field': open(__file__
),
231 # Maybe this is a little more complicated than it needs to be; but if
232 # the django.test.client.FakePayload.read() implementation changes then
233 # this test would fail. So we need to know exactly what kind of error
234 # it raises when there is an attempt to read more than the available bytes:
# Ask for 2 bytes from a 1-byte payload to capture that reference error.
236 client
.FakePayload('a').read(2)
237 except Exception, reference_error
:
240 # Install the custom handler that tries to access request.POST
241 self
.client
.handler
= POSTAccessingHandler()
244 response
= self
.client
.post('/file_uploads/upload_errors/', post_data
)
# Same exception class as the reference over-read error.
245 except reference_error
.__class
__, err
:
247 str(err
) == str(reference_error
),
248 "Caught a repeated exception that'll cause an infinite loop in file uploads."
250 except Exception, err
:
251 # CustomUploadError is the error that should have been raised
252 self
.assertEqual(err
.__class
__, uploadhandler
.CustomUploadError
)
# NOTE(review): embedded source lines 255, 258-259, 266-267, 270, 274, 276,
# 280, 285-286, 288, 293 and 295 are absent from this listing. The docstring
# delimiters, the headers of the two fixture blocks (presumably setUp and
# tearDown -- confirm) and the `try:` / first `except` lines of both tests are
# therefore not visible.
254 class DirectoryCreationTests(unittest
.TestCase
):
256 Tests for error handling during directory creation
257 via _save_FIELD_file (ticket #6450)
# Fixture preparation: fresh model instance, storage root present, and any
# leftover UPLOAD_TO directory removed.
260 self
.obj
= FileModel()
261 if not os
.path
.isdir(temp_storage
.location
):
262 os
.makedirs(temp_storage
.location
)
263 if os
.path
.isdir(UPLOAD_TO
):
# 0700 (Python 2 octal) gives the owner rwx again so rmtree can delete the tree.
264 os
.chmod(UPLOAD_TO
, 0700)
265 shutil
.rmtree(UPLOAD_TO
)
# Fixture cleanup: make the storage root writable again, then delete it.
268 os
.chmod(temp_storage
.location
, 0700)
269 shutil
.rmtree(temp_storage
.location
)
271 def test_readonly_root(self
):
272 """Permission errors are not swallowed"""
# 0500 (Python 2 octal): owner may read and traverse but not write.
273 os
.chmod(temp_storage
.location
, 0500)
275 self
.obj
.testfile
.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
277 self
.assertEquals(err
.errno
, errno
.EACCES
)
# NOTE(review): no visible branch fails the test when save() raises nothing -- confirm.
278 except Exception, err
:
279 self
.fail("OSError [Errno %s] not raised." % errno
.EACCES
)
281 def test_not_a_directory(self
):
282 """The correct IOError is raised when the upload directory name exists but isn't a directory"""
283 # Create a file with the upload directory name
284 fd
= open(UPLOAD_TO
, 'w')
287 self
.obj
.testfile
.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
289 # The test needs to be done on a specific string as IOError
290 # is raised even without the patch (just not early enough)
291 self
.assertEquals(err
.args
[0],
292 "%s exists and is not a directory." % UPLOAD_TO
)
294 self
.fail("IOError not raised")
class MultiParserTests(unittest.TestCase):
    """Construction-only checks for MultiPartParser."""

    def test_empty_upload_handlers(self):
        """A parser can be instantiated with an empty upload-handler list."""
        # Nothing is parsed here; the constructor call itself is the check.
        meta = {
            'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
            'CONTENT_LENGTH': '1',
        }
        body = StringIO('x')
        no_handlers = []
        parser = MultiPartParser(meta, body, no_handlers, 'utf-8')