[API] consistently use utcnow() everywhere
[mygpo.git] / mygpo / utils.py
blobcf051dda82d4644621576f566464231410b7badb
1 # -*- coding: utf-8 -*-
3 # This file is part of my.gpodder.org.
5 # my.gpodder.org is free software: you can redistribute it and/or modify it
6 # under the terms of the GNU Affero General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or (at your
8 # option) any later version.
10 # my.gpodder.org is distributed in the hope that it will be useful, but
11 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
13 # License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with my.gpodder.org. If not, see <http://www.gnu.org/licenses/>.
import calendar
import collections
import functools
import hashlib
import itertools
import operator
import os
import re
import shlex
import subprocess
import sys
import time
import types
import urllib
import urllib2
import urlparse
import zlib

from datetime import datetime, timedelta, date
37 from django.conf import settings
38 from django.core.urlresolvers import reverse
40 from mygpo.core.json import json
42 import logging
43 logger = logging.getLogger(__name__)
def daterange(from_date, to_date=None, leap=timedelta(days=1)):
    """ Iterates over dates, in steps of leap, up to and including to_date

    If to_date is omitted it defaults to the current UTC time for
    datetime arguments, or to today's date for date arguments.

    >>> list(daterange(datetime(2010, 1, 1), datetime(2010, 1, 3)))
    [datetime.datetime(2010, 1, 1, 0, 0), datetime.datetime(2010, 1, 2, 0, 0), datetime.datetime(2010, 1, 3, 0, 0)]
    """

    if to_date is None:
        # pick a default end matching the type of the start value
        if isinstance(from_date, datetime):
            to_date = datetime.utcnow()
        else:
            to_date = date.today()

    current = from_date
    while current <= to_date:
        yield current
        current = current + leap
def format_time(value):
    """Format an offset (in seconds) to a string

    The offset should be an integer or float value; returns '' for
    values that cannot be interpreted as a timestamp.

    >>> format_time(0)
    '00:00'
    >>> format_time(20)
    '00:20'
    >>> format_time(3600)
    '01:00:00'
    >>> format_time(10921)
    '03:02:01'
    """
    try:
        as_dt = datetime.utcfromtimestamp(value)
    except (ValueError, TypeError):
        return ''

    # drop the hour part when it would be '00'
    fmt = '%M:%S' if as_dt.hour == 0 else '%H:%M:%S'
    return as_dt.strftime(fmt)
def parse_time(value):
    """ Parses a time value (int seconds or MM:SS / HH:MM:SS string)

    >>> parse_time(10)
    10

    >>> parse_time('05:10') #5*60+10
    310

    >>> parse_time('1:05:10') #60*60+5*60+10
    3910

    Raises ValueError for None, for the empty string, and for strings
    that are neither a known time format nor a plain number.
    """
    if value is None:
        raise ValueError('None value in parse_time')

    if isinstance(value, int):
        # Don't need to parse already-converted time value
        return value

    if value == '':
        # fixed typo: was "Empty valueing in parse_time"
        raise ValueError('Empty value in parse_time')

    for fmt in ('%H:%M:%S', '%M:%S'):
        try:
            t = time.strptime(value, fmt)
            return t.tm_hour * 60*60 + t.tm_min * 60 + t.tm_sec
        except ValueError:
            # not this format; try the next one
            continue

    # not a known time format; interpret as a plain number of seconds
    return int(value)
def parse_bool(val):
    """ Parses a boolean from a bool or its string representation

    >>> parse_bool('True')
    True

    >>> parse_bool('true')
    True

    >>> parse_bool('')
    False
    """
    if isinstance(val, bool):
        return val
    # any capitalization of 'true' counts as True; everything else is False
    return val.lower() == 'true'
