fix progress bar for values above upper bound
[mygpo.git] / mygpo / utils.py
blob0df78cdcd12904074e0d779e19e55b3abeca1802
2 # This file is part of my.gpodder.org.
4 # my.gpodder.org is free software: you can redistribute it and/or modify it
5 # under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or (at your
7 # option) any later version.
9 # my.gpodder.org is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
12 # License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
18 import operator
19 import sys
20 import re
21 import collections
22 from datetime import datetime, timedelta, date
23 import time
24 import hashlib
25 import urlparse
26 import urllib
27 import urllib2
29 from django.conf import settings
def daterange(from_date, to_date=None, leap=timedelta(days=1)):
    """Yield the values from from_date up to and including to_date.

    Consecutive values are leap apart (one day by default). When to_date
    is omitted, datetime.now() is used if from_date is a datetime and
    date.today() otherwise.

    >>> from_d = datetime(2010, 1, 1)
    >>> to_d = datetime(2010, 1, 3)
    >>> list(daterange(from_d, to_d))
    [datetime.datetime(2010, 1, 1, 0, 0), datetime.datetime(2010, 1, 2, 0, 0), datetime.datetime(2010, 1, 3, 0, 0)]
    """
    if to_date is None:
        # pick an upper bound of the same kind as the lower bound
        to_date = datetime.now() if isinstance(from_date, datetime) \
            else date.today()

    current = from_date
    while current <= to_date:
        yield current
        current = current + leap
def format_time(value):
    """Format an offset (in seconds) to a string

    The offset should be an integer or float value; fractions of a second
    are dropped. Offsets shorter than one hour are rendered as MM:SS,
    longer ones as HH:MM:SS. The hour field is not wrapped at 24, so
    offsets of a day or more keep their full number of hours.

    An empty string is returned for values that cannot be represented
    (negative offsets, NaN, infinity).

    >>> format_time(0)
    '00:00'
    >>> format_time(20)
    '00:20'
    >>> format_time(3600)
    '01:00:00'
    >>> format_time(10921)
    '03:02:01'
    >>> format_time(90000)
    '25:00:00'
    """
    try:
        total = int(value)
    except (ValueError, OverflowError):
        # NaN raises ValueError, infinity raises OverflowError
        return ''

    if total < 0:
        return ''

    minutes, seconds = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)

    if hours == 0:
        return '%02d:%02d' % (minutes, seconds)
    return '%02d:%02d:%02d' % (hours, minutes, seconds)
def parse_time(value):
    """Convert a time specification into a number of seconds

    Integers are returned unchanged. Strings are tried as HH:MM:SS, then
    as MM:SS, and finally as a plain integer literal.

    Raises ValueError for None, for the empty string and for strings
    that match none of the accepted forms.

    >>> parse_time(10)
    10

    >>> parse_time('05:10') #5*60+10
    310

    >>> parse_time('1:05:10') #60*60+5*60+10
    3910
    """
    if value is None:
        raise ValueError('None value in parse_time')

    if isinstance(value, int):
        # Don't need to parse already-converted time value
        return value

    if value == '':
        raise ValueError('Empty string in parse_time')

    for time_format in ('%H:%M:%S', '%M:%S'):
        try:
            t = time.strptime(value, time_format)
        except ValueError:
            # does not match this layout, try the next one
            continue
        return t.tm_hour * 60*60 + t.tm_min * 60 + t.tm_sec

    # last resort: a plain number of seconds given as a string
    return int(value)
def parse_bool(val):
    """Interpret a value as a boolean

    Booleans are passed through unchanged. Any other value is True only
    if its lower-cased form is the string 'true'.

    >>> parse_bool('True')
    True

    >>> parse_bool('true')
    True

    >>> parse_bool('')
    False
    """
    if not isinstance(val, bool):
        return val.lower() == 'true'
    return val
