gpodder.util: Fix timestamp issues (bug 1983)
[gpodder.git] / src / gpodder / util.py
blob1af2fe20a5a53de8826b46166c4a16b400ca2e86
1 # -*- coding: utf-8 -*-
3 # gPodder - A media aggregator and podcast client
4 # Copyright (c) 2005-2014 Thomas Perl and the gPodder Team
5 # Copyright (c) 2011 Neal H. Walfield
7 # gPodder is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # gPodder is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 # util.py -- Misc utility functions
23 # Thomas Perl <thp@perli.net> 2007-08-04
26 """Miscellaneous helper functions for gPodder
28 This module provides helper and utility functions for gPodder that
29 are not tied to any specific part of gPodder.
31 """
33 import gpodder
35 import logging
36 logger = logging.getLogger(__name__)
38 import os
39 import os.path
40 import platform
41 import glob
42 import stat
43 import shlex
44 import shutil
45 import socket
46 import sys
47 import string
49 import re
50 import subprocess
51 from htmlentitydefs import entitydefs
52 import time
53 import gzip
54 import datetime
55 import threading
57 import urlparse
58 import urllib
59 import urllib2
60 import httplib
61 import webbrowser
62 import mimetypes
63 import itertools
65 import feedparser
67 import StringIO
68 import xml.dom.minidom
# Default to "not available" so that code testing "win32file is None"
# works on every platform, not only after a failed import on Win32.
win32file = None

if gpodder.ui.win32:
    try:
        import win32file
    except ImportError:
        logger.warn('Running on Win32 but win32api/win32file not installed.')
# Short module-level aliases for the translation helpers of the
# gpodder package (singular and plural-aware lookup).
_ = gpodder.gettext
N_ = gpodder.ngettext


import locale
try:
    # An empty locale name asks for the user's default settings
    locale.setlocale(locale.LC_ALL, '')
except Exception as e:
    # A broken locale configuration is logged, but is not fatal
    logger.warn('Cannot set locale (%s)', e, exc_info=True)
# Native filesystem encoding detection
encoding = sys.getfilesystemencoding()

if encoding is None:
    # Python could not tell us, so fall back to (in this order): the
    # codeset part of $LANG, a platform default, and finally ISO-8859-15.
    if '.' in os.environ.get('LANG', ''):
        lang = os.environ['LANG']
        language, encoding = lang.rsplit('.', 1)
        logger.info('Detected encoding: %s', encoding)
    elif gpodder.ui.harmattan:
        encoding = 'utf-8'
    elif gpodder.ui.win32:
        # To quote http://docs.python.org/howto/unicode.html:
        #  ,,on Windows, Python uses the name "mbcs" to refer
        #    to whatever the currently configured encoding is``
        encoding = 'mbcs'
    else:
        encoding = 'iso-8859-15'
        logger.info('Assuming encoding: ISO-8859-15 ($LANG not set).')
# Filename / folder name sanitization
def _sanitize_char(c):
    """Return the replacement for a single character of a file name.

    Whitespace becomes a plain space, the characters ",-.()" are kept,
    other punctuation and control characters become "_" and everything
    else is passed through unchanged.
    """
    if c in string.whitespace:
        return ' '
    if c in ',-.()':
        return c
    if ord(c) <= 31 or c in string.punctuation:
        return '_'
    return c

# 256-character lookup table, indexed by character code
SANITIZATION_TABLE = ''.join(_sanitize_char(chr(code)) for code in range(256))
del _sanitize_char
# (extension, mimetype) pairs known to gPodder. The order matters: when
# several extensions share a mimetype, the LAST pair determines the
# extension stored in _MIME_TYPES (e.g. 'audio/ogg' -> '.ogg').
_MIME_TYPE_LIST = [
    ('.aac', 'audio/aac'),
    ('.axa', 'audio/annodex'),
    ('.flac', 'audio/flac'),
    ('.m4b', 'audio/m4b'),
    ('.m4a', 'audio/mp4'),
    ('.mp3', 'audio/mpeg'),
    ('.spx', 'audio/ogg'),
    ('.oga', 'audio/ogg'),
    ('.ogg', 'audio/ogg'),
    ('.wma', 'audio/x-ms-wma'),
    ('.3gp', 'video/3gpp'),
    ('.axv', 'video/annodex'),
    ('.divx', 'video/divx'),
    ('.m4v', 'video/m4v'),
    ('.mp4', 'video/mp4'),
    ('.ogv', 'video/ogg'),
    ('.mov', 'video/quicktime'),
    ('.flv', 'video/x-flv'),
    ('.mkv', 'video/x-matroska'),
    ('.wmv', 'video/x-ms-wmv'),
    ('.opus', 'audio/opus'),
]

# mimetype -> extension
_MIME_TYPES = dict((mimetype, extension)
                   for extension, mimetype in _MIME_TYPE_LIST)
# extension -> mimetype
_MIME_TYPES_EXT = dict(_MIME_TYPE_LIST)
def make_directory(path):
    """
    Tries to create a directory if it does not exist already.
    Returns True if the directory exists after the function
    call, False otherwise.
    """
    if os.path.isdir(path):
        return True

    try:
        os.makedirs(path)
    except Exception:
        # makedirs also fails when somebody else created the directory
        # between our isdir() check and this call; the documented
        # contract is about the state *after* the call, so re-check.
        if os.path.isdir(path):
            return True
        logger.warn('Could not create directory: %s', path)
        return False

    return True
def normalize_feed_url(url):
    """
    Brings a feed URL into a canonical form with one of the
    schemes http, https, ftp or file. Returns None if the URL is
    empty, shorter than 8 characters or uses an unknown scheme.

    feed://, itpc:// and itms:// are turned into http://.

    >>> normalize_feed_url('itpc://example.org/podcast.rss')
    'http://example.org/podcast.rss'

    A URL without any scheme (e.g. "curry.com") is assumed
    to be a http:// feed.

    >>> normalize_feed_url('curry.com')
    'http://curry.com/'

    A few shortcut prefixes are expanded for advanced users
    and lazy typists (see the source for details).

    >>> normalize_feed_url('fb:43FPodcast')
    'http://feeds.feedburner.com/43FPodcast'

    The domain name is converted to all-lowercase (because
    domains are not case sensitive):

    >>> normalize_feed_url('http://Example.COM/')
    'http://example.com/'

    Some other minimalistic changes are also taken care of,
    e.g. a ? with an empty query is removed:

    >>> normalize_feed_url('http://example.org/test?')
    'http://example.org/test'

    Username and password in the URL must not be affected
    by URL normalization (see gPodder bug 1942):

    >>> normalize_feed_url('http://UserName:PassWord@Example.com/')
    'http://UserName:PassWord@example.com/'
    """
    if not url or len(url) < 8:
        return None

    # Shortcut prefixes that save keystrokes when entering well-known
    # feed locations; "%s" is replaced by the text after the prefix.
    shortcuts = {
        'fb:': 'http://feeds.feedburner.com/%s',
        'yt:': 'http://www.youtube.com/rss/user/%s/videos.rss',
        'sc:': 'http://soundcloud.com/%s',
        # YouTube playlists. To get a list of playlists per-user, use:
        # https://gdata.youtube.com/feeds/api/users/<username>/playlists
        'ytpl:': 'http://gdata.youtube.com/feeds/api/playlists/%s',
    }

    for shortcut, template in shortcuts.items():
        if url.startswith(shortcut):
            url = template % (url[len(shortcut):],)
            break

    # Assume HTTP for URLs without scheme
    if '://' not in url:
        url = 'http://' + url

    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)

    # Only the host part is case insensitive; anything in front of the
    # last "@" is authentication data and is kept as-is (bug 1942).
    # Without an "@", rpartition() yields two empty strings + the host.
    credentials, separator, host = netloc.rpartition('@')
    netloc = credentials + separator + host.lower()

    # Schemes are case insensitive, too
    scheme = scheme.lower()

    # feed://, itpc:// and itms:// are really http://
    if scheme in ('feed', 'itpc', 'itms'):
        scheme = 'http'

    if scheme not in ('http', 'https', 'ftp', 'file'):
        return None

    # An empty path is normalized to "/". Note that urlunsplit might
    # return "a slightly different, but equivalent URL".
    return urlparse.urlunsplit((scheme, netloc, path or '/', query, fragment))
