Small fixes/improvements in static site generation
[cds-indico.git] / indico / modules / users / models / users.py
blobf8875f63b60b5804589cd5a8080e33b0690ef5bc
1 # This file is part of Indico.
2 # Copyright (C) 2002 - 2015 European Organization for Nuclear Research (CERN).
4 # Indico is free software; you can redistribute it and/or
5 # modify it under the terms of the GNU General Public License as
6 # published by the Free Software Foundation; either version 3 of the
7 # License, or (at your option) any later version.
9 # Indico is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 # General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with Indico; if not, see <http://www.gnu.org/licenses/>.
17 from __future__ import unicode_literals
19 from operator import attrgetter
21 from flask import flash
22 from flask_multipass import IdentityRetrievalFailed
23 from sqlalchemy.event import listens_for
24 from sqlalchemy.ext.associationproxy import association_proxy
25 from sqlalchemy.ext.hybrid import hybrid_property
26 from werkzeug.utils import cached_property
28 from indico.core.auth import multipass
29 from indico.core.db import db
30 from indico.core.db.sqlalchemy import PyIntEnum
31 from indico.modules.users.models.affiliations import UserAffiliation
32 from indico.modules.users.models.emails import UserEmail
33 from indico.modules.users.models.favorites import favorite_user_table, FavoriteCategory
34 from indico.modules.users.models.links import UserLink
35 from indico.util.i18n import _
36 from indico.util.string import return_ascii
37 from indico.util.struct.enum import TitledIntEnum
class UserTitle(TitledIntEnum):
    """The personal titles that can be stored for a user.

    ``__titles__`` holds the translatable display strings for the
    members (presumably looked up by member value -- see
    ``TitledIntEnum``).
    """

    __titles__ = (
        '',
        _('Mr.'),
        _('Ms.'),
        _('Mrs.'),
        _('Dr.'),
        _('Prof.'),
    )

    none = 0
    mr = 1
    ms = 2
    mrs = 3
    dr = 4
    prof = 5
