Remove unused license preamble
[mygpo.git] / mygpo / utils.py
blob4cbe502e745eb983d1aeda8f736293e63a4382a6
1 # -*- coding: utf-8 -*-
3 import json
4 import functools
5 import types
6 import subprocess
7 import os
8 import operator
9 import sys
10 import re
11 import collections
12 import itertools
13 from datetime import datetime, timedelta, date
14 import time
15 import hashlib
16 import urllib.parse
17 import urllib.request, urllib.parse, urllib.error
18 import urllib.request, urllib.error, urllib.parse
19 import zlib
20 import shlex
22 from django.db import transaction, IntegrityError
23 from django.conf import settings
24 from django.core.urlresolvers import reverse
26 import logging
27 logger = logging.getLogger(__name__)
def daterange(from_date, to_date=None, leap=timedelta(days=1)):
    """ Iterates over dates from from_date to to_date (inclusive)

    If to_date is omitted it defaults to "now": datetime.utcnow() for
    datetime inputs, date.today() for date inputs.

    >>> from_d = datetime(2010, 1, 1)
    >>> to_d = datetime(2010, 1, 5)
    >>> list(daterange(from_d, to_d))
    [datetime.datetime(2010, 1, 1, 0, 0), datetime.datetime(2010, 1, 2, 0, 0), datetime.datetime(2010, 1, 3, 0, 0), datetime.datetime(2010, 1, 4, 0, 0), datetime.datetime(2010, 1, 5, 0, 0)]
    """
    if to_date is None:
        if isinstance(from_date, datetime):
            to_date = datetime.utcnow()
        else:
            to_date = date.today()

    current = from_date
    while current <= to_date:
        yield current
        current = current + leap
def format_time(value):
    """Format an offset (in seconds) to a string

    The offset should be an integer or float value.

    >>> format_time(0)
    '00:00'
    >>> format_time(20)
    '00:20'
    >>> format_time(3600)
    '01:00:00'
    >>> format_time(10921)
    '03:02:01'
    >>> format_time(90000)
    '25:00:00'
    """
    try:
        # fractions of a second were also dropped by the previous
        # strftime-based implementation
        total = int(value)
    except (ValueError, TypeError):
        return ''

    # compute the fields directly instead of going through
    # datetime.utcfromtimestamp, which wrapped around for offsets >= 24h
    hours, remainder = divmod(total, 3600)
    minutes, seconds = divmod(remainder, 60)

    if hours:
        return '%02d:%02d:%02d' % (hours, minutes, seconds)
    return '%02d:%02d' % (minutes, seconds)
def parse_time(value):
    """ Parses a time offset (int seconds, 'M:S' or 'H:M:S') to seconds

    >>> parse_time(10)
    10

    >>> parse_time('05:10') #5*60+10
    310

    >>> parse_time('1:05:10') #60*60+5*60+10
    3910
    """
    if value is None:
        raise ValueError('None value in parse_time')

    if isinstance(value, int):
        # Don't need to parse already-converted time value
        return value

    if value == '':
        # typo fixed: was 'Empty valueing in parse_time'
        raise ValueError('Empty value in parse_time')

    # try the colon-separated formats first; 'fmt' avoids shadowing
    # the builtin 'format'
    for fmt in ('%H:%M:%S', '%M:%S'):
        try:
            t = time.strptime(value, fmt)
            return t.tm_hour * 60 * 60 + t.tm_min * 60 + t.tm_sec
        except ValueError:
            continue

    # fall back to a plain number of seconds
    return int(value)
def parse_bool(val):
    """ Parses a boolean from a bool or from a string

    >>> parse_bool('True')
    True

    >>> parse_bool('true')
    True

    >>> parse_bool('')
    False
    """
    if isinstance(val, bool):
        return val
    # any capitalization of 'true' counts as True, everything else is False
    return val.lower() == 'true'
