Add Django-1.2.1
[frozenviper.git] / Django-1.2.1 / tests / regressiontests / file_uploads / tests.py
blob1395cb23ea3ecbd0e4abd26dfc74e43ff3285f53
1 #! -*- coding: utf-8 -*-
2 import os
3 import errno
4 import shutil
5 import unittest
6 from StringIO import StringIO
8 from django.core.files import temp as tempfile
9 from django.core.files.uploadedfile import SimpleUploadedFile
10 from django.test import TestCase, client
11 from django.utils import simplejson
12 from django.utils.hashcompat import sha_constructor
13 from django.http.multipartparser import MultiPartParser
15 from models import FileModel, temp_storage, UPLOAD_TO
16 import uploadhandler
# File name mixing ASCII, Chinese characters and an accented Latin letter;
# test_unicode_file_name uses it (UTF-8 encoded) as the name of the file it uploads.
UNICODE_FILENAME = u'test-0123456789_中文_Orléans.jpg'
20 class FileUploadTests(TestCase):
21 def test_simple_upload(self):
22 post_data = {
23 'name': 'Ringo',
24 'file_field': open(__file__),
26 response = self.client.post('/file_uploads/upload/', post_data)
27 self.assertEqual(response.status_code, 200)
29 def test_large_upload(self):
30 tdir = tempfile.gettempdir()
32 file1 = tempfile.NamedTemporaryFile(suffix=".file1", dir=tdir)
33 file1.write('a' * (2 ** 21))
34 file1.seek(0)
36 file2 = tempfile.NamedTemporaryFile(suffix=".file2", dir=tdir)
37 file2.write('a' * (10 * 2 ** 20))
38 file2.seek(0)
40 post_data = {
41 'name': 'Ringo',
42 'file_field1': file1,
43 'file_field2': file2,
46 for key in post_data.keys():
47 try:
48 post_data[key + '_hash'] = sha_constructor(post_data[key].read()).hexdigest()
49 post_data[key].seek(0)
50 except AttributeError:
51 post_data[key + '_hash'] = sha_constructor(post_data[key]).hexdigest()
53 response = self.client.post('/file_uploads/verify/', post_data)
55 self.assertEqual(response.status_code, 200)
57 def test_unicode_file_name(self):
58 tdir = tempfile.gettempdir()
60 # This file contains chinese symbols and an accented char in the name.
61 file1 = open(os.path.join(tdir, UNICODE_FILENAME.encode('utf-8')), 'w+b')
62 file1.write('b' * (2 ** 10))
63 file1.seek(0)
65 post_data = {
66 'file_unicode': file1,
69 response = self.client.post('/file_uploads/unicode_name/', post_data)
71 file1.close()
72 try:
73 os.unlink(file1.name)
74 except:
75 pass
77 self.assertEqual(response.status_code, 200)
79 def test_dangerous_file_names(self):
80 """Uploaded file names should be sanitized before ever reaching the view."""
81 # This test simulates possible directory traversal attacks by a
82 # malicious uploader We have to do some monkeybusiness here to construct
83 # a malicious payload with an invalid file name (containing os.sep or
84 # os.pardir). This similar to what an attacker would need to do when
85 # trying such an attack.
86 scary_file_names = [
87 "/tmp/hax0rd.txt", # Absolute path, *nix-style.
88 "C:\\Windows\\hax0rd.txt", # Absolute path, win-syle.
89 "C:/Windows/hax0rd.txt", # Absolute path, broken-style.
90 "\\tmp\\hax0rd.txt", # Absolute path, broken in a different way.
91 "/tmp\\hax0rd.txt", # Absolute path, broken by mixing.
92 "subdir/hax0rd.txt", # Descendant path, *nix-style.
93 "subdir\\hax0rd.txt", # Descendant path, win-style.
94 "sub/dir\\hax0rd.txt", # Descendant path, mixed.
95 "../../hax0rd.txt", # Relative path, *nix-style.
96 "..\\..\\hax0rd.txt", # Relative path, win-style.
97 "../..\\hax0rd.txt" # Relative path, mixed.
100 payload = []
101 for i, name in enumerate(scary_file_names):
102 payload.extend([
103 '--' + client.BOUNDARY,
104 'Content-Disposition: form-data; name="file%s"; filename="%s"' % (i, name),
105 'Content-Type: application/octet-stream',
107 'You got pwnd.'
109 payload.extend([
110 '--' + client.BOUNDARY + '--',
114 payload = "\r\n".join(payload)
115 r = {
116 'CONTENT_LENGTH': len(payload),
117 'CONTENT_TYPE': client.MULTIPART_CONTENT,
118 'PATH_INFO': "/file_uploads/echo/",
119 'REQUEST_METHOD': 'POST',
120 'wsgi.input': client.FakePayload(payload),
122 response = self.client.request(**r)
124 # The filenames should have been sanitized by the time it got to the view.
125 recieved = simplejson.loads(response.content)
126 for i, name in enumerate(scary_file_names):
127 got = recieved["file%s" % i]
128 self.assertEqual(got, "hax0rd.txt")
130 def test_filename_overflow(self):
131 """File names over 256 characters (dangerous on some platforms) get fixed up."""
132 name = "%s.txt" % ("f"*500)
133 payload = "\r\n".join([
134 '--' + client.BOUNDARY,
135 'Content-Disposition: form-data; name="file"; filename="%s"' % name,
136 'Content-Type: application/octet-stream',
138 'Oops.'
139 '--' + client.BOUNDARY + '--',
142 r = {
143 'CONTENT_LENGTH': len(payload),
144 'CONTENT_TYPE': client.MULTIPART_CONTENT,
145 'PATH_INFO': "/file_uploads/echo/",
146 'REQUEST_METHOD': 'POST',
147 'wsgi.input': client.FakePayload(payload),
149 got = simplejson.loads(self.client.request(**r).content)
150 self.assert_(len(got['file']) < 256, "Got a long file name (%s characters)." % len(got['file']))
152 def test_custom_upload_handler(self):
153 # A small file (under the 5M quota)
154 smallfile = tempfile.NamedTemporaryFile()
155 smallfile.write('a' * (2 ** 21))
156 smallfile.seek(0)
158 # A big file (over the quota)
159 bigfile = tempfile.NamedTemporaryFile()
160 bigfile.write('a' * (10 * 2 ** 20))
161 bigfile.seek(0)
163 # Small file posting should work.
164 response = self.client.post('/file_uploads/quota/', {'f': smallfile})
165 got = simplejson.loads(response.content)
166 self.assert_('f' in got)
168 # Large files don't go through.
169 response = self.client.post("/file_uploads/quota/", {'f': bigfile})
170 got = simplejson.loads(response.content)
171 self.assert_('f' not in got)
173 def test_broken_custom_upload_handler(self):
174 f = tempfile.NamedTemporaryFile()
175 f.write('a' * (2 ** 21))
176 f.seek(0)
178 # AttributeError: You cannot alter upload handlers after the upload has been processed.
179 self.assertRaises(
180 AttributeError,
181 self.client.post,
182 '/file_uploads/quota/broken/',
183 {'f': f}
186 def test_fileupload_getlist(self):
187 file1 = tempfile.NamedTemporaryFile()
188 file1.write('a' * (2 ** 23))
189 file1.seek(0)
191 file2 = tempfile.NamedTemporaryFile()
192 file2.write('a' * (2 * 2 ** 18))
193 file2.seek(0)
195 file2a = tempfile.NamedTemporaryFile()
196 file2a.write('a' * (5 * 2 ** 20))
197 file2a.seek(0)
199 response = self.client.post('/file_uploads/getlist_count/', {
200 'file1': file1,
201 'field1': u'test',
202 'field2': u'test3',
203 'field3': u'test5',
204 'field4': u'test6',
205 'field5': u'test7',
206 'file2': (file2, file2a)
208 got = simplejson.loads(response.content)
210 self.assertEqual(got.get('file1'), 1)
211 self.assertEqual(got.get('file2'), 2)
213 def test_file_error_blocking(self):
215 The server should not block when there are upload errors (bug #8622).
216 This can happen if something -- i.e. an exception handler -- tries to
217 access POST while handling an error in parsing POST. This shouldn't
218 cause an infinite loop!
220 class POSTAccessingHandler(client.ClientHandler):
221 """A handler that'll access POST during an exception."""
222 def handle_uncaught_exception(self, request, resolver, exc_info):
223 ret = super(POSTAccessingHandler, self).handle_uncaught_exception(request, resolver, exc_info)
224 p = request.POST
225 return ret
227 post_data = {
228 'name': 'Ringo',
229 'file_field': open(__file__),
231 # Maybe this is a little more complicated that it needs to be; but if
232 # the django.test.client.FakePayload.read() implementation changes then
233 # this test would fail. So we need to know exactly what kind of error
234 # it raises when there is an attempt to read more than the available bytes:
235 try:
236 client.FakePayload('a').read(2)
237 except Exception, reference_error:
238 pass
240 # install the custom handler that tries to access request.POST
241 self.client.handler = POSTAccessingHandler()
243 try:
244 response = self.client.post('/file_uploads/upload_errors/', post_data)
245 except reference_error.__class__, err:
246 self.failIf(
247 str(err) == str(reference_error),
248 "Caught a repeated exception that'll cause an infinite loop in file uploads."
250 except Exception, err:
251 # CustomUploadError is the error that should have been raised
252 self.assertEqual(err.__class__, uploadhandler.CustomUploadError)
254 class DirectoryCreationTests(unittest.TestCase):
256 Tests for error handling during directory creation
257 via _save_FIELD_file (ticket #6450)
259 def setUp(self):
260 self.obj = FileModel()
261 if not os.path.isdir(temp_storage.location):
262 os.makedirs(temp_storage.location)
263 if os.path.isdir(UPLOAD_TO):
264 os.chmod(UPLOAD_TO, 0700)
265 shutil.rmtree(UPLOAD_TO)
267 def tearDown(self):
268 os.chmod(temp_storage.location, 0700)
269 shutil.rmtree(temp_storage.location)
271 def test_readonly_root(self):
272 """Permission errors are not swallowed"""
273 os.chmod(temp_storage.location, 0500)
274 try:
275 self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
276 except OSError, err:
277 self.assertEquals(err.errno, errno.EACCES)
278 except Exception, err:
279 self.fail("OSError [Errno %s] not raised." % errno.EACCES)
281 def test_not_a_directory(self):
282 """The correct IOError is raised when the upload directory name exists but isn't a directory"""
283 # Create a file with the upload directory name
284 fd = open(UPLOAD_TO, 'w')
285 fd.close()
286 try:
287 self.obj.testfile.save('foo.txt', SimpleUploadedFile('foo.txt', 'x'))
288 except IOError, err:
289 # The test needs to be done on a specific string as IOError
290 # is raised even without the patch (just not early enough)
291 self.assertEquals(err.args[0],
292 "%s exists and is not a directory." % UPLOAD_TO)
293 except:
294 self.fail("IOError not raised")
class MultiParserTests(unittest.TestCase):
    """Checks on constructing MultiPartParser directly."""

    def test_empty_upload_handlers(self):
        """The parser can be instantiated with an empty upload handler list."""
        # Nothing is parsed here; the test only passes if the constructor
        # accepts an empty list of upload handlers without raising.
        meta = {
            'CONTENT_TYPE': 'multipart/form-data; boundary=_foo',
            'CONTENT_LENGTH': '1'
        }
        body = StringIO('x')
        no_handlers = []
        MultiPartParser(meta, body, no_handlers, 'utf-8')