Allow mirroring both archives and implementations
[zeroinstall.git] / tests / testdownload.py
blob3ae52592d77177f7a4c7f93169305ee63510e6a3
1 #!/usr/bin/env python
2 from __future__ import with_statement
3 from basetest import BaseTest, StringIO
4 import sys, tempfile, os
5 import unittest
6 import logging
7 from logging import getLogger, WARN, ERROR
8 from contextlib import contextmanager
10 sys.path.insert(0, '..')
12 os.environ["http_proxy"] = "localhost:8000"
14 from zeroinstall import helpers
15 from zeroinstall.injector import model, gpg, download, trust, background, arch, selections, qdom, run
16 from zeroinstall.injector.requirements import Requirements
17 from zeroinstall.injector.driver import Driver
18 from zeroinstall.zerostore import Store, NotStored; Store._add_with_helper = lambda *unused: False
19 from zeroinstall.support import basedir, tasks, ro_rmtree
20 from zeroinstall.injector import fetch
21 import data
22 import my_dbus
24 import server
# Module-level flag: becomes True once raise_gui() has been called.
ran_gui = False


def raise_gui(*args):
    """Record that the GUI was requested instead of launching anything.

    Accepts and ignores any positional arguments; the only effect is
    setting the module-level ``ran_gui`` flag to True.
    """
    global ran_gui
    ran_gui = True
