Implement @fqdn_listname for accept_these_nonmembers.
[mailman.git] / src / mailman / rest / validator.py
blob7d23b3e33c3e64418dac91980409332bcab9dbf8
1 # Copyright (C) 2010-2023 by the Free Software Foundation, Inc.
3 # This file is part of GNU Mailman.
5 # GNU Mailman is free software: you can redistribute it and/or modify it under
6 # the terms of the GNU General Public License as published by the Free
7 # Software Foundation, either version 3 of the License, or (at your option)
8 # any later version.
10 # GNU Mailman is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
13 # more details.
15 # You should have received a copy of the GNU General Public License along with
16 # GNU Mailman. If not, see <https://www.gnu.org/licenses/>.
18 """REST web form validation."""
20 import re
22 from lazr.config import as_boolean
23 from mailman.interfaces.address import IEmailValidator
24 from mailman.interfaces.errors import MailmanError
25 from mailman.interfaces.languages import ILanguageManager
26 from mailman.rest.helpers import get_request_params
27 from public import public
28 from zope.component import getUtility
31 COMMASPACE = ', '
34 @public
35 class RESTError(MailmanError):
36 """Base class for REST API errors."""
39 @public
40 class UnknownPATCHRequestError(RESTError):
41 """A PATCH request contained an unknown attribute."""
43 def __init__(self, attribute):
44 self.attribute = attribute
47 @public
48 class ReadOnlyPATCHRequestError(RESTError):
49 """A PATCH request contained a read-only attribute."""
51 def __init__(self, attribute):
52 self.attribute = attribute
55 @public
56 class enum_validator:
57 """Convert an enum value name into an enum value."""
59 def __init__(self, enum_class, *, allow_blank=False):
60 self._enum_class = enum_class
61 self._allow_blank = allow_blank
63 def __call__(self, enum_value):
64 # This will raise a KeyError if the enum value is unknown. The
65 # Validator API requires turning this into a ValueError.
66 if not enum_value and self._allow_blank:
67 return None
68 try:
69 return self._enum_class[enum_value]
70 except KeyError:
71 # Retain the error message.
72 err_msg = 'Accepted Values are: {}'.format(self._accepted_values)
73 raise ValueError(err_msg)
75 @property
76 def _accepted_values(self):
77 """Joined comma separated self._enum_class values"""
78 return ', '.join(item._name_ for item in self._enum_class)
81 @public
82 def subscriber_validator(api):
83 """Convert an email-or-(int|hex) to an email-or-UUID."""
84 def _inner(subscriber):
85 try:
86 return api.to_uuid(subscriber)
87 except ValueError:
88 # It must be an email address.
89 if getUtility(IEmailValidator).is_valid(subscriber):
90 return subscriber
91 raise ValueError
92 return _inner
95 @public
96 def language_validator(code):
97 """Convert a language code to a Language object."""
98 return getUtility(ILanguageManager)[code]
101 @public
102 def list_of_strings_validator(values):
103 """Turn a list of things, or a single thing, into a list of unicodes."""
104 # There is no good way to pass around an empty list through HTTP API, so,
105 # we consider an empty string as an empty list, which can easily be passed
106 # around. This is a contract between Core and Postorius. This also fixes a
107 # bug where an empty string ('') would be interpreted as a valid value ['']
108 # to create a singleton list, instead of empty list, which in later stages
109 # would create other problems.
110 if values == '':
111 return []
112 if not isinstance(values, (list, tuple)):
113 values = [values]
114 for value in values:
115 if not isinstance(value, str):
116 raise ValueError('Expected str, got {!r}'.format(value))
117 return values
120 @public
121 def list_of_emails_validator(values):
122 """Turn a list of things, or a single thing, into a list of emails."""
123 if not isinstance(values, (list, tuple)):
124 if getUtility(IEmailValidator).is_valid(values):
125 return [values]
126 raise ValueError('Bad email address format: {}'.format(values))
127 for value in values:
128 if not getUtility(IEmailValidator).is_valid(value):
129 raise ValueError('Expected email address, got {!r}'.format(value))
130 return values
133 @public
134 def list_of_emails_or_regexp_validator(values):
135 if values == '':
136 return []
137 if not isinstance(values, (list, tuple)):
138 values = [values]
139 for value in values:
140 email_or_regexp_validator(value)
141 return values
144 @public
145 def list_of_emails_or_regexp_or_atlist_validator(values):
146 if values == '':
147 return []
148 if not isinstance(values, (list, tuple)):
149 values = [values]
150 for value in values:
151 email_or_regexp_or_atlist_validator(value)
152 return values
155 @public
156 def integer_ge_zero_validator(value):
157 """Validate that the value is a non-negative integer."""
158 value = int(value)
159 if value < 0:
160 raise ValueError('Expected a non-negative integer: {}'.format(value))
161 return value
164 @public
165 def regexp_validator(value): # pragma: missed
166 """Validate that the value is a valid regexp."""
167 # This code is covered as proven by the fact that the tests
168 # test_add_bad_regexp and test_patch_bad_regexp in
169 # mailman/rest/tests/test_header_matches.py fail with AssertionError:
170 # HTTPError not raised if the code is bypassed, but coverage says it's
171 # not covered so work around it for now.
172 try:
173 re.compile(value)
174 except re.error:
175 raise ValueError('Expected a valid regexp, got {}'.format(value))
176 return value
179 @public
180 def email_or_regexp_validator(value):
181 """ Email or regular expression validator
183 Validate that the value is not null and is a valid regular expression or
184 email.
186 if not value:
187 raise ValueError(
188 'Expected a valid email address or regular expression, got empty')
189 valid = True
190 # A string starts with ^ will be regarded as regex.
191 if value.startswith('^'):
192 try:
193 regexp_validator(value)
194 except ValueError:
195 valid = False
196 else:
197 valid = getUtility(IEmailValidator).is_valid(value)
199 if valid:
200 return value
201 else:
202 raise ValueError(
203 'Expected a valid email address or regular expression,'
204 ' got {}'.format(value))
207 @public
208 def email_or_regexp_or_atlist_validator(value):
209 """ Email or regular expression or @list validator
211 Validate that the value is not null and is a valid regular expression or
212 email or @fqdn_listname.
214 if not value:
215 raise ValueError(
216 'Expected a valid email address, regular expression or '
217 '@fqdn_listname, got empty')
218 valid = True
219 # A string starts with ^ will be regarded as regex.
220 if value.startswith('^'):
221 try:
222 regexp_validator(value)
223 except ValueError:
224 valid = False
225 # A string starting with @ is a list posting address.
226 elif value.startswith('@'):
227 # Just validate the posting address as an email.
228 valid = getUtility(IEmailValidator).is_valid(value[1:])
229 else:
230 valid = getUtility(IEmailValidator).is_valid(value)
232 if valid:
233 return value
234 else:
235 raise ValueError(
236 'Expected a valid email address, regular expression or '
237 '@fqdn_listname, got {}'.format(value))
240 @public
241 def email_validator(value):
242 """Validate the value is a valid email."""
243 if not getUtility(IEmailValidator).is_valid(value):
244 raise ValueError(
245 'Expected a valid email address, got {}'.format(value))
246 return value
249 @public
250 class Validator:
251 """A validator of parameter input."""
253 def __init__(self, **kws):
254 if '_optional' in kws:
255 self._optional = set(kws.pop('_optional'))
256 else:
257 self._optional = set()
258 self._converters = kws.copy()
260 def __call__(self, request):
261 values = {}
262 extras = set()
263 cannot_convert = set()
264 form_data = {}
265 # All keys which show up only once in the form data get a scalar value
266 # in the pre-converted dictionary. All keys which show up more than
267 # once get a list value.
268 missing = object()
269 # Parse the items from request depending on the content type.
270 items = get_request_params(request)
272 for key, new_value in items.items():
273 old_value = form_data.get(key, missing)
274 if old_value is missing:
275 form_data[key] = new_value
276 elif isinstance(old_value, list):
277 old_value.append(new_value)
278 else:
279 form_data[key] = [old_value, new_value]
280 # Now do all the conversions.
281 for key, value in form_data.items():
282 try:
283 if (self._converters[key] is as_boolean and
284 isinstance(value, bool)):
285 values[key] = value
286 else:
287 values[key] = self._converters[key](value)
288 except KeyError:
289 extras.add(key)
290 except (TypeError, ValueError) as e:
291 cannot_convert.add((key, str(e)))
292 # Make sure there are no unexpected values.
293 if len(extras) != 0:
294 extras = COMMASPACE.join(sorted(extras))
295 raise ValueError('Unexpected parameters: {}'.format(extras))
296 # raise BadRequestError(
297 # description='Unexpected parameters: {}'.format(extras))
298 # Make sure everything could be converted.
299 if len(cannot_convert) != 0:
300 invalid_msg = []
301 for param in sorted(cannot_convert):
302 invalid_msg.append(
303 'Invalid Parameter "{0}": {1}.'.format(*param))
304 raise ValueError(' '.join(invalid_msg))
305 # raise InvalidParamError(param_name=bad, msg=invalid_msg)
306 # Make sure nothing's missing.
307 value_keys = set(values)
308 required_keys = set(self._converters) - self._optional
309 if value_keys & required_keys != required_keys:
310 missing = COMMASPACE.join(sorted(required_keys - value_keys))
311 raise ValueError('Missing Parameter: {}'.format(missing))
312 # raise MissingParamError(param_name=missing)
313 return values
315 def update(self, obj, request):
316 """Update the object with the values in the request.
318 This first validates and converts the attributes in the request, then
319 updates the given object with the newly converted values.
321 :param obj: The object to update.
322 :type obj: object
323 :param request: The HTTP request.
324 :raises ValueError: if conversion failed for some attribute, including
325 if the API version mismatches.
327 for key, value in self.__call__(request).items():
328 self._converters[key].put(obj, key, value)
331 @public
332 class PatchValidator(Validator):
333 """Create a special validator for PATCH requests.
335 PATCH is different than PUT because with the latter, you're changing the
336 entire resource, so all expected attributes must exist. With the former,
337 you're only changing a subset of the attributes, so you only validate the
338 ones that exist in the request.
341 def __init__(self, request, converters):
342 """Create a validator for the PATCH request.
344 :param request: The request object, which must have a .PATCH
345 attribute.
346 :param converters: A mapping of attribute names to the converter for
347 that attribute's type. Generally, this will be a GetterSetter
348 instance, but it might be something more specific for custom data
349 types (e.g. non-basic types like unicodes).
350 :raises UnknownPATCHRequestError: if the request contains an unknown
351 attribute, i.e. one that is not in the `attributes` mapping.
352 :raises ReadOnlyPATCHRequest: if the requests contains an attribute
353 that is defined as read-only.
355 validationators = {}
356 # Parse the items from request depending on the content type.
357 items = get_request_params(request)
358 for attribute in items:
359 if attribute not in converters:
360 raise UnknownPATCHRequestError(attribute)
361 if converters[attribute].decoder is None:
362 raise ReadOnlyPATCHRequestError(attribute)
363 validationators[attribute] = converters[attribute]
364 super().__init__(**validationators)