def iterate_together(lists, key=lambda x: x, reverse=False):
    """ Iterates over sorted, possibly sparse, lists in parallel

    takes ordered, possibly sparse, lists with similar items
    (some items have a corresponding item in the other lists, some don't).

    It then yields tuples of corresponding items, where an element is None
    if there is no corresponding entry in one of the lists.

    Tuples where both elements are None are skipped.

    The results of the key method are used for the comparisons.

    If reverse is True, the lists are expected to be sorted in reverse order
    and the results will also be sorted reverse

    >>> list(iterate_together([range(1, 3), range(1, 4, 2)]))
    [(1, 1), (2, None), (None, 3)]

    >>> list(iterate_together([[], []]))
    []

    >>> list(iterate_together([range(1, 3), []]))
    [(1, None), (2, None)]

    >>> list(iterate_together([[1, None, 3], [None, None, 3]]))
    [(1, None), (3, 3)]
    """
    Next = collections.namedtuple('Next', 'item more')

    # comparison helpers flip when iterating in reverse order
    pick = max if reverse else min
    comes_before = operator.gt if reverse else operator.lt

    iterators = [iter(l) for l in lists]

    def _advance(it):
        """ fetch the next non-None element; more=False when exhausted """
        # NOTE: uses the next() builtin instead of the Python-2-only
        # it.next() method; behavior is identical
        try:
            value = next(it)
            while value is None:
                value = next(it)
            return Next(value, True)
        except StopIteration:
            return Next(None, False)

    def _empty_row():
        return [None] * len(iterators)

    # take the first bunch of items
    heads = [_advance(it) for it in iterators]

    while any(h.item is not None or h.more for h in heads):

        row = _empty_row()

        for pos, head in enumerate(heads):

            if head.item is None:
                continue

            # first candidate in this round
            if all(cell is None for cell in row):
                row[pos] = head.item
                continue

            current_min = pick([cell for cell in row if cell is not None],
                               key=key)

            if key(head.item) == key(current_min):
                # same sort position: goes into the same tuple
                row[pos] = head.item

            elif comes_before(key(head.item), key(current_min)):
                # strictly earlier: discard the row built so far
                row = _empty_row()
                row[pos] = head.item

        # advance only the iterators whose head was consumed
        for pos, cell in enumerate(row):
            if cell is not None:
                heads[pos] = _advance(iterators[pos])

        yield tuple(row)
def progress(val, max_val, status_str='', max_width=50, stream=sys.stdout):
    """ Draws an updating text progress bar on stream

    Writes a carriage return and then a bar of '#' characters
    (max_width wide) with the percentage embedded in the middle,
    followed by "val / max_val | status_str". The trailing commas on
    the Python 2 print statements suppress the newline, so repeated
    calls redraw the bar in place on the same terminal line.
    """

    # fraction of work done; guard against division by zero
    factor = float(val)/max_val if max_val > 0 else 0

    # progress as percentage
    percentage_str = '{val:.2%}'.format(val=factor)

    # progress bar filled with #s
    factor = min(int(factor*max_width), max_width)
    progress_str = '#' * factor + ' ' * (max_width-factor)

    #insert percentage into bar
    percentage_start = int((max_width-len(percentage_str))/2)
    progress_str = progress_str[:percentage_start] + \
                   percentage_str + \
                   progress_str[percentage_start+len(percentage_str):]

    print >> stream, '\r',
    print >> stream, '[ %s ] %s / %s | %s' % (
        progress_str,
        val,
        max_val,
        status_str),
    stream.flush()
def set_cmp(list, simplify):
    """
    Builds a set out of a list but uses the results of simplify to determine equality between items
    """
    # keyed by simplify(item); later duplicates overwrite earlier ones
    by_key = dict((simplify(item), item) for item in list)
    return by_key.values()
def first(it):
    """
    returns the first not-None object or None if the iterator is exhausted
    """
    return next((x for x in it if x is not None), None)
def intersect(a, b):
    """ Returns the intersection of the two iterables as a list """
    return list(set(a).intersection(b))
def remove_control_chars(s):
    """ Removes control characters from the string

    Strips the C0 range (0-31) and DEL plus the C1 range (127-159).
    """
    # NOTE: the previous version also built an unused generator of all
    # Unicode code points (all_chars); that dead code has been removed
    control_chars = ''.join(map(unichr, range(0, 32) + range(127, 160)))
    control_char_re = re.compile('[%s]' % re.escape(control_chars))

    return control_char_re.sub('', s)
def unzip(a):
    """ Inverse of zip: turns a sequence of tuples into a tuple of lists """
    return tuple(list(column) for column in zip(*a))
