return values instead of raising them
[mygpo.git] / mygpo / utils.py
blob6ccd37fb61f172fff87058bde46328640dcc1a64
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 import subprocess
19 import os
20 import operator
21 import sys
22 import re
23 import collections
24 from datetime import datetime, timedelta, date
25 import time
26 import hashlib
27 import urlparse
28 import urllib
29 import urllib2
31 from django.conf import settings
def daterange(from_date, to_date=None, leap=timedelta(days=1)):
    """Yield successive dates from from_date up to and including to_date.

    Each yielded value is ``leap`` later than the previous one. When
    to_date is omitted, the current moment is used for datetime inputs
    and today's date for everything else.

    >>> from_d = datetime(2010, 1, 1)
    >>> to_d = datetime(2010, 1, 5)
    >>> list(daterange(from_d, to_d))
    [datetime.datetime(2010, 1, 1, 0, 0), datetime.datetime(2010, 1, 2, 0, 0), datetime.datetime(2010, 1, 3, 0, 0), datetime.datetime(2010, 1, 4, 0, 0), datetime.datetime(2010, 1, 5, 0, 0)]
    """
    if to_date is None:
        # pick an end point of the same kind as the start point
        to_date = datetime.now() if isinstance(from_date, datetime) \
            else date.today()

    current = from_date
    while current <= to_date:
        yield current
        current = current + leap
def format_time(value):
    """Format an offset (in seconds) to a string

    The offset should be an integer or float value; fractions of a second
    are dropped. The result is 'MM:SS' for offsets below one hour and
    'HH:MM:SS' otherwise. Offsets of 24 hours and more keep counting hours
    instead of wrapping around. An empty string is returned for values
    that cannot be formatted (negative, NaN or infinite offsets).

    >>> format_time(0)
    '00:00'
    >>> format_time(20)
    '00:20'
    >>> format_time(3600)
    '01:00:00'
    >>> format_time(10921)
    '03:02:01'
    >>> format_time(90000)
    '25:00:00'
    """
    try:
        total = int(value)
    except (ValueError, OverflowError):
        # NaN raises ValueError, +/-infinity raises OverflowError
        return ''

    if total < 0:
        return ''

    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)

    if hours == 0:
        return '%02d:%02d' % (minutes, seconds)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)
def parse_time(value):
    """Parse a time offset and return it as a number of seconds

    Integers are returned unchanged. Strings are tried as 'HH:MM:SS',
    then as 'MM:SS', and finally as a plain number of seconds.

    Raises ValueError for None, for the empty string and for strings
    that match none of the accepted forms.

    >>> parse_time(10)
    10

    >>> parse_time('05:10') #5*60+10
    310

    >>> parse_time('1:05:10') #60*60+5*60+10
    3910
    """
    if value is None:
        raise ValueError('None value in parse_time')

    if isinstance(value, int):
        # Don't need to parse already-converted time value
        return value

    if value == '':
        raise ValueError('Empty string in parse_time')

    for time_format in ('%H:%M:%S', '%M:%S'):
        try:
            t = time.strptime(value, time_format)
        except ValueError:
            # not in this format, try the next one
            continue
        return t.tm_hour * 60*60 + t.tm_min * 60 + t.tm_sec

    return int(value)
def parse_bool(val):
    """Interpret val as a boolean

    Booleans are passed through; any other value counts as True only if
    its lower-cased form is exactly 'true'.

    >>> parse_bool('True')
    True

    >>> parse_bool('true')
    True

    >>> parse_bool('')
    False
    """
    if not isinstance(val, bool):
        return val.lower() == 'true'
    return val
