2 from __future__
import with_statement
3 from basetest
import BaseTest
, StringIO
4 import sys
, tempfile
, os
7 from logging
import getLogger
, WARN
, ERROR
8 from contextlib
import contextmanager
10 sys
.path
.insert(0, '..')
12 os
.environ
["http_proxy"] = "localhost:8000"
14 from zeroinstall
import helpers
15 from zeroinstall
.injector
import model
, gpg
, download
, trust
, background
, arch
, selections
, qdom
, run
16 from zeroinstall
.injector
.requirements
import Requirements
17 from zeroinstall
.injector
.driver
import Driver
18 from zeroinstall
.zerostore
import Store
, NotStored
; Store
._add
_with
_helper
= lambda *unused
: False
19 from zeroinstall
.support
import basedir
, tasks
, ro_rmtree
20 from zeroinstall
.injector
import fetch
30 background
._detach
= lambda: False
32 local_hello
= """<?xml version="1.0" ?>
33 <selections command="run" interface="http://example.com:8000/Hello.xml" xmlns="http://zero-install.sourceforge.net/2004/injector/interface">
34 <selection id="." local-path='.' interface="http://example.com:8000/Hello.xml" version="0.1"><command name="run" path="foo"/></selection>
38 def output_suppressed():
39 old_stdout
= sys
.stdout
40 old_stderr
= sys
.stderr
42 sys
.stdout
= StringIO()
43 sys
.stderr
= StringIO()
48 except BaseException
as ex
:
49 # Don't abort unit-tests if someone raises SystemExit
50 raise Exception(str(type(ex
)) + " " + str(ex
))
52 sys
.stdout
= old_stdout
53 sys
.stderr
= old_stderr
56 def trapped_exit(expected_exit_status
):
60 # The background handler runs in the same process
61 # as the tests, so don't let it abort.
62 if os
.getpid() == pid
:
63 raise SystemExit(code
)
64 # But, child download processes are OK
71 except SystemExit as ex
:
72 assert ex
.code
== expected_exit_status
77 def __init__(self
, reply
):
83 def download_and_execute(driver
, prog_args
, main
= None):
84 driver_download(driver
)
85 run
.execute_selections(driver
.solver
.selections
, prog_args
, stores
= driver
.config
.stores
, main
= main
)
87 def driver_download(driver
):
88 downloaded
= driver
.solve_and_download_impls()
90 tasks
.wait_for_blocker(downloaded
)
94 return 3 # NM_STATUS_CONNECTED
97 def kill_server_process():
99 if server_process
is not None:
100 # The process may still be running. See
101 # http://bugs.python.org/issue14252 for why this is so
103 server_process
.stdout
.close()
105 server_process
.kill()
108 server_process
.kill()
109 except WindowsError as e
:
110 # This is what happens when terminate
111 # is called after the process has died.
112 if e
.winerror
== 5 and e
.strerror
== 'Access is denied':
113 assert not server_process
.poll()
116 server_process
.wait()
117 server_process
= None
119 def run_server(*args
):
120 global server_process
121 assert server_process
is None
122 server_process
= server
.handle_requests(*args
)
124 real_get_selections_gui
= helpers
.get_selections_gui
126 class TestDownload(BaseTest
):
130 self
.config
.handler
.allow_downloads
= True
131 self
.config
.key_info_server
= 'http://localhost:3333/key-info'
133 self
.config
.fetcher
= fetch
.Fetcher(self
.config
)
135 stream
= tempfile
.TemporaryFile()
136 stream
.write(data
.thomas_key
)
138 gpg
.import_key(stream
)
141 trust
.trust_db
.watchers
= []
143 helpers
.get_selections_gui
= raise_gui
149 helpers
.get_selections_gui
= real_get_selections_gui
150 BaseTest
.tearDown(self
)
151 kill_server_process()
153 def testRejectKey(self
):
154 with
output_suppressed():
155 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
156 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
157 assert driver
.need_download()
158 sys
.stdin
= Reply("N\n")
160 download_and_execute(driver
, ['Hello'])
162 except model
.SafeException
as ex
:
163 if "has no usable implementations" not in str(ex
):
165 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
166 raise self
.config
.handler
.ex
167 self
.config
.handler
.ex
= None
169 def testRejectKeyXML(self
):
170 with
output_suppressed():
171 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
172 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
173 assert driver
.need_download()
174 sys
.stdin
= Reply("N\n")
176 download_and_execute(driver
, ['Hello'])
178 except model
.SafeException
as ex
:
179 if "has no usable implementations" not in str(ex
):
181 if "Not signed with a trusted key" not in str(self
.config
.handler
.ex
):
183 self
.config
.handler
.ex
= None
185 def testImport(self
):
186 from zeroinstall
.injector
import cli
188 rootLogger
= getLogger()
189 rootLogger
.disabled
= True
192 cli
.main(['--import', '-v', 'NO-SUCH-FILE'], config
= self
.config
)
194 except model
.SafeException
as ex
:
195 assert 'NO-SUCH-FILE' in str(ex
)
197 rootLogger
.disabled
= False
198 rootLogger
.setLevel(WARN
)
200 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello')
201 self
.assertEqual(None, hello
)
203 with
output_suppressed():
204 run_server('6FCF121BE2390E0B.gpg')
205 sys
.stdin
= Reply("Y\n")
207 assert not trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
208 cli
.main(['--import', 'Hello'], config
= self
.config
)
209 assert trust
.trust_db
.is_trusted('DE937DD411906ACF7C263B396FCF121BE2390E0B')
211 # Check we imported the interface after trusting the key
212 hello
= self
.config
.iface_cache
.get_feed('http://localhost:8000/Hello', force
= True)
213 self
.assertEqual(1, len(hello
.implementations
))
215 self
.assertEqual(None, hello
.local_path
)
217 # Shouldn't need to prompt the second time
219 cli
.main(['--import', 'Hello'], config
= self
.config
)
221 def testSelections(self
):
222 from zeroinstall
.injector
import cli
223 with
open("selections.xml", 'rb') as stream
:
224 root
= qdom
.parse(stream
)
225 sels
= selections
.Selections(root
)
226 class Options
: dry_run
= False
228 with
output_suppressed():
229 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
230 sys
.stdin
= Reply("Y\n")
232 self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
236 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
237 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
238 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
240 assert sels
.download_missing(self
.config
) is None
242 def testHelpers(self
):
243 from zeroinstall
import helpers
245 with
output_suppressed():
246 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
247 sys
.stdin
= Reply("Y\n")
248 sels
= helpers
.ensure_cached('http://example.com:8000/Hello.xml', config
= self
.config
)
249 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
250 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
251 assert sels
.download_missing(self
.config
) is None
253 def testSelectionsWithFeed(self
):
254 from zeroinstall
.injector
import cli
255 with
open("selections.xml", 'rb') as stream
:
256 root
= qdom
.parse(stream
)
257 sels
= selections
.Selections(root
)
259 with
output_suppressed():
260 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
261 sys
.stdin
= Reply("Y\n")
263 tasks
.wait_for_blocker(self
.config
.fetcher
.download_and_import_feed('http://example.com:8000/Hello.xml', self
.config
.iface_cache
))
265 cli
.main(['--download-only', 'selections.xml'], config
= self
.config
)
266 path
= self
.config
.stores
.lookup_any(sels
.selections
['http://example.com:8000/Hello.xml'].digests
)
267 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
269 assert sels
.download_missing(self
.config
) is None
271 def testAcceptKey(self
):
272 with
output_suppressed():
273 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
274 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
275 assert driver
.need_download()
276 sys
.stdin
= Reply("Y\n")
278 download_and_execute(driver
, ['Hello'], main
= 'Missing')
280 except model
.SafeException
as ex
:
281 if "HelloWorld/Missing" not in str(ex
):
284 def testAutoAcceptKey(self
):
285 self
.config
.auto_approve_keys
= True
286 with
output_suppressed():
287 run_server('Hello', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
288 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello'), config
= self
.config
)
289 assert driver
.need_download()
290 sys
.stdin
= Reply("")
292 download_and_execute(driver
, ['Hello'], main
= 'Missing')
294 except model
.SafeException
as ex
:
295 if "HelloWorld/Missing" not in str(ex
):
298 def testDistro(self
):
299 with
output_suppressed():
300 native_url
= 'http://example.com:8000/Native.xml'
302 # Initially, we don't have the feed at all...
303 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
304 assert master_feed
is None, master_feed
306 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
307 run_server('Native.xml', '6FCF121BE2390E0B.gpg', '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B')
308 driver
= Driver(requirements
= Requirements(native_url
), config
= self
.config
)
309 assert driver
.need_download()
311 solve
= driver
.solve_with_downloads()
312 tasks
.wait_for_blocker(solve
)
315 master_feed
= self
.config
.iface_cache
.get_feed(native_url
)
316 assert master_feed
is not None
317 assert master_feed
.implementations
== {}
319 distro_feed_url
= master_feed
.get_distro_feed()
320 assert distro_feed_url
is not None
321 distro_feed
= self
.config
.iface_cache
.get_feed(distro_feed_url
)
322 assert distro_feed
is not None
323 assert len(distro_feed
.implementations
) == 2, distro_feed
.implementations
325 def testWrongSize(self
):
326 with
output_suppressed():
327 run_server('Hello-wrong-size', '6FCF121BE2390E0B.gpg',
328 '/key-info/key/DE937DD411906ACF7C263B396FCF121BE2390E0B', 'HelloWorld.tgz')
329 driver
= Driver(requirements
= Requirements('http://localhost:8000/Hello-wrong-size'), config
= self
.config
)
330 assert driver
.need_download()
331 sys
.stdin
= Reply("Y\n")
333 download_and_execute(driver
, ['Hello'], main
= 'Missing')
335 except model
.SafeException
as ex
:
336 if "Downloaded archive has incorrect size" not in str(ex
):
339 def testRecipe(self
):
342 sys
.stdout
= StringIO()
343 run_server(('HelloWorld.tar.bz2', 'redirect/dummy_1-1_all.deb', 'dummy_1-1_all.deb'))
344 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
346 download_and_execute(driver
, [])
348 except model
.SafeException
as ex
:
349 if "HelloWorld/Missing" not in str(ex
):
354 def testRename(self
):
355 with
output_suppressed():
356 run_server(('HelloWorld.tar.bz2',))
357 requirements
= Requirements(os
.path
.abspath('RecipeRename.xml'))
358 requirements
.command
= None
359 driver
= Driver(requirements
= requirements
, config
= self
.config
)
360 driver_download(driver
)
361 digests
= driver
.solver
.selections
[requirements
.interface_uri
].digests
362 path
= self
.config
.stores
.lookup_any(digests
)
363 assert os
.path
.exists(os
.path
.join(path
, 'HelloUniverse', 'minor'))
364 assert not os
.path
.exists(os
.path
.join(path
, 'HelloWorld'))
365 assert not os
.path
.exists(os
.path
.join(path
, 'HelloUniverse', 'main'))
367 def testSymlink(self
):
370 sys
.stdout
= StringIO()
371 run_server(('HelloWorld.tar.bz2', 'HelloSym.tgz'))
372 driver
= Driver(requirements
= Requirements(os
.path
.abspath('RecipeSymlink.xml')), config
= self
.config
)
374 download_and_execute(driver
, [])
376 except model
.SafeException
as ex
:
377 if 'Attempt to unpack dir over symlink "HelloWorld"' not in str(ex
):
379 self
.assertEqual(None, basedir
.load_first_cache('0install.net', 'implementations', 'main'))
383 def testAutopackage(self
):
386 sys
.stdout
= StringIO()
387 run_server('HelloWorld.autopackage')
388 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Autopackage.xml')), config
= self
.config
)
390 download_and_execute(driver
, [])
392 except model
.SafeException
as ex
:
393 if "HelloWorld/Missing" not in str(ex
):
398 def testRecipeFailure(self
):
401 sys
.stdout
= StringIO()
403 driver
= Driver(requirements
= Requirements(os
.path
.abspath('Recipe.xml')), config
= self
.config
)
405 download_and_execute(driver
, [])
407 except download
.DownloadError
as ex
:
408 if "Connection" not in str(ex
):
413 def testMirrors(self
):
414 getLogger().setLevel(logging
.ERROR
)
415 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
416 run_server(server
.Give404('/Hello.xml'),
417 '/0mirror/feeds/http/example.com:8000/Hello.xml/latest.xml',
418 '/0mirror/keys/6FCF121BE2390E0B.gpg',
419 server
.Give404('/HelloWorld.tgz'),
420 '/0mirror/feeds/http/example.com:8000/Hello.xml/impl/sha1=3ce644dc725f1d21cfcf02562c76f375944b266a')
421 driver
= Driver(requirements
= Requirements('http://example.com:8000/Hello.xml'), config
= self
.config
)
422 self
.config
.mirror
= 'http://example.com:8000/0mirror'
424 refreshed
= driver
.solve_with_downloads()
425 tasks
.wait_for_blocker(refreshed
)
426 assert driver
.solver
.ready
428 #getLogger().setLevel(logging.WARN)
429 downloaded
= driver
.download_uncached_implementations()
430 tasks
.wait_for_blocker(downloaded
)
431 path
= self
.config
.stores
.lookup_any(driver
.solver
.selections
.selections
['http://example.com:8000/Hello.xml'].digests
)
432 assert os
.path
.exists(os
.path
.join(path
, 'HelloWorld', 'main'))
434 def testReplay(self
):
437 sys
.stdout
= StringIO()
438 getLogger().setLevel(ERROR
)
439 iface
= self
.config
.iface_cache
.get_interface('http://example.com:8000/Hello.xml')
440 mtime
= int(os
.stat('Hello-new.xml').st_mtime
)
441 with
open('Hello-new.xml', 'rb') as stream
:
442 self
.config
.iface_cache
.update_feed_from_network(iface
.uri
, stream
.read(), mtime
+ 10000)
444 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
445 run_server(server
.Give404('/Hello.xml'), 'latest.xml', '/0mirror/keys/6FCF121BE2390E0B.gpg', 'Hello.xml')
446 self
.config
.mirror
= 'http://example.com:8000/0mirror'
448 # Update from mirror (should ignore out-of-date timestamp)
449 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
450 tasks
.wait_for_blocker(refreshed
)
452 # Update from upstream (should report an error)
453 refreshed
= self
.config
.fetcher
.download_and_import_feed(iface
.uri
, self
.config
.iface_cache
)
455 tasks
.wait_for_blocker(refreshed
)
456 raise Exception("Should have been rejected!")
457 except model
.SafeException
as ex
:
458 assert "New feed's modification time is before old version" in str(ex
)
460 # Must finish with the newest version
461 self
.assertEqual(1235911552, self
.config
.iface_cache
._get
_signature
_date
(iface
.uri
))
465 def testBackground(self
, verbose
= False):
466 r
= Requirements('http://example.com:8000/Hello.xml')
467 d
= Driver(requirements
= r
, config
= self
.config
)
468 self
.import_feed(r
.interface_uri
, 'Hello.xml')
469 self
.config
.freshness
= 0
470 self
.config
.network_use
= model
.network_minimal
471 d
.solver
.solve(r
.interface_uri
, arch
.get_host_architecture())
472 assert d
.solver
.ready
, d
.solver
.get_failure_reason()
475 def choose_download(registed_cb
, nid
, actions
):
477 assert actions
== ['download', 'Download'], actions
478 registed_cb(nid
, 'download')
481 traceback
.print_exc()
486 os
.environ
['DISPLAY'] = 'dummy'
489 sys
.stdout
= StringIO()
490 run_server('Hello.xml', '6FCF121BE2390E0B.gpg')
491 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
492 my_dbus
.user_callback
= choose_download
494 with
trapped_exit(1):
495 from zeroinstall
.injector
import config
496 key_info
= config
.DEFAULT_KEY_LOOKUP_SERVER
497 config
.DEFAULT_KEY_LOOKUP_SERVER
= None
499 background
.spawn_background_update(d
, verbose
)
501 config
.DEFAULT_KEY_LOOKUP_SERVER
= key_info
506 def testBackgroundVerbose(self
):
507 self
.testBackground(verbose
= True)
509 def testBackgroundApp(self
):
510 my_dbus
.system_services
= {"org.freedesktop.NetworkManager": {"/org/freedesktop/NetworkManager": NetworkManager()}}
512 trust
.trust_db
.trust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
516 with
output_suppressed():
517 # Select a version of Hello
518 run_server('Hello.xml', '6FCF121BE2390E0B.gpg', 'HelloWorld.tgz')
519 r
= Requirements('http://example.com:8000/Hello.xml')
520 driver
= Driver(requirements
= r
, config
= self
.config
)
521 tasks
.wait_for_blocker(driver
.solve_with_downloads())
522 assert driver
.solver
.ready
523 kill_server_process()
526 app
= self
.config
.app_mgr
.create_app('test-app', r
)
527 app
.set_selections(driver
.solver
.selections
)
528 timestamp
= os
.path
.join(app
.path
, 'last-checked')
529 last_check_attempt
= os
.path
.join(app
.path
, 'last-check-attempt')
530 selections_path
= os
.path
.join(app
.path
, 'selections.xml')
532 def reset_timestamps():
535 os
.utime(timestamp
, (1, 1)) # 1970
536 os
.utime(selections_path
, (1, 1))
537 if os
.path
.exists(last_check_attempt
):
538 os
.unlink(last_check_attempt
)
540 # Download the implementation
541 sels
= app
.get_selections()
542 run_server('HelloWorld.tgz')
543 tasks
.wait_for_blocker(app
.download_selections(sels
))
544 kill_server_process()
546 # Not time for a background update yet
547 self
.config
.freshness
= 100
548 dl
= app
.download_selections(sels
)
552 # Trigger a background update - no updates found
554 run_server('Hello.xml')
555 with
trapped_exit(1):
556 dl
= app
.download_selections(sels
)
559 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
560 self
.assertEqual(1, os
.stat(selections_path
).st_mtime
)
561 kill_server_process()
563 # Change the selections
564 sels_path
= os
.path
.join(app
.path
, 'selections.xml')
565 with
open(sels_path
) as stream
:
567 with
open(sels_path
, 'w') as stream
:
568 stream
.write(old
.replace('Hello', 'Goodbye'))
570 # Trigger another background update - metadata changes found
572 run_server('Hello.xml')
573 with
trapped_exit(1):
574 dl
= app
.download_selections(sels
)
577 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
578 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
579 kill_server_process()
581 # Trigger another background update - GUI needed now
583 # Delete cached implementation so we need to download it again
584 stored
= sels
.selections
['http://example.com:8000/Hello.xml'].get_path(self
.config
.stores
)
585 assert os
.path
.basename(stored
).startswith('sha1')
588 # Replace with a valid local feed so we don't have to download immediately
589 with
open(sels_path
, 'w') as stream
:
590 stream
.write(local_hello
)
591 sels
= app
.get_selections()
593 os
.environ
['DISPLAY'] = 'dummy'
595 run_server('Hello.xml')
596 with
trapped_exit(1):
597 dl
= app
.download_selections(sels
)
599 assert ran_gui
# (so doesn't actually update)
600 kill_server_process()
602 # Now again with no DISPLAY
604 del os
.environ
['DISPLAY']
605 run_server('Hello.xml', 'HelloWorld.tgz')
606 with
trapped_exit(1):
607 dl
= app
.download_selections(sels
)
609 assert not ran_gui
# (so doesn't actually update)
611 self
.assertNotEqual(1, os
.stat(timestamp
).st_mtime
)
612 self
.assertNotEqual(1, os
.stat(selections_path
).st_mtime
)
613 kill_server_process()
615 sels
= app
.get_selections()
616 sel
, = sels
.selections
.values()
617 self
.assertEqual("sha1=3ce644dc725f1d21cfcf02562c76f375944b266a", sel
.id)
620 trust
.trust_db
.untrust_key('DE937DD411906ACF7C263B396FCF121BE2390E0B', 'example.com:8000')
622 os
.environ
['DISPLAY'] = 'dummy'
624 run_server('Hello.xml')
625 with
trapped_exit(1):
626 #import logging; logging.getLogger().setLevel(logging.INFO)
627 dl
= app
.download_selections(sels
)
630 kill_server_process()
632 # Update not triggered because of last-check-attempt
634 os
.utime(timestamp
, (1, 1)) # 1970
635 os
.utime(selections_path
, (1, 1))
636 dl
= app
.download_selections(sels
)
641 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= True)
643 assert dl
._aborted
.happened
644 assert dl
.tempfile
is None
646 dl
= download
.Download("http://localhost/test.tgz", auto_delete
= False)
647 path
= dl
.tempfile
.name
649 assert not os
.path
.exists(path
)
650 assert dl
._aborted
.happened
651 assert dl
.tempfile
is None
653 if __name__
== '__main__':
657 kill_server_process()