def iterate_together(lists, key=lambda x: x, reverse=False):
    """ Aligns multiple sorted (possibly sparse) lists into tuples

    takes ordered, possibly sparse, lists with similar items
    (some items have a corresponding item in the other lists, some don't).

    It then yield tuples of corresponding items, where one element is None is
    there is no corresponding entry in one of the lists.

    Tuples where both elements are None are skipped.

    The results of the key method are used for the comparisons.

    If reverse is True, the lists are expected to be sorted in reverse order
    and the results will also be sorted reverse

    >>> list(iterate_together([range(1, 3), range(1, 4, 2)]))
    [(1, 1), (2, None), (None, 3)]

    >>> list(iterate_together([[], []]))
    []

    >>> list(iterate_together([range(1, 3), range(3, 5)]))
    [(1, None), (2, None), (None, 3), (None, 4)]

    >>> list(iterate_together([range(1, 3), []]))
    [(1, None), (2, None)]

    >>> list(iterate_together([[1, None, 3], [None, None, 3]]))
    [(1, None), (3, 3)]
    """
    # Next.item: the current head of an iterator (None-values are skipped);
    # Next.more: whether the iterator may still produce items
    Next = collections.namedtuple('Next', 'item more')
    # pick comparison direction once, up front
    min_ = min if not reverse else max
    lt_ = operator.lt if not reverse else operator.gt

    lists = [iter(l) for l in lists]

    def _take(it):
        # advance 'it' past embedded None entries to its next real item
        try:
            i = next(it)
            while i is None:
                i = next(it)
            return Next(i, True)
        except StopIteration:
            return Next(None, False)

    def new_res():
        # a fresh result tuple-template: one slot per input list
        return [None]*len(lists)

    # take first bunch of items
    items = [_take(l) for l in lists]

    while any(i.item is not None or i.more for i in items):

        res = new_res()

        for n, item in enumerate(items):

            if item.item is None:
                continue

            # first candidate for this round
            if all(x is None for x in res):
                res[n] = item.item
                continue

            # smallest (or largest, if reverse) key placed so far
            min_v = min_(filter(lambda x: x is not None, res), key=key)

            # equal keys belong in the same output tuple
            if key(item.item) == key(min_v):
                res[n] = item.item

            # a strictly smaller key starts a new output tuple
            elif lt_(key(item.item), key(min_v)):
                res = new_res()
                res[n] = item.item

        # re-fill only the slots that were consumed this round
        for n, x in enumerate(res):
            if x is not None:
                items[n] = _take(lists[n])

        yield tuple(res)
def progress(val, max_val, status_str='', max_width=50, stream=sys.stdout):
    """ Draws a textual progress bar on *stream*

    The bar is max_width characters wide, filled with '#'s, with the
    percentage embedded in the middle; val/max_val and status_str are
    appended after the bar. The line is redrawn in place via '\\r'.
    """
    ratio = float(val)/max_val if max_val > 0 else 0

    # progress as percentage
    pct_text = '{val:.2%}'.format(val=ratio)

    # progress bar filled with #s
    filled = min(int(ratio*max_width), max_width)
    bar = '#' * filled + ' ' * (max_width-filled)

    # insert percentage into bar
    start = int((max_width-len(pct_text))/2)
    bar = bar[:start] + pct_text + bar[start+len(pct_text):]

    print('\r', end=' ', file=stream)
    print('[ %s ] %s / %s | %s' % (bar, val, max_val, status_str),
          end=' ', file=stream)
    stream.flush()
def set_cmp(list, simplify):
    """
    Builds a set out of a list but uses the results of simplify to determine
    equality between items
    """
    # NOTE: the parameter shadows the builtin ``list``, so the original
    # ``return list(lst.values())`` crashed with TypeError on every call.
    # Build the result without calling the (shadowed) builtin.
    unique = {simplify(x): x for x in list}
    return [v for v in unique.values()]
def first(it):
    """
    returns the first not-None object or None if the iterator is exhausted
    """
    return next((elem for elem in it if elem is not None), None)
def intersect(a, b):
    """ Returns the common elements of a and b as a list (order unspecified) """
    return list(set(a).intersection(b))
