Updated @since epydoc for new release
[zeroinstall.git] / zeroinstall / apps.py
blob7f9bcf3d5e57d2ebf20d15de4848d264d6326889
1 """
2 Support for managing apps (as created with "0install add").
3 @since: 1.9
4 """
6 # Copyright (C) 2012, Thomas Leonard
7 # See the README file for details, or visit http://0install.net.
9 from zeroinstall import _, SafeException
10 from zeroinstall.support import basedir, portable_rename
11 from zeroinstall.injector import namespaces, selections, qdom
12 from logging import warn, info
13 import re, os, time, tempfile
# Pattern for acceptable application names. The characters / \ : = ; ' "
# are rejected everywhere, and the first character must additionally not be
# a dot. Rejecting : and ; on every platform keeps apps portable between
# POSIX and Windows.
valid_name = re.compile(r'''^[^./\\:=;'"][^/\\:=;'"]*$''')
def validate_name(name):
    """Check that name is acceptable as an application name.
    @param name: the proposed application name
    @raise SafeException: if name does not match L{valid_name}"""
    if not valid_name.match(name):
        raise SafeException("Invalid application name '{name}'".format(name = name))
def find_bin_dir(paths = None):
    """Find the first writable path in the list (default $PATH),
    skipping /bin, /sbin and everything under /usr except /usr/local/bin.
    Entries that resolve to somewhere under the XDG cache directory are
    skipped too.
    @param paths: the directories to consider, in order of preference
    @return: the first acceptable directory, or None if there isn't one"""
    if paths is None:
        candidates = os.environ['PATH'].split(os.pathsep)
    else:
        candidates = paths

    for candidate in candidates:
        # (/usr/local/bin is OK if we're running as root)
        if candidate.startswith('/usr/') and not candidate.startswith('/usr/local/bin'):
            continue
        if candidate.startswith(('/bin', '/sbin')):
            continue
        # Never install launchers into the cache
        if os.path.realpath(candidate).startswith(basedir.xdg_cache_home):
            continue
        if os.access(candidate, os.W_OK):
            return candidate

    return None
