Delay live search to be more responsive
[gpodder.git] / src / gpodder / my.py
blob53ae3a90456a99539717add4d8c314ecb683f9bc
1 #!/usr/bin/python
2 # -*- coding: utf-8 -*-
4 # gPodder - A media aggregator and podcast client
5 # Copyright (c) 2005-2010 Thomas Perl and the gPodder Team
7 # gPodder is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # gPodder is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # my.py -- mygpo Client Abstraction for gPodder
24 # Thomas Perl <thp@gpodder.org>; 2010-01-19
27 import gpodder
28 _ = gpodder.gettext
30 import atexit
31 import datetime
32 import os
33 import sys
34 import threading
35 import time
37 from gpodder.liblogger import log
39 from gpodder import util
40 from gpodder import minidb
42 # Append gPodder's user agent to mygpoclient's user agent
43 import mygpoclient
44 mygpoclient.user_agent += ' ' + gpodder.user_agent
46 MYGPOCLIENT_REQUIRED = '1.4'
48 if not hasattr(mygpoclient, 'require_version') or \
49 not mygpoclient.require_version(MYGPOCLIENT_REQUIRED):
50 print >>sys.stderr, """
51 Please upgrade your mygpoclient library.
52 See http://thpinfo.com/2010/mygpoclient/
54 Required version: %s
55 Installed version: %s
56 """ % (MYGPOCLIENT_REQUIRED, mygpoclient.__version__)
57 sys.exit(1)
59 from mygpoclient import api
61 from mygpoclient import util as mygpoutil
64 # Database model classes
class SinceValue(object):
    """Stored "since" marker for one host/device/category combination.

    Instances are kept in the queue database; the synchronize methods of
    the client read ``since`` before polling and update it afterwards.
    """

    # Attribute name -> type mapping
    # (presumably consumed by minidb as the table schema -- verify)
    __slots__ = dict(host=str, device_id=str, category=int, since=int)

    # Possible values for the "category" field
    PODCASTS = 0
    EPISODES = 1

    def __init__(self, host, device_id, category, since=0):
        """Create a marker; ``since`` defaults to 0 for the first sync."""
        self.host, self.device_id = host, device_id
        self.category, self.since = category, since
class SubscribeAction(object):
    """A queued change to the subscription list: add or remove one URL."""

    # Attribute name -> type mapping
    # (presumably consumed by minidb as the table schema -- verify)
    __slots__ = dict(action_type=int, url=str)

    # Possible values for the "action_type" field
    ADD = 0
    REMOVE = 1

    def __init__(self, action_type, url):
        """Create an action of the given type (ADD or REMOVE) for ``url``."""
        self.action_type, self.url = action_type, url

    @property
    def is_add(self):
        """True if this action subscribes to its URL."""
        return self.ADD == self.action_type

    @property
    def is_remove(self):
        """True if this action unsubscribes from its URL."""
        return self.REMOVE == self.action_type

    @classmethod
    def add(cls, url):
        """Alternate constructor: an "add" action for ``url``."""
        return cls(cls.ADD, url)

    @classmethod
    def remove(cls, url):
        """Alternate constructor: a "remove" action for ``url``."""
        return cls(cls.REMOVE, url)

    @classmethod
    def undo(cls, action):
        """Return a new action that reverses ``action``.

        Raises ValueError if ``action`` is neither an add nor a remove.
        """
        if action.is_add:
            inverse_type = cls.REMOVE
        elif action.is_remove:
            inverse_type = cls.ADD
        else:
            raise ValueError('Cannot undo action: %r' % action)

        return cls(inverse_type, action.url)
# Separate entity name, so "received" actions are stored apart from the
# locally queued SubscribeAction objects
class ReceivedSubscribeAction(SubscribeAction):
    """A subscription action that was received rather than created locally."""
class UpdateDeviceAction(object):
    """A queued upload of one device's settings (caption and type)."""

    # Attribute name -> type mapping
    # (presumably consumed by minidb as the table schema -- verify)
    __slots__ = dict(device_id=str, caption=str, device_type=str)

    def __init__(self, device_id, caption, device_type):
        """Remember the settings to upload for the device ``device_id``."""
        self.device_id = device_id
        self.caption, self.device_type = caption, device_type
