Prevent infinite recursion in navigating timetable
[cds-indico.git] / indico_zodbimport / util.py
blob460e03887c5fa8bfa6022a5e0f46a15b5bee8431
1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
17 import os
18 import re
19 import sys
20 from contextlib import contextmanager
21 from urlparse import urlparse
23 from ZODB import DB, FileStorage
24 from ZODB.broken import find_global, Broken
25 from ZEO.ClientStorage import ClientStorage
27 from indico.core.auth import IndicoMultipass, multipass
28 from indico.core.db.sqlalchemy.protection import ProtectionMode
29 from indico.modules.groups import GroupProxy
30 from indico.modules.groups.legacy import GroupWrapper
31 from indico.modules.users.legacy import AvatarUserWrapper
32 from indico.util.console import colored
class NotBroken(Broken):
    """Like Broken, but it makes the attributes available.

    The unpickled state is merged straight into the instance dictionary,
    so the stored attributes can be read like normal attributes.
    """

    def __setstate__(self, state):
        # Write through the instance dict rather than via attribute assignment.
        vars(self).update(state)
class UnbreakingDB(DB):
    """DB whose class factory redirects some legacy class locations.

    Classes that cannot be imported are represented by `NotBroken`
    instead of the default broken-object class.
    """

    def classFactory(self, connection, modulename, globalname):
        # Persistent containers are looked up in fixed modules, whatever
        # module name was recorded for them.
        relocated = {
            'PersistentMapping': 'persistent.mapping',
            'PersistentList': 'persistent.list',
        }
        # BTrees recorded under the IndexedCatalog package map to plain BTrees.
        modulename = re.sub(r'^IndexedCatalog\.BTrees\.', 'BTrees.', modulename)
        modulename = relocated.get(globalname, modulename)
        return find_global(modulename, globalname, Broken=NotBroken)
def get_storage(zodb_uri):
    """Open the ZODB storage described by a URI.

    Supported forms:

    - ``zeo://[user:password@]host[:port]/realm`` opens a `ClientStorage`;
      port 9675 is assumed when none is given.
    - ``file:///path/to/Data.fs`` or a plain path without a scheme opens
      a `FileStorage`.

    :param zodb_uri: The URI (or plain file path) of the database
    :return: The opened storage object
    :raises Exception: if the URI uses any other scheme
    """
    uri_parts = urlparse(str(zodb_uri))

    print(colored("Trying to open {}...".format(zodb_uri), 'green'))

    if uri_parts.scheme == 'zeo':
        if uri_parts.port is None:
            print(colored("No ZEO port specified. Assuming 9675", 'yellow'))

        storage = ClientStorage((uri_parts.hostname, uri_parts.port or 9675),
                                username=uri_parts.username,
                                password=uri_parts.password,
                                realm=uri_parts.path[1:])

    elif uri_parts.scheme in ('file', ''):
        # urlparse() reports a missing scheme as an empty string, never
        # None, so '' is what a plain filesystem path produces.
        storage = FileStorage.FileStorage(uri_parts.path)
    else:
        raise Exception("URI scheme not known: {}".format(uri_parts.scheme))
    print(colored("Done!", 'green'))
    return storage
74 def convert_to_unicode(val, _control_char_re=re.compile(ur'[\x00-\x08\x0b-\x0c\x0e-\x1f]')):
75 if isinstance(val, str):
76 try:
77 rv = unicode(val, 'utf-8')
78 except UnicodeError:
79 rv = unicode(val, 'latin1')
80 elif isinstance(val, unicode):
81 rv = val
82 elif isinstance(val, int):
83 rv = unicode(val)
84 elif val is None:
85 rv = u''
86 else:
87 raise RuntimeError('Unexpected type {} is found for unicode conversion: {!r}'.format(type(val), val))
88 # get rid of hard tabs and control chars
89 rv = rv.replace(u'\t', u' ' * 4)
90 rv = _control_char_re.sub(u'', rv)
91 return rv
def convert_principal_list(opt):
    """Converts a 'users' plugin setting to the new format

    Each stored principal becomes a ``(type, id)`` tuple where the type
    is ``'Avatar'`` for objects whose class is named ``Avatar`` and
    ``'Group'`` for everything else.  Duplicates are collapsed.

    :param opt: The legacy plugin option holding the principals
    :return: A list of unique ``(type, id)`` tuples
    """
    return list({
        ('Avatar' if entry.__class__.__name__ == 'Avatar' else 'Group', entry.id)
        for entry in opt._PluginOption__value
    })