def username_password_from_url(url):
    r"""
    Returns a tuple (username,password) containing authentication
    data from the specified URL or (None,None) if no authentication
    data can be found in the URL.

    Raises ValueError if url is not a string or unicode object
    (subclasses of these types are accepted).

    See Section 3.1 of RFC 1738 (http://www.ietf.org/rfc/rfc1738.txt)

    >>> username_password_from_url('https://@host.com/')
    ('', None)
    >>> username_password_from_url('telnet://host.com/')
    (None, None)
    >>> username_password_from_url('ftp://foo:@host.com/')
    ('foo', '')
    >>> username_password_from_url('http://a:b@host.com/')
    ('a', 'b')
    >>> username_password_from_url(1)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url(None)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url('http://a@b:c@host.com/')
    ('a@b', 'c')
    >>> username_password_from_url('ftp://a:b:c@host.com/')
    ('a', 'b:c')
    >>> username_password_from_url('http://i%2Fo:P%40ss%3A@host.com/')
    ('i/o', 'P@ss:')
    >>> username_password_from_url('ftp://%C3%B6sterreich@host.com/')
    ('\xc3\xb6sterreich', None)
    >>> username_password_from_url('http://w%20x:y%20z@example.org/')
    ('w x', 'y z')
    >>> username_password_from_url('http://example.com/x@y:z@test.com/')
    (None, None)
    """
    # isinstance() instead of an exact type() comparison, so that
    # subclasses of str/unicode are not rejected
    if not isinstance(url, (str, unicode)):
        raise ValueError('URL has to be a string or unicode object.')

    netloc = urlparse.urlparse(url)[1]

    if '@' not in netloc:
        return (None, None)

    # RFC1738 dictates that we should not allow ['/', '@', ':']
    # characters in the username and password field (Section 3.1):
    #
    # 1. The "/" can't be in there at this point because of the way
    #    urlparse (which we use above) works.
    # 2. Due to gPodder bug 1521, we allow "@" in the username and
    #    password field. We split at the LAST "@" in netloc.
    # 3. The colon must be excluded (RFC2617, Section 2) in the
    #    username, but is apparently allowed in the password. Splitting
    #    at the FIRST ":" makes any extraneous ":" part of the password.
    authentication = netloc.rsplit('@', 1)[0]

    if ':' not in authentication:
        return (urllib.unquote(authentication), None)

    username, password = authentication.split(':', 1)
    return (urllib.unquote(username), urllib.unquote(password))
def directory_is_writable(path):
    """
    Tells whether path is an existing directory that the
    current user is allowed to write to.
    """
    if not os.path.isdir(path):
        return False

    return os.access(path, os.W_OK)
def calculate_size(path):
    """
    Tries to calculate the size of a directory, including any
    subdirectories found. The returned value might not be
    correct if the user doesn't have appropriate permissions
    to list all subdirectories of the given path.

    Returns 0 for None, for a path that sits directly below "/",
    and for anything that is neither a regular file nor a real
    (non-symlink) directory.
    """
    if path is None:
        return 0

    # Paths directly below the filesystem root are never measured
    if os.path.dirname(path) == '/':
        return 0

    if os.path.isfile(path):
        return os.path.getsize(path)

    if os.path.isdir(path) and not os.path.islink(path):
        # Start with the size of the directory entry itself
        total = os.path.getsize(path)

        try:
            for item in os.listdir(path):
                child = os.path.join(path, item)
                try:
                    total += calculate_size(child)
                except Exception:
                    # Report the entry that actually failed (not its
                    # parent) and keep going with the remaining entries
                    logger.warn('Cannot get size for %s', child, exc_info=True)
        except Exception:
            logger.warn('Cannot access %s', path, exc_info=True)

        return total

    return 0
def file_modification_datetime(filename):
    """
    Returns the modification date of the specified file
    as a datetime.datetime object or None if the modification
    date cannot be determined (no filename given, file not
    readable, or stat/conversion failed).
    """
    if filename is None:
        return None

    if not os.access(filename, os.R_OK):
        return None

    try:
        # Index access yields the mtime in whole seconds
        timestamp = os.stat(filename)[stat.ST_MTIME]
        return datetime.datetime.fromtimestamp(timestamp)
    except Exception:
        # "Exception" (not a bare except) so that KeyboardInterrupt
        # and SystemExit are not swallowed here
        logger.warn('Cannot get mtime for %s', filename, exc_info=True)
        return None
def file_age_in_days(filename):
    """
    Returns how many whole days ago the given file was last
    modified, or zero if file_modification_datetime() cannot
    determine a modification date for it.
    """
    modified = file_modification_datetime(filename)
    if modified is None:
        return 0

    return (datetime.datetime.now() - modified).days
def file_modification_timestamp(filename):
    """
    Returns the modification date of the specified file as a number
    (whole seconds) or -1 if the modification date cannot be
    determined.
    """
    if filename is None:
        return -1

    try:
        return os.stat(filename)[stat.ST_MTIME]
    except Exception:
        # "Exception" (not a bare except) so that KeyboardInterrupt
        # and SystemExit are not swallowed here
        logger.warn('Cannot get modification timestamp for %s', filename)
        return -1
def file_age_to_string(days):
    """
    Turns a "number of days" value into a (translated) string
    for displaying the file age in the UI. Anything below one
    day yields an empty string.

    >>> file_age_to_string(0)
    ''
    >>> file_age_to_string(1)
    u'1 day ago'
    >>> file_age_to_string(2)
    u'2 days ago'
    """
    if days < 1:
        return ''

    # N_ picks the singular or plural template based on "days"
    template = N_('%(count)d day ago', '%(count)d days ago', days)
    return template % {'count':days}
def is_system_file(filename):
    """
    Checks to see if the given file is a system file. This can
    only be True on Win32 with the win32file module available.
    """
    # The win32file check is only evaluated when running on Win32
    if not gpodder.ui.win32 or win32file is None:
        return False

    attributes = win32file.GetFileAttributes(filename)
    # -1 is returned by GetFileAttributes when an error occurs
    # 0x4 is the FILE_ATTRIBUTE_SYSTEM constant
    return attributes != -1 and attributes & 0x4 != 0
def get_free_disk_space_win32(path):
    """
    Win32-specific way of finding out how much disk space is
    free for the current user on the drive of the given path.
    Returns 0 when the win32file module is not available.

    Uses code from:

    http://mail.python.org/pipermail/python-list/2003-May/203223.html
    """
    if win32file is None:
        # Cannot determine free disk space
        return 0

    # Only the drive part of the path is passed to the Win32 API
    drive, _tail = os.path.splitdrive(path)
    user_free, _user_total, _free_on_disk = win32file.GetDiskFreeSpaceEx(drive)
    return user_free
def get_free_disk_space(path):
    """
    Calculates the free disk space available to the current user
    on the file system that contains the given path.

    If the path (or its parent folder) does not yet exist, this
    function returns zero.
    """
    if not os.path.exists(path):
        return 0

    if gpodder.ui.win32:
        return get_free_disk_space_win32(path)

    s = os.statvfs(path)

    # POSIX counts f_bavail in units of f_frsize (the fragment size);
    # f_bsize is only the preferred I/O block size and differs from it
    # on some platforms. Fall back to f_bsize if f_frsize is reported
    # as zero.
    return s.f_bavail * (s.f_frsize or s.f_bsize)