class EpisodeAction(object):
    """A queued episode event (such as play, download or delete)."""

    # Attribute name -> type mapping
    # (presumably consumed by minidb as the table schema -- verify)
    __slots__ = dict(podcast_url=str, episode_url=str, device_id=str,
            action=str, timestamp=int,
            started=int, position=int, total=int)

    def __init__(self, podcast_url, episode_url, device_id,
            action, timestamp, started, position, total):
        """Store all fields of the event unchanged.

        ``started``, ``position`` and ``total`` may be None; the client
        passes None for events that carry no playback details.
        """
        # Which episode, and which device reported it
        self.podcast_url, self.episode_url = podcast_url, episode_url
        self.device_id = device_id

        # What happened and when
        self.action, self.timestamp = action, timestamp

        # Playback details
        self.started, self.position, self.total = started, position, total
# Separate entity name, so "received" actions are stored apart from the
# locally queued EpisodeAction objects
class ReceivedEpisodeAction(EpisodeAction):
    """An episode action that was received rather than created locally."""
class RewrittenUrl(object):
    """Records that a podcast URL ``old_url`` was rewritten to ``new_url``."""

    # Attribute name -> type mapping
    # (presumably consumed by minidb as the table schema -- verify)
    __slots__ = dict(old_url=str, new_url=str)

    def __init__(self, old_url, new_url):
        """Pair the original URL with its replacement."""
        self.old_url, self.new_url = old_url, new_url
148 # End Database model classes
# Helper class for displaying changes in the UI
class Change(object):
    """Pairs a subscription action with the podcast object it refers to.

    action  -- object providing ``is_add`` and ``url``
    podcast -- optional object providing ``title``; may be None
    """

    def __init__(self, action, podcast=None):
        self.action = action
        self.podcast = podcast

    @property
    def description(self):
        """Translated, human-readable summary of the change.

        Add actions show the URL. All other actions show the podcast
        title, falling back to the URL when no podcast object was
        supplied (previously this raised AttributeError on None).
        """
        if self.action.is_add:
            return _('Add %s') % self.action.url

        if self.podcast is None:
            # No podcast object to take a title from - show the URL
            return _('Remove %s') % self.action.url

        return _('Remove %s') % self.podcast.title