# Replace background._detach with a stub that always returns False.
# NOTE(review): presumably this keeps the background updater in the test
# process rather than forking -- confirm against zeroinstall.injector.background.
background._detach = lambda: False
# Selections document with a single local-path selection for Hello.xml.
# testBackgroundApp writes it over an app's selections.xml.
local_hello = """<?xml version="1.0" ?>
<selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
<selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
</selections>"""
@contextmanager
def output_suppressed():
    """Context manager that swaps sys.stdout and sys.stderr for StringIO buffers.

    The original streams are always restored on exit.  Ordinary exceptions
    propagate unchanged; any other BaseException (e.g. SystemExit) raised in
    the body is converted into a plain Exception so that it cannot abort the
    unit-test run.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout, sys.stderr = StringIO(), StringIO()
        try:
            yield
        except BaseException as ex:
            if isinstance(ex, Exception):
                raise
            # Don't abort unit-tests if someone raises SystemExit
            raise Exception("%s %s" % (type(ex), ex))
    finally:
        sys.stdout, sys.stderr = saved_streams
@contextmanager
def trapped_exit(expected_exit_status):
    """Context manager asserting that the body calls os._exit(expected_exit_status).

    While active, os._exit is replaced: in the process that entered the
    context it raises SystemExit instead of terminating, while any other
    process still gets the real os._exit.  The body must trigger the exit;
    finishing normally fails an assertion.  os._exit is restored afterwards.
    """
    test_pid = os.getpid()
    real_exit = os._exit

    def exit_or_raise(code):
        # Child download processes are OK to exit for real...
        if os.getpid() != test_pid:
            return real_exit(code)
        # ...but the background handler runs in the same process
        # as the tests, so don't let it abort.
        raise SystemExit(code)

    os._exit = exit_or_raise
    try:
        try:
            yield
        except SystemExit as ex:
            assert ex.code == expected_exit_status
        else:
            assert False
    finally:
        os._exit = real_exit
class Reply:
    """Minimal stdin replacement whose readline() always yields one fixed answer."""

    def __init__(self, reply):
        # Text handed back by every readline() call.
        self.reply = reply

    def readline(self):
        """Return the canned reply given at construction time."""
        return self.reply
def download_and_execute(driver, prog_args, main = None):
    """Solve and download via *driver*, then execute the chosen selections.

    driver    -- object passed to driver_download(); its solver.selections and
                 config.stores are then handed to run.execute_selections
    prog_args -- arguments passed through to run.execute_selections
    main      -- optional 'main' override passed through to run.execute_selections
    """
    driver_download(driver)
    chosen = driver.solver.selections
    stores = driver.config.stores
    run.execute_selections(chosen, prog_args, stores = stores, main = main)
def driver_download(driver):
    """Call driver.solve_and_download_impls() and wait on whatever it returns.

    A falsy result means there is nothing to wait for, so the function
    returns immediately; otherwise it blocks in tasks.wait_for_blocker().
    """
    blocker = driver.solve_and_download_impls()
    if not blocker:
        return
    tasks.wait_for_blocker(blocker)
class NetworkManager:
    """Stand-in NetworkManager object registered with my_dbus by the background tests."""

    def state(self):
        """Return the fixed state code 3 (NM_STATUS_CONNECTED)."""
        return 3
# Handle of the currently running test server child process, or None.
server_process = None


def kill_server_process():
    """Stop the test server child process, if one is running.

    Closes the child's stdout, kills it, waits for it to exit and resets the
    module-level ``server_process`` to None.  Does nothing when no server is
    recorded.
    """
    global server_process
    if server_process is None:
        return

    proc = server_process
    # The process may still be running. See
    # http://bugs.python.org/issue14252 for why this is so
    # complicated.
    proc.stdout.close()
    if os.name == 'nt':
        try:
            proc.kill()
        except WindowsError as e:
            # This is what happens when terminate
            # is called after the process has died.
            already_dead = e.winerror == 5 and e.strerror == 'Access is denied'
            if not already_dead:
                raise
            assert not proc.poll()
    else:
        proc.kill()
    proc.wait()
    server_process = None
def run_server(*args):
    """Start the test server and remember its process handle.

    All arguments are forwarded unchanged to server.handle_requests().
    Only one server may be active at a time: a previous one must have been
    cleared (see kill_server_process) before calling this again.
    """
    global server_process
    assert server_process is None
    server_process = server.handle_requests(*args)
# Snapshot of the real GUI helper so that TestDownload.tearDown() can undo
# the stub installed by TestDownload.setUp().
real_get_selections_gui = helpers.get_selections_gui


class TestDownload(BaseTest):
    """Download tests that run against a local test server child process.

    Each test starts the server with run_server(), passing the resources it
    should serve; tearDown() kills whatever server is still running.
    helpers.get_selections_gui is replaced by raise_gui() for the duration of
    each test, so a GUI request only sets the module-level ``ran_gui`` flag.
    """

    def setUp(self):
        global ran_gui

        BaseTest.setUp(self)

        self.config.handler.allow_downloads = True
        self.config.key_info_server = 'http://localhost:3333/key-info'

        self.config.fetcher = fetch.Fetcher(self.config)

        # Import the test signing key.  The context manager closes the
        # temporary file even if writing or importing fails.
        with tempfile.TemporaryFile() as stream:
            stream.write(data.thomas_key)
            stream.seek(0)
            gpg.import_key(stream)

        trust.trust_db.watchers = []

        helpers.get_selections_gui = raise_gui

        ran_gui = False

    def tearDown(self):
        helpers.get_selections_gui = real_get_selections_gui
        BaseTest.tearDown(self)
        kill_server_process()

    def testRejectKey(self):
        # Answering "N" to the key prompt must leave the feed unusable.
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    raise self.config.handler.ex
                self.config.handler.ex = None

    def testRejectKeyXML(self):
        # Same as testRejectKey, but for the Hello.xml feed on example.com.
        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("N\n")
            try:
                download_and_execute(driver, ['Hello'])
                assert 0
            except model.SafeException as ex:
                if "has no usable implementations" not in str(ex):
                    raise
                if "Not signed with a trusted key" not in str(self.config.handler.ex):
                    # Report the handler's error (as testRejectKey does),
                    # not the solver exception caught above.
                    raise self.config.handler.ex
                self.config.handler.ex = None

    def testImport(self):
        from zeroinstall.injector import cli

        rootLogger = getLogger()
        rootLogger.disabled = True
        try:
            try:
                cli.main(['--import', '-v', 'NO-SUCH-FILE'], config = self.config)
                assert 0
            except model.SafeException as ex:
                assert 'NO-SUCH-FILE' in str(ex)
        finally:
            rootLogger.disabled = False
            rootLogger.setLevel(WARN)

        hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello')
        self.assertEqual(None, hello)

        with output_suppressed():
            run_server('6FCF121BE2390E0B.gpg')
            sys.stdin = Reply("Y\n")

            assert not trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
            cli.main(['--import', 'Hello'], config = self.config)
            assert trust.trust_db.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')

            # Check we imported the interface after trusting the key
            hello = self.config.iface_cache.get_feed('http://localhost:8000/Hello', force = True)
            self.assertEqual(1, len(hello.implementations))

            self.assertEqual(None, hello.local_path)

            # Shouldn't need to prompt the second time
            sys.stdin = None
            cli.main(['--import', 'Hello'], config = self.config)

    def testSelections(self):
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            # The implementation must not be cached before the download...
            try:
                self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
                assert False
            except NotStored:
                pass
            # ...and must be present afterwards.
            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testHelpers(self):
        from zeroinstall import helpers

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")
            sels = helpers.ensure_cached('http://example.com:8000/Hello.xml', config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))
            assert sels.download_missing(self.config) is None

    def testSelectionsWithFeed(self):
        from zeroinstall.injector import cli
        with open("selections.xml", 'rb') as stream:
            root = qdom.parse(stream)
        sels = selections.Selections(root)

        with output_suppressed():
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            sys.stdin = Reply("Y\n")

            tasks.wait_for_blocker(self.config.fetcher.download_and_import_feed('http://example.com:8000/Hello.xml', self.config.iface_cache))

            cli.main(['--download-only', 'selections.xml'], config = self.config)
            path = self.config.stores.lookup_any(sels.selections['http://example.com:8000/Hello.xml'].digests)
            assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

            assert sels.download_missing(self.config) is None

    def testAcceptKey(self):
        # Answering "Y" lets the download proceed; execution then fails only
        # because the requested main ('Missing') does not exist.
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testAutoAcceptKey(self):
        # With auto_approve_keys set, an empty reply on stdin is enough.
        self.config.auto_approve_keys = True
        with output_suppressed():
            run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise

    def testDistro(self):
        with output_suppressed():
            native_url = 'http://example.com:8000/Native.xml'

            # Initially, we don't have the feed at all...
            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is None, master_feed

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
            driver = Driver(requirements = Requirements(native_url), config = self.config)
            assert driver.need_download()

            solve = driver.solve_with_downloads()
            tasks.wait_for_blocker(solve)
            tasks.check(solve)

            master_feed = self.config.iface_cache.get_feed(native_url)
            assert master_feed is not None
            assert master_feed.implementations == {}

            distro_feed_url = master_feed.get_distro_feed()
            assert distro_feed_url is not None
            distro_feed = self.config.iface_cache.get_feed(distro_feed_url)
            assert distro_feed is not None
            assert len(distro_feed.implementations) == 2, distro_feed.implementations

    def testWrongSize(self):
        with output_suppressed():
            run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
                       '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
            driver = Driver(requirements = Requirements('http://localhost:8000/Hello-wrong-size'), config = self.config)
            assert driver.need_download()
            sys.stdin = Reply("Y\n")
            try:
                download_and_execute(driver, ['Hello'], main = 'Missing')
                assert 0
            except model.SafeException as ex:
                if "Downloaded archive has incorrect size" not in str(ex):
                    raise

    def testRecipe(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRename(self):
        with output_suppressed():
            run_server(('HelloWorld.tar.bz2',))
            requirements = Requirements(os.path.abspath('RecipeRename.xml'))
            requirements.command = None
            driver = Driver(requirements = requirements, config = self.config)
            driver_download(driver)
            digests = driver.solver.selections[requirements.interface_uri].digests
            path = self.config.stores.lookup_any(digests)
            assert os.path.exists(os.path.join(path, 'HelloUniverse', 'minor'))
            assert not os.path.exists(os.path.join(path, 'HelloWorld'))
            assert not os.path.exists(os.path.join(path, 'HelloUniverse', 'main'))

    def testSymlink(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
            driver = Driver(requirements = Requirements(os.path.abspath('RecipeSymlink.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex):
                    raise
            self.assertEqual(None, basedir.load_first_cache('0install.net', 'implementations', 'main'))
        finally:
            sys.stdout = old_out

    def testAutopackage(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('HelloWorld.autopackage')
            driver = Driver(requirements = Requirements(os.path.abspath('Autopackage.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except model.SafeException as ex:
                if "HelloWorld/Missing" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testRecipeFailure(self):
        # NOTE(review): unlike the sibling tests, stdout is saved and restored
        # here but never redirected -- kept as in the original.
        old_out = sys.stdout
        try:
            run_server('*')
            driver = Driver(requirements = Requirements(os.path.abspath('Recipe.xml')), config = self.config)
            try:
                download_and_execute(driver, [])
                assert False
            except download.DownloadError as ex:
                if "Connection" not in str(ex):
                    raise
        finally:
            sys.stdout = old_out

    def testMirrors(self):
        getLogger().setLevel(logging.ERROR)
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        # The primary feed and archive give 404; only the mirror copies are served.
        run_server(server.Give404('/Hello.xml'),
                   '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
                   '/0mirror/keys/6FCF121BE2390E0B.gpg',
                   server.Give404('/HelloWorld.tgz'),
                   '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz')
        driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
        self.config.mirror = 'http://example.com:8000/0mirror'

        refreshed = driver.solve_with_downloads()
        tasks.wait_for_blocker(refreshed)
        assert driver.solver.ready

        downloaded = driver.download_uncached_implementations()
        tasks.wait_for_blocker(downloaded)
        path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testImplMirror(self):
        # This is like testMirrors, except we have a different archive (that generates the same content),
        # rather than an exact copy of the unavailable archive.
        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
        run_server('/Hello.xml',
                   '/6FCF121BE2390E0B.gpg',
                   server.Give404('/HelloWorld.tgz'),
                   server.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'),
                   '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
        driver = Driver(requirements = Requirements('http://example.com:8000/Hello.xml'), config = self.config)
        self.config.mirror = 'http://example.com:8000/0mirror'

        refreshed = driver.solve_with_downloads()
        tasks.wait_for_blocker(refreshed)
        assert driver.solver.ready

        getLogger().setLevel(logging.ERROR)
        downloaded = driver.download_uncached_implementations()
        tasks.wait_for_blocker(downloaded)
        path = self.config.stores.lookup_any(driver.solver.selections.selections['http://example.com:8000/Hello.xml'].digests)
        assert os.path.exists(os.path.join(path, 'HelloWorld', 'main'))

    def testReplay(self):
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            getLogger().setLevel(ERROR)
            iface = self.config.iface_cache.get_interface('http://example.com:8000/Hello.xml')
            mtime = int(os.stat('Hello-new.xml').st_mtime)
            with open('Hello-new.xml', 'rb') as stream:
                self.config.iface_cache.update_feed_from_network(iface.uri, stream.read(), mtime + 10000)

            trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
            run_server(server.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
            self.config.mirror = 'http://example.com:8000/0mirror'

            # Update from mirror (should ignore out-of-date timestamp)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            tasks.wait_for_blocker(refreshed)

            # Update from upstream (should report an error)
            refreshed = self.config.fetcher.download_and_import_feed(iface.uri, self.config.iface_cache)
            try:
                tasks.wait_for_blocker(refreshed)
                raise Exception("Should have been rejected!")
            except model.SafeException as ex:
                assert "New feed's modification time is before old version" in str(ex)

            # Must finish with the newest version
            self.assertEqual(1342285569, self.config.iface_cache._get_signature_date(iface.uri))
        finally:
            sys.stdout = old_out

    def testBackground(self, verbose = False):
        global ran_gui

        r = Requirements('http://example.com:8000/Hello.xml')
        d = Driver(requirements = r, config = self.config)
        self.import_feed(r.interface_uri, 'Hello.xml')
        self.config.freshness = 0
        self.config.network_use = model.network_minimal
        d.solver.solve(r.interface_uri, arch.get_host_architecture())
        assert d.solver.ready, d.solver.get_failure_reason()

        # 'async' is a reserved word from Python 3.7 on, so fetch the
        # decorator with getattr() rather than writing tasks.async.
        @getattr(tasks, 'async')
        def choose_download(registed_cb, nid, actions):
            try:
                assert actions == ['download', 'Download'], actions
                registed_cb(nid, 'download')
            except:
                # Deliberately broad (kept from the original): print whatever
                # went wrong inside the callback instead of propagating it.
                import traceback
                traceback.print_exc()
            yield None

        ran_gui = False
        os.environ['DISPLAY'] = 'dummy'
        old_out = sys.stdout
        try:
            sys.stdout = StringIO()
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
            my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
            my_dbus.user_callback = choose_download

            with trapped_exit(1):
                from zeroinstall.injector import config
                # Temporarily clear the default key lookup server.
                key_info = config.DEFAULT_KEY_LOOKUP_SERVER
                config.DEFAULT_KEY_LOOKUP_SERVER = None
                try:
                    background.spawn_background_update(d, verbose)
                finally:
                    config.DEFAULT_KEY_LOOKUP_SERVER = key_info
        finally:
            sys.stdout = old_out
        assert ran_gui

    def testBackgroundVerbose(self):
        self.testBackground(verbose = True)

    def testBackgroundApp(self):
        global ran_gui

        my_dbus.system_services = {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}

        trust.trust_db.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

        with output_suppressed():
            # Select a version of Hello
            run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
            r = Requirements('http://example.com:8000/Hello.xml')
            driver = Driver(requirements = r, config = self.config)
            tasks.wait_for_blocker(driver.solve_with_downloads())
            assert driver.solver.ready
            kill_server_process()

            # Save it as an app
            app = self.config.app_mgr.create_app('test-app', r)
            app.set_selections(driver.solver.selections)
            timestamp = os.path.join(app.path, 'last-checked')
            last_check_attempt = os.path.join(app.path, 'last-check-attempt')
            selections_path = os.path.join(app.path, 'selections.xml')

            def reset_timestamps():
                # Clear the GUI flag, backdate both files and remove any
                # last-check-attempt marker.
                global ran_gui
                ran_gui = False
                os.utime(timestamp, (1, 1))        # 1970
                os.utime(selections_path, (1, 1))
                if os.path.exists(last_check_attempt):
                    os.unlink(last_check_attempt)

            # Download the implementation
            sels = app.get_selections()
            run_server('HelloWorld.tgz')
            tasks.wait_for_blocker(app.download_selections(sels))
            kill_server_process()

            # Not time for a background update yet
            self.config.freshness = 100
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

            # Trigger a background update - no updates found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Change the selections
            sels_path = os.path.join(app.path, 'selections.xml')
            with open(sels_path) as stream:
                old = stream.read()
            with open(sels_path, 'w') as stream:
                stream.write(old.replace('Hello', 'Goodbye'))

            # Trigger another background update - metadata changes found
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui
            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            # Trigger another background update - GUI needed now

            # Delete cached implementation so we need to download it again
            stored = sels.selections['http://example.com:8000/Hello.xml'].get_path(self.config.stores)
            assert os.path.basename(stored).startswith('sha1')
            ro_rmtree(stored)

            # Replace with a valid local feed so we don't have to download immediately
            with open(sels_path, 'w') as stream:
                stream.write(local_hello)
            sels = app.get_selections()

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui      # (so doesn't actually update)
            kill_server_process()

            # Now again with no DISPLAY
            reset_timestamps()
            del os.environ['DISPLAY']
            run_server('Hello.xml', 'HelloWorld.tgz')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert not ran_gui  # (so doesn't actually update)

            self.assertNotEqual(1, os.stat(timestamp).st_mtime)
            self.assertNotEqual(1, os.stat(selections_path).st_mtime)
            kill_server_process()

            sels = app.get_selections()
            sel, = sels.selections.values()
            self.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel.id)

            # Untrust the key
            trust.trust_db.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')

            os.environ['DISPLAY'] = 'dummy'
            reset_timestamps()
            run_server('Hello.xml')
            with trapped_exit(1):
                dl = app.download_selections(sels)
                assert dl is None
            assert ran_gui
            kill_server_process()

            # Update not triggered because of last-check-attempt
            ran_gui = False
            os.utime(timestamp, (1, 1))        # 1970
            os.utime(selections_path, (1, 1))
            dl = app.download_selections(sels)
            assert dl is None
            assert not ran_gui

    def testAbort(self):
        # With auto_delete the download has no tempfile left after abort().
        dl = download.Download("http://localhost/test.tgz", auto_delete = True)
        dl.abort()
        assert dl._aborted.happened
        assert dl.tempfile is None

        # Without auto_delete, abort() must still remove the temporary file.
        dl = download.Download("http://localhost/test.tgz", auto_delete = False)
        path = dl.tempfile.name
        dl.abort()
        assert not os.path.exists(path)
        assert dl._aborted.happened
        assert dl.tempfile is None
if __name__ == '__main__':
    # Run the suite; always call kill_server_process() afterwards,
    # even when unittest.main() exits by raising.
    try:
        unittest.main()
    finally:
        kill_server_process()