#: Fields which can be synced as keys and a mapping to a more human
#: readable version, used for flashing messages
syncable_fields = {
    'first_name': _("first name"),
    'last_name': _("family name"),
    'affiliation': _("affiliation"),
    'address': _("address"),
    'phone': _("phone number")
}
class User(db.Model):
    """Indico users"""

    # Useful when dealing with both users and groups in the same code
    is_group = False

    __tablename__ = 'users'
    __table_args__ = (db.CheckConstraint('id != merged_into_id', 'not_merged_self'),
                      db.CheckConstraint("is_pending OR (first_name != '' AND last_name != '')",
                                         'not_pending_proper_names'),
                      {'schema': 'users'})

    #: the unique id of the user
    id = db.Column(
        db.Integer,
        primary_key=True
    )
    #: the first name of the user
    first_name = db.Column(
        db.String,
        nullable=False,
        index=True
    )
    #: the last/family name of the user
    last_name = db.Column(
        db.String,
        nullable=False,
        index=True
    )
    # the title of the user - you usually want the `title` property!
    _title = db.Column(
        'title',
        PyIntEnum(UserTitle),
        nullable=False,
        default=UserTitle.none
    )
    #: the phone number of the user
    phone = db.Column(
        db.String,
        nullable=False,
        default=''
    )
    #: the address of the user
    address = db.Column(
        db.Text,
        nullable=False,
        default=''
    )
    #: the id of the user this user has been merged into
    merged_into_id = db.Column(
        db.Integer,
        db.ForeignKey('users.users.id'),
        nullable=True
    )
    #: if the user is an administrator with unrestricted access to everything
    is_admin = db.Column(
        db.Boolean,
        nullable=False,
        default=False,
        index=True
    )
    #: if the user has been blocked
    is_blocked = db.Column(
        db.Boolean,
        nullable=False,
        default=False
    )
    #: if the user is pending (e.g. never logged in, only added to some list)
    is_pending = db.Column(
        db.Boolean,
        nullable=False,
        default=False
    )
    #: if the user is deleted (e.g. due to a merge)
    is_deleted = db.Column(
        'is_deleted',
        db.Boolean,
        nullable=False,
        default=False
    )

    _affiliation = db.relationship(
        'UserAffiliation',
        lazy=False,
        uselist=False,
        cascade='all, delete-orphan',
        backref=db.backref('user', lazy=True)
    )

    _primary_email = db.relationship(
        'UserEmail',
        lazy=False,
        uselist=False,
        cascade='all, delete-orphan',
        primaryjoin='(User.id == UserEmail.user_id) & UserEmail.is_primary'
    )
    _secondary_emails = db.relationship(
        'UserEmail',
        lazy=True,
        cascade='all, delete-orphan',
        collection_class=set,
        primaryjoin='(User.id == UserEmail.user_id) & ~UserEmail.is_primary'
    )
    _all_emails = db.relationship(
        'UserEmail',
        lazy=True,
        viewonly=True,
        primaryjoin='User.id == UserEmail.user_id',
        collection_class=set,
        backref=db.backref('user', lazy=False)
    )
    #: the affiliation of the user
    affiliation = association_proxy('_affiliation', 'name', creator=lambda v: UserAffiliation(name=v))
    #: the primary email address of the user
    email = association_proxy('_primary_email', 'email', creator=lambda v: UserEmail(email=v, is_primary=True))
    #: any additional emails the user might have
    secondary_emails = association_proxy('_secondary_emails', 'email', creator=lambda v: UserEmail(email=v))
    #: all emails of the user. read-only; use it only for searching by email! also, do not use it between
    #: modifying `email` or `secondary_emails` and a session expire/commit!
    all_emails = association_proxy('_all_emails', 'email')  # read-only!

    #: the user this user has been merged into
    merged_into_user = db.relationship(
        'User',
        lazy=True,
        backref=db.backref('merged_from_users', lazy=True),
        remote_side='User.id',
    )
    #: the user's favorite users
    favorite_users = db.relationship(
        'User',
        secondary=favorite_user_table,
        primaryjoin=id == favorite_user_table.c.user_id,
        secondaryjoin=(id == favorite_user_table.c.target_id) & ~is_deleted,
        lazy=True,
        collection_class=set,
        backref=db.backref('favorite_of', lazy=True, collection_class=set),
    )
    _favorite_categories = db.relationship(
        'FavoriteCategory',
        lazy=True,
        cascade='all, delete-orphan',
        collection_class=set
    )
    #: the user's favorite categories
    favorite_categories = association_proxy('_favorite_categories', 'target',
                                            creator=lambda x: FavoriteCategory(target=x))
    #: the legacy objects the user is connected to
    linked_objects = db.relationship(
        'UserLink',
        lazy='dynamic',
        cascade='all, delete-orphan',
        backref=db.backref('user', lazy=True)
    )
    #: the active API key of the user
    api_key = db.relationship(
        'APIKey',
        lazy=True,
        uselist=False,
        cascade='all, delete-orphan',
        primaryjoin='(User.id == APIKey.user_id) & APIKey.is_active',
        back_populates='user'
    )
    #: the previous API keys of the user
    old_api_keys = db.relationship(
        'APIKey',
        lazy=True,
        cascade='all, delete-orphan',
        order_by='APIKey.created_dt.desc()',
        primaryjoin='(User.id == APIKey.user_id) & ~APIKey.is_active',
        back_populates='user'
    )
    #: the identities used by this user
    identities = db.relationship(
        'Identity',
        lazy=True,
        cascade='all, delete-orphan',
        collection_class=set,
        backref=db.backref('user', lazy=False)
    )

    # relationship backrefs:
    # - local_groups (User.local_groups)
    # - reservations_booked_for (Reservation.booked_for_user)
    # - reservations (Reservation.created_by_user)
    # - blockings (Blocking.created_by_user)
    # - owned_rooms (Room.owner) - use `Room.get_owned_by()` if you also want managed rooms!
    # - agreements (Agreement.user)
    # - requests_created (Request.created_by_user)
    # - requests_processed (Request.processed_by_user)
    # - static_sites (StaticSite.user)

    @property
    def as_principal(self):
        """The serializable principal identifier of this user"""
        return 'User', self.id

    @property
    def as_avatar(self):
        """An ``AvatarUserWrapper`` created from this user's id (legacy compatibility)"""
        # TODO: remove this after DB is free of Avatars
        from indico.modules.users.legacy import AvatarUserWrapper
        avatar = AvatarUserWrapper(self.id)

        # avoid garbage collection
        avatar.user
        return avatar

    @property
    def external_identities(self):
        """The external identities of the user"""
        return {x for x in self.identities if x.provider != 'indico'}

    @property
    def local_identities(self):
        """The local identities of the user"""
        return {x for x in self.identities if x.provider == 'indico'}

    @property
    def local_identity(self):
        """The main (most recently used) local identity, or ``None`` if there is none"""
        identities = sorted(self.local_identities, key=attrgetter('safe_last_login_dt'), reverse=True)
        return identities[0] if identities else None

    @property
    def secondary_local_identities(self):
        """The local identities of the user except the main one"""
        return self.local_identities - {self.local_identity}

    @property
    def locator(self):
        """A dict containing the ``user_id`` of this user"""
        return {'user_id': self.id}

    @hybrid_property
    def title(self):
        """the title of the user"""
        return self._title.title

    @title.expression
    def title(cls):
        # On the class level the hybrid resolves to the underlying `title` column
        return cls._title

    @title.setter
    def title(self, value):
        self._title = value

    @cached_property
    def settings(self):
        """Returns the user settings proxy for this user"""
        from indico.modules.users import user_settings
        return user_settings.bind(self)

    @property
    def full_name(self):
        """Returns the user's name in 'Firstname Lastname' notation."""
        return self.get_full_name(last_name_first=False, last_name_upper=False, abbrev_first_name=False)

    # Convenience property to have a common `name` property for both groups and users
    name = full_name

    @property
    def synced_fields(self):
        """The fields of the user whose values are currently synced.

        This set is always a subset of the synced fields defined in
        synced fields of the idp in 'indico.conf'.
        """
        synced_fields = self.settings.get('synced_fields')
        # If synced_fields is missing or None, then all fields are synced
        if synced_fields is None:
            return multipass.synced_fields
        else:
            return set(synced_fields) & multipass.synced_fields

    @synced_fields.setter
    def synced_fields(self, value):
        # Only fields known to multipass can be stored; when all of them are
        # selected the setting is removed, which the getter reads as "all synced".
        value = set(value) & multipass.synced_fields
        if value == multipass.synced_fields:
            self.settings.delete('synced_fields')
        else:
            self.settings.set('synced_fields', list(value))

    @property
    def synced_values(self):
        """The values from the synced identity for the user.

        Those values are not the actual user's values and might differ
        if they are not set as synchronized.

        An empty dict is returned if there is no synced identity;
        fields missing in the identity data are mapped to ``''``.
        """
        identity = self._get_synced_identity(refresh=False)
        if identity is None:
            return {}
        return {field: (identity.data.get(field) or '') for field in multipass.synced_fields}

    def __contains__(self, user):
        """Convenience method for `user in user_or_group`."""
        return self == user

    @return_ascii
    def __repr__(self):
        return '<User({}, {}, {}, {})>'.format(self.id, self.first_name, self.last_name, self.email)

    def can_be_modified(self, user):
        """If this user can be modified by the given user"""
        return self == user or user.is_admin

    def get_full_name(self, last_name_first=True, last_name_upper=True, abbrev_first_name=True, show_title=False):
        """Returns the user's name in the specified notation.

        Note: Do not use positional arguments when calling this method.
        Always use keyword arguments!

        :param last_name_first: if "lastname, firstname" instead of
                                "firstname lastname" should be used
        :param last_name_upper: if the last name should be all-uppercase
        :param abbrev_first_name: if the first name should be abbreviated to
                                  use only the first character
        :param show_title: if the title of the user should be included
        """
        # Pending users might not have a first/last name...
        first_name = self.first_name or 'Unknown'
        last_name = self.last_name or 'Unknown'
        if last_name_upper:
            last_name = last_name.upper()
        first_name = '{}.'.format(first_name[0].upper()) if abbrev_first_name else first_name
        full_name = '{}, {}'.format(last_name, first_name) if last_name_first else '{} {}'.format(first_name, last_name)
        return full_name if not show_title or not self.title else '{} {}'.format(self.title, full_name)

    def iter_identifiers(self, check_providers=False, providers=None):
        """Yields ``(provider, identifier)`` tuples for the user.

        Each tuple is yielded at most once from the provider search,
        and never if it was already yielded for a stored identity.

        :param check_providers: If True, providers are searched for
                                additional identifiers once all existing
                                identifiers have been yielded.
        :param providers: May be a set containing provider names to
                          get only identifiers from the specified
                          providers.
        """
        done = set()
        for identity in self.identities:
            if providers is not None and identity.provider not in providers:
                continue
            item = (identity.provider, identity.identifier)
            done.add(item)
            yield item
        if not check_providers:
            return
        for identity_info in multipass.search_identities(providers=providers, exact=True, email=self.all_emails):
            item = (identity_info.provider.name, identity_info.identifier)
            if item not in done:
                # Remember it so the same search hit is not yielded a second time
                done.add(item)
                yield item

    def make_email_primary(self, email):
        """Promotes a secondary email address to the primary email address

        :param email: an email address that is currently a secondary email
        :raises ValueError: if `email` is not one of the secondary emails
        """
        secondary = next((x for x in self._secondary_emails if x.email == email), None)
        if secondary is None:
            raise ValueError('email is not a secondary email address')
        # Demote the old primary email and flush before promoting the new one
        self._primary_email.is_primary = False
        db.session.flush()
        secondary.is_primary = True
        db.session.flush()

    def get_linked_roles(self, type_):
        """Retrieves the roles the user is linked to for a given type"""
        return UserLink.get_linked_roles(self, type_)

    def get_linked_objects(self, type_, role):
        """Retrieves linked objects for the user"""
        return UserLink.get_links(self, type_, role)

    def link_to(self, obj, role):
        """Adds a link between the user and an object

        :param obj: a legacy object
        :param role: the role to use in the link
        """
        return UserLink.create_link(self, obj, role)

    def unlink_to(self, obj, role):
        """Removes a link between the user and an object

        :param obj: a legacy object
        :param role: the role to use in the link
        """
        return UserLink.remove_link(self, obj, role)

    def synchronize_data(self, refresh=False):
        """Synchronize the fields of the user from the sync identity.

        This will take only into account :attr:`synced_fields`.
        A message is flashed for every field whose value changes.

        :param refresh: bool -- Whether to refresh the synced identity
                        with the sync provider before instead of using
                        the stored data. (Only if the sync provider
                        supports refresh.)
        """
        identity = self._get_synced_identity(refresh=refresh)
        if identity is None:
            return
        for field in self.synced_fields:
            # Treat a missing (None) value like an empty one so it compares equal to
            # the '' used for missing identity data and never shows up as 'None'
            # in the flashed message.
            old_value = getattr(self, field) or ''
            new_value = identity.data.get(field) or ''
            # Never overwrite a name with an empty value
            if field in ('first_name', 'last_name') and not new_value:
                continue
            if old_value == new_value:
                continue
            flash(_("Your {field_name} has been synchronised from '{old_value}' to '{new_value}'.").format(
                  field_name=syncable_fields[field], old_value=old_value, new_value=new_value))
            setattr(self, field, new_value)

    def _get_synced_identity(self, refresh=False):
        """Returns the most recently used identity of the sync provider.

        :param refresh: if True and the identity has multipass data,
                        the identity data is refreshed from the sync
                        provider; if that fails with
                        `IdentityRetrievalFailed` the stored data is
                        kept.
        :return: the identity, or ``None`` if there is no sync provider
                 or the user has no identity for it
        """
        sync_provider = multipass.sync_provider
        if sync_provider is None:
            return None
        identities = sorted([x for x in self.identities if x.provider == sync_provider.name],
                            key=attrgetter('safe_last_login_dt'), reverse=True)
        if not identities:
            return None
        identity = identities[0]
        if refresh and identity.multipass_data is not None:
            try:
                identity_info = sync_provider.refresh_identity(identity.identifier, identity.multipass_data)
            except IdentityRetrievalFailed:
                identity_info = None
            if identity_info:
                identity.data = identity_info.data
        return identity
@listens_for(User._primary_email, 'set')
@listens_for(User._secondary_emails, 'append')
def _user_email_added(target, value, *unused):
    """Give a newly added email the same deletion state as the user itself.

    :param target: the user the email is set on / appended to
    :param value: the email object being added
    """
    user_deleted = target.is_deleted
    value.is_user_deleted = user_deleted
@listens_for(User.is_deleted, 'set')
def _user_deleted(target, value, *unused):
    """Reflect the user deletion state in the email table.

    :param target: the user whose ``is_deleted`` is being set
    :param value: the new deletion state
    """
    # Not using _all_emails here since it only contains newly added emails after an expire/commit
    primary = target._primary_email
    if primary:
        primary.is_user_deleted = value
    for secondary in target._secondary_emails:
        secondary.is_user_deleted = value