def remove_control_chars(s):
    """ Removes C0/C1 control characters (0-31 and 127-159) from s """
    # the previous version also built an unused generator over all of
    # Unicode ('all_chars'); it has been removed
    control_chars = ''.join(map(chr, list(range(0,32)) + list(range(127,160))))
    control_char_re = re.compile('[%s]' % re.escape(control_chars))

    return control_char_re.sub('', s)
def unzip(a):
    """ Transposes pairs: [(x1, y1), ...] -> ([x1, ...], [y1, ...]) """
    return tuple(list(column) for column in zip(*a))
def parse_range(s, min, max, default=None):
    """
    Parses the string and returns its value. If the value is outside the given
    range, its closest number within the range is returned

    >>> parse_range('5', 0, 10)
    5

    >>> parse_range('0', 5.0, 10)
    5.0

    >>> parse_range('15',0, 10)
    10

    >>> parse_range('x', 0., 20)
    10.0

    >>> parse_range('x', 0, 20, 20)
    20
    """
    # the fallback midpoint is converted to the type of 'min'
    out_type = type(min)

    try:
        value = int(s)
        # clamp into [min, max]
        if value < min:
            return min
        elif value > max:
            return max
        return value
    except (ValueError, TypeError):
        # unparseable: use the caller's default, or the range midpoint
        if default is not None:
            return default
        return out_type((max-min)/2)
def flatten(l):
    """ Flattens one level of nesting: [[1, 2], [3]] -> [1, 2, 3] """
    return list(itertools.chain.from_iterable(l))
def linearize(key, iterators, reverse=False):
    """
    Linearizes a number of iterators, sorted by some comparison function

    Each iterator is expected to be sorted already (by key(elem), in
    descending order if reverse is True); the merged elements are yielded
    in globally sorted order.
    """
    import heapq
    # heapq.merge performs the k-way merge in O(n log k); the previous
    # implementation re-sorted the list of iterator heads on every step.
    # Like before, ties are broken in favour of earlier iterators.
    return heapq.merge(*iterators, key=key, reverse=reverse)
def get_timestamp(datetime_obj):
    """ Returns the UTC timestamp as an int for the given datetime object

    >>> get_timestamp(datetime(2011, 4, 7, 9, 30, 6))
    1302168606

    >>> get_timestamp(datetime(1970, 1, 1, 0, 0, 0))
    0
    """
    import calendar
    # time.mktime interprets the tuple in the *local* timezone, so the
    # doctest values above only held on UTC machines; calendar.timegm is
    # the timezone-independent inverse of time.gmtime
    return int(calendar.timegm(datetime_obj.timetuple()))
342 re_url = re.compile('^https?://')
def is_url(string):
    """ Returns true if a string looks like an URL

    >>> is_url('http://example.com/some-path/file.xml')
    True

    >>> is_url('something else')
    False
    """
    # re caches compiled patterns, so the inline pattern costs the same
    # as the module-level re_url
    return re.match('^https?://', string) is not None
358 # from http://stackoverflow.com/questions/2892931/longest-common-substring-from-more-than-two-strings-python
359 # this does not increase asymptotical complexity
360 # but can still waste more time than it saves.
def shortest_of(strings):
    """ Returns the shortest of the given strings """
    return min(strings, key=lambda s: len(s))
def longest_substr(strings):
    """
    Returns the longest common substring of the given strings

    Returns '' for an empty input list.
    """
    substr = ""
    if not strings:
        return substr
    # search inside the shortest string; any common substring must occur in it
    reference = min(strings, key=len)
    length = len(reference)
    # find a suitable slice i:j
    for i in range(length):
        # only consider candidates longer than the best match so far;
        # the upper bound is length + 1 so that the slice can include the
        # final character (the original range(..., length) could never
        # return the full reference string, e.g. ['ab', 'ab'] gave 'a')
        for j in range(i + len(substr) + 1, length + 1):
            candidate = reference[i:j]
            if all(candidate in text for text in strings):
                substr = candidate
    return substr