def iterate_together(lists, key=lambda x: x, reverse=False):
    """
    takes ordered, possibly sparse, lists with similar items
    (some items have a corresponding item in the other lists, some don't).

    It then yields tuples of corresponding items, where an element is None
    if there is no corresponding entry in one of the lists.

    Tuples where all elements are None are skipped.

    The results of the key method are used for the comparisons.

    If reverse is True, the lists are expected to be sorted in reverse order
    and the results will also be sorted reverse

    >>> list(iterate_together([range(1, 3), range(1, 4, 2)]))
    [(1, 1), (2, None), (None, 3)]

    >>> list(iterate_together([[], []]))
    []

    >>> list(iterate_together([range(1, 3), range(3, 5)]))
    [(1, None), (2, None), (None, 3), (None, 4)]

    >>> list(iterate_together([range(1, 3), []]))
    [(1, None), (2, None)]

    >>> list(iterate_together([[1, None, 3], [None, None, 3]]))
    [(1, None), (3, 3)]
    """

    Next = collections.namedtuple('Next', 'item more')
    min_ = min if not reverse else max
    lt_ = operator.lt if not reverse else operator.gt

    lists = [iter(l) for l in lists]

    def _take(it):
        # next item of the iterator that is not None, or an "exhausted"
        # marker if there is none left
        for candidate in it:
            if candidate is not None:
                return Next(candidate, True)
        return Next(None, False)

    def new_res():
        return [None]*len(lists)

    # take first bunch of items
    items = [_take(l) for l in lists]

    while any(i.item is not None or i.more for i in items):

        res = new_res()

        for n, item in enumerate(items):

            if item.item is None:
                continue

            if all(x is None for x in res):
                res[n] = item.item
                continue

            # everything collected in res so far shares the same key
            min_v = min_([x for x in res if x is not None], key=key)

            if key(item.item) == key(min_v):
                res[n] = item.item

            elif lt_(key(item.item), key(min_v)):
                # found an earlier item; drop what was collected so far
                res = new_res()
                res[n] = item.item

        # advance only those lists whose item is part of this result
        for n, x in enumerate(res):
            if x is not None:
                items[n] = _take(lists[n])

        yield tuple(res)
def progress(val, max_val, status_str='', max_width=50, stream=sys.stdout):
    """Write a single-line progress bar to stream

    The line starts with a carriage return (so that repeated calls
    overwrite each other on a terminal) and is not terminated by a
    newline. The bar is max_width characters wide, filled with '#'
    according to val / max_val, and has the percentage written into its
    middle. It is followed by "val / max_val | status_str".

    A max_val of 0 is displayed as 0% instead of raising
    ZeroDivisionError.
    """

    ratio = float(val) / max_val if max_val else 0.0

    # progress as percentage
    percentage_str = '{val:.2%}'.format(val=ratio)

    # progress bar filled with #s, clamped to the width of the bar
    factor = max(0, min(int(ratio * max_width), max_width))
    progress_str = '#' * factor + ' ' * (max_width - factor)

    # insert percentage into bar
    percentage_start = (max_width - len(percentage_str)) // 2
    percentage_end = percentage_start + len(percentage_str)
    progress_str = (progress_str[:percentage_start] +
                    percentage_str +
                    progress_str[percentage_end:])

    stream.write('\r[ %s ] %s / %s | %s' % (
        progress_str,
        val,
        max_val,
        status_str))
    stream.flush()
def set_cmp(list, simplify):
    """
    Removes duplicates from a list, where two items count as equal if
    simplify returns the same value for both of them. Of several such
    items, the one occurring last in the list is kept.
    """
    by_key = {}
    for element in list:
        by_key[simplify(element)] = element
    return by_key.values()
def first(it):
    """
    returns the first not-None object or None if the iterator is exhausted

    Items are tested by identity (``is not None``), so objects with an
    unusual equality implementation are not skipped by mistake.
    """
    for x in it:
        if x is not None:
            return x
    return None
def intersect(a, b):
    """ Returns a list of the distinct items contained in both a and b """
    common = set(a).intersection(set(b))
    return list(common)
# matches the control characters U+0000-U+001F and U+007F-U+009F;
# compiled once instead of on every call
_control_char_re = re.compile(u'[\\x00-\\x1f\\x7f-\\x9f]')


def remove_control_chars(s):
    """ Returns s with all control characters removed

    Control characters are the code points 0-31 and 127-159; this
    includes tabs and line breaks.
    """
    return _control_char_re.sub('', s)