def parse_range(s, min, max, default=None):
    """
    Parses the string and returns its value. If the value is outside the given
    range, its closest number within the range is returned

    >>> parse_range('5', 0, 10)
    5

    >>> parse_range('0', 5, 10)
    5

    >>> parse_range('15', 0, 10)
    10

    >>> parse_range('x', 0, 20, 20)
    20
    """
    try:
        val = int(s)
    except (ValueError, TypeError):
        # unparsable: use the default, or the middle of the range
        return default if default is not None else (max-min)/2

    # clamp to the allowed range
    if val < min:
        return min
    if val > max:
        return max
    return val
def flatten(l):
    """ Flattens one level of nesting: [[1, 2], [3]] -> [1, 2, 3] """
    return list(itertools.chain.from_iterable(l))
def linearize(key, iterators, reverse=False):
    """
    Linearizes a number of iterators, sorted by some comparison function

    Each input iterator is expected to be sorted by key already; the
    merged elements are yielded in globally sorted order.
    """
    # NOTE: uses the next() builtin instead of the Python-2-only
    # it.next() method; behavior is identical
    its = [iter(i) for i in iterators]

    # current head element of each non-empty iterator, as (value, iterator)
    heads = []
    for it in its:
        try:
            heads.append((next(it), it))
        except StopIteration:
            continue

    while heads:
        # re-sort the (few) heads and emit the best one
        heads.sort(key=lambda pair: key(pair[0]), reverse=reverse)
        value, source = heads.pop(0)
        yield value
        try:
            heads.append((next(source), source))
        except StopIteration:
            pass
def skip_pairs(iterator, cmp=cmp):
    """ Skips pairs of equal items

    >>> list(skip_pairs([]))
    []

    >>> list(skip_pairs([1]))
    [1]

    >>> list(skip_pairs([1, 2, 3]))
    [1, 2, 3]

    >>> list(skip_pairs([1, 1]))
    []

    >>> list(skip_pairs([1, 2, 2, 3]))
    [1, 3]

    >>> list(skip_pairs([1, 2, 2, 2]))
    [1, 2]

    >>> list(skip_pairs([1, 2, 2, 2, 2, 3]))
    [1, 3]
    """
    # NOTE: uses the next() builtin instead of the Python-2-only
    # iterator.next() method, and ends the generator with `return`
    # instead of re-raising StopIteration (which PEP 479 forbids);
    # the yielded sequence is unchanged
    iterator = iter(iterator)
    pending = next(iterator)

    while True:
        current = pending
        try:
            pending = next(iterator)
        except StopIteration:
            # no successor left: the last item cannot be part of a pair
            yield current
            return

        if cmp(current, pending) == 0:
            # equal pair: drop both and fetch a fresh candidate
            pending = next(iterator)
        else:
            yield current
def get_timestamp(datetime_obj):
    """ Returns the Unix timestamp as an int for the given UTC datetime

    >>> get_timestamp(datetime(2011, 4, 7, 9, 30, 6))
    1302168606

    >>> get_timestamp(datetime(1970, 1, 1, 0, 0, 0))
    0
    """
    # time.mktime() interprets the timetuple in the server's LOCAL
    # timezone, but datetimes here are UTC (utcnow() is used throughout
    # this codebase); timegm() treats the tuple as UTC, which also makes
    # the doctest values above hold on any host
    return int(calendar.timegm(datetime_obj.timetuple()))
# matches strings that start with an http or https scheme
re_url = re.compile('^https?://')

def is_url(string):
    """ Returns true if a string looks like an URL

    >>> is_url('http://example.com/some-path/file.xml')
    True

    >>> is_url('something else')
    False
    """
    return re_url.match(string) is not None
417 # from http://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python
418 # this does not increase asymptotical complexity
419 # but can still waste more time than it saves.
def shortest_of(strings):
    """ Returns the shortest of the given strings """
    return min(strings, key=lambda s: len(s))
