CLI: Handle ValueError when parsing input line
[gpodder.git] / bin / gpo
blob7381dfc5ce00d2e25723d4b08d78540158a48bab
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
5 # gPodder - A media aggregator and podcast client
6 # Copyright (c) 2005-2012 Thomas Perl and the gPodder Team
8 # gPodder is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # gPodder is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 # gpo - A better command-line interface to gPodder using the gPodder API
24 # by Thomas Perl <thp@gpodder.org>; 2009-05-07
27 """
28 Usage: gpo [--verbose|-v] [COMMAND] [params...]
30 - Subscription management -
32 subscribe URL [TITLE] Subscribe to a new feed at URL (as TITLE)
33 search QUERY Search the gpodder.net directory for QUERY
34 toplist Show the gpodder.net top-subscribe podcasts
36 import FILENAME|URL Subscribe to all podcasts in an OPML file
37 export FILENAME Export all subscriptions to an OPML file
39 rename URL TITLE Rename feed at URL to TITLE
40 unsubscribe URL Unsubscribe from feed at URL
41 enable URL Enable feed updates for the feed at URL
42 disable URL Disable feed updates for the feed at URL
44 info URL Show information about feed at URL
45 list List all subscribed podcasts
46 update [URL] Check for new episodes (all or only at URL)
48 - Episode management -
50 download [URL] Download new episodes (all or only from URL)
51 pending [URL] List new episodes (all or only from URL)
52 episodes [URL] List episodes (all or only from URL)
54 - Configuration -
56 set [key] [value] List one (all) keys or set to a new value
58 - Other commands -
60 youtube URL Resolve the YouTube URL to a download URL
61 rewrite OLDURL NEWURL Change the feed URL of [OLDURL] to [NEWURL]
62 webui [public] Start gPodder's Web UI server
63 (public = listen on all network interfaces)
65 """
67 import sys
68 import codecs
69 import collections
70 import os
71 import re
72 import inspect
73 import functools
74 try:
75 import readline
76 except ImportError:
77 readline = None
78 import shlex
79 import pydoc
80 import logging
82 try:
83 import termios
84 import fcntl
85 import struct
86 except ImportError:
87 termios = None
88 fcntl = None
89 struct = None
# A poor man's argparse/getopt - but it works for our use case :)
# Look for a verbosity flag anywhere on the command line. When one is found,
# DEBUG logging is enabled and that flag is removed from sys.argv, so the
# command parsing further down never sees it.
for verbose_flag in ('-v', '--verbose'):
    if verbose_flag in sys.argv:
        FMT = '%(created)f [%(name)s] %(levelname)s: %(message)s'
        logging.basicConfig(format=FMT, level=logging.DEBUG)
        sys.argv.remove(verbose_flag)
        break
else:
    # for/else: only reached when the loop did not "break", i.e. when
    # neither flag was present - use the default logging configuration
    logging.basicConfig()

# Avoid UnicodeDecodeError when output is not a terminal (less, cron, etc..)
if sys.stdout.encoding is None:
    sys.stdout = codecs.getwriter('utf8')(sys.stdout)

# Work out the prefix: the parent of the directory that contains this
# script (if the script path is a symlink, its target is used instead)
gpodder_script = sys.argv[0]
if os.path.islink(gpodder_script):
    gpodder_script = os.readlink(gpodder_script)
gpodder_dir = os.path.join(os.path.dirname(gpodder_script), '..')
prefix = os.path.abspath(os.path.normpath(gpodder_dir))

src_dir = os.path.join(prefix, 'src')
data_dir = os.path.join(prefix, 'data')

# Both folders must exist and the prefix must not be below /usr before the
# local "src" folder is put in front of the module search path
if os.path.exists(src_dir) and os.path.exists(data_dir) and \
        not prefix.startswith('/usr'):
    # Run gPodder from local source folder (not installed)
    sys.path.insert(0, src_dir)
