Merge branch 'alias' into 'master'
[mailman.git] / src / mailman / model / usermanager.py
blobf909c1a7dbac473c6e8ccb02cc96f623031d62d4
1 # Copyright (C) 2007-2019 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <http://www.gnu.org/licenses/>.
18 """A user manager."""
20 from mailman.database.transaction import dbconnection
21 from mailman.interfaces.address import ExistingAddressError
22 from mailman.interfaces.usermanager import IUserManager
23 from mailman.model.address import Address
24 from mailman.model.autorespond import AutoResponseRecord
25 from mailman.model.digests import OneLastDigest
26 from mailman.model.member import Member
27 from mailman.model.preferences import Preferences
28 from mailman.model.user import User
29 from public import public
30 from zope.interface import implementer
@public
@implementer(IUserManager)
class UserManager:
    """See `IUserManager`.

    Creates, looks up and deletes `User` and `Address` records.  Methods
    decorated with `@dbconnection` declare a `store` parameter right after
    `self`; callers do not pass it themselves (see how `create_user()` and
    `make_user()` invoke `create_address()`, `get_user()` and
    `get_address()`).
    """

    def create_user(self, email=None, display_name=None):
        """See `IUserManager`.

        Create a new user with fresh preferences.  When `email` is truthy,
        an address is created for it first and then linked to the new user.

        :param email: Optional email address to create and link.
        :param display_name: Optional display name, passed both to the new
            address (if any) and to the new user.
        :return: The newly created `User`.
        :raises ExistingAddressError: propagated from `create_address()`
            when an address with this email already exists.
        """
        # Create the address before the user so that a duplicate email fails
        # before any user object has been built.
        address = self.create_address(email, display_name) if email else None
        user = User(display_name, Preferences())
        if address is not None:
            user.link(address)
        return user

    def make_user(self, email, display_name=None):
        """See `IUserManager`.

        Return the user linked to `email`, creating whatever is missing.

        :param email: The email address to look up.
        :param display_name: Optional display name used only when a new
            user has to be created.
        :return: The existing or newly created `User`.
        """
        # See if there's already a user linked with the given address.
        user = self.get_user(email)
        if user is not None:
            return user
        # A user linked to this address does not yet exist.  Is the address
        # itself known but just not linked to a user?
        address = self.get_address(email)
        if address is None:
            # Nope, we don't even know about this address, so create both
            # the user and address now.
            return self.create_user(email, display_name)
        # The address exists, but it's not yet linked to a user.  Create the
        # empty user object and link them together.  An explicit display
        # name wins; otherwise fall back to the one stored on the address.
        user = self.create_user()
        user.display_name = (
            display_name if display_name else address.display_name)
        user.link(address)
        return user

    @dbconnection
    def delete_user(self, store, user):
        """See `IUserManager`.

        Delete `user` along with all of its addresses, the memberships
        recorded against its user id, and its preferences.

        :param user: The `User` to delete.
        """
        # SQLAlchemy is susceptible to delete-elements-while-iterating bugs so
        # first figure out all the addresses we want to delete, then in a
        # separate pass, delete those addresses.  (See LP: #1419519)
        to_delete = list(user.addresses)
        for address in to_delete:
            self.delete_address(address)
        # Remove memberships.
        for membership in store.query(Member).filter_by(user_id=user.id):
            membership.unsubscribe()
        store.delete(user.preferences)
        store.delete(user)

    @dbconnection
    def get_user(self, store, email):
        """See `IUserManager`.

        :param email: The email address to look up; it is lower-cased
            before being compared.
        :return: The `user` attribute of the matching address, or None when
            no address matches.
        """
        # one_or_none() answers in a single query: None when nothing
        # matches, and (like one()) an error if several rows match.
        address = store.query(Address).filter_by(
            email=email.lower()).one_or_none()
        return None if address is None else address.user

    @dbconnection
    def get_user_by_id(self, store, user_id):
        """See `IUserManager`.

        :param user_id: The value to match against `User._user_id`.
        :return: The matching `User`, or None when there is no match.
        """
        # Single query; see get_user() for the one_or_none() semantics.
        return store.query(User).filter_by(_user_id=user_id).one_or_none()

    @property
    @dbconnection
    def users(self, store):
        """See `IUserManager`.

        Yield every `User`, ordered by `User.id`.
        """
        yield from store.query(User).order_by(User.id).all()

    @dbconnection
    def create_address(self, store, email, display_name=None):
        """See `IUserManager`.

        Create an address with fresh preferences and add it to the store.

        :param email: The email address; existing addresses are matched
            against its lower-cased form.
        :param display_name: Optional display name; None is stored as the
            empty string.
        :return: The newly created `Address`.
        :raises ExistingAddressError: when an address with this email
            already exists; the error carries that address's
            `original_email`.
        """
        # Fetch the matching rows once instead of re-running the query for
        # every check below.
        existing = store.query(Address).filter(
            Address.email == email.lower()).all()
        if len(existing) == 1:
            raise ExistingAddressError(existing[0].original_email)
        assert len(existing) == 0, 'Unexpected results'
        if display_name is None:
            display_name = ''
        # It's okay not to lower case the 'email' argument because the
        # constructor will do the right thing.
        address = Address(email, display_name)
        address.preferences = Preferences()
        store.add(address)
        return address

    @dbconnection
    def delete_address(self, store, address):
        """See `IUserManager`.

        Delete `address` together with the records that refer to it: its
        memberships, auto-response records and last-digest records.

        :param address: The `Address` to delete.
        """
        # If there's a user controlling this address, it has to first be
        # unlinked before the address can be deleted.
        if address.user:
            address.user.unlink(address)
        # Remove memberships.
        for membership in store.query(Member).filter_by(
                address_id=address.id):
            membership.unsubscribe()
        # Remove auto-response records.
        store.query(AutoResponseRecord).filter_by(address=address).delete()
        # Remove last digest record.
        store.query(OneLastDigest).filter_by(address=address).delete()
        # Now delete the address.
        store.delete(address)

    @dbconnection
    def get_address(self, store, email):
        """See `IUserManager`.

        :param email: The email address to look up; it is lower-cased
            before being compared.
        :return: The matching `Address`, or None when there is no match.
        """
        # Single query; see get_user() for the one_or_none() semantics.
        return store.query(Address).filter_by(
            email=email.lower()).one_or_none()

    @property
    @dbconnection
    def addresses(self, store):
        """See `IUserManager`.

        Yield every `Address` in the store.
        """
        yield from store.query(Address).all()

    @property
    @dbconnection
    def members(self, store):
        """See `IUserManager`.

        Yield every `Member` in the store.
        """
        yield from store.query(Member).all()

    @property
    @dbconnection
    def server_owners(self, store):
        """See `IUserManager`.

        Yield every `User` whose `is_server_owner` flag is True.
        """
        yield from store.query(User).filter_by(is_server_owner=True)