def option_value(opt):
    """Gets a plugin option value

    String values are passed through `convert_to_unicode`; any other
    value is returned unchanged.

    :param opt: The legacy plugin option
    """
    raw = opt._PluginOption__value
    return convert_to_unicode(raw) if isinstance(raw, basestring) else raw
def get_archived_file(f, archive_paths):
    """Returns the name and path of an archived file

    :param f: A `LocalFile` object
    :param archive_paths: The path that was used in the ``ArchiveDir``
                          config option or a list of multiple paths.
    :return: A ``(filename, path)`` tuple. The path is ``None`` if the
             file could not be found in any of the archive paths; both
             elements are ``None`` if `f` is ``None``.
    """
    # this is based pretty much on MaterialLocalRepository.__getFilePath, but we don't
    # call any legacy methods in ZODB migrations to avoid breakage in the future.
    if f is None:
        return None, None
    search_dirs = [archive_paths] if isinstance(archive_paths, basestring) else archive_paths
    archive_id = f._LocalFile__archivedId
    repo = f._LocalFile__repository
    # (decode error mode, target encoding) pairs tried when the stored
    # path does not exist as-is
    fallbacks = (
        ('strict', 'iso-8859-1'),
        ('replace', sys.getfilesystemencoding()),
        ('replace', 'ascii'),
    )
    for base_dir in search_dirs:
        candidate = os.path.join(base_dir, repo._MaterialLocalRepository__files[archive_id])
        if os.path.exists(candidate):
            return f.fileName, candidate
        for errors, encoding in fallbacks:
            recoded = candidate.decode('utf-8', errors).encode(encoding, 'replace')
            if os.path.exists(recoded):
                return f.fileName, recoded
    return f.fileName, None
def protection_from_ac(target, ac, acl_attr='acl', ac_attr='allowed', allow_public=False):
    """Converts AccessController data to ProtectionMixin style

    This needs to run inside the context of `patch_default_group_provider`.

    :param target: The new object that uses ProtectionMixin
    :param ac: The old AccessController
    :param acl_attr: The attribute name for the acl of `target`
    :param ac_attr: The attribute name for the acl in `ac`
    :param allow_public: If the object allows `ProtectionMode.public`.
                         Otherwise, public is converted to inheriting.
    :raises ValueError: if ``ac._accessProtection`` is not -1, 0 or 1
    """
    level = ac._accessProtection
    if level == -1:
        target.protection_mode = ProtectionMode.public if allow_public else ProtectionMode.inheriting
        return
    if level == 0:
        target.protection_mode = ProtectionMode.inheriting
        return
    if level != 1:
        raise ValueError('Unexpected protection: {}'.format(level))

    # Only protected objects get their ACL entries copied over.
    target.protection_mode = ProtectionMode.protected
    acl = getattr(target, acl_attr)
    for legacy in getattr(ac, ac_attr):
        if isinstance(legacy, AvatarUserWrapper):
            resolved = legacy.user
        elif isinstance(legacy, GroupWrapper):
            resolved = legacy.group
        else:
            # Raw legacy objects are recognised by their class name.
            kind = legacy.__class__.__name__
            if kind == 'Avatar':
                resolved = AvatarUserWrapper(legacy.id).user
            elif kind == 'Group':
                resolved = GroupProxy(legacy.id)
            elif kind in {'CERNGroup', 'LDAPGroup', 'NiceGroup'}:
                resolved = GroupProxy(legacy.id, multipass.default_group_provider.name)
            else:
                # anything else is added to the acl unchanged
                resolved = legacy
        assert resolved is not None
        acl.add(resolved)
@contextmanager
def patch_default_group_provider(provider_name):
    """Monkeypatches Multipass to use a certain default group provider

    While the context is active, `IndicoMultipass.default_group_provider`
    is a property returning a stand-in object whose only attribute is
    ``name``.  The previous class attribute is put back on exit, even if
    the body raised.

    :param provider_name: The name the fake default provider reports
    """
    fake_provider = type('FakeProvider', (object,), {'name': provider_name})()
    saved = IndicoMultipass.default_group_provider
    IndicoMultipass.default_group_provider = property(lambda m: fake_provider)
    try:
        yield
    finally:
        IndicoMultipass.default_group_provider = saved