def longest_substr(strings):
    """
    Returns the longest common substring of the given strings

    >>> longest_substr(['xabcy', 'zabcw'])
    'abc'

    >>> longest_substr(['abc', 'abc'])
    'abc'

    >>> longest_substr([])
    ''
    """
    substr = ""
    if not strings:
        return substr
    reference = min(strings, key=len)
    length = len(reference)
    # find a suitable slice i:j of the shortest string
    for i in range(length):
        # only consider candidates longer than the best match so far.
        # The end index must run up to `length` INCLUSIVE (range end
        # length + 1); the previous bound of `length` excluded the last
        # character, so e.g. longest_substr(['abc', 'abc']) gave 'ab'
        for j in range(i + len(substr) + 1, length + 1):
            candidate = reference[i:j]
            if all(candidate in text for text in strings):
                substr = candidate
    return substr
def additional_value(it, gen_val, val_changed=lambda _: True):
    """ Provides an additional value to the elements, calculated when needed

    For the elements from the iterator, some additional value can be computed
    by gen_val (which might be an expensive computation).

    If the elements in the iterator are ordered so that some subsequent
    elements would generate the same additional value, val_changed can be
    provided, which receives the next element from the iterator and the
    previous additional value. If the element would generate the same
    additional value (val_changed returns False), its computation is skipped.

    >>> # get the next full hundred higher than x
    >>> # this will probably be an expensive calculation
    >>> next_hundred = lambda x: x + 100-(x % 100)

    >>> # returns True if h is not the value that next_hundred(x) would provide
    >>> # this should be a relatively cheap calculation, compared to the above
    >>> diff_hundred = lambda x, h: (h-x) < 0 or (h - x) > 100

    >>> xs = [0, 50, 100, 101, 199, 200, 201]
    >>> list(additional_value(xs, next_hundred, diff_hundred))
    [(0, 100), (50, 100), (100, 100), (101, 200), (199, 200), (200, 200), (201, 300)]
    """
    _missing = object()
    cached = _missing

    for elem in it:
        # recompute only for the first element, or when the cached
        # additional value no longer fits
        if cached is _missing or val_changed(elem, cached):
            cached = gen_val(elem)

        yield (elem, cached)
def file_hash(f, h=hashlib.md5, block_size=2**20):
    """ returns the hash of the contents of a file

    Reads the file-like object f in blocks of block_size bytes and
    feeds them into a fresh instance of the hash constructor h;
    returns the hash object (call .hexdigest() on it for the digest).
    """
    f_hash = h()
    # sentinel is b'': on Python 2, b'' == '' so behavior is unchanged,
    # while binary streams on Python 3 (which return bytes) terminate
    # correctly instead of looping forever against the old '' sentinel
    for chunk in iter(lambda: f.read(block_size), b''):
        f_hash.update(chunk)
    return f_hash
def split_list(l, prop):
    """ split elements that satisfy a property, and those that don't

    Returns a tuple (match, nomatch) of lists; each element is placed
    in exactly one list depending only on prop(element).
    """
    match, nomatch = [], []
    # single pass classifying by the predicate; the previous version
    # used `x not in match` (an O(n^2) equality test) which also misfiled
    # non-matching elements that happened to compare equal to a match
    for x in l:
        if prop(x):
            match.append(x)
        else:
            nomatch.append(x)
    return match, nomatch
def sorted_chain(links, key, reverse=False):
    """ Takes a list of iters can iterates over sorted elements

    Each elment of links should be a tuple of (sort_key, iterator). The
    elements of each iterator should be sorted already. sort_key should
    indicate the key of the first element and needs to be comparable to the
    result of key(elem).

    The function returns an iterator over the globally sorted element that
    ensures that as little iterators as possible are evaluated. When
    evaluating """

    # the working list holds placeholders (expand=True) for iterators
    # that have not been consumed yet, mixed with already-expanded
    # elements (expand=False)
    pending = [(sort_key, it, True) for sort_key, it in links]

    while pending:
        _, entry, expand = pending.pop(0)

        # found an element (from an earlier expansion), yield it
        if not expand:
            yield entry
            continue

        # found an iter that needs to be expanded.
        # The iterator is fully consumed
        expanded = [(key(elem), elem, False) for elem in entry]

        # sort placeholders and elements together.
        # NOTE: the original sort key used a tuple-parameter lambda
        # (`lambda (k, _v, _e): k`), Python-2-only syntax removed by
        # PEP 3113; itemgetter(0) extracts the same sort key
        pending = sorted(pending + expanded,
                         key=operator.itemgetter(0),
                         reverse=reverse)