def additional_value(it, gen_val, val_changed=lambda _x, _v: True):
    """ Provides an additional value to the elements, calculated when needed

    For the elements from the iterator, some additional value can be computed
    by gen_val (which might be an expensive computation).

    If the elements in the iterator are ordered so that some subsequent
    elements would generate the same additional value, val_changed can be
    provided, which receives the next element from the iterator and the
    previous additional value. If the element would generate the same
    additional value (val_changed returns False), its computation is skipped.

    >>> # get the next full hundred higher than x
    >>> # this will probably be an expensive calculation
    >>> next_hundred = lambda x: x + 100-(x % 100)

    >>> # returns True if h is not the value that next_hundred(x) would provide
    >>> # this should be a relatively cheap calculation, compared to the above
    >>> diff_hundred = lambda x, h: (h-x) < 0 or (h - x) > 100

    >>> xs = [0, 50, 100, 101, 199, 200, 201]
    >>> list(additional_value(xs, next_hundred, diff_hundred))
    [(0, 100), (50, 100), (100, 100), (101, 200), (199, 200), (200, 200), (201, 300)]
    """
    # FIX: the previous default, lambda _: True, took a single argument but
    # is called with two (element, previous value) below, so using the
    # default crashed on any input of more than one element.
    _none = object()
    current = _none

    for x in it:
        if current is _none or val_changed(x, current):
            current = gen_val(x)

        yield (x, current)
def file_hash(f, h=hashlib.md5, block_size=2**20):
    """ returns the hash of the contents of a file

    The file is read in chunks of block_size bytes; h is the hash
    constructor (default: hashlib.md5). Returns the hash object, not
    a digest.
    """
    digest = h()
    while True:
        chunk = f.read(block_size)
        # an empty read means end-of-file
        if not chunk:
            return digest
        digest.update(chunk)
def split_list(l, prop):
    """ split elements that satisfy a property, and those that don't """
    # single pass instead of 'x not in match', which was O(n^2) and put
    # elements that merely *compare equal* to a matching element into
    # neither output list
    match = []
    nomatch = []
    for x in l:
        if prop(x):
            match.append(x)
        else:
            nomatch.append(x)
    return match, nomatch
def sorted_chain(links, key, reverse=False):
    """ Takes a list of iters and iterates over sorted elements

    Each element of links should be a tuple of (sort_key, iterator). The
    elements of each iterator should be sorted already. sort_key should
    indicate the key of the first element and needs to be comparable to the
    result of key(elem).

    The function returns an iterator over the globally sorted elements that
    ensures that as few iterators as possible are evaluated. When an
    iterator is evaluated, it is consumed completely. """

    # mixed_list initially contains all placeholders; later evaluated
    # elements (from the iterators) are mixed in
    # each entry is (sort key, iterator-or-element, is-placeholder)
    mixed_list = [(k, link, True) for k, link in links]

    while mixed_list:
        _, item, expand = mixed_list.pop(0)

        # found an element (from an earlier expansion), yield it
        if not expand:
            yield item
            continue

        # found an iter that needs to be expanded.
        # The iterator is fully consumed
        new_items = [(key(i), i, False) for i in item]

        # sort links (placeholders) and elements together
        mixed_list = sorted(mixed_list + new_items, key=lambda t: t[0],
                            reverse=reverse)
