Added "0install apps"
[zeroinstall.git] / zeroinstall / apps.py
blob bb80b206209f6f10da0088f8dd76893f1e6bc984
1 """
2 Support for managing apps (as created with "0install add").
3 @since: 1.9
4 """
6 # Copyright (C) 2012, Thomas Leonard
7 # See the README file for details, or visit http://0install.net.
9 from zeroinstall import _, SafeException, logger
10 from zeroinstall.support import basedir, portable_rename
11 from zeroinstall.injector import namespaces, selections, qdom
12 import re, os, time, tempfile
# App names double as directory names and shell command names, so reject
# characters that are likely to cause problems. ':' and ';' are rejected
# everywhere so that apps stay portable between POSIX and Windows. The first
# character additionally may not be '.'.
valid_name = re.compile(r'''^[^./\\:=;'"][^/\\:=;'"]*$''')

def validate_name(name):
    """Check that name is acceptable as an application name.
    @param name: the proposed application name
    @type name: str
    @raise SafeException: if name does not match L{valid_name}"""
    if valid_name.match(name) is None:
        raise SafeException("Invalid application name '{name}'".format(name = name))
22 def _export(name, value):
23 """Try to guess the command to set an environment variable."""
24 shell = os.environ.get('SHELL', '?')
25 if 'csh' in shell:
26 return "setenv %s %s" % (name, value)
27 return "export %s=%s" % (name, value)
def find_bin_dir(paths = None):
    """Find the first writable directory in the list (default $PATH).
    Skips /bin, /sbin, everything under /usr except /usr/local/bin, anything
    whose real path is inside the XDG cache directory, and anything we cannot
    write to. If no entry qualifies, falls back to ~/bin/ (creating it if
    necessary) and logs a warning telling the user how to add it to $PATH.
    @param paths: the directories to consider, in order of preference
    @type paths: [str] | None
    @return: the chosen directory
    @rtype: str"""
    if paths is None:
        # A missing $PATH is treated like an empty one rather than a KeyError.
        paths = os.environ.get('PATH', '').split(os.pathsep)

    for path in paths:
        if not path:
            # An empty entry means "current directory"; never install there.
            continue
        if path.startswith('/usr/') and not path.startswith('/usr/local/bin'):
            # (/usr/local/bin is OK if we're running as root)
            continue
        if path in ('/bin', '/sbin') or path.startswith(('/bin/', '/sbin/')):
            # Match whole path components so that e.g. /binaries is not skipped.
            continue
        if os.path.realpath(path).startswith(basedir.xdg_cache_home):
            continue
        if not os.access(path, os.W_OK):
            continue
        return path

    # Nothing suitable in the list: fall back to the user's private bin directory.
    path = os.path.expanduser('~/bin/')
    logger.warning('%s is not in $PATH. Add it with:\n%s' % (path, _export('PATH', path + ':$PATH')))

    if not os.path.isdir(path):
        os.makedirs(path)
    return path