def iterate_together(lists, key=lambda x: x, reverse=False):
    """Walk several ordered lists in parallel, aligning equal items

    Takes ordered, possibly sparse, lists with similar items (some items
    have a corresponding item in the other lists, some don't).

    Yields tuples of corresponding items, with one position per input
    list. A position is None if the respective list has no corresponding
    entry. None entries within the input lists are skipped, and tuples
    consisting only of None are never produced.

    The results of the key function are used for the comparisons.

    If reverse is True, the lists are expected to be sorted in reverse
    order and the results will also be sorted in reverse.

    >>> list(iterate_together([range(1, 3), range(1, 4, 2)]))
    [(1, 1), (2, None), (None, 3)]

    >>> list(iterate_together([[], []]))
    []

    >>> list(iterate_together([range(1, 3), range(3, 5)]))
    [(1, None), (2, None), (None, 3), (None, 4)]

    >>> list(iterate_together([range(1, 3), []]))
    [(1, None), (2, None)]

    >>> list(iterate_together([[1, None, 3], [None, None, 3]]))
    [(1, None), (3, 3)]
    """
    Next = collections.namedtuple('Next', 'item more')
    pick = max if reverse else min
    precedes = operator.gt if reverse else operator.lt

    iterators = [iter(l) for l in lists]
    width = len(iterators)

    def _take(it):
        # fetch the next entry that is not None, if there is any
        for candidate in it:
            if candidate is not None:
                return Next(candidate, True)
        return Next(None, False)

    # current head element of every list
    heads = [_take(it) for it in iterators]

    while any(h.item is not None or h.more for h in heads):

        res = [None] * width

        for n, head in enumerate(heads):

            if head.item is None:
                continue

            chosen = [x for x in res if x is not None]
            if not chosen:
                # first candidate of this round
                res[n] = head.item
                continue

            min_v = pick(chosen, key=key)

            if key(head.item) == key(min_v):
                # belongs to the group collected so far
                res[n] = head.item

            elif precedes(key(head.item), key(min_v)):
                # sorts before the group collected so far; start over
                res = [None] * width
                res[n] = head.item

        # advance only those lists whose head is part of the result
        for n, x in enumerate(res):
            if x is not None:
                heads[n] = _take(iterators[n])

        yield tuple(res)