def url_add_authentication(url, username, password):
    """
    Adds authentication data (username, password) to a given
    URL in order to construct an authenticated URL.

    >>> url_add_authentication('https://host.com/', '', None)
    'https://host.com/'
    >>> url_add_authentication('http://example.org/', None, None)
    'http://example.org/'
    >>> url_add_authentication('telnet://host.com/', 'foo', 'bar')
    'telnet://foo:bar@host.com/'
    >>> url_add_authentication('ftp://example.org', 'billy', None)
    'ftp://billy@example.org'
    >>> url_add_authentication('ftp://example.org', 'billy', '')
    'ftp://billy:@example.org'
    >>> url_add_authentication('http://localhost/x', 'aa', 'bc')
    'http://aa:bc@localhost/x'
    >>> url_add_authentication('http://blubb.lan/u.html', 'i/o', 'P@ss:')
    'http://i%2Fo:P@ss:@blubb.lan/u.html'
    >>> url_add_authentication('http://a:b@x.org/', 'c', 'd')
    'http://c:d@x.org/'
    >>> url_add_authentication('http://i%2F:P%40%3A@cx.lan', 'P@x', 'i/')
    'http://P@x:i%2F@cx.lan'
    >>> url_add_authentication('http://x.org/', 'a b', 'c d')
    'http://a%20b:c%20d@x.org/'
    """
    if username is None or username == '':
        return url

    # Relaxations of the strict quoting rules (bug 1521):
    # 1. Accept '@' in username and password
    # 2. Acecpt ':' in password only
    quoted_user = urllib.parse.quote(username, safe='@')

    if password is None:
        auth_string = quoted_user
    else:
        quoted_password = urllib.parse.quote(password, safe='@:')
        auth_string = quoted_user + ':' + quoted_password

    # drop any credentials already embedded in the URL before adding ours
    stripped = url_strip_authentication(url)

    parts = list(urllib.parse.urlsplit(stripped))
    # parts[1] is the HOST part of the URL
    parts[1] = auth_string + '@' + parts[1]

    return urllib.parse.urlunsplit(parts)
def urlopen(url, headers=None, data=None):
    """
    An URL opener with the User-agent set to gPodder (with version)

    If the URL embeds credentials (user:pass@host), they are removed from
    the URL and installed in an HTTP Basic-Auth handler instead.  Returns
    the response object of the opened request.
    """
    username, password = username_password_from_url(url)
    if username is not None or password is not None:
        # move the credentials from the URL into a Basic-Auth handler
        url = url_strip_authentication(url)
        password_mgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
        password_mgr.add_password(None, url, username, password)
        handler = urllib.request.HTTPBasicAuthHandler(password_mgr)
        opener = urllib.request.build_opener(handler)
    else:
        opener = urllib.request.build_opener()

    # copy the caller's headers dict so it is not modified below
    if headers is None:
        headers = {}
    else:
        headers = dict(headers)

    headers.update({'User-agent': settings.USER_AGENT})
    request = urllib.request.Request(url, data=data, headers=headers)
    return opener.open(request)
def username_password_from_url(url):
    r"""
    Returns a tuple (username,password) containing authentication
    data from the specified URL or (None,None) if no authentication
    data can be found in the URL.

    See Section 3.1 of RFC 1738 (http://www.ietf.org/rfc/rfc1738.txt)

    >>> username_password_from_url('https://@host.com/')
    ('', None)
    >>> username_password_from_url('telnet://host.com/')
    (None, None)
    >>> username_password_from_url('ftp://foo:@host.com/')
    ('foo', '')
    >>> username_password_from_url('http://a:b@host.com/')
    ('a', 'b')
    >>> username_password_from_url(1)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url(None)
    Traceback (most recent call last):
      ...
    ValueError: URL has to be a string or unicode object.
    >>> username_password_from_url('http://a@b:c@host.com/')
    ('a@b', 'c')
    >>> username_password_from_url('ftp://a:b:c@host.com/')
    ('a', 'b:c')
    >>> username_password_from_url('http://i%2Fo:P%40ss%3A@host.com/')
    ('i/o', 'P@ss:')
    >>> username_password_from_url('ftp://%C3%B6sterreich@host.com/')
    ('österreich', None)
    >>> username_password_from_url('http://w%20x:y%20z@example.org/')
    ('w x', 'y z')
    >>> username_password_from_url('http://example.com/x@y:z@test.com/')
    (None, None)
    """
    # 'type(url) not in (str, str)' was a Python-2 leftover with a
    # duplicated entry; isinstance also accepts str subclasses
    if not isinstance(url, str):
        raise ValueError('URL has to be a string or unicode object.')

    (username, password) = (None, None)

    (scheme, netloc, path, params, query, fragment) = urllib.parse.urlparse(url)

    if '@' in netloc:
        (authentication, netloc) = netloc.rsplit('@', 1)
        if ':' in authentication:
            (username, password) = authentication.split(':', 1)

            # RFC1738 dictates that we should not allow ['/', '@', ':']
            # characters in the username and password field (Section 3.1):
            #
            # 1. The "/" can't be in there at this point because of the way
            #    urlparse (which we use above) works.
            # 2. Due to gPodder bug 1521, we allow "@" in the username and
            #    password field. We use netloc.rsplit('@', 1), which will
            #    make sure that we split it at the last '@' in netloc.
            # 3. The colon must be excluded (RFC2617, Section 2) in the
            #    username, but is apparently allowed in the password. This
            #    is handled by the authentication.split(':', 1) above, and
            #    will cause any extraneous ':'s to be part of the password.

            username = urllib.parse.unquote(username)
            password = urllib.parse.unquote(password)
        else:
            username = urllib.parse.unquote(authentication)

    return (username, password)