def format_date(timestamp):
    """
    Converts a UNIX timestamp to a date representation. This
    function returns "Today", "Yesterday", a weekday name (for
    dates within the last week) or the date in %x format, which
    (according to the Python docs) is the "Locale's appropriate
    date representation".

    Returns None if there has been an error converting the
    timestamp to a string representation.
    """
    if timestamp is None:
        return None

    seconds_in_a_day = 60*60*24
    now = time.time()

    try:
        timestamp_date = time.localtime(timestamp)[:3]
    except (ValueError, TypeError, OverflowError):
        logger.warn('Cannot convert timestamp', exc_info=True)
        return None

    # Compare (year, month, day) tuples in local time
    if timestamp_date == time.localtime(now)[:3]:
        return _('Today')
    elif timestamp_date == time.localtime(now - seconds_in_a_day)[:3]:
        return _('Yesterday')

    try:
        diff = int((now - timestamp) / seconds_in_a_day)
        timestamp = datetime.datetime.fromtimestamp(timestamp)
    except Exception:
        logger.warn('Cannot convert "%s" to date.', timestamp, exc_info=True)
        return None

    # A negative diff means the date lies more than a day in the future;
    # a bare weekday name would be misleading there, so only past dates
    # (and "tomorrow", where diff truncates to 0) get a weekday name.
    if 0 <= diff < 7:
        # Weekday name
        weekday = timestamp.strftime('%A').decode(encoding)
        try:
            return str(weekday)
        except UnicodeEncodeError:
            # Non-ASCII weekday names cannot be turned into a byte
            # string with the default codec; return the unicode text
            # instead of failing
            return weekday

    # Locale's appropriate date representation
    return str(timestamp.strftime('%x'))
def format_filesize(bytesize, use_si_units=False, digits=2):
    """
    Formats the given size in bytes to be human-readable,
    using binary units (KiB, MiB, GiB) by default or SI units
    (kB, MB, GB) if use_si_units is set. "digits" is the number
    of decimal places in the result.

    Returns a localized "(unknown)" string when the bytesize
    has a negative value or cannot be converted to a number.
    """
    si_units = (
            ('kB', 10**3),
            ('MB', 10**6),
            ('GB', 10**9),
    )

    binary_units = (
            ('KiB', 2**10),
            ('MiB', 2**20),
            ('GiB', 2**30),
    )

    try:
        bytesize = float(bytesize)
    except Exception:
        # "Exception" (not a bare except) so that KeyboardInterrupt
        # and SystemExit are not swallowed here
        return _('(unknown)')

    if bytesize < 0:
        return _('(unknown)')

    units = si_units if use_si_units else binary_units

    used_unit, used_value = 'B', bytesize

    # Units are sorted ascending, so the last one that fits wins
    for unit, value in units:
        if bytesize >= value:
            used_value = bytesize / float(value)
            used_unit = unit

    return ('%.'+str(digits)+'f %s') % (used_value, used_unit)
def delete_file(filename):
    """Delete a file from the filesystem

    Errors (permissions errors or file not found)
    are silently ignored.
    """
    try:
        os.remove(filename)
    except Exception:
        # Deliberately best-effort; "Exception" instead of a bare except
        # so that KeyboardInterrupt and SystemExit still propagate
        pass
def remove_html_tags(html):
    """
    Remove HTML tags from a string and replace numeric and
    named entities with the corresponding character, so the
    HTML text can be displayed in a simple text view.

    Returns None if html is None. Text that merely looks like an
    entity (e.g. "Tom & Jerry; ...") and unknown named entities
    are left untouched.
    """
    if html is None:
        return None

    # If we would want more speed, we could make these global
    re_strip_tags = re.compile('<[^>]*>')
    re_unicode_entities = re.compile(r'&#(\d{2,4});')
    # Entity names consist of letters and digits only; matching "any
    # character" here would swallow normal text between "&" and ";"
    re_html_entities = re.compile(r'&([A-Za-z][A-Za-z0-9]{1,7});')
    re_newline_tags = re.compile('(<br[^>]*>|<[/]?ul[^>]*>|</li>)', re.I)
    re_listing_tags = re.compile('<li[^>]*>', re.I)

    def named_entity(match):
        value = entitydefs.get(match.group(1))
        if value is None:
            # Unknown entity: keep the original text
            return match.group(0)
        if value.startswith('&#'):
            # Characters outside of ISO-8859-1 are stored in entitydefs
            # as a numeric character reference ("&#NNNN;")
            return unichr(int(value[2:-1]))
        return unicode(value, 'iso-8859-1')

    result = html

    # Convert common HTML elements to their text equivalent
    result = re_newline_tags.sub('\n', result)
    result = re_listing_tags.sub('\n * ', result)
    result = re.sub('<[Pp]>', '\n\n', result)

    # Remove all HTML/XML tags from the string
    result = re_strip_tags.sub('', result)

    # Convert numeric XML entities to their unicode character
    result = re_unicode_entities.sub(lambda x: unichr(int(x.group(1))), result)

    # Convert named HTML entities to their unicode character
    result = re_html_entities.sub(named_entity, result)

    # Convert more than two newlines to two newlines
    result = re.sub('([\r\n]{2})([\r\n])+', '\\1', result)

    return result.strip()
def wrong_extension(extension):
    """
    Determine if a given extension looks like it's
    wrong (e.g. empty, extremely long or spaces)

    Returns True if the extension most likely is a
    wrong one and should be replaced.

    >>> wrong_extension('.mp3')
    False
    >>> wrong_extension('.divx')
    False
    >>> wrong_extension('mp3')
    True
    >>> wrong_extension('')
    True
    >>> wrong_extension('.12 - Everybody')
    True
    >>> wrong_extension('.mp3 ')
    True
    >>> wrong_extension('.')
    True
    >>> wrong_extension('.42')
    True
    """
    if not extension or len(extension) > 5 or ' ' in extension:
        return True

    if extension == '.' or not extension.startswith('.'):
        return True

    try:
        # ".<number>" is an invalid extension
        float(extension)
    except ValueError:
        # Not a number, so the extension looks fine
        return False

    return True
def extension_from_mimetype(mimetype):
    """
    Guesses the file extension (with leading dot) for a mimetype.
    gPodder's own table is consulted first, then the "mimetypes"
    module; an empty string is returned if neither knows it.

    >>> extension_from_mimetype('audio/mp4')
    '.m4a'
    >>> extension_from_mimetype('audio/ogg')
    '.ogg'
    >>> extension_from_mimetype('audio/mpeg')
    '.mp3'
    >>> extension_from_mimetype('video/x-matroska')
    '.mkv'
    >>> extension_from_mimetype('wrong-mimetype')
    ''
    """
    known_extension = _MIME_TYPES.get(mimetype)
    if known_extension is not None:
        return known_extension

    return mimetypes.guess_extension(mimetype) or ''
def mimetype_from_extension(extension):
    """
    Guesses the mimetype for a file extension (with leading dot).
    gPodder's own table is consulted first, then the "mimetypes"
    module; an empty string is returned if neither knows it.

    >>> mimetype_from_extension('.m4a')
    'audio/mp4'
    >>> mimetype_from_extension('.ogg')
    'audio/ogg'
    >>> mimetype_from_extension('.mp3')
    'audio/mpeg'
    >>> mimetype_from_extension('.mkv')
    'video/x-matroska'
    >>> mimetype_from_extension('._invalid_file_extension_')
    ''
    """
    known_mimetype = _MIME_TYPES_EXT.get(extension)
    if known_mimetype is not None:
        return known_mimetype

    # guess_type wants a file name, so prepend something to the extension
    guessed_mimetype = mimetypes.guess_type('file'+extension)[0]

    return guessed_mimetype or ''