def progress(val, max_val, status_str='', max_width=50, stream=sys.stdout):
    """Write a one-line textual progress bar to a stream

    The line starts with a carriage return and has no trailing newline,
    so that consecutive calls overwrite each other on a terminal.

    val is the current value and max_val the value that corresponds to
    100%. The bar is max_width characters wide, filled with '#'
    proportionally to val / max_val, and has the percentage written into
    its center. status_str is appended after the bar and the counters.

    Values above max_val result in a full bar, negative values in an
    empty bar. If max_val is 0 the progress is displayed as 0%.
    """

    # progress as a fraction; an empty workload must not divide by zero
    ratio = float(val) / max_val if max_val else 0.0
    percentage_str = '{val:.2%}'.format(val=ratio)

    # progress bar filled with #s, clamped to the width of the bar
    factor = max(0, min(int(ratio * max_width), max_width))
    progress_str = '#' * factor + ' ' * (max_width - factor)

    # insert percentage into bar; a percentage string that is wider than
    # the bar starts at the left edge instead of a negative offset
    percentage_start = max(0, (max_width - len(percentage_str)) // 2)
    percentage_end = percentage_start + len(percentage_str)
    progress_str = (progress_str[:percentage_start] +
                    percentage_str +
                    progress_str[percentage_end:])

    stream.write('\r[ %s ] %s / %s | %s' % (
        progress_str,
        val,
        max_val,
        status_str))
    stream.flush()
def set_cmp(list, simplify):
    """Remove duplicates from a list, comparing items by simplify(item)

    Two items count as equal if simplify returns the same value for
    both; of such items the one that occurs last is kept. Returns the
    values of the dictionary used for the de-duplication.
    """
    unique = {}
    for entry in list:
        # later entries replace earlier ones with the same simplified form
        unique[simplify(entry)] = entry
    return unique.values()
def first(it):
    """Return the first element of it that is not None

    Returns None if the iterable is exhausted without such an element.
    The test is done by identity, so it does not depend on how the
    elements implement comparison.
    """
    for x in it:
        if x is not None:
            return x
    return None
def intersect(a, b):
    """Return a list of the distinct elements contained in both a and b"""
    left, right = set(a), set(b)
    return list(left & right)
# matches the code points 0-31 and 127-159; compiled once at import time
_CONTROL_CHAR_RE = re.compile(u'[\\x00-\\x1f\\x7f-\\x9f]')


def remove_control_chars(s):
    """Return s with all control characters removed

    The characters with the code points 0-31 and 127-159 are removed;
    this includes tab, line feed and carriage return.
    """
    return _CONTROL_CHAR_RE.sub('', s)
def unzip(a):
    """Transpose a sequence of tuples into a tuple of lists

    This is the inverse of zip: the n-th list of the result contains the
    n-th elements of all items of a.
    """
    return tuple(list(column) for column in zip(*a))
def parse_range(s, min, max, default=None):
    """Parse an integer and clamp it to the range [min, max]

    Parses the string and returns its value. If the value is outside the
    given range, its closest number within the range is returned.

    If s can not be parsed as an integer, default is returned. If no
    default is given, the middle of the range is returned instead.

    >>> parse_range('5', 0, 10)
    5

    >>> parse_range('0', 5, 10)
    5

    >>> parse_range('15',0, 10)
    10

    >>> parse_range('x', 0, 20)
    10

    >>> parse_range('x', 0, 20, 20)
    20

    >>> parse_range('x', 10, 20)
    15
    """
    try:
        val = int(s)
    except (ValueError, TypeError):
        if default is not None:
            return default
        # middle of the range; half the span alone would lie outside of
        # the range whenever min is not 0
        return min + (max - min) // 2

    if val < min:
        return min
    if val > max:
        return max
    return val
def flatten(l):
    """Concatenate the items of all sub-lists of l into one flat list

    Only one level of nesting is removed.
    """
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat
def linearize(key, iterators, reverse=False):
    """Merge several iterators into one, ordered by a key function

    In every step the pending head elements of all iterators are sorted
    by key (descending if reverse is True); the first one is yielded and
    replaced by the next element of the iterator it came from.
    """
    sources = [iter(i) for i in iterators]

    # head element of every non-empty iterator, with its iterator
    pending = []
    for source in sources:
        try:
            head = next(source)
        except StopIteration:
            continue
        pending.append((head, source))

    while pending:
        pending.sort(key=lambda pair: key(pair[0]), reverse=reverse)
        val, it = pending.pop(0)
        yield val

        # refill from the iterator that has just been used
        try:
            pending.append((next(it), it))
        except StopIteration:
            pass
try:
    _builtin_cmp = cmp
except NameError:
    # interpreters without a builtin cmp
    def _builtin_cmp(a, b):
        """Return a negative number, zero or a positive number, like cmp"""
        return (a > b) - (a < b)


def skip_pairs(iterator, cmp=_builtin_cmp):
    """ Skips pairs of equal items

    Two adjacent items are equal if cmp returns 0 for them. Such a pair
    is dropped completely; all other items are yielded in their original
    order.

    >>> list(skip_pairs([]))
    []

    >>> list(skip_pairs([1]))
    [1]

    >>> list(skip_pairs([1, 2, 3]))
    [1, 2, 3]

    >>> list(skip_pairs([1, 1]))
    []

    >>> list(skip_pairs([1, 2, 2]))
    [1]

    >>> list(skip_pairs([1, 2, 2, 3]))
    [1, 3]

    >>> list(skip_pairs([1, 2, 2, 2]))
    [1, 2]

    >>> list(skip_pairs([1, 2, 2, 2, 2, 3]))
    [1, 3]
    """
    iterator = iter(iterator)

    # marks the end of the input; the generator finishes with a plain
    # return instead of letting StopIteration escape from its body
    _end = object()

    current = next(iterator, _end)

    while current is not _end:
        following = next(iterator, _end)

        if following is _end:
            # the last item has no partner
            yield current
            return

        if cmp(current, following) == 0:
            # drop both items of the pair
            current = next(iterator, _end)
        else:
            yield current
            current = following
def get_timestamp(datetime_obj):
    """ Returns the timestamp as an int for the given datetime object

    The datetime is interpreted as UTC, so the result does not depend on
    the time zone of the machine.

    >>> get_timestamp(datetime(2011, 4, 7, 9, 30, 6))
    1302168606

    >>> get_timestamp(datetime(1970, 1, 1, 0, 0, 0))
    0
    """
    import calendar

    # time.mktime would interpret the time tuple as local time
    return int(calendar.timegm(datetime_obj.utctimetuple()))
re_url = re.compile('^https?://')


def is_url(string):
    """ Returns true if a string looks like an URL

    A string looks like an URL if it starts with http:// or https://

    >>> is_url('http://example.com/some-path/file.xml')
    True

    >>> is_url('something else')
    False
    """
    return re_url.match(string) is not None
# from http://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python
# this does not increase asymptotical complexity
# but can still waste more time than it saves.
def shortest_of(strings):
    """Return the shortest of the given strings"""
    return min(strings, key=len)


def longest_substr(strings):
    """
    Returns the longest common substring of the given strings

    If there are several common substrings of the maximum length, the
    one that occurs first in the shortest string is returned. The empty
    string is returned if there are no strings or no common substring.

    >>> longest_substr(['xabcy', 'zabcw', 'abc'])
    'abc'

    >>> longest_substr(['abc', 'def'])
    ''
    """

    substr = ""
    if not strings:
        return substr

    # a common substring can not be longer than the shortest string
    reference = shortest_of(strings)
    length = len(reference)

    # find a suitable slice i:j
    for i in range(length):
        # only consider slices that are longer than the best one so far;
        # the end index may be equal to length, so that slices reaching
        # the end of the reference are included
        for j in range(i + len(substr) + 1, length + 1):
            candidate = reference[i:j]
            if all(candidate in text for text in strings):
                substr = candidate
            else:
                # no longer slice starting at i can be common either
                break
    return substr
def additional_value(it, gen_val, val_changed=lambda _x, _current: True):
    """ Provides an additional value to the elements, calculated when needed

    For the elements from the iterator, some additional value can be computed
    by gen_val (which might be an expensive computation).

    If the elements in the iterator are ordered so that some subsequent
    elements would generate the same additional value, val_changed can be
    provided, which receives the next element from the iterator and the
    previous additional value. If the element would generate the same
    additional value (val_changed returns False), its computation is skipped.

    Without val_changed, the additional value is computed for every element.

    Yields tuples of (element, additional value).

    >>> # get the next full hundred higher than x
    >>> # this will probably be an expensive calculation
    >>> next_hundred = lambda x: x + 100-(x % 100)

    >>> # returns True if h is not the value that next_hundred(x) would provide
    >>> # this should be a relatively cheap calculation, compared to the above
    >>> diff_hundred = lambda x, h: (h-x) < 0 or (h - x) > 100

    >>> xs = [0, 50, 100, 101, 199, 200, 201]
    >>> list(additional_value(xs, next_hundred, diff_hundred))
    [(0, 100), (50, 100), (100, 100), (101, 200), (199, 200), (200, 200), (201, 300)]
    """

    # marks that no additional value has been computed yet; None can not
    # be used because it is a possible result of gen_val
    _none = object()
    current = _none

    for x in it:
        if current is _none or val_changed(x, current):
            current = gen_val(x)

        yield (x, current)
def file_hash(f, h=hashlib.md5, block_size=2**20):
    """ returns the hash of the contents of a file

    f is read in chunks of block_size until read() returns an empty
    result. h is called without arguments to create the hash object.
    The hash object itself is returned, not its digest.
    """
    f_hash = h()
    while True:
        chunk = f.read(block_size)
        if not chunk:
            # an empty result of either string type marks the end
            break
        f_hash.update(chunk)
    return f_hash
def split_list(l, prop):
    """ split elements that satisfy a property, and those that don't

    Returns a tuple of two lists: the elements for which prop returns a
    true value and the remaining ones, both in their original order.
    prop is called once per element. If prop is None, the elements are
    split by their own truth value.
    """
    if prop is None:
        prop = bool

    match, nomatch = [], []
    for x in l:
        if prop(x):
            match.append(x)
        else:
            nomatch.append(x)
    return match, nomatch
def sorted_chain(links, key, reverse=False):
    """ Takes a list of iterators and iterates over the sorted elements

    Each element of links should be a tuple of (sort_key, iterator). The
    elements of each iterator should be sorted already. sort_key should
    indicate the key of the first element and needs to be comparable to the
    result of key(elem).

    The function returns an iterator over the globally sorted elements
    that ensures that as few iterators as possible are evaluated. An
    iterator is evaluated only when its placeholder is the next entry;
    it is then consumed completely.

    The placeholders are processed in the order in which they are given
    until the first iterator has been evaluated.
    """

    # mixed_list initially contains all placeholders; later evaluated
    # elements (from the iterators) are mixed in
    mixed_list = [(k, link, True) for k, link in links]

    # entries are ordered by their sort key only
    sort_key = operator.itemgetter(0)

    while mixed_list:
        _, item, expand = mixed_list.pop(0)

        # found an element (from an earlier expansion), yield it
        if not expand:
            yield item
            continue

        # found an iter that needs to be expanded.
        # The iterator is fully consumed
        new_items = [(key(i), i, False) for i in item]

        # sort links (placeholders) and elements together
        mixed_list = sorted(mixed_list + new_items, key=sort_key,
                            reverse=reverse)
def url_add_authentication(url, username, password):
    """
    Adds authentication data (username, password) to a given
    URL in order to construct an authenticated URL.

    The URL is returned unchanged if the username is None or empty.
    Authentication data that is already part of the URL is replaced.
    If password is None, only the username is added.

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('http://example.org/', None, None)
    'http://example.org/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://localhost/x', 'aa', 'bc')
    'http://aa:bc@localhost/x'
    >>> url_add_authentication('http://blubb.lan/u.html', 'i/o', 'P@ss:')
    'http://i%2Fo:P@ss:@blubb.lan/u.html'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://i%2F:P%40%3A@cx.lan', 'P@x', 'i/')
    'http://P@x:i%2F@cx.lan'
    >>> url_add_authentication('http://x.org/', 'a b', 'c d')
    'http://a%20b:c%20d@x.org/'
    """
    if username in (None, ''):
        return url

    # Relaxations of the strict quoting rules (bug 1521):
    # 1. Accept '@' in username and password
    # 2. Accept ':' in password only
    credentials = urllib.quote(username, safe='@')
    if password is not None:
        quoted_password = urllib.quote(password, safe='@:')
        credentials = ':'.join((credentials, quoted_password))

    # get rid of authentication data that is already in the URL
    plain_url = url_strip_authentication(url)

    scheme, netloc, path, query, fragment = urlparse.urlsplit(plain_url)
    netloc = '@'.join((credentials, netloc))

    return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def urlopen(url, headers=None, data=None):
    """
    An URL opener with the User-agent set to settings.USER_AGENT

    Authentication data that is contained in the URL is removed from it
    and passed to a HTTP basic authentication handler instead.

    headers may contain additional request headers; the mapping is
    copied and its User-agent entry is overwritten. data is passed to
    urllib2.Request as it is. Returns the result of opening the request.
    """
    username, password = username_password_from_url(url)

    handlers = []
    if username is not None or password is not None:
        # the credentials go into the password manager, not into the URL
        url = url_strip_authentication(url)
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(password_mgr))
    opener = urllib2.build_opener(*handlers)

    # work on a copy, the mapping of the caller is not modified
    request_headers = dict(headers) if headers is not None else {}
    request_headers['User-agent'] = settings.USER_AGENT

    request = urllib2.Request(url, data=data, headers=request_headers)
    return opener.open(request)