def unzip(a):
    """ Inverse of zip: turns a sequence of tuples into a tuple of lists """
    return tuple(list(column) for column in zip(*a))
def parse_range(s, min, max, default=None):
    """
    Parses the string and returns its value. If the value is outside the given
    range, its closest number within the range is returned

    If the string can not be parsed, default is returned; without a
    default, the (rounded down) middle of the range is returned.

    >>> parse_range('5', 0, 10)
    5

    >>> parse_range('0', 5, 10)
    5

    >>> parse_range('15',0, 10)
    10

    >>> parse_range('x', 0, 20)
    10

    >>> parse_range('x', 0, 20, 20)
    20

    >>> parse_range('x', 10, 20)
    15
    """
    try:
        val = int(s)
    except (ValueError, TypeError):
        if default is not None:
            return default
        # middle of the range; (max-min)/2 alone is only half its width
        # and lies outside of the range whenever min > (max-min)/2
        return min + (max - min) // 2

    if val < min:
        return min
    if val > max:
        return max
    return val
def flatten(l):
    """ Concatenates the items of all sub-lists of l into one flat list """
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
def linearize(key, iterators, reverse=False):
    """
    Linearizes a number of iterators, sorted by some key function

    Yields the elements of all iterators, always continuing with the
    pending element that sorts first according to key (or last, if
    reverse is True). Each iterator is expected to be sorted accordingly
    already; only one element per iterator is held at any time.
    """

    exhausted = object()
    iters = [iter(i) for i in iterators]

    # the current head element of every non-empty iterator
    vals = []
    for it in iters:
        head = next(it, exhausted)
        if head is not exhausted:
            vals.append((head, it))

    while vals:
        vals = sorted(vals, key=lambda x: key(x[0]), reverse=reverse)
        val, it = vals.pop(0)
        yield val

        # replace the yielded element by its successor, if there is one
        next_val = next(it, exhausted)
        if next_val is not exhausted:
            vals.append((next_val, it))
def skip_pairs(iterator, cmp=None):
    """ Skips pairs of equal items

    Two adjacent items are dropped if cmp returns 0 for them. Without a
    cmp function, items are compared with ``==``.

    >>> list(skip_pairs([]))
    []

    >>> list(skip_pairs([1]))
    [1]

    >>> list(skip_pairs([1, 2, 3]))
    [1, 2, 3]

    >>> list(skip_pairs([1, 1]))
    []

    >>> list(skip_pairs([1, 2, 2]))
    [1]

    >>> list(skip_pairs([1, 2, 2, 3]))
    [1, 3]

    >>> list(skip_pairs([1, 2, 2, 2]))
    [1, 2]

    >>> list(skip_pairs([1, 2, 2, 2, 2, 3]))
    [1, 3]
    """

    if cmp is None:
        def is_pair(a, b):
            return a == b
    else:
        def is_pair(a, b):
            return cmp(a, b) == 0

    # The end of the input is detected with a sentinel and the generator
    # is left with a plain return; a StopIteration must not escape from
    # a generator body.
    end = object()
    iterator = iter(iterator)

    upcoming = next(iterator, end)
    if upcoming is end:
        return

    while True:
        item = upcoming
        upcoming = next(iterator, end)
        if upcoming is end:
            # the last item has no partner
            yield item
            return

        if is_pair(item, upcoming):
            # drop both items of the pair and continue after them
            upcoming = next(iterator, end)
            if upcoming is end:
                return
        else:
            yield item
def get_timestamp(datetime_obj):
    """ Returns the timestamp as an int for the given datetime object

    The conversion is done by time.mktime, which interprets the given
    time as local time; the examples below therefore only hold if the
    local timezone is UTC.

    >>> get_timestamp(datetime(2011, 4, 7, 9, 30, 6))
    1302168606

    >>> get_timestamp(datetime(1970, 1, 1, 0, 0, 0))
    0
    """
    time_tuple = datetime_obj.timetuple()
    seconds = time.mktime(time_tuple)
    return int(seconds)
# http:// or https:// at the beginning of a string
re_url = re.compile('^https?://')