def url_add_authentication(url, username, password):
    """
    Adds authentication data (username, password) to a given
    URL in order to construct an authenticated URL.

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://x.org/', 'a b', 'c d')
    'http://a%20b:c%20d@x.org/'
    """
    if username is None or username == '':
        return url

    # Relaxations of the strict quoting rules (bug 1521):
    # 1. Accept '@' in username and password
    # 2. Accept ':' in password only
    quoted_user = urllib.quote(username, safe='@')

    if password is None:
        auth = quoted_user
    else:
        auth = '%s:%s' % (quoted_user, urllib.quote(password, safe='@:'))

    # drop any credentials that are already embedded in the URL
    parts = list(urlparse.urlsplit(url_strip_authentication(url)))
    # parts[1] is the HOST part of the URL
    parts[1] = '%s@%s' % (auth, parts[1])

    return urlparse.urlunsplit(parts)
def urlopen(url, headers=None, data=None):
    """
    An URL opener with the User-agent set to gPodder (with version)
    """
    username, password = username_password_from_url(url)
    if username is not None or password is not None:
        # strip the credentials from the URL and hand them to a
        # basic-auth handler instead
        url = url_strip_authentication(url)
        password_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, username, password)
        auth_handler = urllib2.HTTPBasicAuthHandler(password_mgr)
        opener = urllib2.build_opener(auth_handler)
    else:
        opener = urllib2.build_opener()

    # copy the caller's headers so they are not mutated
    request_headers = dict(headers) if headers is not None else {}
    request_headers.update({'User-agent': settings.USER_AGENT})

    request = urllib2.Request(url, data=data, headers=request_headers)
    return opener.open(request)
def username_password_from_url(url):
    r"""
    Returns a tuple (username,password) containing authentication
    data from the specified URL or (None,None) if no authentication
    data can be found in the URL.

    See Section 3.1 of RFC 1738 (http://www.ietf.org/rfc/rfc1738.txt)

    >>> username_password_from_url('https://@host.com/')
    ('', None)
    >>> username_password_from_url('telnet://host.com/')
    (None, None)
    >>> username_password_from_url('ftp://foo:@host.com/')
    ('foo', '')
    >>> username_password_from_url('http://a:b@host.com/')
    ('a', 'b')
    >>> username_password_from_url('http://a@b:c@host.com/')
    ('a@b', 'c')
    >>> username_password_from_url('ftp://a:b:c@host.com/')
    ('a', 'b:c')
    >>> username_password_from_url('http://i%2Fo:P%40ss%3A@host.com/')
    ('i/o', 'P@ss:')
    >>> username_password_from_url('http://example.com/x@y:z@test.com/')
    (None, None)

    Raises ValueError if url is not a string or unicode object.
    """
    if type(url) not in (str, unicode):
        raise ValueError('URL has to be a string or unicode object.')

    username = password = None

    # netloc is the second component of the parse result
    netloc = urlparse.urlparse(url)[1]

    if '@' in netloc:
        # RFC1738 disallows '/', '@' and ':' in the credentials
        # (Section 3.1), but:
        # 1. '/' cannot appear here because of how urlparse works.
        # 2. Due to gPodder bug 1521 we accept '@' in username and
        #    password, hence the rsplit at the LAST '@' in netloc.
        # 3. ':' must not appear in the username (RFC2617, Section 2)
        #    but is allowed in the password, hence split(':', 1):
        #    extra ':'s become part of the password.
        auth_part = netloc.rsplit('@', 1)[0]

        if ':' in auth_part:
            username, password = auth_part.split(':', 1)
            username = urllib.unquote(username)
            password = urllib.unquote(password)
        else:
            username = urllib.unquote(auth_part)

    return (username, password)