def extension_correct_for_mimetype(extension, mimetype):
    """
    Check if the given filename extension (e.g. ".ogg") is a possible
    extension for a given mimetype (e.g. "application/ogg") and return
    a boolean value (True if it's possible, False if not). Both
    arguments are validated first and a ValueError is raised if one
    of them is malformed.

    >>> extension_correct_for_mimetype('.ogg', 'application/ogg')
    True
    >>> extension_correct_for_mimetype('.ogv', 'video/ogg')
    True
    >>> extension_correct_for_mimetype('.ogg', 'audio/mpeg')
    False
    >>> extension_correct_for_mimetype('.m4a', 'audio/mp4')
    True
    >>> extension_correct_for_mimetype('mp3', 'audio/mpeg')
    Traceback (most recent call last):
      ...
    ValueError: "mp3" is not an extension (missing .)
    >>> extension_correct_for_mimetype('.mp3', 'audio mpeg')
    Traceback (most recent call last):
      ...
    ValueError: "audio mpeg" is not a mimetype (missing /)
    """
    if '/' not in mimetype:
        raise ValueError('"%s" is not a mimetype (missing /)' % mimetype)
    if not extension.startswith('.'):
        raise ValueError('"%s" is not an extension (missing .)' % extension)

    # gPodder's own table has the final say for the pairs it knows
    if (extension, mimetype) in _MIME_TYPE_LIST:
        return True

    # The subtype doubles as a "default" extension, e.g. "application/ogg"
    # gives ".ogg", "audio/mpeg" gives ".mpeg", etc...
    candidates = ['.'+mimetype.split('/')[-1]]
    candidates.extend(mimetypes.guess_all_extensions(mimetype))

    return extension in candidates
def filename_from_url(url):
    """
    Extracts the filename and (lowercase) extension (with dot)
    from a URL, e.g. http://server.com/file.MP3?download=yes
    will result in the tuple ("file", ".mp3") being returned.

    This function will also try to best-guess the "real"
    extension for a media file (audio, video) by
    trying to match an extension to these types and recurse
    into the query string to find better matches, if the
    original extension does not resolve to a known type.

    http://my.net/redirect.php?my.net/file.ogg => ("file", ".ogg")
    http://server/get.jsp?file=/episode0815.MOV => ("episode0815", ".mov")
    http://s/redirect.mp4?http://serv2/test.mp4 => ("test", ".mp4")
    """
    (scheme, netloc, path, para, query, fragid) = urlparse.urlparse(url)
    (filename, extension) = os.path.splitext(
            os.path.basename(urllib.unquote(path)))

    if file_type_by_extension(extension) is not None and \
            not query.startswith(scheme+'://'):
        # We have found a valid extension (audio, video)
        # and the query string doesn't look like a URL
        return (filename, extension.lower())

    # If the query string looks like a possible URL, try that first
    if query.strip() and '/' in query:
        query_url = '://'.join((scheme, urllib.unquote(query)))
        (query_filename, query_extension) = filename_from_url(query_url)

        if file_type_by_extension(query_extension) is not None:
            # Use the parsed result of the recursive call: its extension
            # is already lowercase and it does not include any query
            # string or fragment of the nested URL (splitting the raw
            # query_url again would keep both).
            return (query_filename, query_extension)

    # No exact match found, simply return the original filename & extension
    return (filename, extension.lower())
def file_type_by_extension(extension):
    """
    Tries to guess the file type by looking up the filename
    extension (case-insensitively) in gPodder's table of known
    types and then in the "mimetypes" module. Will return
    "audio", "video", "image" or None.

    >>> file_type_by_extension('.aif')
    'audio'
    >>> file_type_by_extension('.3GP')
    'video'
    >>> file_type_by_extension('.m4a')
    'audio'
    >>> file_type_by_extension('.txt') is None
    True
    >>> file_type_by_extension(None) is None
    True
    >>> file_type_by_extension('ogg')
    Traceback (most recent call last):
      ...
    ValueError: Extension does not start with a dot: ogg
    """
    if not extension:
        return None

    if not extension.startswith('.'):
        raise ValueError('Extension does not start with a dot: %s' % extension)

    extension = extension.lower()

    known_mimetype = _MIME_TYPES_EXT.get(extension)
    if known_mimetype is not None:
        return known_mimetype.split('/')[0]

    # guess_type wants a file name, so prepend something to the extension
    guessed_mimetype = mimetypes.guess_type('file'+extension)[0]
    if guessed_mimetype is None or '/' not in guessed_mimetype:
        return None

    # Only the major type (the part before the slash) is of interest
    major_type = guessed_mimetype.split('/', 1)[0]
    if major_type in ('audio', 'video', 'image'):
        return major_type

    return None
def get_first_line(s):
    """
    Returns the first non-blank line of a string, without
    any leading or trailing whitespace.
    """
    # Stripping first drops leading blank lines; partition() then
    # keeps everything in front of the first newline
    first_line = s.strip().partition('\n')[0]
    return first_line.strip()
def object_string_formatter(s, **kwargs):
    """
    Makes attributes of object passed in as keyword
    arguments available as {OBJECTNAME.ATTRNAME} in
    the passed-in string and returns a string with
    the above arguments replaced with the attribute
    values of the corresponding object.

    Placeholders for attributes the object does not have (or
    whose value cannot be converted to a string) are left as-is.

    >>> class x: pass
    >>> a = x()
    >>> a.title = 'Hello world'
    >>> object_string_formatter('{episode.title}', episode=a)
    'Hello world'

    >>> class x: pass
    >>> a = x()
    >>> a.published = 123
    >>> object_string_formatter('Hi {episode.published} 456', episode=a)
    'Hi 123 456'
    """
    result = s
    for key, o in kwargs.items():
        # Escape the key, so it is always matched literally
        pattern = r'\{%s\.([^\}]+)\}' % re.escape(key)
        for attr in re.findall(pattern, s):
            if not hasattr(o, attr):
                continue

            try:
                from_s = '{%s.%s}' % (key, attr)
                to_s = str(getattr(o, attr))
                result = result.replace(from_s, to_s)
            except Exception:
                # Best effort: a failing placeholder is reported, but
                # does not prevent the other replacements
                logger.warn('Replace of "%s" failed for "%s".', attr, s)

    return result
def format_desktop_command(command, filenames, start_position=None):
    """
    Turns a command template from the "Exec=" line of a .desktop
    file into argument lists that are ready to be executed.

    Handled field codes: %U, %u, %F, %f and a fallback that
    appends the file names at the end of the command.

    Also handles non-standard %p which is replaced with the start_position
    (probably only makes sense if starting a single file). (see bug 1140)

    See http://standards.freedesktop.org/desktop-entry-spec/1.0/ar01s06.html

    Returns a list of commands (each one a list of arguments): one
    per file name if the application takes a single file only (%u,
    %f), or a single command with all file names (%U, %F or no
    field code at all).
    """
    # Replace backslashes with slashes to fix win32 issues
    # (even on win32, "/" works, but "\" does not)
    command = command.replace('\\', '/')

    if start_position is not None:
        command = command.replace('%p', str(start_position))

    arguments = shlex.split(command)

    # The field codes are tried in this order; the first one that is
    # present decides where (and how) the file names are inserted
    for fieldcode in ('%U', '%F', '%u', '%f'):
        if fieldcode not in arguments:
            continue

        position = arguments.index(fieldcode)
        head = arguments[:position]
        tail = arguments[position+1:]

        if fieldcode in ('%U', '%F'):
            # The application accepts several files at once
            return [head + filenames + tail]

        return [head + [filename] + tail for filename in filenames]

    # No field code at all: append all file names to the command
    return [arguments + filenames]