def is_url(string):
    """ Returns true if a string looks like an URL

    Only the scheme is checked: the string has to start with
    http:// or https://

    >>> is_url('http://example.com/some-path/file.xml')
    True

    >>> is_url('something else')
    False
    """

    found = re_url.match(string)
    return found is not None
# from http://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python
# this does not increase asymptotic complexity
# but can still waste more time than it saves.
def shortest_of(strings):
    """ Returns the shortest of the given strings (the first one on a tie) """
    shortest = min(strings, key=len)
    return shortest
def longest_substr(strings):
    """
    Returns the longest common substring of the given strings

    If there are several common substrings of the maximum length, the one
    occurring first in the shortest string is returned. An empty string is
    returned if there is no common substring or no strings are given.
    """

    substr = ""
    if not strings:
        return substr

    # a common substring can not be longer than the shortest string
    reference = min(strings, key=len)
    length = len(reference)

    # find a suitable slice i:j
    for i in range(length):
        # only consider slices that are longer than the best one found so
        # far; j runs up to and including length, so that a slice can
        # reach the end of the reference string
        for j in range(i + len(substr) + 1, length + 1):
            candidate = reference[i:j]
            if not all(candidate in text for text in strings):
                # no longer slice starting at i can be common either
                break
            substr = candidate

    return substr
def additional_value(it, gen_val, val_changed=lambda _elem, _prev: True):
    """ Provides an additional value to the elements, calculated when needed

    For the elements from the iterator, some additional value can be computed
    by gen_val (which might be an expensive computation). Tuples of
    (element, additional value) are yielded.

    If the elements in the iterator are ordered so that some subsequent
    elements would generate the same additional value, val_changed can be
    provided, which receives the next element from the iterator and the
    previous additional value. If the element would generate the same
    additional value (val_changed returns False), its computation is skipped.

    Without val_changed, the additional value is computed for every element.

    >>> # get the next full hundred higher than x
    >>> # this will probably be an expensive calculation
    >>> next_hundred = lambda x: x + 100-(x % 100)

    >>> # returns True if h is not the value that next_hundred(x) would provide
    >>> # this should be a relatively cheap calculation, compared to the above
    >>> diff_hundred = lambda x, h: (h-x) < 0 or (h - x) > 100

    >>> xs = [0, 50, 100, 101, 199, 200, 201]
    >>> list(additional_value(xs, next_hundred, diff_hundred))
    [(0, 100), (50, 100), (100, 100), (101, 200), (199, 200), (200, 200), (201, 300)]
    """

    # marks "nothing computed yet"; None could be a valid additional value
    _none = object()
    current = _none

    for x in it:
        if current is _none or val_changed(x, current):
            current = gen_val(x)

        yield (x, current)
def file_hash(f, h=hashlib.md5, block_size=2**20):
    """ returns the hash of the contents of a file

    f is a file-like object that is read in chunks of block_size from its
    current position up to its end; h is a hash constructor such as
    hashlib.md5. The hash object (not its digest) is returned.
    """
    f_hash = h()
    while True:
        chunk = f.read(block_size)
        # an empty chunk (byte string or text) marks the end of the file
        if not chunk:
            break
        f_hash.update(chunk)
    return f_hash
def split_list(l, prop):
    """ split elements that satisfy a property, and those that don't

    Returns the tuple (match, nomatch) of two lists; both keep the
    elements in their original order. l is iterated once only and prop is
    called once per element.
    """
    match, nomatch = [], []
    for item in l:
        if prop(item):
            match.append(item)
        else:
            nomatch.append(item)
    return match, nomatch