class MygPoClient(object):
    """Queueing client for the mygpo web service.

    Local changes (subscriptions, episode events, device settings) are
    stored in a queue database below gpodder.home and uploaded by a
    background worker thread. Changes pulled from the server are kept in
    the same database until the frontend retrieves them.
    """

    # File name of the queue database (created inside gpodder.home)
    STORE_FILE = 'mygpo.queue.sqlite'
    # Seconds a non-forced worker sleeps before it starts uploading
    FLUSH_TIMEOUT = 60
    # Maximum number of upload passes per worker run
    FLUSH_RETRIES = 3

    def __init__(self, config):
        """Open the queue database and hook into the configuration.

        config -- object providing the mygpo_* settings and add_observer()
        """
        self._store = minidb.Store(os.path.join(gpodder.home, self.STORE_FILE))

        self._config = config
        self._client = None

        # Initialize the _client attribute and register with config
        self.on_config_changed()
        assert self._client is not None

        self._config.add_observer(self.on_config_changed)

        self._worker_thread = None
        atexit.register(self._at_exit)

    def create_device(self):
        """Queue an update of this device's settings

        This should be called when device settings change
        or when the mygpo client functionality is enabled.
        The action is only stored here; it is uploaded by the
        worker the next time the queue is flushed.
        """
        # Remove all previous device update actions
        self._store.remove(self._store.load(UpdateDeviceAction))

        # Insert our new update action
        action = UpdateDeviceAction(self.device_id,
                self._config.mygpo_device_caption,
                self._config.mygpo_device_type)
        self._store.save(action)

    def get_rewritten_urls(self):
        """Returns a list of rewritten URLs for uploads

        This should be called regularly. Every object returned
        should be merged into the database, and the old_url
        should be updated to new_url in every podcast.

        The returned objects are removed from the queue database,
        so every rewrite is handed out only once.
        """
        rewritten_urls = self._store.load(RewrittenUrl)
        self._store.remove(rewritten_urls)
        return rewritten_urls

    def get_episode_actions(self, updated_urls):
        """Yield the received episode actions for the given podcast URLs

        This is a generator. After all actions of one podcast URL
        have been yielded, they are deleted from the queue database.
        """
        for podcast_url in updated_urls:
            for action in self._store.load(ReceivedEpisodeAction,
                    podcast_url=podcast_url):
                yield action

            # Remove all episode actions belonging to this URL
            self._store.delete(ReceivedEpisodeAction,
                    podcast_url=podcast_url)
            self._store.commit()

    def get_received_actions(self):
        """Returns a list of ReceivedSubscribeAction objects

        The list might be empty. All these actions have to
        be processed. The user should confirm which of these
        actions should be taken, the rest should be rejected.

        Use confirm_received_actions and reject_received_actions
        to return and finalize the actions received by this
        method in order to not receive duplicate actions.
        """
        return self._store.load(ReceivedSubscribeAction)

    def confirm_received_actions(self, actions):
        """Confirm that a list of actions has been processed

        The UI should call this with a list of actions that
        have been accepted by the user and processed by the
        podcast backend.
        """
        # Simply remove the received actions from the queue
        self._store.remove(actions)

    def reject_received_actions(self, actions):
        """Reject (undo) a list of ReceivedSubscribeAction objects

        The UI should call this with a list of actions that
        have been rejected by the user. A reversed set of
        actions will be uploaded to the server so that the
        state on the server matches the state on the client.
        """
        # Create "undo" actions for received subscriptions
        self._store.save(SubscribeAction.undo(a) for a in actions)
        self.flush()

        # After we've handled the reverse-actions, clean up
        self._store.remove(actions)

    @property
    def host(self):
        """The configured server (config value "mygpo_server")."""
        return self._config.mygpo_server

    @property
    def device_id(self):
        """The configured device UID (config value "mygpo_device_uid")."""
        return self._config.mygpo_device_uid

    def can_access_webservice(self):
        """Truthy if syncing is enabled and a device UID is configured."""
        return self._config.mygpo_enabled and self._config.mygpo_device_uid

    def set_subscriptions(self, urls):
        """Overwrite the subscription list of this device with ``urls``

        Unlike the queued actions, this talks to the server right away.
        Raises Exception if webservice access is not enabled.
        """
        if not self.can_access_webservice():
            raise Exception('Webservice access not enabled')

        log('Uploading (overwriting) subscriptions...')
        self._client.put_subscriptions(self.device_id, urls)
        log('Subscription upload done.')

    def _convert_played_episode(self, episode, start, end, total):
        """Build a "play" EpisodeAction with playback details, stamped now."""
        return EpisodeAction(episode.channel.url,
                episode.url, self.device_id, 'play',
                int(time.time()), start, end, total)

    def _convert_episode(self, episode, action):
        """Build an EpisodeAction without playback details, stamped now."""
        return EpisodeAction(episode.channel.url,
                episode.url, self.device_id, action,
                int(time.time()), None, None, None)

    def on_delete(self, episodes):
        """Queue a "delete" action for every episode in ``episodes``."""
        log('Storing %d episode delete actions', len(episodes), sender=self)
        self._store.save(self._convert_episode(e, 'delete') for e in episodes)

    def on_download(self, episodes):
        """Queue a "download" action for every episode in ``episodes``."""
        log('Storing %d episode download actions', len(episodes), sender=self)
        self._store.save(self._convert_episode(e, 'download') for e in episodes)

    def on_playback_full(self, episode, start, end, total):
        """Queue one "play" action including start/end/total values."""
        log('Storing full episode playback action', sender=self)
        self._store.save(self._convert_played_episode(episode, start, end, total))

    def on_playback(self, episodes):
        """Queue a "play" action for every episode in ``episodes``."""
        log('Storing %d episode playback actions', len(episodes), sender=self)
        self._store.save(self._convert_episode(e, 'play') for e in episodes)

    def on_subscribe(self, urls):
        """Queue "add" actions for ``urls`` and request a flush."""
        # Cancel previously-inserted "remove" actions
        self._store.remove(SubscribeAction.remove(url) for url in urls)

        # Insert new "add" actions
        self._store.save(SubscribeAction.add(url) for url in urls)

        self.flush()

    def on_unsubscribe(self, urls):
        """Queue "remove" actions for ``urls`` and request a flush."""
        # Cancel previously-inserted "add" actions
        self._store.remove(SubscribeAction.add(url) for url in urls)

        # Insert new "remove" actions
        self._store.save(SubscribeAction.remove(url) for url in urls)

        self.flush()

    def _at_exit(self):
        """atexit hook: upload once without waiting, then close the store."""
        self._worker_proc(forced=True)
        self._store.commit()
        self._store.close()

    def _worker_proc(self, forced=False):
        """Upload everything that is queued (thread target of flush())

        forced -- skip the FLUSH_TIMEOUT delay, and work even when no
                  worker thread is registered in self._worker_thread
        """
        if not forced:
            # Store the current contents of the queue database
            self._store.commit()

            log('Worker thread waiting for timeout', sender=self)
            time.sleep(self.FLUSH_TIMEOUT)

        # Only work when enabled, UID set and allowed to work
        if self.can_access_webservice() and \
                (self._worker_thread is not None or forced):
            # Clearing the reference lets flush() start a new worker
            self._worker_thread = None

            log('Worker thread starting to work...', sender=self)
            for retry in range(self.FLUSH_RETRIES):
                must_retry = False

                if retry:
                    log('Retrying flush queue...', sender=self)

                # Update the device first, so it can be created if new
                for action in self._store.load(UpdateDeviceAction):
                    if self.update_device(action):
                        self._store.remove(action)
                    else:
                        must_retry = True

                # NOTE(review): both synchronize_* methods already remove
                # the uploaded actions on success, so the remove() calls
                # below look redundant - confirm before dropping them.

                # Upload podcast subscription actions
                actions = self._store.load(SubscribeAction)
                if self.synchronize_subscriptions(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                # Upload episode actions
                actions = self._store.load(EpisodeAction)
                if self.synchronize_episodes(actions):
                    self._store.remove(actions)
                else:
                    must_retry = True

                if not must_retry:
                    # No more pending actions. Ready to quit.
                    break

            log('Worker thread finished.', sender=self)
        else:
            log('Worker thread may not execute (disabled).', sender=self)

        # Store the current contents of the queue database
        self._store.commit()

    def flush(self, now=False):
        """Request an upload of the queue in a background thread

        now -- start a worker that uploads without the FLUSH_TIMEOUT
               delay, even if another worker is already waiting
        """
        if not self.can_access_webservice():
            log('Flush requested, but sync disabled.', sender=self)
            return

        if self._worker_thread is None or now:
            if now:
                log('Flushing NOW.', sender=self)
            else:
                log('Flush requested.', sender=self)
            self._worker_thread = threading.Thread(target=self._worker_proc, args=[now])
            self._worker_thread.setDaemon(True)
            self._worker_thread.start()
        else:
            log('Flush requested, already waiting.', sender=self)

    def on_config_changed(self, name=None, old_value=None, new_value=None):
        """Configuration observer: react to changed mygpo_* settings

        Rebuilds the API client when the credentials or the server
        change (or when no client exists yet), and queues a device
        update when one of the mygpo_device_* settings changes.
        """
        if name in ('mygpo_username', 'mygpo_password', 'mygpo_server') \
                or self._client is None:
            self._client = api.MygPodderClient(self._config.mygpo_username,
                    self._config.mygpo_password, self._config.mygpo_server)
            log('Reloading settings.', sender=self)
        elif name is not None and name.startswith('mygpo_device_'):
            # Update or create the device
            # (a call without a name is ignored instead of raising
            # AttributeError on None.startswith)
            self.create_device()

    @staticmethod
    def _datetime_to_timestamp(dt):
        """Convert a naive datetime (taken as local time) to Unix seconds

        Used instead of int(dt.strftime('%s')): the '%s' directive is a
        platform extension that is not available everywhere, while
        time.mktime() is portable and also interprets dt as local time.
        """
        return int(time.mktime(dt.timetuple()))

    def synchronize_episodes(self, actions):
        """Download new episode actions, then upload ``actions``

        A failing download is only logged; the upload is still tried.
        Returns True if the upload succeeded, False otherwise.
        """
        log('Starting episode status sync.', sender=self)

        def convert_to_api(action):
            # Local EpisodeAction -> mygpoclient EpisodeAction
            dt = datetime.datetime.fromtimestamp(action.timestamp)
            since = mygpoutil.datetime_to_iso8601(dt)
            return api.EpisodeAction(action.podcast_url,
                    action.episode_url, action.action,
                    action.device_id, since,
                    action.started, action.position, action.total)

        def convert_from_api(action):
            # mygpoclient EpisodeAction -> local ReceivedEpisodeAction
            dt = mygpoutil.iso8601_to_datetime(action.timestamp)
            since = self._datetime_to_timestamp(dt)
            return ReceivedEpisodeAction(action.podcast,
                    action.episode, action.device,
                    action.action, since,
                    action.started, action.position, action.total)

        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue, host=self.host,
                    device_id=self.device_id,
                    category=SinceValue.EPISODES)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id, SinceValue.EPISODES)

            # Step 1: Download Episode actions
            try:
                changes = self._client.download_episode_actions(since_o.since)

                received_actions = [convert_from_api(a) for a in changes.actions]
                log('Received %d episode actions', len(received_actions),
                        sender=self)
                self._store.save(received_actions)

                # Save the "since" value for later use
                self._store.update(since_o, since=changes.since)
            except Exception:
                # Best effort: a failed poll must not block the upload
                log('Exception while polling for episodes.', sender=self, traceback=True)

            # Step 2: Upload Episode actions

            # Convert actions to the mygpoclient format for uploading
            episode_actions = [convert_to_api(a) for a in actions]

            # Upload the episode actions
            self._client.upload_episode_actions(episode_actions)

            # Actions have been uploaded to the server - remove them
            self._store.remove(actions)
            log('Episode actions have been uploaded to the server.', sender=self)
            return True
        except Exception as e:
            log('Cannot upload episode actions: %s', str(e), sender=self, traceback=True)
            return False

    def synchronize_subscriptions(self, actions):
        """Pull subscription changes, then push the queued ones

        NOTE: the ``actions`` argument is not used as passed in; the
        pending SubscribeAction objects are loaded from the store again
        after the pull step.

        Returns True on success, False if any step raised an exception.
        """
        log('Starting subscription sync.', sender=self)
        try:
            # Load the "since" value from the database
            since_o = self._store.get(SinceValue, host=self.host,
                    device_id=self.device_id,
                    category=SinceValue.PODCASTS)

            # Use a default since object for the first-time case
            if since_o is None:
                since_o = SinceValue(self.host, self.device_id, SinceValue.PODCASTS)

            # Step 1: Pull updates from the server and notify the frontend
            result = self._client.pull_subscriptions(self.device_id, since_o.since)

            # Update the "since" value in the database
            self._store.update(since_o, since=result.since)

            # Store received actions for later retrieval (and in case we
            # have outdated actions in the database, simply remove them)
            for url in result.add:
                log('Received add action: %s', url, sender=self)
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.save(ReceivedSubscribeAction.add(url))
            for url in result.remove:
                log('Received remove action: %s', url, sender=self)
                self._store.remove(ReceivedSubscribeAction.add(url))
                self._store.remove(ReceivedSubscribeAction.remove(url))
                self._store.save(ReceivedSubscribeAction.remove(url))

            # Step 2: Push updates to the server and rewrite URLs (if any)
            actions = self._store.load(SubscribeAction)

            add = [a.url for a in actions if a.is_add]
            remove = [a.url for a in actions if a.is_remove]

            if add or remove:
                log('Uploading: +%d / -%d', len(add), len(remove), sender=self)
                # Only do a push request if something has changed
                result = self._client.update_subscriptions(self.device_id, add, remove)

                # Update the "since" value in the database
                self._store.update(since_o, since=result.since)

                # Store URL rewrites for later retrieval by GUI
                for old_url, new_url in result.update_urls:
                    if new_url:
                        log('Rewritten URL: %s', new_url, sender=self)
                        self._store.save(RewrittenUrl(old_url, new_url))

            # Actions have been uploaded to the server - remove them
            self._store.remove(actions)
            log('All actions have been uploaded to the server.', sender=self)
            return True
        except Exception as e:
            log('Cannot upload subscriptions: %s', str(e), sender=self, traceback=True)
            return False

    def update_device(self, action):
        """Upload the device settings stored in an UpdateDeviceAction

        Returns True on success, False if the upload raised an exception.
        """
        try:
            log('Uploading device settings...', sender=self)
            self._client.update_device_settings(action.device_id,
                    action.caption, action.device_type)
            log('Device settings uploaded.', sender=self)
            return True
        except Exception as e:
            log('Cannot update device %s: %s', self.device_id, str(e), sender=self, traceback=True)
            return False

    def get_devices(self):
        """Return the server's device list as (device_id, caption, type) tuples."""
        return [(d.device_id, d.caption, d.type)
                for d in self._client.get_devices()]

    def open_website(self):
        """Open the configured server's web page via util.open_website()."""
        util.open_website('http://' + self._config.mygpo_server)