def username_password_from_url(url):
    r"""
    Returns a tuple (username,password) containing authentication
    data from the specified URL or (None,None) if no authentication
    data can be found in the URL.

    Username and password are returned unquoted. Raises ValueError if
    url is not a string (str, unicode or a subclass of them).

    See Section 3.1 of RFC 1738 (http://www.ietf.org/rfc/rfc1738.txt)

    >>> username_password_from_url('https://@host.com/')
    ('', None)
    >>> username_password_from_url('telnet://host.com/')
    (None, None)
    >>> username_password_from_url('ftp://foo:@host.com/')
    ('foo', '')
    >>> username_password_from_url('http://a:b@host.com/')
    ('a', 'b')
    >>> username_password_from_url(1)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url(None)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url('http://a@b:c@host.com/')
    ('a@b', 'c')
    >>> username_password_from_url('ftp://a:b:c@host.com/')
    ('a', 'b:c')
    >>> username_password_from_url('http://i%2Fo:P%40ss%3A@host.com/')
    ('i/o', 'P@ss:')
    >>> username_password_from_url('ftp://%C3%B6sterreich@host.com/')
    ('\xc3\xb6sterreich', None)
    >>> username_password_from_url('http://w%20x:y%20z@example.org/')
    ('w x', 'y z')
    >>> username_password_from_url('http://example.com/x@y:z@test.com/')
    (None, None)
    """
    # isinstance (instead of comparing the exact type) also accepts
    # subclasses of the string types
    if not isinstance(url, (str, unicode)):
        raise ValueError('URL has to be a string or unicode object.')

    (username, password) = (None, None)

    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)

    if '@' in netloc:
        (authentication, netloc) = netloc.rsplit('@', 1)
        if ':' in authentication:
            (username, password) = authentication.split(':', 1)

            # RFC1738 dictates that we should not allow ['/', '@', ':']
            # characters in the username and password field (Section 3.1):
            #
            # 1. The "/" can't be in there at this point because of the way
            #    urlparse (which we use above) works.
            # 2. Due to gPodder bug 1521, we allow "@" in the username and
            #    password field. We use netloc.rsplit('@', 1), which will
            #    make sure that we split it at the last '@' in netloc.
            # 3. The colon must be excluded (RFC2617, Section 2) in the
            #    username, but is apparently allowed in the password. This
            #    is handled by the authentication.split(':', 1) above, and
            #    will cause any extraneous ':'s to be part of the password.

            username = urllib.unquote(username)
            password = urllib.unquote(password)
        else:
            username = urllib.unquote(authentication)

    return (username, password)