def url_strip_authentication(url):
    """
    Strips authentication data from an URL. Returns the URL with
    the authentication data removed from it.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    parts = list(urlparse.urlsplit(url))
    # parts[1] is the HOST part of the URL; split at the LAST '@' so
    # that '@'s inside the credentials are removed along with them
    if '@' in parts[1]:
        parts[1] = parts[1].rsplit('@', 1)[1]

    return urlparse.urlunsplit(parts)
# Native filesystem encoding detection
encoding = sys.getfilesystemencoding()

def sanitize_encoding(filename):
    r"""
    Generate a sanitized version of a string (i.e.
    remove invalid characters and encode in the
    detected native language encoding).

    >>> sanitize_encoding('\x80')
    ''
    >>> sanitize_encoding(u'unicode')
    'unicode'
    """
    # The encoding problem goes away in Python 3.. hopefully!
    if sys.version_info >= (3, 0):
        return filename

    global encoding
    # decode byte strings first, then re-encode; characters that do
    # not survive the round-trip are silently dropped
    if isinstance(filename, unicode):
        decoded = filename
    else:
        decoded = filename.decode(encoding, 'ignore')
    return decoded.encode(encoding, 'ignore')
def get_git_head():
    """ returns the commit and message of the current git HEAD

    Returns (None, None) when git cannot be executed or reports an
    error.
    """
    try:
        pr = subprocess.Popen('/usr/bin/git log -n 1 --oneline'.split(),
                              cwd=settings.BASE_DIR,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              )
    except OSError:
        # git binary missing or not executable
        return None, None

    out, err = pr.communicate()
    if err:
        return None, None

    # output looks like "<commit> <subject line ...>"
    fields = out.split()
    commit = fields[0]
    msg = ' '.join(fields[1:])
    return commit, msg
# https://gist.github.com/samuraisam/901117

# zero tolerance by default when comparing datetimes
default_fudge = timedelta(seconds=0, microseconds=0, days=0)

def deep_eq(_v1, _v2, datetime_fudge=default_fudge, _assert=False):
    """
    Tests for deep equality between two python data structures recursing
    into sub-structures if necessary. Works with all python types including
    iterators and generators. This function was dreampt up to test API responses
    but could be used for anything. Be careful. With deeply nested structures
    you may blow the stack.

    Options:
              datetime_fudge => this is a datetime.timedelta object which, when
                                comparing dates, will accept values that differ
                                by the number of seconds specified
              _assert        => passing yes for this will raise an assertion error
                                when values do not match, instead of returning
                                false (very useful in combination with pdb)

    Doctests included:

    >>> x1, y1 = ({'a': 'b'}, {'a': 'b'})
    >>> deep_eq(x1, y1)
    True
    >>> x2, y2 = ({'a': 'b'}, {'b': 'a'})
    >>> deep_eq(x2, y2)
    False
    >>> x3, y3 = ({'a': {'b': 'c'}}, {'a': {'b': 'c'}})
    >>> deep_eq(x3, y3)
    True
    >>> x4, y4 = ({'c': 't', 'a': {'b': 'c'}}, {'a': {'b': 'n'}, 'c': 't'})
    >>> deep_eq(x4, y4)
    False
    >>> x5, y5 = ({'a': [1,2,3]}, {'a': [1,2,3]})
    >>> deep_eq(x5, y5)
    True
    >>> x6, y6 = ({'a': [1,'b',8]}, {'a': [2,'b',8]})
    >>> deep_eq(x6, y6)
    False
    >>> x7, y7 = ('a', 'a')
    >>> deep_eq(x7, y7)
    True
    >>> x8, y8 = (['p','n',['asdf']], ['p','n',['asdf']])
    >>> deep_eq(x8, y8)
    True
    >>> x9, y9 = (['p','n',['asdf',['omg']]], ['p', 'n', ['asdf',['nowai']]])
    >>> deep_eq(x9, y9)
    False
    >>> x10, y10 = (1, 2)
    >>> deep_eq(x10, y10)
    False
    >>> deep_eq((str(p) for p in xrange(10)), (str(p) for p in xrange(10)))
    True
    >>> str(deep_eq(range(4), range(4)))
    'True'
    >>> deep_eq(xrange(100), xrange(100))
    True
    >>> deep_eq(xrange(2), xrange(5))
    False
    >>> from datetime import datetime, timedelta
    >>> d1, d2 = (datetime.utcnow(), datetime.utcnow() + timedelta(seconds=4))
    >>> deep_eq(d1, d2)
    False
    >>> deep_eq(d1, d2, datetime_fudge=timedelta(seconds=5))
    True
    """
    # recursive call with the same options bound
    _deep_eq = functools.partial(deep_eq, datetime_fudge=datetime_fudge,
                                 _assert=_assert)

    def _check_assert(R, a, b, reason=''):
        # central result helper: when _assert is set, a mismatch raises
        # an AssertionError (handy under pdb) instead of returning False
        if _assert and not R:
            assert 0, "an assertion has failed in deep_eq (%s) %s != %s" % (
                reason, str(a), str(b))
        return R

    def _deep_dict_eq(d1, d2):
        # dicts are equal when their sorted key lists match and every
        # corresponding value is deep-equal
        k1, k2 = (sorted(d1.keys()), sorted(d2.keys()))
        if k1 != k2: # keys should be exactly equal
            return _check_assert(False, k1, k2, "keys")

        return _check_assert(operator.eq(sum(_deep_eq(d1[k], d2[k])
                                             for k in k1),
                                         len(k1)), d1, d2, "dictionaries")

    def _deep_iter_eq(l1, l2):
        # iterables are equal when they have the same length and every
        # corresponding pair of elements is deep-equal
        if len(l1) != len(l2):
            return _check_assert(False, l1, l2, "lengths")
        return _check_assert(operator.eq(sum(_deep_eq(v1, v2)
                                             for v1, v2 in zip(l1, l2)),
                                         len(l1)), l1, l2, "iterables")

    def op(a, b):
        # scalar comparison; datetimes may differ by up to
        # datetime_fudge seconds and still compare equal
        _op = operator.eq
        if type(a) == datetime and type(b) == datetime:
            s = datetime_fudge.seconds
            t1, t2 = (time.mktime(a.timetuple()), time.mktime(b.timetuple()))
            l = t1 - t2
            l = -l if l > 0 else l
            return _check_assert((-s if s > 0 else s) <= l, a, b, "dates")
        return _check_assert(_op(a, b), a, b, "values")

    c1, c2 = (_v1, _v2)

    # guard against strings because they are iterable and their
    # elements yield iterables infinitely.
    # I N C E P T I O N
    for t in types.StringTypes:
        if isinstance(_v1, t):
            break
    else:
        # non-string: dispatch to dict comparison, or materialize both
        # iterables into lists; scalars fall through to op() unchanged
        if isinstance(_v1, types.DictType):
            op = _deep_dict_eq
        else:
            try:
                c1, c2 = (list(iter(_v1)), list(iter(_v2)))
            except TypeError:
                c1, c2 = _v1, _v2
            else:
                op = _deep_iter_eq

    return op(c1, c2)
def parse_request_body(request):
    """ returns the parsed request body, handles gzip encoding """

    raw_body = request.body

    # transparently decompress gzip-encoded request bodies
    if request.META.get('HTTP_CONTENT_ENCODING') == 'gzip':
        raw_body = zlib.decompress(raw_body)

    return json.loads(raw_body)
def normalize_feed_url(url):
    """
    Converts any URL to http:// or ftp:// so that it can be
    used with "wget". If the URL cannot be converted (invalid
    or unknown scheme), "None" is returned.

    This will also normalize feed:// and itpc:// to http://.

    >>> normalize_feed_url('itpc://example.org/podcast.rss')
    'http://example.org/podcast.rss'

    If no URL scheme is defined (e.g. "curry.com"), we will
    simply assume the user intends to add a http:// feed.

    >>> normalize_feed_url('curry.com')
    'http://curry.com/'

    Shortcut prefixes for lazy typists are expanded (see PREFIXES):

    >>> normalize_feed_url('fb:43FPodcast')
    'http://feeds.feedburner.com/43FPodcast'

    Scheme and domain are lowercased, surrounding whitespace stripped,
    an empty query removed, and non-encoded characters percent-encoded:

    >>> normalize_feed_url('http://Example.COM/')
    'http://example.com/'
    >>> normalize_feed_url('http://example.org/test?')
    'http://example.org/test'
    >>> normalize_feed_url(' http://example.com/podcast.rss ')
    'http://example.com/podcast.rss'

    HTTP Authentication is removed to protect users' privacy:

    >>> normalize_feed_url('http://a@b:c@host.com/')
    'http://host.com/'
    >>> normalize_feed_url('http://i%2Fo:P%40ss%3A@host.com/')
    'http://host.com/'
    """
    url = url.strip()
    # too short to be a usable URL
    if not url or len(url) < 8:
        return None

    # work on a byte string; unencoded characters are quoted below
    if isinstance(url, unicode):
        url = url.encode('utf-8', 'ignore')

    # This is a list of prefixes that you can use to minimize the amount of
    # keystrokes that you have to use.
    # Feel free to suggest other useful prefixes, and I'll add them here.
    PREFIXES = {
            'fb:': 'http://feeds.feedburner.com/%s',
            'yt:': 'http://www.youtube.com/rss/user/%s/videos.rss',
            'sc:': 'http://soundcloud.com/%s',
            'fm4od:': 'http://onapp1.orf.at/webcam/fm4/fod/%s.xspf',
            # YouTube playlists. To get a list of playlists per-user, use:
            # https://gdata.youtube.com/feeds/api/users/<username>/playlists
            'ytpl:': 'http://gdata.youtube.com/feeds/api/playlists/%s',
    }

    for prefix, expansion in PREFIXES.items():
        if url.startswith(prefix):
            url = expansion % (url[len(prefix):],)
            break

    # Assume HTTP for URLs without scheme
    if '://' not in url:
        url = 'http://' + url

    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)

    # Schemes and domain names are case insensitive
    scheme = scheme.lower()
    netloc = netloc.lower()

    # encode non-encoded characters
    path = urllib.quote(path, '/%')
    query = urllib.quote_plus(query, ':&=')

    # Remove authentication to protect users' privacy
    netloc = netloc.rsplit('@', 1)[-1]

    # Normalize empty paths to "/"
    if path == '':
        path = '/'

    # feed://, itpc:// and itms:// are really http://
    if scheme in ('feed', 'itpc', 'itms'):
        scheme = 'http'

    if scheme not in ('http', 'https', 'ftp', 'file'):
        return None

    # urlunsplit might return "a slighty different, but equivalent URL"
    return urlparse.urlunsplit((scheme, netloc, path, query, fragment))
def partition(items, predicate=bool):
    """ Lazily splits items into (false-ish, true-ish) generators

    Returns two generators: the first yields the items for which
    predicate(item) is false, the second those for which it is true.
    """
    # tee the flagged stream so each result can be consumed independently
    flagged_a, flagged_b = itertools.tee(
        (predicate(item), item) for item in items)
    falses = (item for flag, item in flagged_a if not flag)
    trues = (item for flag, item in flagged_b if flag)
    return falses, trues
def split_quoted(s):
    """ Splits a quoted string

    >>> split_quoted('some "quoted text"') == ['some', 'quoted text']
    True

    >>> split_quoted('"quoted text') == ['quoted', 'text']
    True

    # 4 quotes here are 2 in the doctest is one in the actual string
    >>> split_quoted('text\\\\') == ['text']
    True
    """

    try:
        # split by whitespace, preserve quoted substrings
        return shlex.split(s)

    except ValueError:
        # No closing quotation (eg '"text')
        # No escaped character (eg '\')
        # strip the offending characters and split again
        cleaned = s.replace('"', '').replace("'", '').replace('\\', '')
        return shlex.split(cleaned)
def edit_link(obj):
    """ Return the link to the Django Admin Edit page """
    view_name = 'admin:%s_%s_change' % (obj._meta.app_label,
                                        obj._meta.module_name)
    return reverse(view_name, args=(obj.pk,))
def random_token(length=32):
    """ Returns a random alphanumeric token of the given length

    Characters are sampled without replacement (as before), so each
    character appears at most once and length is limited to 62.
    """
    import random
    import string
    # SystemRandom draws from os.urandom, so tokens are not predictable
    # from the default Mersenne-Twister state; ascii_letters replaces
    # the locale-dependent (and Python-2-only) string.letters so the
    # alphabet is stable across locales
    rng = random.SystemRandom()
    return "".join(rng.sample(string.ascii_letters + string.digits, length))
def to_maxlength(cls, field, val):
    """ Cut val to the maximum length of cls's field """
    max_length = cls._meta.get_field(field).max_length

    if len(val) <= max_length:
        return val

    # log the truncation so silently shortened values can be traced
    logger.warn('%s.%s length reduced from %d to %d',
                cls.__name__, field, len(val), max_length)
    return val[:max_length]