Bundled cherrypy.
[smonitor.git] / monitor / cherrypy / test / test_caching.py
blob720a933a43c381073677de340f40e6b4d32cd7c3
1 import datetime
2 import gzip
3 from itertools import count
4 import os
5 curdir = os.path.join(os.getcwd(), os.path.dirname(__file__))
6 import sys
7 import threading
8 import time
9 import urllib
11 import cherrypy
12 from cherrypy._cpcompat import next, ntob, quote, xrange
13 from cherrypy.lib import httputil
15 gif_bytes = ntob('GIF89a\x01\x00\x01\x00\x82\x00\x01\x99"\x1e\x00\x00\x00\x00\x00'
16 '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
17 '\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x02\x03\x02\x08\t\x00;')
21 from cherrypy.test import helper
23 class CacheTest(helper.CPWebCase):
25 def setup_server():
27 class Root:
29 _cp_config = {'tools.caching.on': True}
31 def __init__(self):
32 self.counter = 0
33 self.control_counter = 0
34 self.longlock = threading.Lock()
36 def index(self):
37 self.counter += 1
38 msg = "visit #%s" % self.counter
39 return msg
40 index.exposed = True
42 def control(self):
43 self.control_counter += 1
44 return "visit #%s" % self.control_counter
45 control.exposed = True
47 def a_gif(self):
48 cherrypy.response.headers['Last-Modified'] = httputil.HTTPDate()
49 return gif_bytes
50 a_gif.exposed = True
52 def long_process(self, seconds='1'):
53 try:
54 self.longlock.acquire()
55 time.sleep(float(seconds))
56 finally:
57 self.longlock.release()
58 return 'success!'
59 long_process.exposed = True
61 def clear_cache(self, path):
62 cherrypy._cache.store[cherrypy.request.base + path].clear()
63 clear_cache.exposed = True
65 class VaryHeaderCachingServer(object):
67 _cp_config = {'tools.caching.on': True,
68 'tools.response_headers.on': True,
69 'tools.response_headers.headers': [('Vary', 'Our-Varying-Header')],
72 def __init__(self):
73 self.counter = count(1)
75 def index(self):
76 return "visit #%s" % next(self.counter)
77 index.exposed = True
79 class UnCached(object):
80 _cp_config = {'tools.expires.on': True,
81 'tools.expires.secs': 60,
82 'tools.staticdir.on': True,
83 'tools.staticdir.dir': 'static',
84 'tools.staticdir.root': curdir,
87 def force(self):
88 cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
89 self._cp_config['tools.expires.force'] = True
90 self._cp_config['tools.expires.secs'] = 0
91 return "being forceful"
92 force.exposed = True
93 force._cp_config = {'tools.expires.secs': 0}
95 def dynamic(self):
96 cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
97 cherrypy.response.headers['Cache-Control'] = 'private'
98 return "D-d-d-dynamic!"
99 dynamic.exposed = True
101 def cacheable(self):
102 cherrypy.response.headers['Etag'] = 'bibbitybobbityboo'
103 return "Hi, I'm cacheable."
104 cacheable.exposed = True
106 def specific(self):
107 cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
108 return "I am being specific"
109 specific.exposed = True
110 specific._cp_config = {'tools.expires.secs': 86400}
112 class Foo(object):pass
114 def wrongtype(self):
115 cherrypy.response.headers['Etag'] = 'need_this_to_make_me_cacheable'
116 return "Woops"
117 wrongtype.exposed = True
118 wrongtype._cp_config = {'tools.expires.secs': Foo()}
120 cherrypy.tree.mount(Root())
121 cherrypy.tree.mount(UnCached(), "/expires")
122 cherrypy.tree.mount(VaryHeaderCachingServer(), "/varying_headers")
123 cherrypy.config.update({'tools.gzip.on': True})
124 setup_server = staticmethod(setup_server)
126 def testCaching(self):
127 elapsed = 0.0
128 for trial in range(10):
129 self.getPage("/")
130 # The response should be the same every time,
131 # except for the Age response header.
132 self.assertBody('visit #1')
133 if trial != 0:
134 age = int(self.assertHeader("Age"))
135 self.assert_(age >= elapsed)
136 elapsed = age
138 # POST, PUT, DELETE should not be cached.
139 self.getPage("/", method="POST")
140 self.assertBody('visit #2')
141 # Because gzip is turned on, the Vary header should always Vary for content-encoding
142 self.assertHeader('Vary', 'Accept-Encoding')
143 # The previous request should have invalidated the cache,
144 # so this request will recalc the response.
145 self.getPage("/", method="GET")
146 self.assertBody('visit #3')
147 # ...but this request should get the cached copy.
148 self.getPage("/", method="GET")
149 self.assertBody('visit #3')
150 self.getPage("/", method="DELETE")
151 self.assertBody('visit #4')
153 # The previous request should have invalidated the cache,
154 # so this request will recalc the response.
155 self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
156 self.assertHeader('Content-Encoding', 'gzip')
157 self.assertHeader('Vary')
158 self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
160 # Now check that a second request gets the gzip header and gzipped body
161 # This also tests a bug in 3.0 to 3.0.2 whereby the cached, gzipped
162 # response body was being gzipped a second time.
163 self.getPage("/", method="GET", headers=[('Accept-Encoding', 'gzip')])
164 self.assertHeader('Content-Encoding', 'gzip')
165 self.assertEqual(cherrypy.lib.encoding.decompress(self.body), ntob("visit #5"))
167 # Now check that a third request that doesn't accept gzip
168 # skips the cache (because the 'Vary' header denies it).
169 self.getPage("/", method="GET")
170 self.assertNoHeader('Content-Encoding')
171 self.assertBody('visit #6')
173 def testVaryHeader(self):
174 self.getPage("/varying_headers/")
175 self.assertStatus("200 OK")
176 self.assertHeaderItemValue('Vary', 'Our-Varying-Header')
177 self.assertBody('visit #1')
179 # Now check that different 'Vary'-fields don't evict each other.
180 # This test creates 2 requests with different 'Our-Varying-Header'
181 # and then tests if the first one still exists.
182 self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
183 self.assertStatus("200 OK")
184 self.assertBody('visit #2')
186 self.getPage("/varying_headers/", headers=[('Our-Varying-Header', 'request 2')])
187 self.assertStatus("200 OK")
188 self.assertBody('visit #2')
190 self.getPage("/varying_headers/")
191 self.assertStatus("200 OK")
192 self.assertBody('visit #1')
194 def testExpiresTool(self):
195 # test setting an expires header
196 self.getPage("/expires/specific")
197 self.assertStatus("200 OK")
198 self.assertHeader("Expires")
200 # test exceptions for bad time values
201 self.getPage("/expires/wrongtype")
202 self.assertStatus(500)
203 self.assertInBody("TypeError")
205 # static content should not have "cache prevention" headers
206 self.getPage("/expires/index.html")
207 self.assertStatus("200 OK")
208 self.assertNoHeader("Pragma")
209 self.assertNoHeader("Cache-Control")
210 self.assertHeader("Expires")
212 # dynamic content that sets indicators should not have
213 # "cache prevention" headers
214 self.getPage("/expires/cacheable")
215 self.assertStatus("200 OK")
216 self.assertNoHeader("Pragma")
217 self.assertNoHeader("Cache-Control")
218 self.assertHeader("Expires")
220 self.getPage('/expires/dynamic')
221 self.assertBody("D-d-d-dynamic!")
222 # the Cache-Control header should be untouched
223 self.assertHeader("Cache-Control", "private")
224 self.assertHeader("Expires")
226 # configure the tool to ignore indicators and replace existing headers
227 self.getPage("/expires/force")
228 self.assertStatus("200 OK")
229 # This also gives us a chance to test 0 expiry with no other headers
230 self.assertHeader("Pragma", "no-cache")
231 if cherrypy.server.protocol_version == "HTTP/1.1":
232 self.assertHeader("Cache-Control", "no-cache, must-revalidate")
233 self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
235 # static content should now have "cache prevention" headers
236 self.getPage("/expires/index.html")
237 self.assertStatus("200 OK")
238 self.assertHeader("Pragma", "no-cache")
239 if cherrypy.server.protocol_version == "HTTP/1.1":
240 self.assertHeader("Cache-Control", "no-cache, must-revalidate")
241 self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
243 # the cacheable handler should now have "cache prevention" headers
244 self.getPage("/expires/cacheable")
245 self.assertStatus("200 OK")
246 self.assertHeader("Pragma", "no-cache")
247 if cherrypy.server.protocol_version == "HTTP/1.1":
248 self.assertHeader("Cache-Control", "no-cache, must-revalidate")
249 self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
251 self.getPage('/expires/dynamic')
252 self.assertBody("D-d-d-dynamic!")
253 # dynamic sets Cache-Control to private but it should be
254 # overwritten here ...
255 self.assertHeader("Pragma", "no-cache")
256 if cherrypy.server.protocol_version == "HTTP/1.1":
257 self.assertHeader("Cache-Control", "no-cache, must-revalidate")
258 self.assertHeader("Expires", "Sun, 28 Jan 2007 00:00:00 GMT")
260 def testLastModified(self):
261 self.getPage("/a.gif")
262 self.assertStatus(200)
263 self.assertBody(gif_bytes)
264 lm1 = self.assertHeader("Last-Modified")
266 # this request should get the cached copy.
267 self.getPage("/a.gif")
268 self.assertStatus(200)
269 self.assertBody(gif_bytes)
270 self.assertHeader("Age")
271 lm2 = self.assertHeader("Last-Modified")
272 self.assertEqual(lm1, lm2)
274 # this request should match the cached copy, but raise 304.
275 self.getPage("/a.gif", [('If-Modified-Since', lm1)])
276 self.assertStatus(304)
277 self.assertNoHeader("Last-Modified")
278 if not getattr(cherrypy.server, "using_apache", False):
279 self.assertHeader("Age")
281 def test_antistampede(self):
282 SECONDS = 4
283 # We MUST make an initial synchronous request in order to create the
284 # AntiStampedeCache object, and populate its selecting_headers,
285 # before the actual stampede.
286 self.getPage("/long_process?seconds=%d" % SECONDS)
287 self.assertBody('success!')
288 self.getPage("/clear_cache?path=" +
289 quote('/long_process?seconds=%d' % SECONDS, safe=''))
290 self.assertStatus(200)
291 sys.stdout.write("prepped... ")
292 sys.stdout.flush()
294 start = datetime.datetime.now()
295 def run():
296 self.getPage("/long_process?seconds=%d" % SECONDS)
297 # The response should be the same every time
298 self.assertBody('success!')
299 ts = [threading.Thread(target=run) for i in xrange(100)]
300 for t in ts:
301 t.start()
302 for t in ts:
303 t.join()
304 self.assertEqualDates(start, datetime.datetime.now(),
305 # Allow a second for our thread/TCP overhead etc.
306 seconds=SECONDS + 1.1)
308 def test_cache_control(self):
309 self.getPage("/control")
310 self.assertBody('visit #1')
311 self.getPage("/control")
312 self.assertBody('visit #1')
314 self.getPage("/control", headers=[('Cache-Control', 'no-cache')])
315 self.assertBody('visit #2')
316 self.getPage("/control")
317 self.assertBody('visit #2')
319 self.getPage("/control", headers=[('Pragma', 'no-cache')])
320 self.assertBody('visit #3')
321 self.getPage("/control")
322 self.assertBody('visit #3')
324 time.sleep(1)
325 self.getPage("/control", headers=[('Cache-Control', 'max-age=0')])
326 self.assertBody('visit #4')
327 self.getPage("/control")
328 self.assertBody('visit #4')