# Contents of the launcher script written by App.integrate_shell; "{app}" is
# filled in with the app's name using str.format. App.destroy only deletes a
# launcher whose contents are exactly this text, so any change here affects
# which existing launchers are recognised as ours.
_command_template = """#!/bin/sh
exec 0install run {app} "$@"
"""
class App:
    """An application, stored as a directory of state files.
    The directory (self.path) holds:
     - requirements.json: the requirements used to choose selections
     - selections-YYYY-MM-DD.xml: dated snapshots of the selections
     - selections.xml: a symlink to the latest snapshot
     - last-checked / last-check-attempt: empty files whose mtimes record
       the last successful / attempted update check
    The app's name is the final component of self.path."""

    def __init__(self, config, path):
        """@param config: the configuration (used for downloads and the freshness setting)
        @param path: the directory holding this app's files
        @type path: str"""
        self.config = config
        self.path = path

    def _touch(self, leaf):
        """Create the named file inside the app directory if it is missing,
        and set its modification time to now.
        @param leaf: file name relative to self.path
        @type leaf: str"""
        timestamp_path = os.path.join(self.path, leaf)
        fd = os.open(timestamp_path, os.O_WRONLY | os.O_CREAT, 0o644)
        os.close(fd)
        os.utime(timestamp_path, None)	# In case file already exists

    def set_selections(self, sels):
        """Store a new set of selections. We include today's date in the filename
        so that we keep a history of previous selections (max one per day), in case
        we want to roll back later. Also repoints the selections.xml symlink at
        the new snapshot and marks the app as just checked.
        @param sels: the selections to store (must provide toDOM())"""
        date = time.strftime('%Y-%m-%d')
        sels_file = os.path.join(self.path, 'selections-{date}.xml'.format(date = date))
        dom = sels.toDOM()

        # Write to a temporary file in the same directory, then rename it into
        # place, so that a failed write never leaves a truncated snapshot.
        tmp = tempfile.NamedTemporaryFile(prefix = 'selections.xml-', dir = self.path, delete = False, mode = 'wt')
        try:
            with tmp:
                dom.writexml(tmp, addindent=" ", newl="\n", encoding = 'utf-8')
        except BaseException:
            os.unlink(tmp.name)
            raise
        portable_rename(tmp.name, sels_file)

        sels_latest = os.path.join(self.path, 'selections.xml')
        # lexists (not exists) so that a dangling symlink is removed too;
        # otherwise os.symlink would fail because the name is already taken.
        if os.path.lexists(sels_latest):
            os.unlink(sels_latest)
        os.symlink(os.path.basename(sels_file), sels_latest)

        self.set_last_checked()

    def get_selections(self, snapshot_date = None):
        """Load the selections. Does not check whether they are cached, nor trigger updates.
        @param snapshot_date: get a historical snapshot
        @type snapshot_date: (as returned by L{get_history}) | None
        @return: the selections
        @rtype: L{selections.Selections}"""
        if snapshot_date:
            sels_file = os.path.join(self.path, 'selections-' + snapshot_date + '.xml')
        else:
            sels_file = os.path.join(self.path, 'selections.xml')
        with open(sels_file, 'rb') as stream:
            return selections.Selections(qdom.parse(stream))

    def get_history(self):
        """Get the dates of the available snapshots, starting with the most recent.
        Only files named exactly selections-YYYY-MM-DD.xml are counted.
        @rtype: [str]"""
        date_re = re.compile(r'^selections-(\d{4}-\d{2}-\d{2})\.xml$')
        snapshots = []
        for leaf in os.listdir(self.path):
            match = date_re.match(leaf)
            if match:
                snapshots.append(match.group(1))
        # ISO dates sort correctly as plain strings.
        snapshots.sort(reverse = True)
        return snapshots

    def download_selections(self, sels):
        """Download any missing implementations in the given selections.
        If no downloads are needed, but we haven't checked for a while, start
        a background process to check for updates (but return None immediately).
        @return: a blocker which resolves when all needed implementations are available
        @rtype: L{tasks.Blocker} | None"""
        # Check the selections are still available
        blocker = sels.download_missing(self.config)	# TODO: package impls
        if blocker:
            return blocker

        # Nothing to download, but is it time for a background update?
        timestamp_path = os.path.join(self.path, 'last-checked')
        try:
            utime = os.stat(timestamp_path).st_mtime
            staleness = time.time() - utime
            logger.info("Staleness of app %s is %d hours", self, staleness / (60 * 60))
            freshness_threshold = self.config.freshness
            # A threshold of zero (or less) disables automatic update checks.
            need_update = freshness_threshold > 0 and staleness >= freshness_threshold

            if need_update:
                last_check_attempt_path = os.path.join(self.path, 'last-check-attempt')
                if os.path.exists(last_check_attempt_path):
                    last_check_attempt = os.stat(last_check_attempt_path).st_mtime
                    if last_check_attempt + 60 * 60 > time.time():
                        logger.info("Tried to check within last hour; not trying again now")
                        need_update = False
        except Exception as ex:
            # Deliberately best-effort: if we can't tell how stale we are, check anyway.
            logger.warning("Failed to get time-stamp of %s: %s", timestamp_path, ex)
            need_update = True

        if need_update:
            # Record the attempt first so that we don't retry within the hour.
            self.set_last_check_attempt()
            from zeroinstall.injector import background
            r = self.get_requirements()
            background.spawn_background_update2(r, False, self)
        return None

    def set_requirements(self, requirements):
        """Save the requirements as requirements.json, replacing any previous file.
        Every attribute named in requirements.__slots__ is stored.
        @param requirements: the requirements to save"""
        import json
        tmp = tempfile.NamedTemporaryFile(prefix = 'tmp-requirements-', dir = self.path, delete = False, mode = 'wt')
        try:
            with tmp:
                json.dump(dict((key, getattr(requirements, key)) for key in requirements.__slots__), tmp)
        except BaseException:
            os.unlink(tmp.name)
            raise

        reqs_file = os.path.join(self.path, 'requirements.json')
        portable_rename(tmp.name, reqs_file)

    def get_requirements(self):
        """Load the requirements saved by L{set_requirements}.
        @return: a new Requirements object with each saved attribute restored
        @rtype: L{requirements.Requirements}"""
        import json
        from zeroinstall.injector import requirements
        r = requirements.Requirements(None)
        reqs_file = os.path.join(self.path, 'requirements.json')
        with open(reqs_file, 'rt') as stream:
            values = json.load(stream)
        for k, v in values.items():
            setattr(r, k, v)
        return r

    def set_last_check_attempt(self):
        """Record that we are attempting to check for updates now."""
        self._touch('last-check-attempt')

    def get_last_checked(self):
        """Get the time of the last successful check for updates.
        @return: the timestamp (or None on error)
        @rtype: float | None"""
        last_updated_path = os.path.join(self.path, 'last-checked')
        try:
            return os.stat(last_updated_path).st_mtime
        except Exception as ex:
            logger.warning("Failed to get time-stamp of %s: %s", last_updated_path, ex)
            return None

    def get_last_check_attempt(self):
        """Get the time of the last attempted check.
        @return: the timestamp, or None if we updated successfully.
        @rtype: float | None"""
        last_check_attempt_path = os.path.join(self.path, 'last-check-attempt')
        try:
            last_check_attempt = os.stat(last_check_attempt_path).st_mtime
        except OSError:
            # No attempt has been recorded (or it can't be read).
            return None

        last_checked = self.get_last_checked()

        # If the time of the last successful check is unknown, the attempt
        # can't have been superseded by it, so report the attempt. (Comparing
        # None with a float would raise TypeError on Python 3.)
        if last_checked is None or last_checked < last_check_attempt:
            return last_check_attempt
        return None

    def set_last_checked(self):
        """Record that we successfully checked for updates just now."""
        self._touch('last-checked')

    def destroy(self):
        """Delete this app: its launcher script (if we can find one that we
        created) and then the app's directory.
        Note: the launcher is looked for in the directory returned by
        L{find_bin_dir}, which may itself create ~/bin/ and log a warning."""
        # Check for shell command
        # TODO: remember which commands we own instead of guessing
        name = self.get_name()
        bin_dir = find_bin_dir()
        launcher = os.path.join(bin_dir, name)
        expanded_template = _command_template.format(app = name)
        # Only remove the launcher if it is byte-for-byte what we would have
        # written; the size test avoids reading unrelated (possibly large) files.
        if os.path.exists(launcher) and os.path.getsize(launcher) == len(expanded_template):
            with open(launcher, 'r') as stream:
                contents = stream.read()
            if contents == expanded_template:
                os.unlink(launcher)

        # Remove the app itself
        import shutil
        shutil.rmtree(self.path)

    def integrate_shell(self, name):
        """Create an executable launcher script for this app in the directory
        chosen by L{find_bin_dir}.
        @param name: the name of the shell command to create
        @type name: str
        @raise SafeException: if name is invalid or the command already exists"""
        # TODO: remember which commands we create
        if not valid_name.match(name):
            raise SafeException("Invalid shell command name '{name}'".format(name = name))
        bin_dir = find_bin_dir()
        launcher = os.path.join(bin_dir, name)
        if os.path.exists(launcher):
            raise SafeException("Command already exists: {path}".format(path = launcher))

        with open(launcher, 'w') as stream:
            stream.write(_command_template.format(app = self.get_name()))
            # Make new script executable
            os.chmod(launcher, 0o111 | os.fstat(stream.fileno()).st_mode)

    def get_name(self):
        """@return: the app's name (the last component of its directory)
        @rtype: str"""
        return os.path.basename(self.path)

    def __str__(self):
        return '<app ' + self.get_name() + '>'