def url_strip_authentication(url):
    """
    Returns the given URL without any authentication data
    (username and password) in it.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('ftp://billy:@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://i%2Fo:P%40ss%3A@blubb.lan/u.html')
    'http://blubb.lan/u.html'
    >>> url_strip_authentication('http://c:d@x.org/')
    'http://x.org/'
    >>> url_strip_authentication('http://P%40%3A:i%2F@cx.lan')
    'http://cx.lan'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)

    # Everything up to the LAST "@" is authentication data. Without
    # an "@", rsplit() returns the unchanged netloc as only element.
    host = netloc.rsplit('@', 1)[-1]

    return urlparse.urlunsplit((scheme, host, path, query, fragment))
def url_add_authentication(url, username, password):
    """
    Builds an authenticated URL by putting the given username and
    password into the URL. Authentication data that is already in
    the URL gets replaced. Without a username (None or empty), the
    URL is returned unchanged.

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('http://example.org/', None, None)
    'http://example.org/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://localhost/x', 'aa', 'bc')
    'http://aa:bc@localhost/x'
    >>> url_add_authentication('http://blubb.lan/u.html', 'i/o', 'P@ss:')
    'http://i%2Fo:P@ss:@blubb.lan/u.html'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://i%2F:P%40%3A@cx.lan', 'P@x', 'i/')
    'http://P@x:i%2F@cx.lan'
    >>> url_add_authentication('http://x.org/', 'a b', 'c d')
    'http://a%20b:c%20d@x.org/'
    """
    if username is None or username == '':
        return url

    # Relaxations of the strict quoting rules (bug 1521):
    # 1. Accept '@' in username and password
    # 2. Accept ':' in password only
    credentials = urllib.quote(username, safe='@')
    if password is not None:
        credentials = credentials + ':' + urllib.quote(password, safe='@:')

    # Get rid of existing authentication data before adding the new one
    scheme, netloc, path, query, fragment = urlparse.urlsplit(
            url_strip_authentication(url))

    return urlparse.urlunsplit(
            (scheme, credentials + '@' + netloc, path, query, fragment))
def urlopen(url, headers=None, data=None, timeout=None):
    """
    Opens the given URL with the User-agent header set to gPodder's
    user agent and returns the response object of the opener.

    Authentication data embedded in the URL is removed from it and
    offered to the server via HTTP basic authentication instead.
    "headers" (optional) are additional request headers, "data" and
    "timeout" are passed on to the request/opener if given.
    """
    handlers = []

    username, password = username_password_from_url(url)
    if username is not None or password is not None:
        url = url_strip_authentication(url)
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        # Realm "None": use these credentials regardless of the realm
        password_mgr.add_password(None, url, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(password_mgr))

    opener = urllib2.build_opener(*handlers)

    # Work on a copy, so the caller's headers are never modified
    request_headers = {} if headers is None else dict(headers)
    request_headers['User-agent'] = gpodder.user_agent

    request = urllib2.Request(url, data=data, headers=request_headers)

    # Only pass a timeout if one was requested, so that the
    # default timeout of the opener stays in effect otherwise
    if timeout is not None:
        return opener.open(request, timeout=timeout)

    return opener.open(request)
def get_real_url(url):
    """
    Gets the real URL of a file and resolves all redirects.

    If the URL cannot be opened, the error is logged and the
    original URL is returned unchanged.
    """
    try:
        response = urlopen(url)
        try:
            return response.geturl()
        finally:
            # Only the final URL is needed, so release the
            # connection right away
            response.close()
    except Exception:
        # "Exception" (not a bare except) so that KeyboardInterrupt
        # and SystemExit are not swallowed here
        logger.error('Getting real url for %s', url, exc_info=True)
        return url
def find_command(command):
    """
    Looks for a command in the directories of the system's PATH.
    Returns the full path of the first occurrence that is a file
    and executable by the user, or None if the command is not
    available (or PATH is not set).

    On Windows, this also looks for "<command>.bat" and
    "<command>.exe" files if "<command>" itself doesn't exist.
    """
    search_path = os.environ.get('PATH')
    if search_path is None:
        return None

    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, command)

        if gpodder.ui.win32 and not os.path.exists(candidate):
            # Try the well-known extensions for executables
            for extension in ('.bat', '.exe'):
                if os.path.isfile(candidate + extension):
                    candidate = candidate + extension
                    break

        if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
            return candidate

    return None
# Lazily created IdleAddHandler instance, shared by all idle_add() calls
# that take the QML branch (stays None for the other UIs)
idle_add_handler = None

def idle_add(func, *args):
    """Run a function in the main GUI thread

    This is a wrapper function that does the Right Thing depending on if we are
    running on Gtk+, Qt or CLI.

    You should use this function if you are calling from a Python thread and
    modify UI data, so that you make sure that the function is called as soon
    as possible from the main UI thread.

    Gtk+: func(*args) is scheduled using gobject.idle_add().
    QML: func(*args) is handed over to the module-wide IdleAddHandler.
    Otherwise: func(*args) is called directly, in the calling thread.
    """
    if gpodder.ui.gtk:
        import gobject
        gobject.idle_add(func, *args)
    elif gpodder.ui.qml:
        from PySide.QtCore import Signal, QTimer, QThread, Qt, QObject

        class IdleAddHandler(QObject):
            # Carries the callable that run_func() should execute
            signal = Signal(object)
            def __init__(self):
                QObject.__init__(self)

                # Remember the thread that created this object.
                # NOTE(review): the code treats this as "the main thread",
                # so the first idle_add() call presumably has to come from
                # the main GUI thread - confirm against the callers.
                self.main_thread_id = QThread.currentThreadId()

                self.signal.connect(self.run_func)

            def run_func(self, func):
                # Slot for "signal"; must only ever run in the thread
                # that created the handler
                assert QThread.currentThreadId() == self.main_thread_id, \
                    ("Running in %s, not %s"
                     % (str(QThread.currentThreadId()),
                        str(self.main_thread_id)))
                func()

            def idle_add(self, func, *args):
                # Bind the arguments and make sure that an exception
                # in func is logged instead of propagating any further
                def doit():
                    try:
                        func(*args)
                    except Exception, e:
                        logger.exception("Running %s%s: %s",
                                         func, str(tuple(args)), str(e))

                if QThread.currentThreadId() == self.main_thread_id:
                    # If we emit the signal in the main thread,
                    # then the function will be run immediately.
                    # Instead, use a single shot timer with a 0
                    # timeout: this will run the function when the
                    # event loop next iterates.
                    QTimer.singleShot(0, doit)
                else:
                    self.signal.emit(doit)

        # Create the shared handler on first use
        global idle_add_handler
        if idle_add_handler is None:
            idle_add_handler = IdleAddHandler()

        idle_add_handler.idle_add(func, *args)
    else:
        func(*args)
def bluetooth_available():
    """
    Tell whether a tool for sending files via bluetooth is installed.

    Returns True if "bluetooth-sendto" or "gnome-obex-send" is found
    by find_command(), and False otherwise.
    """
    for tool in ('bluetooth-sendto', 'gnome-obex-send'):
        if find_command(tool):
            return True

    return False
def bluetooth_send_file(filename):
    """
    Sends a file via bluetooth.

    "bluetooth-sendto" is preferred; "gnome-obex-send" is used if the
    former is not available. The chosen tool is started with the
    filename as its only argument and waited for.

    Returns True if the tool exited with status 0. Returns False if
    it exited with another status or if neither tool is installed
    (an error is logged in the latter case).
    """
    for tool in ('bluetooth-sendto', 'gnome-obex-send'):
        if find_command(tool):
            exit_status = subprocess.Popen([tool, filename]).wait()
            return (exit_status == 0)

    logger.error('Cannot send file. Please install "bluetooth-sendto" or "gnome-obex-send".')
    return False
def format_time(value):
    """Format a seconds value to a string

    Values shorter than one hour are formatted as MM:SS, everything
    else as HH:MM:SS. The hour field is not limited to 23, so a
    duration of a day or more keeps counting hours instead of wrapping
    around (this matches what parse_time() accepts). Fractions of a
    second are dropped and negative values are treated as 0.

    >>> format_time(0)
    '00:00'
    >>> format_time(20)
    '00:20'
    >>> format_time(3600)
    '01:00:00'
    >>> format_time(10921)
    '03:02:01'
    >>> format_time(90210)
    '25:03:30'
    """
    total = max(0, int(value))
    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)

    if hours == 0:
        return '%02d:%02d' % (minutes, seconds)
    else:
        return '%02d:%02d:%02d' % (hours, minutes, seconds)
def parse_time(value):
    """Parse a time string into seconds

    Accepted formats are "H:M:S", "M:S" (with either ":" or "." as
    separator) and a plain integer number of seconds. The empty
    string yields 0; any other false value raises a ValueError.

    >>> parse_time('00:00')
    0
    >>> parse_time('00:00:00')
    0
    >>> parse_time('00:20')
    20
    >>> parse_time('00:00:20')
    20
    >>> parse_time('01:00:00')
    3600
    >>> parse_time('03:02:01')
    10921
    >>> parse_time('61:08')
    3668
    >>> parse_time('25:03:30')
    90210
    >>> parse_time('25:3:30')
    90210
    >>> parse_time('61.08')
    3668
    """
    if value == '':
        return 0

    if not value:
        raise ValueError('Invalid value: %s' % (str(value),))

    # Try the longer format first, as "M:S" would also match its prefix
    match = re.match(r'(\d+)[:.](\d\d?)[:.](\d\d?)', value)
    if match is not None:
        hours, minutes, seconds = [int(field) for field in match.groups()]
        return hours * 3600 + minutes * 60 + seconds

    match = re.match(r'(\d+)[:.](\d\d?)', value)
    if match is not None:
        minutes, seconds = [int(field) for field in match.groups()]
        return minutes * 60 + seconds

    return int(value)
def format_seconds_to_hour_min_sec(seconds):
    """
    Turn a number of seconds into a human-readable, translated
    duration string. Components that are zero are left out.

    >>> format_seconds_to_hour_min_sec(3834)
    u'1 hour, 3 minutes and 54 seconds'
    >>> format_seconds_to_hour_min_sec(3600)
    u'1 hour'
    >>> format_seconds_to_hour_min_sec(62)
    u'1 minute and 2 seconds'
    """

    if seconds < 1:
        return N_('%(count)d second', '%(count)d seconds', seconds) % {'count':seconds}

    hours, remainder = divmod(int(seconds), 3600)
    minutes, secs = divmod(remainder, 60)

    parts = []

    if hours:
        parts.append(N_('%(count)d hour', '%(count)d hours', hours) % {'count':hours})

    if minutes:
        parts.append(N_('%(count)d minute', '%(count)d minutes', minutes) % {'count':minutes})

    if secs:
        parts.append(N_('%(count)d second', '%(count)d seconds', secs) % {'count':secs})

    if len(parts) <= 1:
        return parts[0]

    # "a, b and c": commas between all but the last two components
    glue = ' '+_('and')+' '
    return glue.join((', '.join(parts[:-1]), parts[-1]))
def http_request(url, method='HEAD'):
    """
    Send a single HTTP request for a URL and return the response.

    A httplib.HTTPConnection is opened to the network location part
    of the URL, and everything following it in the URL is sent as
    the request target using the given method (default: HEAD).

    Returns the response object obtained from conn.getresponse().
    """
    # NOTE(review): a plain HTTPConnection is used regardless of the
    # URL scheme, so https:// URLs are not requested via TLS - confirm
    # whether callers ever pass such URLs.
    (scheme, netloc, path, parms, qry, fragid) = urlparse.urlparse(url)
    conn = httplib.HTTPConnection(netloc)
    start = len(scheme) + len('://') + len(netloc)
    # A URL without a path (e.g. "http://example.org") would give an
    # empty request target, which is not a valid HTTP request line
    conn.request(method, url[start:] or '/')
    return conn.getresponse()
def gui_open(filename):
    """
    Open a file or folder with the default application set
    by the Desktop environment. This uses "xdg-open" on all
    systems with a few exceptions:

       on Win32, os.startfile() is used
       on Mac OS X, the "open" command is used

    The helper process is started, but not waited for.

    Returns True if the file/folder could be handed over, and
    False if that failed (the error is logged with traceback).
    """
    try:
        if gpodder.ui.win32:
            os.startfile(filename)
        elif gpodder.ui.osx:
            subprocess.Popen(['open', filename])
        else:
            subprocess.Popen(['xdg-open', filename])
        return True
    except Exception:
        # Only handle real errors here - a bare "except:" would also
        # swallow KeyboardInterrupt and SystemExit
        logger.error('Cannot open file/folder: "%s"', filename, exc_info=True)
        return False
def open_website(url):
    """
    Opens the specified URL using the default system web
    browser. This uses Python's "webbrowser" module, so
    make sure your system is set up correctly.

    The browser is launched through run_in_background(), so
    this function does not wait for webbrowser.open() to return.
    """
    launch_browser = lambda: webbrowser.open(url)
    run_in_background(launch_browser)
def convert_bytes(d):
    """
    Convert byte strings to unicode strings

    Byte strings are decoded as UTF-8 (undecodable bytes are
    dropped). None, numbers, booleans and unicode strings are
    returned unchanged.

    >>> convert_bytes(None)
    >>> convert_bytes(1)
    1
    >>> convert_bytes(4711L)
    4711L
    >>> convert_bytes(True)
    True
    >>> convert_bytes(3.1415)
    3.1415
    >>> convert_bytes('Hello')
    u'Hello'
    >>> convert_bytes(u'Hey')
    u'Hey'
    """
    # Everything that is not a byte string passes through untouched
    if d is None or isinstance(d, (int, long, bool, float, unicode)):
        return d

    return d.decode('utf-8', 'ignore')
def sanitize_encoding(filename):
    r"""
    Generate a sanitized version of a string, i.e. drop all
    characters that cannot be represented in the detected
    native language encoding and return the encoded result.

    On Python 3, the value is returned unchanged.

    >>> sanitize_encoding('\x80')
    ''
    >>> sanitize_encoding(u'unicode')
    'unicode'
    """
    # The encoding problem goes away in Python 3.. hopefully!
    if sys.version_info[0] >= 3:
        return filename

    if isinstance(filename, unicode):
        text = filename
    else:
        # Byte strings take a round-trip to get rid of invalid bytes
        text = filename.decode(encoding, 'ignore')

    return text.encode(encoding, 'ignore')
def sanitize_filename(filename, max_length=0, use_ascii=False):
    """
    Generate a sanitized version of a filename that can
    be written on disk (i.e. remove/replace invalid
    characters and encode in the native language) and
    trim filename if greater than max_length (0 = no limit).

    If use_ascii is True, don't encode in the native language,
    but use only characters from the ASCII character set.

    Trimming happens on the unicode string, before it is encoded;
    leading and trailing dots and whitespace are stripped last.
    """
    if not isinstance(filename, unicode):
        filename = filename.decode(encoding, 'ignore')

    if 0 < max_length < len(filename):
        logger.info('Limiting file/folder name "%s" to %d characters.',
                filename, max_length)
        filename = filename[:max_length]

    if use_ascii:
        target_encoding = 'ascii'
    else:
        target_encoding = encoding

    encoded = filename.encode(target_encoding, 'ignore')
    cleaned = encoded.translate(SANITIZATION_TABLE)
    return cleaned.strip('.' + string.whitespace)
def find_mount_point(directory):
    """
    Determine the mount point a directory resides on.

    The directory is made absolute, then checked with
    os.path.ismount(). As long as it is not a mount point,
    its last path component is removed and the check is
    repeated. If "/" is reached, "/" is returned.

    >>> find_mount_point('/')
    '/'

    >>> find_mount_point(u'/something')
    Traceback (most recent call last):
      ...
    ValueError: Convert unicode objects to str first.

    >>> find_mount_point(None)
    Traceback (most recent call last):
      ...
    ValueError: Directory names should be of type str.

    >>> find_mount_point(42)
    Traceback (most recent call last):
      ...
    ValueError: Directory names should be of type str.

    >>> from minimock import mock, restore
    >>> mocked_mntpoints = ('/', '/home', '/media/usbdisk', '/media/cdrom')
    >>> mock('os.path.ismount', returns_func=lambda x: x in mocked_mntpoints)

    >>> # For mocking os.getcwd(), we simply use a lambda to avoid the
    >>> # massive output of "Called os.getcwd()" lines in this doctest
    >>> os.getcwd = lambda: '/home/thp'

    >>> find_mount_point('.')
    Called os.path.ismount('/home/thp')
    Called os.path.ismount('/home')
    '/home'
    >>> find_mount_point('relativity')
    Called os.path.ismount('/home/thp/relativity')
    Called os.path.ismount('/home/thp')
    Called os.path.ismount('/home')
    '/home'
    >>> find_mount_point('/media/usbdisk/')
    Called os.path.ismount('/media/usbdisk')
    '/media/usbdisk'
    >>> find_mount_point('/home/thp/Desktop')
    Called os.path.ismount('/home/thp/Desktop')
    Called os.path.ismount('/home/thp')
    Called os.path.ismount('/home')
    '/home'
    >>> find_mount_point('/media/usbdisk/Podcasts/With Spaces')
    Called os.path.ismount('/media/usbdisk/Podcasts/With Spaces')
    Called os.path.ismount('/media/usbdisk/Podcasts')
    Called os.path.ismount('/media/usbdisk')
    '/media/usbdisk'
    >>> find_mount_point('/home/')
    Called os.path.ismount('/home')
    '/home'
    >>> find_mount_point('/media/cdrom/../usbdisk/blubb//')
    Called os.path.ismount('/media/usbdisk/blubb')
    Called os.path.ismount('/media/usbdisk')
    '/media/usbdisk'
    >>> restore()
    """
    if isinstance(directory, unicode):
        # XXX: This is only valid for Python 2 - misleading error in Python 3?
        # We do not accept unicode strings, because they could fail when
        # trying to be converted to some native encoding, so fail loudly
        # and leave it up to the caller to encode into the proper encoding.
        raise ValueError('Convert unicode objects to str first.')

    if not isinstance(directory, str):
        # In Python 2, we assume it's a byte str; in Python 3, we assume
        # that it's a unicode str. The abspath/ismount/split functions of
        # os.path work with unicode str in Python 3, but not in Python 2.
        raise ValueError('Directory names should be of type str.')

    candidate = os.path.abspath(directory)

    # Walk up the tree until we hit a mount point or the root
    while candidate != '/' and not os.path.ismount(candidate):
        candidate = os.path.dirname(candidate)

    return candidate
# Matches a leading URL scheme followed by "://" (http://, ftp://, ...)
protocolPattern = re.compile(r'^\w+://')

def isabs(string):
    """
    @return a true value if string is an absolute path or a protocol
    address - addresses beginning with a scheme such as http://,
    ftp:// or ldap:// are considered "absolute" paths (1 is returned
    for these, the result of os.path.isabs() for everything else).
    Source: http://code.activestate.com/recipes/208993/
    """
    is_protocol_address = protocolPattern.match(string) is not None
    return 1 if is_protocol_address else os.path.isabs(string)
def commonpath(l1, l2, common=None):
    """
    helper functions for relpath
    Source: http://code.activestate.com/recipes/208993/

    Splits off the leading elements that l1 and l2 have in common.

    Returns a tuple (common, rest1, rest2): "common" is the optional
    list passed in as common, extended by the shared leading elements;
    rest1 and rest2 are what remains of l1 and l2 after them.
    """
    # A fresh list for every call - a mutable default argument would be
    # shared between calls and handed out to (and modifiable by) callers
    prefix = [] if common is None else list(common)

    shared = 0
    for first, second in zip(l1, l2):
        if first != second:
            break
        shared += 1

    return (prefix + list(l1[:shared]), l1[shared:], l2[shared:])
def relpath(p1, p2):
    """
    Finds relative path from p1 to p2
    Source: http://code.activestate.com/recipes/208993/

    Both paths are split at os.path.sep and compared component by
    component. The result goes up one level ("..") for every
    component of p1 that is not shared with p2, followed by the
    components of p2 that are not shared with p1. If nothing is
    left on either side, "." is returned.
    """
    pathsplit = lambda s: s.split(os.path.sep)

    (common, l1, l2) = commonpath(pathsplit(p1), pathsplit(p2))
    p = []
    if l1:
        p = [('..' + os.sep) * len(l1)]
    p = p + l2
    # Emptiness check instead of "len(p) is 0", which compared object
    # identity with an integer literal
    if not p:
        return "."

    return os.path.join(*p)
def get_hostname():
    """Return the hostname of this computer

    The value of platform.node() is used if it is not empty,
    otherwise the result of socket.gethostname() is returned.

    This can be implemented in a different way on each
    platform and should yield a unique-per-user device ID.
    """
    # Fallback - but can this give us "localhost"?
    return platform.node() or socket.gethostname()
def detect_device_type():
    """Device type detection for gpodder.net

    This function tries to detect on which
    kind of device gPodder is running on.

    Possible values for gpodder.net are desktop, laptop, mobile,
    server and other; this function only ever returns 'mobile',
    'laptop' or 'desktop'.
    """
    if gpodder.ui.harmattan:
        return 'mobile'

    # Linux: If we have a battery, assume Laptop
    has_battery = bool(glob.glob('/proc/acpi/battery/*'))
    return 'laptop' if has_battery else 'desktop'
def write_m3u_playlist(m3u_filename, episodes, extm3u=True):
    """Create an M3U playlist from a episode list

    If the parameter "extm3u" is False, the list of
    episodes should be a list of filenames, and no
    extended information will be written into the
    M3U files (#EXTM3U / #EXTINF).

    If the parameter "extm3u" is True (default), then the
    list of episodes should be PodcastEpisode objects,
    as the extended metadata will be taken from them.
    Episodes that are not downloaded (or whose file does
    not exist) are skipped. Files located below the folder
    of the playlist are written relative to that folder.
    """
    playlist_dir = os.path.dirname(m3u_filename)
    # Compare against "folder + separator", so that a sibling folder
    # like "/a/bc" is not mistaken for a subfolder of "/a/b". An empty
    # folder (relative playlist filename) never makes paths relative.
    if playlist_dir and not playlist_dir.endswith(os.sep):
        relative_prefix = playlist_dir + os.sep
    else:
        relative_prefix = playlist_dir

    # The context manager closes the file even if writing fails
    with open(m3u_filename, 'w') as f:
        if extm3u:
            # Mandatory header for extended playlists
            f.write('#EXTM3U\n')

        for episode in episodes:
            if not extm3u:
                # Episode objects are strings that contain file names
                f.write(episode+'\n')
                continue

            if episode.was_downloaded(and_exists=True):
                filename = episode.local_filename(create=False)
                assert filename is not None

                if relative_prefix and filename.startswith(relative_prefix):
                    filename = filename[len(relative_prefix):]
                f.write('#EXTINF:0,'+episode.playlist_title()+'\n')
                f.write(filename+'\n')
def generate_names(filename):
    """Generate an endless sequence of candidate file names

    The first name is the filename itself; the following ones have
    " (2)", " (3)", ... inserted in front of the file extension.
    """
    basename, ext = os.path.splitext(filename)
    yield filename

    for number in itertools.count(2):
        yield '%s (%d)%s' % (basename, number, ext)
def is_known_redirecter(url):
    """Check if a URL redirect is expected, and no filenames should be updated

    We usually honor URL redirects, and update filenames accordingly.
    In some cases (e.g. Soundcloud) this results in a worse filename,
    so we hardcode and detect these cases here to avoid renaming files
    for which we know that a "known good default" exists.

    The problem here is that by comparing the currently-assigned filename
    with the new filename determined by the URL, we cannot really determine
    which one is the "better" URL (e.g. "n5rMSpXrqmR9.128.mp3" for Soundcloud).

    Returns True if the URL starts with a known redirect target prefix.
    """
    # Soundcloud-hosted media downloads (we take the track name as filename)
    return url.startswith('http://ak-media.soundcloud.com/')
def atomic_rename(old_name, new_name):
    """Atomically rename/move a (temporary) file

    This is usually used when updating a file safely by writing
    the new contents into a temporary file and then moving the
    temporary file over the original file to replace it.

    os.rename() is used everywhere except on Win32, where
    shutil.move() is used instead.
    """
    # Win32 does not support atomic rename with os.rename
    move_file = shutil.move if gpodder.ui.win32 else os.rename
    move_file(old_name, new_name)
def check_command(self, cmd):
    """Check if a command line command/program exists

    The command line is split using shell-like syntax, and
    its first word is looked up with find_command().

    Returns True if the program was found, False otherwise.
    """
    # NOTE(review): "self" is not used, although this is defined like
    # a module-level function - confirm how callers invoke this.

    # Prior to Python 2.7.3, this module (shlex) did not support Unicode input.
    words = shlex.split(sanitize_encoding(cmd))
    return (find_command(words[0]) is not None)
def rename_episode_file(episode, filename):
    """Helper method to update a PodcastEpisode object

    Useful after renaming/converting its download file.

    The download filename, file size and mime type of the episode
    are set from the given file, then the episode is saved and
    the change is committed to its database.

    Raises a ValueError if the file does not exist.
    """
    if not os.path.exists(filename):
        raise ValueError('Target filename does not exist.')

    extension = os.path.splitext(filename)[1]

    episode.download_filename = os.path.basename(filename)
    episode.file_size = os.path.getsize(filename)
    episode.mime_type = mimetype_from_extension(extension)
    episode.save()
    episode.db.commit()
def get_update_info(url='http://gpodder.org/downloads'):
    """
    Get up to date release information from gpodder.org.

    The page is downloaded and the text of the elements with the
    ids "latest-version" and "release-date" is extracted from it.

    Returns a tuple: (up_to_date, latest_version, release_date, days_since)

    Example result (up to date version, 20 days after release):
        (True, '3.0.4', '2012-01-24', 20)

    Example result (outdated version, 10 days after release):
        (False, '3.0.5', '2012-02-29', 10)
    """
    def version_tuple(version):
        # '3.0.4' -> (3, 0, 4), so that versions compare numerically
        return tuple(int(part) for part in version.split('.'))

    data = urlopen(url).read()
    id_field_re = re.compile(r'<([a-z]*)[^>]*id="([^"]*)"[^>]*>([^<]*)</\1>')

    # Map the "id" attribute of each matching element to its text
    info = {}
    for match in id_field_re.finditer(data):
        info[match.group(2)] = match.group(3)

    latest_version = info['latest-version']
    release_date = info['release-date']

    release_parsed = datetime.datetime.strptime(release_date, '%Y-%m-%d')
    age = datetime.datetime.today() - release_parsed
    days_since_release = age.days

    up_to_date = (version_tuple(gpodder.__version__) >=
            version_tuple(latest_version))

    return up_to_date, latest_version, release_date, days_since_release
def run_in_background(function, daemon=False):
    """Run a function (without arguments) in a new thread

    The thread is started right away and its Thread object is
    returned. "daemon" sets the daemon flag of the thread.
    """
    logger.debug('run_in_background: %s (%s)', function, str(daemon))
    worker = threading.Thread(target=function)
    worker.daemon = daemon
    worker.start()
    return worker
def linux_get_active_interfaces():
    """Get active network interfaces using 'ip link'

    Generator that yields the name of every interface that is
    listed with state UP or UNKNOWN in the output of "ip link".
    Nothing is yielded if there is no such interface. The
    loopback interface ("lo") is not included.
    """
    ip_link = subprocess.Popen(['ip', 'link'], stdout=subprocess.PIPE)
    output = ip_link.communicate()[0]

    for match in re.finditer(r'\d+: ([^:]+):.*state (UP|UNKNOWN)', output):
        name = match.group(1)
        if name != 'lo':
            yield name
def osx_get_active_interfaces():
    """Get active network interfaces using 'ifconfig'

    Generator that yields the name of every interface block in the
    output of "ifconfig" that has a line ending in "status: active"
    or "status: associated". Nothing is yielded if there is no such
    block. There is no explicit filtering of the loopback interface;
    it is only left out as long as it has no such status line.
    """
    process = subprocess.Popen(['ifconfig'], stdout=subprocess.PIPE)
    stdout, _ = process.communicate()

    # Split at every newline that is not followed by a tab (presumably
    # the start of the next interface block). No flags are passed: the
    # third positional parameter of re.split() is "maxsplit", so passing
    # re.MULTILINE there limited the result to 8 splits.
    for block in re.split('\n(?!\t)', stdout):
        match = re.match('(\\w+):.*status: (active|associated)$', block, re.MULTILINE | re.DOTALL)
        if match:
            yield match.group(1)
def unix_get_active_interfaces():
    """Get active network interfaces using 'ifconfig'

    Generator that yields the name of every interface block in the
    output of "ifconfig" that has a line ending in "status: active".
    Nothing is yielded if there is no such block. There is no
    explicit filtering of the loopback interface; it is only left
    out as long as it has no such status line.
    """
    process = subprocess.Popen(['ifconfig'], stdout=subprocess.PIPE)
    stdout, _ = process.communicate()

    # Split at every newline that is not followed by a tab (presumably
    # the start of the next interface block). No flags are passed: the
    # third positional parameter of re.split() is "maxsplit", so passing
    # re.MULTILINE there limited the result to 8 splits.
    for block in re.split('\n(?!\t)', stdout):
        match = re.match('(\\w+):.*status: active$', block, re.MULTILINE | re.DOTALL)
        if match:
            yield match.group(1)
def connection_available():
    """Check if an Internet connection is available

    Returns True if a connection is available (or if there
    is no way to determine the connection). Returns False
    if no network interfaces are up (i.e. no connectivity).

    On Win32, True is always returned. On Mac OS X, the result of
    osx_get_active_interfaces() decides. Everywhere else, "ifconfig"
    is consulted first (if installed); only if it reports no active
    interface, "ip" is asked as well (if installed).
    """
    try:
        if gpodder.ui.win32:
            # FIXME: Implement for Windows
            return True
        elif gpodder.ui.osx:
            return len(list(osx_get_active_interfaces())) > 0
        else:
            # By default, we assume we're not offline (bug 1730)
            offline = False

            if find_command('ifconfig') is not None:
                # If ifconfig is available, and it says we don't have
                # any active interfaces, assume we're offline
                if len(list(unix_get_active_interfaces())) == 0:
                    offline = True

            # If we assume we're offline, try the "ip" command as fallback
            if offline and find_command('ip') is not None:
                offline = (len(list(linux_get_active_interfaces())) == 0)

            return not offline
    except Exception as e:
        logger.warn('Cannot get connection status: %s', e, exc_info=True)
        # When we can't determine the connection status, act as if we're online (bug 1730)
        return True
def website_reachable(url):
    """
    Check if a specific website is available.

    Returns a tuple (reachable, response): (True, <response object>)
    if the URL could be opened within a timeout of one second, and
    (False, None) if connection_available() reports no connection
    or opening the URL fails with an urllib2.URLError.
    """
    # No network interfaces up - assume website not reachable
    if connection_available():
        try:
            return (True, urllib2.urlopen(url, timeout=1))
        except urllib2.URLError:
            pass

    return (False, None)
def delete_empty_folders(top):
    """Remove all empty folders below the folder "top"

    The tree is walked bottom-up, so a folder that only contained
    empty folders is removed as well. "top" itself is never removed.
    """
    for parent, subfolders, _files in os.walk(top, topdown=False):
        for subfolder in subfolders:
            folder = os.path.join(parent, subfolder)
            if os.listdir(folder):
                continue
            os.rmdir(folder)