def url_strip_authentication(url):
    """
    Strips authentication data from an URL. Returns the URL with
    the authentication data removed from it.

    >>> url_strip_authentication('https://host.com/')
    'https://host.com/'
    >>> url_strip_authentication('telnet://foo:bar@host.com/')
    'telnet://host.com/'
    >>> url_strip_authentication('ftp://billy@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('ftp://billy:@example.org')
    'ftp://example.org'
    >>> url_strip_authentication('http://aa:bc@localhost/x')
    'http://localhost/x'
    >>> url_strip_authentication('http://i%2Fo:P%40ss%3A@blubb.lan/u.html')
    'http://blubb.lan/u.html'
    >>> url_strip_authentication('http://c:d@x.org/')
    'http://x.org/'
    >>> url_strip_authentication('http://P%40%3A:i%2F@cx.lan')
    'http://cx.lan'
    >>> url_strip_authentication('http://x@x.com:s3cret@example.com/')
    'http://example.com/'
    """
    parts = urllib.parse.urlsplit(url)
    # the netloc may look like 'user:pass@host'; keep only what follows
    # the *last* '@' (rsplit leaves netloc unchanged when there is none)
    host = parts.netloc.rsplit('@', 1)[-1]
    return urllib.parse.urlunsplit(parts._replace(netloc=host))