class AppManager:
    """Creates, finds and lists apps stored under the "apps" directory of
    the 0install site configuration."""

    def __init__(self, config):
        """@param config: the configuration passed on to every L{App} we create"""
        self.config = config

    def create_app(self, name, requirements):
        """Create a new app directory and store its requirements.
        @param name: the name for the new app
        @type name: str
        @param requirements: the requirements to save for the app
        @return: the new app
        @rtype: L{App}
        @raise SafeException: if the name is invalid or the app already exists"""
        validate_name(name)

        apps_dir = basedir.save_config_path(namespaces.config_site, "apps")
        app_dir = os.path.join(apps_dir, name)
        if os.path.isdir(app_dir):
            raise SafeException(_("Application '{name}' already exists: {path}").format(name = name, path = app_dir))
        os.mkdir(app_dir)

        app = App(self.config, app_dir)
        app.set_requirements(requirements)
        app.set_last_checked()

        return app

    def lookup_app(self, name, missing_ok = False):
        """Get the App for name.
        Returns None if name is not an application (doesn't exist or is not a valid name).
        Since / and : are not valid name characters, it is generally safe to try this
        before calling L{injector.model.canonical_iface_uri}.
        @param name: the app's name
        @type name: str
        @param missing_ok: return None instead of raising when there is no such app
        @type missing_ok: bool
        @rtype: L{App} | None
        @raise SafeException: if the app can't be found and missing_ok is False"""
        if valid_name.match(name):
            app_dir = basedir.load_first_config(namespaces.config_site, "apps", name)
            if app_dir:
                return App(self.config, app_dir)
            problem = "No such application '{name}'"
        else:
            problem = "Invalid application name '{name}'"

        if missing_ok:
            return None
        raise SafeException(problem.format(name = name))

    def list_apps(self):
        """Returns all the apps, sorted by name.
        @rtype: [L{App}]"""
        apps = []

        for d in basedir.load_config_paths(namespaces.config_site, "apps"):
            for app in os.listdir(d):
                if not valid_name.match(app):
                    continue
                app_dir = os.path.join(d, app)
                if os.path.isdir(app_dir):
                    apps.append((app, App(self.config, app_dir)))

        # Sort on the name only. The same name can occur in more than one
        # config directory, and comparing whole tuples would then fall back to
        # comparing App objects, which raises TypeError on Python 3. The sort
        # is stable, so such duplicates stay in config-path order.
        apps.sort(key = lambda pair: pair[0])
        return [app for (name, app) in apps]