def url_strip_authentication(url):
    """
    Strips authentication data from an URL. Returns the URL with
    the authentication data removed from it.

    Everything up to and including the last '@' of the network location
    is regarded as authentication data.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('ftp://billy:@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://i%2Fo:P%40ss%3A@blubb.lan/u.html')
    'http://blubb.lan/u.html'
    >>> url_strip_authentication('http://c:d@x.org/')
    'http://x.org/'
    >>> url_strip_authentication('http://P%40%3A:i%2F@cx.lan')
    'http://cx.lan'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)

    # Remove existing authentication data; rpartition returns the
    # complete network location if it does not contain an '@'
    host = netloc.rpartition('@')[2]

    return urlparse.urlunsplit((scheme, host, path, query, fragment))
def sanitize_encoding(filename):
    r"""
    Generate a sanitized version of a string (i.e.
    remove invalid characters and encode in the
    encoding given by the module-level name "encoding").

    On Python 3 the value is returned unchanged.

    >>> sanitize_encoding('\x80')
    ''
    >>> sanitize_encoding(u'unicode')
    'unicode'
    """
    # The encoding problem goes away in Python 3.. hopefully!
    if sys.version_info >= (3, 0):
        return filename

    # NOTE(review): "encoding" has to be defined at module level; no
    # definition is visible in this part of the file -- confirm that it
    # exists, otherwise this raises a NameError on Python 2
    text = filename
    if not isinstance(text, unicode):
        # byte strings are decoded first, undecodable bytes are dropped
        text = text.decode(encoding, 'ignore')
    return text.encode(encoding, 'ignore')