# Native filesystem encoding detection
# (the codec name the OS reports for file names, e.g. 'utf-8')
encoding = sys.getfilesystemencoding()
def get_git_head():
    """ returns the commit and message of the current git HEAD

    Returns (None, None) if git cannot be executed or reports an error.
    """
    try:
        proc = subprocess.Popen('/usr/bin/git log -n 1 --oneline'.split(),
                                cwd=settings.BASE_DIR,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    except OSError:
        # git binary missing or not executable
        return None, None

    out, err = proc.communicate()
    if err:
        return None, None

    # '--oneline' output: '<abbrev-hash> <subject>'
    words = [w.decode('utf-8') for w in out.split()]
    commit = words[0]
    msg = ' '.join(words[1:])
    return commit, msg
def parse_request_body(request):
    """ returns the parsed request body, handles gzip encoding """
    body = request.body

    # transparently decompress gzip-encoded payloads
    if request.META.get('HTTP_CONTENT_ENCODING') == 'gzip':
        body = zlib.decompress(body)

    return json.loads(body.decode('utf-8'))
def normalize_feed_url(url):
    """
    Converts any URL to http:// or ftp:// so that it can be
    used with "wget". If the URL cannot be converted (invalid
    or unknown scheme), "None" is returned.

    This will also normalize feed:// and itpc:// to http://.

    >>> normalize_feed_url('itpc://example.org/podcast.rss')
    'http://example.org/podcast.rss'

    If no URL scheme is defined (e.g. "curry.com"), we will
    simply assume the user intends to add a http:// feed.

    >>> normalize_feed_url('curry.com')
    'http://curry.com/'

    There are even some more shortcuts for advanced users
    and lazy typists (see the source for details).

    >>> normalize_feed_url('fb:43FPodcast')
    'http://feeds.feedburner.com/43FPodcast'

    It will also take care of converting the domain name to
    all-lowercase (because domains are not case sensitive):

    >>> normalize_feed_url('http://Example.COM/')
    'http://example.com/'

    Some other minimalistic changes are also taken care of,
    e.g. a ? with an empty query is removed:

    >>> normalize_feed_url('http://example.org/test?')
    'http://example.org/test'

    Leading and trailing whitespace is removed

    >>> normalize_feed_url(' http://example.com/podcast.rss ')
    'http://example.com/podcast.rss'

    HTTP Authentication is removed to protect users' privacy

    >>> normalize_feed_url('http://a@b:c@host.com/')
    'http://host.com/'
    >>> normalize_feed_url('ftp://a:b:c@host.com/')
    'ftp://host.com/'
    >>> normalize_feed_url('http://i%2Fo:P%40ss%3A@host.com/')
    'http://host.com/'
    >>> normalize_feed_url('ftp://%C3%B6sterreich@host.com/')
    'ftp://host.com/'
    >>> normalize_feed_url('http://w%20x:y%20z@example.org/')
    'http://example.org/'
    >>> normalize_feed_url('http://example.com/x@y:z@test.com/')
    'http://example.com/x%40y%3Az%40test.com/'
    >>> normalize_feed_url('http://en.wikipedia.org/wiki/Ä')
    'http://en.wikipedia.org/wiki/%C3%84'
    >>> normalize_feed_url('http://en.wikipedia.org/w/index.php?title=Ä&action=edit')
    'http://en.wikipedia.org/w/index.php?title=%C3%84&action=edit'
    """
    url = url.strip()
    if not url or len(url) < 8:
        return None

    # This is a list of prefixes that you can use to minimize the amount of
    # keystrokes that you have to use.
    # Feel free to suggest other useful prefixes, and I'll add them here.
    PREFIXES = {
        'fb:': 'http://feeds.feedburner.com/%s',
        'yt:': 'http://www.youtube.com/rss/user/%s/videos.rss',
        'sc:': 'http://soundcloud.com/%s',
        'fm4od:': 'http://onapp1.orf.at/webcam/fm4/fod/%s.xspf',
        # YouTube playlists. To get a list of playlists per-user, use:
        # https://gdata.youtube.com/feeds/api/users/<username>/playlists
        'ytpl:': 'http://gdata.youtube.com/feeds/api/playlists/%s',
    }

    # expand the first matching shortcut prefix
    for prefix, expansion in PREFIXES.items():
        if url.startswith(prefix):
            url = expansion % (url[len(prefix):],)
            break

    # Assume HTTP for URLs without scheme
    if '://' not in url:
        url = 'http://' + url

    parts = urllib.parse.urlsplit(url)

    # Schemes and domain names are case insensitive
    scheme = parts.scheme.lower()
    netloc = parts.netloc.lower()

    # encode non-encoded characters
    path = urllib.parse.quote(parts.path, '/%')
    query = urllib.parse.quote_plus(parts.query, ':&=')

    # Remove authentication to protect users' privacy
    netloc = netloc.rsplit('@', 1)[-1]

    # Normalize empty paths to "/"
    if path == '':
        path = '/'

    # feed://, itpc:// and itms:// are really http://
    if scheme in ('feed', 'itpc', 'itms'):
        scheme = 'http'

    if scheme not in ('http', 'https', 'ftp', 'file'):
        return None

    # urlunsplit might return "a slighty different, but equivalent URL"
    return urllib.parse.urlunsplit((scheme, netloc, path, query, parts.fragment))
def partition(items, predicate=bool):
    """ Lazily splits items into (false-items, true-items) generators """
    flagged = ((predicate(item), item) for item in items)
    # two independent passes over the flagged stream
    rejected, accepted = itertools.tee(flagged)
    return ((item for flag, item in rejected if not flag),
            (item for flag, item in accepted if flag))
def split_quoted(s):
    """ Splits a quoted string

    >>> split_quoted('some "quoted text"') == ['some', 'quoted text']
    True

    >>> split_quoted('"quoted text') == ['quoted', 'text']
    True

    # 4 quotes here are 2 in the doctest is one in the actual string
    >>> split_quoted('text\\\\') == ['text']
    True
    """
    try:
        # split by whitespace, preserve quoted substrings
        return shlex.split(s)
    except ValueError:
        # malformed input: no closing quotation (eg '"text') or no
        # escaped character (eg '\') -- strip the offenders and retry
        cleaned = s.replace('"', '').replace("'", '').replace('\\', '')
        return shlex.split(cleaned)
def edit_link(obj):
    """ Return the link to the Django Admin Edit page """
    meta = obj._meta
    view_name = 'admin:%s_%s_change' % (meta.app_label, meta.model_name)
    return reverse(view_name, args=(obj.pk,))
def random_token(length=32):
    """ Returns a random alphanumeric token of the given length

    Uses the ``secrets`` module since tokens are security-sensitive.
    """
    import secrets
    import string
    alphabet = string.ascii_letters + string.digits
    # draw *with* replacement: random.sample drew without replacement,
    # which biased the tokens and raised ValueError for length > 62
    return "".join(secrets.choice(alphabet) for _ in range(length))
def to_maxlength(cls, field, val):
    """ Cut val to the maximum length of cls's field

    Returns None unchanged; logs a warning when truncation occurs. """
    if val is None:
        return None

    max_length = cls._meta.get_field(field).max_length
    orig_length = len(val)
    if orig_length > max_length:
        val = val[:max_length]
        # logger.warn is a deprecated alias of logger.warning
        logger.warning('%s.%s length reduced from %d to %d',
                       cls.__name__, field, orig_length, max_length)

    return val
def get_domain(url):
    """ Returns the domain name of a URL

    >>> get_domain('http://example.com')
    'example.com'

    >>> get_domain('https://example.com:80/my-podcast/feed.rss')
    'example.com'
    """
    netloc = urllib.parse.urlparse(url).netloc
    # drop an optional port; without ':' split returns the whole netloc
    return netloc.split(':', 1)[0]
def set_ordered_entries(obj, new_entries, existing, EntryClass,
                        value_name, parent_name):
    """ Update the object's entries to the given list

    'new_entries' should be a list of objects that are later wrapped in
    EntryClass instances. 'value_name' is the name of the EntryClass property
    that contains the values; 'parent_name' is the one that references obj.

    Entries that do not exist are created. Existing entries that are not in
    'new_entries' are deleted. """

    logger.info('%d existing entries', len(existing))
    logger.info('%d new entries', len(new_entries))

    with transaction.atomic():
        # move existing entries past the range [0, len(new_entries)) so the
        # renumbering below cannot collide with their current order values
        max_order = max([s.order for s in existing.values()] +
                        [len(new_entries)])
        logger.info('Renumbering entries starting from %d', max_order+1)
        for n, entry in enumerate(existing.values(), max_order+1):
            entry.order = n
            entry.save()

        logger.info('%d existing entries', len(existing))

        for n, entry in enumerate(new_entries):
            try:
                # reuse an existing entry for this value, if there is one
                e = existing.pop(entry)
                logger.info('Updating existing entry %d: %s', n, entry)
                e.order = n
                e.save()
            except KeyError:
                logger.info('Creating new entry %d: %s', n, entry)
                try:
                    links = {
                        value_name: entry,
                        parent_name: obj,
                    }
                    # local import; ScopedModel entries also carry obj's scope
                    from mygpo.podcasts.models import ScopedModel
                    if issubclass(EntryClass, ScopedModel):
                        links['scope'] = obj.scope

                    EntryClass.objects.create(order=n, **links)
                except IntegrityError as ie:
                    # message typo 'enry' fixed; logger.warn is deprecated
                    logger.warning('Could not create entry for %s: %s', obj, ie)

    with transaction.atomic():
        # everything left in 'existing' was not reused above -- delete it
        delete = [s.pk for s in existing.values()]
        logger.info('Deleting %d entries', len(delete))
        EntryClass.objects.filter(id__in=delete).delete()