# Contents of the shell launcher created for an app; {app} is filled in with
# the application's name.
_command_template = (
    '#!/bin/sh\n'
    'exec 0install run {app} "$@"\n'
)
class App:
    """An application (as created with "0install add"), stored in a directory.

    The methods below read and write these entries inside the directory:
     - requirements.json: the saved requirements
     - selections-YYYY-MM-DD.xml: dated snapshots of the selections
     - selections.xml: a symlink to the most recent snapshot
     - last-checked, last-check-attempt: empty files whose modification
       times are used as time-stamps
    """

    def __init__(self, config, path):
        """@param config: the configuration to use for this app
        @param path: the directory holding the app's files"""
        self.config = config
        self.path = path

    def set_selections(self, sels):
        """Store a new set of selections. We include today's date in the filename
        so that we keep a history of previous selections (max one per day), in case
        we want to roll back later.
        @param sels: the selections to store (must provide toDOM())"""
        date = time.strftime('%Y-%m-%d')
        sels_file = os.path.join(self.path, 'selections-{date}.xml'.format(date = date))
        dom = sels.toDOM()

        # Write to a temporary file and rename it into place, so that a failure
        # part-way through never leaves a truncated selections file behind.
        tmp = tempfile.NamedTemporaryFile(prefix = 'selections.xml-', dir = self.path, delete = False)
        try:
            # Serialise to an encoded byte string first: the temporary file is
            # opened in binary mode, so it cannot accept the text that
            # writexml() would produce on Python 3.
            tmp.write(dom.toprettyxml(indent = " ", newl = "\n", encoding = 'utf-8'))
        except:
            # Clean up the partial file, then let the original error propagate
            tmp.close()
            os.unlink(tmp.name)
            raise
        tmp.close()
        portable_rename(tmp.name, sels_file)

        sels_latest = os.path.join(self.path, 'selections.xml')
        # lexists (not exists) so that a dangling symlink is replaced too;
        # otherwise os.symlink would fail because the name is already taken.
        if os.path.lexists(sels_latest):
            os.unlink(sels_latest)
        os.symlink(os.path.basename(sels_file), sels_latest)

        self.set_last_checked()

    def get_selections(self):
        """Load the selections. Does not check whether they are cached, nor trigger updates.
        @return: the selections loaded from selections.xml"""
        sels_file = os.path.join(self.path, 'selections.xml')
        # Binary mode: leave decoding to the XML parser
        with open(sels_file, 'rb') as stream:
            return selections.Selections(qdom.parse(stream))

    def download_selections(self, sels):
        """Download any missing selections and, if nothing needs downloading,
        start a background update when the app has become stale.
        @param sels: the selections to check
        @return: the (true) result of sels.download_missing if there is one,
        otherwise None"""
        # Check the selections are still available
        blocker = sels.download_missing(self.config)    # TODO: package impls
        if blocker:
            return blocker

        # Nothing to download, but is it time for a background update?
        timestamp_path = os.path.join(self.path, 'last-checked')
        try:
            utime = os.stat(timestamp_path).st_mtime
            staleness = time.time() - utime
            info("Staleness of app %s is %d hours", self, staleness / (60 * 60))
            freshness_threshold = self.config.freshness
            # A threshold of zero (or less) disables the staleness check
            need_update = freshness_threshold > 0 and staleness >= freshness_threshold

            if need_update:
                last_check_attempt_path = os.path.join(self.path, 'last-check-attempt')
                if os.path.exists(last_check_attempt_path):
                    last_check_attempt = os.stat(last_check_attempt_path).st_mtime
                    if last_check_attempt + 60 * 60 > time.time():
                        info("Tried to check within last hour; not trying again now")
                        need_update = False
        except Exception as ex:
            # If we can't tell how stale we are, err on the side of checking
            warn("Failed to get time-stamp of %s: %s", timestamp_path, ex)
            need_update = True

        if need_update:
            self.set_last_check_attempt()
            from zeroinstall.injector import background
            r = self.get_requirements()
            background.spawn_background_update2(r, True, self)

        return None

    def set_requirements(self, requirements):
        """Save requirements as requirements.json.
        Every attribute named in requirements.__slots__ is stored.
        @param requirements: the requirements to save"""
        import json
        # Text mode: json.dump writes str, which a binary-mode file rejects on
        # Python 3. Write to a temporary file and rename it into place.
        tmp = tempfile.NamedTemporaryFile(prefix = 'tmp-requirements-', dir = self.path, delete = False, mode = 'wt')
        try:
            json.dump(dict((key, getattr(requirements, key)) for key in requirements.__slots__), tmp)
        except:
            # Clean up the partial file, then let the original error propagate
            tmp.close()
            os.unlink(tmp.name)
            raise
        tmp.close()

        reqs_file = os.path.join(self.path, 'requirements.json')
        portable_rename(tmp.name, reqs_file)

    def get_requirements(self):
        """Load the requirements saved by L{set_requirements}.
        @return: a Requirements object with one attribute set for each key
        found in requirements.json"""
        import json
        from zeroinstall.injector import requirements
        r = requirements.Requirements(None)
        reqs_file = os.path.join(self.path, 'requirements.json')
        with open(reqs_file) as stream:
            values = json.load(stream)
        for k, v in values.items():
            setattr(r, k, v)
        return r

    def _touch(self, leaf):
        """Create the empty file 'leaf' in the app directory if it is missing
        and set its modification time to now."""
        timestamp_path = os.path.join(self.path, leaf)
        fd = os.open(timestamp_path, os.O_WRONLY | os.O_CREAT, 0o644)
        os.close(fd)
        os.utime(timestamp_path, None)    # In case file already exists

    def set_last_check_attempt(self):
        """Record the current time as the 'last-check-attempt' time-stamp."""
        self._touch('last-check-attempt')

    def set_last_checked(self):
        """Record the current time as the 'last-checked' time-stamp."""
        self._touch('last-checked')

    def destroy(self):
        """Delete this app's directory, and its shell launcher if the launcher
        is exactly the script that L{integrate_shell} would have written for
        the app's name."""
        # Check for shell command
        # TODO: remember which commands we own instead of guessing
        name = self.get_name()
        bin_dir = find_bin_dir()
        # With no usable bin directory there is no launcher to look for, but
        # the app itself must still be removed.
        if bin_dir is not None:
            launcher = os.path.join(bin_dir, name)
            expanded_template = _command_template.format(app = name)
            # Cheap size check first, so we only read files that could match
            if os.path.exists(launcher) and os.path.getsize(launcher) == len(expanded_template):
                with open(launcher, 'r') as stream:
                    contents = stream.read()
                if contents == expanded_template:
                    os.unlink(launcher)

        # Remove the app itself
        import shutil
        shutil.rmtree(self.path)

    def integrate_shell(self, name):
        """Create an executable launcher script called 'name' in the directory
        chosen by L{find_bin_dir}, which runs this app.
        @param name: the name of the shell command to create
        @raise SafeException: if name is invalid, no suitable directory was
        found, or something with that name already exists there"""
        # TODO: remember which commands we create
        if not valid_name.match(name):
            raise SafeException("Invalid shell command name '{name}'".format(name = name))
        bin_dir = find_bin_dir()
        if bin_dir is None:
            raise SafeException("No writable directory found in $PATH for command '{name}'".format(name = name))
        launcher = os.path.join(bin_dir, name)
        if os.path.exists(launcher):
            raise SafeException("Command already exists: {path}".format(path = launcher))

        with open(launcher, 'w') as stream:
            stream.write(_command_template.format(app = self.get_name()))
            # Make new script executable
            os.chmod(launcher, 0o111 | os.fstat(stream.fileno()).st_mode)

    def get_name(self):
        """@return: the app's name (the last component of its directory path)"""
        return os.path.basename(self.path)

    def __str__(self):
        return '<app ' + self.get_name() + '>'
class AppManager:
    """Creates new apps and finds existing ones by name."""

    def __init__(self, config):
        """@param config: the configuration passed on to every L{App}"""
        self.config = config

    def create_app(self, name, requirements):
        """Create a new app directory and save the requirements in it.
        @param name: the name for the new app
        @param requirements: the requirements to store
        @return: the new L{App}
        @raise SafeException: if name is invalid or the app directory already exists"""
        validate_name(name)

        apps_root = basedir.save_config_path(namespaces.config_site, "apps")
        new_dir = os.path.join(apps_root, name)
        if os.path.isdir(new_dir):
            raise SafeException(_("Application '{name}' already exists: {path}").format(name = name, path = new_dir))
        os.mkdir(new_dir)

        new_app = App(self.config, new_dir)
        new_app.set_requirements(requirements)
        new_app.set_last_checked()

        return new_app

    def lookup_app(self, name, missing_ok = False):
        """Get the App for name.
        If name is not an application (doesn't exist or is not a valid name) then
        None is returned when missing_ok is set; otherwise SafeException is raised.
        Since / and : are not valid name characters, it is generally safe to try this
        before calling L{model.canonical_iface_uri}."""
        if valid_name.match(name):
            found_dir = basedir.load_first_config(namespaces.config_site, "apps", name)
            if found_dir:
                return App(self.config, found_dir)
            problem = "No such application '{name}'"
        else:
            problem = "Invalid application name '{name}'"

        # Both failure cases are reported the same way
        if missing_ok:
            return None
        raise SafeException(problem.format(name = name))