# Imported only here, after sys.path may have been extended above
import gpodder
# Short aliases for gpodder.gettext / gpodder.ngettext; N_ is called with
# a singular form, a plural form and a count further down in this file
_ = gpodder.gettext
N_ = gpodder.ngettext

# This is the command-line UI variant
gpodder.ui.cli = True

# Platform detection (i.e. Maemo 5, etc..)
gpodder.detect_platform()

from gpodder import api
from gpodder import my
from gpodder import opml
from gpodder import util
from gpodder.config import config_value_to_string

# ANSI escape sequences are only used when stdout is a terminal and
# gpodder.win32 is false
have_ansi = sys.stdout.isatty() and not gpodder.win32
# True only when both stdin and stdout are terminals
interactive_console = sys.stdin.isatty() and sys.stdout.isatty()
# Set to True by the __main__ block when a command is given as arguments
is_single_command = False
def incolor(color_id, s):
    """Return s wrapped in the ANSI escape sequence for color_id.

    s is returned unchanged when ANSI output is unavailable (have_ansi is
    false) or when the "ui.cli.colors" option of the global cli object's
    configuration is off.
    """
    if not (have_ansi and cli.config.ui.cli.colors):
        return s
    return '\033[9%dm%s\033[0m' % (color_id, s)

# ANSI Colors: red = 1, green = 2, yellow = 3, blue = 4
inred = functools.partial(incolor, 1)
ingreen = functools.partial(incolor, 2)
inyellow = functools.partial(incolor, 3)
inblue = functools.partial(incolor, 4)
def FirstArgumentIsPodcastURL(function):
    """Decorator marking a function whose first argument is a podcast URL.

    The function itself is returned (not a wrapper); it only gains a
    "_first_arg_is_podcast" attribute set to True.
    """
    function._first_arg_is_podcast = True
    return function
def get_terminal_size():
    """Return the size of the terminal behind stdout as (rows, cols).

    A default of 24 rows by 80 columns is returned when the termios, fcntl
    or struct modules are unavailable, or when the window size cannot be
    queried (for example because stdout is not a terminal).
    """
    # Same (rows, cols) order as the regular return value below
    fallback = (24, 80)

    if None in (termios, fcntl, struct):
        return fallback

    try:
        # TIOCGWINSZ fills in four unsigned shorts; the buffer passed in
        # only needs to have the right size
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(sys.stdout.fileno(), termios.TIOCGWINSZ, request)
    except (IOError, OSError, ValueError):
        # No usable file descriptor, or the descriptor is not a terminal
        return fallback

    rows, cols, xp, yp = struct.unpack('HHHH', reply)
    return rows, cols
