2 from __future__
import with_statement
3 from basetest
import BaseTest
, StringIO
4 import sys
, tempfile
, os
6 import logging
, warnings
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
import helpers
15 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
16 from zeroinstall
.injector
.requirements
import Requirements
17 from zeroinstall
.injector
.driver
import Driver
18 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
19 from zeroinstall
.support
import basedir
, tasks
, ro_rmtree
20 from zeroinstall
.injector
import fetch
27 def raise_gui(*args
, **kwargs
):
29 use_gui
= kwargs
.get('use_gui', True)
30 assert use_gui
!= False
31 if 'DISPLAY' in os
.environ
:
34 assert use_gui
is None
35 return helpers
.DontUseGUI
37 background
._detach
= lambda: False
39 local_hello
= """<?xml version="1.0" ?>
40 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
41 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
45 def output_suppressed():
46 old_stdout
= sys
.stdout
47 old_stderr
= sys
.stderr
49 sys
.stdout
= StringIO()
50 sys
.stderr
= StringIO()
55 except BaseException
as ex
:
56 # Don't abort unit-tests if someone raises SystemExit
57 raise Exception(str(type(ex
)) + " " + str(ex
))
59 sys
.stdout
= old_stdout
60 sys
.stderr
= old_stderr
63 def trapped_exit(expected_exit_status
):
67 # The background handler runs in the same process
68 # as the tests, so don't let it abort.
69 if os
.getpid() == pid
:
70 raise SystemExit(code
)
71 # But, child download processes are OK
78 except SystemExit as ex
:
79 assert ex
.code
== expected_exit_status
84 def resourcewarnings_suppressed():
86 if sys
.version_info
[0] < 3:
89 with warnings
.catch_warnings():
90 warnings
.filterwarnings("ignore", category
= ResourceWarning
)
95 def __init__(self
, reply
):
101 def download_and_execute(driver
, prog_args
, main
= None):
102 driver_download(driver
)
103 run
.execute_selections(driver
.solver
.selections
, prog_args
, stores
= driver
.config
.stores
, main
= main
)
105 def driver_download(driver
):
106 downloaded
= driver
.solve_and_download_impls()
108 tasks
.wait_for_blocker(downloaded
)
110 class NetworkManager
:
112 return 3 # NM_STATUS_CONNECTED
114 server_process
= None
115 def kill_server_process():
116 global server_process
117 if server_process
is not None:
118 # The process may still be running. See
119 # http://bugs.python.org/issue14252 for why this is so
121 server_process
.stdout
.close()
123 server_process
.kill()
126 server_process
.kill()
127 except WindowsError as e
:
128 # This is what happens when terminate
129 # is called after the process has died.
130 if e
.winerror
== 5 and e
.strerror
== 'Access is denied':
131 assert not server_process
.poll()
134 server_process
.wait()
135 server_process
= None
137 def run_server(*args
):
138 global server_process
139 assert server_process
is None
140 server_process
= server
.handle_requests(*args
)
142 real_get_selections_gui
= helpers
.get_selections_gui
144 class TestDownload(BaseTest
):
148 self
.config
.handler
.allow_downloads
= True
149 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
151 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
153 stream
= tempfile
.TemporaryFile()
154 stream
.write(data
.thomas_key
)
156 gpg
.import_key(stream
)
159 trust
.trust_db
.watchers
= []
161 helpers
.get_selections_gui
= raise_gui
167 helpers
.get_selections_gui
= real_get_selections_gui
168 BaseTest
.tearDown(self
)
169 kill_server_process()
171 # Flush out ResourceWarnings
172 import gc
; gc
.collect()
174 def testRejectKey(self
):
175 with
output_suppressed():
176 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
177 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
178 assert driver
.need_download()
179 sys
.stdin
= Reply("N\n")
181 download_and_execute(driver
, ['Hello'])
183 except model
.SafeException
as ex
:
184 if "has no usable implementations" not in str(ex
):
186 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
187 raise self
.config
.handler
.ex
188 self
.config
.handler
.ex
= None
190 def testRejectKeyXML(self
):
191 with
output_suppressed():
192 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
193 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
194 assert driver
.need_download()
195 sys
.stdin
= Reply("N\n")
197 download_and_execute(driver
, ['Hello'])
199 except model
.SafeException
as ex
:
200 if "has no usable implementations" not in str(ex
):
202 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
204 self
.config
.handler
.ex
= None
206 def testImport(self
):
207 from zeroinstall
.injector
import cli
209 rootLogger
= getLogger()
210 rootLogger
.disabled
= True
213 cli
.main(['--import', '-v', 'NO-SUCH-FILE'], config
= self
.config
)
215 except model
.SafeException
as ex
:
216 assert 'NO-SUCH-FILE' in str(ex
)
218 rootLogger
.disabled
= False
219 rootLogger
.setLevel(WARN
)
221 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
222 self
.assertEqual(None, hello
)
224 with
output_suppressed():
225 run_server('6FCF121BE2390E0B.gpg')
226 sys
.stdin
= Reply("Y\n")
228 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
229 cli
.main(['--import', 'Hello'], config
= self
.config
)
230 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
232 # Check we imported the interface after trusting the key
233 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
234 self
.assertEqual(1, len(hello
.implementations
))
236 self
.assertEqual(None, hello
.local_path
)
238 # Shouldn't need to prompt the second time
240 cli
.main(['--import', 'Hello'], config
= self
.config
)
242 def testSelections(self
):
243 from zeroinstall
.injector
import cli
244 with
open("selections.xml", 'rb') as stream
:
245 root
= qdom
.parse(stream
)
246 sels
= selections
.Selections(root
)
247 class Options
: dry_run
= False
249 with
output_suppressed():
250 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
251 sys
.stdin
= Reply("Y\n")
253 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
257 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
258 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
259 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
261 assert sels
.download_missing(self
.config
) is None
263 def testHelpers(self
):
264 from zeroinstall
import helpers
266 with
output_suppressed():
267 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
268 sys
.stdin
= Reply("Y\n")
269 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml', config
= self
.config
)
270 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
271 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
272 assert sels
.download_missing(self
.config
) is None
274 def testSelectionsWithFeed(self
):
275 from zeroinstall
.injector
import cli
276 with
open("selections.xml", 'rb') as stream
:
277 root
= qdom
.parse(stream
)
278 sels
= selections
.Selections(root
)
280 with
output_suppressed():
281 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
282 sys
.stdin
= Reply("Y\n")
284 tasks
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
286 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
287 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
288 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
290 assert sels
.download_missing(self
.config
) is None
292 def testAcceptKey(self
):
293 with
output_suppressed():
294 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
295 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
296 assert driver
.need_download()
297 sys
.stdin
= Reply("Y\n")
299 download_and_execute(driver
, ['Hello'], main
= 'Missing')
301 except model
.SafeException
as ex
:
302 if "HelloWorld/Missing" not in str(ex
):
305 def testAutoAcceptKey(self
):
306 self
.config
.auto_approve_keys
= True
307 with
output_suppressed():
308 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
309 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
310 assert driver
.need_download()
311 sys
.stdin
= Reply("")
313 download_and_execute(driver
, ['Hello'], main
= 'Missing')
315 except model
.SafeException
as ex
:
316 if "HelloWorld/Missing" not in str(ex
):
319 def testDistro(self
):
320 with
output_suppressed():
321 native_url
= 'http://example.com:8000/Native.xml'
323 # Initially, we don't have the feed at all...
324 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
325 assert master_feed
is None, master_feed
327 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
328 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
329 driver
= Driver(requirements
= Requirements(native_url
), config
= self
.config
)
330 assert driver
.need_download()
332 solve
= driver
.solve_with_downloads()
333 tasks
.wait_for_blocker(solve
)
336 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
337 assert master_feed
is not None
338 assert master_feed
.implementations
== {}
340 distro_feed_url
= master_feed
.get_distro_feed()
341 assert distro_feed_url
is not None
342 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
343 assert distro_feed
is not None
344 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
346 def testWrongSize(self
):
347 with
output_suppressed():
348 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
349 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
350 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello-wrong-size'), config
= self
.config
)
351 assert driver
.need_download()
352 sys
.stdin
= Reply("Y\n")
354 download_and_execute(driver
, ['Hello'], main
= 'Missing')
356 except model
.SafeException
as ex
:
357 if "Downloaded archive has incorrect size" not in str(ex
):
360 def testRecipe(self
):
363 sys
.stdout
= StringIO()
364 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
365 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
367 download_and_execute(driver
, [])
369 except model
.SafeException
as ex
:
370 if "HelloWorld/Missing" not in str(ex
):
375 def testRename(self
):
376 with
output_suppressed():
377 run_server(('HelloWorld.tar.bz2',))
378 requirements
= Requirements(os
.path
.abspath('RecipeRename.xml'))
379 requirements
.command
= None
380 driver
= Driver(requirements
= requirements
, config
= self
.config
)
381 driver_download(driver
)
382 digests
= driver
.solver
.selections
[requirements
.interface_uri
].digests
383 path
= self
.config
.stores
.lookup_any(digests
)
384 assert os
.path
.exists(os
.path
.join(path
, 'HelloUniverse', 'minor'))
385 assert not os
.path
.exists(os
.path
.join(path
, 'HelloWorld'))
386 assert not os
.path
.exists(os
.path
.join(path
, 'HelloUniverse', 'main'))
388 def testSymlink(self
):
391 sys
.stdout
= StringIO()
392 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
393 driver
= Driver(requirements
= Requirements(os
.path
.abspath('RecipeSymlink.xml')), config
= self
.config
)
395 download_and_execute(driver
, [])
397 except model
.SafeException
as ex
:
398 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
400 self
.assertEqual(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
404 def testAutopackage(self
):
407 sys
.stdout
= StringIO()
408 run_server('HelloWorld.autopackage')
409 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Autopackage.xml')), config
= self
.config
)
411 download_and_execute(driver
, [])
413 except model
.SafeException
as ex
:
414 if "HelloWorld/Missing" not in str(ex
):
419 def testRecipeFailure(self
):
420 with
resourcewarnings_suppressed():
424 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
426 download_and_execute(driver
, [])
428 except download
.DownloadError
as ex
:
429 if "Connection" not in str(ex
):
434 def testMirrors(self
):
435 with
resourcewarnings_suppressed():
436 getLogger().setLevel(logging
.ERROR
)
437 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
438 run_server(server
.Give404('/Hello.xml'),
439 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
440 '/0mirror/keys/6FCF121BE2390E0B.gpg',
441 server
.Give404('/HelloWorld.tgz'),
442 '/0mirror/archive/http%3A%23%23example.com%3A8000%23HelloWorld.tgz')
443 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
444 self
.config
.mirror
= 'http://example.com:8000/0mirror'
446 refreshed
= driver
.solve_with_downloads()
447 tasks
.wait_for_blocker(refreshed
)
448 assert driver
.solver
.ready
450 #getLogger().setLevel(logging.WARN)
451 downloaded
= driver
.download_uncached_implementations()
452 tasks
.wait_for_blocker(downloaded
)
453 path
= self
.config
.stores
.lookup_any(driver
.solver
.selections
.selections
['http://example.com:8000/Hello.xml'].digests
)
454 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
456 def testImplMirror(self
):
457 with
resourcewarnings_suppressed():
458 # This is like testMirror, except we have a different archive (that generates the same content),
459 # rather than an exact copy of the unavailable archive.
460 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
461 run_server('/Hello.xml',
462 '/6FCF121BE2390E0B.gpg',
463 server
.Give404('/HelloWorld.tgz'),
464 server
.Give404('/0mirror/archive/http%3A%2F%2Flocalhost%3A8000%2FHelloWorld.tgz'),
465 '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
466 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
467 self
.config
.mirror
= 'http://example.com:8000/0mirror'
469 refreshed
= driver
.solve_with_downloads()
470 tasks
.wait_for_blocker(refreshed
)
471 assert driver
.solver
.ready
473 getLogger().setLevel(logging
.ERROR
)
474 downloaded
= driver
.download_uncached_implementations()
475 tasks
.wait_for_blocker(downloaded
)
476 path
= self
.config
.stores
.lookup_any(driver
.solver
.selections
.selections
['http://example.com:8000/Hello.xml'].digests
)
477 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
479 def testReplay(self
):
480 with
resourcewarnings_suppressed():
483 sys
.stdout
= StringIO()
484 getLogger().setLevel(ERROR
)
485 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
486 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
487 with
open('Hello-new.xml', 'rb') as stream
:
488 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, stream
.read(), mtime
+ 10000)
490 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
491 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
492 self
.config
.mirror
= 'http://example.com:8000/0mirror'
494 # Update from mirror (should ignore out-of-date timestamp)
495 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
496 tasks
.wait_for_blocker(refreshed
)
498 # Update from upstream (should report an error)
499 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
501 tasks
.wait_for_blocker(refreshed
)
502 raise Exception("Should have been rejected!")
503 except model
.SafeException
as ex
:
504 assert "New feed's modification time is before old version" in str(ex
)
506 # Must finish with the newest version
507 self
.assertEqual(1342285569, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
511 def testBackground(self
, verbose
= False):
512 r
= Requirements('http://example.com:8000/Hello.xml')
513 d
= Driver(requirements
= r
, config
= self
.config
)
514 self
.import_feed(r
.interface_uri
, 'Hello.xml')
515 self
.config
.freshness
= 0
516 self
.config
.network_use
= model
.network_minimal
517 d
.solver
.solve(r
.interface_uri
, arch
.get_host_architecture())
518 assert d
.solver
.ready
, d
.solver
.get_failure_reason()
521 def choose_download(registed_cb
, nid
, actions
):
523 assert actions
== ['download', 'Download'], actions
524 registed_cb(nid
, 'download')
527 traceback
.print_exc()
532 os
.environ
['DISPLAY'] = 'dummy'
535 sys
.stdout
= StringIO()
536 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
537 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
538 my_dbus
.user_callback
= choose_download
540 with
trapped_exit(1):
541 from zeroinstall
.injector
import config
542 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
543 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
545 background
.spawn_background_update(d
, verbose
)
547 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
552 def testBackgroundVerbose(self
):
553 self
.testBackground(verbose
= True)
555 def testBackgroundApp(self
):
556 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
558 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
562 with
output_suppressed():
563 # Select a version of Hello
564 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
565 r
= Requirements('http://example.com:8000/Hello.xml')
566 driver
= Driver(requirements
= r
, config
= self
.config
)
567 tasks
.wait_for_blocker(driver
.solve_with_downloads())
568 assert driver
.solver
.ready
569 kill_server_process()
572 app
= self
.config
.app_mgr
.create_app('test-app', r
)
573 app
.set_selections(driver
.solver
.selections
)
574 timestamp
= os
.path
.join(app
.path
, 'last-checked')
575 last_check_attempt
= os
.path
.join(app
.path
, 'last-check-attempt')
576 selections_path
= os
.path
.join(app
.path
, 'selections.xml')
578 def reset_timestamps():
581 os
.utime(timestamp
, (1, 1)) # 1970
582 os
.utime(selections_path
, (1, 1))
583 if os
.path
.exists(last_check_attempt
):
584 os
.unlink(last_check_attempt
)
586 # Download the implementation
587 sels
= app
.get_selections()
588 run_server('HelloWorld.tgz')
589 tasks
.wait_for_blocker(app
.download_selections(sels
))
590 kill_server_process()
592 # Not time for a background update yet
593 self
.config
.freshness
= 100
594 dl
= app
.download_selections(sels
)
598 # Trigger a background update - no updates found
600 run_server('Hello.xml')
601 with
trapped_exit(1):
602 dl
= app
.download_selections(sels
)
605 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
606 self
.assertEqual(1, os
.stat(selections_path
).st_mtime
)
607 kill_server_process()
609 # Change the selections
610 sels_path
= os
.path
.join(app
.path
, 'selections.xml')
611 with
open(sels_path
) as stream
:
613 with
open(sels_path
, 'w') as stream
:
614 stream
.write(old
.replace('Hello', 'Goodbye'))
616 # Trigger another background update - metadata changes found
618 run_server('Hello.xml')
619 with
trapped_exit(1):
620 dl
= app
.download_selections(sels
)
623 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
624 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
625 kill_server_process()
627 # Trigger another background update - GUI needed now
629 # Delete cached implementation so we need to download it again
630 stored
= sels
.selections
['http://example.com:8000/Hello.xml'].get_path(self
.config
.stores
)
631 assert os
.path
.basename(stored
).startswith('sha1')
634 # Replace with a valid local feed so we don't have to download immediately
635 with
open(sels_path
, 'w') as stream
:
636 stream
.write(local_hello
)
637 sels
= app
.get_selections()
639 os
.environ
['DISPLAY'] = 'dummy'
641 run_server('Hello.xml')
642 with
trapped_exit(1):
643 dl
= app
.download_selections(sels
)
645 assert ran_gui
# (so doesn't actually update)
646 kill_server_process()
648 # Now again with no DISPLAY
650 del os
.environ
['DISPLAY']
651 run_server('Hello.xml', 'HelloWorld.tgz')
652 with
trapped_exit(1):
653 dl
= app
.download_selections(sels
)
655 assert not ran_gui
# (so doesn't actually update)
657 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
658 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
659 kill_server_process()
661 sels
= app
.get_selections()
662 sel
, = sels
.selections
.values()
663 self
.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel
.id)
666 trust
.trust_db
.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
668 os
.environ
['DISPLAY'] = 'dummy'
670 run_server('Hello.xml')
671 with
trapped_exit(1):
672 #import logging; logging.getLogger().setLevel(logging.INFO)
673 dl
= app
.download_selections(sels
)
676 kill_server_process()
678 # Update not triggered because of last-check-attempt
680 os
.utime(timestamp
, (1, 1)) # 1970
681 os
.utime(selections_path
, (1, 1))
682 dl
= app
.download_selections(sels
)
687 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= True)
689 assert dl
._aborted
.happened
690 assert dl
.tempfile
is None
692 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= False)
693 path
= dl
.tempfile
.name
695 assert not os
.path
.exists(path
)
696 assert dl
._aborted
.happened
697 assert dl
.tempfile
is None
699 def testDownloadIconFails(self
):
700 mydir
= os
.path
.dirname(os
.path
.abspath(__file__
))
701 path
= model
.canonical_iface_uri(os
.path
.join(mydir
, 'Binary.xml'))
702 iface
= self
.config
.iface_cache
.get_interface(path
)
703 blocker
= self
.config
.fetcher
.download_icon(iface
)
705 tasks
.wait_for_blocker(blocker
)
707 except download
.DownloadError
as ex
:
708 assert "Error downloading http://localhost/missing.png" in str(ex
), ex
710 if __name__
== '__main__':
714 kill_server_process()