def sorted_chain(links, key, reverse=False):
    """ Takes a list of iters and iterates over sorted elements

    Each element of links should be a tuple of (sort_key, iterator). The
    elements of each iterator should be sorted already. sort_key should
    indicate the key of the first element and needs to be comparable to the
    result of key(elem).

    The function returns an iterator over the globally sorted elements that
    ensures that as few iterators as possible are evaluated. When an
    iterator is reached, it is consumed completely and its elements are
    sorted in between the pending elements and remaining iterators.

    The links themselves are not sorted before the first one is evaluated.
    NOTE(review): presumably callers pass links ordered by sort_key --
    confirm against the callers.
    """

    # mixed_list initially contains all placeholders; later evaluated
    # elements (from the iterators) are mixed in
    mixed_list = [(k, link, True) for k, link in links]

    while mixed_list:
        _, item, expand = mixed_list.pop(0)

        # found an element (from an earlier expansion), yield it
        if not expand:
            yield item
            continue

        # found an iter that needs to be expanded.
        # The iterator is fully consumed
        new_items = [(key(i), i, False) for i in item]

        # sort links (placeholders) and elements together
        mixed_list = sorted(mixed_list + new_items,
                            key=lambda entry: entry[0],
                            reverse=reverse)
def url_add_authentication(url, username, password):
    """
    Adds authentication data (username, password) to a given
    URL in order to construct an authenticated URL.

    Authentication data that is already part of the URL is replaced.
    The URL is returned unchanged if no username is given.

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('http://example.org/', None, None)
    'http://example.org/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://localhost/x', 'aa', 'bc')
    'http://aa:bc@localhost/x'
    >>> url_add_authentication('http://blubb.lan/u.html', 'i/o', 'P@ss:')
    'http://i%2Fo:P@ss:@blubb.lan/u.html'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://i%2F:P%40%3A@cx.lan', 'P@x', 'i/')
    'http://P@x:i%2F@cx.lan'
    >>> url_add_authentication('http://x.org/', 'a b', 'c d')
    'http://a%20b:c%20d@x.org/'
    """
    if username in (None, ''):
        return url

    # Relaxations of the strict quoting rules (bug 1521):
    # 1. Accept '@' in username and password
    # 2. Accept ':' in password only
    auth_string = urllib.quote(username, safe='@')

    if password is not None:
        auth_string = auth_string + ':' + urllib.quote(password, safe='@:')

    # drop authentication data that the URL might already contain
    plain_url = url_strip_authentication(url)

    parts = urlparse.urlsplit(plain_url)
    parts = parts._replace(netloc=auth_string + '@' + parts.netloc)

    return urlparse.urlunsplit(parts)
def urlopen(url, headers=None, data=None):
    """
    An URL opener with the User-agent set to settings.USER_AGENT

    Authentication data contained in the URL is removed from it and
    offered for HTTP basic authentication instead. headers may contain
    additional request headers, data is passed on to the request.
    Returns the result of opening the request.
    """
    username, password = username_password_from_url(url)

    if username is None and password is None:
        opener = urllib2.build_opener()
    else:
        url = url_strip_authentication(url)
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, username, password)
        auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        opener = urllib2.build_opener(auth_handler)

    # work on a copy, the caller's headers are not modified
    request_headers = {} if headers is None else dict(headers)
    request_headers['User-agent'] = settings.USER_AGENT

    request = urllib2.Request(url, data=data, headers=request_headers)
    return opener.open(request)