164 class gPodderCli(object):
165 COLUMNS = 80
166 EXIT_COMMANDS = ('quit', 'exit', 'bye')
168 def __init__(self):
169 self.client = api.PodcastClient()
170 self.config = self.client._config
172 self._current_action = ''
173 self._commands = dict((name.rstrip('_'), func)
174 for name, func in inspect.getmembers(self)
175 if inspect.ismethod(func) and not name.startswith('_'))
176 self._prefixes, self._expansions = self._build_prefixes_expansions()
177 self._prefixes.update({'?': 'help'})
178 self._valid_commands = sorted(self._prefixes.values())
179 gpodder.user_extensions.on_ui_initialized(self.client.core.model,
180 self._extensions_podcast_update_cb,
181 self._extensions_episode_download_cb)
183 def _build_prefixes_expansions(self):
184 prefixes = {}
185 expansions = collections.defaultdict(list)
186 names = sorted(self._commands.keys())
187 names.extend(self.EXIT_COMMANDS)
189 # Generator for all prefixes of a given string (longest first)
190 # e.g. ['gpodder', 'gpodde', 'gpodd', 'gpod', 'gpo', 'gp', 'g']
191 mkprefixes = lambda n: (n[:x] for x in xrange(len(n), 0, -1))
193 # Return True if the given prefix is unique in "names"
194 is_unique = lambda p: len([n for n in names if n.startswith(p)]) == 1
196 for name in names:
197 is_still_unique = True
198 unique_expansion = None
199 for prefix in mkprefixes(name):
200 if is_unique(prefix):
201 unique_expansion = '[%s]%s' % (prefix, name[len(prefix):])
202 prefixes[prefix] = name
203 continue
205 if unique_expansion is not None:
206 expansions[prefix].append(unique_expansion)
207 continue
209 return prefixes, expansions
211 def _extensions_podcast_update_cb(self, podcast):
212 self._info(_('Podcast update requested by extensions.'))
213 self._update_podcast(podcast)
215 def _extensions_episode_download_cb(self, episode):
216 self._info(_('Episode download requested by extensions.'))
217 self._download_episode(episode)
219 def _start_action(self, msg, *args):
220 line = msg % args
221 if len(line) > self.COLUMNS-7:
222 line = line[:self.COLUMNS-7-3] + '...'
223 else:
224 line = line + (' '*(self.COLUMNS-7-len(line)))
225 self._current_action = line
226 sys.stdout.write(line)
227 sys.stdout.flush()
229 def _update_action(self, progress):
230 if have_ansi:
231 progress = '%3.0f%%' % (progress*100.,)
232 result = '['+inblue(progress)+']'
233 sys.stdout.write('\r' + self._current_action + result)
234 sys.stdout.flush()
236 def _finish_action(self, success=True, skip=False):
237 if skip:
238 result = '['+inyellow('SKIP')+']'
239 elif success:
240 result = '['+ingreen('DONE')+']'
241 else:
242 result = '['+inred('FAIL')+']'
244 if have_ansi:
245 print '\r' + self._current_action + result
246 else:
247 print result
248 self._current_action = ''
250 def _atexit(self):
251 self.client.finish()
253 # -------------------------------------------------------------------
255 def import_(self, url):
256 for channel in opml.Importer(url).items:
257 self.subscribe(channel['url'], channel.get('title'))
259 def export(self, filename):
260 podcasts = self.client._model.get_podcasts()
261 opml.Exporter(filename).write(podcasts)
263 def subscribe(self, url, title=None):
264 url = util.normalize_feed_url(url)
265 if url is None:
266 self._error(_('Invalid URL.'))
267 return True
269 if self.client.get_podcast(url) is not None:
270 self._info(_('You are already subscribed to %s.') % url)
271 return True
273 try:
274 if self.client.create_podcast(url, title) is None:
275 self._error(_('Cannot subscribe to %s.') % url)
276 return True
277 except Exception, e:
278 self._error(e.strerror)
279 return True
281 self.client.commit()
283 self._info(_('Successfully added %s.' % url))
284 return True
286 def _print_config(self, search_for):
287 for key in self.config.all_keys():
288 if search_for is None or search_for.lower() in key.lower():
289 value = config_value_to_string(self.config._lookup(key))
290 print key, '=', value
292 def set(self, key=None, value=None):
293 if value is None:
294 self._print_config(key)
295 return
297 try:
298 current_value = self.config._lookup(key)
299 current_type = type(current_value)
300 except KeyError:
301 self._error(_('This configuration option does not exist.'))
302 return
304 if current_type == dict:
305 self._error(_('Can only set leaf configuration nodes.'))
306 return
308 self.config.update_field(key, value)
309 self.set(key)
311 @FirstArgumentIsPodcastURL
312 def rename(self, url, title):
313 podcast = self.client.get_podcast(url)
315 if podcast is None:
316 self._error(_('You are not subscribed to %s.') % url)
317 else:
318 old_title = podcast.title
319 podcast.rename(title)
320 self.client.commit()
321 self._info(_('Renamed %s to %s.') % (old_title, title))
323 return True
325 @FirstArgumentIsPodcastURL
326 def unsubscribe(self, url):
327 podcast = self.client.get_podcast(url)
329 if podcast is None:
330 self._error(_('You are not subscribed to %s.') % url)
331 else:
332 podcast.delete()
333 self.client.commit()
334 self._error(_('Unsubscribed from %s.') % url)
336 return True
338 def _episodesList(self, podcast):
339 def status_str(episode):
340 if episode.is_new:
341 return u' * '
342 if episode.is_downloaded:
343 return u' ▉ '
344 if episode.is_deleted:
345 return u' ░ '
347 return u' '
349 episodes = (u'%3d. %s %s' % (i+1, status_str(e), e.title)
350 for i, e in enumerate(podcast.get_episodes()))
351 return episodes
353 @FirstArgumentIsPodcastURL
354 def info(self, url):
355 podcast = self.client.get_podcast(url)
357 if podcast is None:
358 self._error(_('You are not subscribed to %s.') % url)
359 else:
360 title, url, status = podcast.title, podcast.url, podcast.feed_update_status_msg()
361 episodes = self._episodesList(podcast)
362 episodes = u'\n '.join(episodes)
363 self._pager(u"""
364 Title: %(title)s
365 URL: %(url)s
366 Feed update is %(status)s
368 Episodes:
369 %(episodes)s
370 """ % locals())
372 return True
374 @FirstArgumentIsPodcastURL
375 def episodes(self, url=None):
376 output = []
377 for podcast in self.client.get_podcasts():
378 podcast_printed = False
379 if url is None or podcast.url == url:
380 episodes = self._episodesList(podcast)
381 episodes = u'\n '.join(episodes)
382 output.append(u"""
383 Episodes from %s:
385 """ % (podcast.url, episodes))
387 self._pager(u'\n'.join(output))
388 return True
390 def list(self):
391 for podcast in self.client.get_podcasts():
392 if podcast.update_enabled():
393 print '#', ingreen(podcast.title)
394 else:
395 print '#', inred(podcast.title), '-', _('Updates disabled')
397 print podcast.url
399 return True
401 def _update_podcast(self, podcast):
402 self._start_action('Updating %s', podcast.title)
403 try:
404 podcast.update()
405 self._finish_action()
406 except Exception, e:
407 self._finish_action(False)
409 def _pending_message(self, count):
410 return N_('%(count)d new episode', '%(count)d new episodes',
411 count) % {'count': count}
413 @FirstArgumentIsPodcastURL
414 def update(self, url=None):
415 count = 0
416 for podcast in self.client.get_podcasts():
417 if url is not None and podcast.url != url:
418 continue
420 if podcast.update_enabled():
421 self._update_podcast(podcast)
422 count += sum(1 for e in podcast.get_episodes() if e.is_new)
423 else:
424 self._start_action(_('Skipping %(podcast)s') % {
425 'podcast': podcast.title})
426 self._finish_action(skip=True)
428 print inblue(self._pending_message(count))
429 return True
431 @FirstArgumentIsPodcastURL
432 def pending(self, url=None):
433 count = 0
434 for podcast in self.client.get_podcasts():
435 podcast_printed = False
436 if url is None or podcast.url == url:
437 for episode in podcast.get_episodes():
438 if episode.is_new:
439 if not podcast_printed:
440 print '#', ingreen(podcast.title)
441 podcast_printed = True
442 print ' ', episode.title
443 count += 1
445 print inblue(self._pending_message(count))
446 return True
448 def _download_episode(self, episode):
449 self._start_action('Downloading %s', episode.title)
450 episode.download(self._update_action)
451 self._finish_action()
453 @FirstArgumentIsPodcastURL
454 def download(self, url=None):
455 count = 0
456 for podcast in self.client.get_podcasts():
457 podcast_printed = False
458 if url is None or podcast.url == url:
459 for episode in podcast.get_episodes():
460 if episode.is_new:
461 if not podcast_printed:
462 print inblue(podcast.title)
463 podcast_printed = True
464 self._download_episode(episode)
465 count += 1
467 print count, 'episodes downloaded.'
468 return True
470 @FirstArgumentIsPodcastURL
471 def disable(self, url):
472 podcast = self.client.get_podcast(url)
474 if podcast is None:
475 self._error(_('You are not subscribed to %s.') % url)
476 else:
477 podcast.disable()
478 self.client.commit()
479 self._error(_('Disabling feed update from %s.') % url)
481 return True
483 @FirstArgumentIsPodcastURL
484 def enable(self, url):
485 podcast = self.client.get_podcast(url)
487 if podcast is None:
488 self._error(_('You are not subscribed to %s.') % url)
489 else:
490 podcast.enable()
491 self.client.commit()
492 self._error(_('Enabling feed update from %s.') % url)
494 return True
496 def youtube(self, url):
497 yurl = self.client.youtube_url_resolver(url)
498 print yurl
499 return True
501 def webui(self, public=None):
502 from gpodder import webui
503 if public == 'public':
504 # Warn the user that the web UI is listening on all network
505 # interfaces, which could lead to problems.
506 # Only use this on a trusted, private network!
507 self._warn(_('Listening on ALL network interfaces.'))
508 webui.main(only_localhost=False)
509 else:
510 webui.main()
512 def search(self, *terms):
513 query = ' '.join(terms)
514 if not query:
515 return
517 directory = my.Directory()
518 results = directory.search(query)
519 self._show_directory_results(results)
521 def toplist(self):
522 directory = my.Directory()
523 results = directory.toplist()
524 self._show_directory_results(results, True)
526 def _show_directory_results(self, results, multiple=False):
527 if not results:
528 self._error(_('No podcasts found.'))
529 return
531 if not interactive_console or is_single_command:
532 print '\n'.join(url for title, url in results)
533 return
535 def show_list():
536 self._pager('\n'.join(u'%3d: %s\n %s' %
537 (index+1, title, url if title != url else '')
538 for index, (title, url) in enumerate(results)))
540 show_list()
542 msg = _('Enter index to subscribe, ? for list')
543 while True:
544 index = raw_input(msg + ': ')
546 if not index:
547 return
549 if index == '?':
550 show_list()
551 continue
553 try:
554 index = int(index)
555 except ValueError:
556 self._error(_('Invalid value.'))
557 continue
559 if not (1 <= index <= len(results)):
560 self._error(_('Invalid value.'))
561 continue
563 title, url = results[index-1]
564 self._info(_('Adding %s...') % title)
565 self.subscribe(url)
566 if not multiple:
567 break
569 @FirstArgumentIsPodcastURL
570 def rewrite(self, old_url, new_url):
571 podcast = self.client.get_podcast(old_url)
572 if podcast is None:
573 self._error(_('You are not subscribed to %s.') % old_url)
574 else:
575 result = podcast.rewrite_url(new_url)
576 if result is None:
577 self._error(_('Invalid URL: %s') % new_url)
578 else:
579 new_url = result
580 self._error(_('Changed URL from %s to %s.') % (old_url, new_url))
581 return True
583 def help(self):
584 sys.stderr.write(stylize(__doc__))
585 return True
587 # -------------------------------------------------------------------
589 def _pager(self, output):
590 if have_ansi:
591 # Need two additional rows for command prompt
592 rows_needed = len(output.splitlines()) + 2
593 rows, cols = get_terminal_size()
594 if rows_needed < rows:
595 print output
596 else:
597 pydoc.pager(output.encode(sys.stdout.encoding))
598 else:
599 print output
601 def _shell(self):
602 print '\n'.join(x.strip() for x in ("""
603 gPodder %(__version__)s "%(__relname__)s" (%(__date__)s) - %(__url__)s
604 %(__copyright__)s
605 License: %(__license__)s
607 Entering interactive shell. Type 'help' for help.
608 Press Ctrl+D (EOF) or type 'quit' to quit.
609 """ % gpodder.__dict__).splitlines())
611 if readline is not None:
612 readline.parse_and_bind('tab: complete')
613 readline.set_completer(self._tab_completion)
614 readline.set_completer_delims(' ')
616 while True:
617 try:
618 line = raw_input('gpo> ')
619 except EOFError:
620 print ''
621 break
622 except KeyboardInterrupt:
623 print ''
624 continue
626 if self._prefixes.get(line, line) in self.EXIT_COMMANDS:
627 break
629 try:
630 args = shlex.split(line)
631 except ValueError, value_error:
632 self._error(_('Syntax error: %(error)s') %
633 {'error': value_error})
634 continue
636 try:
637 self._parse(args)
638 except KeyboardInterrupt:
639 self._error('Keyboard interrupt.')
640 except EOFError:
641 self._error('EOF.')
643 self._atexit()
645 def _error(self, *args):
646 print >>sys.stderr, inred(' '.join(args))
648 # Warnings look like error messages for now
649 _warn = _error
651 def _info(self, *args):
652 print >>sys.stdout, ' '.join(args)
654 def _checkargs(self, func, command_line):
655 args, varargs, keywords, defaults = inspect.getargspec(func)
656 args.pop(0) # Remove "self" from args
657 defaults = defaults or ()
658 minarg, maxarg = len(args)-len(defaults), len(args)
660 if len(command_line) < minarg or (len(command_line) > maxarg and \
661 varargs is None):
662 self._error('Wrong argument count for %s.' % func.__name__)
663 return False
665 return func(*command_line)
667 def _tab_completion_podcast(self, text, count):
668 """Tab completion for podcast URLs"""
669 urls = [p.url for p in self.client.get_podcasts() if text in p.url]
670 if count < len(urls):
671 return urls[count]
673 return None
676 def _tab_completion(self, text, count):
677 """Tab completion function for readline"""
678 if readline is None:
679 return None
681 current_line = readline.get_line_buffer()
682 if text == current_line:
683 for name in self._valid_commands:
684 if name.startswith(text):
685 if count == 0:
686 return name
687 else:
688 count -= 1
689 else:
690 args = current_line.split()
691 command = args.pop(0)
692 command_function = getattr(self, command, None)
693 if not command_function:
694 return None
695 if getattr(command_function, '_first_arg_is_podcast', False):
696 if not args or (len(args) == 1 and not current_line.endswith(' ')):
697 return self._tab_completion_podcast(text, count)
699 return None
702 def _parse_single(self, command_line):
703 try:
704 result = self._parse(command_line)
705 except KeyboardInterrupt:
706 self._error('Keyboard interrupt.')
707 result = -1
708 self._atexit()
709 return result
711 def _parse(self, command_line):
712 if not command_line:
713 return False
715 command = command_line.pop(0)
717 # Resolve command aliases
718 command = self._prefixes.get(command, command)
720 if command in self._commands:
721 func = self._commands[command]
722 if inspect.ismethod(func):
723 return self._checkargs(func, command_line)
725 if command in self._expansions:
726 print _('Ambigous command. Did you mean..')
727 for cmd in self._expansions[command]:
728 print ' ', inblue(cmd)
729 else:
730 self._error(_('The requested function is not available.'))
732 return False
def stylize(s):
    """Return s with the matches of two patterns colorized.

    The patterns are applied one after the other: text matching the
    first one is passed through inblue(), then text matching the second
    one (in the result of the first pass) through ingreen().
    """
    rules = (
        (r' .{27}', inblue),
        (r' - .*', ingreen),
    )
    for pattern, colorize in rules:
        # Bind "colorize" as a default argument so that each replacement
        # function keeps its own color
        s = re.sub(pattern, lambda m, paint=colorize: paint(m.group(0)), s)
    return s
if __name__ == '__main__':
    # Entry point: run a single command when arguments are given, start the
    # shell on an interactive console, otherwise just print the usage text
    cli = gPodderCli()
    args = sys.argv[1:]
    if not args:
        if interactive_console:
            cli._shell()
        else:
            sys.stdout.write(__doc__)
    else:
        # Read by gPodderCli._show_directory_results() to suppress prompts
        is_single_command = True
        cli._parse_single(args)