def username_password_from_url(url):
    r"""
    Returns a tuple (username,password) containing authentication
    data from the specified URL or (None,None) if no authentication
    data can be found in the URL.

    Username and password are returned unquoted. Raises ValueError if
    url is not a string (subclasses of the string types are accepted).

    See Section 3.1 of RFC 1738 (http://www.ietf.org/rfc/rfc1738.txt)

    >>> username_password_from_url('https://@host.com/')
    ('', None)
    >>> username_password_from_url('telnet://host.com/')
    (None, None)
    >>> username_password_from_url('ftp://foo:@host.com/')
    ('foo', '')
    >>> username_password_from_url('http://a:b@host.com/')
    ('a', 'b')
    >>> username_password_from_url(1)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url(None)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url('http://a@b:c@host.com/')
    ('a@b', 'c')
    >>> username_password_from_url('ftp://a:b:c@host.com/')
    ('a', 'b:c')
    >>> username_password_from_url('http://i%2Fo:P%40ss%3A@host.com/')
    ('i/o', 'P@ss:')
    >>> username_password_from_url('ftp://%C3%B6sterreich@host.com/')
    ('\xc3\xb6sterreich', None)
    >>> username_password_from_url('http://w%20x:y%20z@example.org/')
    ('w x', 'y z')
    >>> username_password_from_url('http://example.com/x@y:z@test.com/')
    (None, None)
    """
    # type(u'') is the unicode string type; isinstance (unlike a
    # comparison of types) also accepts subclasses of the string types
    if not isinstance(url, (str, type(u''))):
        raise ValueError('URL has to be a string or unicode object.')

    (username, password) = (None, None)

    netloc = urlparse.urlparse(url)[1]

    if '@' in netloc:
        authentication = netloc.rsplit('@', 1)[0]
        if ':' in authentication:
            (username, password) = authentication.split(':', 1)

            # RFC1738 dictates that we should not allow ['/', '@', ':']
            # characters in the username and password field (Section 3.1):
            #
            # 1. The "/" can't be in there at this point because of the way
            #    urlparse (which we use above) works.
            # 2. Due to gPodder bug 1521, we allow "@" in the username and
            #    password field. We use netloc.rsplit('@', 1), which will
            #    make sure that we split it at the last '@' in netloc.
            # 3. The colon must be excluded (RFC2617, Section 2) in the
            #    username, but is apparently allowed in the password. This
            #    is handled by the authentication.split(':', 1) above, and
            #    will cause any extraneous ':'s to be part of the password.

            username = urllib.unquote(username)
            password = urllib.unquote(password)
        else:
            username = urllib.unquote(authentication)

    return (username, password)
def url_strip_authentication(url):
    """
    Strips authentication data from an URL. Returns the URL with
    the authentication data removed from it.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('ftp://billy:@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://i%2Fo:P%40ss%3A@blubb.lan/u.html')
    'http://blubb.lan/u.html'
    >>> url_strip_authentication('http://c:d@x.org/')
    'http://x.org/'
    >>> url_strip_authentication('http://P%40%3A:i%2F@cx.lan')
    'http://cx.lan'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    parts = urlparse.urlsplit(url)

    # the authentication data is everything up to the last '@' of the
    # network location (the username itself may contain '@')
    if '@' in parts.netloc:
        host = parts.netloc.rsplit('@', 1)[1]
        parts = parts._replace(netloc=host)

    return urlparse.urlunsplit(parts)
# Native filesystem encoding detection. sys.getfilesystemencoding() may
# return None; the interpreter's default encoding is used in this case.
encoding = sys.getfilesystemencoding() or sys.getdefaultencoding()

def sanitize_encoding(filename):
    r"""
    Generate a sanitized version of a string (i.e.
    remove invalid characters and encode in the
    detected native language encoding).

    Under Python 3 the string is returned unchanged. The results of the
    examples depend on the detected encoding.

    >>> sanitize_encoding('\x80')
    ''
    >>> sanitize_encoding(u'unicode')
    'unicode'
    """
    # The encoding problem goes away in Python 3.. hopefully!
    if sys.version_info >= (3, 0):
        return filename

    if not isinstance(filename, unicode):
        # characters that are invalid in the native encoding are dropped
        filename = filename.decode(encoding, 'ignore')
    return filename.encode(encoding, 'ignore')
def get_git_head():
    """ returns the commit and message of the current git HEAD

    Runs "/usr/bin/git log -n 1 --oneline" in settings.BASE_DIR and
    returns the tuple (commit, message). (None, None) is returned if git
    can not be started, fails, writes to stderr or prints nothing.
    """

    try:
        pr = subprocess.Popen('/usr/bin/git log -n 1 --oneline'.split(),
            cwd = settings.BASE_DIR,
            stdout = subprocess.PIPE,
            stderr = subprocess.PIPE,
        )

    except OSError:
        return None, None

    (out, err) = pr.communicate()
    if err or pr.returncode != 0:
        return None, None

    outs = out.split()
    if not outs:
        # no output, e.g. for a repository without commits
        return None, None

    commit = outs[0]
    msg = ' ' .join(outs[1:])
    return commit, msg